詳細(xì)的.Net并行編程高級(jí)教程--Parallel
一直覺得自己對(duì)并發(fā)了解不夠深入,特別是看了《代碼整潔之道》覺得自己有必要好好學(xué)學(xué)并發(fā)編程,因?yàn)樾阅芤彩呛饬看a整潔的一大標(biāo)準(zhǔn)。而且在《失控》這本書中也多次提到并發(fā),不管是計(jì)算機(jī)還是生物都并發(fā)處理著各種事物。人真是奇怪,當(dāng)你關(guān)注一個(gè)事情的時(shí)候,你會(huì)發(fā)現(xiàn)周圍的事物中就常出現(xiàn)那個(gè)事情。所以好奇心驅(qū)使下學(xué)習(xí)并發(fā)。便有了此文。
一、理解硬件線程和軟件線程
多核處理器帶有一個(gè)以上的物理內(nèi)核--物理內(nèi)核是真正的獨(dú)立處理單元,多個(gè)物理內(nèi)核使得多條指令能夠同時(shí)并行運(yùn)行。硬件線程也稱為邏輯內(nèi)核,一個(gè)物理內(nèi) 核可以使用超線程技術(shù)提供多個(gè)硬件線程。所以一個(gè)硬件線程并不代表一個(gè)物理內(nèi)核;Windows中每個(gè)運(yùn)行的程序都是一個(gè)進(jìn)程,每一個(gè)進(jìn)程都會(huì)創(chuàng)建并運(yùn)行 一個(gè)或多個(gè)線程,這些線程稱為軟件線程。硬件線程就像是一條泳道,而軟件線程就是在其中游泳的人。
二、并行場合
.Net Framework4 引入了新的Task Parallel Library(任務(wù)并行庫,TPL),它支持?jǐn)?shù)據(jù)并行、任務(wù)并行和流水線。讓開發(fā)人員應(yīng)付不同的并行場合。
-
數(shù)據(jù)并行:有大量數(shù)據(jù)需要處理,并且必須對(duì)每一份數(shù)據(jù)執(zhí)行同樣的操作。比如通過256bit的密鑰對(duì)100個(gè)Unicode字符串進(jìn)行AES算法加密。
-
任務(wù)并行:通過任務(wù)并發(fā)運(yùn)行不同的操作。例如生成文件散列碼,加密字符串,創(chuàng)建縮略圖。
-
流水線:這是任務(wù)并行和數(shù)據(jù)并行的結(jié)合體。
TPL引入了System.Threading.Tasks ,主類是Task,這個(gè)類表示一個(gè)異步的并發(fā)的操作,然而我們不一定要使用Task類的實(shí)例,可以使用Parallel靜態(tài)類。它提供了 Parallel.Invoke, Parallel.For Parallel.Forecah 三個(gè)方法。
三、Parallel.Invoke
試圖讓很多方法并行運(yùn)行的最簡單的方法就是使用Parallel類的Invoke方法。例如有四個(gè)方法:
-
WatchMovie
-
HaveDinner
-
ReadBook
-
WriteBlog
通過下面的代碼就可以使用并行。
System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);
這段代碼會(huì)創(chuàng)建指向每一個(gè)方法的委托。Invoke方法接受一個(gè)Action的參數(shù)組。
1 |
|
用lambda表達(dá)式或匿名委托可以達(dá)到同樣的效果。
System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () => ReadBook(), delegate() { WriteBlog(); });
1.沒有特定的執(zhí)行順序。
Parallel.Invoke方法只有在4個(gè)方法全部完成之后才會(huì)返回。它至少需要4個(gè)硬件線程才足以讓這4個(gè)方法并發(fā)運(yùn)行。但并不保證這4個(gè)方法能夠同時(shí)啟動(dòng)運(yùn)行,如果一個(gè)或者多個(gè)內(nèi)核處于繁忙狀態(tài),那么底層的調(diào)度邏輯可能會(huì)延遲某些方法的初始化執(zhí)行。
給方法加上延時(shí),就可以看到必須等待最長的方法執(zhí)行完成才回到主方法。
- static void Main(string[] args)
- {
- System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook,
- WriteBlog);
- Console.WriteLine("執(zhí)行完成");
- Console.ReadKey();
- }
- static void WatchMovie()
- {
- Thread.Sleep(5000);
- Console.WriteLine("看電影");
- }
- static void HaveDinner()
- {
- Thread.Sleep(1000);
- Console.WriteLine("吃晚飯");
- }
- static void ReadBook()
- {
- Thread.Sleep(2000);
- Console.WriteLine("讀書");
- }
- static void WriteBlog()
- {
- Thread.Sleep(3000);
- Console.WriteLine("寫博客");
- }
這樣會(huì)造成很多邏輯內(nèi)核處于長時(shí)間閑置狀態(tài)。
四、Parallel.For
Parallel.For為固定數(shù)目的獨(dú)立For循環(huán)迭代提供了負(fù)載均衡 (即將工作分發(fā)到不同的任務(wù)中執(zhí)行,這樣所有的任務(wù)在大部分時(shí)間都可以保持繁忙) 的并行執(zhí)行。從而能盡可能地充分利用所有的可用的內(nèi)核。
我們比較下下面兩個(gè)方法,一個(gè)使用For循環(huán),一個(gè)使用Parallel.For 都是生成密鑰在轉(zhuǎn)換為十六進(jìn)制字符串。
- private static void GenerateAESKeys()
- {
- var sw = Stopwatch.StartNew();
- for (int i = 0; i < NUM_AES_KEYS; i++)
- {
- var aesM = new AesManaged();
- aesM.GenerateKey();
- byte[] result = aesM.Key;
- string hexStr = ConverToHexString(result);
- }
- Console.WriteLine("AES:"+sw.Elapsed.ToString());
- }
- private static void ParallelGenerateAESKeys()
- {
- var sw = Stopwatch.StartNew();
- System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, (int i) =>
- {
- var aesM = new AesManaged();
- aesM.GenerateKey();
- byte[] result = aesM.Key;
- string hexStr = ConverToHexString(result);
- });
- Console.WriteLine("Parallel_AES:" + sw.Elapsed.ToString());
- }
private static int NUM_AES_KEYS = 100000;
static void Main(string[] args)
{
Console.WriteLine("執(zhí)行"+NUM_AES_KEYS+"次:"); GenerateAESKeys();
ParallelGenerateAESKeys();
Console.ReadKey();
}
執(zhí)行1000000次
這里并行的時(shí)間是串行的一半。
五、Parallel.ForEach
在Parallel.For中,有時(shí)候?qū)扔醒h(huán)進(jìn)行優(yōu)化可能會(huì)是一個(gè)非常復(fù)雜的任務(wù)。Parallel.ForEach為固定數(shù)目的獨(dú)立For Each循環(huán)迭代提供了負(fù)載均衡的并行執(zhí)行,且支持自定義分區(qū)器,讓使用者可以完全掌握數(shù)據(jù)分發(fā)。實(shí)質(zhì)就是將所有要處理的數(shù)據(jù)區(qū)分為多個(gè)部分,然后并行運(yùn) 行這些串行循環(huán)。
修改上面的代碼:
- System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range =>
- {
- var aesM = new AesManaged();
- Console.WriteLine("AES Range({0},{1} 循環(huán)開始時(shí)間:{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay);
- for (int i = range.Item1; i < range.Item2; i++)
- {
- aesM.GenerateKey();
- byte[] result = aesM.Key;
- string hexStr = ConverToHexString(result);
- }
- Console.WriteLine("AES:"+sw.Elapsed.ToString());
- });
從執(zhí)行結(jié)果可以看出,分了13個(gè)段執(zhí)行的。
第二次執(zhí)行還是13個(gè)段。速度上稍微有差異。開始沒有指定分區(qū)數(shù),Partitioner.Create使用的是內(nèi)置默認(rèn)值。
而且我們發(fā)現(xiàn)這些分區(qū)并不是同時(shí)執(zhí)行的,大致是分了三個(gè)時(shí)間段執(zhí)行。而且執(zhí)行順序是不同的。總的時(shí)間和Parallel.For的方法差不多。
public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body)
Parallel.ForEach方法定義了source和Body兩個(gè)參數(shù)。source是指分區(qū)器。提供了分解為多個(gè)分區(qū)的數(shù)據(jù)源。body是 要調(diào)用的委托。它接受每一個(gè)已定義的分區(qū)作為參數(shù)。一共有20多個(gè)重載,在上面的例子中,分區(qū)的類型為Tuple<int,int>,是一個(gè) 二元組類型。此外,返回一個(gè)ParallelLoopResult的值。
Partitioner.Create 創(chuàng)建分區(qū)是根據(jù)邏輯內(nèi)核數(shù)及其他因素決定。
- public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive)
- {
- int num = 3;
- if (toExclusive <= fromInclusive)
- throw new ArgumentOutOfRangeException("toExclusive");
- int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * num);
- if (rangeSize == 0)
- rangeSize = 1;
- return Partitioner.Create<Tuple<int, int>>(Partitioner.CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering);
- }
因此我們可以修改分區(qū)數(shù)目,rangesize大致為250000左右。也就是說我的邏輯內(nèi)核是4.
var rangesize = (int) (NUM_AES_KEYS/Environment.ProcessorCount) + 1;
System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1,rangesize), range =>
再次執(zhí)行:
分區(qū)變成了四個(gè),時(shí)間上沒有多大差別(***個(gè)時(shí)間是串行時(shí)間)。我們看見這四個(gè)分區(qū)幾乎是同時(shí)執(zhí)行的。大部分情況下,TPL在幕后使用的負(fù)載均衡機(jī)制都是非常高效的,然而對(duì)分區(qū)的控制便于使用者對(duì)自己的工作負(fù)載進(jìn)行分析,來改進(jìn)整體的性能。
Parallel.ForEach也能對(duì)IEnumerable<int>集合進(jìn)行重構(gòu)。Enumerable.Range生產(chǎn)了序列化的數(shù)目。但這樣就沒有上面的分區(qū)效果。
- private static void ParallelForEachGenerateMD5HasHes()
- {
- var sw = Stopwatch.StartNew();
- System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), number =>
- {
- var md5M = MD5.Create();
- byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
- byte[] result = md5M.ComputeHash(data);
- string hexString = ConverToHexString(result);
- });
- Console.WriteLine("MD5:"+sw.Elapsed.ToString());
- }
#p#
六、從循環(huán)中退出
和串行運(yùn)行中的break不同,ParallelLoopState 提供了兩個(gè)方法用于停止Parallel.For 和 Parallel.ForEach的執(zhí)行。
-
Break:讓循環(huán)在執(zhí)行了當(dāng)前迭代后盡快停止執(zhí)行。比如執(zhí)行到100了,那么循環(huán)會(huì)處理掉所有小于100的迭代。
-
Stop:讓循環(huán)盡快停止執(zhí)行。如果執(zhí)行到了100的迭代,那不能保證處理完所有小于100的迭代。
修改上面的方法:執(zhí)行3秒后退出。
- private static void ParallelLoopResult(ParallelLoopResult loopResult)
- {
- string text;
- if (loopResult.IsCompleted)
- {
- text = "循環(huán)完成";
- }
- else
- {
- if (loopResult.LowestBreakIteration.HasValue)
- {
- text = "Break終止";
- }
- else
- {
- text = "Stop 終止";
- }
- }
- Console.WriteLine(text);
- }
- private static void ParallelForEachGenerateMD5HasHesBreak()
- {
- var sw = Stopwatch.StartNew();
- var loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (int number,ParallelLoopState loopState) =>
- {
- var md5M = MD5.Create();
- byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
- byte[] result = md5M.ComputeHash(data);
- string hexString = ConverToHexString(result);
- if (sw.Elapsed.Seconds > 3)
- {
- loopState.Stop();
- }
- });
- ParallelLoopResult(loopresult);
- Console.WriteLine("MD5:" + sw.Elapsed);
- }
七、捕捉并行循環(huán)中發(fā)生的異常。
當(dāng)并行迭代中調(diào)用的委托拋出異常,這個(gè)異常沒有在委托中被捕獲到時(shí),就會(huì)變成一組異常,新的System.AggregateException負(fù)責(zé)處理這一組異常。
- private static void ParallelForEachGenerateMD5HasHesException()
- {
- var sw = Stopwatch.StartNew();
- var loopresult = new ParallelLoopResult();
- try
- {
- loopresult = System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (number, loopState) =>
- {
- var md5M = MD5.Create();
- byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
- byte[] result = md5M.ComputeHash(data);
- string hexString = ConverToHexString(result);
- if (sw.Elapsed.Seconds > 3)
- {
- throw new TimeoutException("執(zhí)行超過三秒");
- }
- });
- }
- catch (AggregateException ex)
- {
- foreach (var innerEx in ex.InnerExceptions)
- {
- Console.WriteLine(innerEx.ToString());
- }
- }
- ParallelLoopResult(loopresult);
- Console.WriteLine("MD5:" + sw.Elapsed);
- }
結(jié)果:
異常出現(xiàn)了好幾次。
#p#
八、指定并行度。
TPL的方法總會(huì)試圖利用所有可用的邏輯內(nèi)核來實(shí)現(xiàn)***的結(jié)果,但有時(shí)候你并不希望在并行循環(huán)中使用所有的內(nèi)核。比如你需要留出一個(gè)不參與并行計(jì)算 的內(nèi)核,來創(chuàng)建能夠響應(yīng)用戶的應(yīng)用程序,而且這個(gè)內(nèi)核需要幫助你運(yùn)行代碼中的其他部分。這個(gè)時(shí)候一種好的解決方法就是指定***并行度。
這需要?jiǎng)?chuàng)建一個(gè)ParallelOptions的實(shí)例,設(shè)置MaxDegreeOfParallelism的值。
- private static void ParallelMaxDegree(int maxDegree)
- {
- var parallelOptions = new ParallelOptions();
- parallelOptions.MaxDegreeOfParallelism = maxDegree;
- var sw = Stopwatch.StartNew();
- System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, parallelOptions, (int i) =>
- {
- var aesM = new AesManaged();
- aesM.GenerateKey();
- byte[] result = aesM.Key;
- string hexStr = ConverToHexString(result);
- });
- Console.WriteLine("AES:" + sw.Elapsed.ToString());
- }
調(diào)用:如果在四核微處理器上運(yùn)行,那么將使用3個(gè)內(nèi)核。
ParallelMaxDegree(Environment.ProcessorCount - 1);
時(shí)間上大致慢了點(diǎn)(***次Parallel.For 3.18s),但可以騰出一個(gè)內(nèi)核來處理其他的事情。
小結(jié):這次學(xué)習(xí)了Parallel相關(guān)方法以及如何退出并行循環(huán)和捕獲異常、設(shè)置并行度,還有并行相關(guān)的知識(shí)。園子里也有類似的博客。但作為自己知識(shí)的管理,在這里梳理一遍。