C# Actor模型開發(fā)實例:網(wǎng)絡(luò)爬蟲
之前的幾篇文章大都在擺一些“小道理”,有經(jīng)驗的朋友容易想象出來其中的含義,不過對于那些還不了解Actor模型的朋友來說,這些內(nèi)容似乎有些太過了。此外,乒乓測試雖然經(jīng)典,但是不太容易說明問題。因此,今天我們就來看一個簡單的有些簡陋的網(wǎng)絡(luò)爬蟲,對于Actor模型的使用來說,它至少比乒乓測試能夠說明問題。對了,我們先來使用那“中看不中用”的消息執(zhí)行方式。
功能簡介
這個網(wǎng)絡(luò)爬蟲的功能還是用于演示,先來列舉出它的實現(xiàn)目標(biāo)吧:
◆給出一個初始鏈接,然后抓取它的HTML并分析出所有html鏈接,然后繼續(xù)爬,不斷爬,直到爬完所有鏈接為止。
◆多線程運行,我們可以指定由多少個爬蟲同時工作。
◆多個爬蟲組成一個“工作單元”,程序中可以同時出現(xiàn)多個工作單元,工作單元之間互相獨立。
◆能簡化的地方便簡化,如一切不涉及任何永久性存儲(也就是說,只使用內(nèi)存),沒有太復(fù)雜的容錯機(jī)制。
的確很簡單吧?那么,現(xiàn)在您不妨先在腦海中想象一下,在不用Actor模型的時候您會怎么實現(xiàn)這個功能。然后,我們就要動手使用ActorLite這個小類庫了。
協(xié)議制定
正如我們不斷強調(diào)的那樣,在Actor模型中唯一的通信方式便是互相發(fā)送消息。于是使用Actor模型的第一步往往便是設(shè)計Actor類型,以及它們之間傳遞的消息。在這個簡單的場景中,我們會定義兩種Actor類型。一是Monitor,二是Crawler。一個Monitor便代表一個“工作單元”,它管理了多個爬蟲,即Crawler。
Monitor將負(fù)責(zé)在合適的時候創(chuàng)建Crawler,并向其發(fā)送一個消息,讓其開始工作。在我們的系統(tǒng)中,我們使用ICrawlRequestHandler接口來表示這個消息:
C# Actor模型·創(chuàng)建網(wǎng)絡(luò)爬蟲:
- public interface ICrawlRequestHandler
- {
- void Crawl(Monitor monitor, string url);
- }
在接受到上面的Crawl消息后,Crawler將去抓取指定的url對象,并將結(jié)果發(fā)還給Monitor。在這里我們要求報告Cralwer向Monitor報告“成功”和“失敗”兩種消息1:
C# Actor模型·爬蟲報告消息
- public interface ICrawlResponseHandler
- {
- void Succeeded(Crawler crawler, string url, List<string> links);
- void Failed(Crawler crawler, string url, Exception ex);
- }
我們使用“接口”這種方式定義了“消息組”,把Succeeded和Failed兩種關(guān)系密切的消息綁定在一起。如果抓取成功,則Crawler會從抓取內(nèi)容中獲得額外的鏈接,并發(fā)還給Monitor——失敗的時候自然就發(fā)還一個異常對象。此外,無論是成功還是失敗,我們都會把Crawler對象交給Monitor,Monitor會安排給Crawler新的抓取任務(wù)。
因此,Monitor和Cralwer類的定義大約應(yīng)該是這樣的:
C# Actor模型·爬蟲和監(jiān)控類的定義
- public class Monitor : Actor<Action<ICrawlResponseHandler>>, ICrawlResponseHandler
- {
- protected override void Receive(Action<ICrawlResponseHandler> message)
- {
- message(this);
- }
- #region ICrawlResponseHandler Members
- void ICrawlResponseHandler.Succeeded(Crawler crawler, string url, List<string> links)
- {
- ...
- }
- void ICrawlResponseHandler.Failed(Crawler crawler, string url, Exception ex)
- {
- ...
- }
- #endregion
- }
- public class Crawler : Actor<Action<ICrawlRequestHandler>>, ICrawlRequestHandler
- {
- protected override void Receive(Action<ICrawlRequestHandler> message)
- {
- message(this);
- }
- #region ICrawlRequestHandler Members
- void ICrawlRequestHandler.Crawl(Monitor monitor, string url)
- {
- ...
- }
- #endregion
- }
Crawler實現(xiàn)
我們先從簡單的Crawler類的實現(xiàn)開始。Crawler類只需要實現(xiàn)ICrawlRequestHandler接口的Crawl方法即可:
C# Actor模型·爬蟲的實現(xiàn)
- void ICrawlRequestHandler.Crawl(Monitor monitor, string url)
- {
- try
- {
- string content = new WebClient().DownloadString(url);
- var matches = Regex.Matches(content, @"href=""(http://[^""]+)""").Cast<Match>();
- var links = matches.Select(m => m.Groups[1].Value).Distinct().ToList();
- monitor.Post(m => m.Succeeded(this, url, links));
- }
- catch (Exception ex)
- {
- monitor.Post(m => m.Failed(this, url, ex));
- }
- }
沒錯,使用WebClient下載頁面內(nèi)容只需要一行代碼就可以了。然后便是使用正則表達(dá)式提取出頁面上所有的鏈接。很顯然這里是有問題的,因為我們我只分析出以“http://”開頭的地址,但是無視其他的“相對地址”——不過作為一個小實驗來說已經(jīng)足夠說明問題了。最后自然是使用Post方法將結(jié)果發(fā)還給Monitor。在拋出異常的情況下,這幾行代碼的邏輯也非常自然。
Monitor實現(xiàn)
Monitor相對來說便略顯復(fù)雜了一些。我們知道,Monitor要負(fù)責(zé)控制Crawler的數(shù)量,那么必然需要負(fù)責(zé)維護(hù)一些必要的字段:
C# Actor模型·監(jiān)控的實現(xiàn)
- private HashSet<string> m_allUrls; // 所有待爬或爬過的url
- private Queue<string> m_readyToCrawl; // 待爬的url
- public int MaxCrawlerCount { private set; get; } // 最大爬蟲數(shù)目
- public int WorkingCrawlerCount { private set; get; } // 正在工作的爬蟲數(shù)目
- public Monitor(int maxCrawlerCount)
- {
- this.m_allUrls = new HashSet<string>();
- this.m_readyToCrawl = new Queue<string>();
- this.MaxCrawlerCount = maxCrawlerCount;
- this.WorkingCrawlerCount = 0;
- }
Monitor要處理的自然是ICrawlResponseHandler中的Succeeded或Failed方法:
- void ICrawlResponseHandler.Succeeded(Crawler crawler, string url, List<string> links)
- {
- Console.WriteLine("{0} crawled, {1} link(s).", url, links.Count);
- foreach (var newUrl in links)
- {
- if (!this.m_allUrls.Contains(newUrl))
- {
- this.m_allUrls.Add(newUrl);
- this.m_readyToCrawl.Enqueue(newUrl);
- }
- }
- this.DispatchCrawlingTasks(crawler);
- }
- void ICrawlResponseHandler.Failed(Crawler crawler, string url, Exception ex)
- {
- Console.WriteLine("{0} error occurred: {1}.", url, ex.Message);
- this.DispatchCrawlingTasks(crawler);
- }
在抓取成功時,Monitor將遍歷links列表中的所有地址,如果發(fā)現(xiàn)新的url,則加入相關(guān)集合中。在抓取失敗的情況下,我們也只是簡單的繼續(xù)下去而已。而“繼續(xù)”則是由DispatchCrawlingTasks方法實現(xiàn)的,我們需要傳入一個“可復(fù)用”的Crawler對象:
C# Actor模型·爬蟲的傳入
- private void DispatchCrawlingTasks(Crawler reusableCrawler)
- {
- if (this.m_readyToCrawl.Count <= 0)
- {
- this.WorkingCrawlerCount--;
- return;
- }
- var url = this.m_readyToCrawl.Dequeue();
- reusableCrawler.Post(c => c.Crawl(this, url));
- while (this.m_readyToCrawl.Count > 0 &&
- this.WorkingCrawlerCount < this.MaxCrawlerCount)
- {
- var newUrl = this.m_readyToCrawl.Dequeue();
- new Crawler().Post(c => c.Crawl(this, newUrl));
- this.WorkingCrawlerCount++;
- }
- }
如果已經(jīng)沒有需要抓取的內(nèi)容了,則直接拋棄Crawler對象即可,否則則分派一個新任務(wù)。接著便不斷創(chuàng)建新的爬蟲,分配新的抓取任務(wù),直到爬蟲數(shù)額用滿,或者沒有需要抓取的內(nèi)容位置。
使用
我們使用區(qū)區(qū)幾十行代碼遍實現(xiàn)了一個簡單的多線程爬蟲,其中一個關(guān)鍵便是使用了Actor模型。使用Actor模型,對象之間通過消息傳遞進(jìn)行交互。而且對于單個Actor對象來說,消息的執(zhí)行完全是線程安全的。因此,我們只要作用最直接的邏輯便可以完成整個實現(xiàn),從而回避了內(nèi)存共享的并行模式中所使用的互斥體、鎖等各類組件。
不過有沒有發(fā)現(xiàn),我們沒有一個入口可以“開啟”一個抓取任務(wù)啊,Monitor類中還缺少了點什么。好吧,那么我們補上一個Start方法:
C# Actor模型·爬蟲開始工作
- public class Monitor : Actor<Action<ICrawlResponseHandler>>, ICrawlResponseHandler
- {
- ...
- public void Start(string url)
- {
- this.m_allUrls.Add(url);
- this.WorkingCrawlerCount++;
- new Crawler().Post(c => c.Crawl(this, url));
- }
- }
于是,我們便可以這樣打開一個或多個抓取任務(wù):
- static class Program
- {
- static void Main(string[] args)
- {
- new Monitor(5).Start("http://www.cnblogs.com/");
- new Monitor(10).Start("http://www.csdn.net/");
- Console.ReadLine();
- }
- }
這里我們新建兩個工作單元,也就是啟動了兩個抓取任務(wù)。一是使用5個爬蟲抓取cnblogs.com,二是使用10個爬蟲抓取csdn.net。
缺陷
這里的缺陷是什么?其實很明顯,您發(fā)現(xiàn)了嗎?
使用Actor模型可以保證消息執(zhí)行的線程安全,不過很明顯Start方法并非如此,我們只能用它來“開啟”一個抓取任務(wù)。但是如果我們想再次“手動”提交一個需要抓取的URL怎么辦?所以理想的方法,其實也應(yīng)該是向Monitor發(fā)送一個消息來啟動第一個URL抓取任務(wù)。需要補充,則發(fā)送多個URL即可??墒?,這個消息定義在什么地方才合適呢?我們的Monitor類已經(jīng)實現(xiàn)了Actor<Action<ICrawlResponseHandler>>,已經(jīng)沒有辦法接受另一個接口作為消息了,不是嗎?
這就是一個致命的限制:一個Actor雖然可以實現(xiàn)多個接口,但只能接受其中一個作為消息。同樣的,如果我們要為Monitor提供其他功能,例如“查詢”某個URL的抓取狀態(tài),也因為同樣的原因而無法實現(xiàn)。還有,便是在前幾篇文章中談到的問題了。Crawler和Monitor直接耦合,我們向Crawler發(fā)送的消息只能攜帶一個Monitor對象。
最后,便是一個略顯特別的問題了。我們這里使用WebClient的DownloadString方法來獲取網(wǎng)頁的內(nèi)容,但是這是個同步IO操作,理想的做法中我們應(yīng)該使用異步的方法。所以,我們可以這么寫:
- void ICrawlRequestHandler.Crawl(Monitor monitor, string url)
- {
- WebClient webClient = new WebClient();
- webClient.DownloadStringCompleted += (sender, e) =>
- {
- if (e.Error == null)
- {
- var matches = Regex.Matches(e.Result, @"href=""(http://[^""]+)""").Cast<Match>();
- var links = matches.Select(m => m.Groups[1].Value).Distinct().ToList();
- monitor.Post(m => m.Succeeded(this, url, links));
- }
- else
- {
- monitor.Post(m => m.Failed(this, url, e.Error));
- }
- };
- webClient.DownloadStringAsync(new Uri(url));
- }
如果您還記得老趙在最近一篇文章中關(guān)于IO線程池的討論,就可以了解到DownloadStringCompleted事件的處理方法會在統(tǒng)一的IO線程池中運行,這樣我們無法控制其運算能力。因此,我們應(yīng)該在回調(diào)函數(shù)中向Crawler自己發(fā)送一條消息表示抓取完畢……呃,但是我們現(xiàn)在做不到埃
嗯,下次再說吧。
【編輯推薦】