外排序的一个例子是外归并排序(External merge sort):它每次读入一部分能放进内存的数据,在内存中排序后输出为一个顺串(即内部数据有序的临时文件),处理完所有数据后再对这些顺串进行归并。比如,要对 900 MB 的数据进行排序,但机器上只有 100 MB 的可用内存时,先每次读入 100 MB 数据,在内存中排序后写成一个顺串,共得到 9 个顺串;随后的多线程归并按如下方法操作:
1. 在各个顺串文件上二分查找枢轴位置,把每一个文件划分为与线程数相同个数的片段,使得每一个线程负责的所有文件片段的长度之和相对均衡。
2. 然后每一个线程各自处理属于它的K个文件片段,用规模为K的最小堆维护K路归并:先将K个片段的头元素插入到堆中,然后每次取出堆顶元素,取出来的元素属于哪个片段,就把这个片段的下一个元素加入堆中,以此维护这个堆。这里的排序结果就是最终的排序结果,直接输出到文件。各线程并行处理。
1. 数据并行拆分: (partition_and_sort)
2. k路归并堆排序:(heapsort)
构造一个大小为k的最小二叉堆,先将k个节点的头元素插入到堆中,然后每次取出头结点,取出来的元素属于哪个子数组,再添加这个子数组的下一个元素进入堆中,来维护这个堆。这里的排序结果也是最后的排序结果,减少IO操作
3. 缓冲区(buffer)
如果只把每个节点的最小值放入内存,例如把2,1,3,4放入内存,那么把最小值1取走之后需要补充一个元素,即把外部存储中的2读到内存里来。可是外部存储可能在硬盘或网络上,这一过程相比内存操作会慢很多,不断逐个读取外部存储效率很低。所以采用缓冲区,每次把k个节点各自前面的一部分数据(几KB或几MB)读入内存缓冲区。
#include <io.h>
#include <stdio.h>
#include <time.h>

#include <atomic>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

#include <Windows.h>
#include <ppl.h>
12 #define MAX_THREADS 4
13 #define MAX_K 100
14 using namespace std;
15 using namespace concurrency;
16 const int dx = 20;//并行快速排序的dx优化
17 const long long PARTITION_SIZE = 100000000;
18 const long long BUFFER_SIZE = 200000;
19 const long long EACH_NUM = (PARTITION_SIZE / MAX_THREADS);
20 int parts, heapsize[MAX_THREADS];
21 long long data_size;
22 mutex m;
23 typedef pair<int, int> node; // (int,文件id)
24 // 每一个线程维护的最小堆,堆大小是文件数,K路归并
25 node heap[MAX_THREADS][MAX_K + 10];
26
27 void parallel_qsort(int *begin, int *end) {//并行快速排序
28 if (begin >= end - 1) return;
29 int *key = rand() % (end - begin) + begin;
30 swap(*key, *begin);
31 int *i = begin, *j = begin;
32 for (key = begin; j < end; j++) {
33 if (*j < *key) {
34 i++;
35 swap(*i, *j);
36 }
37 }
38 swap(*begin, *i);
39 if (i - begin > dx && end - i > dx) {//dx优化
40 parallel_for(0, 2, [&](int x) {
41 if (x) parallel_qsort(begin, i);
42 else parallel_qsort(i + 1, end);
43 });
44 } else {
45 parallel_qsort(begin, i);
46 parallel_qsort(i + 1, end);
47 }
48 }
49
50 // 添加新元素,向上找到合适的插入位置
51 inline void up(int idx, int x) {
52 int fa = x >> 1; node tmp = heap[idx][x];
53 while (fa) {
54 if (tmp < heap[idx][fa])//cmp
55 heap[idx][x] = heap[idx][fa];
56 else break;
57 x = fa; fa = x >> 1;
58 }
59 heap[idx][x] = tmp;
60 }
61
62 // 向下找到合适的插入位置
63 inline void down(int idx, int x) {
64 int ch = x << 1; node tmp = heap[idx][x];
65 while (ch <= heapsize[idx]) {
66 if (ch < heapsize[idx] && heap[idx][ch + 1] < heap[idx][ch]) ch++;//cmp
67 if (heap[idx][ch] < tmp)//cmp
68 heap[idx][x] = heap[idx][ch];
69 else break;
70 x = ch; ch = x << 1;
71 }
72 heap[idx][x] = tmp;
73 }
74
75 inline void push(int idx, node val) { // 向最小堆插入元素
76 heap[idx][++heapsize[idx]] = val;
77 up(idx, heapsize[idx]);
78 }
79 inline node top(int idx) { return heap[idx][1]; }
80 inline void pop(int idx) { // pop堆顶最小元素
81 heap[idx][1] = heap[idx][heapsize[idx]--];
82 down(idx, 1);
83 }
84
85 inline void ch_size(string file_name, fpos_t size) {
86 FILE *fout = fopen(file_name.c_str(), "wb");
87 _chsize_s(fileno(fout), size * sizeof(int));
88 fclose(fout);
89 }
90 inline int seek_dat(FILE* &f, fpos_t pos) {
91 int *get = new int;
92 pos *= sizeof(int);
93 fsetpos(f, &pos);
94 fread(get, sizeof(int), 1, f);
95 int tmp = *get; delete get;
96 return tmp;
97 }
98
99 void partition_and_sort(string in_file) {
100 int *arr = new int[PARTITION_SIZE];
101 for (long long i = 0; i < (data_size - 1) / PARTITION_SIZE + 1; i++) {
102 atomic_int each_get[MAX_THREADS + 1] = {};
103 string tmp_file = "temp\\part" + to_string(i) + ".dat";
104 clock_t start = clock();
105 cout << "Reading part " << i << "...";
106 parallel_for(0, MAX_THREADS, [&](long long x) {
107 FILE* fin = fopen(in_file.c_str(), "rb");
108 fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int);
109 if (fsetpos(fin, &pos) == 0)
110 each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin);
111 each_get[MAX_THREADS] += each_get[x];
112 fclose(fin);
113 });
114 cout << "\rSorting part " << i << "...";
115 parallel_qsort(arr, arr + each_get[MAX_THREADS]); // 并行快速排序
116 cout << "\rWriting part " << i << "...";
117 ch_size(tmp_file, each_get[MAX_THREADS]);
118 parallel_for(0, MAX_THREADS, [&](long long x) {
119 FILE* fout = fopen(tmp_file.c_str(), "rb+");
120 fpos_t pos = EACH_NUM * x * sizeof(int);
121 if (fsetpos(fout, &pos) == 0)
122 fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout);
123 fclose(fout);
124 });
125 clock_t end = clock();
126 cout << "\rPart " << i << " established. Time usage = " << end - start << "ms.\n";
127 }
128 delete[] arr;
129 }
130
131 void merge_file() {
132 FILE* fin[MAX_K] = {};
133 fpos_t size[MAX_K] = {}, seek_pos[MAX_THREADS + 1][MAX_K + 1] = {};
134 for (int i = 0; i < parts; i++) {
135 fin = fopen(("temp\\part" + to_string(i) + ".dat").c_str(), "rb");
136 fseek(fin, 0, SEEK_END);
137 fgetpos(fin, &size);
138 size /= sizeof(int); // 有多少数
139 seek_pos[MAX_THREADS][parts] += (seek_pos[MAX_THREADS] = size); // seek_pos[线程id][文件id] = 文件位置
140 }
141 cout << "\nInitializing merging operation...\n";
142 for (long long i = 1; i < MAX_THREADS; i++) {
143 fpos_t l0 = 0, r0 = size[0] - 1;
144 while (r0 - l0 > 1) { // 二分文件0的位置
145 seek_pos[parts] = seek_pos[0] = (l0 + r0) / 2;
146 int get0 = seek_dat(fin[0], seek_pos[0]);
147 for (int idx = 1; idx < parts; idx++) {
148 fpos_t l = 0, r = size[idx];
149 while (r - l > 0) { // 二分其他文件的位置,找到get0
150 seek_pos[idx] = (l + r) / 2;
151 int get = seek_dat(fin[idx], seek_pos[idx]);
152 if (get0 <= get) r = seek_pos[idx];
153 else l = seek_pos[idx] + 1;
154 }
155 seek_pos[parts] += (seek_pos[idx] = r);
156 }
157 // 二分文件0位置的目的是使得分治的较为均衡,所有文件相对片段长度之和接近于 data_size / MAX_THREADS
158 if (seek_pos[parts] * MAX_THREADS < data_size * i) l0 = seek_pos[0] + 1;
159 else r0 = seek_pos[0] - 1;
160 }
161 }
162 for (int i = 0; i < parts; i++) fclose(fin);
163 clock_t start = clock(); atomic_llong all_write = 0;
164 parallel_for(0, MAX_THREADS, [&](int x) { // 线程处理外循环
165 FILE *fin[MAX_K] = {}, *fout = fopen("ans.dat", "rb+");
166 fpos_t fpos = seek_pos[x][parts] * sizeof(int);
167 fsetpos(fout, &fpos);
168 int **buf = new int*[MAX_K + 1];// 开文件数个buffer,K路归并
169 for (int i = 0; i <= MAX_K; i++) buf = new int[BUFFER_SIZE];
170 int pos[MAX_K + 1] = {};//buffer pos
171 fpos_t all[MAX_K] = {};
172 for (int i = 0; i < parts; i++) {
173 fin = fopen(("temp\\part" + to_string(i) + ".dat").c_str(), "rb");
174 fpos = seek_pos[x] * sizeof(int);
175 fsetpos(fin, &fpos);
176 all = seek_pos[x + 1] - seek_pos[x]; // 记录每一个文件一个线程处理的长度
177 fread(buf, sizeof(int), BUFFER_SIZE, fin);
178 }
179 for (int i = 0; i < parts; i++) {
180 // 向最小堆中读入所有文件属于该线程处理的部分的第一个元素
181 push(x, node(buf[0], i)); //(线程id,pair(buffer,文件id))
182 pos = 1; all--;
183 }
184 while (heapsize[x]) {
185 // buf[parts]: k路归并排好序的缓冲区
186 if (pos[parts] == BUFFER_SIZE) {
187 fwrite(buf[parts], sizeof(int), BUFFER_SIZE, fout);
188 all_write += BUFFER_SIZE;
189 if (all_write % 1000000 == 0) {
190 m.lock();
191 cout << "\rStart merging... " << (all_write * 100) / data_size
192 << "% completed.";
193 m.unlock();
194 }
195 pos[parts] = 0;
196 }
197 int bel = top(x).second;
198 buf[parts][pos[parts]++] = top(x).first;
199 if (all[bel]) {
200 heap[x][1] = node(buf[bel][pos[bel]], bel); down(x, 1);// 该buffer的新元素替换heap的顶部最小元素
201 if ((++pos[bel]) == BUFFER_SIZE) {
202 fread(buf[bel], sizeof(int), BUFFER_SIZE, fin[bel]);
203 pos[bel] = 0;
204 }
205 all[bel]--;
206 } else pop(x); // 该文件属于线程x的部分全部处理完了,就直接pop
207 }
208 fwrite(buf[parts], sizeof(int), pos[parts], fout); // 把余下排好序的buffer写入文件
209 cout << "\rStart merging... 100% completed.";
210 for (int i = 0; i < parts; i++) fclose(fin); fclose(fout);
211 for (int i = 0; i < MAX_K; i++) delete[] buf; delete[] buf;
212 });
213 clock_t end = clock();
214 cout << "\nMerging finished. Time usage = " << end - start << "ms.\n";
215 }
216
217
218 int main() {
219 string in_file;
220 cout << "Enter data file name: ";
221 cin >> in_file;
222 FILE* fin = fopen(in_file.c_str(), "rb");
223 if (fin == NULL) {
224 cout << "Could not open that file.\n";
225 main();
226 }
227 clock_t start_time = clock();
228 fseek(fin, 0, SEEK_END);
229 fgetpos(fin, &data_size);
230 data_size /= sizeof(int);
231 parts = (data_size - 1) / PARTITION_SIZE + 1;
232 fclose(fin);
233 cout << "\nPartitioning " << data_size << " elements(int)...\n";
234 system("mkdir temp");
235 parallel_for(0, 2, [&](int x) {
236 if (x) partition_and_sort(in_file);
237 else ch_size("ans.dat", data_size);
238 });
239 merge_file();
240 clock_t end_time = clock();
241 system("rd /s/q temp");
242 cout << "\nExternal sorting complete, result saved to \"ans.dat\".\n"
243 << "Time usage = " << end_time - start_time << "ms.\n";
244 system("pause");
245 return 0;
246 }
简单无堆无缓冲区单线程K路归并版本: https://www.cnblogs.com/this-543273659/archive/2011/07/30/2122083.html