C#多線程處理多個隊列數(shù)據(jù)的方法
本文轉(zhuǎn)載自微信公眾號「后端Q」,作者conan。轉(zhuǎn)載本文請聯(lián)系后端Q公眾號。
概述
多線程(multithreading),是指從軟件或者硬件上實現(xiàn)多個線程并發(fā)執(zhí)行的技術(shù)。具有多線程能力的計算機(jī)因有硬件支持而能夠在同一時間執(zhí)行多于一個線程,進(jìn)而提升整體處理性能。具有這種能力的系統(tǒng)包括對稱多處理機(jī)、多核心處理器以及芯片級多處理或同時多線程處理器。在一個程序中,這些獨立運行的程序片段叫作“線程”(Thread),利用它編程的概念就叫作“多線程處理”。
隊列(Queue)代表了一個先進(jìn)先出的對象集合。當(dāng)您需要對各項進(jìn)行先進(jìn)先出的訪問時,則使用隊列。當(dāng)您在列表中添加一項,稱為入隊,當(dāng)您從列表中移除一項時,稱為出隊。
比如平常我們在處理定時任務(wù)的時候,假設(shè)就一臺機(jī)器,我們不可能單線程一條一條數(shù)據(jù)的去跑,這時候就需要提高機(jī)器資源的利用率。
下面我們來介紹下,如何實現(xiàn)多線程+隊列以提高并發(fā)處理能力。
代碼實現(xiàn)
1、定義線程數(shù)threadNum和隊列queues
- /// <summary>
- /// 線程總數(shù)
- /// </summary>
- private int threadNum = 4;
- /// <summary>
- /// 總數(shù)
- /// </summary>
- private int totalCount = 0;
- /// <summary>
- /// 已處理
- /// </summary>
- private int index = 0;
- /// <summary>
- /// 隊列
- /// </summary>
- private ConcurrentQueue<AssetRepayment> queues = new ConcurrentQueue<AssetRepayment>();
2、定義線程列表,往線程添加數(shù)據(jù)
- public void SubDeTransaction()
- {
- var list = new List<AssetRepayment>();
- for (int i = 0; i < 1000; i++)
- {
- list.Add(new AssetRepayment() { Title = i.ToString() + "---" + Guid.NewGuid().ToString() });
- }
- if (list == null || list.Count() == 0)
- {
- Console.WriteLine("沒有可執(zhí)行的數(shù)據(jù)");
- return;
- }
- totalCount = list.Count;
- Console.WriteLine("可執(zhí)行的數(shù)據(jù):" + list.Count() + "條");
- foreach (var item in list)
- {
- queues.Enqueue(item);
- }
- List<Task> tasks = new List<Task>();
- for (int i = 0; i < threadNum; i++)
- {
- var task = Task.Run(() =>
- {
- Process();
- });
- tasks.Add(task);
- }
- var taskList = Task.Factory.ContinueWhenAll(tasks.ToArray(), (ts) =>
- {
- });
- taskList.Wait();
- }
3、對線程數(shù)進(jìn)行限制 for (int i = 0; i < threadNum; i++)
- var taskList = Task.Factory.ContinueWhenAll(tasks.ToArray(), (ts) =>
- {
- });
- taskList.Wait();
4、從隊列取出數(shù)據(jù)進(jìn)行業(yè)務(wù)處理
- private void Process()
- {
- while (true)
- {
- var currentIndex = Interlocked.Increment(ref index);
- AssetRepayment repayId = null;
- var isExit = queues.TryDequeue(out repayId);
- if (!isExit)
- {
- break;
- }
- try
- {
- Console.WriteLine(repayId.Title);
- Console.WriteLine(string.Format(" 共{0}條 當(dāng)前第{1}條", totalCount, currentIndex));
- }
- catch (Exception ex)
- {
- Console.WriteLine(ex);
- }
- }
- }
運行測試
代碼地址
https://gitee.com/conanOpenSource_admin/Example