Spring Boot + Nacos 實(shí)現(xiàn)了一個(gè)動(dòng)態(tài)化線程池,非常實(shí)用!
在后臺(tái)開發(fā)中,會(huì)經(jīng)常用到線程池技術(shù),對(duì)于線程池核心參數(shù)的配置很大程度上依靠經(jīng)驗(yàn)。然而,由于系統(tǒng)運(yùn)行過程中存在的不確定性,我們很難一勞永逸地規(guī)劃一個(gè)合理的線程池參數(shù)。在對(duì)線程池配置參數(shù)進(jìn)行調(diào)整時(shí),一般需要對(duì)服務(wù)進(jìn)行重啟,這樣修改的成本就會(huì)偏高。一種解決辦法就是,將線程池的配置放到平臺(tái)側(cè),運(yùn)行開發(fā)同學(xué)根據(jù)系統(tǒng)運(yùn)行情況對(duì)核心參數(shù)進(jìn)行動(dòng)態(tài)配置。
本文以Nacos作為服務(wù)配置中心,以修改線程池核心線程數(shù)、最大線程數(shù)為例,實(shí)現(xiàn)一個(gè)簡(jiǎn)單的動(dòng)態(tài)化線程池。
代碼實(shí)現(xiàn)
1.依賴
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2021.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2021.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
2.配置yml文件
bootstrap.yml:
server:
port: 8010
# 應(yīng)用名稱(nacos會(huì)將該名稱當(dāng)做服務(wù)名稱)
spring:
application:
name: order-service
cloud:
nacos:
discovery:
namespace: public
server-addr: 192.168.174.129:8848
config:
server-addr: 192.168.174.129:8848
file-extension: yml
application.yml:
spring:
profiles:
active: dev
為什么要配置兩個(gè)yml文件?
springboot中配置文件的加載是存在優(yōu)先級(jí)順序的,bootstrap優(yōu)先級(jí)高于application。
nacos在項(xiàng)目初始化時(shí),要保證先從配置中心進(jìn)行配置拉取,拉取配置之后才能保證項(xiàng)目的正常啟動(dòng)。
3.nacos配置
登錄到nacos管理頁面,新建配置,如下圖所示:
圖片
注意Data ID的命名格式為,${spring.application.name}-${spring.profile.active}.${spring.cloud.nacos.config.file-extension} ,在本文中,Data ID的名字就是order-service-dev.yml。
圖片
這里我們只配置了兩個(gè)參數(shù),核心線程數(shù)量和最大線程數(shù)。
4.線程池配置和nacos配置變更監(jiān)聽
@RefreshScope
@Configuration
public class DynamicThreadPool implements InitializingBean {
@Value("${core.size}")
private String coreSize;
@Value("${max.size}")
private String maxSize;
private static ThreadPoolExecutor threadPoolExecutor;
@Autowired
private NacosConfigManager nacosConfigManager;
@Autowired
private NacosConfigProperties nacosConfigProperties;
@Override
public void afterPropertiesSet() throws Exception {
//按照nacos配置初始化線程池
threadPoolExecutor = new ThreadPoolExecutor(Integer.parseInt(coreSize), Integer.parseInt(maxSize), 10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
new ThreadFactoryBuilder().setNameFormat("c_t_%d").build(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("rejected!");
}
});
//nacos配置變更監(jiān)聽
nacosConfigManager.getConfigService().addListener("order-service-dev.yml", nacosConfigProperties.getGroup(),
new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
//配置變更,修改線程池配置
System.out.println(configInfo);
changeThreadPoolConfig(Integer.parseInt(coreSize), Integer.parseInt(maxSize));
}
});
}
/**
* 打印當(dāng)前線程池的狀態(tài)
*/
public String printThreadPoolStatus() {
return String.format("core_size:%s,thread_current_size:%s;" +
"thread_max_size:%s;queue_current_size:%s,total_task_count:%s", threadPoolExecutor.getCorePoolSize(),
threadPoolExecutor.getActiveCount(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getTaskCount());
}
/**
* 給線程池增加任務(wù)
*
* @param count
*/
public void dynamicThreadPoolAddTask(int count) {
for (int i = 0; i < count; i++) {
int finalI = i;
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(finalI);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
/**
* 修改線程池核心參數(shù)
*
* @param coreSize
* @param maxSize
*/
private void changeThreadPoolConfig(int coreSize, int maxSize) {
threadPoolExecutor.setCorePoolSize(coreSize);
threadPoolExecutor.setMaximumPoolSize(maxSize);
}
}
這個(gè)代碼就是實(shí)現(xiàn)動(dòng)態(tài)線程池和核心了,需要說明的是:
- @RefreshScope:這個(gè)注解用來支持nacos的動(dòng)態(tài)刷新功能;
- @Value("${max.size}"),@Value("${core.size}"):這兩個(gè)注解用來讀取我們上一步在nacos配置的具體信息;同時(shí),nacos配置變更時(shí),能夠?qū)崟r(shí)讀取到變更后的內(nèi)容
- nacosConfigManager.getConfigService().addListener:配置監(jiān)聽,nacos配置變更時(shí)實(shí)時(shí)修改線程池的配置。
5.controller
為了觀察線程池動(dòng)態(tài)變更的效果,增加Controller類。
@RestController
@RequestMapping("/threadpool")
public class ThreadPoolController {
@Autowired
private DynamicThreadPool dynamicThreadPool;
/**
* 打印當(dāng)前線程池的狀態(tài)
*/
@GetMapping("/print")
public String printThreadPoolStatus() {
return dynamicThreadPool.printThreadPoolStatus();
}
/**
* 給線程池增加任務(wù)
*
* @param count
*/
@GetMapping("/add")
public String dynamicThreadPoolAddTask(int count) {
dynamicThreadPool.dynamicThreadPoolAddTask(count);
return String.valueOf(count);
}
}
6.測(cè)試
啟動(dòng)項(xiàng)目,訪問http://localhost:8010/threadpool/print打印當(dāng)前線程池的配置。
圖片
可以看到,這個(gè)就是我們之前在nacos配置的線程數(shù)。
訪問http://localhost:8010/threadpool/add?count=20增加20個(gè)任務(wù),重新打印線程池配置
圖片
可以看到已經(jīng)有線程在排隊(duì)了。
為了能夠看到效果,我們多訪問幾次/add接口,增加任務(wù)數(shù),在控制臺(tái)出現(xiàn)拒絕信息時(shí)調(diào)整nacos配置。
圖片
此時(shí),執(zhí)行/add命令時(shí),所有的線程都會(huì)提示rejected。
調(diào)整nacos配置,將核心線程數(shù)調(diào)整為50,最大線程數(shù)調(diào)整為100.
圖片
重新多次訪問/add接口增加任務(wù),發(fā)現(xiàn)沒有拒絕信息了。這時(shí),打印具體的線程狀態(tài),發(fā)現(xiàn)線程池參數(shù)修改成功。
總結(jié)
這里,只是簡(jiǎn)單實(shí)現(xiàn)了一個(gè)可以調(diào)整核心線程數(shù)和最大線程數(shù)的動(dòng)態(tài)線程池。具體的線程池實(shí)現(xiàn)原理可以參考美團(tuán)的這篇文章:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html,結(jié)合監(jiān)控告警等實(shí)現(xiàn)一個(gè)完善的動(dòng)態(tài)線程池產(chǎn)品。
優(yōu)秀的輪子還有好多,比如Hippo4J ,使用起來和dynamic-tp差不多。Hippo4J 有無依賴中間件實(shí)現(xiàn)動(dòng)靜線程池,也有默認(rèn)實(shí)現(xiàn)Nacos和Apollo的版本,而dynamic-tp 默認(rèn)實(shí)現(xiàn)依賴Nacos或Apollo。