響應(yīng)時間是接口監(jiān)控的黃金指標(biāo)之一:假設(shè)接口接收請求的時間是t1,接口處理完請求,響應(yīng)的時間是t2,則接口響應(yīng)時間是:t2-t1,將響應(yīng)時間指標(biāo)接入監(jiān)控報警系統(tǒng),當(dāng)響應(yīng)時間大于閾值的時候則進行報警;但是在線程被阻塞的情況下,由于接口一直沒有返回,響應(yīng)時間也就無法監(jiān)控到。
背景介紹
在過去處理過的服務(wù)故障中,有一類比較典型的場景是業(yè)務(wù)線程被阻塞(造成阻塞的原因也是多種多樣),慢慢導(dǎo)致業(yè)務(wù)線程池中的全部線程被阻塞,最終造成無法對外提供服務(wù)(現(xiàn)象則是CPU、Load、內(nèi)存等指標(biāo)都比較低,請求接口后響應(yīng)超時或者沒有響應(yīng))。
問題分析
響應(yīng)時間是接口監(jiān)控的黃金指標(biāo)之一:假設(shè)接口接收請求的時間是t1,接口處理完請求,響應(yīng)的時間是t2,則接口響應(yīng)時間是:t2-t1,將響應(yīng)時間指標(biāo)接入監(jiān)控報警系統(tǒng),當(dāng)響應(yīng)時間大于閾值的時候則進行報警;但是在線程被阻塞的情況下,由于接口一直沒有返回,響應(yīng)時間也就無法監(jiān)控到。
阻塞的線程往往是業(yè)務(wù)線程,這些業(yè)務(wù)線程可能是:
- 基于tomcat提供http服務(wù)的tomcat線程,線程名類似:http-nio-8080-exec-1
- 基于RocketMQ的消息消費者線程,線程名類似:ConsumeMessageThread_1
- 基于HSF Provider的線程,線程名類似:HSFBizProcessor-DEFAULT-12-thread-3
- … …
如果我們能夠在這些業(yè)務(wù)線程執(zhí)行的必經(jīng)路徑上進行攔截,那么就能記錄下線程開始執(zhí)行的時間,同時啟動定時器不斷檢查線程已執(zhí)行的時間,當(dāng)已執(zhí)行時間大于設(shè)定的閾值則打印出線程棧進行報警;當(dāng)線程正常返回則刪除該線程記錄,所以需要解決的主要是兩個問題:
- 如何攔截線程
- 定時檢查線程執(zhí)行時間是否超過閾值
解決思路
通過問題分析,可以確定主要需要解決以下兩個問題
檢測阻塞線程
該模塊主要做三件事:
- 業(yè)務(wù)線程開始執(zhí)行的時候,進行線程注冊
- 業(yè)務(wù)線程結(jié)束執(zhí)行或拋異常的時候,刪除線程注冊信息
- 定時檢測注冊的線程是否發(fā)生阻塞,如果發(fā)生阻塞則打印線程棧
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockedThreadChecker {
protected final static Log logger = LogFactory.getLog(BlockedThreadChecker.class);
private static volatile BlockedThreadChecker instance;
private final static int DELAY = 10;
private final static int PERIOD = 1000;
private ScheduledThreadPoolExecutor executor;
private final Map<Thread, Task> threads = new ConcurrentHashMap<>();
private BlockedThreadChecker(){
logger.info("init BlockedThreadChecker... ...classloader:" + this.getClass().getClassLoader() + ",parent classloader:" + this.getClass().getClassLoader().getParent());
int coreSize = Runtime.getRuntime().availableProcessors();
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r){
Thread thread = new Thread(r, "BlockThreadCheckerTimer-" + counter.incrementAndGet());
thread.setDaemon(true);
return thread;
}
};
executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
long now = System.currentTimeMillis();
for(Map.Entry<Thread,Task> entry : threads.entrySet()){
long execStart = entry.getValue().startTime;
long dur = now - execStart;
if(dur >= entry.getValue().maxExecTime){
BlockedThreadException e = new BlockedThreadException(entry.getKey().getName() + " has been blocked " + dur + " ms");
e.setStackTrace(entry.getKey().getStackTrace());
logger.error(e.getMessage(),e);
}
}
}
},DELAY,PERIOD, TimeUnit.MILLISECONDS);
}
public static BlockedThreadChecker getInstance(){
if(instance != null){
return instance;
}
synchronized (BlockedThreadChecker.class){
if(instance != null){
return instance;
}
instance = new BlockedThreadChecker();
}
return instance;
}
public void registerThread(Thread thread){
registerThread(thread, new Task());
}
public void registerThread(Thread thread,Task task){
threads.put(thread, task);
logger.info("registerThread " + thread.getName());
}
public void unregisterThread(Thread thread){
threads.remove(thread);
logger.info("unregisterThread " + thread.getName());
}
class Task {
long startTime = System.currentTimeMillis();
long maxExecTime = 10000L;
}
}
攔截線程
方案一
服務(wù)中幾種常見業(yè)務(wù)線程:
- 基于tomcat提供http服務(wù)的tomcat線程,通過實現(xiàn)自定義Filter,在Filter中完成線程的注冊和取消注冊操作;
- 基于RocketMQ的消息消費者線程,根據(jù)業(yè)務(wù)需求統(tǒng)一實現(xiàn)MessageListenerConcurrently、MessageListenerOrderly等,在統(tǒng)一實現(xiàn)類中完成線程的注冊和取消注冊;
- 基于HSF Provider的線程,通過實現(xiàn)自定義Filter,在Filter中完成線程的注冊和取消注冊操作。
該方案實現(xiàn)簡單,但是對于業(yè)務(wù)侵入性比較強,侵入性強意味著業(yè)務(wù)在意識不到問題的時候,沒有改變的動力。
方案二
基于jvm-sandbox實現(xiàn)自定義module,實現(xiàn)思路如下:
import com.alibaba.jvm.sandbox.api.Information;
import com.alibaba.jvm.sandbox.api.LoadCompleted;
import com.alibaba.jvm.sandbox.api.Module;
import com.alibaba.jvm.sandbox.api.listener.ext.Advice;
import com.alibaba.jvm.sandbox.api.listener.ext.AdviceListener;
import com.alibaba.jvm.sandbox.api.listener.ext.EventWatchBuilder;
import com.alibaba.jvm.sandbox.api.resource.ModuleEventWatcher;
import org.kohsuke.MetaInfServices;
import sun.misc.Unsafe;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.Properties;
@MetaInfServices(Module.class)
@Information(id = "blocked-thread-module", version = "0.0.1", author = "yuji")
public class BlockedThreadModule implements Module, LoadCompleted {
@Resource
private ModuleEventWatcher moduleEventWatcher;
private AdviceListener adviceListener = new AdviceListener() {
@Override
protected void before(Advice advice) throws Throwable {
if (!advice.isProcessTop()) {
return;
}
BlockedThreadChecker.getInstance().registerThread(Thread.currentThread());
}
@Override
protected void afterReturning(Advice advice){
if (!advice.isProcessTop()) {
return;
}
BlockedThreadChecker.getInstance().unregisterThread(Thread.currentThread());
}
@Override
protected void afterThrowing(Advice advice){
if (!advice.isProcessTop()) {
return;
}
BlockedThreadChecker.getInstance().unregisterThread(Thread.currentThread());
}
};
@Override
public void loadCompleted(){
new EventWatchBuilder(moduleEventWatcher)
.onClass("javax.servlet.http.HttpServlet")
.onBehavior("service")
.onWatch(adviceListener);
new EventWatchBuilder(moduleEventWatcher)
.onClass("com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently")
.includeSubClasses()
.onBehavior("consumeMessage")
.onWatch(adviceListener);
new EventWatchBuilder(moduleEventWatcher)
.onClass("com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly")
.includeSubClasses()
.onBehavior("consumeMessage")
.onWatch(adviceListener);
new EventWatchBuilder(moduleEventWatcher)
.onClass("com.taobao.hsf.remoting.provider.ReflectInvocationHandler")
.includeSubClasses()
.onBehavior("invoke")
.onWatch(adviceListener);
}
}
通過在應(yīng)用啟動參數(shù)中增加javaagent=jvm-sandbox agent的方式來使用,相比較方案一業(yè)務(wù)應(yīng)用不需要改動任何代碼,也不需要對已有封裝的框架進行修改,缺點是jvm-sandbox需要提前部署到每個應(yīng)用的機器上,會給運維帶來工作量,個人認(rèn)為這種方案是最穩(wěn)定的。
方案三
為了避免方案二中運維工作,一種思路是以jar包的形式提供給業(yè)務(wù)方使用,業(yè)務(wù)方引入jar包就可以了
,主要面臨兩個問題需要解決。
如何觸發(fā)jar包執(zhí)行初始化邏輯
一種方式是通過spring boot starter的方式,比如
arthas-spring-boot-starter;
一種是根據(jù)spring容器初始化流程,選擇某個切入點,比如通過實現(xiàn)ApplicationListener接口,監(jiān)聽spring初始化完成的ApplicationEvent來實現(xiàn)。
如何初始化jvm-sandbox
初始化的核心邏輯如下:
//通過ByteBuddyAgent獲取Instrumentation
Instrumentation inst = ByteBuddyAgent.install();
//將相應(yīng)版本的sandbox-spy.jar添加到BootstrapClassLoader搜索路徑中
//這一步的操作是由于sandbox-spy中包名是以java開頭的,所以只能通過BootstrapClassLoader進行加載
JarFile spyJarFile = new JarFile("/目錄/sandbox-spy-version.jar");
inst.appendToBootstrapClassLoaderSearch(spyJarFile);
//構(gòu)造jvm-sandbox CoreFeatureString
String sandboxCoreFeatureString = String.format(";system_module=%s;mode=%s;sandbox_home=%s;provider=%s;namespace=%s;unsafe.enable=true;",systemModule, "agent", sandboxHome, provider, NAMESPACE );
CoreConfigure coreConfigure = CoreConfigure.toConfigure(sandboxCoreFeatureString,null);
CoreLoadedClassDataSource classDataSource = new DefaultCoreLoadedClassDataSource(inst,true);
ProviderManager providerManager = new DefaultProviderManager(coreConfigure);
//核心類,用戶自定義的module是在這個類中完成加載和初始化的
CoreModuleManager coreModuleManager = new DefaultCoreModuleManager(coreConfigure,inst,classDataSource,providerManager);
//初始化命名空間與SpyHandler對于關(guān)系
SpyUtils.init(NAMESPACE);
//加載各種module
coreModuleManager.reset();
上面代碼總體邏輯是沒有問題的,需要考慮的細節(jié)是上面代碼在不同類加載器體系下的兼容問題。
Tomcat

tomcat類加載器關(guān)系
pandora可運行jar包

pandora類加載器關(guān)系
idea

idea應(yīng)用類加載器關(guān)系
經(jīng)驗總結(jié)
從目前的三種方案來說,個人比較傾向方案二。
參考資料
bytebuddy
jvm-sandbox
arthas