Erlang并行快速排序?qū)崙?zhàn)
本節(jié)我將用Erlang多進程方式實現(xiàn)快速排序,快速排序采用的是分治的思想,即將原問題分解為若干個規(guī)模更小但結(jié)構(gòu)與原問題相似的子問題。遞歸地解這些子問題,然后將這些子問題的解組合為原問題的解。通過對比快速排序的串行算法,我們將此串行算法改進為并行算法。
首先,來看看快速排序的串行算法:
1.從數(shù)列中挑出一個元素,稱為 "基準"(pivot);
2.重新排序數(shù)列,所有元素比基準值小的擺放在基準前面,所有元素比基準值大的擺在基準的后面(相同的數(shù)可以到任一邊)。在這個分割結(jié)束之后,該基準就處于數(shù)列的中間位置。這個稱為分割(partition)操作;
3.遞歸地(recursive)把小于基準值元素的子數(shù)列和大于基準值元素的子數(shù)列排序。
此思想網(wǎng)上到處都有,現(xiàn)在來看看快速排序的并行算法:
其中的一個簡單思想是,對每次劃分過后得到的兩個序列分別使用兩個處理器完成遞歸排序。例如對一個長為n的序列,首先劃分得到兩個長為n/2的序列,將其交給兩個處理器分別處理;而后進一步劃分得到四個長為n/4的序列,再分別交給四個處理器處理:如此遞歸下去最終得到排序好的序列。
當然,這里是理想的劃分情況,如果劃分步驟不能達到平均分配的目的,那么排序效率會相對較差。
跟上節(jié)講得并行枚舉排序進程調(diào)用的區(qū)別在于,枚舉排序一次性將數(shù)據(jù)傳給所有節(jié)點進程,而快速排序進程調(diào)用在分割數(shù)據(jù)的過程中呈現(xiàn)的是樹狀形式。
并行快排的偽代碼如下:
- //快速排序的并行算法
- 輸入:無序數(shù)組data[1,n],使用的處理器個數(shù)為2^m
- 輸出:有序數(shù)組data[1,n]
- Begin
- para_quicksort(data,1,n,m,0)
- End
- procedure para_quicksort(data,i,j,m,id)
- Begin
- if (j-i) <=k or m=0 then
- Pid call quicksort(data,i,j)
- else
- Pid:r = patition(data,i,j)
- Pid send data[r+1,m-1] to Pid+2 ^(m-1)
- para_quicksort(data,i,r-1,m-1,id)
- par_quicksort(data,r+1,j,m-1,id+2^(m-1) )
- Pid+2 ^ (m-1) send data[r+1,m-1] back to Pid
- end if
- End
現(xiàn)在,就來看看Erlang代碼實現(xiàn)并行快排:
設置啟動接口,啟動多個進程之后,在總控端調(diào)用Dict = store(['node1@pc1305','node2@pc1305']...)存儲各節(jié)點進程的字典表,然后調(diào)用start([3,4,512,1,2,5,……],2,Dict)進行快排,其中第二參數(shù)表示需要最多2^2=4個進程來完成任務,總控端也作為一個排序的節(jié)點。
- %% 啟動排序時,需存儲各節(jié)點信息:使用Dict = store(['node1@pc1305','node2@pc1305']...)方法
- %%然后,便可啟動start(Data,M,Dict) 進行排序,M為啟動的進程個數(shù)指標,Dict為上述存儲的節(jié)點
- -module(para_qsort).
- -export([para_qsort/3,start/3,store/1,lookup/2]).
- store(L) -> store(L,[]).
- store([],Ret) -> Ret;
- store(L,Ret) ->
- Value = lists:nth(1,L),
- Key = length(Ret)+1,
- io:format("Key=~p Value=~p~n",[Key,Value]),
- New = [{Key,Value} | Ret],
- store(lists:delete(Value,L) , New).
- lookup(Key,Dict) ->
- {K,V} = lists:nth(1,Dict),
- if
- K =:= Key ->
- V;
- K =/= Key ->
- Filter = lists:delete({K,V},Dict),
- lookup(Key,Filter)
- end.
- start(Data,M,Dict) ->
- register(monitor, spawn(fun() -> wait([]) end)),
- put(monitor, node()),
- NewDict = [{monitor, get(monitor)} | Dict],
- para_qsort(Data,M,NewDict).
wait/1函數(shù)主要對各節(jié)點排序好的結(jié)果進行歸并整理,然后輸出最終結(jié)果,代碼如下
- %%服務器節(jié)點監(jiān)聽
- wait(Ret) ->
- Len1 = parser(Ret)+length(Ret)-1,
- Len2 = len(Ret),
- if
- (Len1 =:= Len2) and (Len2 =/=0) ->
- SortRet= merge(Ret,[],1),
- io:format("Para qsort result:~n",[]),
- io:format("~p~n",[SortRet]);
- (Len1 =/= Len2) or (Len2 =:= 0) ->
- receive
- {Node,Pid,L} ->
- {Data,I,J} = L,
- Temp = [{Node,Data,I,J}|Ret],
- wait(Temp)
- end;
- die ->
- quit
- end.
- len([]) -> 0;
- len([H|T])->
- {Node,Data,I,J} = H,
- length(Data).
- parser([]) -> 0;
- parser([H|T]) ->
- {Node,Data,I,J}=H,
- J-I+1+parser(T).
- merge([],Ret,Len) -> Ret;
- merge(T,Ret,Start) ->
- H = lists:nth(1,T),
- {Node,Data,I,J} = H,
- if
- I =:= Start ->
- ListBetween = lists:sublist(Data,I, J-I+1),
- case (I=:=1) of
- true ->
- Temp = lists:append(Ret,ListBetween);
- false ->
- Pivo = lists:append(Ret,[lists:nth(I-1,Data)]),
- Temp = lists:append(Pivo,ListBetween)
- end,
- io:format("~n-----<< ~p >> processing data[~p,~p]~n-------------------------Result=~p~n",[Node,I,J,Temp]),
- Len = Start+J-I+2,
- NewData = lists:delete(H,T),
- merge(NewData,Temp,Len);
- I =/= Start ->
- NewData = lists:delete(H,T),
- Temp = lists:append(NewData,[H]),
- merge(Temp,Ret,Start)
- end.
para_qsort完成對排序任務的分發(fā),代碼如下:
- para_qsort(Data,M,Dict) ->
- %%進程個數(shù)最多為2^M次方
- I = 1,
- J = length(Data),
- ID = 0,
- para_qsort(Data,I,J,M,ID,Dict).
- para_qsort(Data,I,J, M, ID,Dict) ->
- %% 閥值,列表排序個數(shù)小于K時直接調(diào)用qsort直接快排
- K = 4,
- Value1 = (J-I) < K,
- Value2 = (M =:= 0),
- Value = Value1 or Value2,
- if
- Value =:= true ->
- %io:format("~nCurrent node: (~p) ~nData= ~p I=~p J=~p~n",[node(),Data,I,J]),
- ListBefore = lists:sublist(Data, I-1),
- ListBetween = lists:sublist(Data,I, J-I+1),
- ListAfter = lists:sublist(Data,J+1,length(Data)-J),
- Ret = lists:sort(ListBetween),
- Temp = lists:append(ListBefore,Ret),
- NewData = lists:append(Temp, ListAfter),
- {monitor, lookup(monitor,Dict) } ! {node(),self(),{NewData,I,J} },
- io:format("-------------------------------------------------------------");
- Value =:= false ->
- {NewData,R} = partition(Data,I,J),
- send(NewData, R+1, J, M-1,ID + math:pow(2 ,(M-1)), Dict ),
- para_qsort(NewData,I,R-1,M-1,ID, Dict)
- end.
其中patition用于將data按基準值劃分為兩部分,代碼如下:
- partition(Data, K, L) ->
- Pivo= lists:nth(L, Data),
- ListBefore = lists:sublist(Data, K-1),
- ListBetween = lists:sublist(Data,K, L-K),
- ListAfter = lists:sublist(Data,L+1,length(Data)-L),
- Left = [X|| X <- ListBetween,X =<Pivo],
- Right = [X || X <- ListBetween,X > Pivo],
- Ret = lists:append(Left,[Pivo|Right]),
- Temp = lists:append(ListBefore,Ret),
- NewData = lists:append(Temp, ListAfter),
- Len = length(ListBefore)+length(Left)+1,
- {NewData, Len}.
任務分割完后,會將此任務發(fā)送給下個節(jié)點進程進行處理,send代碼如下:
- send( Data, I, J, M,No ,Dict) ->
- ID = trunc(No),
- Node = lookup(ID,Dict),
- Pid = spawn(Node, fun() -> loop() end),
- Pid ! {node(),self(), {Data,I,J,M,ID,Dict}}.
- %%客戶節(jié)點N準備接受排序數(shù)據(jù)
- loop() ->
- receive
- {Node,Pid,die} ->
- disconnect_node(Node),
- io:format("Node (~p) disconnected~n",[Node]),
- quit;
- {Node,Pid,L} ->
- {Data,I,J,M,ID,Dict} = L,
- %io:format("4----:Current node:~p server node:~p~n",[node(),lookup(monitor,Dict)]),
- para_qsort(Data,I,J,M,ID,Dict)
- %loop()
- end.
思想比較簡單,我只是沒對它進行進一步優(yōu)化,最終運行結(jié)果如下:
大家可以看到,此data的劃分就是不均勻的,node1、node3只處理了一個數(shù)據(jù),node2處理了3個數(shù)據(jù),其余的都是server處理的,因此排序的效率問題很差。此程序只是展示如何使用Erlang實現(xiàn)并行快排,并不是學習如何優(yōu)化問題。
大致就說這么多了,以后我將實現(xiàn)PSRS并行算法的Erlang實現(xiàn),這個算法Erlang實現(xiàn)起來比較麻煩,大家可以先去看看。
原文:http://www.cnblogs.com/itfreer/archive/2012/05/14/Erlang_in_practise_quick-sort.html
【編輯推薦】