新聞中心
這里我們會(huì)使用這個(gè)設(shè)計(jì)模式開發(fā)一個(gè)示例,分析F#函數(shù)式編程語(yǔ)言中的反饋進(jìn)度事件,其中部分示例代碼來自于F# JAOO Tutorial。

創(chuàng)新互聯(lián)公司-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比棗莊網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫(kù),直接使用。一站式棗莊網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋棗莊地區(qū)。費(fèi)用合理售后完善,十年實(shí)體公司更值得信賴。
我們先來看一下這個(gè)設(shè)計(jì)模式的一個(gè)基礎(chǔ)示例。在下面的代碼中,我們會(huì)定義一個(gè)對(duì)象,以此來協(xié)調(diào)一組同時(shí)執(zhí)行的異步任務(wù)。每個(gè)任務(wù)在結(jié)束之后會(huì)主動(dòng)匯報(bào)它的結(jié)果,而不是等待統(tǒng)一的收集過程。
- type AsyncWorker<'T>(jobs: seq
>) = - // This declares an F# event that we can raise
- let jobCompleted = new Event
() - /// Start an instance of the work
- member x.Start() =
- // Capture the synchronization context to allow us to raise events back on the GUI thread
- let syncContext = SynchronizationContext.CaptureCurrent()
- // Mark up the jobs with numbers
- let jobsjobs = jobs |> Seq.mapi (fun i job -> (job,i+1))
- let work =
- Async.Parallel
- [ for (job,jobNumber) in jobs ->
- async { let! result = job
- syncContext.RaiseEvent jobCompleted (jobNumber,result)
- return result } ]
- Async.Start(work |> Async.Ignore)
- /// Raised when a particular job completes
- member x.JobCompleted = jobCompleted.Publish
設(shè)計(jì)模式的一些關(guān)鍵之處已經(jīng)使用黃色進(jìn)行高亮:在對(duì)象的Start方法中,我們?cè)贕UI線程中捕獲了當(dāng)前的“同步上下文”,這使得我們可以從GUI的上下文中運(yùn)行代碼或觸發(fā)事件。我們還定義了一個(gè)私有的輔助函數(shù)來觸發(fā)任意的F#事件,這雖不必須但可以使我們的代碼變的更為整潔。定義了多個(gè)事件。這些事件作為屬性發(fā)布,如果該對(duì)象還需要被其他.NET語(yǔ)言使用,則為它標(biāo)記一個(gè)[
我們這里通過指定一個(gè)定義了任務(wù)內(nèi)容的異步工作流來啟動(dòng)后臺(tái)任務(wù)。Async.Start可以用來啟動(dòng)這個(gè)工作流(雖然Async.StartWithContinuations更為常用,例如在后面的示例中)。在后臺(tái)任務(wù)產(chǎn)生進(jìn)度之后,便會(huì)在合適的時(shí)候觸發(fā)這些事件。這段代碼使用了兩個(gè)基于System.Threading.SynchronizationContext的輔助方法,它們會(huì)在這個(gè)系列的文章中多次出現(xiàn)。如下:
- type SynchronizationContext with
- /// A standard helper extension method to raise an event on the GUI thread
- member syncContext.RaiseEvent (event: Event<_>) args =
- syncContext.Post((fun _ -> event.Trigger args),state=null)
- /// A standard helper extension method to capture the current synchronization context.
- /// If none is present, use a context that executes work in the thread pool.
- static member CaptureCurrent () =
- match SynchronizationContext.Current with
- | null -> new SynchronizationContext()
- | ctxt -> ctxt
- 您現(xiàn)在便可以使用這個(gè)組件來管理一系列CPU密集型異步任務(wù):
- let rec fib i = if i < 2 then 1 else fib (i-1) + fib (i-2)
- let worker =
- new AsyncWorker<_>( [ for i in 1 .. 100 -> async { return fib (i % 40) } ] )
- worker.JobCompleted.Add(fun (jobNumber, result) ->
- printfn "job %d completed with result %A" jobNumber result)
- worker.Start()
在執(zhí)行時(shí),每個(gè)任務(wù)結(jié)束之后便會(huì)匯報(bào)結(jié)果:
- job 1 completed with result 1
- job 2 completed with result 2
- ...
- job 39 completed with result 102334155
- job 77 completed with result 39088169
- job 79 completed with result 102334155
我們可以使用多種方式讓后臺(tái)運(yùn)行的任務(wù)匯報(bào)結(jié)果。在90%的情況下最簡(jiǎn)單的便是上面的方法:在GUI(或ASP.NET的Page_Load)線程中觸發(fā).NET事件。這個(gè)技巧隱藏了后臺(tái)線程的使用細(xì)節(jié),并利用了所有.NET程序員都非常熟悉的標(biāo)準(zhǔn).NET慣例,以此保證用于實(shí)現(xiàn)并行編程的技術(shù)都得到了有效的封裝。
匯報(bào)異步I/O的進(jìn)度
反饋進(jìn)度的事件模式也可以用在異步I/O操作上。例如這里有一系列I/O任務(wù):
- open System.IO
- open System.Net
- open Microsoft.FSharp.Control.WebExtensions
- /// Fetch the contents of a web page, asynchronously.
- let httpAsync(url:string) =
- async { let req = WebRequest.Create(url)
- use! resp = req.AsyncGetResponse()
- use stream = resp.GetResponseStream()
- use reader = new StreamReader(stream)
- let text = reader.ReadToEnd()
- return text }
- let urls =
- [ "http://www.live.com";
- "http://news.live.com";
- "http://www.yahoo.com";
- "http://news.yahoo.com";
- "http://www.google.com";
- "http://news.google.com"; ]
- let jobs = [ for url in urls -> httpAsync url ]
- let worker = new AsyncWorker<_>(jobs)
- worker.JobCompleted.Add(fun (jobNumber, result) ->
- printfn "job %d completed with result %A" jobNumber result.Length)
- worker.Start()
在執(zhí)行過程中便會(huì)反饋進(jìn)度結(jié)果,表現(xiàn)為每個(gè)Web頁(yè)面的長(zhǎng)度:
- job 5 completed with result 8521
- job 6 completed with result 155767
- job 3 completed with result 117778
- job 1 completed with result 16490
- job 4 completed with result 175186
- job 2 completed with result 70362
#p#
反饋多種不同事件的任務(wù)
在這個(gè)設(shè)計(jì)模式中,我們使用了一個(gè)對(duì)象來封裝和監(jiān)督異步組合任務(wù)的執(zhí)行過程,即使我們需要豐富API,也可以輕松地添加多個(gè)事件。例如,以下的代碼添加了額外的事件來表示所有的任務(wù)已經(jīng)完成了,或是其中某個(gè)任務(wù)出現(xiàn)了錯(cuò)誤,還有便是整個(gè)組合完成之前便成功地取消了任務(wù)。以下高亮的代碼便展示了事件的聲明,觸發(fā)及發(fā)布:
- open System
- open System.Threading
- open System.IO
- open Microsoft.FSharp.Control.WebExtensions
- type AsyncWorker<'T>(jobs: seq
>) = - // Each of these lines declares an F# event that we can raise
- let allCompleted = new Event<'T[]>()
- let error = new Event
() - let canceled = new Event
() - let jobCompleted = new Event
() - let cancellationCapability = new CancellationTokenSource()
- /// Start an instance of the work
- member x.Start() =
- // Capture the synchronization context to allow us to raise events back on the GUI thread
- let syncContext = SynchronizationContext.CaptureCurrent()
- // Mark up the jobs with numbers
- let jobsjobs = jobs |> Seq.mapi (fun i job -> (job,i+1))
- let work =
- Async.Parallel
- [ for (job,jobNumber) in jobs ->
- async { let! result = job
- syncContext.RaiseEvent jobCompleted (jobNumber,result)
- return result } ]
- Async.StartWithContinuations
- ( work,
- (fun res -> raiseEventOnGuiThread allCompleted res),
- (fun exn -> raiseEventOnGuiThread error exn),
- (fun exn -> raiseEventOnGuiThread canceled exn ),
- cancellationCapability.Token)
- member x.CancelAsync() =
- cancellationCapability.Cancel()
- /// Raised when a particular job completes
- member x.JobCompleted = jobCompleted.Publish
- /// Raised when all jobs complete
- member x.AllCompleted = allCompleted.Publish
- /// Raised when the composition is cancelled successfully
- member x.Canceled = canceled.Publish
- /// Raised when the composition exhibits an error
- member x.Error = error.Publish我們可以使用最普通的做法來響應(yīng)這些額外的事件,例如:
- let worker = new AsyncWorker<_>(jobs)
- worker.JobCompleted.Add(fun (jobNumber, result) ->
- printfn "job %d completed with result %A" jobNumber result.Length)
- worker.AllCompleted.Add(fun results ->
- printfn "all done, results = %A" results )
- worker.Start()
如上,這個(gè)監(jiān)視中異步工作流可以支持任務(wù)的取消操作。反饋進(jìn)度的事件模式可用于相當(dāng)部分需要全程匯報(bào)進(jìn)度的場(chǎng)景。在下一個(gè)示例中,我們使用這個(gè)模式來封裝后臺(tái)對(duì)于一系列Twitter采樣消息的讀取操作。運(yùn)行這個(gè)示例需要一個(gè)Twitter帳號(hào)和密碼。在這里只會(huì)發(fā)起一個(gè)事件,如果需要的話您也可以在某些情況下發(fā)起更多事件。F# JAOO Tutorial中也包含了這個(gè)示例。
- // F# Twitter Feed Sample using F# Async Programming and Event processing
- //
- #r "System.Web.dll"
- #r "System.Windows.Forms.dll"
- #r "System.Xml.dll"
- open System
- open System.Globalization
- open System.IO
- open System.Net
- open System.Web
- open System.Threading
- open Microsoft.FSharp.Control.WebExtensions
- /// A component which listens to tweets in the background and raises an
- /// event each time a tweet is observed
- type TwitterStreamSample(userName:string, password:string) =
- let tweetEvent = new Event<_>()
- let streamSampleUrl = "http://stream.twitter.com/1/statuses/sample.xml?delimited=length"
- /// The cancellation condition
- let mutable group = new CancellationTokenSource()
- /// Start listening to a stream of tweets
- member this.StartListening() =
- // Capture the synchronization context to allow us to raise events back on the GUI thread
- // Capture the synchronization context to allow us to raise events back on the GUI thread
- let syncContext = SynchronizationContext.CaptureCurrent()
- /// The background process
- let listener (syncContext: SynchronizationContext) =
- async { let credentials = NetworkCredential(userName, password)
- let req = WebRequest.Create(streamSampleUrl, Credentials=credentials)
- use! resp = req.AsyncGetResponse()
- use stream = resp.GetResponseStream()
- use reader = new StreamReader(stream)
- let atEnd = reader.EndOfStream
- let rec loop() =
- async {
- let atEnd = reader.EndOfStream
- if not atEnd then
- let sizeLine = reader.ReadLine()
- let size = int sizeLine
- let buffer = Array.zeroCreate size
- let _numRead = reader.ReadBlock(buffer,0,size)
- let text = new System.String(buffer)
- syncContext.RaiseEvent tweetEvent text
- return! loop()
- }
- return! loop() }
- Async.Start(listener, group.Token)
- /// Stop listening to a stream of tweets
- member this.StopListening() =
- group.Cancel();
- group <- new CancellationTokenSource()
- /// Raised when the XML for a tweet arrives
- member this.NewTweet = tweetEvent.Publish在Twitter的標(biāo)準(zhǔn)采樣消息流中每出現(xiàn)一條消息便會(huì)觸發(fā)一個(gè)事件,并同時(shí)提供消息的內(nèi)容。我們可以這樣監(jiān)聽事件流:
- let userName = "..." // set Twitter user name here
- let password = "..." // set Twitter user name here
- let twitterStream = new TwitterStreamSample(userName, password)
- twitterStream.NewTweet
- |> Event.add (fun s -> printfn "%A" s)
- twitterStream.StartListening()
- twitterStream.StopListening()
#p#
程序運(yùn)行后便會(huì)不斷打印出每條消息的XML數(shù)據(jù)。您可以從Twitter API頁(yè)面中來了解采樣消息流的使用方式。如果您想同時(shí)解析這些消息,以下便是這一工作的示例代碼。不過,也請(qǐng)關(guān)注Twitter API頁(yè)面中的指導(dǎo)準(zhǔn)則。例如,如果需要構(gòu)建一個(gè)高可靠性的系統(tǒng),您最好在處理前進(jìn)行保存,或是使用消息隊(duì)列。
- #r "System.Xml.dll"
- #r "System.Xml.Linq.dll"
- open System.Xml
- open System.Xml.Linq
- let xn (s:string) = XName.op_Implicit s
- /// The results of the parsed tweet
- type UserStatus =
- { UserName : string
- ProfileImage : string
- Status : string
- StatusDate : DateTime }
- /// Attempt to parse a tweet
- let parseTweet (xml: string) =
- let document = XDocument.Parse xml
- let node = document.Root
- if node.Element(xn "user") <> null then
- Some { UserName = node.Element(xn "user").Element(xn "screen_name").Value;
- ProfileImage = node.Element(xn "user").Element(xn "profile_image_url").Value;
- Status = node.Element(xn "text").Value |> HttpUtility.HtmlDecode;
- StatusDate = node.Element(xn "created_at").Value |> (fun msg ->
- DateTime.ParseExact(msg, "ddd MMM dd HH:mm:ss +0000 yyyy",
- CultureInfo.CurrentCulture)); }
- else
- None基于事件流還可以使用組合式的編程:
- twitterStream.NewTweet
- |> Event.choose parseTweet
- |> Event.add (fun s -> printfn "%A" s)
- twitterStream.StartListening()或是收集統(tǒng)計(jì)數(shù)據(jù):
- let addToMultiMap key x multiMap =
- let prev = match Map.tryFind key multiMap with None -> [] | Some v -> v
- Map.add x.UserName (x::prev) multiMap
- /// An event which triggers on every 'n' triggers of the input event
- let every n (ev:IEvent<_>) =
- let out = new Event<_>()
- let count = ref 0
- ev.Add (fun arg -> incr count; if !count % n = 0 then out.Trigger arg)
- out.Publish
- twitterStream.NewTweet
- |> Event.choose parseTweet
- // Build up the table of tweets indexed by user
- |> Event.scan (fun z x -> addToMultiMap x.UserName x z) Map.empty
- // Take every 20’ˉth index
- |> every 20
- // Listen and display the average of #tweets/user
- |> Event.add (fun s ->
- let avg = s |> Seq.averageBy (fun (KeyValue(_,d)) -> float d.Length)
- printfn "#users = %d, avg tweets = %g" s.Count avg)
twitterStream.StartListening()以上代碼對(duì)采樣消息流的內(nèi)容進(jìn)行統(tǒng)計(jì),每收到20條消息便打印出每個(gè)用戶的平均推數(shù)。
- #users = 19, avg tweets = 1.05263
- #users = 39, avg tweets = 1.02564
- #users = 59, avg tweets = 1.01695
- #users = 79, avg tweets = 1.01266
- #users = 99, avg tweets = 1.0101
- #users = 118, avg tweets = 1.01695
- #users = 138, avg tweets = 1.01449
- #users = 158, avg tweets = 1.01266
- #users = 178, avg tweets = 1.01124
- #users = 198, avg tweets = 1.0101
- #users = 218, avg tweets = 1.00917
- #users = 237, avg tweets = 1.01266
- #users = 257, avg tweets = 1.01167
- #users = 277, avg tweets = 1.01083
- #users = 297, avg tweets = 1.0101
- #users = 317, avg tweets = 1.00946
- #users = 337, avg tweets = 1.0089
- #users = 357, avg tweets = 1.0084
- #users = 377, avg tweets = 1.00796
- #users = 396, avg tweets = 1.0101
- #users = 416, avg tweets = 1.00962
- #users = 435, avg tweets = 1.01149
- #users = 455, avg tweets = 1.01099
- #users = 474, avg tweets = 1.01266
- #users = 494, avg tweets = 1.01215
- #users = 514, avg tweets = 1.01167
- #users = 534, avg tweets = 1.01124
- #users = 554, avg tweets = 1.01083
- #users = 574, avg tweets = 1.01045
- #users = 594, avg tweets = 1.0101
只要使用稍稍不同的分析方式,我們便可以顯示出Twitter提供的采樣消息流中發(fā)推超過1次的用戶,以及他們最新的推內(nèi)容。以下代碼可以在F#的交互命令行中使用,如之前文章中的做法,在數(shù)據(jù)表格中顯示內(nèi)容:
- open System.Drawing
- open System.Windows.Forms
- let form = new Form(Visible = true, Text = "A Simple F# Form", TopMost = true, SizeSize = Size(600,600))
- let data = new DataGridView(Dock = DockStyle.Fill, Text = "F# Programming is Fun!",
- Font = new Font("Lucida Console",12.0f),
- ForeColor = Color.DarkBlue)
- form.Controls.Add(data)
- data.DataSource <- [| (10,10,10) |]
- data.Columns.[0].Width <- 200
- data.Columns.[2].Width <- 500
- twitterStream.NewTweet
- |> Event.choose parseTweet
- // Build up the table of tweets indexed by user
- |> Event.scan (fun z x -> addToMultiMap x.UserName x z) Map.empty
- // Take every 20’ˉth index
- |> every 20
- // Listen and display those with more than one tweet
- |> Event.add (fun s ->
- let moreThanOneMessage = s |> Seq.filter (fun (KeyValue(_,d)) -> d.Length > 1)
- data.DataSource <-
- moreThanOneMessage
- |> Seq.map (fun (KeyValue(user,d)) -> (user, d.Length, d.Head.Status))
- |> Seq.filter (fun (_,n,_) -> n > 1)
- |> Seq.sortBy (fun (_,n,_) -> -n)
- |> Seq.toArray)
twitterStream.StartListening()以下是部分采樣結(jié)果:請(qǐng)注意,在上面的示例中,我們使用阻塞式的I/O操作來讀取Twitter消息流。這有兩個(gè)原因──Twitter數(shù)據(jù)流十分活躍(且一直如此),而且我們可以假設(shè)不會(huì)有太多的Twitter流──如這里只有1個(gè)。此外,Twitter會(huì)對(duì)單一帳號(hào)的采樣次數(shù)進(jìn)行限制。文章后續(xù)的內(nèi)容中,我們會(huì)演示如何對(duì)此類XML片段進(jìn)行非阻塞的讀取。
用F#做并行,用C#/VB做GUI,反饋進(jìn)度的事件模式,對(duì)于那種F#程序員實(shí)現(xiàn)異步計(jì)算組件,并交給C#或VB程序員來使用的場(chǎng)景非常有用。在下面的示例中,發(fā)布出去的事件需要被標(biāo)記為[
- /// Raised when a particular job completes
- [
] - member x.JobCompleted = jobCompleted.Publish
- /// Raised when all jobs complete
- [
] - member x.AllCompleted = allCompleted.Publish
- /// Raised when the composition is cancelled successfully
- [
] - member x.Canceled = canceled.Publish
- /// Raised when the composition exhibits an error
- [
] - member x.Error = error.Publish模式的限制
反饋進(jìn)度的事件模式會(huì)有一些假設(shè):并行處理組件的使用者是那些GUI應(yīng)用程序(如Windows Forms),服務(wù)器端應(yīng)用程序(如ASP.NET)或其他一些能夠?qū)⑹录挥杀O(jiān)控方使用場(chǎng)景。我們也可以調(diào)整這一模式中發(fā)起事件的方式,例如將消息發(fā)送給一個(gè)MailboxProcessor或簡(jiǎn)單地記錄它們。然而這里還是有一些假設(shè),需要有個(gè)主線程或是其他某個(gè)監(jiān)控者來監(jiān)聽這些事件,或是合理的保存它們。
反饋進(jìn)度的事件模式同樣假設(shè)封裝后對(duì)象可以獲取GUI線程的同步上下文,這通常是隱式的(如上面那些例子)。這一般是個(gè)合理的假設(shè)。還有一種做法是由外部參數(shù)來獲得這個(gè)上下文,雖然它在.NET編程中并非是種常見的做法。
如果您對(duì)于.NET 4.0中的IObservable接口較為熟悉,您可能會(huì)考慮讓TwitterStreamSample類型實(shí)現(xiàn)這個(gè)接口。然而,對(duì)于最終數(shù)據(jù)源來說,這個(gè)做法的好處不大。例如,以后TwitterStreamSample類型可能會(huì)需要提供更多種事件,例如在發(fā)生錯(cuò)誤并自動(dòng)重建連接時(shí)匯報(bào),或是匯報(bào)暫停或延遲狀況。在這樣的場(chǎng)景中,發(fā)起.NET事件就夠了,部分原因是為了讓更多.NET程序員熟悉這個(gè)對(duì)象。在F#種,所有發(fā)布出去的IEvent<_>對(duì)象會(huì)自動(dòng)實(shí)現(xiàn)IObservable,這樣其他人在使用時(shí)便可以直接使用Observable組合器。
結(jié)論
反饋進(jìn)度的事件模式是一種用于強(qiáng)大而優(yōu)雅的做法,用于在某個(gè)邊界之后對(duì)并行的執(zhí)行過程加以封裝,并同時(shí)匯報(bào)執(zhí)行的結(jié)果或是進(jìn)度。在外部,AsyncWoker對(duì)象的表現(xiàn)形式一般是單線程的。假設(shè)您的異步輸入是獨(dú)立的,這意味著該組件不需要將程序的其他部分暴露在多線程的競(jìng)爭(zhēng)條件下面。
所有的JavaScript,ASP.NET以及GUI框架的程序員(如Windows Forms)都明白,框架的單線程特性既是優(yōu)勢(shì)也是劣勢(shì)──問題變得簡(jiǎn)單了(沒有數(shù)據(jù)競(jìng)爭(zhēng)),但并行和異步編程卻變得很困難。在.NET編程中,I/O和繁重的CPU計(jì)算必須交由后臺(tái)線程去處理。上面的設(shè)計(jì)模式可以同時(shí)給您兩個(gè)世界的優(yōu)勢(shì):您得到了獨(dú)立的,可以互操作的,通信豐富的后臺(tái)處理組件,其中包括了對(duì)I/O及并行計(jì)算的支持,同時(shí)還在您的大部分代碼中保留了單線程GUI編程的簡(jiǎn)單性。正如之前表現(xiàn)的那樣,這些組件還保持了很高的通用性及可復(fù)用性,這使得獨(dú)立的單元測(cè)試也變得非常容易。
文章轉(zhuǎn)自老趙的博客,
原文鏈接:http://blog.zhaojie.me/2010/03/async-and-parallel-design-patterns-in-fsharp-2-reporting-progress-with-events.html
【編輯推薦】
- 詳解F#對(duì)象序列化為XML的實(shí)現(xiàn)方法
- F#運(yùn)算符定義規(guī)則總結(jié)
- 淺析F#簡(jiǎn)易Comet聊天服務(wù)實(shí)例
- 詳解F#版本的CodeTimer方法實(shí)現(xiàn)
- TechED 09視頻專訪:F#與函數(shù)式編程語(yǔ)言
新聞名稱:F#中的異步及并行模式:反饋進(jìn)度的事件
標(biāo)題路徑:http://fisionsoft.com.cn/article/dhcsiij.html


咨詢
建站咨詢
