并行归并排序的实现及其性能优化

串行归并排序的时间复杂度为 $O(nlogn)$。与快速排序类似,它的递归过程也可以并行处理,并且dx优化也能收获不错的效果。

串行版本

#include <iostream>
#include <stdio.h>
using namespace std;

void merge(int* a, int low, int mid, int high) {
int i = low, j = mid + 1, size = 0;
int* temp = new int[high - low + 1];
for (; (i <= mid) && (j <= high); size++)
if (a[i] < a[j])
temp[size] = a[i++];
else
temp[size] = a[j++];
while (i <= mid) temp[size++] = a[i++];
for (i = 0; i < size; i++)
a[low + i] = temp[i];
delete[] temp;
}

void merge_sort(int* a, int low, int high) {
if (low >= high)
return;
int mid = (low + high) >> 1;
merge_sort(a, low, mid);
merge_sort(a, mid + 1, high);
merge(a, low, mid, high);
}

int main() {
int n;
cin >> n;
int* a = new int[n];
for (int i = 0; i < n; i++)
scanf("%d", &a[i]);
merge_sort(a, 0, n - 1);
return 0;
}

并行以及优化

显然我们可以将merge_sort中的两次递归并行处理,但每次启动线程都会消耗大量的常数时间,在lowhigh相差很小的时候,并行递归的开销远大于串行递归。于是我们对并行版本的快速排序进行优化,只有当high - low > dx时才并行递归,否则串行递归。

merge_sort函数进行修改,得到:

void merge_sort(int* a, int low, int high) {
if (low >= high)
return;
int mid = (low + high) >> 1;
if (high - low > dx) {
parallel_for(0, 2, [&](int x) {
if (x == 0) merge_sort(a, low, mid);
else merge_sort(a, mid + 1, high);
});
} else {
merge_sort(a, low, mid);
merge_sort(a, mid + 1, high);
}
merge(a, low, mid, high);
}

并行归并

此外,我们还可以将归并操作(merge函数)并行化,下面就主要介绍并行归并算法。

并行归并算法有许多种,比较著名的有Valiant的并行归并算法、Cole的并行归并排序算法。其基本思想都是采取一定方法,将两个有序序列分解成若干个成对的段(这种成对的段叫段组),然后将每个段组都归并好,归并好的各段组自然连成了一个有序序列。

段组分解的基本思想

下面就来讲解一种最简单的基于二分查找的段组分解算法的基本思想。
先看一个实例,有以下两组有序数据,如何将它们进行并行归并呢?

  • 第一组:15 25 33 47 58 59 62 64
  • 第二组:12 18 27 31 36 38 42 80

考虑将第1组数据以中间位置数据47作为枢轴数据平分成两段,如下所示:

  • 第一组:(15 25 33 47)(58 59 62 64)

然后再在第2组数据中使用对半查找方法查找一个刚好小于等于枢轴数据47的数据42,以42为枢轴将第2组数据分成两段如下:

  • 第二组:(12 18 27 31 36 38 42)(80)

此时可以发现,第一组的第1段数据和第二组的第1段数据均小于第1组的第2段或第二组的第2段数据。
将第一组的第1段数据和第二组的第1段数据进行归并,得到以下数据序列:

  • 第一段归并:(12 15 18 25 27 31 33 36 38 42 47)

将第一组的第2段数据和第二组的第2段数据进行归并,

  • 第二段归并:(58 59 62 64 80)

很容易发现,上面两段归并后的数据连起来后,自然就形成了一个有序系列。由于上面两次归并操作都是独立的,因此它可以并行地进行。这种可以归并的两个成对数据段就是前面说过的段组。

47在第一组数据中的位置是342在第二组数据中的位置是6。位置对(3,6)将上面两组数据分成了两个可以并行归并的段组。

复杂度分析

从上面的例子可以看出,在第一组数据的枢轴数据47确定以后,在第二组数据中使用折半查找方法找到刚好小于等于枢轴数据47的数据42,时间复杂度为$O(logn)$。为了让效率最优化,每个线程的负载应该尽可能均衡,这就要求选取的位置对能够将原数据分成元素数量相对均衡的段组。于是我们也应使用折半查找确定第一组数据的枢轴的合适位置,整体操作时间复杂度$O((logn)^2)$。算法流程如下:

  1. 确定两组数据的数量,分别记为n1n2;
  2. 准备折半查找, l = 0, r = n1;
  3. 确定第一组数据的枢轴位置pos1 = (l + r) / 2;(若l >= r则退出)
  4. 在第二组数据中,使用折半查找方法找到对应的枢轴pos2;(具体流程略,时间复杂度$O(logn)$)
  5. 如果分割得到的第一段数据规模比第二段数据规模小,则l = pos1 + 1,回到3;(pos1向后调整)
  6. 如果分割得到的第一段数据规模比第二段数据规模大,则r = pos1 - 1,回到3;(pos1向前调整)

可以看到,这段算法实质上是两个折半查找的嵌套,在折半查找pos1的过程中嵌套折半查找pos2,整体时间复杂度$O((logn)^2)$。

查找枢轴的实现代码如下(第一组数据为a[low ... mid],第二组数据为a[mid + 1 ... high]):

int seek_pos[MAX_THREADS + 1][3] = {}, n1 = mid - low + 1, n2 = high - mid;
for (int i = 1; i < MAX_THREADS; i++) {//找第i个枢轴
int l1 = low, r1 = mid, pos1 = mid, pos2 = high;
while (r1 - l1 > 0) {
pos1 = (l1 + r1) / 2;//折半查找,先假定第一组数据的枢轴
int l2 = mid + 1, r2 = high + 1;
while (r2 - l2 > 0) {//再用折半查找确定第二组的枢轴
pos2 = (l2 + r2) / 2;
if (a[pos1] <= a[pos2])
r2 = pos2;
else
l2 = pos2 + 1;
}
pos2 = r2;
if ((pos1 + pos2 - low - mid) * MAX_THREADS < (n1 + n2) * i)
l1 = pos1 + 1;
else
r1 = pos1 - 1;
}
seek_pos[i][1] = pos1;//将枢轴位置记录到数组中
seek_pos[i][2] = pos2;
seek_pos[i][0] = seek_pos[i][1] + seek_pos[i][2] - low - mid - 1;//输出枢轴位置
}
//边界条件
seek_pos[0][1] = low;
seek_pos[0][2] = mid + 1;
seek_pos[0][0] = seek_pos[0][1] + seek_pos[0][2] - low - mid - 1;
seek_pos[MAX_THREADS][1] = mid + 1;
seek_pos[MAX_THREADS][2] = high + 1;
seek_pos[MAX_THREADS][0] = seek_pos[MAX_THREADS][1] + seek_pos[MAX_THREADS][2] - low - mid - 1;

上述代码中,seek_pos[][]数组记录了各对枢轴的位置。(如果计划启动 4 个线程,则需要查找 3 对枢轴,将原数据等分 4 份)其中,seek_pos[x][1]记录了第一组数据的第x个枢轴, seek_pos[x][2]记录了第二组数据的第x个枢轴, seek_pos[x][0]记录了即将归并得到的有序序列(即下面代码中的temp[]数组)中的第x个枢轴。

接下来便是并行归并操作,将归并的结果暂存到temp[]数组中:

int* temp = new int[high - low + 1];
parallel_for(0, MAX_THREADS, [&](int x) {
int i = seek_pos[x][1], j = seek_pos[x][2], k = seek_pos[x][0];
while (i < seek_pos[x + 1][1] && j < seek_pos[x + 1][2])
if (a[i] < a[j])
temp[k++] = a[i++];
else
temp[k++] = a[j++];
while (i < seek_pos[x + 1][1]) temp[k++] = a[i++];
while (j < seek_pos[x + 1][2]) temp[k++] = a[j++];
});

所有的线程都完成了归并操作后,将temp[]数组中的内容拷贝回原数组中对应的位置,则完成了对a[]数组[low, high]区域的归并:

parallel_for(0, MAX_THREADS, [&](int x) {
for (int i = seek_pos[x][0]; i < seek_pos[x + 1][0]; i++)
a[low + i] = temp[i];
});
delete[] temp;

将上述代码片段拼在一起,便得到了并行归并的函数parallel_merge ,替换串行代码中的merge函数即可:

void parallel_merge(int* a, int low, int mid, int high) {//[low, mid], [mid + 1, high]
int seek_pos[MAX_THREADS + 1][3] = {}, n1 = mid - low + 1, n2 = high - mid;
for (int i = 1; i < MAX_THREADS; i++) {
int l1 = low, r1 = mid, pos1 = mid, pos2 = high;
while (r1 - l1 > 0) {
pos1 = (l1 + r1) / 2;
int l2 = mid + 1, r2 = high + 1;
while (r2 - l2 > 0) {
pos2 = (l2 + r2) / 2;
if (a[pos1] <= a[pos2]) r2 = pos2;
else l2 = pos2 + 1;
}
pos2 = r2;
if ((pos1 + pos2 - low - mid) * MAX_THREADS < (n1 + n2) * i) l1 = pos1 + 1;
else r1 = pos1 - 1;
}
seek_pos[i][1] = pos1;
seek_pos[i][2] = pos2;
seek_pos[i][0] = seek_pos[i][1] + seek_pos[i][2] - low - mid - 1;
}
seek_pos[0][1] = low;
seek_pos[0][2] = mid + 1;
seek_pos[0][0] = seek_pos[0][1] + seek_pos[0][2] - low - mid - 1;
seek_pos[MAX_THREADS][1] = mid + 1;
seek_pos[MAX_THREADS][2] = high + 1;
seek_pos[MAX_THREADS][0] = seek_pos[MAX_THREADS][1] + seek_pos[MAX_THREADS][2] - low - mid - 1;

int* temp = new int[high - low + 1];
parallel_for(0, MAX_THREADS, [&](int x) {
int i = seek_pos[x][1], j = seek_pos[x][2], k = seek_pos[x][0];
while (i < seek_pos[x + 1][1] && j < seek_pos[x + 1][2])
if (a[i] < a[j])
temp[k++] = a[i++];
else
temp[k++] = a[j++];
while (i < seek_pos[x + 1][1]) temp[k++] = a[i++];
while (j < seek_pos[x + 1][2]) temp[k++] = a[j++];
});
parallel_for(0, MAX_THREADS, [&](int x) {
for (int i = seek_pos[x][0]; i < seek_pos[x + 1][0]; i++)
a[low + i] = temp[i];
});
delete[] temp;
}

实际应用情况

在数据规模较小时,由于寻找枢轴、启动线程的开销相对较大,并行归并的运行效率甚至不及串行归并。只有在数据规模极大,使得寻找枢轴、启动线程的开销可以忽略的情况下(比如外排序),并行归并才能获得比较可观的加速效果。

参考文献:《多核计算与程序设计》周伟明著 华中科技大学出版社