自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

SpringBoot 整合 Elastic-Job 實(shí)現(xiàn)任務(wù)分布式調(diào)度,實(shí)戰(zhàn)講解!

開(kāi)發(fā) 前端
在分布式環(huán)境環(huán)境下,elastic-job-lite支持的彈性擴(kuò)容、任務(wù)分片是最大的亮點(diǎn),在實(shí)際使用的時(shí)候,任務(wù)分片總數(shù)盡可能大于服務(wù)實(shí)例個(gè)數(shù),并且是倍數(shù)關(guān)系,這樣任務(wù)在分片的時(shí)候,會(huì)更加均勻!

一、背景介紹

在前幾篇文章中,我們?cè)敿?xì)的介紹了 Quartz 的架構(gòu)原理以及應(yīng)用實(shí)踐,雖然 Quartz 也可以通過(guò)集群方式來(lái)保證服務(wù)高可用,但是它也有一個(gè)的弊端,那就是服務(wù)節(jié)點(diǎn)數(shù)量的增加,并不能提升任務(wù)的執(zhí)行效率,即不能實(shí)現(xiàn)水平擴(kuò)展!

之所以產(chǎn)生這樣的結(jié)果,是因?yàn)?Quartz 在分布式集群環(huán)境下是通過(guò)數(shù)據(jù)庫(kù)鎖方式來(lái)實(shí)現(xiàn)有且只有一個(gè)有效的服務(wù)節(jié)點(diǎn)來(lái)運(yùn)行服務(wù),從而保證服務(wù)在集群環(huán)境下定時(shí)任務(wù)不會(huì)被重復(fù)調(diào)用!

如果需要運(yùn)行的定時(shí)任務(wù)很少的話,使用 Quartz 不會(huì)有太大的問(wèn)題,但是如果 現(xiàn)在有這么一個(gè)需求,例如理財(cái)產(chǎn)品,每天6點(diǎn)系統(tǒng)需要計(jì)算每個(gè)賬戶昨天的收益,假如這個(gè)理財(cái)產(chǎn)品,有幾個(gè)億的用戶,如果都在一個(gè)服務(wù)實(shí)例上跑,可能第二天都無(wú)法處理完這項(xiàng)任務(wù)!

類似這樣場(chǎng)景還有很多很多,很顯然 Quartz 很難滿足我們這種大批量、任務(wù)執(zhí)行周期長(zhǎng)的任務(wù)調(diào)度!

因此短板,當(dāng)當(dāng)網(wǎng)基于 Quartz 開(kāi)發(fā)了一套適合在分布式環(huán)境下能高效率的使用服務(wù)器資源的 Elastic-Job 定時(shí)任務(wù)框架!

Elastic-Job-Lite最大的亮點(diǎn)就是支持彈性擴(kuò)容縮容,怎么實(shí)現(xiàn)的呢?

比如現(xiàn)在有個(gè)任務(wù)要執(zhí)行,如果將任務(wù)進(jìn)行分片成10個(gè),那么可以同時(shí)在10個(gè)服務(wù)實(shí)例上并行執(zhí)行,互相不影響,從而大大的提升了任務(wù)執(zhí)行效率,并且充分的利用服務(wù)器資源!

對(duì)于上面的理財(cái)產(chǎn)品,如果這個(gè)任務(wù)需要處理1個(gè)億用戶,那么我們可以通過(guò)水平擴(kuò)展,比如對(duì)任務(wù)進(jìn)行分片為500,讓500個(gè)服務(wù)實(shí)例同時(shí)運(yùn)行,每個(gè)服務(wù)實(shí)例處理20萬(wàn)條數(shù)據(jù),不出意外的話,1 - 2個(gè)小時(shí)可以全部跑完,如果時(shí)間還是很長(zhǎng),還可以繼續(xù)水平擴(kuò)張,添加服務(wù)實(shí)例來(lái)運(yùn)行!

2015 年,當(dāng)當(dāng)網(wǎng)將其開(kāi)源,瞬間吸引了一大批程序員的關(guān)注,同時(shí)登頂開(kāi)源中國(guó)第一名!

下面我們就一起來(lái)了解一下這款使用非常廣泛的分布式調(diào)度框架。

二、項(xiàng)目架構(gòu)介紹

Elastic-Job 最開(kāi)始只有一個(gè) elastic-job-core 的項(xiàng)目,定位輕量級(jí)、無(wú)中心化,最核心的服務(wù)就是支持彈性擴(kuò)容和數(shù)據(jù)分片!

從 2.X 版本以后,主要分為 Elastic-Job-Lite 和 Elastic-Job-Cloud 兩個(gè)子項(xiàng)目。

其中,Elastic-Job-Lite 定位為輕量級(jí) 無(wú) 中 心 化 解 決 方 案 , 使 用jar 包 的 形 式 提 供 分 布 式 任 務(wù) 的 協(xié) 調(diào) 服 務(wù) 。

而 Elastic-Job-Cloud 使用 Mesos + Docker 的解決方案,額外提供資源治理、應(yīng)用分發(fā)以及進(jìn)程隔離等服務(wù)(跟 Lite 的區(qū)別只是部署方式不同,他們使用相同的 API,只要開(kāi)發(fā)一次)。

今天我們主要介紹的是Elastic-Job-Lite,最主要的功能特性如下:

  • 分布式調(diào)度協(xié)調(diào):采用 zookeeper 實(shí)現(xiàn)注冊(cè)中心,進(jìn)行統(tǒng)一調(diào)度。
  • 支持任務(wù)分片:將需要執(zhí)行的任務(wù)進(jìn)行分片,實(shí)現(xiàn)并行調(diào)度。
  • 支持彈性擴(kuò)容縮容:將任務(wù)拆分為 n 個(gè)任務(wù)項(xiàng)后,各個(gè)服務(wù)器分別執(zhí)行各自分配到的任務(wù)項(xiàng)。一旦有新的服務(wù)器加入集群,或現(xiàn)有服務(wù)器下線,elastic-job 將在保留本次任務(wù)執(zhí)行不變的情況下,下次任務(wù)開(kāi)始前觸發(fā)任務(wù)重分片。

當(dāng)然,還有失效轉(zhuǎn)移、錯(cuò)過(guò)執(zhí)行作業(yè)重觸發(fā)等等功能,大家可以訪問(wèn)官網(wǎng)文檔,以獲取更多詳細(xì)資料。

應(yīng)用在各自的節(jié)點(diǎn)執(zhí)行任務(wù),通過(guò) zookeeper 注冊(cè)中心協(xié)調(diào)。節(jié)點(diǎn)注冊(cè)、節(jié)點(diǎn)選舉、任務(wù)分片、監(jiān)聽(tīng)都在 E-Job 的代碼中完成。下圖是官網(wǎng)提供得架構(gòu)圖。

圖片圖片

啥也不用多說(shuō)了,下面我們直接通過(guò)實(shí)踐介紹,更容易了解里面是怎么玩的!

三、應(yīng)用實(shí)踐

3.1、zookeeper 安裝

elastic-job-lite,是直接依賴 zookeeper 的,因此在開(kāi)發(fā)之前我們需要先準(zhǔn)備好對(duì)應(yīng)的 zookeeper 環(huán)境,關(guān)于 zookeeper 的安裝過(guò)程,就不多說(shuō)了,非常簡(jiǎn)單,網(wǎng)上都有教程!

3.2、elastic-job-lite-console 安裝

elastic-job-lite-console,主要是一個(gè)任務(wù)作業(yè)可視化界面管理系統(tǒng)。

可以單獨(dú)部署,與平臺(tái)不關(guān),主要是通過(guò)配置注冊(cè)中心和數(shù)據(jù)源來(lái)抓取數(shù)據(jù)。

獲取的方式也很簡(jiǎn)單,直接訪問(wèn)https://github.com/apache/shardingsphere-elasticjob地址,然后切換到2.1.5的版本號(hào),然后執(zhí)行mvn clean install進(jìn)行打包,獲取對(duì)應(yīng)的安裝包將其解壓,進(jìn)行bin文件夾啟動(dòng)服務(wù)即可!

圖片圖片

如果你的網(wǎng)速像蝸牛一樣的慢,還有一個(gè)辦法就是從這個(gè)地址https://gitee.com/elasticjob/elastic-job獲取對(duì)應(yīng)的源碼!

啟動(dòng)服務(wù)后,在瀏覽器訪問(wèn)http://127.0.0.1:8899,輸入賬戶、密碼(都是root)即可進(jìn)入控制臺(tái)頁(yè)面,類似如下界面!

圖片圖片

進(jìn)入之后,將上文所在的 zookeeper 注冊(cè)中心進(jìn)行配置,包括數(shù)據(jù)庫(kù) mysql 的數(shù)據(jù)源也可以配置一下!

3.3、創(chuàng)建工程

本文采用springboot來(lái)搭建工程為例,創(chuàng)建工程并添加elastic-job-lite依賴!

<!-- 引入elastic-job-lite核心模塊 -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>

<!-- 使用springframework自定義命名空間時(shí)引入 -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>

在配置文件application.properties中提前配置好 zookeeper 注冊(cè)中心相關(guān)信息!

#zookeeper config
zookeeper.serverList=127.0.0.1:2181
zookeeper.namespace=example-elastic-job-test

3.4、新建 ZookeeperConfig 配置類

@Configuration
@ConditionalOnExpression("'${zookeeper.serverList}'.length() > 0")
public class ZookeeperConfig {

    /**
     * zookeeper 配置
     * @return
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter zookeeperRegistryCenter(@Value("${zookeeper.serverList}") String serverList, 
                                                           @Value("${zookeeper.namespace}") String namespace){
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList,namespace));
    }

}

3.5、新建任務(wù)處理類

elastic-job支持三種類型的作業(yè)任務(wù)處理!

  • Simple 類型作業(yè):Simple 類型用于一般任務(wù)的處理,只需實(shí)現(xiàn)SimpleJob接口。該接口僅提供單一方法用于覆蓋,此方法將定時(shí)執(zhí)行,與Quartz原生接口相似。
  • Dataflow 類型作業(yè):Dataflow 類型用于處理數(shù)據(jù)流,需實(shí)現(xiàn)DataflowJob接口。該接口提供2個(gè)方法可供覆蓋,分別用于抓取(fetchData)和處理(processData)數(shù)據(jù)。
  • Script類型作業(yè):Script 類型作業(yè)意為腳本類型作業(yè),支持 shell,python,perl等所有類型腳本。只需通過(guò)控制臺(tái)或代碼配置 scriptCommandLine 即可,無(wú)需編碼。執(zhí)行腳本路徑可包含參數(shù),參數(shù)傳遞完畢后,作業(yè)框架會(huì)自動(dòng)追加最后一個(gè)參數(shù)為作業(yè)運(yùn)行時(shí)信息。

3.6、新建 Simple 類型作業(yè)

編寫一個(gè)SimpleJob接口的實(shí)現(xiàn)類MySimpleJob,當(dāng)前工作主要是打印一條日志。

@Slf4j
public class MySimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        log.info(String.format("Thread ID: %s, 作業(yè)分片總數(shù): %s, " +
                        "當(dāng)前分片項(xiàng): %s.當(dāng)前參數(shù): %s," +
                        "作業(yè)名稱: %s.作業(yè)自定義參數(shù): %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        ));
    }
}

創(chuàng)建一個(gè)MyElasticJobListener任務(wù)監(jiān)聽(tīng)器,用于監(jiān)聽(tīng)MySimpleJob的任務(wù)執(zhí)行情況。

@Slf4j
public class MyElasticJobListener implements ElasticJobListener {

    private long beginTime = 0;

    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        beginTime = System.currentTimeMillis();
        log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(),  DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        long endTime = System.currentTimeMillis();
        log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime);
    }

}

創(chuàng)建一個(gè)MySimpleJobConfig類,將MySimpleJob其注入到zookeeper。

@Configuration
public class MySimpleJobConfig {

    /**
     * 任務(wù)名稱
     */
    @Value("${simpleJob.mySimpleJob.name}")
    private String mySimpleJobName;

    /**
     * cron表達(dá)式
     */
    @Value("${simpleJob.mySimpleJob.cron}")
    private String mySimpleJobCron;

    /**
     * 作業(yè)分片總數(shù)
     */
    @Value("${simpleJob.mySimpleJob.shardingTotalCount}")
    private int mySimpleJobShardingTotalCount;

    /**
     * 作業(yè)分片參數(shù)
     */
    @Value("${simpleJob.mySimpleJob.shardingItemParameters}")
    private String mySimpleJobShardingItemParameters;

    /**
     * 自定義參數(shù)
     */
    @Value("${simpleJob.mySimpleJob.jobParameters}")
    private String mySimpleJobParameters;

    @Autowired
    private ZookeeperRegistryCenter registryCenter;

    @Bean
    public MySimpleJob mySimpleJob() {
        return new MySimpleJob();
    }

    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) {
        //配置任務(wù)監(jiān)聽(tīng)器
         MyElasticJobListener elasticJobListener = new MyElasticJobListener();
        return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), elasticJobListener);
    }

    private LiteJobConfiguration getLiteJobConfiguration() {
        // 定義作業(yè)核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount).
                shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build();
        // 定義SIMPLE類型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName());
        // 定義Lite作業(yè)根配置
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
        return simpleJobRootConfig;

    }
}

在配置文件application.properties中配置好對(duì)應(yīng)的mySimpleJob參數(shù)!

#elastic job
#simpleJob類型的job
simpleJob.mySimpleJob.name=mySimpleJob
simpleJob.mySimpleJob.crnotallow=0/15 * * * * ?
simpleJob.mySimpleJob.shardingTotalCount=3
simpleJob.mySimpleJob.shardingItemParameters=0=a,1=b,2=c
simpleJob.mySimpleJob.jobParameters=helloWorld

運(yùn)行程序,看看效果如何?

圖片圖片

圖片圖片

在上圖demo中,配置的分片數(shù)為3,這個(gè)時(shí)候會(huì)有3個(gè)線程進(jìn)行同時(shí)執(zhí)行任務(wù),因?yàn)槎际窃谝慌_(tái)機(jī)器上執(zhí)行的,這個(gè)任務(wù)被執(zhí)行來(lái)3次,下面修改一下端口配置,創(chuàng)建三個(gè)相同的服務(wù)實(shí)例,在看看效果如下:

圖片圖片

很清晰的看到任務(wù)被執(zhí)行一次!

3.7、新建 DataFlowJob 類型作業(yè)

DataFlowJob 類型的任務(wù)配置和SimpleJob類似,操作也很簡(jiǎn)單!

創(chuàng)建一個(gè)DataflowJob類型的實(shí)現(xiàn)類MyDataFlowJob。

@Slf4j
public class MyDataFlowJob implements DataflowJob<String> {

    private boolean flag = false;

    @Override
    public List<String> fetchData(ShardingContext shardingContext) {
        log.info("開(kāi)始獲取數(shù)據(jù)");
        if (flag) {
            return null;
        }
        return Arrays.asList("qingshan", "jack", "seven");
    }

    @Override
    public void processData(ShardingContext shardingContext, List<String> data) {
        for (String val : data) {
            // 處理完數(shù)據(jù)要移除掉,不然就會(huì)一直跑,處理可以在上面的方法里執(zhí)行。這里采用 flag
            log.info("開(kāi)始處理數(shù)據(jù):" + val);
        }
        flag = true;
    }
}

接著創(chuàng)建MyDataFlowJob的配置類,將其注入到zookeeper注冊(cè)中心。

Configuration
public class MyDataFlowJobConfig {

    /**
     * 任務(wù)名稱
     */
    @Value("${dataflowJob.myDataflowJob.name}")
    private String jobName;

    /**
     * cron表達(dá)式
     */
    @Value("${dataflowJob.myDataflowJob.cron}")
    private String jobCron;

    /**
     * 作業(yè)分片總數(shù)
     */
    @Value("${dataflowJob.myDataflowJob.shardingTotalCount}")
    private int jobShardingTotalCount;

    /**
     * 作業(yè)分片參數(shù)
     */
    @Value("${dataflowJob.myDataflowJob.shardingItemParameters}")
    private String jobShardingItemParameters;

    /**
     * 自定義參數(shù)
     */
    @Value("${dataflowJob.myDataflowJob.jobParameters}")
    private String jobParameters;

    @Autowired
    private ZookeeperRegistryCenter registryCenter;


    @Bean
    public MyDataFlowJob myDataFlowJob() {
        return new MyDataFlowJob();
    }

    @Bean(initMethod = "init")
    public JobScheduler dataFlowJobScheduler(final MyDataFlowJob myDataFlowJob) {
        MyElasticJobListener elasticJobListener = new MyElasticJobListener();
        return new SpringJobScheduler(myDataFlowJob, registryCenter, getLiteJobConfiguration(), elasticJobListener);
    }

    private LiteJobConfiguration getLiteJobConfiguration() {
        // 定義作業(yè)核心配置
        JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount).
                shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build();
        // 定義DATAFLOW類型配置
        DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyDataFlowJob.class.getCanonicalName(), false);
        // 定義Lite作業(yè)根配置
        LiteJobConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).overwrite(true).build();
        return dataflowJobRootConfig;

    }
}

最后,在配置文件application.properties中配置好對(duì)應(yīng)的myDataflowJob參數(shù)!

#dataflow類型的job
dataflowJob.myDataflowJob.name=myDataflowJob
dataflowJob.myDataflowJob.crnotallow=0/15 * * * * ?
dataflowJob.myDataflowJob.shardingTotalCount=1
dataflowJob.myDataflowJob.shardingItemParameters=0=a,1=b,2=c
dataflowJob.myDataflowJob.jobParameters=myDataflowJobParamter

運(yùn)行程序,看看效果如何?

圖片圖片

需要注意的地方是,如果配置的是流式處理類型,它會(huì)不停的拉取數(shù)據(jù)、處理數(shù)據(jù),在拉取的時(shí)候,如果返回為空,就不會(huì)處理數(shù)據(jù)!

如果配置的是非流式處理類型,和上面介紹的simpleJob類型,處理一樣!

3.8、新建 ScriptJob 類型作業(yè)

ScriptJob 類型的任務(wù)配置和上面類似,主要是用于定時(shí)執(zhí)行某個(gè)腳本,一般用的比較少!

因?yàn)槟繕?biāo)是腳本,沒(méi)有執(zhí)行的任務(wù),所以無(wú)需編寫任務(wù)作業(yè)類型!

只需要編寫一個(gè)ScriptJob類型的配置類即可,命令是echo 'Hello World !內(nèi)容!

@Configuration
public class MyScriptJobConfig {

    /**
     * 任務(wù)名稱
     */
    @Value("${scriptJob.myScriptJob.name}")
    private String jobName;

    /**
     * cron表達(dá)式
     */
    @Value("${scriptJob.myScriptJob.cron}")
    private String jobCron;

    /**
     * 作業(yè)分片總數(shù)
     */
    @Value("${scriptJob.myScriptJob.shardingTotalCount}")
    private int jobShardingTotalCount;

    /**
     * 作業(yè)分片參數(shù)
     */
    @Value("${scriptJob.myScriptJob.shardingItemParameters}")
    private String jobShardingItemParameters;

    /**
     * 自定義參數(shù)
     */
    @Value("${scriptJob.myScriptJob.jobParameters}")
    private String jobParameters;

    @Autowired
    private ZookeeperRegistryCenter registryCenter;


    @Bean(initMethod = "init")
    public JobScheduler scriptJobScheduler() {
        MyElasticJobListener elasticJobListener = new MyElasticJobListener();
        return new JobScheduler(registryCenter, getLiteJobConfiguration(), elasticJobListener);
    }

    private LiteJobConfiguration getLiteJobConfiguration() {
        // 定義作業(yè)核心配置
        JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount).
                shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build();
        // 定義SCRIPT類型配置
        ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "echo 'Hello World !'");
        // 定義Lite作業(yè)根配置
        LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).overwrite(true).build();
        return scriptJobRootConfig;

    }
}

在配置文件application.properties中配置好對(duì)應(yīng)的myScriptJob參數(shù)!

#script類型的job
scriptJob.myScriptJob.name=myScriptJob
scriptJob.myScriptJob.crnotallow=0/15 * * * * ?
scriptJob.myScriptJob.shardingTotalCount=3
scriptJob.myScriptJob.shardingItemParameters=0=a,1=b,2=c
scriptJob.myScriptJob.jobParameters=myScriptJobParamter

運(yùn)行程序,看看效果如何?

圖片圖片

3.9、將任務(wù)狀態(tài)持久化到數(shù)據(jù)庫(kù)

可能有的人會(huì)發(fā)出疑問(wèn),elastic-job是如何存儲(chǔ)數(shù)據(jù)的,用ZooInspector客戶端鏈接zookeeper注冊(cè)中心,你發(fā)現(xiàn)對(duì)應(yīng)的任務(wù)配置被存儲(chǔ)到相應(yīng)的樹(shù)根上!

圖片圖片

而具體作業(yè)任務(wù)執(zhí)行軌跡和狀態(tài)結(jié)果是不會(huì)存儲(chǔ)到zookeeper,需要我們?cè)陧?xiàng)目中通過(guò)數(shù)據(jù)源方式進(jìn)行持久化!

將任務(wù)狀態(tài)持久化到數(shù)據(jù)庫(kù)配置過(guò)程也很簡(jiǎn)單,只需要在對(duì)應(yīng)的配置類上注入數(shù)據(jù)源即可,以MySimpleJobConfig為例,代碼如下:

@Configuration
public class MySimpleJobConfig {

    /**
     * 任務(wù)名稱
     */
    @Value("${simpleJob.mySimpleJob.name}")
    private String mySimpleJobName;

    /**
     * cron表達(dá)式
     */
    @Value("${simpleJob.mySimpleJob.cron}")
    private String mySimpleJobCron;

    /**
     * 作業(yè)分片總數(shù)
     */
    @Value("${simpleJob.mySimpleJob.shardingTotalCount}")
    private int mySimpleJobShardingTotalCount;

    /**
     * 作業(yè)分片參數(shù)
     */
    @Value("${simpleJob.mySimpleJob.shardingItemParameters}")
    private String mySimpleJobShardingItemParameters;

    /**
     * 自定義參數(shù)
     */
    @Value("${simpleJob.mySimpleJob.jobParameters}")
    private String mySimpleJobParameters;

    @Autowired
    private ZookeeperRegistryCenter registryCenter;

    @Autowired
    private DataSource dataSource;;


    @Bean
    public MySimpleJob stockJob() {
        return new MySimpleJob();
    }

    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) {
        //添加事件數(shù)據(jù)源配置
        JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);
        MyElasticJobListener elasticJobListener = new MyElasticJobListener();
        return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), jobEventConfig, elasticJobListener);
    }

    private LiteJobConfiguration getLiteJobConfiguration() {
        // 定義作業(yè)核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount).
                shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build();
        // 定義SIMPLE類型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName());
        // 定義Lite作業(yè)根配置
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
        return simpleJobRootConfig;

    }
}

同時(shí),需要在配置文件application.properties中配置好對(duì)應(yīng)的datasource參數(shù)!

spring.datasource.url=jdbc:mysql://127.0.0.1:3306/example-elastic-job-test
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

運(yùn)行程序,然后在elastic-job-lite-console控制臺(tái)配置對(duì)應(yīng)的數(shù)據(jù)源!

圖片圖片

最后,點(diǎn)擊【作業(yè)軌跡】即可查看對(duì)應(yīng)作業(yè)執(zhí)行情況!

圖片圖片

圖片圖片

四、小結(jié)

本文主要圍繞elasticjob的使用進(jìn)行簡(jiǎn)單介紹,希望大家有所收獲!

在分布式環(huán)境環(huán)境下,elastic-job-lite支持的彈性擴(kuò)容、任務(wù)分片是最大的亮點(diǎn),在實(shí)際使用的時(shí)候,任務(wù)分片總數(shù)盡可能大于服務(wù)實(shí)例個(gè)數(shù),并且是倍數(shù)關(guān)系,這樣任務(wù)在分片的時(shí)候,會(huì)更加均勻!

如果想深入的了解elasticjob,大家可以訪問(wèn)官方文檔,獲取更加詳細(xì)的使用教程!

五、參考

1、https://shardingsphere.apache.org

2、https://www.cnblogs.com/wuzhenzhao/p/13299497.html

責(zé)任編輯:武曉燕 來(lái)源: 潘志的技術(shù)筆記
相關(guān)推薦

2021-01-28 07:32:14

框架分布式調(diào)度

2019-11-12 09:32:39

分布式elastic-job分片

2023-05-08 16:38:46

任務(wù)調(diào)度分布式任務(wù)調(diào)度

2023-01-04 09:23:58

2023-01-13 07:39:07

2020-07-17 09:33:39

CPU內(nèi)存調(diào)度

2023-11-07 07:56:40

2022-06-20 15:32:55

Stage模型分布式開(kāi)發(fā)

2020-09-29 19:20:05

鴻蒙

2023-06-26 00:14:28

Openjob分布式任務(wù)

2019-11-15 10:16:27

分布式任務(wù)框架

2022-01-27 08:44:58

調(diào)度系統(tǒng)開(kāi)源

2020-11-06 12:12:35

HarmonyOS

2022-12-29 08:32:50

xxl-job緩存Schedule

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2022-01-10 11:58:51

SpringBootPulsar分布式

2022-06-13 07:43:21

分布式Spring

2021-11-10 16:10:18

鴻蒙HarmonyOS應(yīng)用

2015-11-05 10:51:35

當(dāng)當(dāng)分布式調(diào)度開(kāi)源項(xiàng)目

2024-11-04 08:02:23

SpringRabbitMQ中間件
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)