好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

基于内存共享的并行排序算法慢谈

基于内存共享的并行排序算法慢谈

基于内存共享的并行排序算法慢谈(上)

1.前言

并行程序设计的重要性我就不多说了,可悲的是明知它重要,我却没学好。这不还是碰到了,请听题:

请用Python多线程对一个4G以上的文件, 进行外排序,尽量优化性能。假设系统内核数为8,Mem=512MB,关键字是字符串

记得上学期开始上并行程序设计的时候,我还是激情澎湃的。但是后来就萎了。

每次上手一样新东西,我都会到园里翻翻看,但是不幸的是园里关于这块好的文章不太多。

我不知道是什么原因,可能是现成的东西太多了吧。这块骨头不好啃啊,所以打算分几个篇幅来说。

本文的主题是-基于内存共享的并行排序算法。本人水平有限,如果有什么不恰当的,

请大家一定不要吝啬指正,这也是我写本文的目的之一,有讨论才有进步嘛。

2.大纲

1.前言

2.大纲

3.基于内存共享的并行排序设计

4.高级排序算法并行化

5.外排序与内排序

6.实验

3.基于内存共享的并行排序设计

事实上并行里面应用更广泛的是基于消息传递的并行程序设计,我想可能是因为它们可并行的规模级别不一样吧。

通过shell命令:cat /proc/cpuinfo |grep 'processor'|wc -l

就可以查看系统CPU的核数,k=8。

算法的并行化与算法并行部分的依赖性有关,关于这块我也不是很懂,就不多说了。

排序不像查找,查找可以让几个进程并行的独立的工作,这个独立还是永久独立的。

排序的含义决定了他是一个整体相关的操作,好像也只有快速排序能打破它这个限制。

下面我们来具体探讨几种高级排序算法如何并行化。

4.高级排序算法并行化

假设待排序规模为N。

4.1希尔排序

希尔排序算法中的增量序列至今仍没被完全研究透。但我们知道这个序列总是从大到小最终一定要变为1的。

当d>1时,可以有d-1个线程同时并行在各自的序列内进行插入排序。当d=1时就不能并行了。

d>=k的时候,可并行的最多,加速效率是最大的。

那么这里有个问题需要弄清楚:对于一个希尔排序[d 1 ,...d n ],可并行的计算量占整体计算量的比例是多少。

这些可并行的计算量里又有多少加速效率是最大的。这个我们去分析过,但我觉得d=1占的计算比例肯定不小,

就算最好情况也要N次比较。最差情况下的比较次数应该不会超过d n-1 N。有兴趣的可以量化下。

4.2堆排序

堆排序包括初始建堆和筛选。

依赖性好像很大,高手想想办法吧,偶是不行了。

4.3快速排序

快速排序一直是个宠儿,园里关于快速排序并行化的文章倒有:

http://www.cnblogs.com/itfreer/archive/2012/05/14/erlang_in_practise_quick-sort.html

快速排序的并行化好像也很有优势,因为他不需要最后做'统一'处理,而希尔排序和归并排序都需要。

本文也首先考虑用快速排序解决问题,因为它可以最大化大发挥并行的威力。

4.4归并排序

我没研究过k路归并排序里哪个最好,只知道最常用的是2路归并排序。

其并行化有点类似于希尔排序,最后一步也就是合并最后两个有序块时,应该也是不能并行的。

不过归并思想在外排序中很有用,后面会再提到的。

4.5外并行

前面讨论的都是算法内部的并行,事实上也可以是外部并行,就是多个线程对不同的数据并行的排序。

事实上,由于快排的递归思想,它也可以看成是外部的。

基于内存共享的并行排序算法慢谈(中)

题目再现:请用Python多线程对一个4G以上的文件, 进行外排序,尽量优化性能。假设系统内核数为8,Mem=512MB,关键字是字符串

5.外排序与内排序

5.1.归并外排序分析

说到外排序仿佛用的都是归并排序的思想,具体我就不多说了,见园里的:

http://www.cnblogs.com/this-543273659/archive/2011/07/30/2122083.html

整个流程大致如下:

A: 将N个数据分成M个块,在内存中分别对 这M个块各自排完序(快速排序)并输出,产生M个子文件

读入一次N,输出一次N,空间复杂度N/M,时间复杂度N+O(Nlog(N/M))

B: 选出M个d i1 ,i表示来自第i个文件,1表示它是第i个文件里第i小的。

M个文件N个数据都要读入并输出,即IO为2N,空间复杂度M

C: 对这M个数据排序(插入排序),排序后选出第1个假设为d j1 ,将其输出到最终文件,

 并从第j个子文件里再取一个第2小的,凑成M个数据。依次类推直到M个文件中数据被取完。

 时间复杂度O(M 2 )+(N-M)O(M)

总共IO次数4N,空间复杂度max(M,N/M),时间复杂度S=O(4N)+O(Nlog(N/M))+O(M 2 )+(N-M)O(M)

5.1.1.每个块里选一个是不是最好的

假设从排序后的各个文件里选出K>1个,共有MK个数据

空间复杂度max(MK,N/M),时间复杂度S'=O(4N)+O(Nlog(N/M))+O(MK) 2 +(N-MK)/KO(MK)

S-S'=O(M 2 )+(N-M)O(M)-O(MK) 2 -(N-MK)/KO(MK)

这里就不太好分析了,要具体情况具体分析了,不过一般情况下M<=Mem<<N

S-S'=O(M 2 )-O(MK) 2 +a(N-M)M-a(N-MK)M=O(M 2 )-O(MK) 2 +aM 2 (K-1)

估摸着不会产生太大的影响,有兴趣的可以在实验中观察看看,这里就不多说了。

5.1.2.M取多少是最好的

N/Mem<=M<=Mem,要是N/Mem=Mem就完美了。令PS为S中M可影响的近似部分

PS=aNlog(N/M)+bNM 求导得到使得PS最小的M取值竟然是M=a/b,神那,这叫我情何以堪。

昨天跟老大讨论的时候,老大说外排序的主要性能瓶颈就是IO,要减少IO次数,还要使用预取思想,提前把数据导入到内存。

因为IO集中起来连续读要快些。 我问他这个M取多少最好,他的意思是尽量使得N/M接近Mem,最大限度的利用内存。

并且在归并阶段,也尽量的多导入数据。

5.2.其它外排序分析

除了归并外,还有没有更好的,各位大神一起发发力,小弟快不行了。

我想可不可以用快速排序做外排序

A随机选一个数据,扫描文件,将其分成两个子文件,这个临界数据可以先放到内存里。前提是Mem>>N/Mem

B对子文件进行递归操作,直到内存能够容得下分出来的子文件,此时把这个子文件调到内存里做内排序

理想状态下,假设每次快排都能平均分,则

IO代价为O(log 2 (N/Mem)N),排序代价为N/MemO(MemlogMem)

K=O(log 2 (N/Mem)N)+N/MemO(MemlogMem)

令N=8G,Mem=512M,M=16,N/Mem=16则

S=O(4N)+O(Nlog(N/16))+16*16+(N-16)16

K=O(4N)+O(Nlog(N/16))

也就是在磁盘IO相等的情况下,快速外排序还好点。

但是当N/Mem>16,比如N/Mem=32,磁盘IO次数上升到O(5N),这个就不好说了。

如果不是理想状态,IO的次数就更多了, 同样有兴趣的可以做实验看看。

本文还是以归并外排序作为解决策略。

5.3内排序分析

(上)已经提到内排序将使用并行化的快速排序,关于内排序我以前曾分析过

在某些情况下  高级排序算法不一定比低级排序算法好  我这个就不重复说了

 

基于内存共享的并行排序算法慢谈(下)

题目再现:请用Python多线程对一个4G以上的文件, 进行外排序,尽量优化性能。假设系统内核数为8,Mem=512MB,关键字是字符串

6.实验

6.1python多线程

关于python多线程可以参考: http://www.cnblogs.com/holbrook/archive/2012/03/02/2376940.html

python的多线程机制和Java很像,这里就不多讲了。似乎没有涉及到共享变量,锁也不用了。

6.2问题解决思路

  我也想过先划分数据再交给线程,但是没试过,感觉需要很多内存,搞不好还会浪费。

  第三个非常恶劣的问题就是,线程里面新建线程会产生很多问题,具体我说不清楚。

  第四个问题就是,pdb调试无法在线程里面 设置断点,线程的问题无法跟踪,可能是我太蠢了。

由于以上问题,我最好只能换了一种方法,就是用前面提到的算法间并行。

开始时主线程读取数据,每读到一定量时就开辟一个新的线程,把线程插入到一个队列里。

主线程把读到的数据块交给线程排序。线程启动,线程排完序后会将结果输出到文件。

主线程每次读新数据的时候,先判断下是否还有足够内存开瓶新的数据。若没有则等待线程队里的第一个线程结束。

因为第一个线程是最先开辟的,它结束后就可以释放内存了。

当主线程读完所有数据时,等待线程队列里的所有线程结束。

然后主线程使用归并思想对子文件数据进行归并,并输出到最终文件。

在归并的时候,若内存有多,尽量预取数据。

6.3完整代码

产生待排序数据

 1   import   random
   2  
  3   def   generatekey(num):
   4      str =  []
   5       while  num >  0:
   6          str.append(chr(random.randint(97, 122 )))
   7          num = num - 1
  8      str.append( '  \n  '  )
   9       return   ''  .join(str)
  10  
 11   print   '  please enter the number  ' 
 12  N =  input()
  13  f=open( '  unsortdata  ' , '  w  '  )
  14   while  N >  0:
  15      str = generatekey(30 )
  16       f.write(str)
  17      N = N - 1
 18      
 19  f.close()

多线程排序

 1   import   threading
    2   import   time
    3   import   random
    4  
   5   class   MyThread(threading.Thread): 
    6  
   7       def   __init__  (self, file, data):
    8          threading.Thread. __init__  (self)
    9          self.file =  file
   10          self.data =  data
   11  
  12       def   run(self):
   13          last =  len(self.data)
   14          self.quickSort(0, last - 1 )
   15           for  j  in   range(last):
   16               self.file.write(self.data[j])
   17           self.file.close()
   18  
  19       def   quickSort(self, first, last):
   20           if  last - first > 7 :
   21              mid =  self.partition(first, last)
   22               if  first < mid - 1 :
   23                  self.quickSort(first, mid - 1 )
   24               if  mid + 1 <  last:
   25                  self.quickSort(mid + 1 , last)
   26           else  :
   27               self.insertSort(first, last)
   28  
  29       def   insertSort(self, first, last):
   30           for  i  in  range(first + 1, last + 1 ):
   31               if  self.data[i - 1] >  self.data[i]:
   32                  comp =  self.data[i]
   33                  low =  first
   34                  high = i - 1
  35                   while  low <=  high:
   36                      mid = (low + high) / 2
  37                       if  self.data[mid] <  comp:
   38                          low = mid + 1
  39                       else  :
   40                          high = mid - 1
  41                  j = i - 1
  42                   while  j >= high + 1 :
   43                      self.data[j + 1] =  self.data[j]
   44                      j = j - 1
  45                  self.data[high + 1] =  comp
   46  
  47       def   partition(self, first, last): 
   48          mid =  random.randint(first, last)
   49          comp =  self.data[mid]
   50           #  交换临界值和数组第一个值 
  51          temp =  self.data[first]
   52          self.data[first] =  comp
   53          self.data[mid] =  temp
   54           while  first <  last:
   55               while  first < last  and  self.data[last] >=  comp:
   56                  last = last - 1
  57               if  first <  last:
   58                  self.data[first] =  self.data[last]
   59                  first = first + 1
  60               while  first < last  and  self.data[first] <=  comp:
   61                  first = first + 1
  62               if  first <  last:
   63                  self.data[last] =  self.data[first]
   64                  last = last - 1
  65          self.data[first] =  comp
   66           return   first
   67  
  68   def   getbuffer(flist, i, X):
   69      temp =  []
   70       for  j  in   range(X):
   71          str =  flist[i].readline()
   72           if  str ==  ''  :
   73               break 
  74           temp.append(str)
   75       return   temp
   76  
  77   def   outerMergeSort(X, N, M):
   78      buffer =  []
   79      Mlist =  []
   80      flist =  []
   81      num =  0
   82       for  i  in   range(M):
   83          flist.append(open( '  sortdata  '  + str(i),  '  r  '  ))
   84           buffer.append(getbuffer(flist, i, X))
   85           Mlist.append([buffer[i][0], i])
   86           del   buffer[i][0]
   87      f = open( '  sortdata  ' ,  '  w  '  )        
   88       while (num < N  and  len(Mlist) >  0):
   89           Mlist.sort()
   90  
  91           f.write(Mlist[0][0])
   92          num = num + 1
  93          fro = Mlist[0][1 ]
   94           del   Mlist[0]
   95           #  here has a bug if X == 1 
  96           if  len(buffer[fro]) >  0:
   97               Mlist.append([buffer[fro][0], fro])
   98           if  len(buffer[fro]) <= 1 :
   99              buffer[fro] =  getbuffer(flist, fro, X)
  100           else  :
  101               del   buffer[fro][0]
  102  
 103       f.close()
  104       for  i  in   range(M):
  105           flist[i].close()
  106  
 107   if   __name__  ==  '  __main__  '  :
  108       #  N为待排序的总数据量 
 109       #  M为线程数,Sum为每个线程处理的数据量 
 110       #  X为每个子文件的预取数组大小 
 111       #  L为内存限制的数据量 
 112       print   '  please enter the number N  ' 
 113      N =  input()
  114       print   '  please enter the number M  ' 
 115      M =  input()
  116      Sum = N /  M
  117       #  print 'please enter the mem limit data' 
 118      L = 8 *  Sum 
  119      X = (L - M) /  M
  120      ThreadL = L /  Sum
  121      filename =  '  unsortdata  ' 
 122      f = open(filename,  '  r  '  )
  123      i =  0
  124       #  tlist存放线程 
 125      tlist =  []
  126      data =  []
  127      start =  time.time()
  128       while  i <  M:
  129           #  主线程从unsortdata中读取Sum行 
 130           for  j  in   range(Sum):
  131               data.append(f.readline())
  132           if  len(tlist) ==  ThreadL:
  133               tlist[0].join()
  134           #  新建子文件用于输出线程排序结果 
 135          file = open( '  sortdata  '  + str(i),  '  w+  '  )
  136           #  新建线程使用混合快速排序对data进行排序,将排序后的结果输出到flist[i]中 
 137          t =  MyThread(file,  data[:])
  138           tlist.append(t)
  139           t.start()
  140           if  len(tlist) ==  ThreadL:
  141               del   tlist[0]
  142          data =  []
  143          i = i + 1
 144  
 145       f.close()
  146       for  i  in   range(len(tlist)):
  147           tlist[i].join()
  148  
 149       #  归并外排序 
 150       outerMergeSort(X, N, M)
  151      finish =  time.time()
  152       print   '  total cost time:%s\n  '  % (finish - start)

6.4后期实验

可以做很多后期实验,但是时间有限

N=200000

M=1,T=12.854446888

M=2,T=8.58908414841

M=4,T=6.01675200462

M=8,T=4.41931986809

M=16,T=3.61740279198 内存限制8线程

这些都是一次实验,没求平均。

......

全文完

 

分类:  作业及项目

标签:  python ,  排序 ,  多线程

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于基于内存共享的并行排序算法慢谈的详细内容...

  阅读:69次