詳解F#異步及并行模式中的輕量級(jí)代理
F#目前還有些待字閨中的意思,不過隨著大家對(duì)F#了解的加深。希望更多的程序員能運(yùn)用好F#。在此之前,我們51CTO曾報(bào)道過《詳解F#異步及并行模式中的并行CPU及I/O計(jì)算》
在本系列的第3部分中,我們會(huì)來探索F#中輕量級(jí)的,交互式的代理,以及與代理有關(guān)的一些模式,包括“隔離的內(nèi)部狀態(tài)”。(譯注:由于原文較長,因此譯文分為兩段,目前是第一段,講解了F#中異步代理的基本使用方式。)
第1部分描述了F#作為一個(gè)并行及異步語言,是如何支持輕量級(jí)的響應(yīng)操作,并給出了CPU異步并行和I/O異步并行兩種模式。
第2部分描述了如何從異步計(jì)算或后臺(tái)計(jì)算單元中獲得結(jié)果。
模式4:您的第一個(gè)代理
我們來觀察您所創(chuàng)建的第一個(gè)異步代理:
- type Agent<'T> = MailboxProcessor<'T>
- let agent =
- Agent.Start(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- printfn "got message '%s'" msg } )
這個(gè)代理不斷地異步等待消息,并將它們打印出來。在這段代碼中,每個(gè)消息都是一個(gè)字符串,且agent的類型是:
agent.Post "hello!"這便會(huì)打印出:
got message 'hello!'也可以這樣發(fā)送多條消息:
for i in 1 .. 10000 do
agent.Post (sprintf "message %d" i)
這樣便可以打印出10000條消息。
您可以認(rèn)為每個(gè)代理對(duì)象都包含一個(gè)消息隊(duì)列(或管道),并在消息到達(dá)時(shí)進(jìn)行響應(yīng)。一個(gè)委托一般都使用異步的循環(huán)等待來消息并進(jìn)行處理。如在上面的例子中,代理使用while循環(huán)進(jìn)行處理。
許多讀者可能已經(jīng)對(duì)代理頗為熟悉了。如Erlang,它便是基于代理設(shè)計(jì)的(在那里被稱為進(jìn)程)。而不久之前,一個(gè)基于.NET平臺(tái)的實(shí)驗(yàn)性的孵化型語言,Axum,也注重了基于代理編程的重要性。Axum與F#中的代理設(shè)計(jì)相互影響,而其他包含輕量級(jí)線程的語言也強(qiáng)調(diào)了基于代理的組合與設(shè)計(jì)。
上面的例子一開始創(chuàng)建了一個(gè)類型的縮寫:Agent,它代表了F#類庫中基于內(nèi)存的代理類型“MailboxProcessor”。如果您愿意的話也可以使用這個(gè)完整的名字,不過我更喜歡簡單的命名。
您的第一批10萬個(gè)代理
代理對(duì)象非常輕量,這是因?yàn)樗贔#的異步編程模型。例如,您可以在一個(gè).NET進(jìn)程中創(chuàng)建成百上千,甚至更多個(gè)代理。例如,我們來創(chuàng)建10萬個(gè)簡單的代理對(duì)象:
- let agents =
- [ for i in 0 .. 100000 ->
- Agent.Start(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- if i % 10000 = 0 then
- printfn "agent %d got message '%s'" i msg } ) ]
您可以這樣向每個(gè)代理對(duì)象發(fā)送消息:
for agent in agents do
agent.Post "ping!每第1萬個(gè)代理對(duì)象會(huì)在收到消息時(shí)打印信息。這個(gè)代理集合在處理消息時(shí)非常迅速,只要幾秒鐘時(shí)間。代理和內(nèi)存中的消息處理非???。
很顯然,代理并不與.NET線程直接對(duì)應(yīng)──您不可能在單個(gè)應(yīng)用程序中創(chuàng)建10萬的線程(在32位操作系統(tǒng)中,即便1000個(gè)線程也已經(jīng)太多了)。相反,在代理等待消息時(shí),它實(shí)際上只是表現(xiàn)為一個(gè)回調(diào)函數(shù),一些對(duì)象分配,以及代理所引用的閉包等等。在收到消息之后,代理的工作會(huì)在一個(gè)線程池(默認(rèn)便是.NET線程池)中分配并執(zhí)行。
盡管需要10萬個(gè)代理的情況并不多見,不過2000多個(gè)代理倒是很正常的。接下來我們便會(huì)看到這樣一些例子。
高伸縮的Web服務(wù)器處理請(qǐng)求
在F#編程中,異步代理的思想其實(shí)是一種在多個(gè)環(huán)境中反復(fù)出現(xiàn)的設(shè)計(jì)模式。在F#中,我們經(jīng)常使用“代理”這個(gè)詞表示一種隨時(shí)發(fā)生的,特別是通過循環(huán),或是處理消息,或是產(chǎn)生結(jié)果的異步計(jì)算。
例如,在以后的文章中,我們會(huì)來關(guān)注如何使用F#構(gòu)建伸縮性強(qiáng)的TCP或HTTP服務(wù)器應(yīng)用程序,并將它們部署到EC2或是Windows Azure中去。這里我們打算用“股票服務(wù)器”作為例子,它接受TCP或HTTP連接,并向客戶端返回一系列的股票信息。每個(gè)客戶端會(huì)每隔一秒鐘收到一條股票信息。這個(gè)服務(wù)最終會(huì)以單個(gè)URL或REST API的形式發(fā)布。
在實(shí)現(xiàn)時(shí),我們?yōu)槊總€(gè)客戶端請(qǐng)求分配一個(gè)異步代理(由于只是演示,我們?cè)谶@里便不斷地寫入相同的AAPL股票信息):
- open System.Net.Sockets
- /// serve up a stream of quotes
- let serveQuoteStream (client: TcpClient) = async {
- let stream = client.GetStream()
- while true do
- do! stream.AsyncWrite( "AAPL 200.38"B )
- do! Async.Sleep 1000.0 // sleep one second}
每個(gè)代理會(huì)一直運(yùn)行到客戶端連接斷開。因?yàn)榇矸浅]p量,因此這個(gè)股票服務(wù)能夠在一臺(tái)機(jī)器上支持?jǐn)?shù)千個(gè)并發(fā)連接(如果使用云托管服務(wù)則會(huì)有更好的伸縮性)。而同一時(shí)刻會(huì)出現(xiàn)多少個(gè)代理對(duì)象則取決于客戶端的數(shù)量。
上面的例子演示了使用F#進(jìn)行網(wǎng)絡(luò)編程是多么的方便──網(wǎng)絡(luò)協(xié)議在此變成了基于異步代理的數(shù)據(jù)流讀寫。在以后的文章中我們會(huì)觀察更多使用F#進(jìn)行伸縮性強(qiáng)的TCP/HTTP編程。
代理與隔離狀態(tài)(命令式)
F#代理編程的一個(gè)優(yōu)秀的關(guān)鍵之處便是其隔離性。隔離性則意味著資源“歸屬”與某個(gè)特定的代理,而不會(huì)暴露給其他代理。因此,獨(dú)立狀態(tài)對(duì)并發(fā)的訪問及數(shù)據(jù)競爭是一種良好的保護(hù)。
在F#中,異步代理的獨(dú)立性直接表現(xiàn)為文法上的作用域。例如,下面的代碼使用一個(gè)字典來累計(jì)發(fā)送至代理對(duì)象的不同消息的次數(shù)。內(nèi)部的字典在文法上是異步代理私有的,因此我們無法在代理外部對(duì)字典進(jìn)行讀寫。這意味著字典的可變狀態(tài)實(shí)際上是被隔離的,代理保證了對(duì)它的非并發(fā)的安全訪問。
- type Agent<'T> = MailboxProcessor<'T>
- open System.Collections.Generic
- let agent =
- Agent.Start(fun inbox ->
- async { let strings = Dictionary<string,int>()
- while true do
- let! msg = inbox.Receive()
- if strings.ContainsKey msg then
- strings.[msg] <- strings.[msg] + 1
- else
- strings.[msg] <- 0
- printfn "message '%s' now seen '%d' times" msg strings.[msg] } )
狀態(tài)隔離是F#的基本特性,它并不是代理所獨(dú)有的。例如,下面的代碼對(duì)StreamReader和ResizeArray(這是F#中對(duì)System.Collections.Generics.List的別稱)的隔離訪問。請(qǐng)注意這段代碼和.NET類庫中的System.IO.File.ReadAllLines方法功能相同:
- let readAllLines (file:string) =
- use reader = new System.IO.StreamReader(file)
- let lines = ResizeArray<_>()
- while not reader.EndOfStream do
- lines.Add (reader.ReadLine())
- lines.ToArray()
在這里,reader和ResizeArray對(duì)象都無法在函數(shù)外部使用。在代理或其他持續(xù)計(jì)算的情況里,隔離性是至關(guān)重要的──狀態(tài)在這里永遠(yuǎn)獨(dú)立于程序運(yùn)行中的其他部分。
說到底,隔離性是個(gè)動(dòng)態(tài)的屬性,它經(jīng)常受到文法的約束。例如,考慮這樣一個(gè)延遲的,隨需加載的讀取器,它會(huì)讀取文件中的所有行:
- let readAllLines (file:string) =
- seq { use reader = new System.IO.StreamReader(file)
- while not reader.EndOfStream do
- yield reader.ReadLine() }
隔離狀態(tài)經(jīng)常包含可變的值,包括F#中的引用單元。例如,下面的代碼在一個(gè)引用單元中不斷累計(jì)接受到的消息個(gè)數(shù):
- let agent =
- Agent.Start(fun inbox ->
- async { let count = ref 0
- while true do
- let! msg = inbox.Receive()
- incr count
- printfn "now seen a total of '%d' messages" !count } )
再次強(qiáng)調(diào),這里可變的狀態(tài)被隔離了,確保了對(duì)它單線程的安全訪問。
在代理中使用循環(huán)和隔離狀態(tài)(函數(shù)式)
F#程序員了解兩種實(shí)現(xiàn)循環(huán)的方法:使用“命令式”的while/for以及可變的累加器(ref或mutable),或是“函數(shù)式”風(fēng)格的調(diào)用,將累加狀態(tài)作為參數(shù)傳遞給一個(gè)或多個(gè)遞歸函數(shù)。例如,計(jì)算文件行數(shù)的程序可以使用命令式的方式來寫:
- let countLines (file:string) =
- use reader = new System.IO.StreamReader(file)
- let count = ref 0
- while not reader.EndOfStream do
- reader.ReadLine() |> ignore
- incr count
- !count或是遞歸式的:
- let countLines (file:string) =
- use reader = new System.IO.StreamReader(file)
- let rec loop n =
- if reader.EndOfStream then n
- else
- reader.ReadLine() |> ignore
- loop (n+1)
- loop 0
在使用時(shí),兩種方式都是可行的:函數(shù)式的做法相對(duì)更加高級(jí)一些,但是大大減少了代碼中顯式的狀態(tài)修改次數(shù),且更為通用。兩種寫法對(duì)于F#程序員來說,一般都可以理解,他們還可以將“while”循環(huán)轉(zhuǎn)化為等價(jià)的“let rec”函數(shù)(這是個(gè)不錯(cuò)的面試問題!)。
有趣的是,在編寫異步循環(huán)時(shí)的規(guī)則也一樣:您可以使用命令式的“while”或函數(shù)式的“let rec”中的任意一種來定義循環(huán)。例如,這里有一個(gè)利用遞歸的異步函數(shù)統(tǒng)計(jì)消息數(shù)量的做法:
- let agent =
- Agent.Start(fun inbox ->
- let rec loop n = async {
- let! msg = inbox.Receive()
- printfn "now seen a total of %d messages" (n+1)
- return! loop (n+1)
- }
- loop 0 )
這樣我們便可以獲得這樣的輸出:
now seen a total of 0 messages
now seen a total of 1 messages
....
now seen a total of 10000 messages
再提一次,定義代理對(duì)象的兩種常見模式為命令式的:
- let agent =
- Agent.Start(fun inbox ->
- async {
- // isolated imperative state goes here
- ...
- while <condition> do
- // read messages and respond
- ...
- })
及函數(shù)式的:
- let agent =
- Agent.Start(fun inbox ->
- let rec loop arg1 arg2 = async {
- // receive and process messages here
- ...
- return! loop newArg1 newArg2
- }
- loop initialArg1 initialArg2)
再次強(qiáng)調(diào),兩種方法在F#都是合理的──使用遞歸的異步函數(shù)可能是更高級(jí)的方法,但更為函數(shù)式且更為通用。
原文標(biāo)題:F#中的異步及并行模式(3 - 上):代理
鏈接:http://www.cnblogs.com/JeffreyZhao/archive/2010/03/15/async-and-parallel-design-patterns-in-fsharp-3-agents.html
【F#教程回顧】