自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Spring Boot 打造全能異步處理方案,簡單高效!

開發(fā) 前端
通過本文,開發(fā)者不僅可以掌握如何在 Spring Boot 中高效實現(xiàn)異步任務(wù)管理,還可以學(xué)到如何編寫更加清晰、可維護、易擴展的代碼。

在系統(tǒng)設(shè)計中,遵循“開閉原則”是良好實踐。隨著業(yè)務(wù)不斷演化和更新,核心代碼的頻繁改動不僅可能引入更多錯誤風險,還可能影響整體系統(tǒng)穩(wěn)定性。盡管新增的功能大多是對現(xiàn)有功能的擴展,但如何在保證性能和質(zhì)量的前提下平穩(wěn)實現(xiàn)這些更新,成為了一大挑戰(zhàn)。為了解決這一問題,我設(shè)計了一套通用的異步處理SDK,能夠高效地實現(xiàn)各種異步任務(wù)的處理需求。

通過該SDK,不僅可以確保方法正常執(zhí)行、不影響主業(yè)務(wù)流程,還能通過可靠的兜底方案保障數(shù)據(jù)不丟失,實現(xiàn)最終一致性。

設(shè)計優(yōu)勢

  1. 無侵入性:獨立的數(shù)據(jù)庫、定時任務(wù)、消息隊列及人工操作界面(支持統(tǒng)一登錄認證)。
  2. 事務(wù)安全:基于Spring的事務(wù)事件機制,異步策略解析失敗時不影響主業(yè)務(wù)流程。
  3. 完善的兜底方案:即使異步策略解析失?。ㄈ缡聞?wù)提交后或回滾后),也有多層次的補償機制(除非數(shù)據(jù)庫或隊列存在問題)。

工作原理

  1. 注解緩存:容器初始化時,掃描并緩存所有帶有@AsyncExec注解的方法。
  2. AOP切面:方法執(zhí)行時通過AOP切面發(fā)布事件。
  3. 事務(wù)監(jiān)聽:通過@TransactionalEventListener實現(xiàn)事務(wù)事件監(jiān)聽,處理異步策略。

事務(wù)事件監(jiān)聽核心代碼

@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
  • fallbackExecution = true:即使沒有事務(wù),也能處理事件。
  • TransactionPhase.AFTER_COMPLETION:事務(wù)提交或回滾后均會處理事件。

核心組件

  • 消息隊列:Kafka
  • 定時任務(wù):XXL Job
  • 數(shù)據(jù)庫:MySQL
  • 前端界面:基于Vue實現(xiàn)
  • 設(shè)計模式:策略模式、模板方法、動態(tài)代理

以下是一個完整的 Spring Boot 項目示例,展示如何使用 Kafka 消息隊列、XXL Job 定時任務(wù)、MySQL 數(shù)據(jù)庫和 Vue 前端界面,并結(jié)合策略模式、模板方法和動態(tài)代理實現(xiàn)通用異步處理功能。

項目結(jié)構(gòu)

核心模塊

  1. 任務(wù)狀態(tài)枚舉 (TaskStatus):定義任務(wù)的生命周期狀態(tài)。
  2. 任務(wù)實體 (AsyncTask):數(shù)據(jù)庫表對應(yīng)的實體類,記錄任務(wù)的執(zhí)行信息。
  3. 任務(wù)處理策略 (AsyncStrategy):定義任務(wù)的執(zhí)行邏輯,支持動態(tài)擴展。
  4. 任務(wù)調(diào)度器 (AsyncTaskScheduler):調(diào)度任務(wù)執(zhí)行并管理任務(wù)狀態(tài)。
  5. 任務(wù)監(jiān)控器 (TaskMonitorService):實時監(jiān)控任務(wù)狀態(tài),提供告警能力。

數(shù)據(jù)庫設(shè)計

任務(wù)表 DDL

CREATE TABLE async_task (
    id BIGINTAUTO_INCREMENTPRIMARYKEY,
    task_name VARCHAR(255)NOTNULLCOMMENT'任務(wù)名稱',
    task_status INTNOTNULLDEFAULT0COMMENT'任務(wù)狀態(tài)',
    task_param TEXTCOMMENT'任務(wù)參數(shù)',
    retry_count INTDEFAULT0COMMENT'重試次數(shù)',
    create_time DATETIMEDEFAULTCURRENT_TIMESTAMPCOMMENT'創(chuàng)建時間',
    update_time DATETIMEDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'更新時間'
);

核心代碼實現(xiàn)

任務(wù)狀態(tài)枚舉

package com.icoderoad.async.enums;
public enum TaskStatus {
    INITIAL(0, "初始化"),
    PROCESSING(1, "執(zhí)行中"),
    SUCCESS(2, "執(zhí)行成功"),
    FAILED(3, "執(zhí)行失敗");


    private final int code;
    private final String description;


    TaskStatus(int code, String description) {
        this.code = code;
        this.description = description;
    }


    public int getCode() {
        return code;
    }


    public String getDescription() {
        return description;
    }


    public static TaskStatus fromCode(int code) {
        for (TaskStatus status : values()) {
            if (status.code == code) {
                return status;
            }
        }
        throw new IllegalArgumentException("未知任務(wù)狀態(tài)代碼:" + code);
    }
}

任務(wù)實體

package com.icoderoad.async.entity;
@Data
@TableName("async_task")
public class AsyncTask {
    private Long id;
    private String taskName;
    private TaskStatus taskStatus;
    private String taskParam;
    private Integer retryCount;
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
}

任務(wù)策略接口

package com.icoderoad.async.strategy;
public interface AsyncStrategy {
    void execute(String taskParam);
}

任務(wù)執(zhí)行服務(wù)

package com.icoderoad.async.service;


import com.icoderoad.async.entity.AsyncTask;
import com.icoderoad.async.enums.TaskStatus;
import com.icoderoad.async.repository.AsyncTaskRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;


import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;


@Service
public class AsyncTaskService {


    private final AsyncTaskRepository asyncTaskRepository;


    @Autowired
    public AsyncTaskService(AsyncTaskRepository asyncTaskRepository) {
        this.asyncTaskRepository = asyncTaskRepository;
    }


    /**
     * 獲取所有異步任務(wù)
     */
    public List<AsyncTask> getAllTasks() {
        return asyncTaskRepository.findAll();
    }


    /**
     * 重試任務(wù)
     *
     * @param taskId 任務(wù)ID
     * @return 是否重試成功
     */
    @Transactional
    public boolean retryTask(Long taskId) {
        Optional<AsyncTask> optionalTask = asyncTaskRepository.findById(taskId);
        if (optionalTask.isPresent()) {
            AsyncTask task = optionalTask.get();


            // 檢查任務(wù)是否允許重試
            if (task.getTaskStatus() == TaskStatus.FAILED) {
                task.setTaskStatus(TaskStatus.INITIAL); // 將狀態(tài)重置為初始化
                task.setRetryCount(task.getRetryCount() + 1); // 增加重試次數(shù)
                task.setUpdateTime(LocalDateTime.now());
                asyncTaskRepository.save(task);
                return true;
            }
        }
        return false; // 任務(wù)不存在或狀態(tài)異常
    }


    /**
     * 創(chuàng)建新異步任務(wù)
     *
     * @param taskName  任務(wù)名稱
     * @param taskParam 任務(wù)參數(shù)
     */
    @Transactional
    public void createAsyncTask(String taskName, String taskParam) {
        AsyncTask newTask = new AsyncTask();
        newTask.setTaskName(taskName);
        newTask.setTaskParam(taskParam);
        newTask.setTaskStatus(TaskStatus.INITIAL);
        newTask.setRetryCount(0);
        newTask.setCreateTime(LocalDateTime.now());
        newTask.setUpdateTime(LocalDateTime.now());
        asyncTaskRepository.save(newTask);
    }


    /**
     * 獲取待執(zhí)行的任務(wù)(供調(diào)度器使用)
     */
    public List<AsyncTask> getPendingTasks() {
        return asyncTaskRepository.findByTaskStatus(TaskStatus.INITIAL);
    }


    /**
     * 更新任務(wù)狀態(tài)
     *
     * @param task 更新后的任務(wù)
     */
    @Transactional
    public void updateTask(AsyncTask task) {
        task.setUpdateTime(LocalDateTime.now());
        asyncTaskRepository.save(task);
    }
}

任務(wù)調(diào)度器

@Component
public class AsyncTaskScheduler {


    private final AsyncTaskService asyncTaskService;
    private final TaskMonitorService taskMonitorService;


    @Scheduled(fixedRate = 5000)
    public void scheduleTasks() {
        List<AsyncTask> tasks = asyncTaskService.getPendingTasks();
        tasks.forEach(task -> {
            try {
                asyncTaskService.executeTask(task);
                task.setTaskStatus(TaskStatus.SUCCESS);
            } catch (Exception e) {
                task.setTaskStatus(TaskStatus.FAILED);
                taskMonitorService.alertOnFailedTask(task);
            }
            asyncTaskService.updateTask(task);
        });
    }
}

配置文件

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: async-tasks-group
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Controller 類代碼

package com.icoderoad.async.controller;


import com.icoderoad.async.dto.AsyncTaskDto;
import com.icoderoad.async.entity.AsyncTask;
import com.icoderoad.async.service.AsyncTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;


import java.util.List;
import java.util.stream.Collectors;


@RestController
@RequestMapping("/api/tasks")
public class AsyncTaskController {


    private final AsyncTaskService asyncTaskService;


    @Autowired
    public AsyncTaskController(AsyncTaskService asyncTaskService) {
        this.asyncTaskService = asyncTaskService;
    }


    /**
     * 獲取所有異步任務(wù)
     */
    @GetMapping
    public ResponseEntity<List<AsyncTaskDto>> getAllTasks() {
        List<AsyncTask> tasks = asyncTaskService.getAllTasks();
        List<AsyncTaskDto> taskDtos = tasks.stream()
                .map(task -> new AsyncTaskDto(task.getId(), task.getTaskName(), task.getTaskStatus(), task.getTaskParam()))
                .collect(Collectors.toList());
        return ResponseEntity.ok(taskDtos);
    }


    /**
     * 根據(jù) ID 重試任務(wù)
     */
    @PostMapping("/{id}/retry")
    public ResponseEntity<String> retryTask(@PathVariable Long id) {
        boolean result = asyncTaskService.retryTask(id);
        return result ? ResponseEntity.ok("任務(wù)重試成功") : ResponseEntity.badRequest().body("任務(wù)重試失敗,任務(wù)可能不存在或狀態(tài)異常");
    }


    /**
     * 創(chuàng)建新異步任務(wù)
     */
    @PostMapping
    public ResponseEntity<String> createTask(@RequestParam String taskName, @RequestParam String taskParam) {
        asyncTaskService.createAsyncTask(taskName, taskParam);
        return ResponseEntity.ok("異步任務(wù)創(chuàng)建成功");
    }
}

DTO 類代碼

用于返回給前端的任務(wù)數(shù)據(jù)傳輸對象。

package com.icoderoad.async.dto;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


@Data
@AllArgsConstructor
@NoArgsConstructor
public class AsyncTaskDto {
    private Long id;
    private String taskName;
    private Integer taskStatus;
    private String taskParam;
}

前端界面 (Vue.js)

異步任務(wù)列表
<template>
  <div>
    <h1>異步任務(wù)管理</h1>
    <table>
      <thead>
        <tr>
          <th>任務(wù)名稱</th>
          <th>狀態(tài)</th>
          <th>參數(shù)</th>
          <th>操作</th>
        </tr>
      </thead>
      <tbody>
        <tr v-for="task in tasks" :key="task.id">
          <td>{{ task.taskName }}</td>
          <td>{{ task.taskStatus }}</td>
          <td>{{ task.taskParam }}</td>
          <td><button @click="retryTask(task.id)">重試</button></td>
        </tr>
      </tbody>
    </table>
  </div>
</template>

<script>
export default {
  data() {
    return {
      tasks: [],
    };
  },
  methods: {
    fetchTasks() {
      axios.get('/api/tasks').then(response => {
        this.tasks = response.data;
      });
    },
    retryTask(taskId) {
      axios.post(`/api/tasks/${taskId}/retry`);
    },
  },
  mounted() {
    this.fetchTasks();
  },
};
</script>

總結(jié)

本文深入探討了基于 Spring Boot 開發(fā)異步任務(wù)管理功能的實現(xiàn)方法,涵蓋了從控制器設(shè)計到服務(wù)層邏輯優(yōu)化的全過程。通過清晰的代碼示例和詳細的講解,讀者可以輕松掌握以下關(guān)鍵內(nèi)容:

1.異步任務(wù)管理的核心功能:

  • 實現(xiàn)了任務(wù)的增刪改查、狀態(tài)管理,以及失敗任務(wù)的重試機制,確保異步任務(wù)生命周期的完整性。

2.面向業(yè)務(wù)場景的邏輯優(yōu)化:

  • 針對任務(wù)狀態(tài)進行了明確的校驗與約束,通過 TaskStatus 枚舉提升代碼的可讀性和維護性。
  • 重試邏輯中考慮了任務(wù)的狀態(tài)異常場景,避免因錯誤操作導(dǎo)致任務(wù)狀態(tài)混亂。

3.面向開發(fā)實踐的細節(jié)增強:

  • 使用 Spring 的 @Transactional 注解確保數(shù)據(jù)操作的事務(wù)安全,避免并發(fā)修改導(dǎo)致數(shù)據(jù)不一致。
  • 在任務(wù)的創(chuàng)建、更新、重試操作中添加 createTime 和 updateTime 字段的動態(tài)更新,確保時間戳的準確性。
  • 提供了調(diào)度器支持的擴展方法,為后續(xù)的任務(wù)調(diào)度和自動化運行奠定基礎(chǔ)。

4.代碼解耦與擴展性設(shè)計:

  • 通過服務(wù)層和數(shù)據(jù)層的職責分離,實現(xiàn)了業(yè)務(wù)邏輯和數(shù)據(jù)訪問的解耦。
  • 使用 Spring Data JPA 提供的數(shù)據(jù)倉庫方法,使代碼更簡潔高效,并易于擴展其他查詢需求。

5.可靠性與易用性兼?zhèn)洌?/p>

  • 在功能實現(xiàn)的同時,確保代碼的健壯性和高可維護性。無論是單獨的功能測試,還是集成到更復(fù)雜的業(yè)務(wù)流程中,都能穩(wěn)定運行。

通過本文,開發(fā)者不僅可以掌握如何在 Spring Boot 中高效實現(xiàn)異步任務(wù)管理,還可以學(xué)到如何編寫更加清晰、可維護、易擴展的代碼。希望這篇文章能夠為您在實際開發(fā)中提供參考,并助力您設(shè)計出更優(yōu)雅的任務(wù)管理系統(tǒng)!

責任編輯:武曉燕 來源: 路條編程
相關(guān)推薦

2024-03-26 08:08:08

SpringBPMN模型

2025-02-07 11:32:20

2023-04-28 15:15:39

數(shù)據(jù)庫JPA

2025-03-26 08:28:36

2025-01-13 12:46:31

SpringBootJacksonJSON

2020-12-01 08:32:12

Spring Boot

2024-07-31 15:57:41

2024-10-15 10:28:43

2025-03-31 08:39:55

2019-01-15 11:40:14

開發(fā)技能代碼

2022-04-08 16:27:48

SpringBoot異常處理

2023-09-13 08:56:51

2024-12-17 00:00:00

Spring線程

2020-01-02 16:30:02

Spring BootJava異步請求

2009-06-17 16:39:03

Spring JMS

2022-09-29 09:19:04

線程池并發(fā)線程

2024-09-26 09:28:06

內(nèi)存Spring

2025-03-03 01:25:00

SpringAOP日志

2025-01-20 13:30:50

2025-02-05 12:28:44

點贊
收藏

51CTO技術(shù)棧公眾號