億級(jí)高性能通知系統(tǒng)實(shí)踐
在一個(gè)公司中,消息通知系統(tǒng)是不可或缺的一部分,每個(gè)團(tuán)隊(duì)都可能開發(fā)了一套獨(dú)自的消息通知組件,隨著公司業(yè)務(wù)團(tuán)隊(duì)的日益增長,維護(hù)繁瑣、排查問題復(fù)雜、開發(fā)成本等問題就會(huì)凸顯出來。(例如我們的企微群通知,由于消息內(nèi)容不同模板不同,一個(gè)項(xiàng)目內(nèi)使用的組件就有3種,還不包含其他通知部分。)
基于這樣的背景,我們就迫切需要開發(fā)一套通用的消息通知系統(tǒng)。那么如何高效地處理大量的消息請(qǐng)求以及服務(wù)穩(wěn)定性的保障,成為了開發(fā)者需要面對(duì)的重要挑戰(zhàn)。本文將探討如何構(gòu)建高性能的消息通知系統(tǒng)。
1 服務(wù)劃分
圖片
- 配置層: 主要是后臺(tái)管理系統(tǒng),做一些發(fā)送的配置,包括請(qǐng)求方式、請(qǐng)求地址、預(yù)期響應(yīng)結(jié)果、通道綁定、通道選擇、重試策略以及結(jié)果查詢等功能。
- 接口層:對(duì)外提供服務(wù)的方式,支持RPC與MQ的方式,后續(xù)如需Http或其他方式可以擴(kuò)展添加。
- 基礎(chǔ)服務(wù)層:業(yè)務(wù)核心層,包括消息的首次發(fā)送與重試發(fā)送,消息通道的路由選擇以及服務(wù)的調(diào)用包裝。其中可以看到正常與異常的服務(wù)發(fā)送執(zhí)行器,通過這樣的設(shè)計(jì)可以對(duì)異常服務(wù)的發(fā)送與正常服務(wù)發(fā)送進(jìn)行隔離,避免異常服務(wù)的發(fā)送對(duì)正常服務(wù)造成影響。比如請(qǐng)求某一消息通道的接口耗時(shí)長了,導(dǎo)致請(qǐng)求該通道的資源占用時(shí)間較長,從而影響的正常服務(wù)的請(qǐng)求調(diào)用。執(zhí)行器的選擇是根據(jù)路由器進(jìn)行路由的,其中路由策略包括配置的路由策略以及動(dòng)態(tài)服務(wù)異常自發(fā)現(xiàn)路由策略。所謂正常服務(wù)與異常服務(wù)指的是調(diào)用的下游服務(wù)方是否正常,比如我們發(fā)送支付成功的消息或調(diào)用第三方短信服務(wù),如果在一段時(shí)間響應(yīng)都比較慢或直接失敗等我們就可以判定為異常服務(wù)。
- 通用組件層:主要是對(duì)一些通用組件的封裝。
- 存儲(chǔ)層:包括緩存層與持久化層,緩存層主要是緩存配置的發(fā)送策略、重試策略以及其他一些需要進(jìn)行緩存的內(nèi)容,持久化層主要是ES與MySQL,MySQL存儲(chǔ)消息的發(fā)送記錄以及配置,ES主要存儲(chǔ)消息的發(fā)送記錄供用戶查詢。
2 系統(tǒng)設(shè)計(jì)
2.1 首次消息發(fā)送
圖片
在接受消息發(fā)送請(qǐng)求的時(shí)候,一般會(huì)通過 RPC 服務(wù)請(qǐng)求和 MQ 消息消費(fèi)進(jìn)行處理,這兩種方式各有優(yōu)缺點(diǎn),RPC 這種方式,我們無需考慮消息的丟失問題,MQ 可以實(shí)現(xiàn)異步解耦、削峰填谷。
2.1.1 冪等性的處理
為了防止接收到同樣的消息內(nèi)容進(jìn)行發(fā)送處理,我們通常會(huì)做一些冪等性的設(shè)計(jì)。冪等性的判斷有很多手段,比如先加鎖再查詢或利用數(shù)據(jù)庫的唯一主鍵等來實(shí)現(xiàn),但其實(shí)在我們消息量很大的時(shí)候,查數(shù)據(jù)庫就有點(diǎn)慢了。因?yàn)榘l(fā)送消息的這種場(chǎng)景,重復(fù)消息一般在短時(shí)間內(nèi)發(fā)生的,一般不會(huì)有跨很多天來一筆已經(jīng)發(fā)送過的消息,所以可以設(shè)計(jì)利用 Redis 來實(shí)現(xiàn),先判斷是否有相同的Redis Key,再判斷消息內(nèi)容是否相同,有可能相同的Redis Key,發(fā)送不同的消息內(nèi)容,這種是允許的,具體看對(duì)應(yīng)的業(yè)務(wù)需求。
private boolean isDuplicate(MessageDto messageDto) {
String redisKey = getRedisKey(messageDto);
boolean isDuplicate = false;
try {
if (!RedisUtils.setNx(redisKey, messageDto, 30*60L)) {
isDuplicate = true;
}
if (isDuplicate) {
MessageDto oldDTO = RedisUtils.getObject(redisKey);
if (Objects.equals(messageDto,oldDTO)) {
log.info("消息重復(fù)了");
} else {
isDuplicate = false;
}
}
} catch (Exception e) {
isDuplicate = false;
}
return isDuplicate;
}
2.1.2 問題服務(wù)動(dòng)態(tài)發(fā)現(xiàn)器
上文提到路由器中的路由策略包括配置的路由策略和動(dòng)態(tài)服務(wù)異常自發(fā)現(xiàn)路由策略,其中動(dòng)態(tài)服務(wù)異常自發(fā)現(xiàn)路由策略核心在于服務(wù)異常自發(fā)現(xiàn),核心是依據(jù)問題服務(wù)動(dòng)態(tài)發(fā)現(xiàn)器實(shí)現(xiàn)的,當(dāng)我們發(fā)現(xiàn)某一個(gè)消息通道服務(wù)異常時(shí)可以自動(dòng)路由采用異常通知執(zhí)行器執(zhí)行。
我們主要是借助sentinel的API在各自節(jié)點(diǎn)JVM內(nèi)實(shí)現(xiàn)的,針對(duì)設(shè)置的時(shí)間窗口內(nèi)請(qǐng)求的總次數(shù)和失敗的總次數(shù)進(jìn)行統(tǒng)計(jì),達(dá)到設(shè)定值,就認(rèn)為請(qǐng)求的服務(wù)有問題了,認(rèn)定其為異常服務(wù)。核心主要是以下兩個(gè)方法,其中l(wèi)oadExecuteHandlerRules方法主要是對(duì)流控規(guī)則的設(shè)定,我們可以通過Apollo或Nacos進(jìn)行動(dòng)態(tài)的修改,judge方法是對(duì)請(qǐng)求和失敗的攔截,判斷允許正常訪問,一旦攔截后就認(rèn)為是異常服務(wù),在內(nèi)存中進(jìn)行標(biāo)記記錄,后續(xù)請(qǐng)求通過異常執(zhí)行器執(zhí)行處理。
當(dāng)我們看到這兒會(huì)不會(huì)有疑問,問題服務(wù)在啥時(shí)候會(huì)恢復(fù)正常呢,難道服務(wù)出現(xiàn)一次問題,就一直被認(rèn)定為問題服務(wù)了?當(dāng)時(shí)不是的,我們也設(shè)計(jì)了類似熔斷器那樣的自動(dòng)恢復(fù)功能,在判斷為問題服務(wù)后會(huì)經(jīng)過一段時(shí)間的靜默期,靜默期內(nèi)所有對(duì)該服務(wù)的請(qǐng)求都走異常通知器的執(zhí)行流程,當(dāng)靜默期過后,此時(shí)到達(dá)了半熔斷期,就是如果訪問正常的次數(shù)達(dá)到一定值后,就會(huì)恢復(fù)為正常。
//加載執(zhí)行器的規(guī)則 durationInSec 時(shí)間窗口長度 requestCount 請(qǐng)求總量 failCount失敗總量
public void loadExecuteHandlerRules(Long durationInSec,Long requestCount,Long failCount) {
List<ParamFlowRule> rules = new ArrayList<>();
//REQUEST_RESOURCE 請(qǐng)求資源 可自定義
rules.add(ofParamFlowRule(REQUEST_RESOURCE, requestCount, durationInSec));
//REQUEST_RESOURCE 失敗資源 可自定義
rules.add(ofParamFlowRule(FAIl_RESOURCE, failCount, durationInSec));
ParamFlowRuleManager.loadRules(rules);
}
public ParamFlowRule ofParamFlowRule(String resource, Long failCount, Long durationInSec) {
ParamFlowRule rule = new ParamFlowRule();
rule.setResource(FAIl_RESOURCE);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setCount(failCount);
rule.setDurationInSec(durationInSec);
rule.setParamIdx(0);
return rule;
}
//key 請(qǐng)求的標(biāo)識(shí)key,可以是對(duì)應(yīng)某一服務(wù)的標(biāo)識(shí),reqSuc 請(qǐng)求是否成功,false是失敗,true是成功
public static boolean judge(String key, boolean reqSuc) {
return isBlock(REQUEST_RESOURCE, reqSuc, key) && isBlock(FAIl_RESOURCE, reqSuc, key);
}
public Boolean isBlock(String resource, boolean reqSuc, String key) {
boolean block = false;
Entry failEntry = null;
try {
failEntry = entry(resource, EntryType.IN, reqSuc ? 0 : 1, key);
} catch (BlockException e) {
block = true;
} finally {
if (failEntry != null) {
failEntry.exit();
}
}
return block;
}
2.1.3 sentinel 滑動(dòng)窗口的實(shí)現(xiàn)原理(環(huán)形數(shù)組)
圖片
根據(jù)傳入的時(shí)間窗口大小和數(shù)量,計(jì)算數(shù)組的數(shù)量,數(shù)組的下標(biāo)就是windowsId,windowsStart是每個(gè)數(shù)組的起始時(shí)間值。
例如:統(tǒng)計(jì) 1s 的請(qǐng)求量,設(shè)置兩個(gè)窗口,那么每個(gè)窗口對(duì)應(yīng)的id 就是0、1,相應(yīng)的時(shí)間范圍就是 0m-500ms,500ms-1000ms。如果當(dāng)前時(shí)間是 700ms,那么對(duì)應(yīng)的窗口 id=(700/500)%2=0, 對(duì)應(yīng)的 windowStart=700-(700%500)=200,對(duì)應(yīng)的起始就是 id 為 0 的窗口;如果當(dāng)前時(shí)間是 1200ms,對(duì)應(yīng)的窗口 id=(1200/500)%2=0;對(duì)應(yīng)的 windowStart=1200-(1200%500)=1000 大于 id=0 的起始時(shí)間,重置 id 為 0 的窗口起始值,id=0 的位置不變。
2.1.4 線程池的動(dòng)態(tài)調(diào)整
消息處理完成后,利用線程池進(jìn)行異步發(fā)送,線程池分為正常服務(wù)的線程池和異常服務(wù)的線程池,至于為啥設(shè)計(jì)不同的線程池,我們?cè)谙旅娣€(wěn)定性設(shè)計(jì)方面闡述。線程池核心參數(shù)的設(shè)定一般會(huì)根據(jù)任務(wù)類型和 CPU 核數(shù)進(jìn)行一個(gè)初始化的設(shè)定,后續(xù)我們一般會(huì)壓測(cè)來動(dòng)態(tài)的調(diào)整來滿足我們的目標(biāo)。那么我們?cè)鯓涌梢栽O(shè)計(jì)一個(gè)可以動(dòng)態(tài)調(diào)整的線程池呢?
一般我們可以通過 Apollo 或 Nacos 等統(tǒng)一配置來動(dòng)態(tài)修改線程池的參數(shù),但是線程池的阻塞隊(duì)列長度是不允許修改的,當(dāng)然我們可以自己自定義一個(gè)隊(duì)列來實(shí)現(xiàn)這樣的功能。接下來我們講述的這種設(shè)計(jì),是不用通過自定義阻塞隊(duì)列的方式去實(shí)現(xiàn)的。
ThreadPoolExecutor pool = new ThreadPoolExecutor(poolSize, poolSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
我們直接定義了一個(gè)無界的線程池,核心線程數(shù)和最大線程數(shù)相等,而且用的是默認(rèn)的丟棄策略,那么就有疑問了,這樣的線程池我們?cè)谑褂玫臅r(shí)候,會(huì)有內(nèi)存溢出和消息的丟失風(fēng)險(xiǎn),別著急,我們繼續(xù)往下看。
Notifier notifier = getNotifier();
if (!notifier.isBusy()) {
notifier.execute(msgContent);
}
public boolean isBusy() {
return notifyPool.getQueue().size() >= config.getMaxHandlerSize() * 2;
}
在每次添加任務(wù)的時(shí)候會(huì)判斷線程池隊(duì)列中的任務(wù)是否達(dá)到設(shè)定的最大值,如果達(dá)到就不會(huì)繼續(xù)添加了,當(dāng)前線程池處于繁忙狀態(tài)了,后續(xù)可以利用 MQ 落庫,之后通過重試任務(wù)進(jìn)行發(fā)送了,也保證了永遠(yuǎn)不會(huì)觸發(fā)線程池的拒絕策略。
2.2 重試消息發(fā)送
圖片
部分消息因?yàn)橄到y(tǒng)達(dá)到瓶頸處理不過來或某些消息發(fā)送失敗需要重試,這些消息都可以通過任務(wù)重試來進(jìn)行處理,當(dāng)然利用這種方式也可以實(shí)現(xiàn)延遲消息的發(fā)送。
實(shí)現(xiàn)這種重試的消息機(jī)制可以利用分布式定時(shí)任務(wù)調(diào)度框架,一般為了提高重試效率,會(huì)采用分片廣播這種方式,自己做好消息重復(fù)發(fā)送的控制,我們也可以利用調(diào)度線程池來實(shí)現(xiàn)。
public void init() {
ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(taskRepository.size());
for (Map.Entry<String, TaskHandler> entry : taskRepository.entrySet()) {
final String taskName = entry.getKey();
final TaskHandler handler = entry.getValue();
scheduledService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 是否繁忙判斷
if (handler.isBusy()) {
return;
}
handleTask(taskName, handler);
} catch (Throwable e) {
logger.error(taskName + " task hanlder fail!", e);
}
}
}, 30, 5, TimeUnit.SECONDS);
}
}
每次進(jìn)行任務(wù)撈取進(jìn)行調(diào)度時(shí),會(huì)首先判斷下當(dāng)前 handler 是否繁忙,其實(shí)就是重試不同類型任務(wù)的線程池資源是否充足,如果不充足的話,即使撈取出來,也一直是排隊(duì)等待。
public void handTask(String taskName, TaskHandler handler) {
Lock lock = LockFactory.getLock(taskName);
List<ScheduleTaskEntity> taskList = null;
try {
if (lock.tryLock()) {
taskList = getTaskList(taskName, handler);
}
} finally {
lock.unlock();
}
if (taskList == null) return;
handler.handleData(taskList);
}
為了防止不同的節(jié)點(diǎn)處理相同的任務(wù)進(jìn)行了加鎖控制,每次撈取的任務(wù)量是根據(jù)不同任務(wù) handler 設(shè)置的量來確定的,撈取完成后發(fā)送至 MQ,然后采用線程池進(jìn)行發(fā)送處理。
2.2.1 ES與MySQL數(shù)據(jù)同步
由于發(fā)送消息的數(shù)據(jù)量,后臺(tái)在進(jìn)行數(shù)據(jù)查詢時(shí)主要是通過ES進(jìn)行查詢處理的,這就涉及到數(shù)據(jù)庫數(shù)據(jù)與ES數(shù)據(jù)一致性的問題。當(dāng)然也可以采用分庫分表或?qū)挶淼燃夹g(shù)進(jìn)行處理,分庫分表對(duì)一些非分片鍵的查詢可能不太友好。
圖片
ES 更新完成后修改數(shù)據(jù)庫狀態(tài)為更新完成狀態(tài),若此時(shí)通知記錄表還有更新,就會(huì)將同步狀態(tài)初始化,若修改數(shù)據(jù)庫為init先于同步完成后的更新就會(huì)出現(xiàn)數(shù)據(jù)不一致的問題,所以每次同步時(shí)攜帶上數(shù)據(jù)庫中的update_time,大于等于db中的update_time才會(huì)更新完成(其實(shí)update_time就是一個(gè)版本號(hào))。
圖片
ES按月滾動(dòng)建立索引,每月新建立的索引,標(biāo)簽都是hot,新增的數(shù)據(jù)都會(huì)放入hot節(jié)點(diǎn)上進(jìn)行存儲(chǔ),到了第二月,通過定時(shí)任務(wù)將上月索引的tag修改為cold,ES集群就會(huì)自動(dòng)將數(shù)據(jù)遷移到標(biāo)簽為cold節(jié)點(diǎn)上(cold節(jié)點(diǎn)的性能一般配置都比較低,對(duì)性能要求并不高)。
3 穩(wěn)定性的保障
上述一系列的設(shè)計(jì)是圍繞高性能進(jìn)行考慮的,當(dāng)然在穩(wěn)定性方面我們也不能忽略,下述幾方面也是我們?cè)诜€(wěn)定性方面的考慮。
3.1 流量突增
面對(duì)流量突增時(shí)做了兩層降級(jí)。當(dāng)流量緩慢增大時(shí),線程池繁忙后,利用MQ做了一次流量削峰、異步落庫,后續(xù)定時(shí)任務(wù)處理發(fā)送,發(fā)送的延時(shí)時(shí)間是0s;當(dāng)流量陡增,用sentinel進(jìn)行判斷,不經(jīng)任何判斷直接MQ削峰落庫,后續(xù)消費(fèi)是延遲消費(fèi)的,待資源空閑才進(jìn)行撈取處理。
3.2 問題服務(wù)的資源隔離
首先我們想想為啥要做問題服務(wù)的隔離呢,不做會(huì)有什么后果呢?設(shè)想一下如果不隔離,問題服務(wù)與正常服務(wù)采用同一線程池資源進(jìn)行處理,當(dāng)問題服務(wù)請(qǐng)求請(qǐng)求耗時(shí)時(shí)間較長,線程釋放慢,會(huì)導(dǎo)致大量正常服務(wù)的消息不能及時(shí)進(jìn)行處理,這樣就會(huì)導(dǎo)致問題服務(wù)影響到正常服務(wù)的消息處理,所以才需要做問題服務(wù)與正常服務(wù)的資源艙壁隔離。
3.3 第三方服務(wù)的保護(hù)
正常的第三方服務(wù)一般都會(huì)做限流降級(jí)設(shè)置,防止服務(wù)被擊垮。如果一些開發(fā)水平欠缺的服務(wù)沒有做,就需要我們進(jìn)行考慮了,一方面不能因?yàn)槲覀兊恼?qǐng)求量較大,影響到別人服務(wù),另一方面,我們的服務(wù)不能因?yàn)榈谌椒?wù)而引發(fā)問題,所以通常我們需要考慮進(jìn)行熔斷處置。
3.4 中間件的容錯(cuò)
在我們使用各種中間件時(shí),也應(yīng)該考慮的中間件的問題。比如公司MQ需要進(jìn)行擴(kuò)容升級(jí),會(huì)使MQ宕機(jī)數(shù)秒,針對(duì)這種問題的容錯(cuò),在進(jìn)行開發(fā)時(shí)也應(yīng)盡可能的考慮設(shè)計(jì)到。
3.5 完善的監(jiān)控體系
我們也應(yīng)該建立完善的監(jiān)控系統(tǒng),來保障服務(wù)的穩(wěn)定運(yùn)行,能在問題擴(kuò)散之前及時(shí)發(fā)現(xiàn)處理,能在問題發(fā)生后進(jìn)行快速的處理,能在后期優(yōu)化處理時(shí)提供輔助依據(jù)。
3.6 服務(wù)的雙活部署、彈性擴(kuò)縮容
在運(yùn)維層面,也應(yīng)該考慮服務(wù)不同機(jī)房的部署,以保證服務(wù)的可用性,為了應(yīng)對(duì)流量的變化同時(shí)也基于成本的考慮,也可以基于服務(wù)的綜合指標(biāo)進(jìn)行彈性擴(kuò)縮容。
4 總結(jié)
關(guān)于作者趙培龍 采貨俠JAVA開發(fā)工程師