五種判斷線程池任務(wù)執(zhí)行完成的方式
Thread線程是否執(zhí)行完成,我們可以調(diào)用join方法然后等待線程執(zhí)行完成;那在使用線程池的時(shí)候,我們?nèi)绾沃谰€程已經(jīng)執(zhí)行完成了?本文就帶給大家五種判斷的方式:
- isTerminated() 方式,在執(zhí)行 shutdown() ,關(guān)閉線程池后,判斷是否所有任務(wù)已經(jīng)完成。
- ThreadPoolExecutor 的 getCompletedTaskCount() 方法,判斷完成任務(wù)數(shù)和全部任務(wù)數(shù)是否相等。
- CountDownLatch計(jì)數(shù)器,使用閉鎖計(jì)數(shù)來(lái)判斷是否全部完成。
- 手動(dòng)維護(hù)一個(gè)公共計(jì)數(shù) ,原理和閉鎖類(lèi)似,就是更加靈活。
- 使用submit向線程池提交任務(wù),F(xiàn)uture判斷任務(wù)執(zhí)行狀態(tài)。
方法一:isTerminated()
測(cè)試代碼
package pool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author 百里
*/
public class BaiLiIsShutdownThreadPoolDemo {
/**
* 創(chuàng)建一個(gè)最大線程數(shù)15的線程池
*/
public static ThreadPoolExecutor pool = new ThreadPoolExecutor(
10,
15,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10));
/**
* 線程執(zhí)行方法,隨機(jī)等待0到10秒
*/
private static void sleepMethod(int index){
try {
long sleepTime = new Double(Math.random() * 10000).longValue();
Thread.sleep(sleepTime);
System.out.println("當(dāng)前線程執(zhí)行結(jié)束: " + index);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 方法一:isTerminated
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
int index = i;
pool.execute(() -> sleepMethod(index));
}
pool.shutdown();
while (!pool.isTerminated()){
Thread.sleep(1000);
System.out.println("還沒(méi)停止。。。");
}
System.out.println("全部執(zhí)行完畢");
}
}
上述代碼處理邏輯在主線程中進(jìn)行循環(huán)判斷,全部任務(wù)是否已經(jīng)完成。
這里有兩個(gè)主要方法:
- shutdown() :對(duì)線程池進(jìn)行有序關(guān)閉。調(diào)用該方法后,線程池將不再接受新的任務(wù),但會(huì)繼續(xù)執(zhí)行已提交的任務(wù)。如果線程池已經(jīng)處于關(guān)閉狀態(tài),則對(duì)該方法的調(diào)用沒(méi)有額外的作用。
- isTerminated() :判斷線程池中的所有任務(wù)是否在關(guān)閉后完成。只有在調(diào)用了shutdown()或shutdownNow()方法后,所有任務(wù)執(zhí)行完畢,才會(huì)返回true。需要注意的是,在調(diào)用shutdown()之前調(diào)用isTerminated()方法始終返回false。
優(yōu)缺點(diǎn)分析
優(yōu)點(diǎn) :操作簡(jiǎn)單。
缺點(diǎn) :需要關(guān)閉線程池。并且日常使用是將線程池注入到Spring容器,然后各個(gè)組件中統(tǒng)一用同一個(gè)線程池,不能直接關(guān)閉線程池。
方法二:getCompletedTaskCount()
測(cè)試代碼
package pool;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author 百里
*/
public class BaiLiIsShutdownThreadPoolDemo {
/**
* 創(chuàng)建一個(gè)最大線程數(shù)15的線程池
*/
public static ThreadPoolExecutor pool = new ThreadPoolExecutor(
10,
15,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10));
/**
* 線程執(zhí)行方法,隨機(jī)等待0到10秒
*/
private static void sleepMethod(int index){
try {
long sleepTime = new Double(Math.random() * 10000).longValue();
Thread.sleep(sleepTime);
System.out.println("當(dāng)前線程執(zhí)行結(jié)束: " + index);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 方法二:getCompletedTaskCount
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
int index = i;
pool.execute(() -> sleepMethod(index));
}
//當(dāng)線程池完成的線程數(shù)等于線程池中的總線程數(shù)
while (!(pool.getTaskCount() == pool.getCompletedTaskCount())) {
System.out.println("任務(wù)總數(shù):" + pool.getTaskCount() + "; 已經(jīng)完成任務(wù)數(shù):" + pool.getCompletedTaskCount());
Thread.sleep(1000);
System.out.println("還沒(méi)停止。。。");
}
System.out.println("全部執(zhí)行完畢");
}
}
上述代碼處理邏輯還是一樣在主線程循環(huán)判斷,主要就兩個(gè)方法:
- getTaskCount() :返回計(jì)劃執(zhí)行的任務(wù)總數(shù)。由于任務(wù)和線程的狀態(tài)可能在計(jì)算過(guò)程中動(dòng)態(tài)變化,返回的值只是一個(gè)近似值。這個(gè)方法返回的是線程池提交的任務(wù)總數(shù),包括已經(jīng)完成和正在執(zhí)行中的任務(wù)。
- getCompletedTaskCount() :返回已經(jīng)完成執(zhí)行的任務(wù)的大致總數(shù)。由于任務(wù)和線程的狀態(tài)可能在計(jì)算過(guò)程中動(dòng)態(tài)改變,返回的值只是一個(gè)近似值,并且在連續(xù)的調(diào)用中不會(huì)減少。這個(gè)方法返回的是已經(jīng)完成執(zhí)行的任務(wù)數(shù)量,不包括正在執(zhí)行中的任務(wù)。
優(yōu)缺點(diǎn)分析
- 優(yōu)點(diǎn) :不必關(guān)閉線程池,避免了創(chuàng)建和銷(xiāo)毀帶來(lái)的損耗。
- 缺點(diǎn) :使用這種判斷存在很大的限制條件;必須確定在循環(huán)判斷過(guò)程中沒(méi)有新的任務(wù)產(chǎn)生。
方法三:CountDownLatch
測(cè)試代碼
package pool;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author 百里
*/
public class BaiLiIsShutdownThreadPoolDemo {
/**
* 創(chuàng)建一個(gè)最大線程數(shù)15的線程池
*/
public static ThreadPoolExecutor pool = new ThreadPoolExecutor(
10,
15,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10));
/**
* 線程執(zhí)行方法,隨機(jī)等待0到10秒
*/
private static void sleepMethod(int index){
try {
long sleepTime = new Double(Math.random() * 10000).longValue();
Thread.sleep(sleepTime);
System.out.println("當(dāng)前線程執(zhí)行結(jié)束: " + index);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 方法三:CountDownLatch
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//計(jì)數(shù)器,判斷線程是否執(zhí)行結(jié)束
CountDownLatch taskLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
int index = i;
pool.execute(() -> {
sleepMethod(index);
taskLatch.countDown();
System.out.println("當(dāng)前計(jì)數(shù)器數(shù)量:" + taskLatch.getCount());
});
}
//當(dāng)前線程阻塞,等待計(jì)數(shù)器置為0
taskLatch.await();
System.out.println("全部執(zhí)行完畢");
}
}
優(yōu)缺點(diǎn)分析
優(yōu)點(diǎn) :代碼優(yōu)雅,不需要對(duì)線程池進(jìn)行操作。
缺點(diǎn) :需要提前知道線程數(shù)量;性能較差;還需要在線程代碼塊內(nèi)加上異常判斷,否則在 countDown之前發(fā)生異常而沒(méi)有處理,就會(huì)導(dǎo)致主線程永遠(yuǎn)阻塞在 await。
方法四:公共計(jì)數(shù)
測(cè)試代碼
package pool;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author 百里
*/
public class BaiLiIsShutdownThreadPoolDemo {
/**
* 創(chuàng)建一個(gè)最大線程數(shù)15的線程池
*/
public static ThreadPoolExecutor pool = new ThreadPoolExecutor(
10,
15,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10));
/**
* 線程執(zhí)行方法,隨機(jī)等待0到10秒
*/
private static void sleepMethod(int index){
try {
long sleepTime = new Double(Math.random() * 10000).longValue();
Thread.sleep(sleepTime);
System.out.println("當(dāng)前線程執(zhí)行結(jié)束: " + index);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static int taskNum = 0; //計(jì)數(shù)器
/**
* 方法四:公共計(jì)數(shù)
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Lock lock = new ReentrantLock();
for (int i = 0; i < 10; i++) {
int index = i;
pool.execute(() -> {
sleepMethod(index);
lock.lock();
taskNum++;
lock.unlock();
});
}
while(taskNum < 10) {
Thread.sleep(1000);
System.out.println("還沒(méi)停止。。。當(dāng)前完成任務(wù)數(shù):" + taskNum);
}
System.out.println("全部執(zhí)行完畢");
}
}
這種實(shí)現(xiàn)其實(shí)就是通過(guò)加鎖計(jì)數(shù),然后循環(huán)判斷。
優(yōu)缺點(diǎn)分析
- 優(yōu)點(diǎn) :手動(dòng)維護(hù)方式更加靈活,對(duì)于一些特殊場(chǎng)景可以手動(dòng)處理。
- 缺點(diǎn) :和CountDownLatch相比,一樣需要知道線程數(shù)目,但是代碼實(shí)現(xiàn)比較麻煩。
方法五:Future
測(cè)試代碼
package pool;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author 百里
*/
public class BaiLiIsShutdownThreadPoolDemo {
/**
* 創(chuàng)建一個(gè)最大線程數(shù)15的線程池
*/
public static ThreadPoolExecutor pool = new ThreadPoolExecutor(
10,
15,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10));
/**
* 線程執(zhí)行方法,隨機(jī)等待0到10秒
*/
private static void sleepMethod(int index){
try {
long sleepTime = new Double(Math.random() * 10000).longValue();
Thread.sleep(sleepTime);
System.out.println("當(dāng)前線程執(zhí)行結(jié)束: " + index);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 方法五:Future
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Future future = pool.submit(() -> sleepMethod(1));
while (!future.isDone()){
Thread.sleep(1000);
System.out.println("還沒(méi)停止。。。");
}
System.out.println("全部執(zhí)行完畢");
}
}
優(yōu)缺點(diǎn)分析
優(yōu)點(diǎn):使用簡(jiǎn)單,不需要關(guān)閉線程池。
缺點(diǎn):每個(gè)提交給線程池的任務(wù)都會(huì)關(guān)聯(lián)一個(gè)Future對(duì)象,這可能會(huì)引入額外的內(nèi)存開(kāi)銷(xiāo)。如果需要處理大量的任務(wù),可能會(huì)占用較多的內(nèi)存。
測(cè)試代碼匯總
package pool;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 五種判斷線程池任務(wù)執(zhí)行完成的方式
* @author 百里
*/
public class BaiLiIsShutdownThreadPoolDemo {
/**
* 創(chuàng)建一個(gè)最大線程數(shù)15的線程池
*/
public static ThreadPoolExecutor pool = new ThreadPoolExecutor(
10,
15,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10));
/**
* 線程執(zhí)行方法,隨機(jī)等待0到10秒
*/
private static void sleepMethod(int index){
try {
long sleepTime = new Double(Math.random() * 10000).longValue();
Thread.sleep(sleepTime);
System.out.println("當(dāng)前線程執(zhí)行結(jié)束: " + index);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 方法一:isTerminated
* @param args
* @throws InterruptedException
*/
public static void isTerminatedTest(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
int index = i;
pool.execute(() -> sleepMethod(index));
}
pool.shutdown();
while (!pool.isTerminated()){
Thread.sleep(1000);
System.out.println("還沒(méi)停止。。。");
}
System.out.println("全部執(zhí)行完畢");
}
/**
* 方法二:getCompletedTaskCount
* @param args
* @throws InterruptedException
*/
public static void completedTaskCountTest(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
int index = i;
pool.execute(() -> sleepMethod(index));
}
//當(dāng)線程池完成的線程數(shù)等于線程池中的總線程數(shù)
while (!(pool.getTaskCount() == pool.getCompletedTaskCount())) {
System.out.println("任務(wù)總數(shù):" + pool.getTaskCount() + "; 已經(jīng)完成任務(wù)數(shù):" + pool.getCompletedTaskCount());
Thread.sleep(1000);
System.out.println("還沒(méi)停止。。。");
}
System.out.println("全部執(zhí)行完畢");
}
/**
* 方法三:CountDownLatch
* @throws Exception
*/
public static void countDownLatchTest(String[] args) throws Exception {
//計(jì)數(shù)器,判斷線程是否執(zhí)行結(jié)束
CountDownLatch taskLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
int index = i;
pool.execute(() -> {
sleepMethod(index);
taskLatch.countDown();
System.out.println("當(dāng)前計(jì)數(shù)器數(shù)量:" + taskLatch.getCount());
});
}
//當(dāng)前線程阻塞,等待計(jì)數(shù)器置為0
taskLatch.await();
System.out.println("全部執(zhí)行完畢");
}
private static int taskNum = 0;
/**
* 方法四:公共計(jì)數(shù)
* @throws Exception
*/
public static void countTest(String[] args) throws Exception {
Lock lock = new ReentrantLock();
for (int i = 0; i < 10; i++) {
int index = i;
pool.execute(() -> {
sleepMethod(index);
lock.lock();
taskNum++;
lock.unlock();
});
}
while(taskNum < 10) {
Thread.sleep(1000);
System.out.println("還沒(méi)停止。。。當(dāng)前完成任務(wù)數(shù):" + taskNum);
}
System.out.println("全部執(zhí)行完畢");
}
/**
* 方法五:Future
* @throws Exception
*/
public static void futureTest(String[] args) throws Exception {
Future future = pool.submit(() -> sleepMethod(1));
while (!future.isDone()){
Thread.sleep(1000);
System.out.println("還沒(méi)停止。。。");
}
System.out.println("全部執(zhí)行完畢");
}
}