張開(kāi)濤:線程的中斷、超時(shí)與降級(jí)
最近一位朋友在公眾號(hào)留言問(wèn)一個(gè)關(guān)于熔斷的問(wèn)題:使用hystrix進(jìn)行httpclient超時(shí)熔斷錯(cuò)誤,我是順序操作的(沒(méi)有并發(fā)),發(fā)現(xiàn)hystrix會(huì)超時(shí)斷開(kāi),但是會(huì)導(dǎo)致hystrix線程池不斷增多,直到后面因線程池裝不下拒絕?
而該問(wèn)題跟線程中斷、超時(shí)與降級(jí)等有關(guān),因此本文將詳細(xì)介紹導(dǎo)致這個(gè)問(wèn)題背后的原因。
當(dāng)我們?cè)诰€程中執(zhí)行如用戶請(qǐng)求任務(wù)時(shí),比如HTTP處理線程,最擔(dān)心的什么?
- 線程數(shù)***增長(zhǎng);
- 線程執(zhí)行時(shí)間長(zhǎng);
- 線程不可中斷。
對(duì)于線程數(shù)***增長(zhǎng),我們可以通過(guò)使用線程池來(lái)控制線程數(shù)量,控制線程不是***增長(zhǎng)的。
對(duì)于線程執(zhí)行時(shí)間長(zhǎng),我們應(yīng)設(shè)置合理的超時(shí)時(shí)間來(lái)保障線程執(zhí)行時(shí)間可控,當(dāng)超時(shí)時(shí)要么返回給用戶錯(cuò)誤頁(yè)面,要么可以返回降級(jí)頁(yè)面。
對(duì)于線程不可中斷,我們應(yīng)想辦法將線程設(shè)計(jì)的可中斷,從而在遇到問(wèn)題可中斷線程并降級(jí)處理。
接下來(lái)的部分將主要講解線程中斷。
線程中斷是通過(guò)Thread.interrupt()方法來(lái)做,一般是在A線程來(lái)中斷B線程。
首先我們來(lái)看下該方法的一些Javadoc描述:
1. 如果線程被Object的wait()、wait(long)、wait(long, int) 或者
Thread的join()、join(long)、join(long, int)、sleep(long)、sleep(long, int)方法阻塞,執(zhí)行線程中斷,且拋出InterruptedException,但中斷狀態(tài)并清空重置,即Thread. isInterrupted()返回false;
2. 如果線程被java.nio.channels.InterruptibleChannel上的一個(gè)I/O操作阻塞,執(zhí)行線程中斷,且該
InterruptibleChannel將被關(guān)閉,拋出java.nio.channels.ClosedByInterruptException,線程中斷狀態(tài)會(huì)設(shè)置,即Thread. isInterrupted()返回true;
3. 如果線程被java.nio.channels.Selector阻塞,執(zhí)行線程中斷,該Selector#select()方法將立即返回,相當(dāng)于調(diào)用了java.nio.channels.Selector#wakeup(),不會(huì)拋出異常,但會(huì)設(shè)置中斷狀態(tài),即Thread. isInterrupted()返回true;
4. 如果不滿足以上條件的,那么執(zhí)行線程中斷不會(huì)拋出異常,僅設(shè)置中斷狀態(tài),即Thread. isInterrupted()返回true。也就是說(shuō)我們代碼要根據(jù)該狀態(tài)來(lái)決定下一步怎么做。
從如上描述可以看出,如果方法異常描述上有拋出
InterruptedException、ClosedByInterruptException異常的,說(shuō)明該方法可以中斷,如“public final native void wait(longtimeout) throws InterruptedException”,但是中斷狀態(tài)會(huì)被會(huì)被重置要看其Javadoc描述。其他情況基本都是設(shè)置中斷狀態(tài)而不會(huì)中斷掉操作。
BIO(Blocking I/O)操作不可中斷
如java.net.Socket讀寫網(wǎng)絡(luò)I/O時(shí)是阻塞的,除了設(shè)置超時(shí)時(shí)間外,還應(yīng)該考慮讓它可中斷或者盡早中斷??梢詤⒖肌赌愕腏ava代碼可中斷嗎》。還有如JDBC驅(qū)動(dòng)mysql-connector-java、HttpClient等大部分都是使用BIO,它們也是不可中斷的。
NIO(New I/O)操作可中斷
NIO涉及到兩部分:
java.nio.channels.Selector和java.nio.channels.InterruptibleChannel,它們是可中斷的。如java.nio.channels#SocketChannel實(shí)現(xiàn)了InterruptibleChannel,如下方法都是可中斷的,并會(huì)拋出ClosedByInterruptException異常:
- connect(SocketAddress remote)
- read(ByteBuffer[] dsts, int offset, int length)
- read(ByteBuffer[] dsts)
- write(ByteBuffer src)
線程、BIO與中斷
我們使用BIO實(shí)現(xiàn)的HttpClient來(lái)做個(gè)實(shí)驗(yàn),如下代碼所示:
- public class BlockingIOTest {
- public static void main(String[] args) throws Exception {
- Thread threadA = new Thread(()-> {
- try {
- //該阻塞5s
- String url = "http://localhost:9090/ajax";
- //HttpClient是BIO,不可中斷
- HttpResponse response = HttpClientUtils.getHttpClient().execute(new HttpGet(url));
- System.out.println("http status code : " + response.getStatusLine().getStatusCode());
- //雖然在threadB執(zhí)行了threadA線程中斷
- //但是僅僅是設(shè)置了中斷狀態(tài)為true
- //并沒(méi)有中斷線程A的執(zhí)行,該線程還是正常的執(zhí)行完成了
- System.out.println("threadA is interrupted: " + Thread.currentThread().isInterrupted());
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- Thread threadB = new Thread(()-> {
- try {
- Thread.sleep(2000L);
- //休眠2s后,中斷線程A
- threadA.interrupt();
- } catch (Exception e) {
- }
- });
- threadA.start();
- threadB.start();
- Thread.sleep(15000L);
- }
- }
如上代碼的輸出結(jié)果為:
- http status code : 200
- threadA is interrupted: true
如上代碼的執(zhí)行流程是這樣的:
- 線程A通過(guò)BIO實(shí)現(xiàn)HttpClient遠(yuǎn)程調(diào)用http://localhost:9090/ajax獲取數(shù)據(jù),而該服務(wù)需要5s才能響應(yīng);
- 線程B在線程A執(zhí)行2s后進(jìn)行了中斷處理,但是線程A調(diào)用的HttpClient是阻塞且不可中斷的操作,僅僅是設(shè)置了線程A的中斷狀態(tài)為true,因此其一直等待網(wǎng)絡(luò)I/O完成;
- 當(dāng)線程A從遠(yuǎn)程獲取到結(jié)果后繼續(xù)執(zhí)行,Thread.currentThread().isInterrupted()將輸出true,表示線程A被設(shè)置了中斷狀態(tài)。
從而需要注意設(shè)置了中斷狀態(tài)與中斷執(zhí)行不是一回事。因此對(duì)于使用BIO,一定要設(shè)置好連接和讀寫的超時(shí)時(shí)間,另外可以參考《你的Java代碼可中斷嗎》進(jìn)行可中斷設(shè)計(jì)。
線程池、Future與中斷
我們往線程池提交一個(gè)HttpClient任務(wù),并通過(guò)Future來(lái)等待執(zhí)行結(jié)果,如下代碼所示:
- public class ThreadPoolTest {
- private static ExecutorService executorService = Executors.newFixedThreadPool(5);
- public static void main(String[] args) throws Exception {
- Future<Integer> futureA = executorService.submit((Callable) () -> {
- //該url會(huì)阻塞5s
- String url = "http://localhost:9090/ajax";
- //HttpClient是BIO,不可中斷
- HttpResponse response = HttpClientUtils.getHttpClient().execute(new HttpGet(url));
- Integer result = response.getStatusLine().getStatusCode();
- System.out.println("thread a result : " + result);
- return response.getStatusLine().getStatusCode();
- });
- Future<Integer> futureB = executorService.submit((Callable) () -> {
- //該url會(huì)阻塞5s
- String url = "http://localhost:9090/ajax";
- //HttpClient是BIO,不可中斷
- HttpResponse response = HttpClientUtils.getHttpClient().execute(new HttpGet(url));
- Integer result = response.getStatusLine().getStatusCode();
- System.out.println("thread b result : " + result);
- return result;
- });
- try {
- Integer resultA = futureA.get(100, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- System.out.println("future a timeout");
- }
- try {
- Integer resultB = futureB.get(100, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- System.out.println("future b timeout");
- }
- executorService.awaitTermination(10000L, TimeUnit.MILLISECONDS);
- }
- }
如上代碼的輸出結(jié)果為:
- future a timeout
- future b timeout
- thread a result : 200
- thread b result : 200
如上代碼的執(zhí)行流程是這樣的:
- 主線程往線程池提交了兩個(gè)HttpClient阻塞調(diào)用任務(wù),該任務(wù)響應(yīng)時(shí)間為5s;
- 主線程阻塞在兩個(gè)帶超時(shí)的Future等待上,F(xiàn)uture在等待線程池任務(wù)執(zhí)行結(jié)束,F(xiàn)uture的超時(shí)時(shí)間設(shè)置為100ms,所以很快就超時(shí)并返回了,主線程繼續(xù)執(zhí)行,在《億級(jí)流量》中我們用到了很多這種方法進(jìn)行并發(fā)獲取數(shù)據(jù)和降級(jí)或熔斷處理;
- 線程池中的兩個(gè)任務(wù)其實(shí)并沒(méi)有被中斷,還是占用著線程池中的線程,在后臺(tái)繼續(xù)執(zhí)行,直到完成。
從如上可以看出,使用Future時(shí)只是在主線程解除了阻塞,并沒(méi)有連帶把線程池任務(wù)取消掉,還是占用著線程并阻塞執(zhí)行。
之前有位同學(xué)在公眾號(hào)后臺(tái)留言咨詢:
使用hystrix進(jìn)行httpclient超時(shí)熔斷錯(cuò)誤,我是順序操作的(沒(méi)有并發(fā)),發(fā)現(xiàn)hystrix會(huì)超時(shí)斷開(kāi),但是會(huì)導(dǎo)致hystrix線程池不斷增多,直到后面因線程池裝不下拒絕?
看完如上示例,應(yīng)該能解決該讀者的疑惑。雖然熔斷了,但是線程中的操作并沒(méi)有真正的中斷,而是還占著線程資源。
接下來(lái)我們可以簡(jiǎn)單看下Future其中的一個(gè)實(shí)現(xiàn)FutureTask:
超時(shí)等待方法get(long timeout, TimeUnit unit)偽代碼:
- while(true) {
- if (Thread.interrupted()) {//如果當(dāng)前線程中斷了,處理現(xiàn)場(chǎng),并拋出中斷異常
- //some code
- throw new InterruptedException();
- }
- //判斷剩余休眠時(shí)間
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {//如果沒(méi)有休眠時(shí)間了,則處理線程,并終止執(zhí)行
- //some code
- return state;
- }
- //休眠一段時(shí)間,內(nèi)部實(shí)現(xiàn)為UNSAFE.park(false, nanos)
- LockSupport.parkNanos(this, nanos);
- }
取消方法cancel(boolean mayInterruptIfRunning)偽代碼:
- if (mayInterruptIfRunning) {//中斷當(dāng)前線程
- Thread t = runner;
- if (t != null)
- t.interrupt();
- }
- //執(zhí)行UNSAFE.unpark(thread)喚醒休眠的當(dāng)前線程
- LockSupport.unpark(t);
即當(dāng)我們調(diào)用Future#cancel時(shí),是通過(guò)喚醒Future所在線程實(shí)現(xiàn),當(dāng)然實(shí)際是比這個(gè)要復(fù)雜的。
回填結(jié)果方法set(V v)偽代碼:
- //修改Future狀態(tài)為完成
- //保持v的值,從而Future#get能獲取到
- //通過(guò)LockSupport.unpark(t)喚醒休眠的線程
當(dāng)線程池中的線程執(zhí)行完成后,是通過(guò)Future#set把值設(shè)置回Future,從而喚醒休眠的線程,即阻塞在Future#get的等待,然后獲取到該結(jié)果。
鎖與中斷
synchronized和ReentrantLock#lock()在獲取鎖的過(guò)程中是不可中斷的,假設(shè)出現(xiàn)了死鎖將一直僵持在那,無(wú)法終止阻塞。但我們可以使用可中斷的
ReentrantLock#lockInterruptibly()方法或者ReentrantLock#tryLock(long timeout, TimeUnit unit)實(shí)現(xiàn)可中斷。
總結(jié)
在設(shè)計(jì)高可用系統(tǒng)時(shí),盡量使用線程池,而不是通過(guò)每個(gè)請(qǐng)求創(chuàng)建一個(gè)線程來(lái)實(shí)現(xiàn),通過(guò)線程池的拒絕策略來(lái)優(yōu)雅的拒絕無(wú)法處理的請(qǐng)求。
檢查整個(gè)請(qǐng)求鏈路設(shè)置合理的超時(shí)時(shí)間,跟調(diào)用方協(xié)商合理的SLA、降級(jí)限流方案。更長(zhǎng)的超時(shí)時(shí)間意味著出現(xiàn)問(wèn)題時(shí)請(qǐng)求堆積的越多,越可能產(chǎn)生雪崩。
明確知道自己的服務(wù)是否可中斷,如果不可中斷,應(yīng)該使用線程池和Future實(shí)現(xiàn)偽可中斷,通過(guò)Future配置合理的超時(shí)時(shí)間,當(dāng)超時(shí)時(shí)執(zhí)行相應(yīng)的降級(jí)策略。也要清楚的知道通過(guò)Future只是偽中斷,線程池中的任務(wù)還是在后臺(tái)執(zhí)行,當(dāng)Future超時(shí)后進(jìn)行重試時(shí),會(huì)對(duì)調(diào)用的服務(wù)產(chǎn)生更多的請(qǐng)求,從而造成一種DDos,一定要注意相應(yīng)的處理策略。
池大小、超時(shí)時(shí)間和中斷沒(méi)有***的配置策略,要根據(jù)自己的場(chǎng)景來(lái)動(dòng)態(tài)調(diào)整,在系統(tǒng)遇到高并發(fā)或者異常時(shí),我們要保護(hù)什么,放棄什么,要有權(quán)衡。
【本文是51CTO專欄作者“張開(kāi)濤”的原創(chuàng)文章,作者微信公眾號(hào):開(kāi)濤的博客( kaitao-1234567)】