转自:AIfred
问题:
对一个 10GB 的数据文件排序,而计算机内存仅有 4GB
思路:
将整个文件读入内存排序显然不行。可以将这个 10GB 的大文件分区为 100 个 100MB 的小文件,把这些小文件的数据依次读入内存、排序、再输出,于是我们便得到了 100 个各自有序的小文件。接下来再将这 100 个小文件两两归并,便得到了一个有序的大文件,完成了排序操作。在实际中,如果仅仅使用普通串行算法实现,整个程序的效率非常低。尽可能多的并行化会大幅提升效率。
用到知识点:
1. 多线程读写二进制文件
2. 并行归并排序(sort部分并行,merge部分并行)
3. 解决线程冲突(C++11的原子操作)
分区排序
对待排序的大文件,首先需要定义每个分区最多的元素数量partition_size。例如待排序文件中有5500个元素,设partition_size = 1000, 需要创建6个分区,数量分别为5*1000,1*500。然后分别对这个6个分区单独内排序即可。
4-11:对第i 个分区,首先将待排序文件中对应该分区部分的元素并行读入内存
26-31:对第i 个分区,将排序好的对应分区的元素并行写入文件
由于不能将整个文件全部读入内存,只能通过fsetpos 和 fread 函数来读取该位置的元素的值。
1 void partition_and_sort(string in_file, long long n) {
2 int* arr = new int[PARTITION_SIZE]; // 一个分区大小的临时数组
3 // 分区循环
4 for (long long i = 0; i < (n - 1) / PARTITION_SIZE + 1; i++) {
5 // each_get: 分区内一个线程处理大小, all_get: 分区大小
6 int each_get[4] = {}, all_get = 0;
7 clock_t start_time = clock();
8 // 多线程读取一个分区
9 parallel_for(0, MAX_THREADS, [&](long long x) {
10 FILE* fin = fopen(in_file.c_str(), "rb");
11 // 在文件中定位到该线程读取的位置
12 fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int);
13 if (fsetpos(fin, &pos) == 0)
14 // 读取 EACH_NUM个值,返回个数
15 each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin);
16 fclose(fin);
17 });
18 clock_t end_time = clock();
19 for (int i = 0; i < MAX_THREADS; i++) all_get += each_get;
20 cout << "\nRead " << all_get << " numbers from \"" << in_file << "\". "
21 << "Time usage = " << end_time - start_time << "ms.\n";
22 start_time = clock();
23 parallel_qsort(arr, arr + all_get); // 并行快速排序
24 end_time = clock();
25 cout << "Sorting finished. Time usage = " << end_time - start_time << "ms.\n";
26 string tmp_file = "temp\\part" + to_string(parts++) + ".dat";
27 FILE *fout = fopen(tmp_file.c_str(), "wb");
28 chsize(fileno(fout), all_get * sizeof(int));
29 fclose(fout);
30 start_time = clock();
31 // 对一个分区并行写入文件
32 parallel_for(0, MAX_THREADS, [&](long long x) {
33 FILE* fout = fopen(tmp_file.c_str(), "rb+");
34 fpos_t pos = EACH_NUM * x * sizeof(int);
35 if (fsetpos(fout, &pos) == 0)
36 fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout);
37 fclose(fout);
38 });
39 end_time = clock();
40 cout << "Part " << parts - 1 << " saved to file \"" << tmp_file << "\". "
41 << "Time usage = " << end_time - start_time << "ms.\n";
42 }
43 delete[] arr;
44 }
并行快排:算法导论版快排,一个for循环维护 i , j 区间,[0, i]其值<=key,(i, j)其值>key。
1 void parallel_qsort(int *begin, int *end) {
2 if (begin >= end - 1) return;
3 int *key = rand() % (end - begin) + begin; // 选 pivot
4 swap(*key, *begin);
5 int *i = begin, *j = begin;
6 for (key = begin; j < end; j++) {
7 if (*j < *key) {
8 i++;
9 swap(*i, *j);
10 }
11 }
12 swap(*begin, *i);
13 if (i - begin > dx && end - i > dx) { // 如果两个区间长度均大于dx,并行
14 parallel_for(0, 2, [&](int x) {
15 if (x) parallel_qsort(begin, i);
16 else parallel_qsort(i + 1, end);
17 });
18 } else { // 区间长度较小则不并行
19 parallel_qsort(begin, i);
20 parallel_qsort(i + 1, end);
21 }
22 }
归并:
是外排序核心,整个程序的效率高低取决于该阶段的算法。由于待归并的数据规模较大,使用并行归并算法可以显著提升效率。
并行一般需要分治的思想,需要先找到各对枢轴,将原数据划分成均匀的几部分,再启动多个线程进行归并。
段组分解的基本思想
下面就来讲解一种最简单的基于二分查找的段组分解算法的基本思想。 先看一个实例,有以下两组有序数据,如何将它们进行并行归并呢?
- 第一组:
15 25 33 47 58 59 62 64
- 第二组:
12 18 27 31 36 38 42 80
考虑将第1 组数据以中间位置数据47 作为枢轴数据平分成两段,如下所示:
- 第一组:
(15 25 33 47)(58 59 62 64)
然后再在第2 组数据中使用对半查找方法查找一个刚好小于等于枢轴数据47 的数据42 ,以42 为枢轴将第2 组数据分成两段如下:
- 第二组:
(12 18 27 31 36 38 42)(80)
此时可以发现,第一组的第1 段数据和第二组的第1 段数据均小于第1 组的第2 段或第二组的第2 段数据。 将第一组的第1 段数据和第二组的第1 段数据进行归并,得到以下数据序列:
- 第一段归并:
(12 15 18 25 27 31 33 36 38 42 47)
将第一组的第2 段数据和第二组的第2 段数据进行归并,
很容易发现,上面两段归并后的数据连起来后,自然就形成了一个有序系列。由于上面两次归并操作都是独立的,因此它可以并行地进行。这种可以归并的两个成对数据段就是前面说过的段组。
47 在第一组数据中的位置是3 ;42 在第二组数据中的位置是6 。位置对(3,6) 将上面两组数据分成了两个可以并行归并的段组。
从上面的例子可以看出,在第一组数据的枢轴数据47 确定以后,在第二组数据中使用折半查找方法找到刚好小于等于枢轴数据47 的数据42 ,时间复杂度为O(logn)O(logn)。为了让效率最优化,每个线程的负载应该尽可能均衡,这就要求选取的位置对能够将原数据分成元素数量相对均衡的段组。于是我们也应使用折半查找确定第一组数据的枢轴的合适位置。
嵌套二分求枢轴:时间复杂度O((logn)^2)
下面程序假设为4个线程,找3个枢轴将两个要合并的数组分别分成4段,每一对片段合并后的长度一致,使负载尽可能均衡。
由于不能一次全部将整个文件装进内存,此处涉及磁盘操作,但由于折半查找的效率较高,访问磁盘的次数并不多,开销相对较小。
1 void get_fpos() {
2 long long seek_pos[MAX_THREADS + 1][3] = {}, n1 = size1 / 4, n2 = size2 / 4;
3 for (long long i = 1; i < MAX_THREADS; i++) {//找第i个枢轴
4 long long pos1, pos2, fpos, l1 = 0, r1 = n1 - 1;
5 int *get1 = new int;
6 while (r1 - l1 > 1) {
7 pos1 = (l1 + r1) / 2;//二分查找,先假定file1的枢轴
8 fpos = pos1 * sizeof(int);
9 fsetpos(fin1, &fpos);
10 fread(get1, sizeof(int), 1, fin1);
11 long long l2 = 0, r2 = n2;
12 int* get2 = new int;
13 while (r2 - l2 > 0) {//再用二分查找确定file2的枢轴
14 pos2 = (l2 + r2) / 2;
15 fpos = pos2 * sizeof(int);
16 fsetpos(fin2, &fpos);
17 fread(get2, sizeof(int), 1, fin2);
18 if (*get1 <= *get2)
19 r2 = pos2;
20 else
21 l2 = pos2 + 1;
22 }
23 delete get2; get2 = NULL;
24 pos2 = r2;
25 //如果这两个枢轴将file1和file2划分的不够均匀,则对pos1进行调整
26 if ((pos1 + pos2) * long long(MAX_THREADS) < (n1 + n2) * i)
27 l1 = pos1 + 1;
28 else
29 r1 = pos1 - 1;
30 }
31 delete get1; get1 = NULL;
32 seek_pos[1] = pos1 * sizeof(int);//记录file1枢轴位置
33 seek_pos[2] = pos2 * sizeof(int);//记录file2枢轴位置
34 seek_pos[0] = seek_pos[1] + seek_pos[2];//输出文件枢轴位置
35 }
36 fclose(fin1); fclose(fin2);
37 //边界细节
38 seek_pos[0][0] = seek_pos[0][1] = seek_pos[0][2] = 0;
39 seek_pos[MAX_THREADS][1] = n1 * sizeof(int);
40 seek_pos[MAX_THREADS][2] = n2 * sizeof(int);
41 seek_pos[MAX_THREADS][0] = seek_pos[MAX_THREADS][1] + seek_pos[MAX_THREADS][2];
42 }
完成归并操作的merge_file 函数需要传入两个string 类型的参数,表示需要归并的两组数据的文件名,返回归并后的数据的文件名:
string merge_file(string in_file1, string in_file2) {}
在这个函数中,我们首先需要获取到两个输入文件的大小(从而得到元素数量):
1 void get_size() {
2 FILE* fin1 = fopen(in_file1.c_str(), "rb");
3 FILE* fin2 = fopen(in_file2.c_str(), "rb");
4 fseek(fin1, 0, SEEK_END); // 将文件指针设为文末
5 fseek(fin2, 0, SEEK_END);
6 fpos_t size1 = 0, size2 = 0;
7 fgetpos(fin1, &size1); // 获得文件大小
8 fgetpos(fin2, &size2);
9 }
于是size1 / sizeof(int) 便是第一个文件中的元素数量,size2 / sizeof(int) 便是第二个文件中的元素数量。
下面创建输出文件,并预先调整其大小(为了并行写入):
1 void create_output_file() {
2 string out_file = "temp\\part" + to_string(parts++) + ".dat";
3 FILE *fout = fopen(out_file.c_str(), "wb");
4 _chsize_s(fileno(fout), (n1 + n2) * sizeof(int));
5 }
parts 是一个全局变量,用来统计已经创建的文件数量并作为新文件的文件名。(比如前一阶段 partition 操作将原数据分割为5 个part ,分别命名为part0.dat , part1.dat … part4.dat , 那么在归并阶段形成的新文件将继续命名为 part5.dat , part6.dat … 比如 part0.dat 和part1.dat 归并得到part5.dat , part3.dat 和 part4.dat 归并得到part6.dat …)由于此时可能有多个线程在执行merge_file 函数,同时访问全局变量可能会发生访问冲突,故parts 必须定义为atomic_int 类型,实现原子操作。
归并操作:
1 //多线程归并操作,每个线程只负责归并对应的段组
2 parallel_for(0, MAX_THREADS, [&](int x) {
3 FILE* fin1 = fopen(in_file1.c_str(), "rb");
4 FILE* fin2 = fopen(in_file2.c_str(), "rb");
5 FILE* fout = fopen(out_file.c_str(), "rb+");
6 //根据枢轴位置确定读入起点
7 fsetpos(fin1, &seek_pos[x][1]);
8 fsetpos(fin2, &seek_pos[x][2]);
9 fsetpos(fout, &seek_pos[x][0]);
10 //输入输出缓冲,优化读写性能
11 int *buf0 = new int[BUFFER_SIZE];//输出文件缓冲
12 int *buf1 = new int[BUFFER_SIZE];//fin1缓冲
13 int *buf2 = new int[BUFFER_SIZE];//fin2缓冲
14 int i = 0, j = 0, k = 0;//i, j, k分别是buf1,buf2,buf0的数组下标
15 //该线程应该从file1中读取all1个数据,从file2中读取all2个数据
16 long long all1 = (seek_pos[x + 1][1] - seek_pos[x][1]) / sizeof(int);
17 long long all2 = (seek_pos[x + 1][2] - seek_pos[x][2]) / sizeof(int);
18 //先读取到缓冲区
19 fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
20 fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
21 while (all1 > 0 && all2 > 0) {//归并排序
22 if (buf1 < buf2[j]) {
23 buf0[k++] = buf1[i++]; all1--;
24 if (i == BUFFER_SIZE) {//如果缓冲区读完了,就更新缓冲区,重置i
25 fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
26 i = 0;
27 }
28 } else {
29 buf0[k++] = buf2[j++]; all2--;
30 if (j == BUFFER_SIZE) {
31 fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
32 j = 0;
33 }
34 }
35 if (k == BUFFER_SIZE) {//如果缓冲区写满了,就全部写入文件,重置k
36 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
37 k = 0;
38 }
39 }
40 while (all1 > 0) {//归并流程-如果file1中还有剩余数据,直接追加输出
41 buf0[k++] = buf1[i++]; all1--;
42 if (i == BUFFER_SIZE) {
43 fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
44 i = 0;
45 }
46 if (k == BUFFER_SIZE) {
47 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
48 k = 0;
49 }
50 }
51 while (all2 > 0) {//归并流程-如果file2中还有剩余数据,直接追加输出
52 buf0[k++] = buf2[j++]; all2--;
53 if (j == BUFFER_SIZE) {
54 fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
55 j = 0;
56 }
57 if (k == BUFFER_SIZE) {
58 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
59 k = 0;
60 }
61 }
62 fwrite(buf0, sizeof(int), k, fout);//写入输出文件
63 fclose(fin1);
64 fclose(fin2);
65 fclose(fout);
66 delete[] buf0;
67 delete[] buf1;
68 delete[] buf2;
69 });
在归并操作中,一个非常重要的优化便是输入输出缓冲区。输入缓冲即预先从文件一次性读入BUFFER_SIZE 个数据,之后频繁的读取操作便从缓冲区获取,当缓冲区读完时(即i , j 指向了 buf1 或buf2 的尾端),再一次性从文件中读取 BUFFER_SIZE 个数据,重置i 或j ;输出缓冲即将数据先写入缓冲区,待缓冲区填满后(即k 指向buf0 的尾端)再一次性写入文件。缓冲区的设立,避免了频繁的IO操作,配合多线程可以使读写磁盘速率达到最大,性能大幅提升。
merge_file 函数编写完毕,我们需要递归调用它,直到所有的小分区都两两归并完成。递归的过程可以并行处理(不过由于merge_file 函数本身已做并行化处理,此处串行递归亦可,效率不会太差):
1 string merge(int l, int r) {
2 if (l == r) return "temp\\part" + to_string(l) + ".dat";
3 int mid = (l + r) / 2;
4 string file1, file2;
5 parallel_for(0, 2, [&](int x) {//此处使用串行递归亦可,不过效率略低一点
6 if (x == 0) file1 = merge(l, mid);
7 if (x == 1) file2 = merge(mid + 1, r);
8 });
9 return merge_file(file1, file2);
10 }
整合:
1 #include <iostream>
2 #include <stdio.h>
3 #include <cstring>
4 #include <string>
5 #include <atomic>
6 #include <Windows.h>
7 #include <ppl.h>
8 #include <io.h>
9 #include <time.h>
10 #define MAX_THREADS 4//最多线程数量
11 using namespace std;
12 using namespace concurrency;
13 const int dx = 20;//并行快速排序的dx优化
14 const long long PARTITION_SIZE = 100000000;//分区大小
15 const long long BUFFER_SIZE = 10000000;//输入输出缓冲区大小
16 const long long EACH_NUM = (PARTITION_SIZE / MAX_THREADS);
17 atomic_int parts;
18 void parallel_qsort(int *begin, int *end) {//并行快速排序
19 if (begin >= end - 1) return;
20 int *key = rand() % (end - begin) + begin;
21 swap(*key, *begin);
22 int *i = begin, *j = begin;
23 for (key = begin; j < end; j++) {
24 if (*j < *key) {
25 i++;
26 swap(*i, *j);
27 }
28 }
29 swap(*begin, *i);
30 if (i - begin > dx && end - i > dx) {//dx优化
31 parallel_for(0, 2, [&](int x) {
32 if (x) parallel_qsort(begin, i);
33 else parallel_qsort(i + 1, end);
34 });
35 } else {
36 parallel_qsort(begin, i);
37 parallel_qsort(i + 1, end);
38 }
39 }
40 void partition_and_sort(string in_file, long long n) {
41 int* arr = new int[PARTITION_SIZE];
42 for (long long i = 0; i < (n - 1) / PARTITION_SIZE + 1; i++) {//准备建立第i个分区
43 int each_get[4] = {}, all_get = 0;
44 //并行读取int数据至arr[]中,每个线程计划读取EACH_NUM个,一共计划读取PARTITION_SIZE个
45 //每个线程实际读取each_get[x]个,实际一共读取all_get个
46 clock_t start_time = clock();
47 parallel_for(0, MAX_THREADS, [&](long long x) {
48 FILE* fin = fopen(in_file.c_str(), "rb");
49 fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int);
50 if (fsetpos(fin, &pos) == 0)
51 each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin);
52 fclose(fin);
53 });
54 clock_t end_time = clock();
55 for (int i = 0; i < MAX_THREADS; i++) all_get += each_get;
56 cout << "\nRead " << all_get << " numbers from \"" << in_file << "\". "
57 << "Time usage = " << end_time - start_time << "ms.\n";
58 //对arr进行并行快速排序
59 start_time = clock();
60 parallel_qsort(arr, arr + all_get);
61 end_time = clock();
62 cout << "Sorting finished. Time usage = " << end_time - start_time << "ms.\n";
63 //创建分区文件并调整大小
64 string tmp_file = "temp\\part" + to_string(parts++) + ".dat";
65 FILE *fout = fopen(tmp_file.c_str(), "wb");
66 chsize(fileno(fout), all_get * sizeof(int));
67 fclose(fout);
68 //并行将arr全部写入分区文件
69 start_time = clock();
70 parallel_for(0, MAX_THREADS, [&](long long x) {
71 FILE* fout = fopen(tmp_file.c_str(), "rb+");
72 fpos_t pos = EACH_NUM * x * sizeof(int);
73 if (fsetpos(fout, &pos) == 0)
74 fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout);
75 fclose(fout);
76 });
77 end_time = clock();
78 cout << "Part " << parts - 1 << " saved to file \"" << tmp_file << "\". "
79 << "Time usage = " << end_time - start_time << "ms.\n";
80 }
81 delete[] arr;
82 }
83 string merge_file(string in_file1, string in_file2) {//将两个文件归并
84 string out_file = "temp\\part" + to_string(parts++) + ".dat";
85 //首先获取两个文件的长度
86 FILE* fin1 = fopen(in_file1.c_str(), "rb");
87 FILE* fin2 = fopen(in_file2.c_str(), "rb");
88 clock_t seekpos_start_time = clock();
89 fseek(fin1, 0, SEEK_END);
90 fseek(fin2, 0, SEEK_END);
91 fpos_t size1 = 0, size2 = 0;
92 fgetpos(fin1, &size1);
93 fgetpos(fin2, &size2);
94 long long seek_pos[MAX_THREADS + 1][3] = {}, n1 = size1 / 4, n2 = size2 / 4;
95 for (long long i = 1; i < MAX_THREADS; i++) {//找第i个枢轴
96 long long pos1, pos2, fpos, l1 = 0, r1 = n1 - 1;
97 int *get1 = new int;
98 while (r1 - l1 > 1) {
99 pos1 = (l1 + r1) / 2;//二分查找,先假定file1的枢轴
100 fpos = pos1 * sizeof(int);
101 fsetpos(fin1, &fpos);
102 fread(get1, sizeof(int), 1, fin1);
103 long long l2 = 0, r2 = n2;
104 int* get2 = new int;
105 while (r2 - l2 > 0) {//再用二分查找确定file2的枢轴
106 pos2 = (l2 + r2) / 2;
107 fpos = pos2 * sizeof(int);
108 fsetpos(fin2, &fpos);
109 fread(get2, sizeof(int), 1, fin2);
110 if (*get1 <= *get2)
111 r2 = pos2;
112 else
113 l2 = pos2 + 1;
114 }
115 delete get2; get2 = NULL;
116 pos2 = r2;
117 //如果这两个枢轴将file1和file2划分的不够均匀,则对pos1进行调整
118 if ((pos1 + pos2) * long long(MAX_THREADS) < (n1 + n2) * i)
119 l1 = pos1 + 1;
120 else
121 r1 = pos1 - 1;
122 }
123 delete get1; get1 = NULL;
124 seek_pos[1] = pos1 * sizeof(int);//记录file1枢轴位置
125 seek_pos[2] = pos2 * sizeof(int);//记录file2枢轴位置
126 seek_pos[0] = seek_pos[1] + seek_pos[2];//输出文件枢轴位置
127 }
128 fclose(fin1); fclose(fin2);
129 //边界细节
130 seek_pos[0][0] = seek_pos[0][1] = seek_pos[0][2] = 0;
131 seek_pos[MAX_THREADS][1] = n1 * sizeof(int);
132 seek_pos[MAX_THREADS][2] = n2 * sizeof(int);
133 seek_pos[MAX_THREADS][0] = seek_pos[MAX_THREADS][1] + seek_pos[MAX_THREADS][2];
134 clock_t seekpos_end_time = clock();
135 //调整输出文件大小
136 clock_t chsize_start_time = clock();
137 FILE *fout = fopen(out_file.c_str(), "wb");
138 _chsize_s(fileno(fout), (n1 + n2) * sizeof(int));
139 fclose(fout);
140 clock_t chsize_end_time = clock();
141 //多线程归并操作,每个线程只负责归并对应的段组
142 clock_t merge_start_time = clock();
143 parallel_for(0, MAX_THREADS, [&](int x) {
144 FILE* fin1 = fopen(in_file1.c_str(), "rb");
145 FILE* fin2 = fopen(in_file2.c_str(), "rb");
146 FILE* fout = fopen(out_file.c_str(), "rb+");
147 //根据枢轴位置确定读入起点
148 fsetpos(fin1, &seek_pos[x][1]);
149 fsetpos(fin2, &seek_pos[x][2]);
150 fsetpos(fout, &seek_pos[x][0]);
151 //输入输出缓冲,优化读写性能
152 int *buf0 = new int[BUFFER_SIZE];
153 int *buf1 = new int[BUFFER_SIZE];
154 int *buf2 = new int[BUFFER_SIZE];
155 int i = 0, j = 0, k = 0;//i, j, k分别是buf1,buf2,buf0的数组下标
156 //该线程应该从file1中读取all1个数据,从file2中读取all2个数据
157 long long all1 = (seek_pos[x + 1][1] - seek_pos[x][1]) / sizeof(int);
158 long long all2 = (seek_pos[x + 1][2] - seek_pos[x][2]) / sizeof(int);
159 //先读取到缓冲区
160 fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
161 fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
162 while (all1 > 0 && all2 > 0) {//归并排序
163 if (buf1 < buf2[j]) {
164 buf0[k++] = buf1[i++]; all1--;
165 if (i == BUFFER_SIZE) {//如果缓冲区读完了,就更新缓冲区,重置i
166 fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
167 i = 0;
168 }
169 } else {
170 buf0[k++] = buf2[j++]; all2--;
171 if (j == BUFFER_SIZE) {
172 fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
173 j = 0;
174 }
175 }
176 if (k == BUFFER_SIZE) {//如果缓冲区写满了,就全部写入文件,重置k
177 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
178 k = 0;
179 }
180 }
181 while (all1 > 0) {//归并流程-如果file1中还有剩余数据,直接追加输出
182 buf0[k++] = buf1[i++]; all1--;
183 if (i == BUFFER_SIZE) {
184 fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
185 i = 0;
186 }
187 if (k == BUFFER_SIZE) {
188 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
189 k = 0;
190 }
191 }
192 while (all2 > 0) {//归并流程-如果file2中还有剩余数据,直接追加输出
193 buf0[k++] = buf2[j++]; all2--;
194 if (j == BUFFER_SIZE) {
195 fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
196 j = 0;
197 }
198 if (k == BUFFER_SIZE) {
199 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
200 k = 0;
201 }
202 }
203 fwrite(buf0, sizeof(int), k, fout);//写入输出文件
204 fclose(fin1);
205 fclose(fin2);
206 fclose(fout);
207 delete[] buf0;
208 delete[] buf1;
209 delete[] buf2;
210 });
211 clock_t merge_end_time = clock();
212 Sleep(100);
213 //输入文件的数据已归并至新文件中,不再需要,删除。
214 system(("del " + in_file1).c_str());
215 system(("del " + in_file2).c_str());
216 cout << "\nPart \"" << in_file1 << "\" and \"" << in_file2 << "\" merged, "
217 << "result saved to \"" << out_file << "\".\n"
218 << "Time usage: seek_pos: " << seekpos_end_time - seekpos_start_time << "ms, "
219 << "chsize: " << chsize_end_time - chsize_start_time << "ms, "
220 << "parallel_merge: " << merge_end_time - merge_start_time << "ms.\n";
221 return out_file;
222 }
223 string merge(int l, int r) {//递归归并操作
224 if (l == r) return "temp\\part" + to_string(l) + ".dat";
225 int mid = (l + r) / 2;
226 string file1, file2;
227 parallel_for(0, 2, [&](int x) {//此处使用串行递归亦可,不过效率略低一点
228 if (x == 0) file1 = merge(l, mid);
229 if (x == 1) file2 = merge(mid + 1, r);
230 });
231 return merge_file(file1, file2);
232 }
233 int main() {
234 string in_file, out_file;
235 cout << "Enter data file name: ";
236 cin >> in_file;
237 FILE* fin = fopen(in_file.c_str(), "rb");
238 if (fin == NULL) {
239 cout << "Could not open that file.\n";
240 main();
241 }
242 clock_t start_time = clock();
243 //获取输入文件的大小
244 fseek(fin, 0, SEEK_END);
245 fpos_t pos = 0;
246 fgetpos(fin, &pos);
247 fclose(fin);
248 //创建临时文件目录
249 system("mkdir temp");
250 //将待排序大文件分区,并对各个小分区进行快速排序
251 partition_and_sort(in_file, pos / 4); // pos是字节数,所以要/4统计int个数
252 cout << "\nStart merging...\n";
253 //将各个小分区归并
254 out_file = merge(0, parts - 1);
255 system(("move " + out_file + " ans.dat").c_str());
256 system("rd temp");
257 clock_t end_time = clock();
258 cout << "External sorting complete, result saved to \"ans.dat\".\n"
259 << "Time usage = " << end_time - start_time << "ms.\n";
260 system("pause");
261 return 0;
262 }
windows下排序 1e10 个int 数据( 37.2GB ),耗时约 30min,经过验证,结果正确。
总结
外排序的算法不难,单线程实现很简单:分区阶段便是读取文本文件、排序、再写到文本文件中;归并阶段也是简单地用单线程读写文本文件,效率极低。后来,多线程读写二进制文件,快速排序和归并排序也实现了高效的并行,整个程序几乎全程并行处理,效率翻了几番。最后便是一些细节的优化,比如原子操作,读写缓冲区等等。看似简单的一个排序算法,其中包涵的知识竟是如此丰富。 |