使用Spring Boot + Quartz 實(shí)現(xiàn)分布式定時(shí)任務(wù)平臺(tái)
本文將從項(xiàng)目實(shí)戰(zhàn)出發(fā)來(lái)介紹分布式定時(shí)任務(wù)的實(shí)現(xiàn)。在某些應(yīng)用場(chǎng)景下要求任務(wù)必須具備高可用性和可擴(kuò)展性,單臺(tái)服務(wù)器不能滿(mǎn)足業(yè)務(wù)需求,這時(shí)就需要使用Quartz實(shí)現(xiàn)分布式定時(shí)任務(wù)。
一、分布式任務(wù)應(yīng)用場(chǎng)景
定時(shí)任務(wù)系統(tǒng)在應(yīng)用平臺(tái)中的重要性不言而喻,特別是互聯(lián)網(wǎng)電商、金融等行業(yè)更是離不開(kāi)定時(shí)任務(wù)。在任務(wù)數(shù)量不多、執(zhí)行頻率不高時(shí),單臺(tái)服務(wù)器完全能夠滿(mǎn)足。
但是隨著業(yè)務(wù)逐漸增加,定時(shí)任務(wù)系統(tǒng)必須具備高可用和水平擴(kuò)展的能力,單臺(tái)服務(wù)器已經(jīng)不能滿(mǎn)足需求。因此需要把定時(shí)任務(wù)系統(tǒng)部署到集群中,實(shí)現(xiàn)分布式定時(shí)任務(wù)系統(tǒng)集群。
Quartz的集群功能通過(guò)故障轉(zhuǎn)移和負(fù)載平衡功能為調(diào)度程序帶來(lái)高可用性和可擴(kuò)展性。
Quartz是通過(guò)數(shù)據(jù)庫(kù)表來(lái)存儲(chǔ)和共享任務(wù)信息的。獨(dú)立的Quartz節(jié)點(diǎn)并不與另一個(gè)節(jié)點(diǎn)或者管理節(jié)點(diǎn)通信,而是通過(guò)數(shù)據(jù)庫(kù)鎖機(jī)制來(lái)調(diào)度執(zhí)行定時(shí)任務(wù)。
需要注意的是,在集群環(huán)境下,時(shí)鐘必須同步,否則執(zhí)行時(shí)間不一致。
二、Quartz實(shí)現(xiàn)分布式定時(shí)任務(wù)
1. 添加Quartz依賴(lài)
首先,引入Quartz中提供分布式處理的JAR包以及數(shù)據(jù)庫(kù)和連接相關(guān)的依賴(lài)。示例代碼如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- orm -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
在上面的示例中,除了添加Quartz依賴(lài)外,還需要添加mysql-connector-java和
spring-boot-starter-data-jpa兩個(gè)組件,這兩個(gè)組件主要用于JOB持久化到MySQL數(shù)據(jù)庫(kù)。
2. 初始化Quartz數(shù)據(jù)庫(kù)
分布式Quartz定時(shí)任務(wù)的配置信息存儲(chǔ)在數(shù)據(jù)庫(kù)中,數(shù)據(jù)庫(kù)初始化腳本可以在官方網(wǎng)站中查找,默認(rèn)保存在quartz-2.2.3-distribution\src\org\quartz\impl\jdbcjobstore\tables-mysql.sql目錄下。首先創(chuàng)建quartz_jobs數(shù)據(jù)庫(kù),然后在數(shù)據(jù)庫(kù)中執(zhí)行tables-mysql.sql初始化腳本。具體示例如下:
DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;
CREATE TABLE QRTZ_JOB_DETAILS
(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);
CREATE TABLE QRTZ_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(200) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);
CREATE TABLE QRTZ_SIMPLE_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_CRON_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
CRON_EXPRESSION VARCHAR(200) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_SIMPROP_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
STR_PROP_1 VARCHAR(512) NULL,
STR_PROP_2 VARCHAR(512) NULL,
STR_PROP_3 VARCHAR(512) NULL,
INT_PROP_1 INT NULL,
INT_PROP_2 INT NULL,
LONG_PROP_1 BIGINT NULL,
LONG_PROP_2 BIGINT NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 VARCHAR(1) NULL,
BOOL_PROP_2 VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_BLOB_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_CALENDARS
(
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(200) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
);
CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_FIRED_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(200) NULL,
JOB_GROUP VARCHAR(200) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);
CREATE TABLE QRTZ_SCHEDULER_STATE
(
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);
CREATE TABLE QRTZ_LOCKS
(
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);
使用tables-mysql.sql創(chuàng)建表的語(yǔ)句執(zhí)行完成后,說(shuō)明Quartz的數(shù)據(jù)庫(kù)和表創(chuàng)建成功,我們查看數(shù)據(jù)庫(kù)的ER圖,如下圖所示。
3. 配置數(shù)據(jù)庫(kù)和Quartz
修改application.properties配置文件,配置數(shù)據(jù)庫(kù)與Quartz。具體操作如下:
# server.port=8090
# Quartz 數(shù)據(jù)庫(kù)
spring.datasource.url=jdbc:mysql://localhost:3306/quartz_jobs?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.max-active=1000
spring.datasource.max-idle=20
spring.datasource.min-idle=5
spring.datasource.initial-size=10
# 是否使用properties作為數(shù)據(jù)存儲(chǔ)
org.quartz.jobStore.useProperties=false
# 數(shù)據(jù)庫(kù)中表的命名前綴
org.quartz.jobStore.tablePrefix=QRTZ_
# 是否是一個(gè)集群,是不是分布式的任務(wù)
org.quartz.jobStore.isClustered=true
# 集群檢查周期,單位為毫秒,可以自定義縮短時(shí)間。當(dāng)某一個(gè)節(jié)點(diǎn)宕機(jī)的時(shí)候,其他節(jié)點(diǎn)等待多久后開(kāi)始執(zhí)行任務(wù)
org.quartz.jobStore.clusterCheckinInterval=5000
# 單位為毫秒,集群中的節(jié)點(diǎn)退出后,再次檢查進(jìn)入的時(shí)間間隔
org.quartz.jobStore.misfireThreshold=60000
# 事務(wù)隔離級(jí)別
org.quartz.jobStore.txIsolationLevelReadCommitted=true
# 存儲(chǔ)的事務(wù)管理類(lèi)型
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# 使用的Delegate類(lèi)型
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 集群的命名,一個(gè)集群要有相同的命名
org.quartz.scheduler.instanceName=ClusterQuartz
# 節(jié)點(diǎn)的命名,可以自定義。AUTO代表自動(dòng)生成
org.quartz.scheduler.instanceId=AUTO
# rmi遠(yuǎn)程協(xié)議是否發(fā)布
org.quartz.scheduler.rmi.export=false
# rmi遠(yuǎn)程協(xié)議代理是否創(chuàng)建
org.quartz.scheduler.rmi.proxy=false
# 是否使用用戶(hù)控制的事務(wù)環(huán)境觸發(fā)執(zhí)行任務(wù)
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
上面的配置主要是Quartz數(shù)據(jù)庫(kù)和Quartz分布式集群相關(guān)的屬性配置。分布式定時(shí)任務(wù)的配置存儲(chǔ)在數(shù)據(jù)庫(kù)中,所以需要配置數(shù)據(jù)庫(kù)連接和Quartz配置信息,為Quartz提供數(shù)據(jù)庫(kù)配置信息,如數(shù)據(jù)庫(kù)、數(shù)據(jù)表的前綴之類(lèi)。
4. 定義定時(shí)任務(wù)
后臺(tái)定時(shí)任務(wù)與普通Quartz任務(wù)并無(wú)差異,只是增加了@
PersistJobDataAfterExecution注解和@DisallowConcurrentExecution注解。創(chuàng)建QuartzJob定時(shí)任務(wù)類(lèi)并實(shí)現(xiàn)Quartz定時(shí)任務(wù)的具體示例代碼如下:
// 持久化
@PersistJobDataAfterExecution
// 禁止并發(fā)執(zhí)行
@DisallowConcurrentExecution
public class QuartzJob extends QuartzJobBean {
private static final Logger log = LoggerFactory.getLogger(QuartzJob.class);
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
String taskName = context.getJobDetail().getJobDataMap().getString("name");
log.info("---> Quartz job, time:{"+new Date()+"} ,name:{"+taskName+"}<----");
}
}
在上面的示例中,創(chuàng)建了QuartzJob定時(shí)任務(wù)類(lèi),使用@
PersistJobDataAfterExecution注解持久化任務(wù)信息。DisallowConcurrentExecution禁止并發(fā)執(zhí)行,避免同一個(gè)任務(wù)被多次并發(fā)執(zhí)行。
5. SchedulerConfig配置
創(chuàng)建SchedulerConfig配置類(lèi),初始化Quartz分布式集群相關(guān)配置,包括集群設(shè)置、數(shù)據(jù)庫(kù)等。示例代碼如下:
@Configuration
public class SchedulerConfig {
@Autowired
private DataSource dataSource;
/**
* 調(diào)度器
*
* @return
* @throws Exception
*/
@Bean
public Scheduler scheduler() throws Exception {
Scheduler scheduler = schedulerFactoryBean().getScheduler();
return scheduler;
}
/**
* Scheduler工廠(chǎng)類(lèi)
*
* @return
* @throws IOException
*/
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setSchedulerName("Cluster_Scheduler");
factory.setDataSource(dataSource);
factory.setApplicationContextSchedulerContextKey("applicationContext");
factory.setTaskExecutor(schedulerThreadPool());
//factory.setQuartzProperties(quartzProperties());
factory.setStartupDelay(10);// 延遲10s執(zhí)行
return factory;
}
/**
* 配置Schedule線(xiàn)程池
*
* @return
*/
@Bean
public Executor schedulerThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
executor.setQueueCapacity(Runtime.getRuntime().availableProcessors());
return executor;
}
}
在上面的示例中,主要是配置Schedule線(xiàn)程池、配置Quartz數(shù)據(jù)庫(kù)、創(chuàng)建Schedule調(diào)度器實(shí)例等初始化配置。
6. 觸發(fā)定時(shí)任務(wù)
配置完成之后,還需要觸發(fā)定時(shí)任務(wù),創(chuàng)建JobStartupRunner類(lèi)以便在系統(tǒng)啟動(dòng)時(shí)觸發(fā)所有定時(shí)任務(wù)。示例代碼如下:
@Component
public class JobStartupRunner implements CommandLineRunner {
@Autowired
SchedulerConfig schedulerConfig;
private static String TRIGGER_GROUP_NAME = "test_trigger";
private static String JOB_GROUP_NAME = "test_job";
@Override
public void run(String... args) throws Exception {
Scheduler scheduler;
try {
scheduler = schedulerConfig.scheduler();
TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", TRIGGER_GROUP_NAME);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (null == trigger) {
Class clazz = QuartzJob.class;
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity("job1", JOB_GROUP_NAME).usingJobData("name","weiz QuartzJob").build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", TRIGGER_GROUP_NAME)
.withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
System.out.println("Quartz 創(chuàng)建了job:...:" + jobDetail.getKey());
} else {
System.out.println("job已存在:{}" + trigger.getKey());
}
TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2", TRIGGER_GROUP_NAME);
CronTrigger trigger2 = (CronTrigger) scheduler.getTrigger(triggerKey2);
if (null == trigger2) {
Class clazz = QuartzJob2.class;
JobDetail jobDetail2 = JobBuilder.newJob(clazz).withIdentity("job2", JOB_GROUP_NAME).usingJobData("name","weiz QuartzJob2").build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
trigger2 = TriggerBuilder.newTrigger().withIdentity("trigger2", TRIGGER_GROUP_NAME)
.withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail2, trigger2);
System.out.println("Quartz 創(chuàng)建了job:...:{}" + jobDetail2.getKey());
} else {
System.out.println("job已存在:{}" + trigger2.getKey());
}
scheduler.start();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
在上面的示例中,為了適應(yīng)分布式集群,我們?cè)谙到y(tǒng)啟動(dòng)時(shí)觸發(fā)定時(shí)任務(wù),判斷任務(wù)是否已經(jīng)創(chuàng)建、是否正在執(zhí)行。如果集群中的其他示例已經(jīng)創(chuàng)建了任務(wù),則啟動(dòng)時(shí)無(wú)須觸發(fā)任務(wù)。
三、 驗(yàn)證測(cè)試
配置完成之后,接下來(lái)啟動(dòng)任務(wù),測(cè)試分布式任務(wù)配置是否成功。啟動(dòng)一個(gè)實(shí)例,可以看到定時(shí)任務(wù)執(zhí)行了,然后每10秒鐘打印輸出一次,如下圖所示。
接下來(lái),模擬分布式部署的情況。我們?cè)賳?dòng)一個(gè)測(cè)試程序?qū)嵗?,這樣就有兩個(gè)后臺(tái)定時(shí)任務(wù)實(shí)例。
實(shí)例1:
實(shí)例2:
從上面的日志中可以看到,Quartz Job和Quartz Job2交替地在兩個(gè)任務(wù)實(shí)例進(jìn)程中執(zhí)行,同一時(shí)刻同一個(gè)任務(wù)只有一個(gè)進(jìn)程在執(zhí)行,這說(shuō)明已經(jīng)達(dá)到了分布式后臺(tái)定時(shí)任務(wù)的效果。
接下來(lái),停止任務(wù)實(shí)例1,測(cè)試任務(wù)實(shí)例2是否會(huì)接管所有任務(wù)繼續(xù)執(zhí)行。如圖10-11所示,停止任務(wù)實(shí)例1后,任務(wù)實(shí)例2接管了所有的定時(shí)任務(wù)。這樣如果集群中的某個(gè)實(shí)例異常了,其他實(shí)例能夠接管所有的定時(shí)任務(wù),確保任務(wù)集群的穩(wěn)定運(yùn)行。
最后
以上,我們就把Spring Boot集成Quartz實(shí)現(xiàn)分布式定時(shí)任務(wù)的功能介紹完了。分布式定時(shí)任務(wù)在應(yīng)用開(kāi)發(fā)中非常重要的功能模塊,希望大家能夠熟練掌握。