自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

java使用默認(rèn)線程池踩過的坑

網(wǎng)絡(luò) 網(wǎng)絡(luò)管理 網(wǎng)絡(luò)運(yùn)維
直接使用java默認(rèn)的線程池調(diào)度task1和task2.由于外部txt的種種不可控原因,導(dǎo)致task2線程阻塞?,F(xiàn)象就是task1和線程池調(diào)度器都正常運(yùn)行著,但是task2遲遲沒有動作。我們需要保證任務(wù)線程或者調(diào)度器的健壯性!

場景

一個調(diào)度器,兩個調(diào)度任務(wù),分別處理兩個目錄下的txt文件,某個調(diào)度任務(wù)應(yīng)對某些復(fù)雜問題的時候會持續(xù)特別長的時間,甚至有一直阻塞的可能。我們需要一個manager來管理這些task,當(dāng)這個task的上一次執(zhí)行時間距離現(xiàn)在超過5個調(diào)度周期的時候,就直接停掉這個線程,然后再重啟它,保證兩個目標(biāo)目錄下沒有待處理的txt文件堆積。

java使用默認(rèn)線程池踩過的坑

問題

直接使用java默認(rèn)的線程池調(diào)度task1和task2.由于外部txt的種種不可控原因,導(dǎo)致task2線程阻塞?,F(xiàn)象就是task1和線程池調(diào)度器都正常運(yùn)行著,但是task2遲遲沒有動作。

當(dāng)然,找到具體的阻塞原因并進(jìn)行針對性解決是很重要的。但是,這種措施很可能并不能完全、徹底、全面的處理好所有未知情況。我們需要保證任務(wù)線程或者調(diào)度器的健壯性!

方案計(jì)劃

線程池調(diào)度器并沒有原生的針對被調(diào)度線程的業(yè)務(wù)運(yùn)行狀態(tài)進(jìn)行監(jiān)控處理的API。因?yàn)閠ask2是阻塞在我們的業(yè)務(wù)邏輯里的,所以***的方式是寫一個TaskManager,所有的任務(wù)線程在執(zhí)行任務(wù)前全部到這個TaskManager這里來注冊自己。這個TaskManager就負(fù)責(zé)對于每個自己管轄范圍內(nèi)的task進(jìn)行實(shí)時全程監(jiān)控!

java使用默認(rèn)線程池踩過的坑

后面的重點(diǎn)就是如何處理超過5個執(zhí)行周期的task了。

方案如下:

●一旦發(fā)現(xiàn)這個task線程,立即中止它,然后再次重啟;

一旦發(fā)現(xiàn)這個task線程,直接將整個pool清空并停止,重新放入這兩個task ——【task明確的情況下】;

方案實(shí)施

中止后重啟

Task實(shí)現(xiàn)類

  1. class FileTask extends Thread { 
  2. private long lastExecTime = 0
  3. protected long interval = 10000
  4. public long getLastExecTime() { 
  5.     return lastExecTime; 
  6. public void setLastExecTime(long lastExecTime) { 
  7.     this.lastExecTime = lastExecTime; 
  8. public long getInterval() { 
  9.     return interval; 
  10. public void setInterval(long interval) { 
  11.     this.interval = interval; 
  12. }  
  13. public File[] getFiles() { 
  14.     return null; 

Override

  1. public void run() { 
  2. while (!Thread.currentThread().isInterrupted()) { 
  3. lastExecTime = System.currentTimeMillis(); 
  4. System.out.println(Thread.currentThread().getName() + " is running -> " + new Date()); 
  5. try { 
  6. Thread.sleep(getInterval() * 6 * 1000); 
  7. } catch (InterruptedException e) { 
  8. Thread.currentThread().interrupt(); 
  9. e.printStackTrace();    // 當(dāng)線程池shutdown之后,這里就會拋出exception了 
  10.             } 
  11.         } 
  12.     } 
  13.     

TaskManager

  1. public class TaskManager  implements Runnable { 
  2. private final static Log logger = LogFactory.getLog(TaskManager .class); 
  3. public Set<FileTask> runners = new CopyOnWriteArraySet<FileTask>(); 
  4. ExecutorService pool = Executors.newCachedThreadPool(); 
  5. public void registerCodeRunnable(FileTask process) { 
  6. runners.add(process); 
  7. public TaskManager (Set<FileTask> runners) { 
  8. this.runners = runners; 

@Override

  1. public void run() { 
  2.        while (!Thread.currentThread().isInterrupted()) { 
  3.            try { 
  4.                long current = System.currentTimeMillis(); 
  5.                for (FileTask wrapper : runners) { 
  6.                    if (current - wrapper.getLastExecTime() > wrapper.getInterval() * 5) { 
  7.                        wrapper.interrupt(); 
  8.                        for (File file : wrapper.getFiles()) { 
  9.                            file.delete(); 
  10.                        } 
  11.                     wrapper.start();   
  12.                    } 
  13.                } 
  14.            } catch (Exception e1) { 
  15.                logger.error("Error happens when we trying to interrupt and restart a task "); 
  16.                ExceptionCollector.registerException(e1); 
  17.            } 
  18.            try { 
  19.                Thread.sleep(500); 
  20.            } catch (InterruptedException e) { 
  21.            } 
  22.        } 
  23.    } 
  24.     

這段代碼會報(bào)錯 java.lang.Thread IllegalThreadStateException。為什么呢?其實(shí)這是一個很基礎(chǔ)的問題,您應(yīng)該不會像我一樣馬虎。查看Thread.start()的注釋, 有這樣一段:

It is never legal to start a thread more than once. In particular, a thread may not be restarted once it has completed execution.

是的,一個線程不能夠啟動兩次。那么它是怎么判斷的呢?

  1. public synchronized void start() { 
  2.         /** 
  3.          * A zero status value corresponds to state "NEW".    0對應(yīng)的是state NEW 
  4.          */ 

if (threadStatus != 0) //如果不是NEW state,就直接拋出異常!#p#

  1. throw new IllegalThreadStateException(); 
  2.         group.add(this); 
  3.         boolean started = false
  4.         try { 
  5.         start0();    // 啟動線程的native方法 
  6.         started = true
  7.         } finally { 
  8.             try { 
  9.                 if (!started) { 
  10.                     group.threadStartFailed(this); 
  11.                 } 
  12.             } catch (Throwable ignore) { 
  13.             } 
  14.         } 
  15.     } 

恩,只有是NEW狀態(tài)才能夠調(diào)用native方法啟動一個線程。好吧,到這里了,就普及也自補(bǔ)一下jvm里的線程狀態(tài):

所有的線程狀態(tài)::

NEW —— 還沒有啟動過

RUNNABLE —— 正在jvm上運(yùn)行著

BLOCKED —— 正在等待鎖/信號量被釋放

WAITING —— 等待其他某個線程的某個特定動作

TIMED_WAITING —— A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.

TERMINATED —— 退出,停止

線程在某個時間點(diǎn)上只可能存在一種狀態(tài),這些狀態(tài)是jvm里的,并不反映操作系統(tǒng)線程的狀態(tài)。查一下Thread的API,沒有對其狀態(tài)進(jìn)行修改的API。那么這條路是不通的嗎?

仔細(xì)考慮一下……

如果把任務(wù)做成Runnable實(shí)現(xiàn)類,然后在把這個實(shí)現(xiàn)類丟進(jìn)線程池調(diào)度器之前,利用此Runnable構(gòu)造一個Thread,是不是這個Thread對象就能夠控制這個runnable對象,進(jìn)而控制在線程池中運(yùn)行著的task了呢?非也!讓我們看看Thread和ThreadPoolExecutor對Runnable的處理吧。

Thread

  1. /* What will be run. */ 
  2. private Runnable target; 

 

結(jié)合上面的start()方法,很容易猜出,start0()會把target弄成一個線程來進(jìn)行運(yùn)行。

ThreadPoolExecutor

  1. public void execute(Runnable command) { 
  2.         if (command == null) 
  3.             throw new NullPointerException(); 
  4.         int c = ctl.get(); 
  5.         if (workerCountOf(c) < corePoolSize) { 
  6.             if (addWorker(command, true)) 
  7.                 return; 
  8.             c = ctl.get(); 
  9.         } 
  10.         if (isRunning(c) && workQueue.offer(command)) { 
  11.             int recheck = ctl.get(); 
  12.             if (! isRunning(recheck) && remove(command)) 
  13.                 reject(command); 
  14.             else if (workerCountOf(recheck) == 0) 
  15.                 addWorker(null, false); 
  16.         } 
  17.         else if (!addWorker(command, false)) 
  18.             reject(command); 
  19. private boolean addWorker(Runnable firstTask, boolean core) { 
  20. … 
  21. boolean workerStarted = false
  22. boolean workerAdded = false
  23. Worker w = null
  24. try { 
  25. final ReentrantLock mainLock = this.mainLock; 
  26. w = new Worker(firstTask); 
  27. final Thread t = w.thread; 
  28. if (t != null) { 
  29. mainLock.lock(); 
  30. try { 
  31. int c = ctl.get(); 
  32. int rs = runStateOf(c); 
  33. if (rs < SHUTDOWN || 
  34. (rs == SHUTDOWN && firstTask == null)) { 
  35. if (t.isAlive()) // precheck that t is startable 
  36. throw new IllegalThreadStateException(); 
  37. workers.add(w); 
  38. int s = workers.size(); 
  39. if (s > largestPoolSize) 
  40. largestPoolSize = s; 
  41. workerAdded = true
  42. } finally { 
  43. mainLock.unlock(); 
  44. if (workerAdded) { 
  45. t.start(); 
  46. workerStarted = true
  47. } finally { 
  48. if (! workerStarted) 
  49. addWorkerFailed(w); 
  50. return workerStarted; 

那么Worker又是怎樣的呢?

Worker

  1. private final class Worker 
  2. extends AbstractQueuedSynchronizer 
  3. implements Runnable 
  4. final Thread thread; 
  5. Runnable firstTask; 
  6. volatile long completedTasks; 
  7. Worker(Runnable firstTask) { 
  8. setState(-1); //調(diào)用runWorker之前不可以interrupt 
  9. this.firstTask = firstTask; 
  10. this.thread = getThreadFactory().newThread(this); 
  11. public void run() { 
  12. runWorker(this); 
  13. ……   
  14. ……. 
  15. void interruptIfStarted() { 
  16. Thread t; 
  17. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 
  18. try { 
  19. t.interrupt(); 
  20. } catch (SecurityException ignore) { 

可見worker里既包裝了Runnable對象——task,又包裝了一個Thread對象——以自己作為初始化參數(shù),因?yàn)閣orker也是Runnable對象。然后對外提供了運(yùn)行與停止接口,run()和interruptIfStarted()。回顧上面使用Thread的例子不禁有了新的領(lǐng)悟,我們把一個Thread對象交給ThreadPoolExecutor執(zhí)行后,實(shí)際的調(diào)用是對Thread(FileTask())對象,我們暫時稱之為workerWrapper。那么我們在池外進(jìn)行FileTask.interrupt()操作影響的是FileTask對象,而不是workerWrapper。所以可能上面對于start()方法二次調(diào)用不是特別適當(dāng)。更恰當(dāng)?shù)膽?yīng)該是在fileTask.interrupt()的時候就跑出異常,因?yàn)閺膩頉]有對fileTask對象執(zhí)行過start()方法,這時候去interrupt就會出現(xiàn)錯誤。具體如下圖:

java使用默認(rèn)線程池踩過的坑

分析到此,我們已經(jīng)明確除了調(diào)用ThreadPoolExecutor了的interruptWorkers()方法別無其他途徑操作這些worker了。

  1. private void interruptWorkers() { 
  2. final ReentrantLock mainLock = this.mainLock; 
  3. mainLock.lock(); 
  4. try { 
  5. for (Worker w : workers) 
  6. w.interruptIfStarted(); 
  7. } finally { 
  8. mainLock.unlock(); 

重啟線程池

●TaskManager

  1. public class TaskManager  implements Runnable { 
  2. ….. 
  3. public TaskManager (Set<FileTask> runners) { 
  4. super(); 
  5. this.runners = runners; 
  6. executeTasks(runners); 
  7. private void executeTasks(Set<FileTask> runners) { 
  8. for (FileTask task : runners) { 
  9. pool.execute(task); 
  10. System.out.println(task.getClass().getSimpleName() + " has been started"); 

@Override

  1. public void run() { 
  2. while (!Thread.currentThread().isInterrupted()) { 
  3. try { 
  4. long current = System.currentTimeMillis(); 
  5. for (FileTask wrapper : runners) { 
  6. if (wrapper.getLastExecTime() != 0 && current - wrapper.getLastExecTime() > wrapper.getInterval() * 5 * 1000) {    // 開始忘了乘以1000 
  7. wrapper.interrupt(); 
  8. if (wrapper.getFiles() != null){ 
  9. for (File file : wrapper.getFiles()) { 
  10. file.delete(); 
  11. System.out.println("Going to shutdown the thread pool"); 
  12. List<Runnable> shutdownNow = pool.shutdownNow();    // 不等當(dāng)前pool里的任務(wù)執(zhí)行完,直接關(guān)閉線程池 
  13. for (Runnable run : shutdownNow) { 
  14. System.out.println(run + " going to be shutdown"); 
  15. while (pool.awaitTermination(1, TimeUnit.SECONDS)) {   
  16. System.out.println("The thread pool has been shutdown " + new Date()); 
  17. executeTasks(runners); // 重新執(zhí)行 
  18. Thread.sleep(200); 
  19. } catch (Exception e1) { 
  20. e1.printStackTrace(); 
  21. try { 
  22. Thread.sleep(500); 
  23. } catch (InterruptedException e) { 
  24. public static void main(String[] args) { 
  25. Set<FileTask> tasks = new HashSet<FileTask>(); 
  26.         
  27. FileTask task = new FileTask(); 
  28. task.setInterval(1); 
  29. task.setName("task-1"); 
  30. tasks.add(task); 
  31.                
  32. FileTask task1 = new FileTask(); 
  33. task1.setInterval(2); 
  34. task.setName("task-2"); 
  35. tasks.add(task1); 
  36.         
  37. TaskManager  codeManager = new TaskManager (tasks); 
  38. new Thread(codeManager).start(); 
  39. }    

成功!把整個的ThreadPoolExector里所有的worker全部停止,之后再向其隊(duì)列里重新加入要執(zhí)行的兩個task(注意這里并沒有清空,只是停止而已)。這樣做雖然能夠及時處理task,但是一個很致命的缺點(diǎn)在于,如果不能明確的知道ThreadPoolExecutor要執(zhí)行的task,就沒有辦法重新執(zhí)行這些任務(wù)。#p#

定制線程池

好吧!停止鉆研別人的東西!我們完全可以自己寫一個自己的ThreadPoolExecutor,只要把worker暴露出來就可以了。這里是不是回想起前面的start問題來了,沒錯,我們即便能夠直接針對Thread進(jìn)行interrupt, 但是不能再次start它了。那么clone一個同樣的Thread行不行呢?#p#

Thread

  1. @Override 
  2. protected Object clone() throws CloneNotSupportedException { 
  3. throw new CloneNotSupportedException(); 

 

答案顯而易見,線程是不支持clone 的。我們需要重新new 一個Thread來重新運(yùn)行。其實(shí)我們只需要將原來的Worker里的Runnable換成我們自己的task,然后將訪問權(quán)限適當(dāng)放開就可以了。還有,就是讓我們的CustomThreadPoolExecutor繼承Thread,因?yàn)樗枰〞r監(jiān)控自己的所有的worker里Thread的運(yùn)行狀態(tài)。

CustomThreadPoolExecutor

  1. public class CustomThreadPoolExecutor extends ThreadPoolExecutor implements Runnable {  
  2. public void execute(Testask command) { 
  3. ….//將執(zhí)行接口改為接收我們的業(yè)務(wù)類 
  4. … 
  5. … 
  6. private final class Worker 
  7. extends AbstractQueuedSynchronizer 
  8. implements Runnable 
  9. … 
  10. Testask firstTask; //將Runnable改為我們的業(yè)務(wù)類,方便查看狀態(tài) 
  11. … 
  12. Worker(Testask firstTask) { 
  13. …//同樣將初始化參數(shù)改為我們的業(yè)務(wù)類 
  14.  
  15. public static void main(String[] args) { 
  16. CustomThreadPoolExecutor pool = new CustomThreadPoolExecutor(0, Integer.MAX_VALUE, 
  17. 60L, TimeUnit.SECONDS, 
  18. new SynchronousQueue<Runnable>()); 
  19.          
  20. Testask task = new Testask(); 
  21. task.setInterval(1); 
  22. pool.execute(task); 
  23.          
  24. Testask task1 = new Testask(); 
  25. task1.setInterval(2); 
  26. pool.execute(task1); 
  27.          
  28. new Thread(pool).start(); 
  29.  
  30. @Override 
  31. public void run() { 
  32. while (!Thread.currentThread().isInterrupted()) { 
  33. try { 
  34. long current = System.currentTimeMillis(); 
  35. Set<Testask> toReExecute = new HashSet<Testask>(); 
  36. System.out.println("\t number is " + number); 
  37. for (Worker wrapper : workers) { 
  38. Testask tt = wrapper.firstTask; 
  39. if (tt != null) { 
  40. if (current - tt.getLastExecTime() > tt.getInterval() * 5 * 1000) { 
  41. wrapper.interruptIfStarted(); 
  42. remove(tt); 
  43. if (tt.getFiles() != null) { 
  44. for (File file : tt.getFiles()) { 
  45. file.delete(); 
  46. System.out.println("THread is timeout : " + tt + " " + new Date()); 
  47. toReExecute.add(tt); 
  48. if (toReExecute.size() > 0) { 
  49. mainLock.lock(); 
  50. try { 
  51. for (Testask tt : toReExecute) { 
  52. execute(tt);    // execute this task again 
  53. }  
  54. } finally { 
  55. mainLock.unlock(); 
  56. } catch (Exception e1) { 
  57. System.out.println("Error happens when we trying to interrupt and restart a code task "); 
  58. try { 
  59. Thread.sleep(500); 
  60. } catch (InterruptedException e) { 

Testask

  1. class Testask implements Runnable { 
  2. ….. 
  3.  
  4. @Override 
  5. public void run() { 
  6. while (!Thread.currentThread().isInterrupted()) { 
  7. lastExecTime = System.currentTimeMillis(); 
  8. System.out.println(Thread.currentThread().getName() + " is running -> " + new Date()); 
  9. try { 
  10. CustomThreadPoolExecutor.number++; 
  11. Thread.sleep(getInterval() * 6 * 1000); 
  12.                 System.out.println(Thread.currentThread().getName() + " after sleep"); 
  13. } catch (InterruptedException e) { 
  14. Thread.currentThread().interrupt(); 
  15. System.out.println("InterruptedException happens"); 
  16. System.out.println("Going to die"); 

最終方案

綜上,最穩(wěn)妥的就是使用JDK自帶的ThreadPoolExecutor, 如果需要對池里的task進(jìn)行任意時間的控制,可以考慮全面更新,全方面,360度無死角的定制自己的線程池當(dāng)然是***的方案,但是一定要注意對于共享對象的處理,適當(dāng)?shù)奶幚砗貌l(fā)訪問共享對象的方法。

 

鑒于我們的場景,由于時間緊,而且需要了解的task并不多,暫時選用全部重新更新的策略。上線后,抽時間把自己定制的ThreadPoolExecutor搞定,然后更新上去!

 

責(zé)任編輯:守望幸福 來源: 51CTO.com
相關(guān)推薦

2017-07-17 15:46:20

Oracle并行機(jī)制

2024-02-04 08:26:38

線程池參數(shù)內(nèi)存

2024-04-01 08:05:27

Go開發(fā)Java

2024-05-06 00:00:00

緩存高并發(fā)數(shù)據(jù)

2018-01-10 13:40:03

數(shù)據(jù)庫MySQL表設(shè)計(jì)

2019-10-30 14:44:41

Prometheus開源監(jiān)控系統(tǒng)

2025-04-14 09:31:03

2025-04-29 10:17:42

2018-09-11 09:14:52

面試公司缺點(diǎn)

2023-03-13 13:36:00

Go擴(kuò)容切片

2020-11-03 13:50:31

Redis緩存數(shù)據(jù)庫

2022-04-26 21:49:55

Spring事務(wù)數(shù)據(jù)庫

2024-12-13 08:21:04

2018-01-10 06:17:24

2024-04-10 08:39:56

BigDecimal浮點(diǎn)數(shù)二進(jìn)制

2019-12-12 14:32:26

SQL語句數(shù)據(jù)庫

2019-02-19 09:46:58

美圖容器化Kubernetes

2021-09-11 15:26:23

Java多線程線程池

2022-07-06 11:47:27

JAVAfor循環(huán)

2024-10-08 08:14:08

用戶生命周期分析服務(wù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號