作者|張健
1. 背景
2022年春節(jié)活動(dòng)在8款字節(jié)系 APP 上線(xiàn),包含了紅包雨、集年味卡和煙火大會(huì)等諸多玩法。紅包雨、集卡開(kāi)獎(jiǎng)和煙火大會(huì)都存在高峰值突發(fā)流量。其中,紅包雨活動(dòng)會(huì)在10分鐘內(nèi)給幾千萬(wàn)甚至上億用戶(hù)發(fā)放上億現(xiàn)金獎(jiǎng)勵(lì),且大多數(shù)請(qǐng)求集中在前3分鐘。在項(xiàng)目啟動(dòng)時(shí),紅包雨活動(dòng)作為最大的流量來(lái)源,預(yù)估的發(fā)紅包峰值流量有180萬(wàn) QPS 。
為了保證用戶(hù)體驗(yàn)、活動(dòng)效果和資金安全,紅包雨系統(tǒng)需要保證超高的穩(wěn)定性。在系統(tǒng)設(shè)計(jì)上不能強(qiáng)依賴(lài)任何外部系統(tǒng),在極端情況下僅需要紅包雨服務(wù)可用,用戶(hù)請(qǐng)求即可正常處理并返回結(jié)果。獎(jiǎng)勵(lì)系統(tǒng)作為紅包系統(tǒng)的下游服務(wù),負(fù)責(zé)用戶(hù)獎(jiǎng)勵(lì)的入賬,需要承載最高180萬(wàn) QPS 的獎(jiǎng)勵(lì)發(fā)放請(qǐng)求,并且在出現(xiàn)異常情況時(shí)保證用戶(hù)體驗(yàn)無(wú)損,獎(jiǎng)勵(lì)可以最終入賬,做到不超發(fā)不少發(fā)。
2. 技術(shù)挑戰(zhàn)
2.1 峰值流量高
除夕當(dāng)天會(huì)進(jìn)行7場(chǎng)紅包雨,從12:00起每小時(shí)進(jìn)行一場(chǎng),集卡開(kāi)獎(jiǎng)和煙火大會(huì)于19:30開(kāi)始。當(dāng)晚20:00前后,紅包雨、集卡開(kāi)獎(jiǎng)和煙火大會(huì)的發(fā)獎(jiǎng)流量將會(huì)疊加在一起,屆時(shí)可能產(chǎn)生超過(guò)200萬(wàn) QPS 的發(fā)獎(jiǎng)流量。下游資產(chǎn)中臺(tái)服務(wù)僅提供30萬(wàn) QPS 的現(xiàn)金紅包、40萬(wàn) QPS 的優(yōu)惠券入賬能力。獎(jiǎng)勵(lì)系統(tǒng)需要削峰限流,異步入賬獎(jiǎng)勵(lì),確保下游服務(wù)不過(guò)載。
2.2 獎(jiǎng)勵(lì)種類(lèi)多
除現(xiàn)金紅包外,在集卡和煙火大會(huì)場(chǎng)景會(huì)發(fā)放10多種優(yōu)惠券、實(shí)物獎(jiǎng)勵(lì)、頭像掛件等。不同的優(yōu)惠券由不同的下游系統(tǒng)發(fā)放,且每個(gè)系統(tǒng)的吞吐能力不同,甚至部分系統(tǒng)只能提供2000 TPS 的處理能力。獎(jiǎng)勵(lì)系統(tǒng)在進(jìn)行削峰限流時(shí),不同獎(jiǎng)勵(lì)種類(lèi)限流的閾值需要根據(jù)下游系統(tǒng)吞吐能力進(jìn)行個(gè)性化配置。下游系統(tǒng)能力有限的情況下,需要保證現(xiàn)金優(yōu)先入賬。
2.3 系統(tǒng)高可靠
引入消息隊(duì)列進(jìn)行獎(jiǎng)勵(lì)異步發(fā)放后,需要盡可能保證獎(jiǎng)勵(lì)事件的可靠投遞和可靠消費(fèi),任何獎(jiǎng)勵(lì)最終都要入賬,還需兼顧消息隊(duì)列集群的穩(wěn)定和容災(zāi)。
在內(nèi)部服務(wù)出災(zāi)的情況下,或獎(jiǎng)勵(lì)事件在消息隊(duì)列中堆積時(shí),需要做到用戶(hù)無(wú)感知,用戶(hù)在活動(dòng)錢(qián)包頁(yè)可見(jiàn)獎(jiǎng)勵(lì)流水,隨時(shí)可以正常提現(xiàn)。除通過(guò)消費(fèi)獎(jiǎng)勵(lì)事件入賬外,還需引入用戶(hù)提現(xiàn)行為觸發(fā)強(qiáng)制入賬的能力,與此同時(shí)還要保證安全可靠,不能被黑產(chǎn)攻擊造成資金損失。
3. 技術(shù)方案
基于春節(jié)活動(dòng)峰值流量高、穩(wěn)定性要求高的特點(diǎn),為了保證高峰值流量下獎(jiǎng)勵(lì)系統(tǒng)穩(wěn)定可靠,技術(shù)方案選型時(shí)選擇了基于消息隊(duì)列削峰、異步處理請(qǐng)求的總體方案。獎(jiǎng)勵(lì)發(fā)放的大概流程如下:
在獎(jiǎng)勵(lì)事件生產(chǎn)側(cè),為了盡可能降低上游接入方的開(kāi)發(fā)成本,基于不同接入場(chǎng)景特性,由獎(jiǎng)勵(lì)系統(tǒng)提供獎(jiǎng)勵(lì) SDK ,并定義簡(jiǎn)單清晰的發(fā)獎(jiǎng)接口,供接入方選用。獎(jiǎng)勵(lì)事件的可靠投遞由 SDK 內(nèi)部保證。獎(jiǎng)勵(lì)事件 MQ 使用了公司內(nèi) ByteMQ 和 RocketMQ 兩種消息隊(duì)列,防止因單個(gè)消息隊(duì)列集群宕機(jī)導(dǎo)致整個(gè)系統(tǒng)不可用。
在獎(jiǎng)勵(lì)事件消費(fèi)側(cè),針對(duì)每一個(gè) Topic 創(chuàng)建一個(gè)消費(fèi)者服務(wù),四個(gè)消費(fèi)者功能完全一致。由消費(fèi)者服務(wù)保證消息可靠消費(fèi)和消費(fèi)限速。
除激勵(lì)金幣外,其他獎(jiǎng)勵(lì)類(lèi)型通過(guò)資產(chǎn)中臺(tái)服務(wù)調(diào)用各個(gè)下游發(fā)放。春節(jié)活動(dòng)期間,資產(chǎn)中臺(tái)暫未支持發(fā)獎(jiǎng)?wù)埱蟮南鞣?,需要在?jiǎng)勵(lì)系統(tǒng)前置進(jìn)行。業(yè)務(wù)上,同一訂單號(hào)只能發(fā)放一種獎(jiǎng)勵(lì)一次,由于資產(chǎn)中臺(tái)和激勵(lì)中臺(tái)系統(tǒng)之間數(shù)據(jù)隔離,需要獎(jiǎng)勵(lì)系統(tǒng)支持單一訂單號(hào)跨服務(wù)發(fā)放冪等。
3.1 獎(jiǎng)勵(lì)SDK設(shè)計(jì)
SDK 以代碼“內(nèi)嵌”的方式運(yùn)行在接入方服務(wù)內(nèi),可以避免 RPC 方式網(wǎng)絡(luò)傳輸、請(qǐng)求數(shù)據(jù)序列化和返回?cái)?shù)據(jù)反序列化帶來(lái)的時(shí)延和性能消耗。盡管 SDK 的整體時(shí)延和性能優(yōu)于 RPC 方式,對(duì) SDK 本身的穩(wěn)定性、性能消耗和接口響應(yīng)時(shí)延依然有非常高的要求。以紅包雨場(chǎng)景為例,發(fā)獎(jiǎng)接口需要50ms內(nèi)返回,若響應(yīng)時(shí)間超過(guò)50ms將會(huì)增加整個(gè)活動(dòng)玩法接口的處理時(shí)間,影響紅包雨服務(wù)的吞吐量,最終會(huì)影響用戶(hù)參與春節(jié)活動(dòng)的體驗(yàn)。
獎(jiǎng)勵(lì) SDK 在功能上實(shí)現(xiàn)了獎(jiǎng)勵(lì)Token 的生成和存儲(chǔ)和獎(jiǎng)勵(lì)事件的可靠投遞。 接口設(shè)計(jì)上面向不同接入場(chǎng)景針對(duì)性地提供定制接口,最大限度的降低使用方的理解和接入成本,減少開(kāi)發(fā)周期。
為了保證 SDK 代碼結(jié)構(gòu)清晰,并具有較高的拓展性和可維護(hù)性,在代碼結(jié)構(gòu)層面,SDK 內(nèi)部使用了分層設(shè)計(jì),分為了對(duì)外接口層、內(nèi)部接口層和內(nèi)部實(shí)現(xiàn)層。
3.1.1 對(duì)外接口層
對(duì)外接口層定義了暴露給使用者的外部接口,除初始化、反初始化等接口和通用的異步發(fā)獎(jiǎng)接口外,還為紅包雨、煙火大會(huì)和集卡分別提供差異化定制接口。通用異步發(fā)獎(jiǎng)接口定義和獎(jiǎng)勵(lì) RPC 服務(wù)的異步發(fā)獎(jiǎng)接口保持一致,通過(guò)調(diào)用 RPC 接口和通過(guò) SDK 發(fā)獎(jiǎng)的接入方可以低成本的雙向遷移。
定制接口結(jié)合使用場(chǎng)景的特點(diǎn),固化諸如活動(dòng) ID、場(chǎng)景 ID、獎(jiǎng)勵(lì)類(lèi)型等通用參數(shù),減少接口入?yún)€(gè)數(shù),函數(shù)名稱(chēng)語(yǔ)義更清晰,可進(jìn)一步降低接入方的使用成本,提升接入方代碼的可讀性和可維護(hù)性。對(duì)于部分場(chǎng)景,還承擔(dān)了全局冪等 ID的拼接工作。
發(fā)獎(jiǎng)?wù)埱蟪脩?hù)信息(用戶(hù) ID、設(shè)備 ID 和 AppID )、獎(jiǎng)勵(lì)信息(獎(jiǎng)勵(lì)類(lèi)型、數(shù)值)外,還需攜帶一個(gè)全局唯一 ID 作為訂單號(hào),以實(shí)現(xiàn)根據(jù)訂單號(hào)冪等的能力。訂單號(hào)由接入方根據(jù)活動(dòng)信息和用戶(hù)信息拼接而成。所有的接口都支持調(diào)用方寫(xiě)入拓展字段(Map 格式的鍵值對(duì))保存業(yè)務(wù)自定義信息。
3.1.2 內(nèi)部接口層
內(nèi)部接口層提供了通用的獎(jiǎng)勵(lì)異步發(fā)放接口(SendBonus)、Token 生成和存儲(chǔ)接口(GenBonusToken)、初始化接口和反初始化接口。外部接口基于內(nèi)部接口進(jìn)行差異化封裝,提供更細(xì)化的功能。內(nèi)部接口層對(duì)上層屏蔽內(nèi)部實(shí)現(xiàn)細(xì)節(jié)。
以異步發(fā)放接口 SendBonus 函數(shù)為例,主要集成了參數(shù)檢查、打點(diǎn)監(jiān)控、虛擬隊(duì)列(Queue)選擇、獎(jiǎng)勵(lì)消息的構(gòu)造和發(fā)送、獎(jiǎng)勵(lì) Token 的生成和存儲(chǔ)等功能。參數(shù)校驗(yàn)通過(guò)后,SendBonus 接口即返回獎(jiǎng)勵(lì) Token,供上層調(diào)用者使用(一般是返回給前端和客戶(hù)端)。
/*
SendBonus
@act 活動(dòng)信息
@user 用戶(hù)信息
@bonus 獎(jiǎng)勵(lì)信息
*/
func SendBonus(ctx context.Context, act Activity, user User, bonus *BonusContent) (string, error) {
// 參數(shù)檢查
if err := CheckParams(act, user); err != nil {
// 輸出錯(cuò)誤日志,監(jiān)控異常請(qǐng)求
return "", err
}
// 檢查獎(jiǎng)勵(lì)類(lèi)型是否合法
cfg, err := CheckBonus(bonus)
if err != nil {
// 輸出錯(cuò)誤日志,監(jiān)控異常請(qǐng)求
return "", err
}
// 構(gòu)造獎(jiǎng)勵(lì)消息
message := &event.BonusEvent{...}
// SendEvent內(nèi)部根據(jù)獎(jiǎng)勵(lì)屬性選擇隊(duì)列
if err = queue.SendEvent(ctx, message); err != nil {
return GenBonusToken(ctx, act, user, info, true), err
}
// 構(gòu)造并返回獎(jiǎng)勵(lì)Token
return GenBonusToken(ctx, act, user, info, true), nil
}
3.1.3 內(nèi)部實(shí)現(xiàn)層
內(nèi)部實(shí)現(xiàn)層主要包含獎(jiǎng)勵(lì) Token 和虛擬隊(duì)列 Queue 兩大模塊。Token 模塊負(fù)責(zé) Token 的生成、存儲(chǔ)和查詢(xún);Queue 模塊負(fù)責(zé)實(shí)現(xiàn)消息的可靠投遞。
A. Token 模塊
在整個(gè)活動(dòng)系統(tǒng)內(nèi)部,獎(jiǎng)勵(lì)系統(tǒng)通過(guò)消費(fèi)獎(jiǎng)勵(lì)事件(異步消息)進(jìn)行真實(shí)的獎(jiǎng)勵(lì)發(fā)放。在獎(jiǎng)勵(lì)系統(tǒng)內(nèi)部出災(zāi)或獎(jiǎng)勵(lì)實(shí)際入賬存在壓?jiǎn)蔚那闆r下,引入 Token 機(jī)制來(lái)保證用戶(hù)體驗(yàn)無(wú)損、保證用戶(hù)在活動(dòng)頁(yè)面可見(jiàn)獎(jiǎng)勵(lì)流水、保證用戶(hù)使用獎(jiǎng)勵(lì)時(shí)可操作(現(xiàn)金可提現(xiàn)、優(yōu)惠券可使用等)。Token 作為用戶(hù)獲得獎(jiǎng)勵(lì)的憑據(jù)而存在,和獎(jiǎng)勵(lì)事件一一對(duì)應(yīng)。Token 的產(chǎn)生和流轉(zhuǎn)過(guò)程如下圖所示:
Token 數(shù)據(jù)結(jié)構(gòu)和加解密
Token 內(nèi)部數(shù)據(jù)結(jié)構(gòu)使用 Protobuf 定義,相對(duì)于 JSON 方式序列化和反序列化性能均有提升、序列化后的數(shù)據(jù)大小減小了50%。Token 數(shù)據(jù)會(huì)返回給客戶(hù)端并保存在本地,為防止黑產(chǎn)解析 Token 構(gòu)造數(shù)據(jù)惡意請(qǐng)求服務(wù)端接口,需要對(duì)Token 數(shù)據(jù)進(jìn)行加密。Token 對(duì)象使用 Protobuf 進(jìn)行序列化后的明文使用公司內(nèi)的 KMS 工具進(jìn)行加密。加密后的密文使用 Base64 算法進(jìn)行編碼,以便在網(wǎng)絡(luò)傳輸和客戶(hù)端本地存儲(chǔ)。解密時(shí)先進(jìn)行 Base64 解碼,再使用 KMS 工具進(jìn)行解密,拿到的明文使用 Brotobuf 進(jìn)行反序列化后即可得到 Token 對(duì)象。
Token 數(shù)據(jù)內(nèi)容如下所示:
syntax = "proto3";
message BonusToken {
string TradeNo = 1; // 訂單號(hào),全局唯一,用于冪等
int64 UserID = 2; // 發(fā)獎(jiǎng)當(dāng)時(shí)的APP內(nèi)的UID
string Activity = 3; // 活動(dòng)
string Scene = 4; // 場(chǎng)景
int64 AwardType = 5; // 獎(jiǎng)勵(lì)類(lèi)型
int32 AwardCount = 6; // 獎(jiǎng)勵(lì)數(shù)值
int64 AwardTime = 7; // 獎(jiǎng)勵(lì)發(fā)放時(shí)間戳
string Desc = 8; // 獎(jiǎng)勵(lì)文案
}
Token 存儲(chǔ)
Token 存儲(chǔ)是典型的寫(xiě)多讀少場(chǎng)景,底層存儲(chǔ)需要直接承載發(fā)獎(jiǎng)的峰值流量(預(yù)估350萬(wàn) QPS ,部分場(chǎng)景一次請(qǐng)求會(huì)發(fā)放多個(gè)獎(jiǎng)勵(lì)),用戶(hù)進(jìn)入錢(qián)包頁(yè)面才會(huì)讀取存儲(chǔ)(預(yù)估40萬(wàn)QPS),讀寫(xiě)請(qǐng)求量級(jí)相差較多。數(shù)據(jù)的有效期較短,獎(jiǎng)勵(lì)真正入賬后即可刪除。寫(xiě)入場(chǎng)景均為插入單個(gè) Token,讀取場(chǎng)景均為讀 Token 列表。
Token 主要由紅包雨、集卡開(kāi)獎(jiǎng)和煙火大會(huì)發(fā)獎(jiǎng)產(chǎn)生,其中紅包雨和集卡開(kāi)獎(jiǎng)的獎(jiǎng)勵(lì)數(shù)量有明確的數(shù)量上限。在煙火大會(huì)玩法中,用戶(hù)最快每30秒即可領(lǐng)取一次獎(jiǎng)勵(lì),對(duì)用戶(hù)領(lǐng)獎(jiǎng)次數(shù)沒(méi)有限制,理論上單個(gè)用戶(hù)在整個(gè)煙火大會(huì)活動(dòng)可以產(chǎn)生500個(gè) Token。
基于預(yù)估的線(xiàn)上流量、讀寫(xiě)模型和活動(dòng)特點(diǎn),決定使用 Redis 作為底層存儲(chǔ),數(shù)據(jù)結(jié)構(gòu)使用 Hash,用戶(hù)的 ActID 作為 Hash 數(shù)據(jù)的 Key、Token 的訂單號(hào) TradeNo 作為 Hash 的 Field、Token 序列化后的明文作為 Hash 的 Value。
Token 服務(wù)
Token 服務(wù)提供了查詢(xún)用戶(hù) Token 列表和加密 Token 合法性校驗(yàn)接口。根據(jù)Token 密文是否可以正常解密、解密后的 Token 是否存在于 Redis 中,Token 合法性校驗(yàn)接口返回三種結(jié)果:
- 非法 Token:密文無(wú)法解密
- 未知 Token:密文可解密,但存儲(chǔ)無(wú)記錄
- 合法 Token:密文可解密,且存儲(chǔ)有記錄
獎(jiǎng)勵(lì) SDK 在寫(xiě) Token 的 Redis 時(shí)不會(huì)進(jìn)行失敗重試,存在極少數(shù) Token 沒(méi)有保存成功的情況。為了保證資金安全、防止黑產(chǎn)惡意攻擊,可解密的未知 Token 不能用作強(qiáng)制入賬。
Token 使用
用戶(hù)參與活動(dòng)獲得獎(jiǎng)勵(lì)后,Token 由活動(dòng)前端調(diào)用客戶(hù)端 JSB 進(jìn)行保存。用戶(hù)查看獎(jiǎng)勵(lì)流水時(shí),活動(dòng)錢(qián)包頁(yè)前端會(huì)通過(guò) JSB 讀取本地 Token 列表,在請(qǐng)求資產(chǎn)中臺(tái)服務(wù)時(shí)攜帶。資產(chǎn)中臺(tái)服務(wù)使用 TokenSDK 進(jìn)行解密,同時(shí)會(huì)請(qǐng)求 Token 服務(wù)讀取服務(wù)端 Token 列表,并進(jìn)行合并操作。資產(chǎn)中臺(tái)還會(huì)在合并后的列表中刪除已經(jīng)入賬的 Token,在返回給用戶(hù)的流水里插入暫未入賬的流水并修正活動(dòng)錢(qián)包余額,保證用戶(hù)獎(jiǎng)勵(lì)及時(shí)可見(jiàn)。
用戶(hù)在活動(dòng)錢(qián)包頁(yè)進(jìn)行提現(xiàn)時(shí),也會(huì)將客戶(hù)端本地 Token 帶給資產(chǎn)中臺(tái)服務(wù)。資產(chǎn)中臺(tái)服務(wù)對(duì)未入賬的合法 Token 進(jìn)行強(qiáng)制入賬,保證用戶(hù)可以完成提現(xiàn)操作。
客戶(hù)端和服務(wù)端 Token 的作用
當(dāng)獎(jiǎng)勵(lì)系統(tǒng)依賴(lài)的消息隊(duì)列出災(zāi)導(dǎo)致無(wú)法寫(xiě)入或消費(fèi)時(shí)、或由于削峰限流導(dǎo)致獎(jiǎng)勵(lì)真實(shí)入賬存在延遲時(shí),兩種 Token 都可以在一定程度上保證用戶(hù)體驗(yàn)無(wú)損。
客戶(hù)端 Token 通過(guò)用戶(hù)設(shè)備和后臺(tái)服務(wù)之間的網(wǎng)絡(luò)傳遞,保存于用戶(hù)設(shè)備存儲(chǔ)。服務(wù)端 Token 通過(guò)內(nèi)部網(wǎng)絡(luò)傳遞,保存于中心化的 Redis 存儲(chǔ)。兩種 Token 互為備份,在本地 Token 不可取時(shí),可以依賴(lài)服務(wù)端 Token。服務(wù)端 Token 服務(wù)出災(zāi)時(shí),客戶(hù)端 Token 仍然可以保證用戶(hù)體驗(yàn)。
本次活動(dòng)在字節(jié)系8個(gè) APP 同時(shí)上線(xiàn),Token 服務(wù)還可以保證用戶(hù)在不同 APP 上,甚至不同的設(shè)備上的體驗(yàn)一致。
B. Queue 模塊
Queue 模塊負(fù)責(zé)提供 “可靠” 的消息投遞服務(wù)。對(duì)外暴露的 SendEvent 函數(shù)能夠根據(jù)獎(jiǎng)勵(lì)選用對(duì)應(yīng)的虛擬隊(duì)列進(jìn)行消息發(fā)送、并提供統(tǒng)一的監(jiān)控能力。
func SendEvent(ctx context.Context, msg *BonusEvent) error {
// 根據(jù)獎(jiǎng)勵(lì)信息選擇專(zhuān)用的虛擬隊(duì)列
queue := GetQueue(msg.Activity, msg.Scene, msg.BonusType)
data, err := proto.Marshal(message)
if err != nil {
return err
}
return queue.Send(ctx, message.UserID, message.UniqueID, data)
}
虛擬隊(duì)列(Queue)是對(duì)公司內(nèi) ByteMQ 和 RocketMQ 的封裝,內(nèi)部通過(guò)代碼封裝屏蔽了兩種消息隊(duì)列 Producer-SDK 的使用細(xì)節(jié),并支持使用兩種 MQ 進(jìn)行互備,提升整個(gè)系統(tǒng)的容災(zāi)能力。虛擬隊(duì)列的類(lèi)圖如下所示:
虛擬隊(duì)列的 Send 方法可根據(jù)用戶(hù) ID 動(dòng)態(tài)的調(diào)整主備生產(chǎn)者的使用比例,在單個(gè)生產(chǎn)者失敗的情況下提供自動(dòng)容災(zāi)能力。
func (q *Queue) Send(ctx context.Context, uid int64, tradeNo string, data []byte) error {
var err error
if (uid % 100) < GetQueueRatio(q.Name()) {
err = q.Master.Send(ctx, tradeNo, data)
if err != nil {
err = q.Backup.Send(ctx, tradeNo, data)
}
} else {
err = q.Backup.Send(ctx, tradeNo, data)
if err != nil {
err = q.Master.Send(ctx, tradeNo, data)
}
}
return err
}
使用 RocketMQ 或 ByteMQ 的 SDK 異步批量發(fā)送功能時(shí),由 Producer 屏蔽兩個(gè) SDK 失敗回調(diào)的差異,統(tǒng)一使用失敗消息通道返回給上層。虛擬隊(duì)列的 Retry 邏輯負(fù)責(zé)讀取主備 Producer 的失敗消息,并采取主備輪轉(zhuǎn)的方式進(jìn)行發(fā)送重試。在服務(wù)進(jìn)程無(wú)異常退出的情況下,可保證消息最終發(fā)送成功。進(jìn)程正常退出時(shí),Close 方法會(huì)等待所有消息處理完成再返回。
消息隊(duì)列 Topic可配置
虛擬隊(duì)列內(nèi)部使用了 Master 和 Backup 兩個(gè)消息隊(duì)列,通過(guò)代碼抽象和底層消息隊(duì)列類(lèi)型做了解耦。在真實(shí)線(xiàn)上環(huán)境,為了達(dá)到災(zāi)備的目的,單個(gè)虛擬隊(duì)列的 Master 和 Backup 需要使用不同類(lèi)型或者不同物理集群的消息隊(duì)列 Topic。
在春節(jié)活動(dòng)期間,ByteMQ 和 RocketMQ 的研發(fā)和運(yùn)維團(tuán)隊(duì)分別提供了一個(gè)活動(dòng)專(zhuān)用集群,并做重點(diǎn)運(yùn)維保障。獎(jiǎng)勵(lì)系統(tǒng)在 ByteMQ 和 RocketMQ 的活動(dòng)集群申請(qǐng)各申請(qǐng)了兩個(gè) Topic。基于4個(gè) Topic,在上層構(gòu)建了3個(gè)虛擬隊(duì)列。
Topic 的 Producer 實(shí)例可以在不同的 Queue 中復(fù)用。上圖中,ByteMQ 的生產(chǎn)者 S 在 Special Queue 中作為 Master,在 Express Queue 中作為 Backup;RocketMQ 的生產(chǎn)者 B 同時(shí)在 Massive 和 Special Queue 中作為 Backup。
獎(jiǎng)勵(lì) SDK 內(nèi)部使用的消息隊(duì)列 Topic 配置在了動(dòng)態(tài)配置 TCC 中,虛擬隊(duì)列和 Producer 實(shí)例之間的映射關(guān)系也可通過(guò) TCC 配置。做到了代碼和消息隊(duì)列集群、Topic 解耦。開(kāi)發(fā)測(cè)試、線(xiàn)上運(yùn)行階段可以非常方便的更換消息隊(duì)列Topic。
獎(jiǎng)勵(lì)對(duì)應(yīng)的虛擬隊(duì)列可配置
獎(jiǎng)勵(lì)類(lèi)型和虛擬隊(duì)列的對(duì)應(yīng)關(guān)系配置在 TCC 中,不同的獎(jiǎng)勵(lì)類(lèi)型可以動(dòng)態(tài)的指定發(fā)送的虛擬隊(duì)列,沒(méi)有配置時(shí)默認(rèn)使用 Massive 虛擬隊(duì)列。在 SendEvent 方法中,調(diào)用 GetQueue 發(fā)放選用虛擬隊(duì)列。春節(jié)活動(dòng)期間,Massive 虛擬隊(duì)列承載所有場(chǎng)景發(fā)放的現(xiàn)金獎(jiǎng)勵(lì);Special 虛擬隊(duì)列承載了所有場(chǎng)景發(fā)放的優(yōu)惠券;Express 虛擬隊(duì)列承載了所有場(chǎng)景下的激勵(lì)金幣獎(jiǎng)勵(lì)。
消息異步批量發(fā)送
ByteMQ 和 RocketMQ 的生產(chǎn)者 SDK 均支持同步發(fā)送和異步批量發(fā)送消息。RocketMQ 同步發(fā)送時(shí)延 P99為20 ms,而 ByteMQ 同步發(fā)送時(shí)延 P99為秒級(jí)。在發(fā)送同等數(shù)量級(jí)的消息時(shí),RocketMQ 的 CPU 占用明顯高于 ByteMQ。在異步發(fā)送模式下,消息隊(duì)列的生產(chǎn)者 SDK 會(huì)啟動(dòng)協(xié)程定時(shí)或當(dāng)緩沖區(qū)內(nèi)的消息達(dá)到閾值時(shí)發(fā)送。定時(shí)的時(shí)間間隔和緩沖區(qū)閾值可以在初始化時(shí)配置。批量發(fā)送可以降低生產(chǎn)者對(duì)消息隊(duì)列服務(wù)的請(qǐng)求次數(shù),假設(shè)每100個(gè)消息批量發(fā)送一次,最高可以將消息隊(duì)列服務(wù)的 QPS 降低100倍,極大的減輕消息隊(duì)列集群的負(fù)載。
為了降低獎(jiǎng)勵(lì)事件發(fā)送接口的響應(yīng)時(shí)延,以及保持消息隊(duì)列集群負(fù)載低水位,在大流量發(fā)獎(jiǎng)場(chǎng)景均使用異步批量發(fā)送模式,并配置 ByteMQ 承載主要的流量。
3.2 消費(fèi)者設(shè)計(jì)
消息隊(duì)列的削峰功能,基于控制消費(fèi)者的消費(fèi)速度實(shí)現(xiàn)。RocketMQ 消費(fèi)方式基于長(zhǎng)輪訓(xùn)方式實(shí)現(xiàn),兼具了推拉兩種模式的優(yōu)點(diǎn)。ByteMQ 消費(fèi)方式為拉模式。消費(fèi)者實(shí)例可通過(guò)控制拉消息的頻率和單次拉取消息的數(shù)量來(lái)控制消費(fèi)速度。
在春節(jié)活動(dòng)獎(jiǎng)勵(lì)發(fā)放場(chǎng)景,不僅需要?jiǎng)討B(tài)的調(diào)整多個(gè)消息隊(duì)列的總消費(fèi)速度,保證下游獎(jiǎng)勵(lì)服務(wù)、資產(chǎn)中臺(tái)服務(wù)、激勵(lì)中臺(tái)服務(wù)不過(guò)載,且充分利用機(jī)器資源;還需要?jiǎng)討B(tài)的控制不同獎(jiǎng)勵(lì)類(lèi)型的消費(fèi)速度,支持現(xiàn)金等重要獎(jiǎng)勵(lì)優(yōu)先入賬。
活動(dòng)中發(fā)放的獎(jiǎng)勵(lì)類(lèi)型較多,不能為每種獎(jiǎng)勵(lì)單獨(dú)分配消息隊(duì)列 Topic。不同獎(jiǎng)勵(lì)類(lèi)型發(fā)放的數(shù)量差異顯著,發(fā)放量級(jí)大和入賬優(yōu)先級(jí)高的獎(jiǎng)勵(lì)獨(dú)占 Topic,發(fā)放量級(jí)小和入賬優(yōu)先級(jí)低的獎(jiǎng)勵(lì)共用一個(gè) Topic。不同獎(jiǎng)勵(lì)類(lèi)型的真實(shí)入賬服務(wù)(資產(chǎn)中臺(tái)服務(wù)的下游服務(wù))入賬能力不同,入賬能力最小的服務(wù)每秒僅能處理2000的發(fā)放請(qǐng)求。需要支持獎(jiǎng)勵(lì)類(lèi)型維度的靈活消費(fèi)控速能力。
在多維度的控速基礎(chǔ)上,還需要提供可靠消費(fèi)的能力,每個(gè)獎(jiǎng)勵(lì)消息至少成功處理一次(At least Once),所有獎(jiǎng)勵(lì)最終成功入賬。
基于上述背景,獎(jiǎng)勵(lì)消費(fèi)者服務(wù)消息拉取速度(從 Topic 讀取消息)和消息處理速度(通過(guò)獎(jiǎng)勵(lì)類(lèi)型限速,調(diào)用獎(jiǎng)勵(lì)系統(tǒng)發(fā)放獎(jiǎng)勵(lì))可能存在差異。當(dāng)拉取速度小于處理速度時(shí),獎(jiǎng)勵(lì)服務(wù)吞吐量下降,消息在 Broker 中堆積時(shí)間變長(zhǎng);當(dāng)拉取速度大于處理速度時(shí),不能通過(guò)獎(jiǎng)勵(lì)類(lèi)型限速的消息會(huì)堆積在消費(fèi)者服務(wù)進(jìn)程內(nèi)存中,并阻塞消費(fèi),差異顯著時(shí)可能造成消費(fèi)者服務(wù)進(jìn)程因 OOM 而退出,影響服務(wù)穩(wěn)定性。對(duì)于被獎(jiǎng)勵(lì)類(lèi)型限速的消息,需要立即進(jìn)行重入 隊(duì)列,消費(fèi)者服務(wù)繼續(xù)處理后續(xù)消息。由于網(wǎng)絡(luò)波動(dòng)等原因,暫時(shí)處理失敗的消息,也需要重入隊(duì)列,保證消息可以最終處理成功。
3.2.1 消費(fèi)控速實(shí)現(xiàn)
A. 消費(fèi)限速
RocketMQ 消費(fèi)者實(shí)例在啟動(dòng)時(shí)可配置單實(shí)例消費(fèi)速度和消費(fèi) Worker 數(shù)量。動(dòng)態(tài)調(diào)整消費(fèi)速度,需要重啟消費(fèi)者實(shí)例。ByteMQ 兼容 Kafka 協(xié)議,Golang 代碼中消費(fèi) ByteMQ 隊(duì)列使用了 sarama-cluster (https://github.com/bsm/sarama-cluster)。sarama-cluster 相比于RocketMQ 的 SDK 更加簡(jiǎn)單,沒(méi)有提供單實(shí)例消費(fèi)限速能力。單實(shí)例可以訂閱多個(gè) Partition,每個(gè) Partition 會(huì)啟動(dòng)一個(gè)協(xié)程從 Broker 讀取消息,多個(gè) Partiton 共用一個(gè)全局通道(Channel)寫(xiě)入待處理消息。業(yè)務(wù)代碼需要從全局通道中讀取消息進(jìn)行處理。限速邏輯只能在業(yè)務(wù)邏輯中實(shí)現(xiàn),動(dòng)態(tài)調(diào)整消費(fèi)速度無(wú)需重啟消費(fèi)者實(shí)例。
基于 sarama-cluster 的特點(diǎn),使用 Go 原生限速器(golang.org/x/time/rate)實(shí)現(xiàn)了 ByteMQ 消費(fèi)者的單實(shí)例限速器。代碼實(shí)現(xiàn)如下:
type Limiter struct {
Open bool
Fetcher LimitFetcher
inner *rate.Limiter
stop chan struct{}
}
// Wait 處理消息前調(diào)用,返回后進(jìn)行處理
func (s *Limiter) Wait() {
if s.Open {
_ = s.inner.Wait(context.Background())
}
}
// Loop 用于監(jiān)聽(tīng)限速變化
func (s *Limiter) Loop() {
for s.Open && s.Fetcher != nil {
select {
case <-time.After(time.Second * 5):
newLimit := s.Fetcher()
if newLimit != int(s.inner.Limit()) {
s.inner.SetLimit(rate.Limit(newLimit))
}
case <-s.stop:
return
}
}
}
Go 原生限速器采用令牌桶算法實(shí)現(xiàn)限流,內(nèi)部沒(méi)有維護(hù) Timer,而是采用了惰加載的思路,在獲取 Token 時(shí)根據(jù)時(shí)間差計(jì)算更新可用 Token 數(shù)量。沒(méi)有任何外部依賴(lài),非常適合用于單實(shí)例限流。
動(dòng)態(tài)調(diào)整限流器的速率時(shí),通過(guò)限速器 Reserve 和 Wait 接口消耗但未使用的Token 不會(huì)被取消。使用 Wait 方法阻塞的時(shí)間不會(huì)因?yàn)樗俾实恼{(diào)整而變化。速率調(diào)整發(fā)生后,對(duì)下游產(chǎn)生的 QPS 由三部分組成:調(diào)整前已經(jīng)在等待的請(qǐng)求(阻塞在 rate.Limiter::Wait()) 、調(diào)整后新增的 Token 帶來(lái)的請(qǐng)求和 Burst(桶容量)帶來(lái)的請(qǐng)求。調(diào)整后短時(shí)間內(nèi)的對(duì)下游產(chǎn)生的 QPS 可能超過(guò)預(yù)期的速度。對(duì)于突發(fā)流量場(chǎng)景,Burst 不宜設(shè)置過(guò)大。
// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
// or underutilized by those which reserved (using Reserve or Wait) but did not yet act
// before SetLimitAt was called.
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
B. 并發(fā)消費(fèi)
RocketMQ 有序消費(fèi)時(shí),單個(gè) Queue 只能分配一個(gè) Worker 進(jìn)行消費(fèi),只有當(dāng)前 Queue 上一個(gè)消息成功處理后,才會(huì)處理下一個(gè)消息,消費(fèi)速度受限于Queue 的數(shù)量和單個(gè)消息的處理時(shí)延;無(wú)序消費(fèi)時(shí),所有 Worker 共用一個(gè)緩沖區(qū),隨機(jī)消費(fèi)不同 Queue 的消息,Worker 之間并發(fā)處理消息,Worker 數(shù)量越多消費(fèi)速度越快。
RocketMQ 進(jìn)行消息確認(rèn)(ACK)時(shí),本地處理成功的消息數(shù)量超過(guò)一定數(shù)量時(shí),或者距離上一次提交超過(guò)一定時(shí)間后,消費(fèi)者實(shí)例會(huì)批量提交(BatchCommit)成功消費(fèi)信息給 Broker。批量提交請(qǐng)求中包含每個(gè)消息的 MsgID、QueueID 和 Offset 等。Broker 側(cè)提供了消息確認(rèn)窗口機(jī)制,每次保存對(duì)應(yīng)Queue 的窗口中最小 Offset 到磁盤(pán)。若 Broker 發(fā)生宕機(jī),窗口中大于磁盤(pán)保存 Offset 的消息,將會(huì)被再次消費(fèi)。在消費(fèi)者視角,會(huì)消費(fèi)到已經(jīng)成功確認(rèn)的消息。因此,RocketMQ 不能保證 At Most Once,消息處理邏輯需要保證冪等。
ByteMQ 消息確認(rèn)機(jī)制相對(duì)簡(jiǎn)單,Broker 沒(méi)有提供消息確認(rèn)窗口機(jī)制,收到消費(fèi)者實(shí)例的 Commit 請(qǐng)求時(shí),直接保存當(dāng)前 Offset,偏移量小于當(dāng)前 Offset 的消息將不會(huì)再次被消費(fèi)。在消費(fèi)者實(shí)例中,業(yè)務(wù)代碼調(diào)用的 MarkOffset 方法,會(huì)基于確認(rèn)消息的 Offset+1并記錄在內(nèi)存中,由協(xié)程定時(shí)提交到 Broker。若消費(fèi)者實(shí)例發(fā)生宕機(jī),Offset 未提交到 Broker 的消息將會(huì)被 Broker 再次下發(fā),ByteMQ 也不能保證 At Most Once,消費(fèi)者也需要保證處理邏輯需要保證冪等。
消費(fèi) ByteMQ 時(shí),從 sarama-cluster 暴露的全局通道中讀取消息后,同步處理成功后調(diào)用 MarkOffset 方法可以保證順序消費(fèi)。但同步處理會(huì)嚴(yán)重降低消費(fèi)速度(單實(shí)例同一時(shí)刻只能處理一個(gè)消息)。啟動(dòng)協(xié)程異步處理可以并發(fā)處理消息,并可通過(guò)增加協(xié)程數(shù)量來(lái)提升消費(fèi)速度。但在消費(fèi)者進(jìn)程異常退出、消費(fèi)者宕機(jī)等情況下會(huì)造成消息丟失。例如:Offset 較大的消息處理后并成功確認(rèn)(Offset 成功提交到 Broker)后,Offset 較小的消息還未處理成功時(shí)消費(fèi)者宕機(jī),Broker 不再下發(fā)該消息,導(dǎo)致該消息漏處理,不滿(mǎn)足 At Least Once 語(yǔ)義。
// MarkOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// Note: calling MarkOffset does not necessarily commit the offset to the backend
// store immediately for efficiency reasons, and it may never be committed if
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
c.subs.Fetch(c.client.config.TryWrapTopicByEnv(msg.Topic), msg.Partition).MarkOffset(msg.Offset+ 1, metadata)
}
解決上述消息漏處理的問(wèn)題,需要針對(duì) ByteMQ 的確認(rèn)機(jī)制在業(yè)務(wù)層進(jìn)行優(yōu)化,即在消費(fèi)者代碼中自助實(shí)現(xiàn)消息確認(rèn)窗口機(jī)制。在消費(fèi)者進(jìn)程中,按照消息順序?qū)⑵?Offset 緩存在鏈表中,同時(shí)以 Offset 為 Key 在 HashMap 中存儲(chǔ)鏈表節(jié)點(diǎn)指針。消息成功處理時(shí),通過(guò) HashMap 尋址,修改鏈表節(jié)點(diǎn)狀態(tài)。本地協(xié)程定時(shí)從鏈表頭部掃描,嚴(yán)格按照順序向 Broker 提交成功消費(fèi)的 Offset。并發(fā)處理時(shí),保證較大 Offset 的消息不會(huì)提前確認(rèn)給 Broker。
3.2.2 事件處理邏輯
RocketMQ 提供了失敗隊(duì)列,并提供重試能力,但 ByteMQ 沒(méi)有失敗處理機(jī)制,為抹平兩種消息隊(duì)列的差異,事件處理方法(HandleMessage)需要盡最大可能保證成功處理,對(duì)于處理失敗的消息需要進(jìn)行重入隊(duì)列(SendEventToBackup)。
RocketMQ 消費(fèi)者失敗消息多次重入隊(duì)列失敗后,會(huì)繼續(xù)利用消息隊(duì)列 SDK 提供的失敗重試能力。由于 ByteMQ 的 SDK 沒(méi)有失敗處理機(jī)制, 失敗消息多次重入隊(duì)列失敗后,依然會(huì)對(duì)其 Offset 進(jìn)行確認(rèn),保證不會(huì)阻塞后續(xù)消息處理。
HandleMessage
// HandleMessage for ByteMQ
func HandleMessage(msg *sarama.ConsumerMessage) error {
err := DoReward(msg.Context, msg.Value, limiter)
MarkOffser(msg, err) // 本地確認(rèn),由異步協(xié)程定時(shí)提交
return nil
}
// HandleMessage for RocketMQ
func (w wrapper) HandleMessage(ctx context.Context, msg *pb.ConsumeMessage) error {
return handler.DoReward(ctx, msg.Msg.MsgBody, limiter)
}
type Limiter interface {
Allow(*BonusEvent) bool
}
func DoReward(ctx context.Context, data []byte, rate Limiter) error {
bonus := &BonusEvent{}
if err := proto.Unmarshal(data, bonus); err != nil {
return err
}
// 按照獎(jiǎng)勵(lì)類(lèi)型限流,當(dāng)rate為nil時(shí)不限流,熔斷時(shí)直接重入隊(duì)列
if rate == nil || rate.Allow(bonus) {
// 同步調(diào)用獎(jiǎng)勵(lì)服務(wù)進(jìn)行發(fā)獎(jiǎng)
if err := callReward(ctx, bonus); err == nil {
return nil
}
}
// 處理失?。褐匦聦?xiě)入隊(duì)列
return SendEventToBackup(ctx, bonus.UniqueID, bonus)
}
SendEventToBackup
func SendEventToBackup(ctx context.Context, tradeNo string, bonus *BonusEvent) error {
bonus.Retry++ // 增加Retry次數(shù)
data, err := proto.Marshal(bonus)
if err != nil {
return err
}
// 使用新PartitonKey進(jìn)行重發(fā)
newPartitionKey := fmt.Sprintf("%s{%d}", bonus.UniqueID, bonus.Retry)
for _, queue := range instances {
// 多個(gè)備選隊(duì)列用于重入隊(duì)列
if err = queue.Send(ctx, newPartitionKey, data); err == nil {
return nil
}
}
// 極端情況下通過(guò)日志回?fù)频姆绞教幚?br> logs.CtxError(ctx, "%s", base64.StdEncoding.EncodeToString(data) )
return err
}
3.2.3 獎(jiǎng)勵(lì)類(lèi)型限速
由于不同獎(jiǎng)勵(lì)類(lèi)型最終由不同的下游系統(tǒng)入賬,為保證下游系統(tǒng)都穩(wěn)定性,減少下游系統(tǒng)返回限流錯(cuò)誤和無(wú)效調(diào)用,針對(duì)每一個(gè)獎(jiǎng)勵(lì)類(lèi)型單獨(dú)配置了單實(shí)例限速。
func NewLimiter() *Limiter {
l := &Limiter{
m: sync.Map{},
ticker: time.NewTicker(5 * time.Second),
}
l.loop()
return l
}
type Limiter struct {
m sync.Map
ticker *time.Ticker
}
type innerLimiter struct {
*rate.Limiter
Fuse bool
}
// Allow 返回true時(shí)處理消息;返回false時(shí)不處理消息,直接重入隊(duì)列
func (L *Limiter) Allow(event *BonusEvent) bool {
if event == nil {
return true
}
if v, exist := L.m.Load(GetBonusType(event)); exist {
if inner, ok := v.(*innerLimiter); ok {
if inner.Fuse { // 開(kāi)啟了熔斷開(kāi)關(guān)
return false
}
return inner.Allow()
}
}
return true
}
func (L *Limiter) loop() {
go func() {
defer Recover()
L.run()
for range L.ticker.C {
L.run()
}
}()
}
// 監(jiān)聽(tīng)配置變更,動(dòng)態(tài)調(diào)整限速
func (L *Limiter) run() {
for wt, config := range tcc.GetRateCfg() {
value, exist := L.m.Load(wt)
if !exist || value == nil {
// 創(chuàng)建新增限流器
L.m.Store(wt, &innerLimiter{
Limiter: rate.NewLimiter(rate.Limit(config.Rate), config.Burst),
Fuse: config.Fuse,
})
continue
}
if inner, ok := value.(*innerLimiter); ok {
// 更新已有限流器
inner.Fuse = config.Fuse
if int(inner.Limiter.Limit()) != config.Rate {
inner.Limiter.SetLimit(rate.Limit(config.Rate))
}
continue
}
L.m.Delete(wt)
L.m.Store(wt, &innerLimiter{
Limiter: rate.NewLimiter(rate.Limit(config.Rate), config.Burst),
Fuse: config.Fuse,
})
}
}
func (L *Limiter) Close() {
if L.ticker != nil {
L.ticker.Stop()
L.ticker = nil
}
}
3.2.4 消費(fèi)和獎(jiǎng)勵(lì)類(lèi)型限速協(xié)調(diào)
消費(fèi)者類(lèi)似于一個(gè)管道,消費(fèi)限速相當(dāng)于流入管道的流量限制,獎(jiǎng)勵(lì)類(lèi)型限速相當(dāng)于流出管道的流量限制。當(dāng)消費(fèi)速度大于所有類(lèi)型速度之和時(shí),會(huì)導(dǎo)致請(qǐng)求重入隊(duì)列。減少重入隊(duì)列需要保證兩點(diǎn):
- 消費(fèi)限速和獎(jiǎng)勵(lì)類(lèi)型限速聯(lián)動(dòng),調(diào)整類(lèi)型限速時(shí)消費(fèi)速度自動(dòng)調(diào)整適配
- 上游發(fā)放獎(jiǎng)勵(lì)時(shí),不同獎(jiǎng)勵(lì)出現(xiàn)的概率分布和類(lèi)型限速配置匹配
在春節(jié)活動(dòng)中,獎(jiǎng)勵(lì)發(fā)放的概率由算法策略控制。在紅包雨、煙火大會(huì)、集卡開(kāi)獎(jiǎng)等場(chǎng)景下,概率分布符合預(yù)期,沒(méi)有發(fā)生重入隊(duì)列。
3.3 獎(jiǎng)勵(lì)服務(wù)設(shè)計(jì)
獎(jiǎng)勵(lì)服務(wù)負(fù)責(zé)調(diào)用資產(chǎn)中臺(tái)服務(wù)和激勵(lì)中臺(tái)服務(wù)發(fā)放具體的獎(jiǎng)勵(lì)。對(duì)上層提供全局冪等的保證、失敗托管重試、預(yù)算控制等能力。
由于上游存在使用同一個(gè)冪等 ID 發(fā)放不同獎(jiǎng)勵(lì)的情況,且不同的下游系統(tǒng)之間數(shù)據(jù)隔離,故需要獎(jiǎng)勵(lì)服務(wù)存儲(chǔ)所有發(fā)獎(jiǎng)?wù)埱筇幚頎顟B(tài)及結(jié)果,用于保證全局冪等。發(fā)放請(qǐng)求使用公司自研的 Abase 進(jìn)行存儲(chǔ),同時(shí)利用了 Abase 提供的 CAS 能力,對(duì)獎(jiǎng)勵(lì)發(fā)放行為進(jìn)行了并發(fā)控制,確保同一個(gè)冪等 ID 僅能用于一次發(fā)放行為。上游重試請(qǐng)求的獎(jiǎng)勵(lì)類(lèi)型和數(shù)值需要和原始請(qǐng)求保持一致,才能通過(guò)校驗(yàn),進(jìn)入真正的發(fā)放流程。
獎(jiǎng)勵(lì)服務(wù)對(duì)外提供同步發(fā)獎(jiǎng)和異步發(fā)獎(jiǎng)兩類(lèi)接口。對(duì)于需要感知獎(jiǎng)勵(lì)發(fā)放結(jié)果的場(chǎng)景,上游需要使用同步發(fā)獎(jiǎng)接口。例如獎(jiǎng)勵(lì)事件消費(fèi)者,需要明確感知發(fā)放是否成功,來(lái)決策是否需要重試等。同步接口穩(wěn)定性和響應(yīng)時(shí)延強(qiáng)依賴(lài)下游服務(wù)。部分獎(jiǎng)勵(lì)下游發(fā)放邏輯較重,耗時(shí)較長(zhǎng),容易導(dǎo)致上游調(diào)用超時(shí),穩(wěn)定性降低。
對(duì)于無(wú)需實(shí)時(shí)感知發(fā)放結(jié)果,或?qū)涌陧憫?yīng)實(shí)驗(yàn)非常敏感的場(chǎng)景,上游需要使用異步發(fā)獎(jiǎng)接口。異步接口在通過(guò)預(yù)算控制,成功將消息投遞到消息隊(duì)列后返回。異步接口可以提升系統(tǒng)吞吐能力,降低上游等待時(shí)間。利用消息隊(duì)列的削峰和異步能力,獎(jiǎng)勵(lì)服務(wù)可以直接承接中等規(guī)模(發(fā)放 QPS 在10萬(wàn)到50萬(wàn))的發(fā)獎(jiǎng)場(chǎng)景接入。對(duì)于大規(guī)模(發(fā)放 QPS 在50萬(wàn)之上)的發(fā)獎(jiǎng)場(chǎng)景,需要通過(guò)獎(jiǎng)勵(lì) SDK 接入。相對(duì)于同步接口,異步接口支持通用的失敗重試邏輯和異常處理能力,接入方無(wú)需再次開(kāi)發(fā)相關(guān)邏輯,可降低研發(fā)投入。
3.3.1 同步發(fā)獎(jiǎng)
同步發(fā)獎(jiǎng)接口會(huì)實(shí)時(shí)返回下游系統(tǒng)返回的入賬結(jié)果。對(duì)于失敗請(qǐng)求由上游服務(wù)負(fù)責(zé)處理,獎(jiǎng)勵(lì)服務(wù)不進(jìn)行托管。獎(jiǎng)勵(lì)同步發(fā)放的流程如下圖所示:
上述流程圖中,寫(xiě)消息隊(duì)列、添加記錄節(jié)點(diǎn)可以根據(jù)場(chǎng)景要求,可設(shè)置為強(qiáng)依賴(lài)節(jié)點(diǎn),也可設(shè)置為弱依賴(lài)節(jié)點(diǎn)。當(dāng)寫(xiě)消息隊(duì)列和添加記錄節(jié)點(diǎn)被設(shè)置為弱依賴(lài)時(shí),獎(jiǎng)勵(lì)服務(wù)不能?chē)?yán)格保證全局冪等,此時(shí)的冪等性需要下游系統(tǒng)保證;在消息隊(duì)列和 Abase 存儲(chǔ)系統(tǒng)出災(zāi)時(shí),獎(jiǎng)勵(lì)服務(wù)可正常對(duì)外提供服務(wù)。
3.3.2 異步發(fā)獎(jiǎng)
上游調(diào)用異步發(fā)獎(jiǎng)接口雖然不會(huì)實(shí)時(shí)返回發(fā)放結(jié)果,但會(huì)在上游請(qǐng)求時(shí)同步調(diào)用預(yù)算控制服務(wù)進(jìn)行扣減預(yù)算。異步發(fā)獎(jiǎng)流程中,發(fā)獎(jiǎng)?wù)埱蟪晒?xiě)入消息隊(duì)列后,立即返回。后續(xù)發(fā)獎(jiǎng)流程由獎(jiǎng)勵(lì)系統(tǒng)的消費(fèi)者服務(wù)通過(guò)消費(fèi)消息觸發(fā),并保證最終成功入賬。
異步發(fā)獎(jiǎng)?wù)埱筇幚磉^(guò)程中,收到下游系統(tǒng)返回的不可重試錯(cuò)誤時(shí),會(huì)將異常請(qǐng)求寫(xiě)入專(zhuān)用的失敗隊(duì)列并落 Hive 表存檔,以便后續(xù)處理。
3.3.3 預(yù)算控制
預(yù)算控制是保證資金安全的手段之一。在春節(jié)活動(dòng)中,除活動(dòng)玩法自身的頻控邏輯和預(yù)算控制策略外,獎(jiǎng)勵(lì)系統(tǒng)、資產(chǎn)中臺(tái)和下游賬戶(hù)服務(wù)都有自身的預(yù)算控制策略。
獎(jiǎng)勵(lì)系統(tǒng)中場(chǎng)景預(yù)算通過(guò)動(dòng)態(tài)配置 TCC 配置,可支持動(dòng)態(tài)調(diào)整。預(yù)算消耗情況通過(guò) KV 存儲(chǔ),為防止出現(xiàn)熱點(diǎn) Key,根據(jù)接入場(chǎng)景的流量大小做了分 Key,單預(yù)算 Key 承載小于500 QPS 的請(qǐng)求。進(jìn)行預(yù)算扣減時(shí),通過(guò)對(duì)唯一訂單號(hào)進(jìn)行哈希求余來(lái)決定具體的預(yù)算 Key,并在預(yù)算 Key 的 Value 中存儲(chǔ)若干條最新的訂單號(hào),基于存儲(chǔ)系統(tǒng)的 CAS 能力提供有限的預(yù)算扣減冪等能力。若在單預(yù)算 Key 上產(chǎn)生較高的并發(fā)請(qǐng)求,存儲(chǔ)的訂單號(hào)被淘汰的情況下發(fā)生超時(shí)重試,會(huì)導(dǎo)致預(yù)算超扣。進(jìn)行預(yù)算配置時(shí),做了一定比例的超配,防止因?yàn)榱髁坎痪皖A(yù)算超扣導(dǎo)致誤攔截。
資產(chǎn)中臺(tái)系統(tǒng)中,基于 Redis 執(zhí)行 Lua 腳本的能力,實(shí)現(xiàn)了多 Key 事務(wù)預(yù)算控制方案,提供了相對(duì)嚴(yán)格的預(yù)算控制能力。在下游的賬戶(hù)服務(wù)中,基于關(guān)系型數(shù)據(jù)的事務(wù)能力進(jìn)行了嚴(yán)格的預(yù)算控制,保證在活動(dòng)場(chǎng)景不會(huì)發(fā)生超發(fā)。
4. 總結(jié)
春節(jié)活動(dòng)于2022年1月24日正式上線(xiàn),2022年1月31日(除夕)結(jié)束,共持續(xù)7天?;顒?dòng)期間通過(guò)獎(jiǎng)勵(lì)系統(tǒng)發(fā)放各類(lèi)獎(jiǎng)勵(lì)約70億筆,僅除夕當(dāng)天就發(fā)放20億筆。在多場(chǎng)紅包雨中,獎(jiǎng)勵(lì)系統(tǒng)從生產(chǎn)端到消費(fèi)端做到了全部消息的可靠處理,離線(xiàn)對(duì)賬未檢測(cè)到任何有效差異,現(xiàn)金獎(jiǎng)勵(lì)全部成功入賬。
在春節(jié)活動(dòng)中對(duì)相關(guān)服務(wù)的性能、穩(wěn)定性和可靠性有著極高的要求。在設(shè)計(jì)技術(shù)方案時(shí),技術(shù)選型和常規(guī)需求有所不同,需要在可供選擇的組件中權(quán)衡性能和可靠性。降低系統(tǒng)復(fù)雜度,減少外部依賴(lài),并對(duì)依賴(lài)部分進(jìn)行充分的深入的了解是保證整個(gè)系統(tǒng)穩(wěn)定可靠的關(guān)鍵。