自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

利用DUCC配置平臺(tái)實(shí)現(xiàn)一個(gè)動(dòng)態(tài)化線程池

開發(fā) 架構(gòu)
當(dāng)前項(xiàng)目中使用的是Spring 框架提供的線程池類ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底層又使用里了JDK中線程池類ThreadPoolExecutor,線程池類ThreadPoolExecutor有兩個(gè)成員方法setCorePoolSize、setMaximumPoolSize可以在運(yùn)行時(shí)設(shè)置核心線程數(shù)和最大線程數(shù)。

?作者:京東零售 張賓

1.背景

在后臺(tái)開發(fā)中,會(huì)經(jīng)常用到線程池技術(shù),對(duì)于線程池核心參數(shù)的配置很大程度上依靠經(jīng)驗(yàn)。然而,由于系統(tǒng)運(yùn)行過程中存在的不確定性,我們很難一勞永逸地規(guī)劃一個(gè)合理的線程池參數(shù)。在對(duì)線程池配置參數(shù)進(jìn)行調(diào)整時(shí),一般需要對(duì)服務(wù)進(jìn)行重啟,這樣修改的成本就會(huì)偏高。一種解決辦法就是,將線程池的配置放到配置平臺(tái)側(cè),系統(tǒng)運(yùn)行期間開發(fā)人員根據(jù)系統(tǒng)運(yùn)行情況對(duì)核心參數(shù)進(jìn)行動(dòng)態(tài)配置。

本文以公司DUCC配置平臺(tái)作為服務(wù)配置中心,以修改線程池核心線程數(shù)、最大線程數(shù)為例,實(shí)現(xiàn)一個(gè)簡(jiǎn)單的動(dòng)態(tài)化線程池。

2.代碼實(shí)現(xiàn)

當(dāng)前項(xiàng)目中使用的是Spring 框架提供的線程池類ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底層又使用里了JDK中線程池類ThreadPoolExecutor,線程池類ThreadPoolExecutor有兩個(gè)成員方法setCorePoolSize、setMaximumPoolSize可以在運(yùn)行時(shí)設(shè)置核心線程數(shù)和最大線程數(shù)。

setCorePoolSize方法執(zhí)行流程是:首先會(huì)覆蓋之前構(gòu)造函數(shù)設(shè)置的corePoolSize,然后,如果新的值比原始值要小,當(dāng)多余的工作線程下次變成空閑狀態(tài)的時(shí)候會(huì)被中斷并銷毀,如果新的值比原來的值要大且工作隊(duì)列不為空,則會(huì)創(chuàng)建新的工作線程。流程圖如下:

setMaximumPoolSize方法:首先會(huì)覆蓋之前構(gòu)造函數(shù)設(shè)置的maximumPoolSize,然后,如果新的值比原來的值要小,當(dāng)多余的工作線程下次變成空閑狀態(tài)的時(shí)候會(huì)被中斷并銷毀。

Spring 框架提供的線程池類ThreadPoolTaskExecutor,此類封裝了對(duì)ThreadPoolExecutor有兩個(gè)成員方法setCorePoolSize、setMaximumPoolSize的調(diào)用。

基于以上源代碼分析,要實(shí)現(xiàn)一個(gè)簡(jiǎn)單的動(dòng)態(tài)線程池需要以下幾步:

(1)定義一個(gè)動(dòng)態(tài)線程池類,繼承ThreadPoolTaskExecutor,目的跟非動(dòng)態(tài)配置的線程池類ThreadPoolTaskExecutor區(qū)分開;

(2)定義和實(shí)現(xiàn)一個(gè)動(dòng)態(tài)線程池配置定時(shí)刷的類,目的定時(shí)對(duì)比ducc配置的線程池?cái)?shù)和本地應(yīng)用中線程數(shù)是否一致,若不一致,則更新本地動(dòng)態(tài)線程池線程池?cái)?shù);

(3)引入公司ducc配置平臺(tái)相關(guān)jar包并創(chuàng)建一個(gè)動(dòng)態(tài)線程池配置key;

(4)定義和實(shí)現(xiàn)一個(gè)應(yīng)用啟動(dòng)后根據(jù)動(dòng)態(tài)線程池Bean和從ducc配置平臺(tái)拉取配置刷新應(yīng)用中的線程數(shù)配置;

接下來代碼一一實(shí)現(xiàn):

(1)動(dòng)態(tài)線程池類

/**
* 動(dòng)態(tài)線程池
*
*/
public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}

(2)動(dòng)態(tài)線程池配置定時(shí)刷新類

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
/**
* Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
*/
private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();

/**
* @param threadPoolBeanName
* @param threadPoolTaskExecutor
*/
public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
}

@Override
public void afterPropertiesSet() throws Exception {
this.refresh();
//創(chuàng)建定時(shí)任務(wù)線程池
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
//延遲1秒執(zhí)行,每個(gè)1分鐘check一次
executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
}

private void refresh() {
String dynamicThreadPool = "";
try {
if (DTP_REGISTRY.isEmpty()) {
log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
return;
}
dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
if (StringUtils.isBlank(dynamicThreadPool)) {
log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
return;
}
log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
});
if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
return;
}
for (ThreadPoolProperties properties : threadPoolPropertiesList) {
doRefresh(properties);
}
} catch (Exception e) {
log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
}
}

/**
* @param properties
*/
private void doRefresh(ThreadPoolProperties properties) {
if (StringUtils.isBlank(properties.getThreadPoolBeanName())
|| properties.getCorePoolSize() < 1
|| properties.getMaxPoolSize() < 1
|| properties.getMaxPoolSize() < properties.getCorePoolSize()) {
log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
return;
}
DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
if (Objects.isNull(threadPoolTaskExecutor)) {
log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
return;
}
ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
&& Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
return;
}
if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
}
if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
}

ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
}

private class RefreshThreadPoolConfig extends TimerTask {
private RefreshThreadPoolConfig() {
}

@Override
public void run() {
DynamicThreadPoolRefresh.this.refresh();
}
}

}

線程池配置類

@Data
public class ThreadPoolProperties {
/**
* 線程池名稱
*/
private String threadPoolBeanName;
/**
* 線程池核心線程數(shù)量
*/
private int corePoolSize;
/**
* 線程池最大線程池?cái)?shù)量
*/
private int maxPoolSize;
}

(3)引入公司ducc配置平臺(tái)相關(guān)jar包并創(chuàng)建一個(gè)動(dòng)態(tài)線程池配置key

配置value:

[
{
"threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",
"corePoolSize": 32,
"maxPoolSize": 128
}
]

(4) 應(yīng)用啟動(dòng)刷新應(yīng)用本地動(dòng)態(tài)線程池配置

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DynamicThreadPoolTaskExecutor) {
DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
}
return bean;
}
}

3.動(dòng)態(tài)線程池應(yīng)用

動(dòng)態(tài)線程池Bean聲明

<!-- 普通線程池 -->
<bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">
<!-- 核心線程數(shù),默認(rèn)為 -->
<property name="corePoolSize" value="128"/>
<!-- 最大線程數(shù),默認(rèn)為Integer.MAX_VALUE -->
<property name="maxPoolSize" value="512"/>
<!-- 隊(duì)列最大長(zhǎng)度,一般需要設(shè)置值>=notifyScheduledMainExecutor.maxNum;默認(rèn)為Integer.MAX_VALUE -->
<property name="queueCapacity" value="500"/>
<!-- 線程池維護(hù)線程所允許的空閑時(shí)間,默認(rèn)為60s -->
<property name="keepAliveSeconds" value="60"/>
<!-- 線程池對(duì)拒絕任務(wù)(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認(rèn)為后者 -->
<property name="rejectedExecutionHandler">
<!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
<!-- CallerRunsPolicy:主線程直接執(zhí)行該任務(wù),執(zhí)行完之后嘗試添加下一個(gè)任務(wù)到線程池中,可以有效降低向線程池內(nèi)添加任務(wù)的速度 -->
<!-- DiscardOldestPolicy:拋棄舊的任務(wù)、暫不支持;會(huì)導(dǎo)致被丟棄的任務(wù)無法再次被執(zhí)行 -->
<!-- DiscardPolicy:拋棄當(dāng)前任務(wù)、暫不支持;會(huì)導(dǎo)致被丟棄的任務(wù)無法再次被執(zhí)行 -->
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
</property>
</bean>
<!-- 動(dòng)態(tài)線程池 -->
<bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">
<!-- 核心線程數(shù),默認(rèn)為 -->
<property name="corePoolSize" value="32"/>
<!-- 最大線程數(shù),默認(rèn)為Integer.MAX_VALUE -->
<property name="maxPoolSize" value="128"/>
<!-- 隊(duì)列最大長(zhǎng)度,一般需要設(shè)置值>=notifyScheduledMainExecutor.maxNum;默認(rèn)為Integer.MAX_VALUE -->
<property name="queueCapacity" value="500"/>
<!-- 線程池維護(hù)線程所允許的空閑時(shí)間,默認(rèn)為60s -->
<property name="keepAliveSeconds" value="60"/>
<!-- 線程池對(duì)拒絕任務(wù)(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認(rèn)為后者 -->
<property name="rejectedExecutionHandler">
<!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
<!-- CallerRunsPolicy:主線程直接執(zhí)行該任務(wù),執(zhí)行完之后嘗試添加下一個(gè)任務(wù)到線程池中,可以有效降低向線程池內(nèi)添加任務(wù)的速度 -->
<!-- DiscardOldestPolicy:拋棄舊的任務(wù)、暫不支持;會(huì)導(dǎo)致被丟棄的任務(wù)無法再次被執(zhí)行 -->
<!-- DiscardPolicy:拋棄當(dāng)前任務(wù)、暫不支持;會(huì)導(dǎo)致被丟棄的任務(wù)無法再次被執(zhí)行 -->
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
</property>
</bean>
<!-- 動(dòng)態(tài)線程池刷新配置 -->
<bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
<bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>

業(yè)務(wù)類注入Spring Bean后,直接使用即可

@Resource
private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;


Runnable asyncTask = ()->{...};
CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);

4.小結(jié)

本文從實(shí)際項(xiàng)目的業(yè)務(wù)痛點(diǎn)場(chǎng)景出發(fā),并基于公司已有的ducc配置平臺(tái)簡(jiǎn)單實(shí)現(xiàn)了線程池線程數(shù)量可配置。?

責(zé)任編輯:武曉燕 來源: 今日頭條
相關(guān)推薦

2022-12-30 08:29:07

Nacos動(dòng)態(tài)化線程池

2024-11-08 14:11:09

2024-02-04 09:19:00

Nacos動(dòng)態(tài)化線程池

2022-03-09 09:43:01

工具類線程項(xiàng)目

2020-10-27 13:24:35

線程池系統(tǒng)模型

2023-04-19 13:18:41

動(dòng)態(tài)線程池平臺(tái)

2022-08-29 09:06:43

hippo4j動(dòng)態(tài)線程池

2021-10-27 06:49:34

線程池Core函數(shù)

2021-05-27 09:50:03

連接池FTP服務(wù)器

2021-07-31 22:20:00

線程池系統(tǒng)參數(shù)

2022-02-10 11:43:54

DUBBO線程池QPS

2021-04-18 07:12:08

Dubbo線程池

2022-09-08 06:23:37

C++HTTP 服務(wù)器

2025-01-09 11:24:59

線程池美團(tuán)動(dòng)態(tài)配置中心

2024-12-10 00:00:25

2021-03-29 08:47:24

線程面試官線程池

2025-02-28 08:46:24

框架微服務(wù)架構(gòu)

2022-09-06 08:31:09

線程池工具系統(tǒng)

2019-12-11 10:45:08

Python 開發(fā)編程語言

2021-03-31 13:28:17

開源工具Python編程語言
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)