.NET 4多核并行中的Task優(yōu)化線程池
閱讀本篇前,讀者需對.NET4 System.Threading.Tasks 以及 Task Schedulers 有一定的了解。如果不是很了解,請查閱以下相關(guān)信息:
Task: http://msdn.microsoft.com/en-us/library/system.threading.tasks.task%28VS.100%29.aspx
Task Schedulers: http://msdn.microsoft.com/en-us/library/dd997402.aspx
首先回顧相關(guān)場景:最近工作需要一直在.NET4下編寫window service。在WindowsService下使用了多線程相關(guān)技術(shù)。期間就用了到了線程池。使用線程池的目的:在系統(tǒng)中進(jìn)行多線程并發(fā)也擔(dān)心并發(fā)數(shù)量太大影響性能。于是使用線程池進(jìn)行排隊(duì)。一批一批執(zhí)行多線程。當(dāng)我在使用傳統(tǒng)的.NET線程池的過程中碰見了一些問題,請看以下代碼:
- try
- {
- ThreadPool.SetMaxThreads(2, 2);
- for (int i = 0; i < 5; i++)
- {
- ThreadPool.QueueUserWorkItem(new WaitCallback(InvokeThread1), i);
- }
- }
- catch
- {
- Console.WriteLine("error");
- }
這里建立一個(gè)同時(shí)2個(gè)線程并發(fā)的線程池。在上述代碼第7行傳入InvokeThread1方法:
- static void InvokeThread1(object obj) {
- throw new NullReferenceException();
- }
假設(shè)程序發(fā)生異常,這個(gè)異常卻讓整個(gè)程線程池序崩潰了。主程序并未catch到這個(gè)exception。也許您會說這本來就是這樣的嘛,有什么好貼出來的。但是在.NET4中我們可以避免掉這個(gè)問題。(此時(shí)體現(xiàn)出.NET4的異常強(qiáng)大)。還有個(gè)問題有必要提到:如果一次有兩個(gè)線程同時(shí)并發(fā)(一共要執(zhí)行5個(gè)線程,每次并發(fā)2個(gè))。假設(shè)其中一個(gè)線程執(zhí)行過程中出現(xiàn)了異常,要讓這兩個(gè)線程以外的三個(gè)線程都停止運(yùn)行,來節(jié)省系統(tǒng)資源。傳統(tǒng)的線程池也許可以做到,但是控制起來估計(jì)不會讓你太輕松。但是在.NET4的Task機(jī)制中,這些都得到了妥善的解決,現(xiàn)將以上兩個(gè)問題解決方案給出。如果存在不足的地方,請您指出。
一、自定義TaskScheduler
TaskScheduler代碼如下:
自定義TaskScheduler
- //自定義TaskScheduler
- public class CustomTaskScheduler : TaskScheduler, IDisposable
- {
- //調(diào)用Task的線程
- Thread[] _Threads;
- //Task Collection
- BlockingCollection<Task> _Tasks = new BlockingCollection<Task>();
- int _ConcurrencyLevel;
- //設(shè)置schedule并發(fā)
- public CustomTaskScheduler(int concurrencyLevel)
- {
- _Threads = new Thread[concurrencyLevel];
- this._ConcurrencyLevel = concurrencyLevel;
- for (int i = 0; i < concurrencyLevel; i++)
- {
- _Threads[i] = new Thread(() =>
- {
- foreach (Task task in _Tasks.GetConsumingEnumerable())
- this.TryExecuteTask(task);27
- });
- _Threads[i].Start();
- }
- }
- protected override void QueueTask(Task task)
- {
- _Tasks.Add(task);
- }
- protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
- {
- if (_Threads.Contains(Thread.CurrentThread)) return TryExecuteTask(task);
- return false;
- }
- public override int MaximumConcurrencyLevel50 {
- get
- {
- return _ConcurrencyLevel;54 }
- }
- protected override IEnumerable<Task> GetScheduledTasks()
- {
- return _Tasks.ToArray();
- }
- public void Dispose()
- {
- this._Tasks.CompleteAdding();
- foreach (Thread t in _Threads)
- {
- t.Join();
- }
- }
- }
該scheduler代碼很簡單,重寫相關(guān)System.Threading.Tasks.TaskScheduler類下的相關(guān)方法即可,代碼中已給出相關(guān)注釋。
二、使用自定義的TaskScheduler
調(diào)用TaskScheduler代碼:
- List<string> listMsg = new List<string>() { "Task1", "Task2", "Task3", "Task4", "Task5", "Task6" };
- List<Task> listTask = new List<Task>();
- foreach (string msg in listMsg)
- {
- Task myTask = new Task(obj => InvokeThread2((string)obj), msg, token);
- listTask.Add(myTask);
- myTask.Start(customTaskScheduler);
- }
- try
- {
- //等待所有線程全部運(yùn)行結(jié)束
- Task.WaitAll(listTask.ToArray());
- }
- catch (AggregateException ex)
- {
- //.NET4 Task的統(tǒng)一異常處理機(jī)制
- foreach (Exception inner in ex.InnerExceptions)
- {
- Console.WriteLine("Exception type {0} from {1}",
- inner.GetType(), inner.Source);
- }
- }
- Console.ReadLine();
InvokeThread2 相關(guān)代碼:
- static void InvokeThread2(string msg)
- {
- try
- {
- var x = Convert.ToInt32(msg.Replace("Task", "").Trim());
- Console.WriteLine(msg);
- Thread.Sleep(1000 * 5);
- Console.WriteLine("{0} ok", msg); }
- catch (Exception ex)
- {
- //如果有異常發(fā)生則取消正在排隊(duì)的所有線程。
- tokenSource.Cancel();
- Exception exception = new Exception("error");
- exception.Source = msg;
- throw exception;
- } }
以上代碼運(yùn)行效果如下:
接著在TaskScheduler調(diào)用代碼中如果將第一行代碼listMsg值修改成 List<string> listMsg = new List<string>() { "Task1", "Task2", "TaskA", "Task3", "Task4", "Task5", "Task6", "Task7", "Task8", "Task9" };這時(shí)候我們將得到以下結(jié)果:
這個(gè)運(yùn)行結(jié)果重點(diǎn)要強(qiáng)調(diào)的地方為:后面這7個(gè)exception。聰明的您或許已經(jīng)看出來前6個(gè)exception屬于沒有執(zhí)行的"Task4", "Task5", "Task6", "Task7", "Task8", "Task9",而最后一個(gè)exception才是真正的發(fā)生異常的"TaskA"。這里主要用到了Task的統(tǒng)一異常處理機(jī)制AggregateException。可以從運(yùn)行結(jié)果得到:Task1,Task2,Task3執(zhí)行成功了,但是TaskA發(fā)生了異常導(dǎo)致了后面排隊(duì)的"Task4", "Task5", "Task6", "Task7", "Task8", "Task9"都不會執(zhí)行了。節(jié)省了系統(tǒng)資源,同時(shí)也提高了系統(tǒng)性能。
三、小結(jié)
本文主要用到了的是.NET4 的Task相關(guān)技術(shù),Task讓我們在多核并行控制的時(shí)候更加簡單,功能更加強(qiáng)大。如果需進(jìn)一步了解相關(guān)技術(shù),博客園已經(jīng)有不少教程。微軟的MSDN也提供了很多參考資料。 最后希望本文可以給您的開發(fā)帶來幫助。
原文鏈接:http://www.cnblogs.com/ryanding/archive/2011/03/22/1990799.html
【編輯推薦】