自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

項(xiàng)目終于用上了xxl-job,好起來(lái)了!

云計(jì)算 分布式
設(shè)計(jì)思想 是將調(diào)度行為抽象形成 調(diào)度中心 平臺(tái),平臺(tái)本身不承擔(dān)業(yè)務(wù)邏輯,而是負(fù)責(zé)發(fā)起 調(diào)度請(qǐng)求 后,由 執(zhí)行器 接收調(diào)度請(qǐng)求并執(zhí)行 任務(wù),這里的 任務(wù) 抽象為 分散的 JobHandler。通過(guò)這種方式即可實(shí)現(xiàn) 調(diào)度 與 任務(wù) 相互解耦,從而提高系統(tǒng)整體的穩(wěn)定性和拓展性。

本篇文章主要記錄項(xiàng)目中遇到的 xxl-job 的實(shí)戰(zhàn),希望能通過(guò)這篇文章告訴讀者們什么是 xxl-job 以及怎么使用 xxl-job 并分享一個(gè)實(shí)戰(zhàn)案例。

那么下面先說(shuō)明什么是 xxl-job 以及為什么要使用它。

xxl-job 是什么?

XXL-JOB 是一個(gè)分布式任務(wù)調(diào)度平臺(tái),其核心設(shè)計(jì)目標(biāo)是開(kāi)發(fā)迅速、學(xué)習(xí)簡(jiǎn)單、輕量級(jí)、易擴(kuò)展。

設(shè)計(jì)思想 是將調(diào)度行為抽象形成 調(diào)度中心 平臺(tái),平臺(tái)本身不承擔(dān)業(yè)務(wù)邏輯,而是負(fù)責(zé)發(fā)起 調(diào)度請(qǐng)求 后,由 執(zhí)行器 接收調(diào)度請(qǐng)求并執(zhí)行 任務(wù),這里的 任務(wù) 抽象為 分散的 JobHandler。通過(guò)這種方式即可實(shí)現(xiàn) 調(diào)度 與 任務(wù) 相互解耦,從而提高系統(tǒng)整體的穩(wěn)定性和拓展性。

為了更好理解,這里放一張官網(wǎng)的架構(gòu)圖:

圖片圖片

任務(wù)調(diào)度是什么?

在開(kāi)發(fā)項(xiàng)目時(shí)大家是否也遇到過(guò)類(lèi)似的場(chǎng)景問(wèn)題:

  • 系統(tǒng)需要定時(shí)在每天0點(diǎn)進(jìn)行數(shù)據(jù)備份。
  • 系統(tǒng)需要在活動(dòng)開(kāi)始前幾小時(shí)預(yù)熱執(zhí)行一些前置業(yè)務(wù)。
  • 系統(tǒng)需要定時(shí)對(duì) MQ 消息表的發(fā)送裝填,對(duì)發(fā)送失敗的 MQ 消息進(jìn)行補(bǔ)償重新發(fā)送。

這些場(chǎng)景問(wèn)題都可以通過(guò) 任務(wù)調(diào)度 來(lái)解決,任務(wù)調(diào)度指的是系統(tǒng)在約定的指定時(shí)間自動(dòng)去執(zhí)行指定的任務(wù)的過(guò)程。

單體系統(tǒng) 中有許多實(shí)現(xiàn) 任務(wù)調(diào)度 的方式,如多線(xiàn)程方式、Timer 類(lèi)、Spring Tasks 等等。這里比較常用的是 Spring Tasks(通過(guò) @EnableScheduling + @Scheduled 的注解可以自定義定時(shí)任務(wù),有興趣的可以去了解一下)

為什么需要分布式任務(wù)調(diào)度平臺(tái)?

分布式下,每個(gè)服務(wù)都可以搭建為集群,這樣的好處是可以將任務(wù)切片分給每一個(gè)服務(wù)從而實(shí)現(xiàn)并行執(zhí)行,提高任務(wù)調(diào)度的處理效率。那么為什么 分布式系統(tǒng) 不能使用 單體系統(tǒng) 的任務(wù)調(diào)度實(shí)現(xiàn)方式呢。

在集群服務(wù)下,如果還是使用每臺(tái)機(jī)器按照單體系統(tǒng)的任務(wù)調(diào)度實(shí)現(xiàn)方式實(shí)現(xiàn)的話(huà),會(huì)出現(xiàn)下面這四個(gè)問(wèn)題:

  1. 怎么做到對(duì)任務(wù)的控制(如何避免任務(wù)重復(fù)執(zhí)行)。
  2. 如果某臺(tái)機(jī)器宕機(jī)了,會(huì)不會(huì)存在任務(wù)丟失。
  3. 如果要增加服務(wù)實(shí)例,怎么做到彈性擴(kuò)容。
  4. 如何做到對(duì)任務(wù)調(diào)度的執(zhí)行情況統(tǒng)一監(jiān)測(cè)。

通過(guò)上面的問(wèn)題可以了解到分布式系統(tǒng)下需要一個(gè)滿(mǎn)足高可用、容錯(cuò)管理、負(fù)載均衡等功能的任務(wù)調(diào)度平臺(tái)來(lái)實(shí)現(xiàn)任務(wù)調(diào)度。分布式系統(tǒng)下,也有許多可以實(shí)現(xiàn)任務(wù)調(diào)度的第三方的分布式任務(wù)調(diào)度系統(tǒng),如 xxl-job、Quartz、elastic-job 等等常用的分布式任務(wù)調(diào)度系統(tǒng)。

如何使用 xxl-job

作為開(kāi)源軟件的 xxl-job,可以在 github 或 gitee上查看和下載 xxl-job 的源碼。

下面將介紹我使用 xxl-job 的流程(如果有操作不當(dāng)?shù)模梢圆榭垂俜降闹形奈臋n:https://www.xuxueli.com/xxl-job)

dokcer 下安裝 xxl-job:

1、docker 下拉取 xxl-job 的鏡像(這里使用 2.3.1 版本)

docker pull xuxueli/xxl-job-admin:2.3.1

2、創(chuàng)建映射容器的文件目錄

mkdir -p -m 777 /mydata/xxl-job/data/applogs

3、在 /mydata/xxl-job 的目錄下創(chuàng)建 application.properties 文件

由于 application.properties 的代碼過(guò)長(zhǎng),這里就不展示了,需要的可以去 gitee 上獲取,具體路徑如圖:

圖片圖片

這里需要注意數(shù)據(jù)庫(kù)位置的填寫(xiě):

圖片圖片

如果還需要更改端口的可以更改這里:

圖片圖片

這里還需要注意告警郵箱和訪(fǎng)問(wèn)口令(后續(xù)Spring Boot配置用到):

圖片圖片

4、將 tables_xxl-job.sql 文件導(dǎo)入上面步驟3指定的數(shù)據(jù)庫(kù)(自己填寫(xiě)的那個(gè)數(shù)據(jù)庫(kù))

同樣由于文件代碼過(guò)長(zhǎng),這里展示 gitee 上獲取的路徑圖:

圖片圖片

5、執(zhí)行 docker 命令

注意這里的 -p 8088:8088 是因?yàn)槲腋牧饲懊?application.porperties 文件的端口號(hào)為 8088,所以這里我執(zhí)行的 docker 命令為 -p 8088:8088,如果沒(méi)有更改的這里一定要改為 -p 8080:8080。

docker run  -p 8088:8088 \
-d --name=xxl-job-admin --restart=always \
-v /mydata/xxl-job/application.properties:/application.properties \
-v /mydata/xxl-job/data/applogs:/data/applogs \
-e PARAMS='--spring.config.locatinotallow=/application.properties' xuxueli/xxl-job-admin:2.3.1

執(zhí)行后通過(guò) docker ps 查看是否成功運(yùn)行,如果失敗可以通過(guò) docker logs xxl-job-admin 查看具體錯(cuò)誤日志。

6、通過(guò) http://192.168.101.25:8088/xxl-job-admin/ 訪(fǎng)問(wèn)(這里ip和端口是自己的)

圖片圖片

賬號(hào):admin 密碼:123456

到這里就算是完成了 xxl-job 在 docker 的搭建。

Spring Boot 項(xiàng)目集成 xxl-job

xxl-job 由 調(diào)度中心 和 執(zhí)行器 組成,上面已經(jīng)完成了在 docker 上部署調(diào)度中心了,接下來(lái)介紹怎么配置部署執(zhí)行器項(xiàng)目。

1、在 Spring Boot 項(xiàng)目中導(dǎo)入 maven 依賴(lài)

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.1</version>
</dependency>

這里需要注意版本號(hào)與 xxl-job 版本需要一致,這里我配置的都是 2.3.1 版本。

2、在 Spring Boot 項(xiàng)目中配置 application.yml 文件

xxl:
  job:
    admin:
      addresses: http://192.168.101.25:8088/xxl-job-admin
    executor:
      appname: media-process-service
      address:
      ip:
      port: 9999
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
    accessToken: default_token
  • 這里的 xxl.job.admin.addresses 用于指定調(diào)度中心的地址。
  • 這里的 xxl.job.accessToken 用于指定訪(fǎng)問(wèn)口令(也就是前面搭建 xxl-job 中步驟3指定的)。
  • 這里的 xxl.job.executor.appname 用于指定執(zhí)行器的名稱(chēng)(需要與后續(xù)配置執(zhí)行器的名稱(chēng)一致)。關(guān)注工眾號(hào):碼猿技術(shù)專(zhuān)欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊(cè)!
  • 這里的 xxl.job.executor.port 用于指定執(zhí)行器的端口(執(zhí)行器實(shí)際上是一個(gè)內(nèi)嵌的 Server,默認(rèn)端口為9999,配置多個(gè)同一服務(wù)實(shí)例時(shí)需要指定不同的執(zhí)行器端口,否則會(huì)端口沖突)。
  • 其他屬性只需要照著配置即可(想要了解屬性的具體含義可以查看中文文檔中的2.4配置部署執(zhí)行器項(xiàng)目章節(jié))。

3、編寫(xiě)配置類(lèi)

/**
 * XXL-JOB配置類(lèi)
 *
 * @author 公眾號(hào):碼猿技術(shù)專(zhuān)欄
 */
@Slf4j
@Configuration
public class XxlJobConfig {
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}

4、調(diào)度中心中新增執(zhí)行器

圖片圖片

執(zhí)行器的配置屬性:

  • AppName: 每個(gè)執(zhí)行器集群的唯一標(biāo)示 AppName,執(zhí)行器會(huì)周期性以 AppName 為對(duì)象進(jìn)行自動(dòng)注冊(cè)??赏ㄟ^(guò)該配置自動(dòng)發(fā)現(xiàn)注冊(cè)成功的執(zhí)行器,供任務(wù)調(diào)度時(shí)使用。
  • 名稱(chēng): 執(zhí)行器的名稱(chēng)(可以使用中文更好地體現(xiàn)該執(zhí)行器是用來(lái)干嘛的)。
  • 注冊(cè)方式:調(diào)度中心獲取執(zhí)行器地址的方式(一般為了方便可以選用自動(dòng)注冊(cè)即可)。
  • 自動(dòng)注冊(cè):執(zhí)行器自動(dòng)進(jìn)行執(zhí)行器注冊(cè),調(diào)度中心通過(guò)底層注冊(cè)表可以動(dòng)態(tài)發(fā)現(xiàn)執(zhí)行器機(jī)器地址。
  • 手動(dòng)錄入:人工手動(dòng)錄入執(zhí)行器的地址信息,多地址逗號(hào)分隔,供調(diào)度中心使用。
  • 機(jī)器地址:"注冊(cè)方式"為"手動(dòng)錄入"時(shí)有效,支持人工維護(hù)執(zhí)行器的地址信息。

5、配置自定義任務(wù)

配置自定義任務(wù)有許多種模式,如 Bean模式(基于方法)、Bean模式(基于類(lèi))、GLUE模式等等。這里介紹通過(guò) Bean模式(基于方法) 是如何自定義任務(wù)的(對(duì)于其余的模式可以參考官方文檔)。

Bean模式(基于方法)也就是每個(gè)任務(wù)對(duì)應(yīng)一個(gè)方法,通過(guò)添加 @XxLJob(value="自定義JobHandler名稱(chēng)", init = "JobHandler初始化方法", destroy = "JobHandler銷(xiāo)毀方法") 注解即可完成定義。

/**
 * 任務(wù)處理類(lèi)
 *
 * @author 公眾號(hào):碼猿技術(shù)專(zhuān)欄
 */
@Component
public class TestJob {
    /**
     * 測(cè)試任務(wù)
     */
    @XxlJob("testHandler")
    public void testHandler() {
        XxlJobHelper.handleSuccess("本次測(cè)試任務(wù)調(diào)度成功");
    }
}
  • 通過(guò)注解也可以指定 初始化方法和銷(xiāo)毀方法,如果不填寫(xiě)可以直接寫(xiě)一個(gè) 自定義的JobHandler名稱(chēng) 用于后面在調(diào)度中心中配置任務(wù)時(shí)對(duì)應(yīng)任務(wù)的 JobHandler 屬性值。
  • 可以通過(guò) XxlJobHelper.log 來(lái)打印日志,通過(guò)調(diào)度中心可以查看執(zhí)行日志的情況。
  • 可以通過(guò) XxlJobHelper.handleFail 或 XxlJobHelper.handleSuccess 手動(dòng)設(shè)置任務(wù)調(diào)度的結(jié)果(不設(shè)置時(shí)默認(rèn)結(jié)果為成功狀態(tài),除非任務(wù)執(zhí)行時(shí)出現(xiàn)異常)。

6、調(diào)度中心中新增任務(wù)

圖片圖片

這里主要注意 Cron 表達(dá)式的時(shí)間配置以及 JobHandler 的值需要與自定義任務(wù)方法的注解上的 value 屬性值一致即可。

關(guān)于高級(jí)配置這里放一張中文文檔的詳細(xì)說(shuō)明(也可以直接去看文檔):

圖片圖片

需要搭建集群或過(guò)期策略等高級(jí)玩法時(shí)可以進(jìn)行配置。

到這里就完成了 SpringBoot 集成 xxl-job 實(shí)現(xiàn)分布式任務(wù)調(diào)度的全過(guò)程了,接下來(lái)會(huì)通過(guò)一個(gè)實(shí)戰(zhàn)案例來(lái)具體看看 xxl-job 的用處。

xxl-job 實(shí)戰(zhàn)

下面通過(guò)一個(gè)最近自己在跟著做的學(xué)習(xí)項(xiàng)目中使用到 xxl-job 的場(chǎng)景案例來(lái)具體了解一下如何利用 xxl-job 來(lái)實(shí)現(xiàn)任務(wù)調(diào)度。

實(shí)戰(zhàn)背景

當(dāng)前項(xiàng)目需要對(duì)上傳到分布式文件系統(tǒng) minio 中的視頻文件進(jìn)行統(tǒng)一格式的視頻轉(zhuǎn)碼操作,由于本身視頻轉(zhuǎn)碼操作會(huì)帶了很大的時(shí)間消耗以及 CPU 的開(kāi)銷(xiāo),所以考慮集群服務(wù)下使用 xxl-job 的方式以任務(wù)調(diào)度的方式定時(shí)處理視頻轉(zhuǎn)碼操作。

這樣可以帶來(lái)兩個(gè)好處:① 以任務(wù)調(diào)度的方式,可以使得視頻轉(zhuǎn)碼操作不會(huì)阻塞主線(xiàn)程,避免影響主要業(yè)務(wù)的吞吐量;關(guān)注工眾號(hào):碼猿技術(shù)專(zhuān)欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊(cè)?、?以集群服務(wù)分片接收任務(wù)的方式,可以將任務(wù)均分給每個(gè)機(jī)器使得任務(wù)調(diào)度可以并行執(zhí)行,提高總?cè)蝿?wù)處理時(shí)間以及降低單臺(tái)機(jī)器 CPU 的開(kāi)銷(xiāo);

xxl-job 執(zhí)行流程圖

xxl-job實(shí)戰(zhàn).pngxxl-job實(shí)戰(zhàn).png

怎么將任務(wù)均分給每臺(tái)服務(wù)器?

由于任務(wù)執(zhí)行時(shí)間過(guò)長(zhǎng),需要搭建集群服務(wù)來(lái)做到并行任務(wù)調(diào)度,從而減小 CPU 的開(kāi)銷(xiāo),那么怎么均分任務(wù)呢?

利用 xxl-job 在集群部署時(shí),配置路由策略中選擇 分片廣播 的方式,可以使一次任務(wù)調(diào)度會(huì)廣播觸發(fā)集群中所有的執(zhí)行器執(zhí)行一次任務(wù),并且可以向系統(tǒng)傳遞分片參數(shù)。

圖片圖片

利用這一特性可以根據(jù) 當(dāng)前執(zhí)行器的分片序號(hào)和分片總數(shù) 來(lái)獲取對(duì)應(yīng)的任務(wù)記錄。

先來(lái)看看 Bean 模式下怎么獲取分片序號(hào)和分片總數(shù):

// 分片序號(hào)(當(dāng)前執(zhí)行器序號(hào))
int shardIndex = XxlJobHelper.getShardIndex();
// 分片總數(shù)(執(zhí)行器總數(shù))
int shardTotal = XxlJobHelper.getShardTotal();

有了這兩個(gè)屬性,當(dāng)執(zhí)行器掃描數(shù)據(jù)庫(kù)獲取記錄時(shí),可以根據(jù) 取模 的方式獲取屬于當(dāng)前執(zhí)行器的任務(wù),可以這樣編寫(xiě) sql 獲取任務(wù)記錄:

select * from media_process m
where m.id % #{shareTotal} = #{shareIndex}  
  and (m.status = '1' or m.status = '3')
  and m.fail_count < 3
limit #{count}

掃描任務(wù)表,根據(jù)任務(wù) id 對(duì)分片總數(shù) 取模 來(lái)實(shí)現(xiàn)對(duì)所有分片的均分任務(wù),通過(guò)判斷是否是當(dāng)前分片序號(hào),并且當(dāng)前任務(wù)狀態(tài)為 1(未處理)或 3(處理失?。┎⑶耶?dāng)前任務(wù)失敗次數(shù)小于3次時(shí)可以取得當(dāng)前任務(wù)。每次掃描只取出 count 個(gè)任務(wù)數(shù)(批量處理)。

圖片圖片

因此通過(guò) xxl-job 的分片廣播 + 取模 的方式即可實(shí)現(xiàn)對(duì)集群服務(wù)均分任務(wù)的操作。

怎么確保任務(wù)不會(huì)被重復(fù)消費(fèi)?

由于視頻轉(zhuǎn)碼本身處理時(shí)間就會(huì)比較長(zhǎng),所以更不允許服務(wù)重復(fù)執(zhí)行,雖然上面通過(guò)分片廣播+取模的方式提高了任務(wù)不會(huì)被重復(fù)執(zhí)行的機(jī)率,但是依舊存在如下情況:

如下圖,有三臺(tái)集群機(jī)器和六個(gè)任務(wù),剛開(kāi)始分配好了每臺(tái)機(jī)器兩個(gè)任務(wù),執(zhí)行器0正準(zhǔn)備執(zhí)行任務(wù)3時(shí),剛好執(zhí)行器2宕機(jī)了,此時(shí)執(zhí)行器1剛好執(zhí)行一次任務(wù),因?yàn)榉制倲?shù)減小,導(dǎo)致執(zhí)行器1重新分配到需要執(zhí)行的任務(wù)正好也是任務(wù)3,那么此時(shí)就會(huì)出現(xiàn)執(zhí)行器0和執(zhí)行器1都在執(zhí)行任務(wù)3的情況。

圖片圖片

那么這種情況就需要實(shí)現(xiàn)冪等性了,冪等性有很多種實(shí)現(xiàn)方法,有興趣了解的可以參考:接口冪等性的實(shí)現(xiàn)方案

這里使用樂(lè)觀(guān)鎖的方式實(shí)現(xiàn)冪等性,具體 sql 如下:

update media_process m
set m.status = '2'
where (m.status = '1' or m.status = '3')
  and m.fail_count < 3
  and m.id = #{id}

這里只需要依靠任務(wù)的狀態(tài)即可實(shí)現(xiàn)(未處理1;處理中2;處理失敗3;處理成功4),可以看到這里類(lèi)似于 CAS 的方式通過(guò)比較和設(shè)置的方式只有在狀態(tài)為未處理或處理失敗時(shí)才能設(shè)置為處理中。這樣在并發(fā)場(chǎng)景下,即使多個(gè)執(zhí)行器同時(shí)處理該任務(wù),也只有一個(gè)任務(wù)可以設(shè)置成功進(jìn)入處理任務(wù)階段。

為了真正達(dá)到冪等性,還需要設(shè)置一下 xxl-job 的調(diào)度過(guò)期策略和阻塞處理策略來(lái)保證真正的冪等性。分別設(shè)置為 忽略(調(diào)度過(guò)期后,忽略過(guò)期的任務(wù),從當(dāng)前時(shí)間開(kāi)始重新計(jì)算下次觸發(fā)時(shí)間) 和 丟棄后續(xù)調(diào)度(調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),本次請(qǐng)求將會(huì)被丟棄并標(biāo)記為失?。?/p>

圖片圖片

編寫(xiě)完成該功能所需的所有任務(wù)

1、分片視頻轉(zhuǎn)碼處理

代碼(這里的代碼只展示部分核心步驟代碼):

/**
 * 視頻轉(zhuǎn)碼處理任務(wù)
 */
@XxlJob("videoTranscodingHandler")
public void videoTranscodingHandler() throws InterruptedException {
    // 1. 分片獲取當(dāng)前執(zhí)行器需要執(zhí)行的所有任務(wù)
    List<MediaProcess> mediaProcessList = mediaProcessService.getMediaProcessList(shardIndex, shardTotal, count);
    // 通過(guò)JUC工具類(lèi)阻塞直到所有任務(wù)執(zhí)行完
    CountDownLatch countDownLatch = new CountDownLatch(mediaProcessList.size());
    // 遍歷所有任務(wù)
    mediaProcessList.forEach(mediaProcess -> {
        // 以多線(xiàn)程的方式執(zhí)行所有任務(wù)
        executor.execute(() -> {
            try {
                // 2. 嘗試搶占任務(wù)(通過(guò)樂(lè)觀(guān)鎖實(shí)現(xiàn))
                boolean res = mediaProcessService.startTask(id);
                if (!res) {
                    XxlJobHelper.log("任務(wù)搶占失敗,任務(wù)id{}", id);
                    return;
                }
                
                // 3. 從minio中下載視頻到本地
                File file = mediaFileService.downloadFileFromMinIO(bucket, objectName);
                // 下載失敗
                if (file == null) {
                    XxlJobHelper.log("下載視頻出錯(cuò),任務(wù)id:{},bucket:{},objectName:{}", id, bucket, objectName);
                    // 出現(xiàn)異常重置任務(wù)狀態(tài)為處理失敗等待下一次處理
                    mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "下載視頻到本地失敗");
                    return;
                }
                
                // 4. 視頻轉(zhuǎn)碼
                String result = videoUtil.generateMp4();
                if (!result.equals("success")) {
                    XxlJobHelper.log("視頻轉(zhuǎn)碼失敗,原因:{},bucket:{},objectName:{},", result, bucket, objectName);
                    // 出現(xiàn)異常重置任務(wù)狀態(tài)為處理失敗等待下一次處理
                    mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "視頻轉(zhuǎn)碼失敗");
                    return;
                }
                
                // 5. 上傳轉(zhuǎn)碼后的文件
                boolean b1 = mediaFileService.addMediaFilesToMinIO(new_File.getAbsolutePath(), "video/mp4", bucket, objectNameMp4);
                if (!b1) {
                    XxlJobHelper.log("上傳 mp4 到 minio 失敗,任務(wù)id:{}", id);
                    // 出現(xiàn)異常重置任務(wù)狀態(tài)為處理失敗等待下一次處理
                    mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "上傳 mp4 文件到 minio 失敗");
                    return;
                }

                // 6. 更新任務(wù)狀態(tài)為成功
                mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.SUCCESS.getValue(), fileId, url, "創(chuàng)建臨時(shí)文件異常");
                
            } finally {
                countDownLatch.countDown();
            }
        });
    });
    // 阻塞直到所有方法執(zhí)行完成(30min后不再等待)
    countDownLatch.await(30, TimeUnit.MINUTES);
}

核心任務(wù) - 分片獲取任務(wù)后執(zhí)行視頻轉(zhuǎn)碼任務(wù),步驟如下:

  • 通過(guò) 分片廣播拿到的參數(shù)以取模的方式 獲取當(dāng)前執(zhí)行器所屬的任務(wù)記錄集合
  • 遍歷集合,以 多線(xiàn)程的方式 并發(fā)地執(zhí)行任務(wù)
  • 每次執(zhí)行任務(wù)前需要先通過(guò) 數(shù)據(jù)庫(kù)樂(lè)觀(guān)鎖的方式 搶占當(dāng)前任務(wù),搶占到才能執(zhí)行;關(guān)注工眾號(hào):碼猿技術(shù)專(zhuān)欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊(cè)!
  • 執(zhí)行任務(wù)過(guò)程分為 分布式文件系統(tǒng)下載需要轉(zhuǎn)碼的視頻文件 -> 視頻轉(zhuǎn)碼 -> 上傳轉(zhuǎn)碼后的視頻 -> 更新任務(wù)狀態(tài)(處理成功)
  • 使用JUC工具類(lèi) CountDownLatch 實(shí)現(xiàn)所有任務(wù)執(zhí)行完后才退出方法
  • 中間使用 xxl-job 的日志記錄錯(cuò)誤信息和執(zhí)行結(jié)果

2、清理任務(wù)表中轉(zhuǎn)碼成功的任務(wù)的記錄并將其插入任務(wù)歷史表

由于任務(wù)表處理完任務(wù)后只是更新任務(wù)狀態(tài),這樣隨著任務(wù)增多會(huì)導(dǎo)致檢索起來(lái)時(shí)間消耗過(guò)大,所以使用任務(wù)調(diào)度的方式定期掃描任務(wù)表,將任務(wù)狀態(tài)為處理成功的任務(wù)刪除并重新插入任務(wù)歷史表中留存(由于代碼過(guò)于簡(jiǎn)單,這里就不做展示了)。

主要實(shí)現(xiàn)兩個(gè)功能:① 清理任務(wù)表中已成功處理的任務(wù);② 將處理成功的任務(wù)記錄插入歷史表中;

3、視頻補(bǔ)償機(jī)制

由于使用樂(lè)觀(guān)鎖會(huì)將任務(wù)狀態(tài)更新為處理中,如果此時(shí)執(zhí)行任務(wù)的執(zhí)行器(服務(wù))宕機(jī)了,會(huì)導(dǎo)致該任務(wù)記錄一直存在,因?yàn)闃?lè)觀(guān)鎖的原因別的執(zhí)行器也無(wú)法獲取,這個(gè)時(shí)候同樣需要使用任務(wù)調(diào)度的方式,定期掃描任務(wù)表,判斷任務(wù)是否處于處理中狀態(tài)并且任務(wù)創(chuàng)建時(shí)間遠(yuǎn)大于30分鐘,則說(shuō)明任務(wù)超時(shí)了,則是使用任務(wù)調(diào)度的方式重新更新任務(wù)的狀態(tài)為未處理,等待下一次視頻轉(zhuǎn)碼任務(wù)的調(diào)度處理。此外視頻補(bǔ)償機(jī)制任務(wù)調(diào)度還需要檢查是否存在任務(wù)最大次數(shù)已經(jīng)大于3次的,如果存在則交付給人工處理(由于代碼過(guò)于簡(jiǎn)單,這里就不做展示了)。

主要實(shí)現(xiàn)兩個(gè)功能:① 處理任務(wù)超時(shí)情況下的任務(wù),做出補(bǔ)償;② 處理失敗次數(shù)大于3次的任務(wù),做出補(bǔ)償;

測(cè)試并查看日志

準(zhǔn)備好的任務(wù)表記錄:

圖片圖片

啟動(dòng)三臺(tái)媒資服務(wù)器,并開(kāi)啟任務(wù):

圖片圖片

可以單獨(dú)查看每個(gè)任務(wù)的日志:

圖片圖片

通過(guò)日志中的執(zhí)行日志查看具體日志信息:

圖片圖片

可以看到直接為了測(cè)試改錯(cuò)的路徑導(dǎo)致下載視頻出錯(cuò):

圖片圖片

查看數(shù)據(jù)庫(kù)表的變化:

圖片圖片

到這里可以看到核心的視頻轉(zhuǎn)碼任務(wù)執(zhí)行成功,并且邏輯正確,能夠起到分布式任務(wù)調(diào)度的作用。

總結(jié)

這就是本次 xxl-job 實(shí)戰(zhàn)的全部?jī)?nèi)容了,寫(xiě)這篇文章主要是為了記錄一下項(xiàng)目中是如何使用 xxl-job 的,并且提供一種分片廣播均分任務(wù)的思路以及冪等性問(wèn)題如何處理,具體使用 xxl-job 還需根據(jù)自己項(xiàng)目的需求,遇到問(wèn)題可以參考官網(wǎng)。

責(zé)任編輯:武曉燕 來(lái)源: 碼猿技術(shù)專(zhuān)欄
相關(guān)推薦

2024-08-27 09:34:24

2022-12-19 08:32:57

項(xiàng)目Feign框架

2020-07-17 09:33:39

CPU內(nèi)存調(diào)度

2022-03-26 17:13:22

ElasticJobxxl-job分布式

2022-12-13 08:29:13

項(xiàng)目插入式注解

2024-09-14 09:59:04

2022-09-23 13:57:11

xxl-job任務(wù)調(diào)度中間件

2023-01-04 09:23:58

2023-11-30 22:06:43

2024-09-09 08:11:12

2024-07-31 08:18:40

2024-12-04 10:47:26

2021-12-26 00:03:27

響應(yīng)式編程異步

2022-01-27 08:44:58

調(diào)度系統(tǒng)開(kāi)源

2021-12-26 19:07:51

MySQL存儲(chǔ)容器

2022-12-29 08:32:50

xxl-job緩存Schedule

2023-06-27 07:44:53

xxl-job分布式任務(wù)調(diào)度平臺(tái)

2020-07-23 10:51:29

NginxWebApache

2023-11-07 07:56:40

2024-12-12 08:35:58

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)