自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

面試官:使用 RocketMQ 怎么進行灰度發(fā)布?

開發(fā) 架構
本文介紹了 RocketMQ 灰度消息的使用方法,場景比較簡單。對于全鏈路的復雜灰度場景,可以參考使用阿里的微服引擎 MSE。

大家好,我是君哥。

今天來聊一聊 RocketMQ 的灰度方案。

灰度發(fā)布是指在黑與白之間,平滑過渡的一種發(fā)布方式。在大流量的系統(tǒng)中,如果一次升級改造范圍比較大,或者影響內容不太確定,一般會采用切量的方式進行升級,這樣可以減少生產(chǎn)變更帶來的影響。

圖片

如上圖,對 ServiceA 這個服務進行升級,采用灰度發(fā)布,先升級 Server5,一周后如果沒有問題,升級 Server4 和 Server 3,再運行一周沒有問題,把剩下兩個節(jié)點都升級。

上面的案例是一個 RPC 的調用。但如果使用消息隊列該怎么做呢?使用消息隊列,并不能使用網(wǎng)關來進行流量轉發(fā)。這里需要分不同場景進行分析。

1、只升級消費者

這是最簡單的情況,比如只有消費者修改了消費邏輯,就是 RPC 調用的情況類似,我們只要把消費者進行灰度發(fā)布就可以。如下圖:

圖片

2、生產(chǎn)者也升級

下面是一個訂單的實體類,我們新加了一個屬性,訂單生成時間

public class Order {
private Long id;
private Long userId;
private Long productId;
private Integer count;
private BigDecimal payAmount;
/**訂單狀態(tài):0:創(chuàng)建中;1:已完結*/
private Integer status;
/**新加屬性,訂單生成時間*/
private String createTime;
}

消費端的改造是需要對 createTime 這個屬性進行處理。

(1)消費端過濾

在生產(chǎn)者的 Order 類中增加 createTime 屬性,如果我們直接使用 createTime 屬性來過濾,消費者并不能實現(xiàn)灰度,因為所有的消費者都可能會拉取到帶有 createTime 屬性的消息。

RocketMQ 中 Message 的定義如下:

public class Message implements Serializable {
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
}

可以在 properties 屬性中增加一個灰度標識,比如生產(chǎn)者發(fā)送消息的時候封裝如下:

Message msg = buildMessage(topic);
msg.putUserProperty("gray", "true");

注意:也可以在 SendMessageHook 這個鉤子函數(shù)中定義。通過這種方式可以在消費端新增加一個灰度 Consumer Group,用來對灰度消息則進行消費。如下圖:

圖片

對于灰度 Consumer Group 判斷到 gray 屬性是 true 時進行消費,而對于普通 Consumer Group,判斷到 gray 屬性不等于 true 時再進行消費。這里可以借助 RocketMQ 客戶端的 FilterMessageHook,代碼如下:

defaultMQPushConsumerImpl.registerFilterMessageHook(new FilterMessageHook() {
@Override
public String hookName(){
return "filterHook";
}

@Override
public void filterMessage(FilterMessageContext context){
List<MessageExt> messages = context.getMsgList();
context.setMsgList(messages.stream().filter(m -> StringUtils.equals(m.getProperty("gray"),"true"))
.collect(Collectors.toList()));
}
});

?不過這樣會有兩個問題,灰度和正常的兩個 Consumer Group 相當于是廣播組:

  1. 兩個組都要對所有的消息進行拉取,比如本來使用灰度發(fā)布計劃切 10% 的流量,但實際上全部流量都切過去了,只是根據(jù)屬性做了判斷。這讓消費端整體承擔了兩倍的壓力;
  2. 因為兩個消費者組都要去 Broker 拉取消息,Broker 的壓力也增加了一倍。

(2)Broker 過濾

使用 tag 過濾

如果一個 Consumer 不訂閱一個 Topic 中的全部消息,可以通過 Tag 來過濾。比如一個 Consumer 訂閱了 TopicA 這個 Topic 中的 Tag1 和 Tag2 這兩個 tag,那這個 Consumer 的訂閱關系如下圖:

圖片

SubscriptionData 這個對象封裝了 Topic、tag 以及所訂閱 tag 的 hashcode 集合。

Consumer 發(fā)送拉取消息請求時,會把訂閱關系傳給 Broker(Broker 解析成 SubscriptionData 對象),Broker 使用 consumequeue 獲取消息時,首先判斷判斷最后 8 個字節(jié)的 tag hashcode 是否在 SubscriptionData 的 codeSet 中,如果不在就跳過,如果存在把消息返回給 Consumer。如下圖:

圖片

這樣可以在灰度 Producer 發(fā)送消息時加上 Tag,如下代碼:

Message msg = new Message();
msg.setBody("Test");
msg.setTopic("Topic");
msg.setTags("Gray");

而在灰度消費者訂閱 Gray 這個 tag。這樣就避免了 2.1 節(jié)中消息全量拉取的問題。

使用 SQL92 過濾

使用 SQL92 過濾,可以應對更加復雜的場景,不僅可以過濾 Tag,還可以過濾 UserProperty。

比如下面是一個生產(chǎn)者的代碼:

Message msg = new Message();
msg.setTopic("testTopic");
msg.setTags("tag1");
msg.putUserProperty("gray","true");

這樣消費者初始化的時候,可以定義使用 SQL92 過濾,代碼如下:

consumer.subscribe("testTopic",
MessageSelector.bySql("(TAGS is not null and TAGS in TAGS='''''tag1''''')" +
"and (gray is not null gray='true')"));

下面是 bySql 的源代碼:

public static MessageSelector bySql(String sql){
return new MessageSelector(ExpressionType.SQL92, sql);
}

3、總結

本文介紹了 RocketMQ 灰度消息的使用方法,場景比較簡單。對于全鏈路的復雜灰度場景,可以參考使用阿里的微服引擎 MSE。

責任編輯:姜華 來源: 君哥聊技術
相關推薦

2024-03-06 15:38:06

Spring微服務架構擴展組件

2025-04-14 11:41:12

RocketMQ長輪詢配置

2023-11-21 09:35:49

全量部署微服務

2024-07-23 08:21:19

2022-05-23 08:43:02

BigIntJavaScript內置對象

2015-08-13 10:29:12

面試面試官

2024-10-24 09:22:30

2021-08-02 17:21:08

設計模式訂閱

2021-05-19 06:07:21

CSS 斜線效果技巧

2022-05-16 11:04:43

RocketMQPUSH 模式PULL 模式

2023-02-16 08:10:40

死鎖線程

2021-10-27 10:27:36

微信小程序流程

2024-01-02 07:37:52

FlaggerKubernetesIstio

2023-12-26 09:34:47

系統(tǒng)MongoDB存儲

2024-07-22 14:09:22

@AsyncJava

2022-10-10 12:31:37

服務器性能

2021-04-12 21:34:29

Redis故障數(shù)據(jù)

2023-02-08 07:04:20

死鎖面試官單元

2024-03-18 14:06:00

停機Spring服務器

2021-11-02 09:05:25

Redis
點贊
收藏

51CTO技術棧公眾號