C# 4.0中的協(xié)變和逆變
在上一篇文章中,我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的爬蟲,并指出了這種方式的缺陷?,F(xiàn)在,我們就來看一下,如何使用C# 4.0中所引入的“協(xié)變和逆變”特性來改進(jìn)這種消息執(zhí)行方式,這也是我認(rèn)為在“普適Actor模型”中最合適的做法。這次,我們動(dòng)真格的了,我們會(huì)一條一條地改進(jìn)前文提出的缺陷。
協(xié)變和逆變
在以前的幾篇文章中,我們一直掛在嘴邊的說法便是消息于Actor類型的“耦合”太高。例如在簡(jiǎn)單的爬蟲實(shí)現(xiàn)中,Crawler接受的消息為Crawl(Monitor, string),它的第一個(gè)參數(shù)為Monitor類型。但是在實(shí)際應(yīng)用中,這個(gè)參數(shù)很可能需要是各種類型,唯一的“約束”只是它必須能夠接受一個(gè)ICrawlResponseHandler類型的消息,這樣我們就能把抓取的結(jié)果傳遞給它。至于操作Crawler對(duì)象的是Monitor還是Robot,還是我們單元測(cè)試時(shí)動(dòng)態(tài)創(chuàng)建的Mock對(duì)象(這很重要),Crawler一概不會(huì)關(guān)心。
但就是這個(gè)約束,在以前的實(shí)現(xiàn)中,我們必須讓這個(gè)目標(biāo)繼承Actor< ICrawlResponseHandler>,這樣它也就無法接受其他類型的消息了。例如Monitor還要負(fù)責(zé)一些查詢操作我們?cè)撛趺崔k呢?幸運(yùn)的是,在.NET 4.0(C# 4.0)中,我們只需要讓這個(gè)目標(biāo)實(shí)現(xiàn)這樣一個(gè)接口即可:
- public interface IPort< out T>
- {
- void Post(Action< T> message);
- }
瞅到out關(guān)鍵字了沒?事實(shí)上,還有一個(gè)東西您在這里還沒有看到,這便是Action委托在.NET 4.0中的簽名:
- public delegate void Action< in T>(T obj);
就在這樣一個(gè)簡(jiǎn)單的示例中,協(xié)變和逆變所需要的in和out都出現(xiàn)了。這意味著如果有兩個(gè)類型Parent和Child,其中Child是Parent的子類(或Parent接口的實(shí)現(xiàn)),那么實(shí)現(xiàn)了IPort< Child>的對(duì)象便可以自動(dòng)賦值給IPort< Parent>類型的參數(shù)或引用1。使用代碼來說明問題可能會(huì)更清楚一些:
- public class Parent
- {
- public void ParentMethod() { };
- }
- public class Child : Parent { }
- static void Main(string[] args)
- {
- IPort< Child> childPort = new ChildPortType();
- IPort< Parent> parentPort = childPort; // 自動(dòng)轉(zhuǎn)化
- parentPort.Post(p => p.ParentMethod()); // 可以接受Action< Parent>類型作為消息
- }
這意味著,我們可以把ICrawlRequestHandler和ICrawlResponseHandler類型寫成下面的形式:
- internal interface ICrawlRequestHandler
- {
- void Crawl(IPort< ICrawlResponseHandler> collector, string url);
- }
- internal interface ICrawlResponseHandler
- {
- void Succeeded(IPort< ICrawlRequestHandler> crawler, string url, string content, List< string> links);
- void Failed(IPort< ICrawlRequestHandler> crawler, string url, Exception ex);
- }
如今,Monitor和Crawler便可以寫成如下模樣:
- internal class Crawler : Actor< Action< Crawler>>, IPort< Crawler>, ICrawlRequestHandler
- {
- protected override void Receive(Action< Crawler> message) { message(this); }
- #region ICrawlRequestHandler Members
- void ICrawlRequestHandler.Crawl(IPort< ICrawlResponseHandler> collector, string url)
- {
- try
- {
- string content = new WebClient().DownloadString(url);
- var matches = Regex.Matches(content, @"href=""(http://[^""]+)""").Cast< Match>();
- var links = matches.Select(m => m.Groups[1].Value).Distinct().ToList();
- collector.Post(m => m.Succeeded(this, url, content, links));
- }
- catch (Exception ex)
- {
- collector.Post(m => m.Failed(this, url, ex));
- }
- }
- #endregion
- }
- public class Monitor : Actor< Action< Monitor>>, IPort< Monitor>, ICrawlResponseHandler
- {
- protected override void Receive(Action< Monitor> message) { message(this); }
- #region ICrawlResponseHandler Members
- void ICrawlResponseHandler.Succeeded(IPort< ICrawlRequestHandler> crawler,
- string url, string content, List< string> links) { ... }
- void ICrawlResponseHandler.Failed(IPort< ICrawlRequestHandler> crawler,
- string url, Exception ex) { ... }
- #endregion
- private void DispatchCrawlingTasks(IPort< ICrawlRequestHandler> reusableCrawler)
- {
- if (this.m_readyToCrawl.Count < = 0)
- {
- this.WorkingCrawlerCount--;
- }
- var url = this.m_readyToCrawl.Dequeue();
- reusableCrawler.Post(c => c.Crawl(this, url));
- while (this.m_readyToCrawl.Count > 0 &&
- this.WorkingCrawlerCount < this.MaxCrawlerCount)
- {
- var newUrl = this.m_readyToCrawl.Dequeue();
- IPort< ICrawlRequestHandler> crawler = new Crawler();
- crawler.Post(c => c.Crawl(this, newUrl));
- this.WorkingCrawlerCount++;
- }
- }
- }
Monitor的具體實(shí)現(xiàn)和上篇文章區(qū)別不大,您可以參考文章末尾給出的完整代碼,并配合前文的分析來理解,這里我們只關(guān)注被標(biāo)紅的兩行代碼。
在第一行中我們創(chuàng)建了一個(gè)Crawler類型的對(duì)象,并把它賦值給IPort< ICrawlerRequestHandler>類型的變量中。請(qǐng)注意,Crawler對(duì)象并沒有實(shí)現(xiàn)這個(gè)接口,它只是實(shí)現(xiàn)了IPort< Crawler>及ICrawlerRequestHandler。不過由于IPort< T>支持協(xié)變,于是IPort< Crawler>被安全地轉(zhuǎn)換成了IPort< ICrawlerRequestHandler>對(duì)象。
第二行中再次發(fā)生了協(xié)變:ICrawlRequestHandler.Crawel的第一個(gè)參數(shù)需要IPort< ICrawlResponseHandler>類型的對(duì)象,但是this是Monitor類型的,它并沒有實(shí)現(xiàn)這個(gè)接口。不過,和上面描述的一樣,由于IPort< T>支持協(xié)變,因此這樣的類型轉(zhuǎn)化是安全的,允許的。于是在Crawler類便可以操作一個(gè)“抽象”,而不是具體的Monitor類型來辦事了。
神奇不?但就是這么簡(jiǎn)單。
“內(nèi)部”消息控制
在上一篇文章中,我們還提出了Crawler實(shí)現(xiàn)的另一個(gè)缺點(diǎn):沒有使用異步IO。WebClient本身的DownloadStringAsync方法可以進(jìn)行異步下載,但是如果在異步完成的后續(xù)操作(如分析鏈接)會(huì)在IO線程池中運(yùn)行,這樣我們就很難對(duì)任務(wù)所分配的運(yùn)算能力進(jìn)行控制。我們當(dāng)時(shí)提出,可以把后續(xù)操作作為消息發(fā)送給Crawler本身,也就是進(jìn)行“內(nèi)部”消息控制——可惜的是,我們當(dāng)時(shí)無法做到。不過現(xiàn)在,由于Crawler實(shí)現(xiàn)的是IPort< Crawler>接口,因此,我們可以把Crawler內(nèi)部的任何方法作為消息傳遞給自身,如下:
- internal class Crawler : Actor< Action< Crawler>>, IPort< Crawler>, ICrawlRequestHandler
- {
- protected override void Receive(Action< Crawler> message) { message(this); }
- #region ICrawlRequestHandler Members
- public void Crawl(IPort< ICrawlResponseHandler> collector, string url)
- {
- WebClient client = new WebClient();
- client.DownloadStringCompleted += (sender, e) =>
- {
- if (e.Error == null)
- {
- this.Post(c => c.Crawled(collector, url, e.Result));
- }
- else
- {
- collector.Post(c => c.Failed(this, url, e.Error));
- }
- };
- client.DownloadStringAsync(new Uri(url));
- }
- private void Crawled(IPort< ICrawlResponseHandler> collector, string url, string content)
- {
- var matches = Regex.Matches(content, @"href=""(http://[^""]+)""").Cast< Match>();
- var links = matches.Select(m => m.Groups[1].Value).Distinct().ToList();
- collector.Post(c => c.Succeeded(this, url, content, links));
- }
- #endregion
- }
我們準(zhǔn)備了一個(gè)private的Crawled方法,如果抓取成功了,我們會(huì)把這個(gè)方法的調(diào)用封裝在一條消息中重新發(fā)給自身。請(qǐng)注意,這是個(gè)私有方法,因此這里完全是在做“內(nèi)部”消息控制。
開啟抓取任務(wù)
在上一篇文章中,我們?yōu)镸onitor添加了一個(gè)Start方法,它的作用是啟動(dòng)URL。我們知道,對(duì)單個(gè)Actor來說消息的處理是線程安全的,但是這個(gè)前提是使用“消息”傳遞的方式進(jìn)行通信,如果直接調(diào)用Start公有方法,便會(huì)破壞這種線程安全特性。不過現(xiàn)在的Monitor已經(jīng)不受接口的限制,可以自由接受任何它可以執(zhí)行的消息,因此我們只要對(duì)外暴露一個(gè)Crawl方法即可:
- public class Monitor : Actor< Action< Monitor>>, IPort< Monitor>,
- ICrawlResponseHandler,
- IStatisticRequestHandelr
- {
- ...
- public void Crawl(string url)
- {
- if (this.m_allUrls.Contains(url)) return;
- this.m_allUrls.Add(url);
- if (this.WorkingCrawlerCount < this.MaxCrawlerCount)
- {
- this.WorkingCrawlerCount++;
- IPort< ICrawlRequestHandler> crawler = new Crawler();
- crawler.Post(c => c.Crawl(this, url));
- }
- else
- {
- this.m_readyToCrawl.Enqueue(url);
- }
- }
- }
于是我們便可以向Monitor發(fā)送消息,讓其抓取特定的URL:
- string[] urls =
- {
- "http://www.cnblogs.com/dudu/",
- "http://www.cnblogs.com/TerryLee/",
- "http://www.cnblogs.com/JeffreyZhao/"
- };
- Random random = new Random(DateTime.Now.Millisecond);
- Monitor monitor = new Monitor(10);
- foreach (var url in urls)
- {
- var urlToCrawl = url;
- monitor.Post(m => m.Crawl(urlToCrawl));
- Thread.Sleep(random.Next(1000, 3000));
- }
上面的代碼會(huì)每隔1到3秒發(fā)出一個(gè)抓取請(qǐng)求。由于我們使用了消息傳遞的方式進(jìn)行通信,因此對(duì)于Monitor來說,這一切都是線程安全的。我們可以隨時(shí)隨地為Monitor添加抓取任務(wù)。
接受多種消息(協(xié)議)
我們?cè)儆^察一下Monitor的簽名:
class Monitor : Actor< Action< Monitor>>, IPort< Monitor>, ICrawlResponseHandler
可以發(fā)現(xiàn),如今的Monitor已經(jīng)和它實(shí)現(xiàn)的協(xié)議沒有一對(duì)一的關(guān)系了。也就是說,它可以添加任意功能,可以接受任意類型的消息,我們只要讓它實(shí)現(xiàn)另一個(gè)接口即可。于是乎,我們?cè)僖粋€(gè)“查詢”功能2:
- public interface IStatisticRequestHandelr
- {
- void GetCrawledCount(IPort< IStatisticResponseHandler> requester);
- void GetContent(IPort< IStatisticResponseHandler> requester, string url);
- }
- public interface IStatisticResponseHandler
- {
- void ReplyCrawledCount(int count);
- void ReplyContent(string url, string content);
- }
為了讓Monior支持查詢,我們還需要為它添加這樣的代碼:
- public class Monitor : Actor< Action< Monitor>>, IPort< Monitor>,
- ICrawlResponseHandler,
- IStatisticRequestHandelr
- {
- ...
- #region IStatisticRequestHandelr Members
- void IStatisticRequestHandelr.GetCrawledCount(IPort< IStatisticResponseHandler> requester)
- {
- requester.Post(r => r.ReplyCrawledCount(this.m_urlContent.Count));
- }
- void IStatisticRequestHandelr.GetContent(IPort< IStatisticResponseHandler> requester, string url)
- {
- string content;
- if (!this.m_urlContent.TryGetValue(url, out content))
- {
- content = null;
- }
- requester.Post(r => r.ReplyContent(url, content));
- }
- #endregion
- }
最后,我們來嘗試著使用這個(gè)“查詢”功能。首先,我們編寫一個(gè)測(cè)試用的TestStatisticPort類:
- public class TestStatisticPort : IPort< IStatisticResponseHandler>, IStatisticResponseHandler
- {
- private IPort< IStatisticRequestHandelr> m_statisticPort;
- public TestStatisticPort(IPort< IStatisticRequestHandelr> statisticPort)
- {
- this.m_statisticPort = statisticPort;
- }
- public void Start()
- {
- while (true)
- {
- Console.ReadLine();
- this.m_statisticPort.Post(s => s.GetCrawledCount(this));
- }
- }
- #region IPort< IStatisticResponseHandler> Members
- void IPort< IStatisticResponseHandler>.Post(Action< IStatisticResponseHandler> message)
- {
- message(this);
- }
- #endregion
- #region IStatisticResponseHandler Members
- void IStatisticResponseHandler.ReplyCrawledCount(int count)
- {
- Console.WriteLine("Crawled: {0}", count);
- }
- void IStatisticResponseHandler.ReplyContent(string url, string content) { ... }
- #endregion
- }
當(dāng)調(diào)用Start方法時(shí),控制臺(tái)將會(huì)等待用戶敲擊回車鍵。當(dāng)按下回車鍵時(shí),TestStatisticPort將會(huì)向Monitor發(fā)送一個(gè)IStatisticRequestHandler.GetCrawledCount消息。Monitor回復(fù)之后,屏幕上便會(huì)顯示當(dāng)前已經(jīng)抓取成功的URL數(shù)目。例如,我們可以編寫如下的測(cè)試代碼:
- static void Main(string[] args)
- {
- var monitor = new Monitor(5);
- monitor.Post(m => m.Crawl("http://www.cnblogs.com/"));
- TestStatisticPort testPort = new TestStatisticPort(monitor);
- testPort.Start();
- }
隨意敲擊幾下回車,結(jié)果如下:
總結(jié)
如今的做法,兼顧了強(qiáng)類型檢查,并使用C# 4.0中的協(xié)變和逆變特性,把上一篇文章中提出的問題解決了,不知您是否理解了這些內(nèi)容?只可惜,我們?cè)贑# 3.0中還沒有協(xié)變和逆變。因此,我們還必須思考一個(gè)適合C# 3.0的做法。
順便一提,由于F#不支持協(xié)變和逆變,因此本文的做法無法在F#中使用。
注1:關(guān)于協(xié)變和逆變特性,我認(rèn)為腦袋兄的這篇文章講的非常清楚——您看得頭暈了?是的,剛開始了解協(xié)變和逆變,以及它們之間的嵌套規(guī)則時(shí)我也頭暈,但是您在掌握之后就會(huì)發(fā)現(xiàn),這的確是一個(gè)非常有用的特性。
注2:不知您是否發(fā)現(xiàn),與之前internal的Crawl相關(guān)接口不同,Statistic相關(guān)接口是public的。我們?cè)谑褂媒涌谧鳛橄r(shí),也可以通過這種辦法來控制哪些消息是可以對(duì)外暴露的。這也算是一種額外的收獲吧。
【編輯推薦】