探究Visual Studio 2010中Parallel的使用
之前51cto曾經(jīng)報(bào)道過關(guān)于Visual Studio 2010中Parallel類實(shí)現(xiàn)并行計(jì)算,本文我們主要分析如何利用Parallel.For和Parallel.ForEach函數(shù)來并行化for循環(huán)和foreach循環(huán)。實(shí)際上,Parallel.For和Parallel.ForEach函數(shù)主要是針對(duì)“并行數(shù)據(jù)”的并行化操作,所謂并行數(shù)據(jù),就是整個(gè)數(shù)據(jù)集中數(shù)據(jù)單元是相互獨(dú)立的,可以同時(shí)進(jìn)行處理。
在實(shí)際開發(fā)中,我們遇到的可以并行處理的不僅包括“并行數(shù)據(jù)”,還包括可以同時(shí)進(jìn)行的“并行邏輯”。所謂“并行邏輯”,就是相互獨(dú)立,可以同時(shí)執(zhí)行的多個(gè)任務(wù)。比如,程序員陳良喬每天早上要做兩件事情:燒水洗臉和鍛煉身體。這兩件事情就是相互獨(dú)立可以并行的,也就是說他在燒水的時(shí)候可以同時(shí)鍛煉身體。在以前的單核時(shí)代,CPU在同一時(shí)間只能完成一件事情,那么陳良喬只能先燒水后鍛煉,或者是先鍛煉后燒水,這導(dǎo)致他上班總是遲到。
進(jìn)入多核時(shí)代,CPU可以在同一時(shí)間完成多件事情了,借助.Net Framework 4.0中的Parallel類,我們可以方便地處理“并行邏輯”?,F(xiàn)在,程序員陳良喬可以一邊鍛煉一邊燒水,再也沒有遲到過了。他逢人便說:“Parallel真是個(gè)好東西!自從用了它,我腰也不酸了,背也不疼了,編程更有勁兒了”
使用Parallel.Invoke處理并行邏輯
跟Parallel.For函數(shù)相似,Parallel.Invoke也是Parallel類的一個(gè)靜態(tài)函數(shù),它可以接受一個(gè)Action[]類型的對(duì)象作為參數(shù),這個(gè)對(duì)象,就是我們要執(zhí)行的任務(wù)。系統(tǒng)會(huì)根據(jù)代碼運(yùn)行的硬件環(huán)境,主要是CPU運(yùn)算核心的個(gè)數(shù),自動(dòng)地進(jìn)行線程的創(chuàng)建和分配。這有些類似于我們所熟悉的多線程開發(fā),通過為每個(gè)線程指定一個(gè)線程函數(shù)而讓多個(gè)任務(wù)同時(shí)進(jìn)行,只是Parallel.Invoke函數(shù)簡(jiǎn)化了線程的創(chuàng)建和分配等繁瑣的動(dòng)作,我們只需要提供核心的線程函數(shù)就可以了。下面我們來看一個(gè)實(shí)際的例子。在上文中,我們介紹了程序員陳良喬起床的例子,在以前的單核時(shí)代,他起床大約是這個(gè)樣子的:
- // 串行式起床
- private static void GetUp()
- {
- Start("GetUp");
- // 先燒水
- boil();
- // 后鍛煉
- exercise();
- End("GetUp");
- }
- // 鍛煉
- private static void exercise()
- {
- Console.WriteLine("Exercise");
- Thread.Sleep(2000);
- Console.WriteLine("Finish Exercise");
- }
- // 燒水
- private static void boil()
- {
- Console.WriteLine("Boil");
- Thread.Sleep(3000);
- Console.WriteLine("Finish Boil");
- }
在單核時(shí)代,CPU在同一時(shí)間只能做一件事情,所以他只能先燒水,后鍛煉,這樣顯然會(huì)耽誤時(shí)間。一天,他又因?yàn)檫@事而遲到了,老板罵道,“你是豬啊,你不會(huì)用Parallel.Invoke一邊燒水一邊鍛煉啊?”于是,有了下面的并行式起床:
- // 并行式起床
- private static void ParallelGetUp()
- {
- Start("ParallelGetUp");
- // 在燒水的同時(shí),鍛煉身體
- var steps = new Action[] { () => boil(), () => exercise() };
- Parallel.Invoke(steps);
- End("ParallelGetUp");
- }
通過Parallel.Invoke函數(shù),我們將一些相互獨(dú)立的任務(wù)同時(shí)執(zhí)行,實(shí)現(xiàn)了“并行邏輯”,也大大地提高了應(yīng)用程序的性能和效率。從下面的截圖中,我們可以明顯地看出兩種方式的差別。串行方式所耗費(fèi)的時(shí)間,是兩個(gè)步驟的時(shí)間總和,而并行方式所耗費(fèi)的時(shí)間,大約是單個(gè)任務(wù)的耗時(shí)最長(zhǎng)的哪一個(gè)。
#p#
對(duì)Parallel.Invoke進(jìn)行控制
Parallel.Invoke提供了一個(gè)重載版本,它可以接受一個(gè)ParallelOptions對(duì)象作為參數(shù),對(duì)Parallel.Invoke的執(zhí)行進(jìn)行控制。通過這個(gè)對(duì)象,我們可以控制并行的最大線程數(shù),各個(gè)任務(wù)是否取消執(zhí)行等等。例如,在一個(gè)智能化的家中,系統(tǒng)會(huì)判斷主人是否離開房間,如果主人離開了房間,則自動(dòng)關(guān)閉屋子里的各種電器。利用Parallel.Invoke我們可以實(shí)現(xiàn)如下:
- public static void PInvokeCancel()
- {
- // 創(chuàng)建取消對(duì)象
- CancellationTokenSource cts = new CancellationTokenSource();
- // 利用取消對(duì)象,創(chuàng)建ParallelOptions
- ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token };
- // 設(shè)置最大線程數(shù)
- pOption.MaxDegreeOfParallelism = 2;
- // 創(chuàng)建一個(gè)守護(hù)監(jiān)視進(jìn)程
- Task.Factory.StartNew(() =>
- {
- Console.WriteLine("Cancellation in 5 sec.");
- Thread.Sleep(5000);
- // 取消,結(jié)束任務(wù)的執(zhí)行
- cts.Cancel();
- Console.WriteLine("Canceled requested");
- });
- try
- {
- // 以ParallelOptions作為參數(shù),
- // 調(diào)用Parallel.Invoke
- Parallel.Invoke(pOption, () => ShutdownLights(pOption.CancellationToken),
- () => ShutdownComputer(pOption.CancellationToken));
- //輸出執(zhí)行結(jié)果
- Console.WriteLine("Lights and computer are tuned off.");
- }
- catch (Exception e)
- {
- Console.WriteLine(e.Message);
- }
- }
- private static void ShutdownLights(CancellationToken token)
- {
- while (!token.IsCancellationRequested)
- {
- Console.WriteLine("Light is on. " );
- Thread.Sleep(1000);
- }
- }
- private static void ShutdownComputer(CancellationToken token)
- {
- while (!token.IsCancellationRequested)
- {
- Console.WriteLine("Computer is on." );
- Thread.Sleep(1000);
- }
- }
除了這種方式之外,ParallelOptions更多地應(yīng)用在取消任務(wù)隊(duì)列中還未來得及執(zhí)行的任務(wù)。當(dāng)我們限制了最大并發(fā)線程數(shù)的時(shí)候,如果需要通過Parallel.Invoke執(zhí)行的任務(wù)較多,則有可能部分任務(wù)在隊(duì)列中排隊(duì)而得不到及時(shí)的執(zhí)行,如果到了一定的條件這些任務(wù)還沒有執(zhí)行,我們可能取消這些任務(wù)。一個(gè)恰當(dāng)?shù)默F(xiàn)實(shí)生活中的例子就是火車站買票?;疖囌举I票的人很多,但是售票的窗口有限,當(dāng)?shù)搅讼掳鄷r(shí)間后,窗口就不再售票了,也就是剩下的售票任務(wù)需要取消掉。我們可以用下面的代碼來模擬這樣一個(gè)場(chǎng)景:
- public static void PInvokeCancel()
- {
- // 創(chuàng)建取消對(duì)象
- CancellationTokenSource cts = new CancellationTokenSource();
- // 利用取消對(duì)象,創(chuàng)建ParallelOptions
- ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token };
- // 設(shè)置最大線程數(shù),也就相當(dāng)于20個(gè)售票窗口
- pOption.MaxDegreeOfParallelism = 20;
- // 創(chuàng)建一個(gè)守護(hù)監(jiān)視進(jìn)程
- // 當(dāng)?shù)较掳鄷r(shí)間后就取消剩下的售票活動(dòng)
- Task.Factory.StartNew(() =>
- {
- Console.WriteLine("Cancellation in 5 sec.");
- Thread.Sleep(5000);
- // 取消,結(jié)束任務(wù)的執(zhí)行
- cts.Cancel();
- Console.WriteLine("Canceled requested");
- });
- try
- {
- // 創(chuàng)建售票活動(dòng)
- Action[] CustomerServices = CreateCustomerService(1000);
- // 以ParallelOptions作為參數(shù),
- // 調(diào)用Parallel.Invoke
- Parallel.Invoke(pOption, CustomerServices);
- }
- catch (Exception e)
- {
- // 當(dāng)任務(wù)取消后,拋出一個(gè)異常
- Console.WriteLine(e.Message);
- }
- }
- // 創(chuàng)建售票的活動(dòng)
- static Action[] CreateCustomerService(int n)
- {
- Action[] result = new Action[n];
- for (int i = 0; i < n; i++)
- {
- result[i] = () =>
- {
- Console.WriteLine("Customer Service {0}", Task.CurrentId);
- // 模擬售票需要的時(shí)間
- Thread.Sleep(2000);
- };
- }
- return result;
- }
#p#
并行任務(wù)之間的同步
有時(shí)候我們?cè)谔幚聿⑿腥蝿?wù)的時(shí)候,各個(gè)任務(wù)之間需要同步,也就是同時(shí)執(zhí)行的并行任務(wù),需要在共同到達(dá)某一個(gè)狀態(tài)的后再一共繼續(xù)執(zhí)行。我們可以舉一個(gè)現(xiàn)實(shí)生活中的例子。陳良喬,賈瑋和單春暉是好朋友,他們相約到電影院看《建國(guó)大業(yè)》。他們?nèi)齻€(gè)住在不同的地方,為了能一起買票進(jìn)電影院,他們約好先在電影院門口的KFC會(huì)合,然后再一起進(jìn)電影院。這其中就涉及到一個(gè)同步的問題:他們需要先在KFC會(huì)合。他們是從家里分別到KFC的,但是需要在KFC進(jìn)行同步,等到三個(gè)人都到齊后在完成后后繼的動(dòng)作,進(jìn)電影院看電影。
為了完成并行任務(wù)之間的同步,.NET Framework中提供了一個(gè)類Barrier。顧名思義,Barrier就像一個(gè)關(guān)卡或者是剪票口一樣,通過Barrier類,我們可以管理并行任務(wù)的執(zhí)行,完成他們之間的同步。Barrier類的使用非常簡(jiǎn)單,我們只需要在主線程中聲明一個(gè)Barrier對(duì)象,同時(shí)指明需要同步的任務(wù)數(shù)。然后,在需要進(jìn)行同步的地方調(diào)用Barrier類的SignalAndWait函數(shù)就可以了。 當(dāng)一個(gè)并行任務(wù)到達(dá)SignalAndWait后,它會(huì)暫停執(zhí)行,等待所有并行任務(wù)都到達(dá)同步點(diǎn)之后再繼續(xù)往下執(zhí)行。下面我們以一個(gè)實(shí)際的例子,來看看如何利用Barrier類完成看電影的同步問題。
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace ParallelBarrier
- {
- class Program
- {
- // 用于同步的Barrier對(duì)象
- static Barrier sync;
- static void Main(string[] args)
- {
- // 創(chuàng)建Barrier對(duì)象,這里我們需要同步
- // 任務(wù)有三個(gè)
- sync = new Barrier(3);
- // 開始執(zhí)行并行任務(wù)
- var steps = new Action[] { () => gotothecinema("陳良喬", TimeSpan.FromSeconds(5) ),
- () => gotothecinema("賈瑋", TimeSpan.FromSeconds(2) ),
- () => gotothecinema("單春暉", TimeSpan.FromSeconds(4) )};
- Parallel.Invoke(steps);
- Console.ReadKey();
- }
- // 任務(wù)
- static void gotothecinema(string strName, TimeSpan timeToKFC )
- {
- Console.WriteLine("[{0}] 從家里出發(fā)。", strName);
- // 從家里到KFC
- Thread.Sleep(timeToKFC);
- Console.WriteLine("[{0}] 到達(dá)KFC。", strName);
- // 等待其他人到達(dá)
- sync.SignalAndWait();
- // 同步后,進(jìn)行后繼動(dòng)作
- Console.WriteLine("[{0}] 買票進(jìn)電影院。", strName);
- }
- }
- }
在這段代碼中,我們首先創(chuàng)建了Barrier對(duì)象,因?yàn)樵谶@里需要同步的任務(wù)有三個(gè),所以創(chuàng)建Barrier對(duì)象時(shí)是的參數(shù)是3。然后就是使用Parallel.Invoke執(zhí)行并行任務(wù)。我們?cè)诓⑿腥蝿?wù)gotothecinema中設(shè)置了一個(gè)同步點(diǎn),在這里我們調(diào)用Barrier對(duì)象的SignalAndWait函數(shù),它表示當(dāng)前任務(wù)已經(jīng)到達(dá)同步點(diǎn)并同時(shí)等待其他任務(wù)到達(dá)同步點(diǎn)。當(dāng)所有任務(wù)都到達(dá)同步點(diǎn)之后,再繼續(xù)往下執(zhí)行。運(yùn)行上面的程序,我們可以獲得這樣的輸出:
#p#
更復(fù)雜的任務(wù)之間的同步
我們?cè)谑褂肂arrier進(jìn)行并行任務(wù)之間的同步時(shí),有這樣一個(gè)缺陷,我們需要預(yù)先知道所有需要同步的并行任務(wù)的數(shù)目,如果這個(gè)數(shù)目是隨機(jī)的,就無法使用Barrier進(jìn)行任務(wù)之間的同步了。并行任務(wù)數(shù)目不定這種情況很常見。我們還是來看上文中看電影的例子,每場(chǎng)進(jìn)電影院看電影的觀眾數(shù)目是不固定的,那么退場(chǎng)的觀眾也是不固定的,甚至還有中途退場(chǎng)的。當(dāng)所有觀眾都退場(chǎng)后,我們需要打掃電影院的衛(wèi)生。這里需要的同步的就是所有觀眾都退場(chǎng)。針對(duì)這種數(shù)目不定的多個(gè)并行任務(wù),.NET Framework提供了CountdownEvent這個(gè)類來進(jìn)行任務(wù)之間的同步。
就像它的名字一樣,CountdownEvent基于這樣一個(gè)簡(jiǎn)單的規(guī)則:當(dāng)有新的需要同步的任務(wù)產(chǎn)生時(shí),就調(diào)用AddCount增加它的計(jì)數(shù),當(dāng)有任務(wù)到達(dá)同步點(diǎn)是,就調(diào)用Signal函數(shù)減小它的計(jì)數(shù),當(dāng)CountdownEvent的計(jì)數(shù)為零時(shí),就表示所有需要同步的任務(wù)已經(jīng)完成,可以開始下一步任務(wù)了。下面我們利用CountdownEvent來模擬一下觀眾進(jìn)場(chǎng)立場(chǎng)的情景。
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace CountdownEventDemo
- {
- // 觀眾類,用來表示一位觀眾
- class Customer
- {
- public Customer(int nID)
- {
- m_nID = nID;
- }
- // 觀眾的ID
- public int m_nID;
- }
- class Program
- {
- static void Main(string[] args)
- {
- // 創(chuàng)建CountdownEvent同步對(duì)象
- using (var countdown = new CountdownEvent(1))
- {
- // 產(chǎn)生一個(gè)隨機(jī)數(shù),表示觀眾的數(shù)目
- Random countRandom = new Random(DateTime.Now.Millisecond);
- int nCount = countRandom.Next(10);
- // 構(gòu)造每一位觀眾看電影的任務(wù)
- Action[] seeafilm = new Action[ nCount ];
- for (int i = 0; i < nCount; i++)
- {
- // 構(gòu)造Customer對(duì)象,表示觀眾
- Customer currentCustomer = new Customer( i+1 );
- seeafilm[i] = () =>
- {
- // 觀眾進(jìn)場(chǎng)
- countdown.AddCount();
- Console.WriteLine("觀眾 {0} 進(jìn)場(chǎng)。", currentCustomer.m_nID);
- // 模擬看電影的時(shí)間
- Thread.Sleep(countRandom.Next(3000,6000));
- // 觀眾退場(chǎng)
- countdown.Signal();
- Console.WriteLine("觀眾 {0} 退場(chǎng)。", currentCustomer.m_nID);
- };
- }
- //并行執(zhí)行任務(wù)
- Parallel.Invoke( seeafilm );
- // 在此同步,最后CountdownEvent的計(jì)數(shù)變?yōu)榱?
- countdown.Signal();
- countdown.Wait();
- }
- Console.WriteLine("所有觀眾退場(chǎng),開始打掃衛(wèi)生。");
- Console.ReadKey();
- }
在這段代碼中,我們使用CountdownEvent進(jìn)行隨機(jī)個(gè)數(shù)任務(wù)之間的同步。最后,我們可以得到這樣的輸出。
通過Parallel.Invoke函數(shù),我們可以輕松地將相互獨(dú)立的任務(wù)并行執(zhí)行,同時(shí)通過Barrier和CountdownEvent類進(jìn)行任務(wù)之間的同步。這種并行計(jì)算的開發(fā)方式,比以前那種基于線程的并行計(jì)算開發(fā)方式簡(jiǎn)便很多,解放了程序員的腦袋,讓他們可以把更多的腦力放到業(yè)務(wù)邏輯問題的解決之上。使用Parallel類,多快好省地開發(fā)并行計(jì)算應(yīng)用程序。
【編輯推薦】