自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

自己開發(fā)一個(gè)分布式的 Xxl-Job 任務(wù)調(diào)度組件

開發(fā) 前端
如果我們的任務(wù)是比較大型的,比如;定時(shí)跑批T+1結(jié)算、商品秒殺前狀態(tài)變更、刷新數(shù)據(jù)預(yù)熱到緩存等等,這些定時(shí)任務(wù)都相同的特點(diǎn);作業(yè)量大、實(shí)時(shí)性強(qiáng)、可用率高。而這時(shí)候如果只是單純使用Schedule就顯得不足以控制。
前言
@SpringBootApplication
@EnableScheduling
public class Application{
public static void mian(String[] args){
SpringApplication.run(Application.class,args);
}

@Scheduled(cron = "0/3 * * * * *")
public void demoTask() {
//...
}
}
  • 咔咔,上面這段代碼很熟悉吧,他就是SpringBoot的Schedule定時(shí)任務(wù),簡(jiǎn)單易用。在我們開發(fā)中如果需要做一些定時(shí)或指定時(shí)刻循環(huán)執(zhí)行邏輯時(shí)候,基本都會(huì)使用到Schedule。
  • 但是,如果我們的任務(wù)是比較大型的,比如;定時(shí)跑批T+1結(jié)算、商品秒殺前狀態(tài)變更、刷新數(shù)據(jù)預(yù)熱到緩存等等,這些定時(shí)任務(wù)都相同的特點(diǎn);作業(yè)量大、實(shí)時(shí)性強(qiáng)、可用率高。而這時(shí)候如果只是單純使用Schedule就顯得不足以控制。
  • 那么,我們產(chǎn)品需求就出來(lái)了,分布式DcsSchedule任務(wù);
  • 多機(jī)器部署任務(wù)
  • 統(tǒng)一控制中心啟停
  • 宕機(jī)災(zāi)備,自動(dòng)啟動(dòng)執(zhí)行
  • 實(shí)時(shí)檢測(cè)任務(wù)執(zhí)行信息:部署數(shù)量、任務(wù)總量、成功次數(shù)、失敗次數(shù)、執(zhí)行耗時(shí)等
  • 嗯?有人憋半天了想說(shuō)可以用Quertz,嗯可以的,但這不是本篇文章的重點(diǎn)。難道你不想看看一個(gè)自言開源中間件是怎么誕生的嗎,怎么推到中心Maven倉(cāng)的嗎?比如下圖;真香不!
  • 首頁(yè)監(jiān)控

  • 任務(wù)列表

  • 好了,接下來(lái)開始介紹這個(gè)中間件如何使用和怎么開發(fā)的了!
  • 中間件使用
  • 1. 版本記錄
  • 版本 發(fā)布日期 備注 1 1.0.0-RELEASE 2019-12-07 基本功能實(shí)現(xiàn);任務(wù)接入、分布式啟停 1.0.1-RELEASE 2019-12-07 上傳測(cè)試版本
  • 2. 環(huán)境準(zhǔn)備
  • jdk1.8
  • StringBoot 2.x
  • 配置中心zookeeper 3.4.14 {準(zhǔn)備好zookeeper服務(wù),如果windows調(diào)試可以從這里下載:https://www-eu.apache.org/dist/zookeeper}
  • 下載后解壓,在bin同級(jí)路徑創(chuàng)建文件夾data、logs
  • 修改conf/zoo.cfg,修改配置如下;dataDir=D:\\Program Files\\apache-zookeeper-3.4.14\\data
    dataLogDir=D:\\Program Files\\apache-zookeeper-3.4.14\\logs
  • 打包部署控制平臺(tái)
  • 下載地址:https://git***.com/fuzhengwei/itstack-middleware-control.git
  • 部署訪問(wèn):http://localhost:7397
  • 3. 配置POM
<dependency>
<groupId>org.itstack.middleware</groupId>
<artifactId>schedule-spring-boot-starter</artifactId>
<version>1.0.0-RELEASE</version>
</dependency>
  • 4. 引入分布式任務(wù)DcsSchedule @EnableDcsScheduling
?與SpringBoot的Sceduling非常像,他的注解是;@EnableScheduling,盡可能降低使用難度
  • 這個(gè)注解主要方便?給我們自己的中間件一個(gè)入口,也是扒拉源碼發(fā)現(xiàn)的可以這么干{我一直說(shuō)好的代碼都很騷氣}
@SpringBootApplication
@EnableDcsScheduling
public class HelloWorldApplication {

public static void main(String[] args) {
SpringApplication.run(HelloWorldApplication.class, args);
}

}

5. 在任務(wù)方法上添加注解

  • 這個(gè)注解也和SpringBoot的Schedule很像,但是多了desc描述和啟停初始化控制
  • cron:執(zhí)行計(jì)劃
  • desc:任務(wù)描述
  • autoStartup:默認(rèn)啟動(dòng)狀態(tài)
  • 如果你的任務(wù)需要參數(shù)可以通過(guò)引入service去調(diào)用獲取等方式都可以
@Component("demoTaskThree")
public class DemoTaskThree {

@DcsScheduled(cron = "0 0 9,13 * * *", desc = "03定時(shí)任務(wù)執(zhí)行測(cè)試:taskMethod01", autoStartup = false)
public void taskMethod01(){
System.out.println("03定時(shí)任務(wù)執(zhí)行測(cè)試:taskMethod01");
}

@DcsScheduled(cron = "0 0/30 8-10 * * *", desc = "03定時(shí)任務(wù)執(zhí)行測(cè)試:taskMethod02", autoStartup = false)
public void taskMethod02(){
System.out.println("03定時(shí)任務(wù)執(zhí)行測(cè)試:taskMethod02");
}

}

6. 啟動(dòng)驗(yàn)證

  1. 啟動(dòng)SpringBoot工程即可,autoStartup = true的會(huì)自動(dòng)啟動(dòng)任務(wù)(任務(wù)是多線程并行執(zhí)行的)
  2. 啟動(dòng)控制平臺(tái):itstack-middleware-control,訪問(wèn):http://localhost:7397/ 成功界面如下;可以開啟/關(guān)閉驗(yàn)證了!{功能還在完善}

中間件開發(fā)

以SpringBoot為基礎(chǔ)開發(fā)一款中間件我也是第一次,因?yàn)榻佑|SpringBoot也剛剛1個(gè)月左右。雖然SpringBoot已經(jīng)出來(lái)挺久的了,但由于我們項(xiàng)目開發(fā)并不使用SpringBoot的一套東西,所以一直依賴沒有接觸。直到上個(gè)月開始考慮領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)才接觸,嗯!真的不錯(cuò),那么就開始了夯實(shí)技能、學(xué)習(xí)思想用到項(xiàng)目里。

按照我的產(chǎn)品需求,開發(fā)這么一款分布式任務(wù)的中間件,我腦袋中的模型已經(jīng)存在了。另外就是需要開發(fā)過(guò)程中去探索我需要的知識(shí)工具,簡(jiǎn)單包括;

  1. 讀取Yml自定義配置
  2. 使用zookeeper作為配置中心,這樣如果有機(jī)器宕機(jī)了就可以通過(guò)臨時(shí)節(jié)點(diǎn)監(jiān)聽知道
  3. 通過(guò)Spring類;ApplicationContextAware, BeanPostProcessor, ApplicationListener,執(zhí)行服務(wù)啟動(dòng)、注解掃描、節(jié)點(diǎn)掛在
  4. 分布式任務(wù)統(tǒng)一控制臺(tái),來(lái)管理任務(wù)

1. 工程模型

schedule-spring-boot-starter
└── src
├── main
│ ├── java
│ │ └── org.itstack.middleware.schedule
│ │ ├── annotation
│ │ │ ├── DcsScheduled.java
│ │ │ └── EnableDcsScheduling.java
│ │ ├── annotation
│ │ │ └── InstructStatus.java
│ │ ├── config
│ │ │ ├── DcsSchedulingConfiguration.java
│ │ │ ├── StarterAutoConfig.java
│ │ │ └── StarterServiceProperties.java
│ │ ├── domain
│ │ │ ├── DataCollect.java
│ │ │ ├── DcsScheduleInfo.java
│ │ │ ├── DcsServerNode.java
│ │ │ ├── ExecOrder.java
│ │ │ └── Instruct.java
│ │ ├── export
│ │ │ └── DcsScheduleResource.java
│ │ ├── service
│ │ │ ├── HeartbeatService.java
│ │ │ └── ZkCuratorServer.java
│ │ ├── task
│ │ │ ├── TaskScheduler.java
│ │ │ ├── ScheduledTask.java
│ │ │ ├── SchedulingConfig.java
│ │ │ └── SchedulingRunnable.java
│ │ ├── util
│ │ │ └── StrUtil.java
│ │ └── DoJoinPoint.java
│ └── resources
│ └── META_INF
│ └── spring.factories
└── test
└── java
└── org.itstack.demo.test
└── ApiTest.java

2. 代碼講解

  1. 篇幅較長(zhǎng),只講解部分重點(diǎn)代碼塊,如果你愿意參與到開源編寫,可以和我申請(qǐng)
  2. 我說(shuō)過(guò)好的代碼都很騷氣,那么就從這部分入手吧

2.1 自定義注解

annotation/EnableDcsScheduling.java & 自定義注解

這個(gè)注解一堆的圈A,這些配置都是為了開始啟動(dòng)執(zhí)行我們的中間件;

  • Target 標(biāo)識(shí)需要放到類上執(zhí)行
  • Retention 注釋將由編譯器記錄在類文件中,并且在運(yùn)行時(shí)由VM保留,因此可以反射地讀取它們
  • Import 引入入口資源,在程序啟動(dòng)時(shí)會(huì)執(zhí)行到自己定義的類中,以方便我們;初始化配置/服務(wù)、啟動(dòng)任務(wù)、掛在節(jié)點(diǎn)
  • ComponentScan 告訴程序掃描位置
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({DcsSchedulingConfiguration.class})
@ImportAutoConfiguration({SchedulingConfig.class, CronTaskRegister.class, DoJoinPoint.class})
@ComponentScan("org.itstack.middleware.*")
public @interface EnableDcsScheduling {
}

2.2 掃描自定義注解、初始化配置/服務(wù)、啟動(dòng)任務(wù)、掛在節(jié)點(diǎn)

config/DcsSchedulingConfiguration.java & 初始化配置/服務(wù)、啟動(dòng)任務(wù)、掛在節(jié)點(diǎn)

  • 寫到這的時(shí)候,我們的自定義注解有了,已經(jīng)寫到方法上了,那么我們?cè)趺茨玫侥兀?/span>
  • 需要通過(guò)實(shí)現(xiàn)BeanPostProcessor.postProcessAfterInitialization,在每個(gè)bean實(shí)例化的時(shí)候進(jìn)行掃描
  • 這里遇到一個(gè)有趣的問(wèn)題,一個(gè)方法會(huì)得到兩次,因?yàn)橛幸粋€(gè)CGLIB給代理的,像真假美猴王一樣,幾乎一毛一樣。扒了源碼才看到,生命注解批注沒有。好那就可以判斷了!method.getDeclaredAnnotations()
  • 我們將掃描下來(lái)的任務(wù)信息匯總到Map中,當(dāng)Spring初始化完成后,在執(zhí)行我們中間件內(nèi)容。{太早執(zhí)行有點(diǎn)喧賓奪主了!主要人家也不讓呀,給你拋異常。}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (this.nonAnnotatedClasses.contains(targetClass)) return bean;
Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
if (methods == null) return bean;
for (Method method : methods) {
DcsScheduled dcsScheduled = AnnotationUtils.findAnnotation(method, DcsScheduled.class);
if (null == dcsScheduled || 0 == method.getDeclaredAnnotations().length) continue;
List<ExecOrder> execOrderList = Constants.execOrderMap.computeIfAbsent(beanName, k -> new ArrayList<>());
ExecOrder execOrder = new ExecOrder();
execOrder.setBean(bean);
execOrder.setBeanName(beanName);
execOrder.setMethodName(method.getName());
execOrder.setDesc(dcsScheduled.desc());
execOrder.setCron(dcsScheduled.cron());
execOrder.setAutoStartup(dcsScheduled.autoStartup());
execOrderList.add(execOrder);
this.nonAnnotatedClasses.add(targetClass);
}
return bean;
}
  • 初始化服務(wù)連接zookeeper配置中心
  • 連接后將創(chuàng)建我們的節(jié)點(diǎn)以及添加監(jiān)聽,這個(gè)監(jiān)聽主要負(fù)責(zé)分布式消息通知,收到通知負(fù)責(zé)控制任務(wù)啟停
  • 這里包括了循環(huán)創(chuàng)建節(jié)點(diǎn)以及批量節(jié)點(diǎn)刪除,似乎!面試題會(huì)問(wèn)
private void init_server(ApplicationContext applicationContext) {
try {
//獲取zk連接
CuratorFramework client = ZkCuratorServer.getClient(Constants.Global.zkAddress);
//節(jié)點(diǎn)組裝
path_root_server = StrUtil.joinStr(path_root, LINE, "server", LINE, schedulerServerId);
path_root_server_ip = StrUtil.joinStr(path_root_server, LINE, "ip", LINE, Constants.Global.ip);
//創(chuàng)建節(jié)點(diǎn)&遞歸刪除本服務(wù)IP下的舊內(nèi)容
ZkCuratorServer.deletingChildrenIfNeeded(client, path_root_server_ip);
ZkCuratorServer.createNode(client, path_root_server_ip);
ZkCuratorServer.setData(client, path_root_server, schedulerServerName);
//添加節(jié)點(diǎn)&監(jiān)聽
ZkCuratorServer.createNodeSimple(client, Constants.Global.path_root_exec);
ZkCuratorServer.addTreeCacheListener(applicationContext, client, Constants.Global.path_root_exec);
} catch (Exception e) {
logger.error("itstack middleware schedule init server error!", e);
throw new RuntimeException(e);
}
}
  • 啟動(dòng)標(biāo)記了True的Schedule任務(wù)
  • Scheduled默認(rèn)是單線程執(zhí)行的,這里擴(kuò)展為多線程并行執(zhí)行
private void init_task(ApplicationContext applicationContext){
CronTaskRegister cronTaskRegistrar = applicationContext.getBean("itstack-middlware-schedule-cronTaskRegister", CronTaskRegister.class);
Set<String> beanNames = Constants.execOrderMap.keySet();
for (String beanName : beanNames) {
List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName);
for (ExecOrder execOrder : execOrderList) {
if (!execOrder.getAutoStartup()) continue;
SchedulingRunnable task = new SchedulingRunnable(execOrder.getBean(), execOrder.getBeanName(), execOrder.getMethodName());
cronTaskRegistrar.addCronTask(task, execOrder.getCron());
}
}
}
  • 掛在任務(wù)節(jié)點(diǎn)到zookeeper掛在
  • 按照不同的場(chǎng)景,有些內(nèi)容是掛在到虛擬機(jī)節(jié)點(diǎn)。{又來(lái)個(gè)面試題,虛擬節(jié)點(diǎn)數(shù)據(jù)怎么掛在,創(chuàng)建的是永久節(jié)點(diǎn),那么虛擬值怎么加?}
  • path_root_server_ip_clazz_method;這個(gè)結(jié)構(gòu)是:根目錄、服務(wù)、IP、類、方法
private void init_node() throws Exception {
Set<String> beanNames = Constants.execOrderMap.keySet();
for (String beanName : beanNames) {
List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName);
for (ExecOrder execOrder : execOrderList) {
String path_root_server_ip_clazz = StrUtil.joinStr(path_root_server_ip, LINE, "clazz", LINE, execOrder.getBeanName());
String path_root_server_ip_clazz_method = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName());
String path_root_server_ip_clazz_method_status = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName(), "/status");
//添加節(jié)點(diǎn)
ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz);
ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method);
ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method_status);
//添加節(jié)點(diǎn)數(shù)據(jù)[臨時(shí)]
ZkCuratorServer.appendPersistentData(client, path_root_server_ip_clazz_method + "/value", JSON.toJSONString(execOrder));
//添加節(jié)點(diǎn)數(shù)據(jù)[永久]
ZkCuratorServer.setData(client, path_root_server_ip_clazz_method_status, execOrder.getAutoStartup() ? "1" : "0");
}
}
}

2.3 zookeeper控制服務(wù)

service/ZkCuratorServer.java & zk服務(wù)

  • 這里提供一個(gè)zk的方法集合,其中比較重要的方法添加監(jiān)聽
  • zookeeper有一個(gè)特性是對(duì)這個(gè)監(jiān)聽后,當(dāng)節(jié)點(diǎn)內(nèi)容發(fā)生變化時(shí)會(huì)收到通知,當(dāng)然宕機(jī)也是收得到的,這個(gè)也就是我們后面開發(fā)災(zāi)備的核心觸發(fā)點(diǎn)
public static void addTreeCacheListener(final ApplicationContext applicationContext, final CuratorFramework client, String path) throws Exception {
TreeCache treeCache = new TreeCache(client, path);
treeCache.start();
treeCache.getListenable().addListener((curatorFramework, event) -> {
//...
switch (event.getType()) {
case NODE_ADDED:
case NODE_UPDATED:
if (Constants.Global.ip.equals(instruct.getIp()) && Constants.Global.schedulerServerId.equals(instruct.getSchedulerServerId())) {
//執(zhí)行命令
Integer status = instruct.getStatus();
switch (status) {
case 0: //停止任務(wù)
cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
setData(client, path_root_server_ip_clazz_method_status, "0");
logger.info("itstack middleware schedule task stop {} {}", instruct.getBeanName(), instruct.getMethodName());
break;
case 1: //啟動(dòng)任務(wù)
cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
setData(client, path_root_server_ip_clazz_method_status, "1");
logger.info("itstack middleware schedule task start {} {}", instruct.getBeanName(), instruct.getMethodName());
break;
case 2: //刷新任務(wù)
cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
setData(client, path_root_server_ip_clazz_method_status, "1");
logger.info("itstack middleware schedule task refresh {} {}", instruct.getBeanName(), instruct.getMethodName());
break;
}
}
break;
case NODE_REMOVED:
break;
default:
break;
}
});
}

2.4 并行任務(wù)注冊(cè)

  • 由于默認(rèn)的SpringBoot是單線程的,所以這里改造了下,可以支持多線程并行執(zhí)行
  • 包括了添加任務(wù)和刪除任務(wù),也就是執(zhí)行取消future.cancel(true)
public void addCronTask(SchedulingRunnable task, String cronExpression) {
if (null != Constants.scheduledTasks.get(task.taskId())) {
removeCronTask(task.taskId());
}
CronTask cronTask = new CronTask(task, cronExpression);
Constants.scheduledTasks.put(task.taskId(), scheduleCronTask(cronTask));
}
public void removeCronTask(String taskId) {
ScheduledTask scheduledTask = Constants.scheduledTasks.remove(taskId);
if (scheduledTask == null) return;
scheduledTask.cancel();
}

2.5 待擴(kuò)展的自定義AOP

  • 我們最開始配置的掃描@ComponentScan("org.itstack.middleware.*"),主要用到這里的自定義注解,否則是掃描不到的,也就是你自定義切面失效的效果
  • 目前這里的功能并沒有擴(kuò)展,基本只是打印執(zhí)行耗時(shí),后續(xù)完善的任務(wù)執(zhí)行耗時(shí)監(jiān)聽等,就需要這里來(lái)完善
@Pointcut("@annotation(org.itstack.middleware.schedule.annotation.DcsScheduled)")
public void aopPoint(){
}

@Around("aopPoint()")
public Object doRouter(ProceedingJoinPoint jp) throws Throwable {
long begin = System.currentTimeMillis();
Method method = getMethod(jp);
try {
return jp.proceed();
} finally {
long end = System.currentTimeMillis();
logger.info("\nitstack middleware schedule method:{}.{} take time(m):{}", jp.getTarget().getClass().getSimpleName(), method.getName(), (end - begin));
}
}

3. Jar包發(fā)布

開發(fā)完成后還是需要將Jar包發(fā)布到manven中心倉(cāng)庫(kù)的,這個(gè)過(guò)程較長(zhǎng)單獨(dú)寫了博客;發(fā)布Jar包到Maven中央倉(cāng)庫(kù)(為開發(fā)開源中間件做準(zhǔn)備)

綜上總結(jié)

  1. 要開發(fā)要實(shí)現(xiàn)的還很多,一個(gè)周末也干不完所有的!而且需要有想法的小猿/媛伴一起加入!
  2. 這里沒有講解分布式任務(wù)中間件控制平臺(tái)itstack-middleware-control,因?yàn)楸容^簡(jiǎn)單只是使用了中間件的zk功能接口做展示和操作。
  3. 中間件開發(fā)是一件非常有意思的事情,不同于業(yè)務(wù)它更像易筋經(jīng),寺廟老僧,劍走偏鋒,馳騁縱橫,騷招滿屏。
責(zé)任編輯:武曉燕 來(lái)源: 今日頭條
相關(guān)推薦

2022-01-27 08:44:58

調(diào)度系統(tǒng)開源

2023-01-04 09:23:58

2023-11-07 07:56:40

2023-11-22 10:07:22

2024-11-06 18:01:15

分布式任務(wù)調(diào)度組件

2021-12-26 00:03:27

響應(yīng)式編程異步

2022-09-23 13:57:11

xxl-job任務(wù)調(diào)度中間件

2019-07-19 15:51:11

框架選型分布式

2024-08-07 08:15:47

2020-07-17 09:33:39

CPU內(nèi)存調(diào)度

2024-08-27 09:34:24

2024-09-09 08:11:12

2020-09-29 19:20:05

鴻蒙

2023-06-26 00:14:28

Openjob分布式任務(wù)

2022-03-26 17:13:22

ElasticJobxxl-job分布式

2025-01-06 08:53:37

2021-11-10 16:10:18

鴻蒙HarmonyOS應(yīng)用

2023-05-08 16:38:46

任務(wù)調(diào)度分布式任務(wù)調(diào)度

2020-11-06 12:12:35

HarmonyOS

2024-07-31 08:18:40

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)