Implementing Parallel External Sorting and Optimizing Its Performance

Sorting is a classic problem in algorithms, and different data scales call for different sorting algorithms. When the data is small (under 1e3 elements), simple algorithms such as bubble sort or insertion sort suffice. When the data is large (say 1e8 elements), time complexity becomes the key concern, so we turn to $O(n \log n)$ algorithms such as quicksort or merge sort. When the data is enormous (say 1e10 elements), it no longer fits in an ordinary computer's memory, and space becomes the key concern. For this case we can use external sorting. Below we walk through the principle, implementation, and optimization of external sorting. (Environment: Windows 10 Pro x64 + Visual Studio 2015 Professional)

Principle

Suppose we need to sort a 10 GB data file on a machine with only 4 GB of RAM; reading the whole file into memory is clearly not an option. Instead, we can split the 10 GB file into 100 partitions of 100 MB each, read each partition into memory, sort it, and write it back out, which yields 100 individually sorted small files. We then merge these small files pairwise until a single sorted file remains, completing the sort.
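Before diving into the parallel version, it may help to see the whole scheme in about fifty lines. The following is a minimal single-threaded sketch of partition-then-merge (file names and helpers such as make_runs and merge_two are my own illustration, not the article's code; the real implementation below parallelizes every phase):

#include <algorithm>
#include <cstdio>
#include <string>
#include <vector>
using namespace std;

static const size_t CHUNK = 1 << 20;  // elements that fit in memory at once

// Phase 1: cut the input into individually sorted runs on disk.
static vector<string> make_runs(const string& in) {
    vector<string> runs;
    vector<int> buf(CHUNK);
    FILE* fin = fopen(in.c_str(), "rb");
    size_t got;
    while (fin && (got = fread(buf.data(), sizeof(int), CHUNK, fin)) > 0) {
        sort(buf.begin(), buf.begin() + got);       // in-memory sort of one chunk
        string name = "run" + to_string(runs.size()) + ".dat";
        FILE* f = fopen(name.c_str(), "wb");
        fwrite(buf.data(), sizeof(int), got, f);
        fclose(f);
        runs.push_back(name);
    }
    if (fin) fclose(fin);
    return runs;
}

// Phase 2: merge two sorted runs element by element into a new run.
static string merge_two(const string& a, const string& b, int id) {
    string name = "run" + to_string(id) + ".dat";
    FILE *fa = fopen(a.c_str(), "rb"), *fb = fopen(b.c_str(), "rb");
    FILE *fo = fopen(name.c_str(), "wb");
    int x, y;
    bool ha = fread(&x, sizeof(int), 1, fa) == 1;
    bool hb = fread(&y, sizeof(int), 1, fb) == 1;
    while (ha && hb) {
        if (x <= y) { fwrite(&x, sizeof(int), 1, fo); ha = fread(&x, sizeof(int), 1, fa) == 1; }
        else        { fwrite(&y, sizeof(int), 1, fo); hb = fread(&y, sizeof(int), 1, fb) == 1; }
    }
    while (ha) { fwrite(&x, sizeof(int), 1, fo); ha = fread(&x, sizeof(int), 1, fa) == 1; }
    while (hb) { fwrite(&y, sizeof(int), 1, fo); hb = fread(&y, sizeof(int), 1, fb) == 1; }
    fclose(fa); fclose(fb); fclose(fo);
    return name;
}

int main() {
    vector<string> runs = make_runs("input.dat");
    int next_id = (int)runs.size();
    while (runs.size() > 1) {  // pairwise merging until one file remains
        string merged = merge_two(runs[0], runs[1], next_id++);
        remove(runs[0].c_str());  // inputs are no longer needed
        remove(runs[1].c_str());
        runs.erase(runs.begin(), runs.begin() + 2);
        runs.push_back(merged);
    }
    if (!runs.empty()) rename(runs[0].c_str(), "sorted.dat");
    return 0;
}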

Implementation

Conceptually, implementing external sorting does not look complicated. In practice, however, a plain serial implementation is very inefficient; parallelizing as much of the pipeline as possible yields a substantial speedup.


Let's now go through the implementation details.

Partitioning & Sorting

For the large input file, we first define PARTITION_SIZE, the maximum number of elements per partition. For example, if the file holds 5500 elements and PARTITION_SIZE = 1000, we create 6 partitions containing 1000, 1000, 1000, 1000, 1000, and 500 elements, respectively. We then quicksort each of the 6 partitions independently.

For the i-th partition, we first read that partition's slice of the input file into memory in parallel:

void parallel_read() {
    int each_get[MAX_THREADS] = {}, all_get = 0;
    // each of the MAX_THREADS threads reads its own EACH_NUM-element slice
    parallel_for(0, MAX_THREADS, [&](long long x) {
        FILE* fin = fopen(in_file.c_str(), "rb");
        fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int);
        if (fsetpos(fin, &pos) == 0)
            each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin);
        fclose(fin);
    });
    for (int t = 0; t < MAX_THREADS; t++) all_get += each_get[t];
}

Next we quicksort the all_get elements now in arr[]. The parallel quicksort:

void parallel_qsort(int *begin, int *end) {
    if (begin >= end - 1) return;
    int *key = rand() % (end - begin) + begin;  // random pivot
    swap(*key, *begin);
    int *i = begin, *j = begin;
    for (key = begin; j < end; j++) {
        if (*j < *key) {
            i++;
            swap(*i, *j);
        }
    }
    swap(*begin, *i);
    if (i - begin > dx && end - i > dx) {
        // both halves are large enough: recurse on them in parallel
        parallel_for(0, 2, [&](int x) {
            if (x) parallel_qsort(begin, i);
            else parallel_qsort(i + 1, end);
        });
    } else {
        parallel_qsort(begin, i);
        parallel_qsort(i + 1, end);
    }
}

It is invoked as follows:

parallel_qsort(arr, arr + all_get);

Then we write the contents of arr[] to the partition file. To allow parallel writes, we first create the file and pre-size it:

void create_file() {
    string tmp_file = "temp\\part" + to_string(parts++) + ".dat";
    FILE *fout = fopen(tmp_file.c_str(), "wb");
    chsize(fileno(fout), all_get * sizeof(int));  // pre-allocate so threads can write anywhere
    fclose(fout);
}

Finally, write arr[] into the file:

parallel_for(0, MAX_THREADS, [&](long long x) {
    FILE* fout = fopen(tmp_file.c_str(), "rb+");
    fpos_t pos = EACH_NUM * x * sizeof(int);
    if (fsetpos(fout, &pos) == 0)
        fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout);
    fclose(fout);
});

Of course, these steps only build the i-th partition. The complete partitioning pass wraps them in an outer loop:

for (long long i = 0; i < (n - 1) / PARTITION_SIZE + 1; i++) {

Stitching the snippets together gives the complete code of the partition & sort module:

void partition_and_sort(string in_file, long long n) {
    int* arr = new int[PARTITION_SIZE];
    for (long long i = 0; i < (n - 1) / PARTITION_SIZE + 1; i++) {
        int each_get[MAX_THREADS] = {}, all_get = 0;
        clock_t start_time = clock();
        parallel_for(0, MAX_THREADS, [&](long long x) {
            FILE* fin = fopen(in_file.c_str(), "rb");
            fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int);
            if (fsetpos(fin, &pos) == 0)
                each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin);
            fclose(fin);
        });
        clock_t end_time = clock();
        for (int t = 0; t < MAX_THREADS; t++) all_get += each_get[t];
        cout << "\nRead " << all_get << " numbers from \"" << in_file << "\". "
             << "Time usage = " << end_time - start_time << "ms.\n";

        start_time = clock();
        parallel_qsort(arr, arr + all_get);
        end_time = clock();
        cout << "Sorting finished. Time usage = " << end_time - start_time << "ms.\n";

        string tmp_file = "temp\\part" + to_string(parts++) + ".dat";
        FILE *fout = fopen(tmp_file.c_str(), "wb");
        chsize(fileno(fout), all_get * sizeof(int));
        fclose(fout);

        start_time = clock();
        parallel_for(0, MAX_THREADS, [&](long long x) {
            FILE* fout = fopen(tmp_file.c_str(), "rb+");
            fpos_t pos = EACH_NUM * x * sizeof(int);
            if (fsetpos(fout, &pos) == 0)
                fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout);
            fclose(fout);
        });
        end_time = clock();
        cout << "Part " << parts - 1 << " saved to file \"" << tmp_file << "\". "
             << "Time usage = " << end_time - start_time << "ms.\n";
    }
    delete[] arr;
}

That completes the partition & sort module: reading, sorting, and writing are all parallelized, so it runs quite efficiently.

Merging

This is the core phase of external sorting; the program's overall efficiency hinges on the algorithm used here. Because the data to be merged is large, a parallel merge brings a significant speedup. We first locate pivot pairs that divide the input evenly, then launch multiple threads to merge the resulting segments; the algorithm is described in detail in 《并行归并排序及其性能优化》 (Parallel Merge Sort and Its Performance Optimization).
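The pivot idea is easiest to see in memory first. The sketch below (my own illustration, not code from that article) binary-searches, for two sorted arrays, a pivot pair (p1, p2) such that a[0..p1) and b[0..p2) together hold exactly the k smallest elements; cutting both inputs at such pivots lets each thread merge its slice independently:

#include <algorithm>
#include <iostream>
#include <utility>
#include <vector>
using namespace std;

// Find (p1, p2) with p1 + p2 == k such that every element of a[0..p1) and
// b[0..p2) is <= every element of a[p1..) and b[p2..).
pair<size_t, size_t> split_at(const vector<int>& a, const vector<int>& b, size_t k) {
    size_t lo = k > b.size() ? k - b.size() : 0, hi = min(k, a.size());
    while (true) {
        size_t p1 = (lo + hi) / 2, p2 = k - p1;
        if (p1 < hi && b[p2 - 1] > a[p1]) lo = p1 + 1;       // take more from a
        else if (p1 > lo && a[p1 - 1] > b[p2]) hi = p1 - 1;  // take fewer from a
        else return make_pair(p1, p2);
    }
}

int main() {
    vector<int> a = { 1, 3, 5, 7, 9, 11 }, b = { 2, 4, 6, 8 };
    size_t k = (a.size() + b.size()) / 2;        // split the work between 2 "threads"
    pair<size_t, size_t> p = split_at(a, b, k);  // here: p = (3, 2)
    cout << "thread 0 merges a[0.." << p.first << ") with b[0.." << p.second << ")\n";
    cout << "thread 1 merges the two remaining tails\n";
    return 0;
}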

The merge_file function that performs the merge takes two string parameters, the file names of the two sorted inputs, and returns the file name of the merged data:

string merge_file(string in_file1, string in_file2) {}

Inside this function, we first obtain the sizes of the two input files (and hence their element counts):

void get_size() {
    FILE* fin1 = fopen(in_file1.c_str(), "rb");
    FILE* fin2 = fopen(in_file2.c_str(), "rb");
    fseek(fin1, 0, SEEK_END);
    fseek(fin2, 0, SEEK_END);
    fpos_t size1 = 0, size2 = 0;
    fgetpos(fin1, &size1);
    fgetpos(fin2, &size2);
}

size1 / sizeof(int) is then the number of elements in the first file, and size2 / sizeof(int) the number in the second.
Next we locate the pivot positions. During this binary search we cannot load the whole file into memory, so we read the element at a given position with fsetpos and fread. Both calls are $O(1)$; they do touch the disk, but binary search converges quickly, so the number of disk accesses, and hence the overhead, stays small.

void get_fpos() {
    long long seek_pos[MAX_THREADS + 1][3] = {}, n1 = size1 / 4, n2 = size2 / 4;
    for (long long i = 1; i < MAX_THREADS; i++) {  // find the i-th pivot pair
        long long pos1, pos2, fpos, l1 = 0, r1 = n1 - 1;
        int *get1 = new int;
        while (r1 - l1 > 1) {
            pos1 = (l1 + r1) / 2;  // binary search: tentatively pick file1's pivot
            fpos = pos1 * sizeof(int);
            fsetpos(fin1, &fpos);
            fread(get1, sizeof(int), 1, fin1);
            long long l2 = 0, r2 = n2;
            int* get2 = new int;

            while (r2 - l2 > 0) {  // binary-search file2's matching pivot
                pos2 = (l2 + r2) / 2;
                fpos = pos2 * sizeof(int);
                fsetpos(fin2, &fpos);
                fread(get2, sizeof(int), 1, fin2);
                if (*get1 <= *get2)
                    r2 = pos2;
                else
                    l2 = pos2 + 1;
            }
            delete get2; get2 = NULL;
            pos2 = r2;

            // if this pivot pair splits file1 + file2 unevenly, adjust pos1
            if ((pos1 + pos2) * (long long)MAX_THREADS < (n1 + n2) * i)
                l1 = pos1 + 1;
            else
                r1 = pos1 - 1;
        }
        delete get1; get1 = NULL;
        seek_pos[i][1] = pos1 * sizeof(int);  // pivot position in file1
        seek_pos[i][2] = pos2 * sizeof(int);  // pivot position in file2
        seek_pos[i][0] = seek_pos[i][1] + seek_pos[i][2];  // pivot position in the output file
    }
    fclose(fin1); fclose(fin2);
    // boundary entries
    seek_pos[0][0] = seek_pos[0][1] = seek_pos[0][2] = 0;
    seek_pos[MAX_THREADS][1] = n1 * sizeof(int);
    seek_pos[MAX_THREADS][2] = n2 * sizeof(int);
    seek_pos[MAX_THREADS][0] = seek_pos[MAX_THREADS][1] + seek_pos[MAX_THREADS][2];
}

In the code above, with MAX_THREADS = 4 the program finds 3 pivot pairs, dividing the two inputs into 4 segment groups of equal overall size, which keeps the per-thread load as balanced as possible.

Next, create the output file and pre-size it (again, to enable parallel writes):

void create_output_file() {
    string out_file = "temp\\part" + to_string(parts++) + ".dat";
    FILE *fout = fopen(out_file.c_str(), "wb");
    _chsize_s(fileno(fout), (n1 + n2) * sizeof(int));
    fclose(fout);
}

Note that parts here is a global variable that counts how many files have been created and supplies the name for each new file. (For example, if the partition phase split the input into 5 parts named part0.dat, part1.dat, ..., part4.dat, then files produced during merging continue the numbering as part5.dat, part6.dat, and so on; say part0.dat and part1.dat merge into part5.dat, while part3.dat and part4.dat merge into part6.dat, etc.) Since several threads may be executing merge_file at the same time, concurrent access to this global could race, so parts must be declared as atomic_int to make the increment atomic.
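To see why a plain int would be unsafe here, consider this standalone demo (illustrative only, not part of the sorter): the unsynchronized counter loses updates, while the atomic one does not.

#include <atomic>
#include <iostream>
#include <ppl.h>
using namespace concurrency;

int main() {
    int plain = 0;
    std::atomic_int atomic_cnt(0);
    parallel_for(0, 1000000, [&](int) {
        plain++;       // data race: increments can be lost
        atomic_cnt++;  // atomic read-modify-write: always exact
    });
    std::cout << plain << " vs " << atomic_cnt << "\n";  // plain is often < 1000000
    return 0;
}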

Next comes the merge itself:

// multi-threaded merge: each thread merges only its own segment group
parallel_for(0, MAX_THREADS, [&](int x) {
    FILE* fin1 = fopen(in_file1.c_str(), "rb");
    FILE* fin2 = fopen(in_file2.c_str(), "rb");
    FILE* fout = fopen(out_file.c_str(), "rb+");
    // seek to the starting offsets given by the pivot positions
    fsetpos(fin1, &seek_pos[x][1]);
    fsetpos(fin2, &seek_pos[x][2]);
    fsetpos(fout, &seek_pos[x][0]);
    // input/output buffers to speed up disk access
    int *buf0 = new int[BUFFER_SIZE];  // output buffer
    int *buf1 = new int[BUFFER_SIZE];  // fin1 buffer
    int *buf2 = new int[BUFFER_SIZE];  // fin2 buffer
    int i = 0, j = 0, k = 0;           // indices into buf1, buf2, buf0
    // this thread must read all1 elements from file1 and all2 from file2
    long long all1 = (seek_pos[x + 1][1] - seek_pos[x][1]) / sizeof(int);
    long long all2 = (seek_pos[x + 1][2] - seek_pos[x][2]) / sizeof(int);
    // prime the input buffers
    fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
    fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
    while (all1 > 0 && all2 > 0) {  // the merge loop
        if (buf1[i] < buf2[j]) {
            buf0[k++] = buf1[i++]; all1--;
            if (i == BUFFER_SIZE) {  // input buffer exhausted: refill it, reset i
                fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
                i = 0;
            }
        } else {
            buf0[k++] = buf2[j++]; all2--;
            if (j == BUFFER_SIZE) {
                fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
                j = 0;
            }
        }
        if (k == BUFFER_SIZE) {  // output buffer full: flush it, reset k
            fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
            k = 0;
        }
    }
    while (all1 > 0) {  // file1 has leftovers: copy them straight through
        buf0[k++] = buf1[i++]; all1--;
        if (i == BUFFER_SIZE) {
            fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
            i = 0;
        }
        if (k == BUFFER_SIZE) {
            fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
            k = 0;
        }
    }
    while (all2 > 0) {  // file2 has leftovers: copy them straight through
        buf0[k++] = buf2[j++]; all2--;
        if (j == BUFFER_SIZE) {
            fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
            j = 0;
        }
        if (k == BUFFER_SIZE) {
            fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
            k = 0;
        }
    }
    fwrite(buf0, sizeof(int), k, fout);  // flush the final partial buffer
    fclose(fin1);
    fclose(fin2);
    fclose(fout);
    delete[] buf0;
    delete[] buf1;
    delete[] buf2;
});

A crucial optimization in the merge is the input/output buffering. Input buffering reads BUFFER_SIZE elements from a file in one go; subsequent accesses are served from memory, and only when a buffer is exhausted (i or j reaches the end of buf1 or buf2) do we fetch the next BUFFER_SIZE elements and reset i or j. Output buffering accumulates results in buf0 and writes them out in one go once it fills up (k reaches the end of buf0). The buffers eliminate vast numbers of tiny IO operations; combined with multithreading, they push disk throughput close to its maximum and improve performance dramatically.
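The same pattern can be factored into a small reusable class. Here is a minimal sketch of the input side (my own generalization for illustration, assuming int data; the article's code keeps the buffers inline as above):

#include <cstdio>

// Minimal buffered reader: refill from disk only when the in-memory buffer
// is exhausted, exactly the i/buf1 pattern used in the merge loop above.
class BufferedIntReader {
    FILE* f_;
    int* buf_;
    size_t cap_, cnt_, pos_;  // capacity, valid element count, read cursor
public:
    BufferedIntReader(FILE* f, size_t cap)
        : f_(f), buf_(new int[cap]), cap_(cap), cnt_(0), pos_(0) {}
    ~BufferedIntReader() { delete[] buf_; }
    bool next(int& out) {    // false once the file is exhausted
        if (pos_ == cnt_) {  // buffer used up: refill it
            cnt_ = fread(buf_, sizeof(int), cap_, f_);
            pos_ = 0;
            if (cnt_ == 0) return false;
        }
        out = buf_[pos_++];
        return true;
    }
};

int main() {
    // tiny self-test: write 10 ints, then stream them back through the reader
    FILE* f = fopen("demo.dat", "wb+");
    for (int v = 0; v < 10; v++) fwrite(&v, sizeof(int), 1, f);
    rewind(f);
    BufferedIntReader r(f, 4);  // deliberately small buffer to force refills
    int v;
    while (r.next(v)) printf("%d ", v);
    fclose(f);
    return 0;
}

With one such reader per input file and a symmetric buffered writer, the merge loop reduces to repeated next/emit calls.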

After the merge finishes, delete the two input files and return the merged file:

system(("del " + in_file1).c_str());
system(("del " + in_file2).c_str());
return out_file;

That completes merge_file. We now call it recursively until all the small partitions have been merged pairwise into a single file. The recursion can itself be parallelized (though since merge_file is already parallel internally, a serial recursion would also work without losing much efficiency):

string merge(int l, int r) {
    if (l == r) return "temp\\part" + to_string(l) + ".dat";
    int mid = (l + r) / 2;
    string file1, file2;
    parallel_for(0, 2, [&](int x) {  // serial recursion also works here, just slightly slower
        if (x == 0) file1 = merge(l, mid);
        if (x == 1) file2 = merge(mid + 1, r);
    });
    return merge_file(file1, file2);
}

Putting It All Together

The entire sorting pipeline has now been covered; here is the program's full source code:

#include <iostream>
#include <stdio.h>
#include <cstring>
#include <string>
#include <atomic>
#include <Windows.h>
#include <ppl.h>
#include <io.h>
#include <time.h>
#define MAX_THREADS 4  // maximum number of threads
using namespace std;
using namespace concurrency;
const int dx = 20;  // cutoff for the parallel quicksort's dx optimization
const long long PARTITION_SIZE = 100000000;  // partition size (elements)
const long long BUFFER_SIZE = 10000000;  // input/output buffer size (elements)
const long long EACH_NUM = (PARTITION_SIZE / MAX_THREADS);
atomic_int parts;

void parallel_qsort(int *begin, int *end) {  // parallel quicksort
    if (begin >= end - 1) return;
    int *key = rand() % (end - begin) + begin;  // random pivot
    swap(*key, *begin);
    int *i = begin, *j = begin;
    for (key = begin; j < end; j++) {
        if (*j < *key) {
            i++;
            swap(*i, *j);
        }
    }
    swap(*begin, *i);
    if (i - begin > dx && end - i > dx) {  // dx optimization: only spawn tasks for large halves
        parallel_for(0, 2, [&](int x) {
            if (x) parallel_qsort(begin, i);
            else parallel_qsort(i + 1, end);
        });
    } else {
        parallel_qsort(begin, i);
        parallel_qsort(i + 1, end);
    }
}

void partition_and_sort(string in_file, long long n) {
    int* arr = new int[PARTITION_SIZE];
    for (long long i = 0; i < (n - 1) / PARTITION_SIZE + 1; i++) {  // build the i-th partition
        int each_get[MAX_THREADS] = {}, all_get = 0;

        // read ints into arr[] in parallel: each thread plans to read EACH_NUM
        // elements (PARTITION_SIZE in total) and actually reads each_get[x];
        // the grand total actually read is all_get
        clock_t start_time = clock();
        parallel_for(0, MAX_THREADS, [&](long long x) {
            FILE* fin = fopen(in_file.c_str(), "rb");
            fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int);
            if (fsetpos(fin, &pos) == 0)
                each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin);
            fclose(fin);
        });
        clock_t end_time = clock();
        for (int t = 0; t < MAX_THREADS; t++) all_get += each_get[t];
        cout << "\nRead " << all_get << " numbers from \"" << in_file << "\". "
             << "Time usage = " << end_time - start_time << "ms.\n";

        // parallel quicksort of arr
        start_time = clock();
        parallel_qsort(arr, arr + all_get);
        end_time = clock();
        cout << "Sorting finished. Time usage = " << end_time - start_time << "ms.\n";

        // create the partition file and pre-size it
        string tmp_file = "temp\\part" + to_string(parts++) + ".dat";
        FILE *fout = fopen(tmp_file.c_str(), "wb");
        chsize(fileno(fout), all_get * sizeof(int));
        fclose(fout);

        // write all of arr to the partition file in parallel
        start_time = clock();
        parallel_for(0, MAX_THREADS, [&](long long x) {
            FILE* fout = fopen(tmp_file.c_str(), "rb+");
            fpos_t pos = EACH_NUM * x * sizeof(int);
            if (fsetpos(fout, &pos) == 0)
                fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout);
            fclose(fout);
        });
        end_time = clock();
        cout << "Part " << parts - 1 << " saved to file \"" << tmp_file << "\". "
             << "Time usage = " << end_time - start_time << "ms.\n";
    }
    delete[] arr;
}

string merge_file(string in_file1, string in_file2) {  // merge two sorted files
    string out_file = "temp\\part" + to_string(parts++) + ".dat";
    // first obtain the two files' sizes
    FILE* fin1 = fopen(in_file1.c_str(), "rb");
    FILE* fin2 = fopen(in_file2.c_str(), "rb");
    clock_t seekpos_start_time = clock();
    fseek(fin1, 0, SEEK_END);
    fseek(fin2, 0, SEEK_END);
    fpos_t size1 = 0, size2 = 0;
    fgetpos(fin1, &size1);
    fgetpos(fin2, &size2);

    long long seek_pos[MAX_THREADS + 1][3] = {}, n1 = size1 / 4, n2 = size2 / 4;
    for (long long i = 1; i < MAX_THREADS; i++) {  // find the i-th pivot pair
        long long pos1, pos2, fpos, l1 = 0, r1 = n1 - 1;
        int *get1 = new int;
        while (r1 - l1 > 1) {
            pos1 = (l1 + r1) / 2;  // binary search: tentatively pick file1's pivot
            fpos = pos1 * sizeof(int);
            fsetpos(fin1, &fpos);
            fread(get1, sizeof(int), 1, fin1);
            long long l2 = 0, r2 = n2;
            int* get2 = new int;

            while (r2 - l2 > 0) {  // binary-search file2's matching pivot
                pos2 = (l2 + r2) / 2;
                fpos = pos2 * sizeof(int);
                fsetpos(fin2, &fpos);
                fread(get2, sizeof(int), 1, fin2);
                if (*get1 <= *get2)
                    r2 = pos2;
                else
                    l2 = pos2 + 1;
            }
            delete get2; get2 = NULL;
            pos2 = r2;

            // if this pivot pair splits file1 + file2 unevenly, adjust pos1
            if ((pos1 + pos2) * (long long)MAX_THREADS < (n1 + n2) * i)
                l1 = pos1 + 1;
            else
                r1 = pos1 - 1;
        }
        delete get1; get1 = NULL;
        seek_pos[i][1] = pos1 * sizeof(int);  // pivot position in file1
        seek_pos[i][2] = pos2 * sizeof(int);  // pivot position in file2
        seek_pos[i][0] = seek_pos[i][1] + seek_pos[i][2];  // pivot position in the output file
    }
    fclose(fin1); fclose(fin2);
    // boundary entries
    seek_pos[0][0] = seek_pos[0][1] = seek_pos[0][2] = 0;
    seek_pos[MAX_THREADS][1] = n1 * sizeof(int);
    seek_pos[MAX_THREADS][2] = n2 * sizeof(int);
    seek_pos[MAX_THREADS][0] = seek_pos[MAX_THREADS][1] + seek_pos[MAX_THREADS][2];
    clock_t seekpos_end_time = clock();

    // pre-size the output file
    clock_t chsize_start_time = clock();
    FILE *fout = fopen(out_file.c_str(), "wb");
    _chsize_s(fileno(fout), (n1 + n2) * sizeof(int));
    fclose(fout);
    clock_t chsize_end_time = clock();

    // multi-threaded merge: each thread merges only its own segment group
    clock_t merge_start_time = clock();
    parallel_for(0, MAX_THREADS, [&](int x) {
        FILE* fin1 = fopen(in_file1.c_str(), "rb");
        FILE* fin2 = fopen(in_file2.c_str(), "rb");
        FILE* fout = fopen(out_file.c_str(), "rb+");
        // seek to the starting offsets given by the pivot positions
        fsetpos(fin1, &seek_pos[x][1]);
        fsetpos(fin2, &seek_pos[x][2]);
        fsetpos(fout, &seek_pos[x][0]);
        // input/output buffers to speed up disk access
        int *buf0 = new int[BUFFER_SIZE];
        int *buf1 = new int[BUFFER_SIZE];
        int *buf2 = new int[BUFFER_SIZE];
        int i = 0, j = 0, k = 0;  // indices into buf1, buf2, buf0
        // this thread must read all1 elements from file1 and all2 from file2
        long long all1 = (seek_pos[x + 1][1] - seek_pos[x][1]) / sizeof(int);
        long long all2 = (seek_pos[x + 1][2] - seek_pos[x][2]) / sizeof(int);
        // prime the input buffers
        fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
        fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
        while (all1 > 0 && all2 > 0) {  // the merge loop
            if (buf1[i] < buf2[j]) {
                buf0[k++] = buf1[i++]; all1--;
                if (i == BUFFER_SIZE) {  // input buffer exhausted: refill it, reset i
                    fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
                    i = 0;
                }
            } else {
                buf0[k++] = buf2[j++]; all2--;
                if (j == BUFFER_SIZE) {
                    fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
                    j = 0;
                }
            }
            if (k == BUFFER_SIZE) {  // output buffer full: flush it, reset k
                fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
                k = 0;
            }
        }
        while (all1 > 0) {  // file1 has leftovers: copy them straight through
            buf0[k++] = buf1[i++]; all1--;
            if (i == BUFFER_SIZE) {
                fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
                i = 0;
            }
            if (k == BUFFER_SIZE) {
                fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
                k = 0;
            }
        }
        while (all2 > 0) {  // file2 has leftovers: copy them straight through
            buf0[k++] = buf2[j++]; all2--;
            if (j == BUFFER_SIZE) {
                fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
                j = 0;
            }
            if (k == BUFFER_SIZE) {
                fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
                k = 0;
            }
        }
        fwrite(buf0, sizeof(int), k, fout);  // flush the final partial buffer
        fclose(fin1);
        fclose(fin2);
        fclose(fout);
        delete[] buf0;
        delete[] buf1;
        delete[] buf2;
    });
    clock_t merge_end_time = clock();
    Sleep(100);
    // the inputs' data now lives in the new file; delete them
    system(("del " + in_file1).c_str());
    system(("del " + in_file2).c_str());
    cout << "\nPart \"" << in_file1 << "\" and \"" << in_file2 << "\" merged, "
         << "result saved to \"" << out_file << "\".\n"
         << "Time usage: seek_pos: " << seekpos_end_time - seekpos_start_time << "ms, "
         << "chsize: " << chsize_end_time - chsize_start_time << "ms, "
         << "parallel_merge: " << merge_end_time - merge_start_time << "ms.\n";
    return out_file;
}

string merge(int l, int r) {  // recursive pairwise merging
    if (l == r) return "temp\\part" + to_string(l) + ".dat";
    int mid = (l + r) / 2;
    string file1, file2;
    parallel_for(0, 2, [&](int x) {  // serial recursion also works here, just slightly slower
        if (x == 0) file1 = merge(l, mid);
        if (x == 1) file2 = merge(mid + 1, r);
    });
    return merge_file(file1, file2);
}

int main() {
    string in_file, out_file;
    cout << "Enter data file name: ";
    cin >> in_file;
    FILE* fin = fopen(in_file.c_str(), "rb");
    if (fin == NULL) {
        cout << "Could not open that file.\n";
        return main();  // prompt again instead of falling through with a NULL handle
    }
    clock_t start_time = clock();
    // obtain the input file's size
    fseek(fin, 0, SEEK_END);
    fpos_t pos = 0;
    fgetpos(fin, &pos);
    fclose(fin);
    // create the temporary file directory
    system("mkdir temp");
    // partition the big input file and quicksort each small partition
    partition_and_sort(in_file, pos / 4);
    cout << "\nStart merging...\n";
    // merge the small partitions
    out_file = merge(0, parts - 1);
    system(("move " + out_file + " ans.dat").c_str());
    system("rd temp");
    clock_t end_time = clock();
    cout << "External sorting complete, result saved to \"ans.dat\".\n"
         << "Time usage = " << end_time - start_time << "ms.\n";
    system("pause");
    return 0;
}

Results

On a 13-inch MacBook Pro (2015 model) (I had to install Windows just to run it... that's what I get for only knowing how to write Windows multithreading...), sorting 1e10 int values (37.2 GB) took about 30 min, and the result was verified correct. The console output (partially elided):

Enter data file name: 1e10.dat

Read 100000000 numbers from "1e10.dat". Time usage = 399ms.
Sorting finished. Time usage = 5457ms.
Part 0 saved to file "temp\part0.dat". Time usage = 789ms.

Read 100000000 numbers from "1e10.dat". Time usage = 410ms.
Sorting finished. Time usage = 5484ms.
Part 1 saved to file "temp\part1.dat". Time usage = 1193ms.

......

Read 100000000 numbers from "1e10.dat". Time usage = 371ms.
Sorting finished. Time usage = 5532ms.
Part 98 saved to file "temp\part98.dat". Time usage = 754ms.

Read 100000000 numbers from "1e10.dat". Time usage = 457ms.
Sorting finished. Time usage = 5471ms.
Part 99 saved to file "temp\part99.dat". Time usage = 695ms.

Start merging...

Part "temp\part97.dat" and "temp\part98.dat" merged, result saved to "temp\part1
04.dat".
Time usage: seek_pos: 17ms, chsize: 6932ms, parallel_merge: 10973ms.

......

Part "temp\part196.dat" and "temp\part187.dat" merged, result saved to "temp\par
t197.dat".
Time usage: seek_pos: 67ms, chsize: 44997ms, parallel_merge: 53936ms.

Part "temp\part197.dat" and "temp\part192.dat" merged, result saved to "temp\par
t198.dat".
Time usage: seek_pos: 95ms, chsize: 82435ms, parallel_merge: 80070ms.
1 file(s) moved.
The directory is not empty.
External sorting complete, result saved to "ans.dat".
Time usage = 2097036ms.
Press any key to continue . . .

Summary

The external sorting algorithm itself is not hard, and my first implementation was correspondingly naive: the partition phase read a text file, sorted, and wrote back to a text file, and the merge phase was a simple single-threaded read-and-write of text files, which was extremely slow. Once I learned binary file IO, I moved to multithreaded binary reads and writes, and both the quicksort and the merge gained efficient parallel versions, so nearly the entire program runs in parallel and is several times faster. The final touches were detail optimizations such as atomic operations and read/write buffers. A seemingly simple sorting task turned out to contain this much material; there is truly no end to learning!

Room for Improvement

Practice shows that the efficiency of external sorting is bound mainly by the disk; using a K-way merge in the merge phase would significantly reduce the IO volume. Worth trying some day.
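A rough calculation (mine, based on the figures above and ignoring the pivot-search probes) shows how much is at stake: with 100 sorted runs, pairwise merging forms a tree of depth $\lceil \log_2 100 \rceil = 7$, so during the merge phase every element is read and written about 7 times, roughly $7 \times 2 \times 37.2 \approx 520$ GB of disk traffic. A single 100-way merge reads and writes each element once, about $2 \times 37.2 = 74.4$ GB, a sevenfold reduction.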

Postscript – 2017.3.17

Today I implemented a parallel k-way merge based on a min-heap and replaced the parallel quicksort with a parallel radix sort; the efficiency improved severalfold.
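Before the full listing, here is a compact in-memory illustration of heap-based k-way merging (my own sketch built on std::priority_queue; the program below hand-rolls a binary heap per thread instead):

#include <functional>
#include <iostream>
#include <queue>
#include <utility>
#include <vector>
using namespace std;

int main() {
    // three sorted "runs"; on disk these would be part0.dat, part1.dat, ...
    vector<vector<int>> runs = { { 1, 4, 7 }, { 2, 5, 8 }, { 3, 6, 9 } };
    typedef pair<int, int> node;  // (value, index of the run it came from)
    priority_queue<node, vector<node>, greater<node>> heap;  // min-heap
    vector<size_t> pos(runs.size(), 0);  // next unread index in each run
    for (int i = 0; i < (int)runs.size(); i++)  // seed with every run's head
        heap.push(node(runs[i][0], i));
    while (!heap.empty()) {
        node t = heap.top(); heap.pop();
        cout << t.first << ' ';         // emit the global minimum
        size_t p = ++pos[t.second];
        if (p < runs[t.second].size())  // refill from the same run
            heap.push(node(runs[t.second][p], t.second));
    }
    cout << '\n';  // prints: 1 2 3 4 5 6 7 8 9
    return 0;
}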

The complete code:

#include <iostream>
#include <stdio.h>
#include <cstring>
#include <string>
#include <atomic>
#include <mutex>
#include <queue>
#include <vector>
#include <Windows.h>
#include <ppl.h>
#include <functional>
#include <io.h>
#include <time.h>
#define MAX_THREADS 4
#define MAX_K 100
using namespace std;
using namespace concurrency;
const long long PARTITION_SIZE = 100000000;
const long long BUFFER_SIZE = 200000;
const long long EACH_NUM = (PARTITION_SIZE / MAX_THREADS);
int parts, heapsize[MAX_THREADS];
long long data_size;
mutex m;
typedef pair<int, int> node;
node heap[MAX_THREADS][MAX_K + 10];

inline void up(int idx, int x) {  // sift a heap node up
    int fa = x >> 1; node tmp = heap[idx][x];
    while (fa) {
        if (tmp < heap[idx][fa])  // cmp
            heap[idx][x] = heap[idx][fa];
        else break;
        x = fa; fa = x >> 1;
    }
    heap[idx][x] = tmp;
}

inline void down(int idx, int x) {  // sift a heap node down
    int ch = x << 1; node tmp = heap[idx][x];
    while (ch <= heapsize[idx]) {
        if (ch < heapsize[idx] && heap[idx][ch + 1] < heap[idx][ch]) ch++;  // cmp
        if (heap[idx][ch] < tmp)  // cmp
            heap[idx][x] = heap[idx][ch];
        else break;
        x = ch; ch = x << 1;
    }
    heap[idx][x] = tmp;
}

inline void push(int idx, node val) {
    heap[idx][++heapsize[idx]] = val;
    up(idx, heapsize[idx]);
}
inline node top(int idx) { return heap[idx][1]; }
inline void pop(int idx) {
    heap[idx][1] = heap[idx][heapsize[idx]--];
    down(idx, 1);
}

inline void ch_size(string file_name, fpos_t size) {  // create a file and pre-size it
    FILE *fout = fopen(file_name.c_str(), "wb");
    _chsize_s(fileno(fout), size * sizeof(int));
    fclose(fout);
}

inline int seek_dat(FILE* &f, fpos_t pos) {  // read the single int at element index pos
    int *get = new int;
    pos *= sizeof(int);
    fsetpos(f, &pos);
    fread(get, sizeof(int), 1, f);
    int tmp = *get; delete get;
    return tmp;
}

void partition_and_sort(string in_file) {
    int *arr = new int[PARTITION_SIZE];
    for (long long i = 0; i < (data_size - 1) / PARTITION_SIZE + 1; i++) {
        atomic_int each_get[MAX_THREADS + 1] = {};
        string tmp_file = "temp\\part" + to_string(i) + ".dat";
        clock_t start = clock();
        cout << "Reading part " << i << "...";
        parallel_for(0, MAX_THREADS, [&](long long x) {
            FILE* fin = fopen(in_file.c_str(), "rb");
            fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int);
            if (fsetpos(fin, &pos) == 0)
                each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin);
            each_get[MAX_THREADS] += each_get[x];
            fclose(fin);
        });
        cout << "\rSorting part " << i << "...";
        parallel_radixsort(arr, arr + each_get[MAX_THREADS]);
        cout << "\rWriting part " << i << "...";
        ch_size(tmp_file, each_get[MAX_THREADS]);
        parallel_for(0, MAX_THREADS, [&](long long x) {
            FILE* fout = fopen(tmp_file.c_str(), "rb+");
            fpos_t pos = EACH_NUM * x * sizeof(int);
            if (fsetpos(fout, &pos) == 0)
                fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout);
            fclose(fout);
        });
        clock_t end = clock();
        cout << "\rPart " << i << " established. Time usage = " << end - start << "ms.\n";
    }
    delete[] arr;
}

void merge_file() {
    FILE* fin[MAX_K] = {};
    fpos_t size[MAX_K] = {}, seek_pos[MAX_THREADS + 1][MAX_K + 1] = {};
    for (int i = 0; i < parts; i++) {
        fin[i] = fopen(("temp\\part" + to_string(i) + ".dat").c_str(), "rb");
        fseek(fin[i], 0, SEEK_END);
        fgetpos(fin[i], &size[i]);
        size[i] /= sizeof(int);
        seek_pos[MAX_THREADS][parts] += (seek_pos[MAX_THREADS][i] = size[i]);
    }

    cout << "\nInitializing merging operation...\n";
    for (long long i = 1; i < MAX_THREADS; i++) {
        fpos_t l0 = 0, r0 = size[0] - 1;
        while (r0 - l0 > 1) {
            seek_pos[i][parts] = seek_pos[i][0] = (l0 + r0) / 2;
            int get0 = seek_dat(fin[0], seek_pos[i][0]);
            for (int idx = 1; idx < parts; idx++) {
                fpos_t l = 0, r = size[idx];
                while (r - l > 0) {
                    seek_pos[i][idx] = (l + r) / 2;
                    int get = seek_dat(fin[idx], seek_pos[i][idx]);
                    if (get0 <= get) r = seek_pos[i][idx];
                    else l = seek_pos[i][idx] + 1;
                }
                seek_pos[i][parts] += (seek_pos[i][idx] = r);
            }
            if (seek_pos[i][parts] * MAX_THREADS < data_size * i) l0 = seek_pos[i][0] + 1;
            else r0 = seek_pos[i][0] - 1;
        }
    }
    for (int i = 0; i < parts; i++) fclose(fin[i]);

    clock_t start = clock(); atomic_llong all_write = 0;
    parallel_for(0, MAX_THREADS, [&](int x) {
        FILE *fin[MAX_K] = {}, *fout = fopen("ans.dat", "rb+");
        fpos_t fpos = seek_pos[x][parts] * sizeof(int);
        fsetpos(fout, &fpos);
        int **buf = new int*[MAX_K + 1];
        for (int i = 0; i <= MAX_K; i++) buf[i] = new int[BUFFER_SIZE];
        int pos[MAX_K + 1] = {};  // buffer pos
        fpos_t all[MAX_K] = {};
        for (int i = 0; i < parts; i++) {
            fin[i] = fopen(("temp\\part" + to_string(i) + ".dat").c_str(), "rb");
            fpos = seek_pos[x][i] * sizeof(int);
            fsetpos(fin[i], &fpos);
            all[i] = seek_pos[x + 1][i] - seek_pos[x][i];
            fread(buf[i], sizeof(int), BUFFER_SIZE, fin[i]);
        }
        for (int i = 0; i < parts; i++) {
            push(x, node(buf[i][0], i));
            pos[i] = 1; all[i]--;
        }
        while (heapsize[x]) {
            if (pos[parts] == BUFFER_SIZE) {
                fwrite(buf[parts], sizeof(int), BUFFER_SIZE, fout);
                all_write += BUFFER_SIZE;
                if (all_write % 1000000 == 0) {
                    m.lock();
                    cout << "\rStart merging... " << (all_write * 100) / data_size
                         << "% completed.";
                    m.unlock();
                }
                pos[parts] = 0;
            }
            int bel = top(x).second;
            buf[parts][pos[parts]++] = top(x).first;
            if (all[bel]) {
                heap[x][1] = node(buf[bel][pos[bel]], bel); down(x, 1);
                if ((++pos[bel]) == BUFFER_SIZE) {
                    fread(buf[bel], sizeof(int), BUFFER_SIZE, fin[bel]);
                    pos[bel] = 0;
                }
                all[bel]--;
            } else pop(x);
        }
        fwrite(buf[parts], sizeof(int), pos[parts], fout);
        cout << "\rStart merging... 100% completed.";
        for (int i = 0; i < parts; i++) fclose(fin[i]); fclose(fout);
        for (int i = 0; i <= MAX_K; i++) delete[] buf[i]; delete[] buf;
    });
    clock_t end = clock();

    cout << "\nMerging finished. Time usage = " << end - start << "ms.\n";
}

int main() {
    string in_file;
    cout << "Enter data file name: ";
    cin >> in_file;
    FILE* fin = fopen(in_file.c_str(), "rb");
    if (fin == NULL) {
        cout << "Could not open that file.\n";
        return main();  // prompt again instead of falling through with a NULL handle
    }

    clock_t start_time = clock();
    fseek(fin, 0, SEEK_END);
    fgetpos(fin, &data_size);
    data_size /= sizeof(int);
    parts = (data_size - 1) / PARTITION_SIZE + 1;
    fclose(fin);
    cout << "\nPartitioning " << data_size << " elements(int)...\n";
    system("mkdir temp");
    parallel_for(0, 2, [&](int x) {
        // overlap partitioning with pre-sizing the big output file
        if (x) partition_and_sort(in_file);
        else ch_size("ans.dat", data_size);
    });
    merge_file();
    clock_t end_time = clock();

    system("rd /s/q temp");
    cout << "\nExternal sorting complete, result saved to \"ans.dat\".\n"
         << "Time usage = " << end_time - start_time << "ms.\n";
    system("pause");
    return 0;
}

On the same 2015 MacBook Pro, sorting 1e10 int values (37.2 GB) now takes only about 9 min, a big improvement over the 30 minutes needed with the parallel two-way merge.

The output:

Enter data file name: 1e10.dat

Partitioning 10000000000 elements(int)...
Part 0 established. Time usage = 4266ms.
Part 1 established. Time usage = 3542ms.
Part 2 established. Time usage = 2959ms.
......
Part 98 established. Time usage = 2286ms.
Part 99 established. Time usage = 2280ms.

Initializing merging operation...
Start merging... 100% completed.
Merging finished. Time usage = 316264ms.

External sorting complete, result saved to "ans.dat".
Time usage = 576039ms.

With that, external sorting and every optimization I could think of have been implemented, and this project comes to a close.