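Asynchronous and Parallel Design Patterns in F#: Reporting Progress with Events
This article develops examples of the "reporting progress with events" design pattern in the F# functional programming language. Some of the sample code comes from the F# JAOO Tutorial.
Let's first look at a basic example of the pattern. The code below defines an object that coordinates a set of asynchronous jobs running concurrently. Each job reports its result as soon as it finishes, rather than waiting for all the results to be collected at the end.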
```fsharp
open System.Threading

// Requires the SynchronizationContext extension members (CaptureCurrent, RaiseEvent)
// defined below; in a script, evaluate those first.
type AsyncWorker<'T>(jobs: seq<Async<'T>>) =

    // This declares an F# event that we can raise
    let jobCompleted = new Event<int * 'T>()

    /// Start an instance of the work
    member x.Start() =
        // Capture the synchronization context to allow us to raise events back on the GUI thread
        let syncContext = SynchronizationContext.CaptureCurrent()

        // Mark up the jobs with numbers
        let jobs = jobs |> Seq.mapi (fun i job -> (job, i + 1))

        let work =
            Async.Parallel
                [ for (job, jobNumber) in jobs ->
                    async { let! result = job
                            syncContext.RaiseEvent jobCompleted (jobNumber, result)
                            return result } ]

        Async.Start(work |> Async.Ignore)

    /// Raised when a particular job completes
    member x.JobCompleted = jobCompleted.Publish
```
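The key points of the pattern are these. In the object's Start method, we capture the current "synchronization context" on the GUI thread, which lets us run code and raise events back in the GUI context. We also use a helper to raise an arbitrary F# event on that context. This is not strictly necessary, but it keeps the code tidier. Events are declared as private values and published as properties. If the object will also be used from other .NET languages, mark each published event with the [<CLIEvent>] attribute.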
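The marshaling step can be seen in isolation. The sketch below uses a hypothetical QueueContext in place of a GUI message loop. It is a SynchronizationContext subclass that only queues posted callbacks until its owner calls Pump. The helper raiseOn is also hypothetical; it does what RaiseEvent does. The output shows that raising the event does not run the handler. The handler runs only when the owning thread pumps the queue.

```fsharp
open System.Collections.Generic
open System.Threading

/// Hypothetical stand-in for a GUI context: Post only queues the callback,
/// and nothing runs until the owning thread calls Pump.
type QueueContext() =
    inherit SynchronizationContext()
    let pending = Queue<SendOrPostCallback * obj>()

    override this.Post(callback: SendOrPostCallback, state: obj) =
        lock pending (fun () -> pending.Enqueue((callback, state)))

    /// Run every queued callback on the calling thread (the "GUI thread").
    member this.Pump() =
        let batch =
            lock pending (fun () ->
                let items = pending.ToArray()
                pending.Clear()
                items)
        for (callback, state) in batch do
            callback.Invoke(state)

/// Hypothetical helper: trigger an F# event through a context, as RaiseEvent does.
let raiseOn (ctx: SynchronizationContext) (evt: Event<'T>) (args: 'T) =
    ctx.Post((fun _ -> evt.Trigger args), null)

let ctx = QueueContext()
let completed = Event<int>()
let seen = ResizeArray<int>()
completed.Publish.Add(fun n -> seen.Add n)

// An async job raises the event; Post only queues the handler call.
async { raiseOn ctx completed 42 } |> Async.RunSynchronously
printfn "before pump: %A" (List.ofSeq seen)   // before pump: []
ctx.Pump()
printfn "after pump: %A" (List.ofSeq seen)    // after pump: [42]
```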
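We start the background job by supplying an asynchronous workflow that defines its work. Async.Start launches this workflow, although Async.StartWithContinuations is more common, as in a later example. As the background job makes progress, it raises these events at the appropriate points. The code relies on two helper extension methods on System.Threading.SynchronizationContext, which appear repeatedly in this series: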
```fsharp
open System.Threading

type SynchronizationContext with
    /// A standard helper extension method to raise an event on the GUI thread
    member syncContext.RaiseEvent (evt: Event<_>) args =
        syncContext.Post((fun _ -> evt.Trigger args), state = null)

    /// A standard helper extension method to capture the current synchronization context.
    /// If none is present, use a context that executes work in the thread pool.
    static member CaptureCurrent () =
        match SynchronizationContext.Current with
        | null -> new SynchronizationContext()
        | ctxt -> ctxt
```
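When no context is present, CaptureCurrent falls back to a plain SynchronizationContext. A small experiment shows what that fallback does. Its Post queues the callback to the thread pool, while Send runs it synchronously on the caller. So in a plain console application, the event handlers in the examples below run on thread-pool threads, not on a single GUI thread. The names postedOnPool, sentThread, onPost and onSend are for illustration only.

```fsharp
open System.Threading

// The fallback that CaptureCurrent returns when SynchronizationContext.Current is null.
let fallback = SynchronizationContext()

// Post: the callback is queued to the thread pool and runs asynchronously.
let postedOnPool = ref false
let finished = new ManualResetEventSlim(false)
let onPost (_: obj) =
    postedOnPool.Value <- Thread.CurrentThread.IsThreadPoolThread
    finished.Set()
fallback.Post(SendOrPostCallback(onPost), null)
finished.Wait()

// Send: the callback runs synchronously on the calling thread.
let callerThread = Thread.CurrentThread.ManagedThreadId
let sentThread = ref (-1)
let onSend (_: obj) = sentThread.Value <- Thread.CurrentThread.ManagedThreadId
fallback.Send(SendOrPostCallback(onSend), null)

printfn "Post ran on a pool thread: %b" postedOnPool.Value             // true
printfn "Send ran on the caller: %b" (sentThread.Value = callerThread)   // true
```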
You can now use this component to supervise a collection of CPU-intensive asynchronous jobs:
```fsharp
let rec fib i = if i < 2 then 1 else fib (i - 1) + fib (i - 2)

let worker =
    new AsyncWorker<_>([ for i in 1 .. 100 -> async { return fib (i % 40) } ])

worker.JobCompleted.Add(fun (jobNumber, result) ->
    printfn "job %d completed with result %A" jobNumber result)

worker.Start()
```
When it runs, each job reports its result as soon as it finishes:
```
job 1 completed with result 1
job 2 completed with result 2
...
job 39 completed with result 102334155
job 77 completed with result 39088169
job 79 completed with result 102334155
```
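The job numbers in this output arrive in completion order, not submission order. Async.Parallel still returns its result array in the order the jobs were supplied, which is why the worker attaches a number to each report. The sketch below checks this with three hypothetical jobs. Their delays make them likely, but not guaranteed, to finish in reverse order.

```fsharp
open System.Collections.Concurrent

// Hypothetical jobs: job i waits (4 - i) * 50 ms, so later jobs tend to finish first.
let jobs =
    [ for i in 1 .. 3 ->
        async {
            do! Async.Sleep((4 - i) * 50)
            return i * 10 } ]

// Thread-safe record of the order in which jobs finish.
let completionOrder = ConcurrentQueue<int>()

let results =
    jobs
    |> List.mapi (fun index job ->
        async {
            let! result = job
            completionOrder.Enqueue(index + 1)
            return result })
    |> Async.Parallel
    |> Async.RunSynchronously

printfn "completion order: %A" (List.ofSeq completionOrder)  // timing-dependent
printfn "results: %A" results                                // results: [|10; 20; 30|]
```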
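There are many ways for background jobs to report results. In 90% of cases the simplest is the one shown above: raise .NET events on the GUI thread (or the ASP.NET Page_Load thread). This technique hides the use of background threads and follows standard .NET conventions that every .NET programmer knows. As a result, the techniques used to implement the parallelism stay properly encapsulated.
Reporting progress of asynchronous I/O
The progress-reporting event pattern also works with asynchronous I/O. For example, here is a set of I/O jobs: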
```fsharp
open System.IO
open System.Net
open Microsoft.FSharp.Control.WebExtensions

/// Fetch the contents of a web page, asynchronously.
let httpAsync (url: string) =
    async { let req = WebRequest.Create(url)
            use! resp = req.AsyncGetResponse()
            use stream = resp.GetResponseStream()
            use reader = new StreamReader(stream)
            let text = reader.ReadToEnd()
            return text }

let urls =
    [ "http://www.live.com"
      "http://news.live.com"
      "http://www.yahoo.com"
      "http://news.yahoo.com"
      "http://www.google.com"
      "http://news.google.com" ]

let jobs = [ for url in urls -> httpAsync url ]

let worker = new AsyncWorker<_>(jobs)

worker.JobCompleted.Add(fun (jobNumber, result) ->
    printfn "job %d completed with result %A" jobNumber result.Length)

worker.Start()
```
As it runs, it reports progress as the length of each web page:
```
job 5 completed with result 8521
job 6 completed with result 155767
job 3 completed with result 117778
job 1 completed with result 16490
job 4 completed with result 175186
job 2 completed with result 70362
```
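In httpAsync only the response is awaited asynchronously. The call reader.ReadToEnd() still blocks a thread-pool thread while the body downloads. Below is a sketch of a non-blocking alternative. It assumes .NET 4.5 or later, where TextReader.ReadToEndAsync is available. The name readAllAsync is hypothetical. A MemoryStream stands in for the response stream, so the example runs without a network.

```fsharp
open System.IO
open System.Text

/// Hypothetical helper: read a whole stream as text without blocking a thread
/// while the data arrives.
let readAllAsync (stream: Stream) =
    async {
        use reader = new StreamReader(stream)
        // ReadToEndAsync returns Task<string>; Async.AwaitTask turns it into Async<string>.
        let! text = reader.ReadToEndAsync() |> Async.AwaitTask
        return text }

// A MemoryStream stands in for resp.GetResponseStream(), so this runs offline.
let fakeBody = new MemoryStream(Encoding.UTF8.GetBytes("<html>hello</html>"))
let pageLength = readAllAsync fakeBody |> Async.RunSynchronously |> String.length
printfn "length = %d" pageLength   // length = 18
```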
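Jobs that report several different events
In this design pattern, an object encapsulates and supervises the execution of an asynchronous composition, so we can easily add more events when we want a richer API. For example, the code below adds events that signal three things: all jobs have completed, a job raised an error, or the composition was successfully cancelled before it finished. The main changes are the declaration, triggering and publication of these events: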
```fsharp
open System
open System.Threading
open System.IO
open Microsoft.FSharp.Control.WebExtensions

type AsyncWorker<'T>(jobs: seq<Async<'T>>) =

    // Each of these lines declares an F# event that we can raise
    let allCompleted = new Event<'T[]>()
    let error = new Event<System.Exception>()
    let canceled = new Event<System.OperationCanceledException>()
    let jobCompleted = new Event<int * 'T>()

    let cancellationCapability = new CancellationTokenSource()

    /// Start an instance of the work
    member x.Start() =
        // Capture the synchronization context to allow us to raise events back on the GUI thread
        let syncContext = SynchronizationContext.CaptureCurrent()

        // Mark up the jobs with numbers
        let jobs = jobs |> Seq.mapi (fun i job -> (job, i + 1))

        let work =
            Async.Parallel
                [ for (job, jobNumber) in jobs ->
                    async { let! result = job
                            syncContext.RaiseEvent jobCompleted (jobNumber, result)
                            return result } ]

        // Route each outcome (success, error, cancellation) to its own event on the GUI thread
        Async.StartWithContinuations
            ( work,
              (fun res -> syncContext.RaiseEvent allCompleted res),
              (fun exn -> syncContext.RaiseEvent error exn),
              (fun exn -> syncContext.RaiseEvent canceled exn),
              cancellationCapability.Token )

    member x.CancelAsync() =
        cancellationCapability.Cancel()

    /// Raised when a particular job completes
    member x.JobCompleted = jobCompleted.Publish
    /// Raised when all jobs complete
    member x.AllCompleted = allCompleted.Publish
    /// Raised when the composition is cancelled successfully
    member x.Canceled = canceled.Publish
    /// Raised when the composition exhibits an error
    member x.Error = error.Publish
```

We can respond to these additional events in the usual way, for example:
```fsharp
let worker = new AsyncWorker<_>(jobs)

worker.JobCompleted.Add(fun (jobNumber, result) ->
    printfn "job %d completed with result %A" jobNumber result.Length)

worker.AllCompleted.Add(fun results ->
    printfn "all done, results = %A" results)

worker.Start()
```
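The extended worker relies on Async.StartWithContinuations calling exactly one of its three continuations. The standalone sketch below shows that behavior. The helper start is hypothetical: it wraps the call and returns a function that waits for whichever continuation fires. The ManualResetEventSlim is there only so that a script can wait for the outcome.

```fsharp
open System
open System.Threading

/// Hypothetical helper: start a workflow with StartWithContinuations and return a
/// function that waits for whichever continuation fires and describes it.
let start (work: Async<'T>) (token: CancellationToken) =
    let outcome = ref ""
    let finished = new ManualResetEventSlim(false)
    let report (text: string) =
        outcome.Value <- text
        finished.Set()
    Async.StartWithContinuations
        ( work,
          (fun result -> report (sprintf "completed: %A" result)),
          (fun (e: exn) -> report ("error: " + e.Message)),
          (fun (_: OperationCanceledException) -> report "canceled"),
          token )
    fun () ->
        finished.Wait()
        outcome.Value

// 1. One job throws, so the exception continuation fires.
let failing = Async.Parallel [ async { return 1 }; async { return failwith "boom" } ]
let failingOutcome = (start failing CancellationToken.None) ()

// 2. A long job is cancelled, so the cancellation continuation fires.
let cts = new CancellationTokenSource()
let slow =
    Async.Parallel
        [ async {
              do! Async.Sleep 10000
              return 1 } ]
let waitSlow = start slow cts.Token
cts.Cancel()
let slowOutcome = waitSlow ()

printfn "%s" failingOutcome   // error: boom
printfn "%s" slowOutcome      // canceled
```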
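As shown above, the supervised asynchronous workflow also supports cancellation (CancelAsync). The progress-reporting event pattern suits many scenarios that need to report progress throughout execution. In the next example, we use it to encapsulate background reading of Twitter's sample message stream. Running it requires a Twitter account and password. Only one kind of event is raised here, but you could raise more where needed. This sample is also part of the F# JAOO Tutorial.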
```fsharp
// F# Twitter Feed Sample using F# Async Programming and Event processing
//
#r "System.Web.dll"
#r "System.Windows.Forms.dll"
#r "System.Xml.dll"

open System
open System.Globalization
open System.IO
open System.Net
open System.Web
open System.Threading
open Microsoft.FSharp.Control.WebExtensions

/// A component which listens to tweets in the background and raises an
/// event each time a tweet is observed
type TwitterStreamSample(userName: string, password: string) =

    let tweetEvent = new Event<_>()
    let streamSampleUrl = "http://stream.twitter.com/1/statuses/sample.xml?delimited=length"

    /// The cancellation condition
    let mutable group = new CancellationTokenSource()

    /// Start listening to a stream of tweets
    member this.StartListening() =
        // Capture the synchronization context to allow us to raise events back on the GUI thread
        let syncContext = SynchronizationContext.CaptureCurrent()

        /// The background process
        let listener (syncContext: SynchronizationContext) =
            async { let credentials = NetworkCredential(userName, password)
                    let req = WebRequest.Create(streamSampleUrl, Credentials = credentials)
                    use! resp = req.AsyncGetResponse()
                    use stream = resp.GetResponseStream()
                    use reader = new StreamReader(stream)
                    // Each message is preceded by a line holding its length (delimited=length)
                    let rec loop() =
                        async {
                            let atEnd = reader.EndOfStream
                            if not atEnd then
                                let sizeLine = reader.ReadLine()
                                let size = int sizeLine
                                let buffer = Array.zeroCreate size
                                let _numRead = reader.ReadBlock(buffer, 0, size)
                                let text = new System.String(buffer)
                                syncContext.RaiseEvent tweetEvent text
                                return! loop()
                        }
                    return! loop() }

        // listener is a function: apply it to the captured context before starting it
        Async.Start(listener syncContext, group.Token)

    /// Stop listening to a stream of tweets
    member this.StopListening() =
        group.Cancel()
        group <- new CancellationTokenSource()

    /// Raised when the XML for a tweet arrives
    member this.NewTweet = tweetEvent.Publish
```

Each message in Twitter's standard sample stream triggers one event that carries the message content. We can listen to the event stream like this:
```fsharp
let userName = "..." // set Twitter user name here
let password = "..." // set Twitter password here

let twitterStream = new TwitterStreamSample(userName, password)

twitterStream.NewTweet
|> Event.add (fun s -> printfn "%A" s)

twitterStream.StartListening()

// Later, to stop listening:
twitterStream.StopListening()
```
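StopListening replaces group with a fresh CancellationTokenSource because a cancelled source stays cancelled. If the old source were reused, every later StartListening would be cancelled immediately. The sketch below shows that behavior. It uses a hypothetical tryRun helper in place of the Twitter listener, so it needs no network.

```fsharp
open System
open System.Threading

/// Hypothetical stand-in for the listener: a short workflow run under a token.
let tryRun (token: CancellationToken) =
    let work =
        async {
            do! Async.Sleep 1
            return "ran" }
    try
        Async.RunSynchronously(work, cancellationToken = token)
    with
    | :? OperationCanceledException -> "canceled"

let mutable group = new CancellationTokenSource()
let first = tryRun group.Token     // the source is fresh

group.Cancel()
let second = tryRun group.Token    // the same source is now permanently cancelled

group <- new CancellationTokenSource()
let third = tryRun group.Token     // a new source works again

printfn "%s, %s, %s" first second third   // ran, canceled, ran
```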
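Once running, the program keeps printing the XML of each tweet. See the Twitter API pages for how the sample stream is meant to be used. If you also want to parse the tweets, the code below shows one way to do it. Do follow the guidelines on the Twitter API pages, though. For example, if you are building a highly reliable system, store the messages before processing them, or use a message queue.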
```fsharp
#r "System.Xml.dll"
#r "System.Xml.Linq.dll"

open System
open System.Globalization
open System.Web
open System.Xml
open System.Xml.Linq

let xn (s: string) = XName.op_Implicit s

/// The results of the parsed tweet
type UserStatus =
    { UserName : string
      ProfileImage : string
      Status : string
      StatusDate : DateTime }

/// Attempt to parse a tweet
let parseTweet (xml: string) =
    let document = XDocument.Parse xml
    let node = document.Root
    if node.Element(xn "user") <> null then
        Some { UserName = node.Element(xn "user").Element(xn "screen_name").Value
               ProfileImage = node.Element(xn "user").Element(xn "profile_image_url").Value
               Status = node.Element(xn "text").Value |> HttpUtility.HtmlDecode
               // created_at uses English day and month names, so parse with the invariant culture
               StatusDate =
                   DateTime.ParseExact(node.Element(xn "created_at").Value,
                                       "ddd MMM dd HH:mm:ss +0000 yyyy",
                                       CultureInfo.InvariantCulture) }
    else
        None
```

The event stream also lends itself to compositional programming:
```fsharp
twitterStream.NewTweet
|> Event.choose parseTweet
|> Event.add (fun s -> printfn "%A" s)

twitterStream.StartListening()
```

Or to gather statistics:
```fsharp
let addToMultiMap key x multiMap =
    let prev = match Map.tryFind key multiMap with None -> [] | Some v -> v
    Map.add key (x :: prev) multiMap

/// An event which triggers on every 'n' triggers of the input event
let every n (ev: IEvent<_>) =
    let out = new Event<_>()
    let count = ref 0
    ev.Add(fun arg ->
        count.Value <- count.Value + 1
        if count.Value % n = 0 then out.Trigger arg)
    out.Publish

twitterStream.NewTweet
|> Event.choose parseTweet
// Build up the table of tweets indexed by user
|> Event.scan (fun z x -> addToMultiMap x.UserName x z) Map.empty
// Take every 20th index
|> every 20
// Listen and display the average of #tweets/user
|> Event.add (fun s ->
    let avg = s |> Seq.averageBy (fun (KeyValue(_, d)) -> float d.Length)
    printfn "#users = %d, avg tweets = %g" s.Count avg)

twitterStream.StartListening()
```

This code computes statistics over the sample stream. Every 20 tweets, it prints the number of users seen so far and their average number of tweets:
```
#users = 19, avg tweets = 1.05263
#users = 39, avg tweets = 1.02564
#users = 59, avg tweets = 1.01695
#users = 79, avg tweets = 1.01266
#users = 99, avg tweets = 1.0101
#users = 118, avg tweets = 1.01695
#users = 138, avg tweets = 1.01449
#users = 158, avg tweets = 1.01266
#users = 178, avg tweets = 1.01124
#users = 198, avg tweets = 1.0101
#users = 218, avg tweets = 1.00917
#users = 237, avg tweets = 1.01266
#users = 257, avg tweets = 1.01167
#users = 277, avg tweets = 1.01083
#users = 297, avg tweets = 1.0101
#users = 317, avg tweets = 1.00946
#users = 337, avg tweets = 1.0089
#users = 357, avg tweets = 1.0084
#users = 377, avg tweets = 1.00796
#users = 396, avg tweets = 1.0101
#users = 416, avg tweets = 1.00962
#users = 435, avg tweets = 1.01149
#users = 455, avg tweets = 1.01099
#users = 474, avg tweets = 1.01266
#users = 494, avg tweets = 1.01215
#users = 514, avg tweets = 1.01167
#users = 534, avg tweets = 1.01124
#users = 554, avg tweets = 1.01083
#users = 574, avg tweets = 1.01045
#users = 594, avg tweets = 1.0101
```
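The same combinator pipeline (choose, scan, every) can be tried without a Twitter connection. In the sketch below, a plain Event<string> stands in for NewTweet. The parser parsePost and its "user:text" message format are hypothetical. Like parseTweet, the parser drops messages it cannot parse. The function every is a local copy of the helper above, and countByUser keeps only a count per user rather than the list of posts.

```fsharp
/// Hypothetical message format "user:text"; anything else is dropped, like parseTweet's None.
let parsePost (raw: string) =
    match raw.Split([| ':' |], 2) with
    | [| user; text |] when user <> "" -> Some (user, text)
    | _ -> None

/// Forward only every n-th occurrence of the input event (same idea as `every` above).
let every n (ev: IEvent<'T>) =
    let out = Event<'T>()
    let count = ref 0
    ev.Add(fun arg ->
        count.Value <- count.Value + 1
        if count.Value % n = 0 then out.Trigger arg)
    out.Publish

/// Count posts per user, like addToMultiMap but keeping only a count.
let countByUser (counts: Map<string, int>) (user: string, _text: string) =
    let prev = Map.tryFind user counts |> Option.defaultValue 0
    Map.add user (prev + 1) counts

let incoming = Event<string>()
let snapshots = ResizeArray<Map<string, int>>()

incoming.Publish
|> Event.choose parsePost
|> Event.scan countByUser Map.empty
|> every 2
|> Event.add (fun counts -> snapshots.Add counts)

for raw in [ "ann:hi"; "garbage"; "bob:yo"; "ann:again"; "cy:hey" ] do
    incoming.Trigger raw

// Prints "#users = 2, total posts = 2" and then "#users = 3, total posts = 4"
for counts in snapshots do
    let total = counts |> Seq.sumBy (fun (KeyValue(_, n)) -> n)
    printfn "#users = %d, total posts = %d" counts.Count total
```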
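With a slightly different analysis, we can show the users in Twitter's sample stream who tweeted more than once, together with their latest tweet. As in the previous article, the following code can be run in F# Interactive and shows the results in a data grid: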
```fsharp
open System.Drawing
open System.Windows.Forms

let form = new Form(Visible = true, Text = "A Simple F# Form", TopMost = true, Size = Size(600, 600))

let data = new DataGridView(Dock = DockStyle.Fill, Text = "F# Programming is Fun!",
                            Font = new Font("Lucida Console", 12.0f),
                            ForeColor = Color.DarkBlue)

form.Controls.Add(data)

data.DataSource <- [| (10, 10, 10) |]
data.Columns.[0].Width <- 200
data.Columns.[2].Width <- 500

twitterStream.NewTweet
|> Event.choose parseTweet
// Build up the table of tweets indexed by user
|> Event.scan (fun z x -> addToMultiMap x.UserName x z) Map.empty
// Take every 20th index
|> every 20
// Listen and display those with more than one tweet
|> Event.add (fun s ->
    let moreThanOneMessage = s |> Seq.filter (fun (KeyValue(_, d)) -> d.Length > 1)
    data.DataSource <-
        moreThanOneMessage
        |> Seq.map (fun (KeyValue(user, d)) -> (user, d.Length, d.Head.Status))
        |> Seq.filter (fun (_, n, _) -> n > 1)
        |> Seq.sortBy (fun (_, n, _) -> -n)
        |> Seq.toArray)

twitterStream.StartListening()
```

Some of the sampled results:
[Figure: partial sampling results]
Note that the examples above read the Twitter stream with blocking I/O, for two reasons. First, the Twitter stream is very active, and stays that way. Second, we can assume there will not be many Twitter streams; here there is just one. Twitter also limits the sampling done by a single account. Later in this series we will show how to read XML fragments like these without blocking.
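For comparison, below is a sketch of a non-blocking variant of the same length-prefixed read loop. It assumes .NET 4.5 or later for TextReader.ReadLineAsync and ReadBlockAsync. The names readMessagesAsync, received and sample are hypothetical, and an in-memory string replaces the real stream. The sketch also assumes the length prefix counts characters, which may not match the real stream exactly. Blank lines between messages are tolerated, which is also an assumption.

```fsharp
open System
open System.IO

/// Hypothetical non-blocking reader for a length-prefixed stream: each message is
/// preceded by a line holding its length (assumed here to count characters).
/// onMessage is called for each message; in the component it would raise an event.
let readMessagesAsync (reader: TextReader) (onMessage: string -> unit) =
    let rec loop () =
        async {
            let! sizeLine = reader.ReadLineAsync() |> Async.AwaitTask
            if not (isNull sizeLine) then
                if sizeLine.Trim() <> "" then          // tolerate blank lines between messages
                    let size = int (sizeLine.Trim())
                    let buffer = Array.zeroCreate<char> size
                    let! numRead = reader.ReadBlockAsync(buffer, 0, size) |> Async.AwaitTask
                    onMessage (System.String(buffer, 0, numRead))
                return! loop () }
    loop ()

// In-memory stand-in for the response stream: two messages of 6 and 4 characters.
let received = ResizeArray<string>()
let sample = new StringReader("6\nhello\n4\nabc\n")
readMessagesAsync sample (fun msg -> received.Add(msg.TrimEnd()))
|> Async.RunSynchronously
printfn "%A" (List.ofSeq received)   // ["hello"; "abc"]
```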
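Parallelism in F#, GUI in C#/VB
The progress-reporting event pattern is particularly useful when an F# programmer implements an asynchronous computation component that C# or VB programmers will consume. In that case, mark the published events with [<CLIEvent>] so that C# and VB programmers see them as standard .NET events. For example, in the second example above you would write: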
```fsharp
    // Inside the AsyncWorker type from the second example:

    /// Raised when a particular job completes
    [<CLIEvent>]
    member x.JobCompleted = jobCompleted.Publish
    /// Raised when all jobs complete
    [<CLIEvent>]
    member x.AllCompleted = allCompleted.Publish
    /// Raised when the composition is cancelled successfully
    [<CLIEvent>]
    member x.Canceled = canceled.Publish
    /// Raised when the composition exhibits an error
    [<CLIEvent>]
    member x.Error = error.Publish
```

Limitations of the pattern
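The progress-reporting event pattern makes some assumptions. It assumes the consumer of the parallel component is a GUI application (such as Windows Forms), a server-side application (such as ASP.NET), or some other setting where events can be handed to a monitoring party. We can change how the pattern raises events, for example by sending messages to a MailboxProcessor or simply logging them. Even so, an assumption remains: a main thread or some other monitor must listen to these events, or they must be stored appropriately.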
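Here is a minimal sketch of the MailboxProcessor variant. The ProgressMessage type is hypothetical. Instead of raising an event, each job posts a JobDone message. A caller later asks the agent for the collected results with PostAndReply. Because the mailbox processes messages in arrival order, the report request is handled after every JobDone that was posted before it.

```fsharp
/// Hypothetical messages understood by the monitoring agent.
type ProgressMessage =
    | JobDone of jobNumber: int * result: int
    | GetReport of AsyncReplyChannel<(int * int) list>

/// The agent plays the role of the event listener, keeping every report it receives.
let monitor =
    MailboxProcessor.Start(fun inbox ->
        let rec loop reports =
            async {
                let! msg = inbox.Receive()
                match msg with
                | JobDone (jobNumber, result) ->
                    return! loop ((jobNumber, result) :: reports)
                | GetReport reply ->
                    reply.Reply(List.sortBy fst reports)
                    return! loop reports }
        loop [])

// Jobs report to the agent instead of raising events.
[ for i in 1 .. 3 ->
    async {
        let result = i * i
        monitor.Post(JobDone(i, result))
        return result } ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

let report = monitor.PostAndReply(fun reply -> GetReport reply)
printfn "%A" report   // [(1, 1); (2, 4); (3, 9)]
```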
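The pattern also assumes that the encapsulated object can obtain the GUI thread's synchronization context, usually implicitly, as in the examples above. This is generally a reasonable assumption. Alternatively, the context can be passed in as an explicit parameter, although that is not a common practice in .NET programming.
If you are familiar with the IObservable interface in .NET 4.0, you might consider making the TwitterStreamSample type implement it. For an ultimate source of data, however, this gains little. For example, TwitterStreamSample may later need to offer more kinds of events, such as reporting an error followed by automatic reconnection, or reporting pauses and delays. In such scenarios raising .NET events is enough, partly because it keeps the object familiar to more .NET programmers. In F#, every published IEvent<_> object automatically implements IObservable, so consumers can apply the Observable combinators to it directly.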
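The sketch below applies the Observable module directly to a published F# event, using a hypothetical tick event. Unlike Event.add, Observable.subscribe returns an IDisposable, and disposing it detaches the handler.

```fsharp
let tick = Event<int>()
let observed = ResizeArray<int>()

// tick.Publish is an IEvent<int>, which is also an IObservable<int>.
let subscription =
    tick.Publish
    |> Observable.filter (fun n -> n % 2 = 0)
    |> Observable.map (fun n -> n * 10)
    |> Observable.subscribe (fun v -> observed.Add v)

tick.Trigger 1   // filtered out
tick.Trigger 2
tick.Trigger 4
subscription.Dispose()
tick.Trigger 6   // no longer observed

printfn "%A" (List.ofSeq observed)   // [20; 40]
```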
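Conclusion
The progress-reporting event pattern is a powerful and elegant way to encapsulate parallel execution behind a boundary while still reporting its results or progress. From the outside, the AsyncWorker object generally appears single-threaded. Provided your asynchronous inputs are independent, the component does not expose the rest of the program to multithreaded race conditions.
Programmers working with JavaScript, ASP.NET and GUI frameworks such as Windows Forms all know that the single-threaded nature of these frameworks is both a blessing and a curse. Things stay simple, with no data races, but parallel and asynchronous programming becomes hard. In .NET programming, I/O and heavy CPU computation have to be moved to background threads. The design pattern above gives you the best of both worlds. You get independent, interoperable, communication-rich background components that support both I/O and parallel computation. Meanwhile, most of your code keeps the simplicity of single-threaded GUI programming. As shown earlier, these components also remain generic and reusable, which makes them easy to unit test in isolation.
Reposted from Lao Zhao's blog.
Original article: http://blog.zhaojie.me/2010/03/async-and-parallel-design-patterns-in-fsharp-2-reporting-progress-with-events.html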