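Pitfalls of using Java's default thread pool
Scenario
One scheduler runs two scheduled tasks, each processing the txt files in its own directory. When a task hits certain complex inputs it can run for a very long time, and it may even block forever. We need a manager to supervise these tasks: when a task's last execution is more than 5 scheduling periods in the past, the manager stops that thread outright and restarts it, so that no unprocessed txt files pile up in either target directory.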
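Problem
We scheduled task1 and task2 directly on Java's default thread pool. Because of various uncontrollable problems in the external txt files, the task2 thread blocked. The symptom: task1 and the thread pool scheduler kept running normally, but task2 never made any progress.
Of course, finding the specific cause of the blocking and fixing it is important. But such a fix is unlikely to cover every unknown situation completely. We need to guarantee the robustness of the task threads or of the scheduler!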
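Planning the solution
The thread pool scheduler has no native API for monitoring the business-level running state of the threads it schedules. Because task2 blocks inside our own business logic, the best approach is to write a TaskManager and have every task thread register itself with it before running. The TaskManager is then responsible for monitoring every task under its control for the whole of its lifetime.
What remains is deciding how to handle a task that has exceeded 5 execution periods.
The options are:
●As soon as such a task thread is found, interrupt it immediately and then restart it;
●As soon as such a task thread is found, stop and empty the whole pool, then submit both tasks again (only possible when the set of tasks is known).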
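The monitoring rule described above (a task is overdue once its last execution is more than 5 periods old) can be sketched on its own, independent of any thread pool. This is a minimal sketch under stated assumptions: the names `HeartbeatRegistry`, `TaskBeat`, `register`, `beat` and `overdue` are hypothetical and do not appear in the original code, and the current time is passed in explicitly so the rule is easy to check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tasks register themselves and report a heartbeat
// every time they begin a new round of work.
class HeartbeatRegistry {
    private static final int MAX_MISSED_PERIODS = 5;

    // Per-task bookkeeping: scheduling period and last reported execution time.
    private static final class TaskBeat {
        final long periodMillis;
        volatile long lastBeatMillis;

        TaskBeat(long periodMillis, long nowMillis) {
            this.periodMillis = periodMillis;
            this.lastBeatMillis = nowMillis;
        }
    }

    private final Map<String, TaskBeat> beats = new ConcurrentHashMap<>();

    void register(String taskName, long periodMillis, long nowMillis) {
        beats.put(taskName, new TaskBeat(periodMillis, nowMillis));
    }

    void beat(String taskName, long nowMillis) {
        TaskBeat b = beats.get(taskName);
        if (b != null) {
            b.lastBeatMillis = nowMillis;
        }
    }

    // Names of the tasks whose last heartbeat is more than 5 periods old.
    List<String> overdue(long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, TaskBeat> e : beats.entrySet()) {
            TaskBeat b = e.getValue();
            if (nowMillis - b.lastBeatMillis > b.periodMillis * MAX_MISSED_PERIODS) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        HeartbeatRegistry registry = new HeartbeatRegistry();
        registry.register("task1", 1000, 0);
        registry.register("task2", 1000, 0);
        registry.beat("task1", 5000);                // task1 keeps reporting
        System.out.println(registry.overdue(5500));  // only task2 is overdue
    }
}
```

Keeping detection separate from the reaction (interrupting, restarting) makes it possible to swap in either of the two options listed above without touching the rule itself.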
Implementation
Interrupt, then restart
●Task implementation class
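- import java.io.File;
- import java.util.Date;
- class FileTask extends Thread {
- // Written by the task thread and read by the manager thread, hence volatile.
- private volatile long lastExecTime = 0;
- // Scheduling period, in seconds.
- protected long interval = 10000;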
- public long getLastExecTime() {
- return lastExecTime;
- }
- public void setLastExecTime(long lastExecTime) {
- this.lastExecTime = lastExecTime;
- }
- public long getInterval() {
- return interval;
- }
- public void setInterval(long interval) {
- this.interval = interval;
- }
- public File[] getFiles() {
- return null;
- }
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- lastExecTime = System.currentTimeMillis();
- System.out.println(Thread.currentThread().getName() + " is running -> " + new Date());
- try {
- Thread.sleep(getInterval() * 6 * 1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- e.printStackTrace(); // once the thread pool has been shut down, the exception is thrown here
- }
- }
- }
- }
●TaskManager
- public class TaskManager implements Runnable {
- private static final Log logger = LogFactory.getLog(TaskManager.class);
- public Set<FileTask> runners = new CopyOnWriteArraySet<FileTask>();
- ExecutorService pool = Executors.newCachedThreadPool();
- public void registerCodeRunnable(FileTask process) {
- runners.add(process);
- }
- public TaskManager (Set<FileTask> runners) {
- this.runners = runners;
- }
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- long current = System.currentTimeMillis();
- for (FileTask wrapper : runners) {
- if (current - wrapper.getLastExecTime() > wrapper.getInterval() * 5) {
- wrapper.interrupt();
- for (File file : wrapper.getFiles()) {
- file.delete();
- }
- wrapper.start();
- }
- }
- } catch (Exception e1) {
- logger.error("Error happens when we trying to interrupt and restart a task ");
- ExceptionCollector.registerException(e1);
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // restore the flag so the while loop can exit
- }
- }
- }
- }
This code fails with java.lang.IllegalThreadStateException. Why? It is actually a very basic issue, and you are probably not as careless as I was. The Javadoc of Thread.start() contains this passage:
It is never legal to start a thread more than once. In particular, a thread may not be restarted once it has completed execution.
Indeed, a thread cannot be started twice. So how does the JDK check for that?
- public synchronized void start() {
- /**
- * A zero status value corresponds to state "NEW".
- */
- if (threadStatus != 0) // any state other than NEW: throw immediately
- throw new IllegalThreadStateException();
- group.add(this);
- boolean started = false;
- try {
- start0(); // native method that actually starts the thread
- started = true;
- } finally {
- try {
- if (!started) {
- group.threadStartFailed(this);
- }
- } catch (Throwable ignore) {
- }
- }
- }
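The check above is easy to reproduce in isolation. The following is a minimal sketch (the class and method names `StartTwiceDemo` and `secondStartFails` are made up for illustration): it starts a thread, waits for it to finish, and then tries to start the same object again.

```java
// Minimal demonstration: a second start() on the same Thread object fails.
class StartTwiceDemo {
    // Returns true if the second start() throws IllegalThreadStateException.
    static boolean secondStartFails() {
        Thread t = new Thread(() -> { /* finishes immediately */ });
        t.start();
        try {
            t.join(); // wait until the thread has terminated
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        try {
            t.start(); // the state is now TERMINATED, not NEW
            return false;
        } catch (IllegalThreadStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second start() rejected: " + secondStartFails());
    }
}
```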
Right: only a thread in the NEW state may call the native method that starts it. Since we are here, let's review (for my own benefit too) the thread states in the JVM.
All thread states:
●NEW: not yet started
●RUNNABLE: executing in the JVM
●BLOCKED: blocked waiting for a monitor lock to be released
●WAITING: waiting indefinitely for another thread to perform a particular action
●TIMED_WAITING: waiting for another thread to perform an action, for up to a specified waiting time
●TERMINATED: has exited
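Three of these states can be observed directly with Thread.getState(). The sketch below is illustrative only; the names `ThreadStateDemo` and `observeStates` are hypothetical. It samples one thread before it starts, while it sleeps, and after it exits.

```java
import java.util.ArrayList;
import java.util.List;

class ThreadStateDemo {
    // Samples the state of one thread before start, while sleeping, and after exit.
    static List<Thread.State> observeStates() {
        List<Thread.State> seen = new ArrayList<>();
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(60_000); // long sleep, cut short by interrupt() below
            } catch (InterruptedException e) {
                // interrupted by the observer: fall through and exit
            }
        });
        seen.add(t.getState()); // not started yet
        t.start();
        try {
            // Poll until the thread has actually entered sleep().
            while (t.getState() != Thread.State.TIMED_WAITING) {
                Thread.sleep(10);
            }
            seen.add(t.getState()); // sleeping with a timeout
            t.interrupt();
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen.add(t.getState()); // finished
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observeStates()); // [NEW, TIMED_WAITING, TERMINATED]
    }
}
```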
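A thread can be in only one state at any given moment. These are JVM states and do not reflect operating system thread states. Looking through the Thread API, there is no method for changing a thread's state. So is this road closed?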
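Think about it carefully...
What if we implement the task as a Runnable and, before handing it to the thread pool scheduler, construct a Thread from that Runnable? Would that Thread object then control the Runnable, and through it the task running in the pool? No! Let's look at how Thread and ThreadPoolExecutor each handle a Runnable.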
●Thread
- /* What will be run. */
- private Runnable target;
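Combined with the start() method above, it is easy to guess what happens: start0() starts a new native thread, which invokes run(), which in turn calls target.run().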
●ThreadPoolExecutor
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
- }
- private boolean addWorker(Runnable firstTask, boolean core) {
- …
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- final ReentrantLock mainLock = this.mainLock;
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- mainLock.lock();
- try {
- int c = ctl.get();
- int rs = runStateOf(c);
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
So what does Worker look like?
●Worker
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- final Thread thread;
- Runnable firstTask;
- volatile long completedTasks;
- Worker(Runnable firstTask) {
- setState(-1); // interrupts are not allowed before runWorker is called
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
- public void run() {
- runWorker(this);
- }
- ……
- …….
- void interruptIfStarted() {
- Thread t;
- if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
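As you can see, Worker wraps both a Runnable (the task) and a Thread, and that Thread is constructed with the worker itself as its argument, because Worker is also a Runnable. It then exposes run() and interruptIfStarted() for running and stopping. Looking back at the Thread example above, a new insight emerges: once we hand a Thread object to ThreadPoolExecutor, what actually executes is a Thread(FileTask()) object, which we will call workerWrapper for now. Calling FileTask.interrupt() from outside the pool therefore affects the FileTask object, not workerWrapper. So the earlier explanation in terms of calling start() twice is not the whole story. The FileTask object itself was never started: the pool only calls its run() method on a worker thread. fileTask.interrupt() therefore throws nothing and never reaches the thread that is actually stuck, the first fileTask.start() launches a second copy of the task on a new thread outside the pool, and IllegalThreadStateException only appears when the manager calls start() again on a later pass. The original post illustrated this relationship with a figure.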
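A small experiment can confirm how a pool treats a Thread object that is passed to execute(). This is a sketch, not the original code; `ThreadAsRunnableDemo` and `runInPool` are hypothetical names. It records which thread really executes run() and then inspects the state of the Thread object that was submitted.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ThreadAsRunnableDemo {
    // Submits a Thread object to a pool and reports the state of that object
    // plus whether run() was executed by some other thread.
    static String runInPool() {
        AtomicReference<Thread> executingThread = new AtomicReference<>();
        Thread task = new Thread() {
            @Override
            public void run() {
                executingThread.set(Thread.currentThread());
            }
        };
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.execute(task); // the pool treats it as a plain Runnable and calls task.run()
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // The submitted Thread object was never started, so it is still NEW.
        return task.getState() + " / ran on other thread: " + (executingThread.get() != task);
    }

    public static void main(String[] args) {
        System.out.println(runInPool()); // NEW / ran on other thread: true
    }
}
```

Because the submitted object stays in the NEW state, calling interrupt() on it cannot affect the worker thread that is really running the task.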
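With this analysis it is clear that, apart from ThreadPoolExecutor's interruptWorkers() method, there is no way to operate on these workers directly. That method is private; shutdownNow() is the public call that invokes it.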
- private void interruptWorkers() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers)
- w.interruptIfStarted();
- } finally {
- mainLock.unlock();
- }
- }
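The Worker objects themselves stay out of reach, but the JDK does offer a public way to interrupt the thread running one particular task: submit() returns a Future, and Future.cancel(true) interrupts the thread that is executing that task. The original post does not use this route; the sketch below (with the hypothetical names `CancelSingleTaskDemo` and `cancelOneTask`) only illustrates it, using a long sleep as a stand-in for a blocked task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelSingleTaskDemo {
    // Returns true if cancel(true) interrupted the blocked task
    // and the pool kept accepting new work afterwards.
    static boolean cancelOneTask() {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            Future<?> handle = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for a task that is stuck
                } catch (InterruptedException e) {
                    interrupted.countDown(); // the interrupt reached this task
                }
            });
            started.await();
            handle.cancel(true); // interrupts the worker thread running this task
            boolean sawInterrupt = interrupted.await(10, TimeUnit.SECONDS);
            // The pool itself was not shut down, so it still accepts work.
            Future<String> next = pool.submit(() -> "still running");
            return sawInterrupt && "still running".equals(next.get(10, TimeUnit.SECONDS));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task cancelled, pool alive: " + cancelOneTask());
    }
}
```

Like every interrupt-based approach in this article, this only helps when the task is blocked in an interruptible call (such as sleep or wait) or checks its interrupt flag; code stuck in a non-interruptible operation just has its flag set and keeps going.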
Restarting the thread pool
●TaskManager
- public class TaskManager implements Runnable {
- …..
- public TaskManager (Set<FileTask> runners) {
- super();
- this.runners = runners;
- executeTasks(runners);
- }
- private void executeTasks(Set<FileTask> runners) {
- for (FileTask task : runners) {
- pool.execute(task);
- System.out.println(task.getClass().getSimpleName() + " has been started");
- }
- }
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- long current = System.currentTimeMillis();
- for (FileTask wrapper : runners) {
- if (wrapper.getLastExecTime() != 0 && current - wrapper.getLastExecTime() > wrapper.getInterval() * 5 * 1000) { // I initially forgot to multiply by 1000
- wrapper.interrupt();
- if (wrapper.getFiles() != null){
- for (File file : wrapper.getFiles()) {
- file.delete();
- }
- }
- System.out.println("Going to shutdown the thread pool");
- List<Runnable> shutdownNow = pool.shutdownNow(); // close the pool immediately, without waiting for running tasks to finish
- for (Runnable run : shutdownNow) {
- System.out.println(run + " going to be shutdown");
- }
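- while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
- System.out.println("Waiting for the thread pool to terminate " + new Date());
- }
- System.out.println("The thread pool has been shutdown " + new Date());
- pool = Executors.newCachedThreadPool(); // a terminated pool rejects new tasks, so start a fresh one (assumes the pool field is not final)
- executeTasks(runners); // submit every task again
- break; // all tasks were just restarted, so stop scanning for this round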
- }
- }
- } catch (Exception e1) {
- e1.printStackTrace();
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // restore the flag so the while loop can exit
- }
- }
- }
- public static void main(String[] args) {
- Set<FileTask> tasks = new HashSet<FileTask>();
- FileTask task = new FileTask();
- task.setInterval(1);
- task.setName("task-1");
- tasks.add(task);
- FileTask task1 = new FileTask();
- task1.setInterval(2);
- task1.setName("task-2");
- tasks.add(task1);
- TaskManager codeManager = new TaskManager (tasks);
- new Thread(codeManager).start();
- }
- }
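Success! All the workers in the ThreadPoolExecutor are stopped, and then the two tasks to execute are submitted again (note that nothing is cleared here; the workers are only stopped). Although this handles a stuck task promptly, it has one fatal drawback: if we do not know exactly which tasks the ThreadPoolExecutor is supposed to run, there is no way to re-execute them.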
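One detail of this strategy is worth checking in isolation: once shutdownNow() has been called, an executor rejects any further submissions, so resubmitting the tasks needs a newly created pool. The sketch below is illustrative, with the hypothetical names `PoolRestartDemo` and `oldPoolRejectsNewPoolAccepts`; it assumes the default rejection policy of Executors.newCachedThreadPool(), which throws RejectedExecutionException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class PoolRestartDemo {
    // Returns true if the old pool rejects work after shutdownNow()
    // while a freshly created pool accepts it.
    static boolean oldPoolRejectsNewPoolAccepts() {
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.execute(() -> {
            try {
                Thread.sleep(60_000); // stands in for a stuck task
            } catch (InterruptedException e) {
                // stopped by shutdownNow()
            }
        });
        pool.shutdownNow(); // interrupts the workers and refuses new tasks
        ExecutorService fresh = null;
        try {
            while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
                // keep waiting until every worker has exited
            }
            boolean rejected = false;
            try {
                pool.execute(() -> { });
            } catch (RejectedExecutionException e) {
                rejected = true; // the terminated pool cannot be reused
            }
            fresh = Executors.newCachedThreadPool(); // replacement pool
            Future<String> result = fresh.submit(() -> "restarted");
            return rejected && "restarted".equals(result.get(10, TimeUnit.SECONDS));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            if (fresh != null) {
                fresh.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("old pool rejects, new pool accepts: " + oldPoolRejectsNewPoolAccepts());
    }
}
```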
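A custom thread pool
Fine! Enough digging into other people's code! We can simply write our own ThreadPoolExecutor; all it has to do is expose the workers. Does this remind you of the start() problem from earlier? Right: even if we can interrupt the Thread directly, we cannot start it again. So would cloning an identical Thread work?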
●Thread
- @Override
- protected Object clone() throws CloneNotSupportedException {
- throw new CloneNotSupportedException();
- }
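The answer is obvious: threads do not support clone. We have to create a new Thread to run the task again. In fact, all we need to do is replace the Runnable in the original Worker with our own task type and loosen the access modifiers where appropriate. In addition, CustomThreadPoolExecutor implements Runnable (it already extends ThreadPoolExecutor, so it cannot also extend Thread) and is run on its own thread, because it needs to periodically monitor the running state of the Threads in all of its workers.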
●CustomThreadPoolExecutor
- public class CustomThreadPoolExecutor extends ThreadPoolExecutor implements Runnable {
- public void execute(Testask command) {
- …. // the execute interface now accepts our business class
- }
- …
- …
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- …
- Testask firstTask; // Runnable replaced by our business class, so its state is easy to inspect
- …
- Worker(Testask firstTask) {
- … // likewise, the constructor parameter is now our business class
- }
- }
- public static void main(String[] args) {
- CustomThreadPoolExecutor pool = new CustomThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- Testask task = new Testask();
- task.setInterval(1);
- pool.execute(task);
- Testask task1 = new Testask();
- task1.setInterval(2);
- pool.execute(task1);
- new Thread(pool).start();
- }
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- long current = System.currentTimeMillis();
- Set<Testask> toReExecute = new HashSet<Testask>();
- System.out.println("\t number is " + number);
- for (Worker wrapper : workers) {
- Testask tt = wrapper.firstTask;
- if (tt != null) {
- if (current - tt.getLastExecTime() > tt.getInterval() * 5 * 1000) {
- wrapper.interruptIfStarted();
- remove(tt);
- if (tt.getFiles() != null) {
- for (File file : tt.getFiles()) {
- file.delete();
- }
- }
- System.out.println("Thread timed out : " + tt + " " + new Date());
- toReExecute.add(tt);
- }
- }
- }
- if (toReExecute.size() > 0) {
- mainLock.lock();
- try {
- for (Testask tt : toReExecute) {
- execute(tt); // execute this task again
- }
- } finally {
- mainLock.unlock();
- }
- }
- } catch (Exception e1) {
- System.out.println("Error happens when we trying to interrupt and restart a code task ");
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // restore the flag so the while loop can exit
- }
- }
- }
- }
●Testask
- class Testask implements Runnable {
- …..
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- lastExecTime = System.currentTimeMillis();
- System.out.println(Thread.currentThread().getName() + " is running -> " + new Date());
- try {
- CustomThreadPoolExecutor.number++;
- Thread.sleep(getInterval() * 6 * 1000);
- System.out.println(Thread.currentThread().getName() + " after sleep");
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- System.out.println("InterruptedException happens");
- }
- }
- System.out.println("Going to die");
- }
- }
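Copying the JDK source is one way to reach the worker threads. A lighter alternative, which is not what the original post does, is to subclass ThreadPoolExecutor and use its protected beforeExecute and afterExecute hooks to remember which thread is running which task. The sketch below is a minimal illustration under stated assumptions: `TrackingExecutor`, `interruptTask` and `TrackingExecutorDemo` are hypothetical names, tasks are submitted with execute() so the hook receives the same Runnable object, and each task object runs at most once at a time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor that records the thread currently running each task.
class TrackingExecutor extends ThreadPoolExecutor {
    private final Map<Runnable, Thread> running = new ConcurrentHashMap<>();

    TrackingExecutor() {
        // Same configuration as Executors.newCachedThreadPool().
        super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        running.put(r, t);
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Runs on the thread that executed r; remove only this thread's entry.
        running.remove(r, Thread.currentThread());
    }

    // Interrupts the thread running the task; false if it is not running.
    boolean interruptTask(Runnable task) {
        Thread t = running.get(task);
        if (t == null) {
            return false;
        }
        t.interrupt();
        return true;
    }
}

class TrackingExecutorDemo {
    // Blocks a task, interrupts it through the executor, then runs it again.
    static boolean interruptAndResubmit() {
        TrackingExecutor pool = new TrackingExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch blocked = new CountDownLatch(1);
        CountDownLatch rerun = new CountDownLatch(1);
        Runnable task = () -> {
            if (runs.incrementAndGet() == 1) {
                blocked.countDown();
                try {
                    Thread.sleep(60_000); // first run gets stuck
                } catch (InterruptedException e) {
                    // stopped by interruptTask()
                }
            } else {
                rerun.countDown(); // second run completes normally
            }
        };
        try {
            pool.execute(task);
            blocked.await();
            boolean interrupted = pool.interruptTask(task);
            pool.execute(task); // run the same task again in the same pool
            return interrupted && rerun.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted and rerun: " + interruptAndResubmit());
    }
}
```

One limitation to keep in mind: between looking up the thread and interrupting it, the task may already have finished, in which case the interrupt can land on whatever that worker runs next. A production version would need to guard against that.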
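Final solution
In summary, the safest option is the ThreadPoolExecutor that ships with the JDK. If you need to control the tasks in the pool at arbitrary times, consider restarting all of them. Fully customizing your own thread pool is of course the best solution, but pay close attention to shared objects and handle concurrent access to them properly.
Given our scenario, with time tight and only a few tasks to keep track of, we chose the restart-everything strategy for now. After going live, we will take the time to finish our own customized ThreadPoolExecutor and roll it out!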