自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

DDD 架構(gòu),MQ 應(yīng)該放那一層使用?

開(kāi)發(fā) 前端
因?yàn)槲覀儽菊滤v解的內(nèi)容是把 RocketMQ 放入 DDD 架構(gòu)中進(jìn)行使用,那么也就引申出領(lǐng)域事件定義。所以我們先來(lái)了解下,什么是領(lǐng)域事件。

本文的宗旨在于通過(guò)簡(jiǎn)單干凈實(shí)踐的方式教會(huì)讀者,使用 Docker 配置 RocketMQ 并在基于 DDD 分層結(jié)構(gòu)的 SpringBoot 工程中使用 RocketMQ 技術(shù)。因?yàn)榇蟛糠?MQ 的發(fā)送都是基于特定業(yè)務(wù)場(chǎng)景的,所以本章節(jié)也是基于 《MyBatis 使用教程和插件開(kāi)發(fā)》 章節(jié)的擴(kuò)展。

本章也會(huì)包括關(guān)于 MQ 消息的發(fā)送和接收應(yīng)該處于 DDD 的哪一層的實(shí)踐講解和使用。

本文涉及的工程:

  • xfg-dev-tech-rocketmq:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-rocketmq
  • RocketMQ Docker 安裝:rocketmq-docker-compose-mac-amd-arm.yml
  • 導(dǎo)入測(cè)試庫(kù)表 road-map.sql

一、案例背景

首先我們要知道,MQ 消息的作用是用于;解耦過(guò)長(zhǎng)的業(yè)務(wù)流程和應(yīng)對(duì)流量沖擊的消峰。如;用戶下單支付完成后,拿到支付消息推動(dòng)后續(xù)的發(fā)貨流程。也可以是我們基于 《MyBatis 使用教程和插件開(kāi)發(fā)》 中的案例場(chǎng)景,給雇員提升級(jí)別和薪資的時(shí)候,也發(fā)送一條MQ消息,用于發(fā)送郵件通知給用戶。

圖片圖片

  • 從薪資調(diào)整到郵件發(fā)送,這里是2個(gè)業(yè)務(wù)流程,通過(guò) MQ 消息的方式進(jìn)行連接。
  • 其實(shí)MQ消息的使用場(chǎng)景特別多,原來(lái)你可能使用多線程的一些操作,現(xiàn)在就擴(kuò)展為多實(shí)例的操作了。發(fā)送 MQ 消息出來(lái),讓?xiě)?yīng)用的各個(gè)實(shí)例接收并進(jìn)行消費(fèi)。

二、領(lǐng)域事件

因?yàn)槲覀儽菊滤v解的內(nèi)容是把 RocketMQ 放入 DDD 架構(gòu)中進(jìn)行使用,那么也就引申出領(lǐng)域事件定義。所以我們先來(lái)了解下,什么是領(lǐng)域事件。

領(lǐng)域事件,可以說(shuō)是解耦微服務(wù)設(shè)計(jì)的關(guān)鍵。領(lǐng)域事件也是領(lǐng)域模型中非常重要的一部分內(nèi)容,用于標(biāo)示當(dāng)前領(lǐng)域模型中發(fā)生的事件行為。一個(gè)領(lǐng)域事件會(huì)推進(jìn)業(yè)務(wù)流程的進(jìn)一步操作,在實(shí)現(xiàn)業(yè)務(wù)解耦的同時(shí),也推動(dòng)了整個(gè)業(yè)務(wù)的閉環(huán)。

圖片圖片

  • 首先,我們需要在領(lǐng)域模型層,添加一塊 event 區(qū)域。它的存在是為了定義出于當(dāng)前領(lǐng)域下所需的事件消息信息。信息的類型可以是model 下的實(shí)體對(duì)象、聚合對(duì)象。
  • 之后,消息的發(fā)送是放在基礎(chǔ)設(shè)置層。本身基礎(chǔ)設(shè)置層就是依賴倒置于模型層,所以在模型層所定義的 event 對(duì)象,可以很方便的在基礎(chǔ)設(shè)置層使用。而且大部分開(kāi)發(fā)的時(shí)候,MQ消息的發(fā)送與數(shù)據(jù)庫(kù)操作都是關(guān)聯(lián)的,采用的方式是,做完數(shù)據(jù)落庫(kù)后,推送MQ消息。所以定義在倉(cāng)儲(chǔ)中實(shí)現(xiàn),會(huì)更加得心應(yīng)手、水到渠成。
  • 最后,就是 MQ 的消息,MQ 的消費(fèi)可以是自身服務(wù)所發(fā)出的消息,也可以是外部其他微服務(wù)的消息。就在小傅哥所整體講述的這套簡(jiǎn)明教程中 DDD 部分的觸發(fā)器層。

三、環(huán)境安裝

本案例涉及了數(shù)據(jù)庫(kù)和RocketMQ的使用,都已經(jīng)在工程中提供了安裝腳本,可以按需執(zhí)行。

圖片圖片

這里主要介紹 RocketMQ 的安裝;

1. 執(zhí)行 compose yml

文件:docs/rocketmq/rocketmq-docker-compose-mac-amd-arm.yml - 關(guān)于安裝小傅哥提供了不同的鏡像,包括Mac、Mac M1、Windows 可以按需選擇使用。

version: '3'
services:
  # https://hub.docker.com/r/xuchengen/rocketmq
  # 注意修改項(xiàng);
  # 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1
  # 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口
  rocketmq:
    image: livinphp/rocketmq:5.1.0
    container_name: rocketmq
    ports:
      - 9009:9009
      - 9876:9876
      - 10909:10909
      - 10911:10911
      - 10912:10912
    volumes:
      - ./data:/home/app/data
    environment:
      TZ: "Asia/Shanghai"
      NAMESRV_ADDR: "rocketmq:9876"
  • 在 IDEA 中打開(kāi) rocketmq-docker-compose-mac-amd-arm.yml 你會(huì)看到一個(gè)綠色的按鈕在左側(cè)側(cè)邊欄,點(diǎn)擊即可安裝?;蛘吣阋部梢允褂妹畎惭b:# /usr/local/bin/docker-compose -f /docs/dev-ops/environment/environment-docker-compose.yml up -d - 比較適合在云服務(wù)器上執(zhí)行。
  • 首次安裝可能使用不了,一個(gè)原因是 brokerIP1 未配置IP,另外一個(gè)是默認(rèn)的 8080 端口占用??梢园凑杖缦滦「蹈缯f(shuō)的方式修改。

2. 修改默認(rèn)配合

  1. 打開(kāi) data/rocketmq/conf/broker.conf 添加一條 brokerIP1=127.0.0.1 在結(jié)尾
# 集群名稱
brokerClusterName = DefaultCluster
# BROKER 名稱
brokerName = broker-a
# 0 表示 Master, > 0 表示 Slave
brokerId = 0
# 刪除文件時(shí)間點(diǎn),默認(rèn)凌晨 4 點(diǎn)
deleteWhen = 04
# 文件保留時(shí)間,默認(rèn) 48 小時(shí)
fileReservedTime = 48
# BROKER 角色 ASYNC_MASTER為異步主節(jié)點(diǎn),SYNC_MASTER為同步主節(jié)點(diǎn),SLAVE為從節(jié)點(diǎn)
brokerRole = ASYNC_MASTER
# 刷新數(shù)據(jù)到磁盤(pán)的方式,ASYNC_FLUSH 刷新
flushDiskType = ASYNC_FLUSH
# 存儲(chǔ)路徑
storePathRootDir = /home/app/data/rocketmq/store
# IP地址
brokerIP1 = 127.0.0.1
  1. 打開(kāi) ``data/console/config/application.properties修改server.port=9009` 端口。
server.address=0.0.0.0
server.port=9009
  • 修改配置后,重啟服務(wù)。

3. RockMQ登錄與配置

3.1 登錄

RocketMQ 此鏡像,會(huì)在安裝后在控制臺(tái)打印登錄賬號(hào)信息,你可以查看使用。

圖片圖片

圖片圖片

登錄:http://localhost:9009/

3.2 創(chuàng)建Topic

圖片圖片

  • 也可以使用命令創(chuàng)建:docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq

3.3 創(chuàng)建消費(fèi)者組

圖片圖片

  • 也可以使用命令創(chuàng)建:docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group

四、工程實(shí)現(xiàn)

1. 工程結(jié)構(gòu)

圖片圖片

  • MQ 的使用無(wú)論是 RocketMQ 還是 Kafka 等,都很簡(jiǎn)單。但在使用之前,要考慮好怎么在架構(gòu)中合理的使用。如果最初沒(méi)有定義好這些,那么胡亂的任何地方都能發(fā)送和接收MQ,最后的工程將非常難以維護(hù)。
  • 所以這里整個(gè)MQ的生產(chǎn)和消費(fèi),是按照整個(gè) DDD 領(lǐng)域事件結(jié)構(gòu)進(jìn)行設(shè)計(jì)。分為在 domain 使用基礎(chǔ)層生產(chǎn)消息,再有 trigger 層接收消息。

2. 配置文件

引入POM

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.4</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

添加配置

# RocketMQ 配置
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: xfg-group
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消費(fèi)最大值
    pull-batch-size: 10
  producer:
    # 發(fā)送同一類消息的設(shè)置為同一個(gè)group,保證唯一
    group: xfg-group
    # 發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000
    sendMessageTimeout: 10000
    # 發(fā)送消息失敗重試次數(shù),默認(rèn)2
    retryTimesWhenSendFailed: 2
    # 異步消息重試此處,默認(rèn)2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大長(zhǎng)度,默認(rèn)1024 * 1024 * 4(默認(rèn)4M)
    maxMessageSize: 4096
    # 壓縮消息閾值,默認(rèn)4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在內(nèi)部發(fā)送失敗時(shí)重試另一個(gè)broker,默認(rèn)false
    retryNextServer: false

3. 定義領(lǐng)域事件

源碼:cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent

圖片圖片

@EqualsAndHashCode(callSuper = true)
@Data
public class SalaryAdjustEvent extends BaseEvent<AdjustSalaryApplyOrderAggregate> {

    public static String TOPIC = "xfg-mq";

    public static SalaryAdjustEvent create(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
        SalaryAdjustEvent event = new SalaryAdjustEvent();
        event.setId(RandomStringUtils.randomNumeric(11));
        event.setTimestamp(new Date());
        event.setData(adjustSalaryApplyOrderAggregate);
        return event;
    }

}
  • 每個(gè)領(lǐng)域的消息,都有領(lǐng)域自己定義。發(fā)送的時(shí)候再交給基礎(chǔ)設(shè)施層來(lái)發(fā)送。

4. 消息發(fā)送

源碼:cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher

圖片圖片

@Component
@Slf4j
public class EventPublisher {

    @Setter(onMethod_ = @Autowired)
    private RocketMQTemplate rocketmqTemplate;

    /**
     * 普通消息
     *
     * @param topic   主題
     * @param message 消息
     */
    public void publish(String topic, BaseEvent<?> message) {
        try {
            String mqMessage = JSON.toJSONString(message);
            log.info("發(fā)送MQ消息 topic:{} message:{}", topic, mqMessage);
            rocketmqTemplate.convertAndSend(topic, mqMessage);
        } catch (Exception e) {
            log.error("發(fā)送MQ消息失敗 topic:{} message:{}", topic, JSON.toJSONString(message), e);
            // 大部分MQ發(fā)送失敗后,會(huì)需要任務(wù)補(bǔ)償
        }
    }

    /**
     * 延遲消息
     *
     * @param topic          主題
     * @param message        消息
     * @param delayTimeLevel 延遲時(shí)長(zhǎng)
     */
    public void publishDelivery(String topic, BaseEvent<?> message, int delayTimeLevel) {
        try {
            String mqMessage = JSON.toJSONString(message);
            log.info("發(fā)送MQ延遲消息 topic:{} message:{}", topic, mqMessage);
            rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 1000, delayTimeLevel);
        } catch (Exception e) {
            log.error("發(fā)送MQ延遲消息失敗 topic:{} message:{}", topic, JSON.toJSONString(message), e);
            // 大部分MQ發(fā)送失敗后,會(huì)需要任務(wù)補(bǔ)償
        }
    }

}
  • 在基礎(chǔ)設(shè)施層提供 event 事件的處理,也就是 MQ 消息的發(fā)送。

源碼:cn.bugstack.xfg.dev.tech.infrastructure.repository.SalaryAdjustRepository

@Resource
private EventPublisher eventPublisher;
    
@Override
@Transactional(rollbackFor = Exception.class, timeout = 350, propagation = Propagation.REQUIRED, isolation = Isolation.DEFAULT)
public String adjustSalary(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
   
  // ... 省略部分代碼 

    eventPublisher.publish(SalaryAdjustEvent.TOPIC, SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate));
    return orderId;
}

在 SalaryAdjustRepository 倉(cāng)儲(chǔ)的實(shí)現(xiàn)中,做完業(yè)務(wù)流程開(kāi)始發(fā)送 MQ 消息。這里有2點(diǎn)要注意;

  1. 消息發(fā)送,不要寫(xiě)在數(shù)據(jù)庫(kù)事務(wù)中。因?yàn)槭聞?wù)一直占用數(shù)據(jù)庫(kù)連接,需要快速釋放。
  2. 對(duì)于一些強(qiáng)MQ要求的場(chǎng)景,需要在發(fā)送MQ前,寫(xiě)入一條數(shù)據(jù)庫(kù) Task 記錄,發(fā)送消息后更新 Task 狀態(tài)為成功。如果長(zhǎng)時(shí)間未更新數(shù)據(jù)庫(kù)狀態(tài)或者為失敗的,則需要由任務(wù)補(bǔ)償進(jìn)行處理。

5. 消費(fèi)消息

源碼:cn.bugstack.xfg.dev.tech.trigger.mq.SalaryAdjustMQListener

圖片圖片

@Component
@Slf4j
@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group")
public class SalaryAdjustMQListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("接收到MQ消息 {}", s);
    }

}
  • 消費(fèi)消息,配置消費(fèi)者組合消費(fèi)的主題,之后就可以接收到消息了。接收以后你可以做自己的業(yè)務(wù),如果拋出異常,消息會(huì)進(jìn)行重新接收處理。

六、測(cè)試驗(yàn)證

1. 單獨(dú)發(fā)送消息測(cè)試

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMQTest {

    @Setter(onMethod_ = @Autowired)
    private RocketMQTemplate rocketmqTemplate;

    @Test
    public void test() throws InterruptedException {
        while (true) {
            rocketmqTemplate.convertAndSend("xfg-mq", "我是測(cè)試消息");
            Thread.sleep(3000);
        }
    }

}
  • 這里方便你來(lái)發(fā)送消息,驗(yàn)證流程。

2. 業(yè)務(wù)流程消息驗(yàn)證

@Test
public void test_execSalaryAdjust() throws InterruptedException {
    AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate = AdjustSalaryApplyOrderAggregate.builder()
            .employeeNumber("10000001")
            .orderId("100908977676003")
            .employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build())
            .employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder()
                    .adjustTotalAmount(new BigDecimal(100))
                    .adjustBaseAmount(new BigDecimal(80))
                    .adjustMeritAmount(new BigDecimal(20)).build())
            .build();
    String orderId = salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate);
    log.info("調(diào)薪測(cè)試 req: {} res: {}", JSON.toJSONString(adjustSalaryApplyOrderAggregate), orderId);
    Thread.sleep(Integer.MAX_VALUE);
}
23-07-29.15:40:52.307 [main            ] INFO  HikariDataSource       - HikariPool-1 - Start completed.
23-07-29.15:40:52.445 [main            ] INFO  EventPublisher         - 發(fā)送MQ消息 topic:xfg-mq message:{"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
23-07-29.15:40:52.517 [main            ] INFO  ISalaryAdjustApplyServiceTest - 調(diào)薪測(cè)試 req: {"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"} res: 100908977676004
23-07-29.15:40:52.520 [ConsumeMessageThread_1] INFO  SalaryAdjustMQListener - 接收到MQ消息 {"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
  • 當(dāng)執(zhí)行一次加薪調(diào)整后,就會(huì)接收到MQ消息了。
責(zé)任編輯:武曉燕 來(lái)源: bugstack蟲(chóng)洞棧
相關(guān)推薦

2023-11-24 07:16:10

DDD微服務(wù)

2020-09-07 06:38:54

HA高可用協(xié)議

2024-05-21 09:26:54

微服務(wù)DDD建模架構(gòu)

2019-01-18 16:39:08

系統(tǒng)層中間件層應(yīng)用層

2022-01-10 13:01:32

指針Struct內(nèi)存

2021-10-29 21:26:39

前端引擎層類型

2025-01-15 08:46:55

2025-02-05 09:46:13

OracleDBA投資

2023-02-15 13:50:58

DDD戰(zhàn)略設(shè)計(jì)

2022-01-11 20:43:16

TCPIP模型

2009-06-10 09:58:14

程序員職場(chǎng)層次

2021-10-26 16:20:34

比特幣區(qū)塊鏈加密貨幣

2025-01-16 10:38:31

2021-03-18 13:20:52

Linux MintLinuxLinux發(fā)行版

2011-04-19 13:53:41

三層架構(gòu)

2024-06-20 13:22:13

C++11C++模板

2010-11-10 10:39:19

2024-04-11 10:01:29

2023-08-06 23:31:36

架構(gòu)系統(tǒng)RPC

2010-08-02 11:04:25

Flex程序員
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)