F#中的異步及并行模式:代理的高級(jí)使用
本文我們會(huì)來探索F#函數(shù)式編程語言的異步及并行模式、交互式的代理,以及與代理有關(guān)的一些模式,包括隔離的內(nèi)部狀態(tài)。
消息與聯(lián)合類型
很多時(shí)候我們會(huì)使用聯(lián)合類型(Union Type)作為消息的類型。例如,我將要展示一個(gè)基于代理的DirectX示例,我們要在模擬引擎中使用如下的消息:
- type Message =
- | PleaseTakeOneStep
- | PleaseAddOneBall of Ball 模擬引擎中的代理:
- let simulationEngine =
- Agent.Start(fun inbox ->
- async { while true do
- // Wait for a message
- let! msg = inbox.Receive()
- // Process a message
- match msg with
- | PleaseTakeOneStep -> state.Transform moveBalls
- | PleaseAddOneBall ball -> state.AddObject ball })
在很多情況下使用強(qiáng)類型消息是個(gè)不錯(cuò)的做法。不過,在某些您需要和其他消息機(jī)制協(xié)作的時(shí)候,也無需擔(dān)心使用如“obj”和“string”等泛化的消息類型,此時(shí)代理只需要在運(yùn)行時(shí)進(jìn)行類型判斷或轉(zhuǎn)化即可。
參數(shù)化代理及抽象代理
代理只是F#編碼中的一種設(shè)計(jì)模式。這意味著您可以將F#中各種常用的技巧,如參數(shù)化,抽象或是代碼片段重用與代理一起使用。例如,您可以把之前的serveQuoteStream函數(shù)參數(shù)化,指定每條股票消息傳輸中的間隔時(shí)間:
- open System.Net.Sockets
- /// serve up a stream of quotes
- let serveQuoteStream (client: TcpClient, periodMilliseconds: int) = async {
- let stream = client.GetStream()
- while true do
- do! stream.AsyncWrite( "AAPL 439.2"B )
- do! Async.Sleep periodMilliseconds
- }
- 這意味著您的股票服務(wù)器中不同的請(qǐng)求可以擁有不同長(zhǎng)度的間隔。與此類似,您可以使用函數(shù)參數(shù),將整個(gè)代理類的功能進(jìn)行抽象:
- let iteratingAgent job =
- Agent.Start(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- do! job msg })
- let foldingAgent job initialState =
- Agent.Start(fun inbox ->
- let rec loop state = async {
- let! msg = inbox.Receive()
- let! state = job state msg
- return! loop state
- }
- loop initialState)您可以這樣使用***個(gè)函數(shù):
- let agent1 = iteratingAgent (fun msg -> async { do printfn "got message '%s'" msg }) 及第二個(gè):
- let agent2 =
- foldingAgent (fun state msg ->
- async { if state % 1000 = 0 then printfn "count = '%d'" msg;
- return state + 1 }) 0 從代理返回結(jié)果
在以后的文章中,我們會(huì)討論一些訪問執(zhí)行中的代理的部分結(jié)果的技巧,例如,我們可以使用每個(gè)MailboxProcessor代理的PostAndAsyncReply方法。這樣的技巧在創(chuàng)建網(wǎng)絡(luò)通信代理時(shí)顯得尤其重要。
然而,這種做法很多時(shí)候有些過了,我們可能只是需要將結(jié)果匯報(bào)給一些如GUI般的監(jiān)視環(huán)境。匯報(bào)部分結(jié)果的簡(jiǎn)單方法之一,便是之前在第二篇文章中討論過的設(shè)計(jì)模式。下面便是這樣一個(gè)例子,它創(chuàng)建了一個(gè)代理,對(duì)每1000條消息進(jìn)行采樣,并將得到的事件分發(fā)給GUI或其他管理線程(請(qǐng)注意,其中用到了第二篇文章中SynchronizationContext的兩個(gè)擴(kuò)展方法CaptureCurrent和RaiseEvent)。
- // Receive messages and raise an event on each 1000th message
- type SamplingAgent() =
- // The event that is raised
- // Capture the synchronization context to allow us to raise events
- // back on the GUI thread
- let syncContext = SynchronizationContext.CaptureCurrent()
- // The internal mailbox processor agent
- let agent =
- new MailboxProcessor<_>(fun inbox ->
- async { let count = ref 0
- while true do
- let! msg = inbox.Receive()
- incr count
- if !count % 1000 = 0 then
- syncContext.RaiseEvent sample msg })
- /// Post a message to the agent
- member x.Post msg = agent.Post msg
- /// Start the agent
- member x.Start () = agent.Start()
- /// Raised every 1000'th message
- member x.Sample = sample.Publish您可以這樣使用代理:
- let agent = SamplingAgent()
- agent.Sample.Add (fun s -> printfn "sample: %s" s)
- agent.Start()
- for i = 0 to 10000 do
- agent.Post (sprintf "message %d" i) 與預(yù)料一致,這會(huì)報(bào)告agent的消息采樣:
- sample: message 999
- sample: message 1999
- sample: message 2999
- sample: message 3999
- sample: message 4999
- sample: message 5999
- sample: message 6999
- sample: message 7999
- sample: message 8999
- sample: message 9999
#p#
代理及錯(cuò)誤
我們都無法避免錯(cuò)誤和異常。良好的錯(cuò)誤檢測(cè),報(bào)告及記錄的措施是基于代理編程的基本要素。我們來看一下如何在F#的內(nèi)存代理(MailboxProcessor)中檢測(cè)和轉(zhuǎn)發(fā)錯(cuò)誤。
首先,F(xiàn)#異步代理的神奇之處在于異??梢杂蒩sync { ... }自動(dòng)捕獲及分發(fā),即使跨過多個(gè)異步等待及I/O操作。您也可以在async { ... }中使用try/with,try/finally及use關(guān)鍵字來捕獲異?;蜥尫刨Y源。這意味著我們只需要在代理中處理那些未捕獲的錯(cuò)誤即可。當(dāng)MailboxProcessor代理中出現(xiàn)未捕獲的異常時(shí)便會(huì)觸發(fā)Error事件。一個(gè)常見的模式是將所有的錯(cuò)誤轉(zhuǎn)發(fā)給一個(gè)監(jiān)視進(jìn)程,例如:
- type Agent<'T> = MailboxProcessor<'T>
- let supervisor =
- Agent<System.Exception>.Start(fun inbox ->
- async { while true do
- let! err = inbox.Receive()
- printfn "an error occurred in an agent: %A" err })
- let agent =
- new Agent<int>(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- if msg % 1000 = 0 then
- failwith "I don't like that cookie!" })
- agent.Error.Add(fun error -> supervisor.Post error)
- agent.Start() 我們也可以很方便地并行這些配置操作:
- let agent =
- new Agent<int>(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- if msg % 1000 = 0 then
- failwith "I don't like that cookie!" })
- |> Agent.reportErrorsTo supervisor
- |> Agent.start 或使用輔助模塊:
- module Agent =
- let reportErrorsTo (supervisor: Agent<exn>) (agent: Agent<_>) =
- agent.Error.Add(fun error -> supervisor.Post error); agent
- let start (agent: Agent<_>) = agent.Start(); agent
下面是一個(gè)例子,我們創(chuàng)建了10000個(gè)代理,其中某些會(huì)報(bào)告錯(cuò)誤:
- let supervisor =
- Agent<int * System.Exception>.Start(fun inbox ->
- async { while true do
- let! (agentId, err) = inbox.Receive()
- printfn "an error '%s' occurred in agent %d" err.Message agentId })
- let agents =
- [ for agentId in 0 .. 10000 ->
- let agent =
- new Agent<string>(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- if msg.Contains("agent 99") then
- failwith "I don't like that cookie!" })
- agent.Error.Add(fun error -> supervisor.Post (agentId,error))
- agent.Start()
- (agentId, agent) ]我們發(fā)送消息:
- for (agentId, agent) in agents do
- agent.Post (sprintf "message to agent %d" agentId ) 便可看到:
- an error 'I don't like that cookie!' occurred in agent 99
- an error 'I don't like that cookie!' occurred in agent 991
- an error 'I don't like that cookie!' occurred in agent 992
- an error 'I don't like that cookie!' occurred in agent 993
- ...
- an error 'I don't like that cookie!' occurred in agent 999
這一節(jié)我們處理了F#內(nèi)存中的MailboxProcessor代理發(fā)生的錯(cuò)誤。其他一些代理(例如,表示服務(wù)器端請(qǐng)求的代理)也可以這樣進(jìn)行設(shè)計(jì)與架構(gòu),以便進(jìn)行優(yōu)雅的錯(cuò)誤轉(zhuǎn)發(fā)及重試。
總結(jié)
隔離的代理是一種常用的編程模式,它不斷運(yùn)用在各種編程領(lǐng)域中,從設(shè)備驅(qū)動(dòng)編程到用戶界面,還包括分布式編程及高度伸縮的通信服務(wù)器。每次您編寫了一個(gè)對(duì)象,線程或是異步工作程序,用于處理一個(gè)長(zhǎng)時(shí)間的通信(如向聲卡發(fā)送數(shù)據(jù),從網(wǎng)絡(luò)讀取數(shù)據(jù),或是響應(yīng)一個(gè)輸入的事件流),您其實(shí)就是在編寫一種代理。每次您在寫一個(gè)ASP.NET網(wǎng)頁處理程序時(shí),其實(shí)您也在使用一種形式的代理(每次調(diào)用時(shí)都重置狀態(tài))。在各種情況下,隔離與通信有關(guān)的狀態(tài)是很常見的需求。
隔離的代理是一種最終的實(shí)現(xiàn)方式──例如,實(shí)現(xiàn)可伸縮的編程算法,包括可伸縮的請(qǐng)求服務(wù)器及分布式編程算法。與其他各種異步及并發(fā)編程模式一樣,它們也不能被濫用。然而,他們是一種優(yōu)雅、強(qiáng)大且高效的技術(shù),使用非常廣泛。
F#是一個(gè)獨(dú)特的,隨Visual Studio 2010一同出現(xiàn)的托管語言,完整支持輕量級(jí)的異步計(jì)算及內(nèi)存種的代理。在F#中,異步代理可以通過組合的形式編寫,而不用使用回調(diào)函數(shù)或控制反轉(zhuǎn)等方式。這里有些權(quán)衡的地方──例如:在以后的文章中,我們會(huì)觀察如何使用.NET類庫中標(biāo)準(zhǔn)的APM模式來釋放您的代理。然而,優(yōu)勢(shì)也是很明顯的:易于控制,伸縮性強(qiáng),并且在需要的時(shí)候,便可以在組織起CPU和I/O并行操作的同時(shí),保持CPU密集型代碼在.NET中的完整性能。
當(dāng)然,也有其他一些.NET或基于JVM的語言支持輕量級(jí)的交互式代理──早前,有人認(rèn)為這在.NET是“不可能”的事情,因?yàn)榫€程的代價(jià)十分昂貴。而如今,F(xiàn)#在2007年引入了“async { ... }”,這被視為語言設(shè)計(jì)上的一個(gè)突破──它讓程序員可以在一個(gè)被業(yè)界廣泛認(rèn)可的編程平臺(tái)上構(gòu)建輕量級(jí)、組合式的異步編程及交互式的代理。除了Axum語言原型(它也受了F#的影響)之外,F(xiàn)#還證明了一個(gè)異步語言特性是一個(gè)完全可行的方法,這也解放了如今業(yè)界運(yùn)行時(shí)系統(tǒng)設(shè)計(jì)領(lǐng)域的一個(gè)爭(zhēng)論話題:我們是否要將線程做得輕量?
文章轉(zhuǎn)自老趙的博客,
原文地址:http://blog.zhaojie.me/2010/03/async-and-parallel-design-patterns-in-fsharp-3-more-agents.html
【編輯推薦】
- 詳解F#異步及并行模式中的輕量級(jí)代理
- 詳解F#異步及并行模式中的并行CPU及I/O計(jì)算
- TechED 09視頻專訪:F#與函數(shù)式編程語言
- F#中DSL原型設(shè)計(jì):語法檢查和語義分析
- 大話F#和C#:是否會(huì)重蹈C#失敗的覆轍?