使用Java多線程實現(xiàn)任務(wù)分發(fā)
多線程下載由來已久,如 FlashGet、NetAnts 等工具,它們都是依懶于 HTTP 協(xié)議的支持(Range 字段指定請求內(nèi)容范圍),首先能讀取出請求內(nèi)容 (即欲下載的文件) 的大小,劃分出若干區(qū)塊,把區(qū)塊分段分發(fā)給每個線程去下載,線程從本段起始處下載數(shù)據(jù)及至段尾,多個線程下載的內(nèi)容最終會寫入到同一個文件中。
只研究有用的,工作中的需求:要把多個任務(wù)分派給Java的多個線程去執(zhí)行,這其中就會有一個任務(wù)列表指派到線程的策略思考:已知:1. 一個待執(zhí)行的任務(wù)列表,2. 指定要啟動的線程數(shù);問題是:每個線程實際要執(zhí)行哪些任務(wù)。
使用Java多線程實現(xiàn)這種任務(wù)分發(fā)的策略是:任務(wù)列表連續(xù)按線程數(shù)分段,先保證每線程平均能分配到的任務(wù)數(shù),余下的任務(wù)從前至后依次附加到線程中--只是數(shù)量上,實際每個線程執(zhí)行的任務(wù)都還是連續(xù)的。如果出現(xiàn)那種僧多(線程) 粥(任務(wù)) 少的情況,實際啟動的線程數(shù)就等于任務(wù)數(shù),一挑一。這里只實現(xiàn)了每個線程各掃自家門前雪,動作快的完成后眼見別的線程再累都是愛莫能助。
實現(xiàn)及演示代碼如下:由三個類實現(xiàn),寫在了一個 Java 文件中:TaskDistributor 為任務(wù)分發(fā)器,Task 為待執(zhí)行的任務(wù),WorkThread 為自定的工作線程。代碼中運用了命令模式,如若能配以監(jiān)聽器,用上觀察者模式來控制 UI 顯示就更絕妙不過了,就能實現(xiàn)像下載中的區(qū)塊著色跳躍的動感了,在此定義下一步的著眼點了。
代碼中有較為詳細的注釋,看這些注釋和執(zhí)行結(jié)果就很容易理解的。main() 是測試方法
- package com.unmi.common;
- import java.util.ArrayList;
- import java.util.List;
- /**
- * 指派任務(wù)列表給線程的分發(fā)器
- * @author Unmi
- * QQ: 1125535 Email: fantasia@sina.com
- * MSN: kypfos@msn.com 2008-03-25
- */
- public class TaskDistributor {
- /**
- * 測試方法
- * @param args
- */
- public static void main(String[] args) {
- //初始化要執(zhí)行的任務(wù)列表
- List taskList = new ArrayList();
- for (int i = 0; i < 108; i++) {
- taskList.add(new Task(i));
- }
- //設(shè)定要啟動的工作線程數(shù)為 5 個
- int threadCount = 5;
- List[] taskListPerThread = distributeTasks(taskList, threadCount);
- System.out.println("實際要啟動的工作線程數(shù):"+taskListPerThread.length);
- for (int i = 0; i < taskListPerThread.length; i++) {
- Thread workThread = new WorkThread(taskListPerThread[i],i);
- workThread.start();
- }
- }
- /**
- * 把 List 中的任務(wù)分配給每個線程,先平均分配,剩于的依次附加給前面的線程
- * 返回的數(shù)組有多少個元素 (List) 就表明將啟動多少個工作線程
- * @param taskList 待分派的任務(wù)列表
- * @param threadCount 線程數(shù)
- * @return 列表的數(shù)組,每個元素中存有該線程要執(zhí)行的任務(wù)列表
- */
- public static List[] distributeTasks(List taskList, int threadCount) {
- // 每個線程至少要執(zhí)行的任務(wù)數(shù),假如不為零則表示每個線程都會分配到任務(wù)
- int minTaskCount = taskList.size() / threadCount;
- // 平均分配后還剩下的任務(wù)數(shù),不為零則還有任務(wù)依個附加到前面的線程中
- int remainTaskCount = taskList.size() % threadCount;
- // 實際要啟動的線程數(shù),如果工作線程比任務(wù)還多
- // 自然只需要啟動與任務(wù)相同個數(shù)的工作線程,一對一的執(zhí)行
- // 畢竟不打算實現(xiàn)了線程池,所以用不著預(yù)先初始化好休眠的線程
- int actualThreadCount = minTaskCount > 0 ? threadCount : remainTaskCount;
- // 要啟動的線程數(shù)組,以及每個線程要執(zhí)行的任務(wù)列表
- List[] taskListPerThread = new List[actualThreadCount];
- int taskIndex = 0;
- //平均分配后多余任務(wù),每附加給一個線程后的剩余數(shù),重新聲明與 remainTaskCount
- //相同的變量,不然會在執(zhí)行中改變 remainTaskCount 原有值,產(chǎn)生麻煩
- int remainIndces = remainTaskCount;
- for (int i = 0; i < taskListPerThread.length; i++) {
- taskListPerThread[i] = new ArrayList();
- // 如果大于零,線程要分配到基本的任務(wù)
- if (minTaskCount > 0) {
- for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {
- taskListPerThread[i].add(taskList.get(j));
- }
- taskIndex += minTaskCount;
- }
- // 假如還有剩下的,則補一個到這個線程中
- if (remainIndces > 0) {
- taskListPerThread[i].add(taskList.get(taskIndex++));
- remainIndces--;
- }
- }
- // 打印任務(wù)的分配情況
- for (int i = 0; i < taskListPerThread.length; i++) {
- System.out.println("線程 " + i + " 的任務(wù)數(shù):" +
- taskListPerThread[i].size() + " 區(qū)間["
- + taskListPerThread[i].get(0).getTaskId() + ","
- + taskListPerThread[i].get(taskListPerThread[i].size() - 1).getTaskId() + "]");
- }
- return taskListPerThread;
- }
- }
- /**
- * 要執(zhí)行的任務(wù),可在執(zhí)行時改變它的某個狀態(tài)或調(diào)用它的某個操作
- * 例如任務(wù)有三個狀態(tài),就緒,運行,完成,默認(rèn)為就緒態(tài)
- * 要進一步完善,可為 Task 加上狀態(tài)變遷的監(jiān)聽器,因之決定UI的顯示
- */
- class Task {
- public static final int READY = 0;
- public static final int RUNNING = 1;
- public static final int FINISHED = 2;
- private int status;
- //聲明一個任務(wù)的自有業(yè)務(wù)含義的變量,用于標(biāo)識任務(wù)
- private int taskId;
- //任務(wù)的初始化方法
- public Task(int taskId){
- this.status = READY;
- this.taskId = taskId;
- }
- /**
- * 執(zhí)行任務(wù)
- */
- public void execute() {
- // 設(shè)置狀態(tài)為運行中
- setStatus(Task.RUNNING);
- System.out.println("當(dāng)前線程 ID 是:" + Thread.currentThread().getName()
- +" | 任務(wù) ID 是:"+this.taskId);
- // 附加一個延時
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 執(zhí)行完成,改狀態(tài)為完成
- setStatus(FINISHED);
- }
- public void setStatus(int status) {
- this.status = status;
- }
- public int getTaskId() {
- return taskId;
- }
- }
- /**
- * 自定義的工作線程,持有分派給它執(zhí)行的任務(wù)列表
- */
- class WorkThread extends Thread {
- //本線程待執(zhí)行的任務(wù)列表,你也可以指為任務(wù)索引的起始值
- private List taskList = null;
- private int threadId;
- /**
- * 構(gòu)造工作線程,為其指派任務(wù)列表,及命名線程 ID
- * @param taskList 欲執(zhí)行的任務(wù)列表
- * @param threadId 線程 ID
- */
- public WorkThread(List taskList,int threadId) {
- this.taskList = taskList;
- this.threadId = threadId;
- }
- /**
- * 執(zhí)行被指派的所有任務(wù)
- */
- public void run() {
- for (Task task : taskList) {
- task.execute();
- }
- }
- }
執(zhí)行結(jié)果如下,注意觀察每個Java多線程分配到的任務(wù)數(shù)量及區(qū)間。直到所有的線程完成了所分配到的任務(wù)后程序結(jié)束:
- 線程 0 的任務(wù)數(shù):22 區(qū)間[0,21]
- 線程 1 的任務(wù)數(shù):22 區(qū)間[22,43]
- 線程 2 的任務(wù)數(shù):22 區(qū)間[44,65]
- 線程 3 的任務(wù)數(shù):21 區(qū)間[66,86]
- 線程 4 的任務(wù)數(shù):21 區(qū)間[87,107]
- 實際要啟動的工作線程數(shù):5
- 當(dāng)前線程 ID 是:Thread-0 | 任務(wù) ID 是:0
- 當(dāng)前線程 ID 是:Thread-1 | 任務(wù) ID 是:22
- 當(dāng)前線程 ID 是:Thread-2 | 任務(wù) ID 是:44
- 當(dāng)前線程 ID 是:Thread-3 | 任務(wù) ID 是:66
- 當(dāng)前線程 ID 是:Thread-4 | 任務(wù) ID 是:87
- 當(dāng)前線程 ID 是:Thread-0 | 任務(wù) ID 是:1
- 當(dāng)前線程 ID 是:Thread-1 | 任務(wù) ID 是:23
- 當(dāng)前線程 ID 是:Thread-2 | 任務(wù) ID 是:45
上面坦白來只算是基本功夫,貼出來還真見笑了。還有更為復(fù)雜的功能.
像Java多線程的下載工具的確更充分利用了網(wǎng)絡(luò)資源,而且像 FlashGet、NetAnts 都實現(xiàn)了:假如某個線程下載完了欲先所分配段的內(nèi)容之后,會幫其他線程下載未完成數(shù)據(jù),直到任務(wù)完成;或某一下載線程的未完成段區(qū)間已經(jīng)很小了,用不著別人來幫忙時,這就涉及到任務(wù)的進一步分配。再如,以上兩個工具都能動態(tài)增加、減小或中止線程,越說越復(fù)雜了,它們原本比這復(fù)雜多了,這些實現(xiàn)可能定義各種隊列來實現(xiàn),如未完成任務(wù)隊列、下載中任務(wù)隊列和已完成隊列等。
【編輯推薦】