詳解F#異步及并行模式中的并行CPU及I/O計算
51CTO將持續(xù)關(guān)注函數(shù)式編程語言F#的發(fā)展,今天將為大家講述的是F#異步及并行模式中的并行CPU及I/O計算。
最后還是忍不住翻譯文章了。這系列的文章談?wù)摰氖荈#中常見的異步及并行模式,作者為F#語言的主要設(shè)計者Don Syme。異步相關(guān)的編程是F#語言中最重要的優(yōu)勢之一(我甚至在考慮“之一”兩個字能否去掉)。F#是一門非常有特色的語言,是一門能夠開闊眼界,改變您編程思路的語言,它經(jīng)過了幾年設(shè)計以及多個預(yù)覽之后終于要正式露面了——此刻不上,更待何時。
介紹
F#是一門并行(parallel)及響應(yīng)式(reactive)語言。這個說法意味著一個F#程序可以存在多個進(jìn)行中的運(yùn)算(如使用.NET線程進(jìn)行F#計算),或是多個等待中的回應(yīng)(如等待事件或消息的回調(diào)函數(shù)及代理對象)。
F#的異步表達(dá)式是簡化異步及響應(yīng)式程序編寫的方式之一。在這篇及今后的文章中,我會探討一些使用F#進(jìn)行異步編程的基本方式──大致說來,它們都是F#異步編程時使用的模式。這里我假設(shè)您已經(jīng)掌握了async的基本使用方式,如入門指南中的內(nèi)容。
我們從兩個簡單的設(shè)計模式開始:CPU異步并行(Parallel CPU Asyncs)和I/O異步并行(Paralle I/O Asyncs)。
本系列的第2部分描述了如何從異步計算或后臺計算單元中獲得結(jié)果。
第3部分則描述了F#中輕量級的,響應(yīng)式的,各自獨(dú)立的代理對象。
模式1:CPU異步并行
首先來了解第一個模式:CPU異步并行,這意味著并行地開展一系列的CPU密集型計算。下面的代碼計算的是斐波那契數(shù)列,它會將這些計算進(jìn)行并行地調(diào)配:
- let rec fib x = if x <= 2 then 1 else fib(x-1) + fib(x-2)
- let fibs =
- Async.Parallel [ for i in 0..40 -> async { return fib(i) } ]
- |> Async.RunSynchronously
結(jié)果是:
- val fibs : int array =
- [|1; 1; 2; 3; 5; 8; 13; 21; 34; 55; 89; 144; 233; 377; 610; 987; 1597; 2584;
- 4181; 6765; 10946; 17711; 28657; 46368; 75025; 121393; 196418; 317811;
- 514229; 832040; 1346269; 2178309; 3524578; 5702887; 9227465; 14930352;
- 24157817; 39088169; 63245986; 102334155|]
上面的代碼展示了并行CPU異步計算模式的要素:
1.“async { … }”用于指定一系列的CPU任務(wù)。
2.這些任務(wù)使用Async.Parallel進(jìn)行fork-join式的組合。
在這里,我們使用Async.RunSynchronously方法來執(zhí)行組合后的任務(wù),這會啟動一個異步任務(wù),并同步地等待其最后結(jié)果。您可以使用這個模式來完成各種CPU并行(例如對矩陣乘法進(jìn)行劃分和并行計算)或是批量處理任務(wù)。
模式2:I/O異步并行
現(xiàn)在我們已經(jīng)展示了在F#中進(jìn)行CPU密集型并行編程的方式。F#異步編程的重點(diǎn)之一,便是可以用相同的方式進(jìn)行CPU和I/O密集型的計算。這便是我們的第二種模式:I/O異步并行,即同時開展多個I/O操作(也被稱為overlapped I/O)。例如下面的代碼便并行地請求多個Web頁面,并響應(yīng)每個請求的回復(fù),再返回收集到的結(jié)果。
- open System
- open System.Net
- open Microsoft.FSharp.Control.WebExtensions
- let http url =
- async { let req = WebRequest.Create(Uri url)
- use! resp = req.AsyncGetResponse()
- use stream = resp.GetResponseStream()
- use reader = new StreamReader(stream)
- let contents = reader.ReadToEnd()
- return contents }
- let sites = ["http://www.bing.com";
- "http://www.google.com";
- "http://www.yahoo.com";
- "http://www.search.com"]
- let htmlOfSites =
- Async.Parallel [for site in sites -> http site ]
- |> Async.RunSynchronously
上面的代碼示例展示了I/O異步并行模式的基礎(chǔ):
1.“async { … }”用于編寫任務(wù),其中包含了一些異步I/O。
2.這些任務(wù)使用Async.Parallel進(jìn)行fork-join式的組合。
在這里,我們使用Async.RunSynchronously方法來執(zhí)行組合后的任務(wù),這會啟動一個異步任務(wù),并同步地等待其最后結(jié)果。
使用let!(或與它類似的資源釋放指令use!)是進(jìn)行異步操作的基礎(chǔ)方法。例如:
- let! resp = req.AsyncGetResponse()
上面這行代碼會“響應(yīng)”一個HTTP GET操作所得到的回復(fù),即async { … }在AsyncGetResponse操作完成之后的部分。然而,在等待響應(yīng)的過程中并不會阻塞任何.NET或操作系統(tǒng)的線程:只有活動的CPU密集型運(yùn)算會使用下層的.NET或操作系統(tǒng)線程。與此不同,等待中的響應(yīng)操作(例如回調(diào)函數(shù),事件處理程序和代理對象)資源占用非常少,幾乎只相當(dāng)于一個注冊好的對象而已。因此,您可以同時擁有數(shù)千個甚至數(shù)百萬個等待中的響應(yīng)操作。例如,一個典型的GUI應(yīng)用程序會注冊一些事件處理程序,而一個典型Web爬蟲會為每個發(fā)出的請求注冊一個回調(diào)函數(shù)。
在上面的代碼中,我們使用了“use!”而不是“let!”,這表示W(wǎng)eb請求相關(guān)的資源會在變量超出字面的作用域之后得到釋放。
I/O并行的美妙之處在于其伸縮性。在多核的環(huán)境下,如果您可以充分利用計算資源,則通常會獲得2倍、4倍甚至8倍的性能提高。而在I/O并行編程中,您可以同時進(jìn)行成百上千的I/O操作(不過實(shí)際的并行效果還要取決于您的操作系統(tǒng)和網(wǎng)絡(luò)連接狀況),這意味著10倍、100倍、1000倍甚至更多的性能增強(qiáng)──而這一切在一臺單核的機(jī)器上也可以實(shí)現(xiàn)。例如,這里有一個使用F#異步功能的示例,而最終它們可以在一個IronPython應(yīng)用程序中使用。
許多現(xiàn)代應(yīng)用程序都是I/O密集型應(yīng)用,因此這些設(shè)計模式在實(shí)踐中都有很重要的意義。
始于GUI線程,終于GUI線程
這兩個設(shè)計模式有個重要的變化,這便是使用Async.StartWithContinuations來代替Async.RunSynchronously方法。在一個并行操作開啟之后,您可以指定三個函數(shù),分別在它成功、失敗或取消時調(diào)用。
對于諸如“我想要獲得一個異步操作的結(jié)果,但我不能使用RunSynchronously方法”之類的問題,您便應(yīng)該考慮:
1.使用let!(或use!)把這個異步操作作為更大的異步任務(wù)的一部分,
2.使用Async.StartWithContinuations方法執(zhí)行異步操作
在那些需要在GUI線程上發(fā)起異步操作的場景中,Async.StartWithContinuations方法尤其有用。因為,您不會因此阻塞住GUI線程,而且可以在異步操作完成后直接進(jìn)行GUI的更新。例如,在F# JAOO Tutorial的BingTranslator示例中便使用了這個做法──您可以在本文結(jié)尾瀏覽它的完整代碼,不過這里最值得關(guān)注的部分則是在點(diǎn)擊“Translate”按鈕之后發(fā)生的事情:
- button.Click.Add(fun args ->
- let text = textBox.Text
- translated.Text <- "Translating..."
- let task =
- async { let! languages = httpLines languageUri
- let! fromLang = detectLanguage text
- let! results = Async.Parallel [for lang in languages -> translateText (text, fromLang, lang)]
- return (fromLang,results) }
- Async.StartWithContinuations(
- task,
- (fun (fromLang,results) ->
- for (toLang, translatedText) in results do
- translated.Text <- translated.Text + sprintf "\r\n%s --> %s: \"%s\"" fromLang toLang translatedText),
- (fun exn -> MessageBox.Show(sprintf "An error occurred: %A" exn) |> ignore),
- (fun cxn -> MessageBox.Show(sprintf "A cancellation error ocurred: %A" cxn) |> ignore)))
高亮的部分,尤其是在async塊里的部分,展示了使用Async.Parallel將一種語言并行地翻譯成多種語言的做法。這個異步組合操作由Async.StartWithContinuations發(fā)起,它會在遇到第一個I/O操作時立即返回(譯注:存疑,為什么是在遇上I/O操作才返回?),并指定了三個函數(shù),分別在異步操作的成功,失敗或取消時調(diào)用。以下是任務(wù)完成后的截圖(不過在此不保證翻譯的準(zhǔn)確性……):
Async.StartWithContinuations有一個重要的特性:如果異步操作由GUI線程發(fā)起(例如一個SynchronizationContext.Current不為null的線程),那么操作完成后的回調(diào)函數(shù)也是在GUI線程中調(diào)用的。這使GUI更新操作變的十分安全。F#異步類庫允許您組合多個I/O任務(wù),并在GUI線程中直接使用,而無需您親自從后臺線程中更新GUI元素。在以后的文章中我們會進(jìn)行更詳細(xì)地解釋。
關(guān)于Async.Parallel工作方式:
在執(zhí)行時,由Async.Parallel組合而成的異步操作會通過一個等待計算的隊列來逐步發(fā)起。與大部分進(jìn)行異步處理的類庫一樣,它在內(nèi)部使用的是QueueUserWorkItem方法。當(dāng)然,我們也有辦法使用分離的隊列,在以后的文章中我們會進(jìn)行一些討論。
Async.Parallel方法并沒有什么神奇之處,您也完全可以使用Microsoft.FSharp.Control.Async類庫中的其他原語來定義您自己的異步組合方式──例如Async.StartChild方法。我們會在以后的文章中討論這個話題。
更多示例
在F# JAOO Tutorial包含多個使用這些模式的示例代碼:
BingTranslator.fsx與BingTranslatorShort.fsx:使用F#調(diào)用REST API,它們與其他基于Web的HTTP服務(wù)的調(diào)用方式十分類似。文末包含了示例的完整代碼。
AsyncImages.fsx:并行磁盤I/O及圖像處理。
PeriodicTable.fsx:調(diào)用一個Web服務(wù),并行地獲取原子質(zhì)量。
本文模式的限制
上文介紹的兩個并行模式有一些限制。很明顯,使用Async.Parallel生成的異步操作在執(zhí)行時十分“安靜”──比方說,它們無法返回進(jìn)度或部分的結(jié)果。為此,我們需要構(gòu)建一個更為“豐富”的對象,它會在部分操作完成之后觸發(fā)一些事件。在以后的文章中我們會來關(guān)注這樣的設(shè)計模式。
此外,Async.Parallel只能處理固定數(shù)量的任務(wù)。在以后的文章中,我們會遇到很多一邊處理一邊生成任務(wù)的情況。換個方式來看,即Async.Parallel無法處理即時獲得的消息──例如,除了取消任務(wù)之外,一個代理對象的工作進(jìn)度是可以得到控制的。
總結(jié)
CPU異步并行與I/O異步并行,是F#異步編程中最為簡單的兩種設(shè)計模式,而簡單的事物往往也是非常重要而強(qiáng)大的。請注意,兩種模式的不同之處,僅僅在于I/O并行使用了包含了I/O請求的async塊,以及一些額外的CPU任務(wù),如創(chuàng)建請求對象及后續(xù)處理。
在今后的文章里,我們會關(guān)注F#中其他一些并行及響應(yīng)式編程方面的設(shè)計方式,包括:
- 從GUI線程中發(fā)起異步操作
- 定義輕量級異步代理對象
- 使用async定義后臺工作程序
- 使用async構(gòu)建.NET任務(wù)
- 使用async調(diào)用.NET的APM模式
- 取消異步操作
BingTranslator代碼示例
以下是BingTranslator的示例代碼,在運(yùn)行時您需要申請一個Live API 1.1 AppID。請注意,這個示例需要根據(jù)Bing API 2.0進(jìn)行適當(dāng)調(diào)整,至少在2.0中已經(jīng)不包含這里的語言檢測API了──不過這些代碼仍然是不錯的示例:
- open System
- open System.Net
- open System.IO
- open System.Drawing
- open System.Windows.Forms
- open System.Text
- /// A standard helper to read all the lines of a HTTP request. The actual read of the lines is
- /// synchronous once the HTTP response has been received.
- let httpLines (uri:string) =
- async { let request = WebRequest.Create uri
- use! response = request.AsyncGetResponse()
- use stream = response.GetResponseStream()
- use reader = new StreamReader(stream)
- let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ]
- return lines }
- type System.Net.WebRequest with
- /// An extension member to write content into an WebRequest.
- /// The write of the content is synchronous.
- member req.WriteContent (content:string) =
- let bytes = Encoding.UTF8.GetBytes content
- req.ContentLength <- int64 bytes.Length
- use stream = req.GetRequestStream()
- stream.Write(bytes,0,bytes.Length)
- /// An extension member to read the content from a response to a WebRequest.
- /// The read of the content is synchronous once the response has been received.
- member req.AsyncReadResponse () =
- async { use! response = req.AsyncGetResponse()
- use responseStream = response.GetResponseStream()
- use reader = new StreamReader(responseStream)
- return reader.ReadToEnd() }
- #load @"C:\fsharp\staging\docs\presentations\2009-10-04-jaoo-tutorial\BingAppId.fs"
- //let myAppId = "please set your Bing AppId here"
- /// The URIs for the REST service we are using
- let detectUri = "http://api.microsofttranslator.com/V1/Http.svc/Detect?appId=" + myAppId
- let translateUri = "http://api.microsofttranslator.com/V1/Http.svc/Translate?appId=" + myAppId + "&"
- let languageUri = "http://api.microsofttranslator.com/V1/Http.svc/GetLanguages?appId=" + myAppId
- let languageNameUri = "http://api.microsofttranslator.com/V1/Http.svc/GetLanguageNames?appId=" + myAppId
- /// Create the user interface elements
- let form = new Form (Visible=true, TopMost=true, Height=500, Width=600)
- let textBox = new TextBox (Width=450, Text="Enter some text", Font=new Font("Consolas", 14.0F))
- let button = new Button (Text="Translate", Left = 460)
- let translated = new TextBox (Width = 590, Height = 400, Top = 50, ScrollBars = ScrollBars.Both, Multiline = true, Font=new Font("Consolas", 14.0F))
- form.Controls.Add textBox
- form.Controls.Add button
- form.Controls.Add translated
- /// An async method to call the language detection API
- let detectLanguage text =
- async { let request = WebRequest.Create (detectUri, Method="Post", ContentType="text/plain")
- do request.WriteContent text
- return! request.AsyncReadResponse() }
- /// An async method to call the text translation API
- let translateText (text, fromLang, toLang) =
- async { let uri = sprintf "%sfrom=%s&to=%s" translateUri fromLang toLang
- let request = WebRequest.Create (uri, Method="Post", ContentType="text/plain")
- request.WriteContent text
- let! translatedText = request.AsyncReadResponse()
- return (toLang, translatedText) }
- button.Click.Add(fun args ->
- let text = textBox.Text
- translated.Text <- "Translating..."
- let task =
- async { /// Get the supported languages
- let! languages = httpLines languageUri
- /// Detect the language of the input text. This could be done in parallel with the previous step.
- let! fromLang = detectLanguage text
- /// Translate into each language, in parallel
- let! results = Async.Parallel [for lang in languages -> translateText (text, fromLang, lang)]
- /// Return the results
- return (fromLang,results) }
- /// Start the task. When it completes, show the results.
- Async.StartWithContinuations(
- task,
- (fun (fromLang,results) ->
- for (toLang, translatedText) in results do
- translated.Text <- translated.Text + sprintf "\r\n%s --> %s: \"%s\"" fromLang toLang translatedText),
- (fun exn -> MessageBox.Show(sprintf "An error occurred: %A" exn) |> ignore),
- (fun cxn -> MessageBox.Show(sprintf "A cancellation error ocurred: %A" cxn) |> ignore)))