F#并行排序算法輕松實(shí)現(xiàn)
F#并行排序算法中其中有一種比較常見的方法就是先把要處理的數(shù)據(jù)分成若干份,然后讓不同的線程(CPU)去處理,然后所有的線程處理完成后,把結(jié)果匯聚在一起,在一個(gè)獨(dú)立的線程里完成結(jié)果合并,從而形成最終結(jié)果。在做分割的時(shí)候盡量讓每個(gè)線程只訪問自己獨(dú)立的數(shù)據(jù),而不訪問全局?jǐn)?shù)據(jù)和其它線程的數(shù)據(jù)(這里說的數(shù)據(jù)是非只讀數(shù)據(jù)),在合并結(jié)果的時(shí)候要有一種高效的算法來合并。
排序算法里有歸并排序算法,我們先寫一個(gè)多路歸并排序算法,然后把要排序的數(shù)組分成CPU的個(gè)數(shù)份,讓每個(gè)CPU去對(duì)每一份進(jìn)行排序,所有線程排序完成后匯聚在一起,在一個(gè)獨(dú)立的線程里進(jìn)行歸并排序。
大概再解釋一下代碼,可能有些人對(duì)F#還不熟悉。
1、歸并算法的思路就是把多個(gè)已經(jīng)排序的數(shù)組合并成一個(gè)大的排序數(shù)組,先從每個(gè)分?jǐn)?shù)組的最小下標(biāo)開始,誰都最小就放到大數(shù)組里,然后這個(gè)數(shù)組的下標(biāo)加一,然后再比較,再把最小的放到大數(shù)組里,重復(fù),直到所有的小數(shù)組的下標(biāo)已經(jīng)指向到末尾。其中會(huì)用到一個(gè)臨時(shí)變量min,所以用mutable關(guān)鍵字修飾。
2、F#的數(shù)組的長(zhǎng)度用Array.length方法得出,變量和數(shù)組的賦值符號(hào)是<-,而不是=,=相當(dāng)于c#里的==,f#里沒有continue和break等關(guān)鍵字
3、async關(guān)鍵字是一個(gè)新的并行基元,用它擴(kuò)住的代碼由f#自動(dòng)的異步在線程池里執(zhí)行,如果里面要返回結(jié)果的話,要用let!和return!關(guān)鍵字,我們的排序只是對(duì)數(shù)組進(jìn)行操作,并不返回,所以這里比較簡(jiǎn)單。
4、(fun a b -> a - b)是一個(gè)lamda表達(dá)式,它可以自動(dòng)轉(zhuǎn)換成Comparer
5、Array.map表示把一個(gè)數(shù)組里的每個(gè)元素應(yīng)用一個(gè)方法,它這時(shí)候不執(zhí)行,會(huì)通過管道傳遞給Async.Parallel方法,Async.Parallel方法返回一個(gè)異步執(zhí)行數(shù)組Async<'a array>,最后用Async.Run來真正執(zhí)行Async.Parallel返回的結(jié)果。
6、|>表示管道的意思,大致就是把前一個(gè)函數(shù)的結(jié)果讓后一個(gè)函數(shù)來用,這樣一條語句可以表達(dá)很連貫的邏輯。
F#并行排序算法整體代碼如下:
1 #light
2
3 open System
4 open System.Diagnostics
5 open Microsoft.FSharp.Control.CommonExtensions
6
7 let merge_sort destArray source cmp =
8 let N = Array.length source
9 let L = Array.length destArray - 1
10 let posArr = Array.create N 0
11 for i = 0 to L do
12 let mutable min = -1
13 for j = 0 to N - 1 do
14 if posArr.[j] >= Array.length source.[j] then ()
15 else
16 if min = -1 then min <- j
17 else
18 if (cmp source.[j].[posArr.[j]] source.[min].[posArr.[min]]) < 0 then min <- j
19 if min = -1 then ()
20 else
21 destArray.[i] <- source.[min].[posArr.[min]]
22 posArr.[min] <- posArr.[min] + 1
23
24 let parallel_sort cmp arr =
25 let processorCount = Environment.ProcessorCount;
26 let partArray = Array.create processorCount [||]
27 let mutable remain = Array.length arr
28 let partLen = Array.length arr / processorCount
29
30 for i = 0 to processorCount - 1 do
31 if i = processorCount - 1 then
32 let temp_arr = Array.create remain 0
33 Array.Copy(arr, i*partLen, temp_arr, 0, remain)
34 partArray.[i] <- temp_arr
35 else
36 let temp_arr = Array.create partLen 0
37 Array.Copy(arr, i*partLen, temp_arr, 0, partLen)
38 remain <- remain - partLen
39 partArray.[i] <- temp_arr
40
41 let a_sort_one arr =
42 async {
43 Array.sort cmp arr
44 }
45
46 let a_sort_all =
47 partArray
48 |> Array.map (fun f -> a_sort_one f)
49 |> Async.Parallel
50 |> Async.Run
51
52 a_sort_all
53 let ret = Array.create (Array.length arr) 0
54 merge_sort ret partArray (fun a b -> a - b)
55 ret
56
57 let arr = Array.create 1000000 0
58 let rnd = new Random()
59 for i = 0 to Array.length arr - 1 do
60 arr.[i] <- rnd.Next()
61
62 let stop = Stopwatch.StartNew()
63 stop.Start
64 let sorted_arr = parallel_sort (fun a b -> a-b) arr
65 stop.Stop
66 printfn "并行排序結(jié)果\r\n=%A\r\n用時(shí)%d毫秒" sorted_arr stop.ElapsedMilliseconds
67
68 let stop2 = Stopwatch.StartNew()
69 Array.sort (fun a b -> a-b) arr
70 stop.Stop
71 printfn "串行排序結(jié)果\r\n=%A\r\n用時(shí)%d毫秒" arr stop2.ElapsedMilliseconds
72
73 Console.ReadKey(true)
我本機(jī),IBM X200測(cè)試串行排序大約在1200多秒,并行排序在900秒左右。
F#并行排序算法相關(guān)鏈接:
從簡(jiǎn)單的F# 表達(dá)式構(gòu)建并發(fā)應(yīng)用程序
http://msdn.microsoft.com/zh-cn/magazine/cc967279.aspx
Visual Studio 2010將正式包含F(xiàn)#
http://developer.51cto.com/art/200812/103775.htm
本文來自蛙蛙王子的博客園文章《蛙蛙推薦:F#實(shí)現(xiàn)并行排序算法》
【編輯推薦】