Java多線程設(shè)計(jì)模式之線程池模式
前序:
Thread-Per-Message Pattern,是一種對于每個(gè)命令或請求,都分配一個(gè)線程,由這個(gè)線程執(zhí)行工作。它將“委托消息的一端”和“執(zhí)行消息的一端”用兩個(gè)不同的線程來實(shí)現(xiàn)。該線程模式主要包括三個(gè)部分:
1,Request參與者(委托人),也就是消息發(fā)送端或者命令請求端
2,Host參與者,接受消息的請求,負(fù)責(zé)為每個(gè)消息分配一個(gè)工作線程。
3,Worker參與者,具體執(zhí)行Request參與者的任務(wù)的線程,由Host參與者來啟動(dòng)。
由于常規(guī)調(diào)用一個(gè)方法后,必須等待該方法完全執(zhí)行完畢后才能繼續(xù)執(zhí)行下一步操作,而利用線程后,就不必等待具體任務(wù)執(zhí)行完畢,就可以馬上返回繼續(xù)執(zhí)行下一步操作。
背景:
由于在Thread-Per-Message Pattern中對于每一個(gè)請求都會(huì)生成啟動(dòng)一個(gè)線程,而線程的啟動(dòng)是很花費(fèi)時(shí)間的工作,所以鑒于此,提出了Worker Thread,重復(fù)利用已經(jīng)啟動(dòng)的線程。
線程池:
Worker Thread,也稱為工人線程或背景線程,不過一般都稱為線程池。該模式主要在于,事先啟動(dòng)一定數(shù)目的工作線程。當(dāng)沒有請求工作的時(shí)候,所有的工人線程都會(huì)等待新的請求過來,一旦有工作到達(dá),就馬上從線程池中喚醒某個(gè)線程來執(zhí)行任務(wù),執(zhí)行完畢后繼續(xù)在線程池中等待任務(wù)池的工作請求的到達(dá)。
任務(wù)池:主要是存儲(chǔ)接受請求的集合,利用它可以緩沖接受到的請求,可以設(shè)置大小來表示同時(shí)能夠接受最大請求數(shù)目。這個(gè)任務(wù)池主要是供線程池來訪問。
線程池:這個(gè)是工作線程所在的集合,可以通過設(shè)置它的大小來提供并發(fā)處理的工作量。對于線程池的大小,可以事先生成一定數(shù)目的線程,根據(jù)實(shí)際情況來動(dòng)態(tài)增加或者減少線程數(shù)目。線程池的大小不是越大越好,線程的切換也會(huì)耗時(shí)的。
存放池的數(shù)據(jù)結(jié)構(gòu),可以用數(shù)組也可以利用集合,在集合類中一般使用Vector,這個(gè)是線程安全的。
Worker Thread的所有參與者:
1,Client參與者,發(fā)送Request的參與者
2,Channel參與者,負(fù)責(zé)緩存Request的請求,初始化啟動(dòng)線程,分配工作線程
3,Worker參與者,具體執(zhí)行Request的工作線程
4,Request參與者
注意:將在Worker線程內(nèi)部等待任務(wù)池非空的方式稱為正向等待。
將在Channel線程提供Worker線程來判斷任務(wù)池非空的方式稱為反向等待。
線程池實(shí)例1:
利用同步方法來實(shí)現(xiàn),使用數(shù)組來作為任務(wù)池的存放數(shù)據(jù)結(jié)構(gòu)。在Channel有緩存請求方法和處理請求方法,利用生成者與消費(fèi)者模式來處理存儲(chǔ)請求,利用反向等待來判斷任務(wù)池的非空狀態(tài)。
Channel參與者:
- package whut.threadpool;
- //用到了生產(chǎn)者與消費(fèi)者模式
- //生成線程池,接受客戶端線程的請求,找到一個(gè)工作線程分配該客戶端請求
- public class Channel {
- private static final int MAX_REQUEST = 100;// 并發(fā)數(shù)目,就是同時(shí)可以接受多少個(gè)客戶端請求
- //利用數(shù)組來存放請求,每次從數(shù)組末尾添加請求,從開頭移除請求來處理
- private final Request[] requestQueue;// 存儲(chǔ)接受客戶線程的數(shù)目
- private int tail;//下一次存放Request的位置
- private int head;//下一次獲取Request的位置
- private int count;// 當(dāng)前request數(shù)量
- private final WorkerThread[] threadPool;// 存儲(chǔ)線程池中的工作線程
- // 運(yùn)用數(shù)組來存儲(chǔ)
- public Channel(int threads) {
- this.requestQueue = new Request[MAX_REQUEST];
- this.head = 0;
- this.head = 0;
- this.count = 0;
- threadPool = new WorkerThread[threads];
- // 啟動(dòng)工作線程
- for (int i = 0; i < threadPool.length; i++) {
- threadPool[i] = new WorkerThread("Worker-" + i, this);
- }
- }
- public void startWorkers() {
- for (int i = 0; i < threadPool.length; i++) {
- threadPool[i].start();
- }
- }
- // 接受客戶端請求線程
- public synchronized void putRequest(Request request) {
- // 當(dāng)Request的數(shù)量大于或等于同時(shí)接受的數(shù)目時(shí)候,要等待
- while (count >= requestQueue.length)
- try {
- wait();
- } catch (InterruptedException e) {
- }
- requestQueue[tail] = request;
- tail = (tail + 1) % requestQueue.length;
- count++;
- notifyAll();
- }
- // 處理客戶端請求線程
- public synchronized Request takeRequest() {
- while (count <= 0)
- try {
- wait();
- } catch (InterruptedException e) {
- }
- Request request = requestQueue[head];
- head = (head + 1) % requestQueue.length;
- count--;
- notifyAll();
- return request;
- }
- }
客戶端請求線程:
- package whut.threadpool;
- import java.util.Random;
- //向Channel發(fā)送Request請求的
- public class ClientThread extends Thread{
- private final Channel channel;
- private static final Random random=new Random();
- public ClientThread(String name,Channel channel)
- {
- super(name);
- this.channel=channel;
- }
- public void run()
- {
- try{
- for(int i=0;true;i++)
- {
- Request request=new Request(getName(),i);
- channel.putRequest(request);
- Thread.sleep(random.nextInt(1000));
- }
- }catch(InterruptedException e)
- {
- }
- }
- }
工作線程:
- package whut.threadpool;
- //具體工作線程
- public class WorkerThread extends Thread{
- private final Channel channel;
- public WorkerThread(String name,Channel channel)
- {
- super(name);
- this.channel=channel;
- }
- public void run()
- {
- while(true)
- {
- Request request=channel.takeRequest();
- request.execute();
- }
- }
- }
#p#
線程池實(shí)例2:
工作線程:
利用同步塊來處理,利用Vector來存儲(chǔ)客戶端請求。在Channel有緩存請求方法和處理請求方法,利用生成者與消費(fèi)者模式來處理存儲(chǔ)請求,利用正向等待來判斷任務(wù)池的非空狀態(tài)。
這種實(shí)例,可以借鑒到網(wǎng)絡(luò)ServerSocket處理用戶請求的模式中,有很好的擴(kuò)展性與實(shí)用性。
利用Vector來存儲(chǔ),依舊是每次集合的最后一個(gè)位置添加請求,從開始位置移除請求來處理。
Channel參與者:
- package whut.threadpool2;
- import java.util.Vector;
- /*
- * 這個(gè)主要的作用如下
- * 0,緩沖客戶請求線程(利用生產(chǎn)者與消費(fèi)者模式)
- * 1,存儲(chǔ)客戶端請求的線程
- * 2,初始化啟動(dòng)一定數(shù)量的線程
- * 3,主動(dòng)來喚醒處于任務(wù)池中wait set的一些線程來執(zhí)行任務(wù)
- */
- public class Channel {
- public final static int THREAD_COUNT=4;
- public static void main(String[] args) {
- //定義兩個(gè)集合,一個(gè)是存放客戶端請求的,利用Vector,
- //一個(gè)是存儲(chǔ)線程的,就是線程池中的線程數(shù)目
- //Vector是線程安全的,它實(shí)現(xiàn)了Collection和List
- //Vector 類可以實(shí)現(xiàn)可增長的對象數(shù)組。與數(shù)組一樣,
- //它包含可以使用整數(shù)索引進(jìn)行訪問的組件。但Vector 的大小可以根據(jù)需要增大或縮小,
- //以適應(yīng)創(chuàng)建 Vector 后進(jìn)行添加或移除項(xiàng)的操作。
- //Collection中主要包括了list相關(guān)的集合以及set相關(guān)的集合,Queue相關(guān)的集合
- //注意:Map不是Collection的子類,都是java.util.*下的同級(jí)包
- Vector pool=new Vector();
- //工作線程,初始分配一定限額的數(shù)目
- WorkerThread[] workers=new WorkerThread[THREAD_COUNT];
- //初始化啟動(dòng)工作線程
- for(int i=0;i<workers.length;i++)
- {
- workers[i]=new WorkerThread(pool);
- workers[i].start();
- }
- //接受新的任務(wù),并且將其存儲(chǔ)在Vector中
- Object task=new Object();//模擬的任務(wù)實(shí)體類
- //此處省略具體工作
- //在網(wǎng)絡(luò)編程中,這里就是利用ServerSocket來利用ServerSocket.accept接受一個(gè)Socket從而喚醒線程
- //當(dāng)有具體的請求達(dá)到
- synchronized(pool)
- {
- pool.add(pool.size(), task);
- pool.notifyAll();//通知所有在pool wait set中等待的線程,喚醒一個(gè)線程進(jìn)行處理
- }
- //注意上面這步驟添加任務(wù)池請求,以及通知線程,都可以放在工作線程內(nèi)部實(shí)現(xiàn)
- //只需要定義該方法為static,在方法體用同步塊,且共享的線程池也是static即可
- //下面這步,可以有可以沒有根據(jù)實(shí)際情況
- //取消等待的線程
- for(int i=0;i<workers.length;i++)
- {
- workers[i].interrupt();
- }
- }
- }
工作線程:
- package whut.threadpool2;
- import java.util.List;
- public class WorkerThread extends Thread {
- private List pool;//任務(wù)請求池
- private static int fileCompressed=0;//所有實(shí)例共享的
- public WorkerThread(List pool)
- {
- this.pool=pool;
- }
- //利用靜態(tài)synchronized來作為整個(gè)synchronized類方法,僅能同時(shí)一個(gè)操作該類的這個(gè)方法
- private static synchronized void incrementFilesCompressed()
- {
- fileCompressed++;
- }
- public void run()
- {
- //保證無限循環(huán)等待中
- while(true)
- {
- //共享互斥來訪問pool變量
- synchronized(pool)
- {
- //利用多線程設(shè)計(jì)模式中的
- //Guarded Suspension Pattern,警戒條件為pool不為空,否則無限的等待中
- while(pool.isEmpty())
- {
- try{
- pool.wait();//進(jìn)入pool的wait set中等待著,釋放了pool的鎖
- }catch(InterruptedException e)
- {
- }
- }
- //當(dāng)線程被喚醒,需要重新獲取pool的鎖,
- //再次繼續(xù)執(zhí)行synchronized代碼塊中其余的工作
- //當(dāng)不為空的時(shí)候,繼續(xù)再判斷是否為空,如果不為空,則跳出循環(huán)
- //必須先從任務(wù)池中移除一個(gè)任務(wù)來執(zhí)行,統(tǒng)一用從末尾添加,從開始處移除
- pool.remove(0);//獲取任務(wù)池中的任務(wù),并且要進(jìn)行轉(zhuǎn)換
- }
- //下面是線程所要處理的具體工作
- }
- }
- }