F#中關(guān)于代理的基本使用
中輕量級的,交互式的代理,以及與代理有關(guān)的一些模式,包括“隔離的內(nèi)部狀態(tài)”是F#語言中的重要部分。首先我們來觀察您所創(chuàng)建的第一個異步代理:
- type Agent<'T> = MailboxProcessor<'T>
- let agent =
- Agent.Start(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- printfn "got message '%s'" msg } )
這個代理不斷地異步等待消息,并將它們打印出來。在這段代碼中,每個消息都是一個字符串,且agent的類型是:
- agent.Post "hello!"這便會打印出:
- got message 'hello!'也可以這樣發(fā)送多條消息:
- for i in 1 .. 10000 do
- agent.Post (sprintf "message %d" i)這樣便可以打印出10000條消息。
您可以認(rèn)為每個代理對象都包含一個消息隊列(或管道),并在消息到達(dá)時進(jìn)行響應(yīng)。一個委托一般都使用異步的循環(huán)等待來消息并進(jìn)行處理。如在上面的例子中,代理使用while循環(huán)進(jìn)行處理。許多讀者可能已經(jīng)對代理頗為熟悉了。如Erlang,它便是基于代理設(shè)計的(在那里被稱為進(jìn)程)。而不久之前,一個基于.NET平臺的實驗性的孵化型語言,Axum,也注重了基于代理編程的重要性。Axum與F#中的代理設(shè)計相互影響,而其他包含輕量級線程的語言也強(qiáng)調(diào)了基于代理的組合與設(shè)計。
上面的例子一開始創(chuàng)建了一個類型的縮寫:Agent,它代表了F#類庫中基于內(nèi)存的代理類型“MailboxProcessor”。如果您愿意的話也可以使用這個完整的名字,不過我更喜歡簡單的命名。
您的第一批10萬個代理
代理對象非常輕量,這是因為它基于F#的異步編程模型。例如,您可以在一個.NET進(jìn)程中創(chuàng)建成百上千,甚至更多個代理。例如,我們來創(chuàng)建10萬個簡單的代理對象:
- let agents =
- [ for i in 0 .. 100000 ->
- Agent.Start(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- if i % 10000 = 0 then
- printfn "agent %d got message '%s'" i msg } ) ]您可以這樣向每個代理對象發(fā)送消息:
- for agent in agents do
- agent.Post "ping!
每第1萬個代理對象會在收到消息時打印信息。這個代理集合在處理消息時非常迅速,只要幾秒鐘時間。代理和內(nèi)存中的消息處理非???。很顯然,代理并不與.NET線程直接對應(yīng)──您不可能在單個應(yīng)用程序中創(chuàng)建10萬的線程(在32位操作系統(tǒng)中,即便1000個線程也已經(jīng)太多了)。相反,在代理等待消息時,它實際上只是表現(xiàn)為一個回調(diào)函數(shù),一些對象分配,以及代理所引用的閉包等等。在收到消息之后,代理的工作會在一個線程池(默認(rèn)便是.NET線程池)中分配并執(zhí)行。盡管需要10萬個代理的情況并不多見,不過2000多個代理倒是很正常的。接下來我們便會看到這樣一些例子。
高伸縮的Web服務(wù)器處理請求
在F#編程中,異步代理的思想其實是一種在多個環(huán)境中反復(fù)出現(xiàn)的設(shè)計模式。在F#中,我們經(jīng)常使用“代理”這個詞表示一種隨時發(fā)生的,特別是通過循環(huán),或是處理消息,或是產(chǎn)生結(jié)果的異步計算。
例如,在以后的文章中,我們會來關(guān)注如何使用F#構(gòu)建伸縮性強(qiáng)的TCP或HTTP服務(wù)器應(yīng)用程序,并將它們部署到EC2或是Windows Azure中去。這里我們打算用“股票服務(wù)器”作為例子,它接受TCP或HTTP連接,并向客戶端返回一系列的股票信息。每個客戶端會每隔一秒鐘收到一條股票信息。這個服務(wù)最終會以單個URL或REST API的形式發(fā)布。在實現(xiàn)時,我們?yōu)槊總€客戶端請求分配一個異步代理(由于只是演示,我們在這里便不斷地寫入相同的AAPL股票信息):
- open System.Net.Sockets
- /// serve up a stream of quotes
- let serveQuoteStream (client: TcpClient) = async {
- let stream = client.GetStream()
- while true do
- do! stream.AsyncWrite( "AAPL 200.38"B )
- do! Async.Sleep 1000.0 // sleep one second}
每個代理會一直運行到客戶端連接斷開。因為代理非常輕量,因此這個股票服務(wù)能夠在一臺機(jī)器上支持?jǐn)?shù)千個并發(fā)連接(如果使用云托管服務(wù)則會有更好的伸縮性)。而同一時刻會出現(xiàn)多少個代理對象則取決于客戶端的數(shù)量。
上面的例子演示了使用F#進(jìn)行網(wǎng)絡(luò)編程是多么的方便──網(wǎng)絡(luò)協(xié)議在此變成了基于異步代理的數(shù)據(jù)流讀寫。在以后的文章中我們會觀察更多使用F#進(jìn)行伸縮性強(qiáng)的TCP/HTTP編程。#p#
代理與隔離狀態(tài)(命令式)
F#代理編程的一個優(yōu)秀的關(guān)鍵之處便是其隔離性。隔離性則意味著資源“歸屬”與某個特定的代理,而不會暴露給其他代理。因此,獨立狀態(tài)對并發(fā)的訪問及數(shù)據(jù)競爭是一種良好的保護(hù)。
在F#中,異步代理的獨立性直接表現(xiàn)為文法上的作用域。例如,下面的代碼使用一個字典來累計發(fā)送至代理對象的不同消息的次數(shù)。內(nèi)部的字典在文法上是異步代理私有的,因此我們無法在代理外部對字典進(jìn)行讀寫。這意味著字典的可變狀態(tài)實際上是被隔離的,代理保證了對它的非并發(fā)的安全訪問。
- type Agent<'T> = MailboxProcessor<'T>
- open System.Collections.Generic
- let agent =
- Agent.Start(fun inbox ->
- async { let strings = Dictionary<string,int>()
- while true do
- let! msg = inbox.Receive()
- if strings.ContainsKey msg then
- strings.[msg] <- strings.[msg] + 1
- else
- strings.[msg] <- 0
- printfn "message '%s' now seen '%d' times" msg strings.[msg] } )
狀態(tài)隔離是F#的基本特性,它并不是代理所獨有的。例如,下面的代碼對StreamReader和ResizeArray(這是F#中對System.Collections.Generics.List的別稱)的隔離訪問。請注意這段代碼和.NET類庫中的System.IO.File.ReadAllLines方法功能相同:
- let readAllLines (file:string) =
- use reader = new System.IO.StreamReader(file)
- let lines = ResizeArray<_>()
- while not reader.EndOfStream do
- lines.Add (reader.ReadLine())
- lines.ToArray()
在這里,reader和ResizeArray對象都無法在函數(shù)外部使用。在代理或其他持續(xù)計算的情況里,隔離性是至關(guān)重要的──狀態(tài)在這里永遠(yuǎn)獨立于程序運行中的其他部分。說到底,隔離性是個動態(tài)的屬性,它經(jīng)常受到文法的約束。例如,考慮這樣一個延遲的,隨需加載的讀取器,它會讀取文件中的所有行:
- let readAllLines (file:string) =
- seq { use reader = new System.IO.StreamReader(file)
- while not reader.EndOfStream do
- yield reader.ReadLine() }
隔離狀態(tài)經(jīng)常包含可變的值,包括F#中的引用單元。例如,下面的代碼在一個引用單元中不斷累計接受到的消息個數(shù):
- let agent =
- Agent.Start(fun inbox ->
- async { let count = ref 0
- while true do
- let! msg = inbox.Receive()
- incr count
- printfn "now seen a total of '%d' messages" !count } )
再次強(qiáng)調(diào),這里可變的狀態(tài)被隔離了,確保了對它單線程的安全訪問。#p#
在代理中使用循環(huán)和隔離狀態(tài)(函數(shù)式)
F#程序員了解兩種實現(xiàn)循環(huán)的方法:使用“命令式”的while/for以及可變的累加器(ref或mutable),或是“函數(shù)式”風(fēng)格的調(diào)用,將累加狀態(tài)作為參數(shù)傳遞給一個或多個遞歸函數(shù)。例如,計算文件行數(shù)的程序可以使用命令式的方式來寫:
- let countLines (file:string) =
- use reader = new System.IO.StreamReader(file)
- let count = ref 0
- while not reader.EndOfStream do
- reader.ReadLine() |> ignore
- incr count
- !count或是遞歸式的:
- let countLines (file:string) =
- use reader = new System.IO.StreamReader(file)
- let rec loop n =
- if reader.EndOfStream then n
- else
- reader.ReadLine() |> ignore
- loop (n+1)
- loop 0
在使用時,兩種方式都是可行的:函數(shù)式的做法相對更加高級一些,但是大大減少了代碼中顯式的狀態(tài)修改次數(shù),且更為通用。兩種寫法對于F#程序員來說,一般都可以理解,他們還可以將“while”循環(huán)轉(zhuǎn)化為等價的“let rec”函數(shù)(這是個不錯的面試問題?。?/p>
有趣的是,在編寫異步循環(huán)時的規(guī)則也一樣:您可以使用命令式的“while”或函數(shù)式的“let rec”中的任意一種來定義循環(huán)。例如,這里有一個利用遞歸的異步函數(shù)統(tǒng)計消息數(shù)量的做法:
- let agent =
- Agent.Start(fun inbox ->
- let rec loop n = async {
- let! msg = inbox.Receive()
- printfn "now seen a total of %d messages" (n+1)
- return! loop (n+1)
- }
- loop 0 )這樣我們便可以獲得這樣的輸出:
- now seen a total of 0 messages
- now seen a total of 1 messages
- ....
- now seen a total of 10000 messages再提一次,定義代理對象的兩種常見模式為命令式的:
- let agent =
- Agent.Start(fun inbox ->
- async {
- // isolated imperative state goes here
- ...
- while <condition> do
- // read messages and respond
- ...
- })及函數(shù)式的:
- let agent =
- Agent.Start(fun inbox ->
- let rec loop arg1 arg2 = async {
- // receive and process messages here
- ...
- return! loop newArg1 newArg2
- }
- loop initialArg1 initialArg2)
再次強(qiáng)調(diào),兩種方法在F#都是合理的──使用遞歸的異步函數(shù)可能是更高級的方法,但更為函數(shù)式且更為通用。
文章轉(zhuǎn)自老趙的博客,
原文鏈接:http://blog.zhaojie.me/2010/03/async-and-parallel-design-patterns-in-fsharp-3-agents.html
【編輯推薦】