自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

億級(jí)高性能通知系統(tǒng)實(shí)踐

開發(fā) 架構(gòu)
在運(yùn)維層面,也應(yīng)該考慮服務(wù)不同機(jī)房的部署,以保證服務(wù)的可用性,為了應(yīng)對(duì)流量的變化同時(shí)也基于成本的考慮,也可以基于服務(wù)的綜合指標(biāo)進(jìn)行彈性擴(kuò)縮容。

在一個(gè)公司中,消息通知系統(tǒng)是不可或缺的一部分,每個(gè)團(tuán)隊(duì)都可能開發(fā)了一套獨(dú)自的消息通知組件,隨著公司業(yè)務(wù)團(tuán)隊(duì)的日益增長,維護(hù)繁瑣、排查問題復(fù)雜、開發(fā)成本等問題就會(huì)凸顯出來。(例如我們的企微群通知,由于消息內(nèi)容不同模板不同,一個(gè)項(xiàng)目內(nèi)使用的組件就有3種,還不包含其他通知部分。)

基于這樣的背景,我們就迫切需要開發(fā)一套通用的消息通知系統(tǒng)。那么如何高效地處理大量的消息請(qǐng)求以及服務(wù)穩(wěn)定性的保障,成為了開發(fā)者需要面對(duì)的重要挑戰(zhàn)。本文將探討如何構(gòu)建高性能的消息通知系統(tǒng)。

1 服務(wù)劃分

圖片圖片

  • 配置層: 主要是后臺(tái)管理系統(tǒng),做一些發(fā)送的配置,包括請(qǐng)求方式、請(qǐng)求地址、預(yù)期響應(yīng)結(jié)果、通道綁定、通道選擇、重試策略以及結(jié)果查詢等功能。
  • 接口層:對(duì)外提供服務(wù)的方式,支持RPC與MQ的方式,后續(xù)如需Http或其他方式可以擴(kuò)展添加。
  • 基礎(chǔ)服務(wù)層:業(yè)務(wù)核心層,包括消息的首次發(fā)送與重試發(fā)送,消息通道的路由選擇以及服務(wù)的調(diào)用包裝。其中可以看到正常與異常的服務(wù)發(fā)送執(zhí)行器,通過這樣的設(shè)計(jì)可以對(duì)異常服務(wù)的發(fā)送與正常服務(wù)發(fā)送進(jìn)行隔離,避免異常服務(wù)的發(fā)送對(duì)正常服務(wù)造成影響。比如請(qǐng)求某一消息通道的接口耗時(shí)長了,導(dǎo)致請(qǐng)求該通道的資源占用時(shí)間較長,從而影響的正常服務(wù)的請(qǐng)求調(diào)用。執(zhí)行器的選擇是根據(jù)路由器進(jìn)行路由的,其中路由策略包括配置的路由策略以及動(dòng)態(tài)服務(wù)異常自發(fā)現(xiàn)路由策略。所謂正常服務(wù)與異常服務(wù)指的是調(diào)用的下游服務(wù)方是否正常,比如我們發(fā)送支付成功的消息或調(diào)用第三方短信服務(wù),如果在一段時(shí)間響應(yīng)都比較慢或直接失敗等我們就可以判定為異常服務(wù)。
  • 通用組件層:主要是對(duì)一些通用組件的封裝。
  • 存儲(chǔ)層:包括緩存層與持久化層,緩存層主要是緩存配置的發(fā)送策略、重試策略以及其他一些需要進(jìn)行緩存的內(nèi)容,持久化層主要是ES與MySQL,MySQL存儲(chǔ)消息的發(fā)送記錄以及配置,ES主要存儲(chǔ)消息的發(fā)送記錄供用戶查詢。

2 系統(tǒng)設(shè)計(jì)

2.1 首次消息發(fā)送

圖片圖片

在接受消息發(fā)送請(qǐng)求的時(shí)候,一般會(huì)通過 RPC 服務(wù)請(qǐng)求和 MQ 消息消費(fèi)進(jìn)行處理,這兩種方式各有優(yōu)缺點(diǎn),RPC 這種方式,我們無需考慮消息的丟失問題,MQ 可以實(shí)現(xiàn)異步解耦、削峰填谷。

2.1.1 冪等性的處理

為了防止接收到同樣的消息內(nèi)容進(jìn)行發(fā)送處理,我們通常會(huì)做一些冪等性的設(shè)計(jì)。冪等性的判斷有很多手段,比如先加鎖再查詢或利用數(shù)據(jù)庫的唯一主鍵等來實(shí)現(xiàn),但其實(shí)在我們消息量很大的時(shí)候,查數(shù)據(jù)庫就有點(diǎn)慢了。因?yàn)榘l(fā)送消息的這種場(chǎng)景,重復(fù)消息一般在短時(shí)間內(nèi)發(fā)生的,一般不會(huì)有跨很多天來一筆已經(jīng)發(fā)送過的消息,所以可以設(shè)計(jì)利用 Redis 來實(shí)現(xiàn),先判斷是否有相同的Redis Key,再判斷消息內(nèi)容是否相同,有可能相同的Redis Key,發(fā)送不同的消息內(nèi)容,這種是允許的,具體看對(duì)應(yīng)的業(yè)務(wù)需求。

private boolean isDuplicate(MessageDto messageDto) {
        String redisKey = getRedisKey(messageDto);
        boolean isDuplicate = false;
        try {
            if (!RedisUtils.setNx(redisKey, messageDto, 30*60L)) {
                isDuplicate = true;
            }
            if (isDuplicate) {
                MessageDto oldDTO = RedisUtils.getObject(redisKey);
                if (Objects.equals(messageDto,oldDTO)) {
                    log.info("消息重復(fù)了");
                } else {
                    isDuplicate = false;
                }
            }
        } catch (Exception e) {
            isDuplicate = false;
        }
        return isDuplicate;
    }

2.1.2 問題服務(wù)動(dòng)態(tài)發(fā)現(xiàn)器

上文提到路由器中的路由策略包括配置的路由策略和動(dòng)態(tài)服務(wù)異常自發(fā)現(xiàn)路由策略,其中動(dòng)態(tài)服務(wù)異常自發(fā)現(xiàn)路由策略核心在于服務(wù)異常自發(fā)現(xiàn),核心是依據(jù)問題服務(wù)動(dòng)態(tài)發(fā)現(xiàn)器實(shí)現(xiàn)的,當(dāng)我們發(fā)現(xiàn)某一個(gè)消息通道服務(wù)異常時(shí)可以自動(dòng)路由采用異常通知執(zhí)行器執(zhí)行。

我們主要是借助sentinel的API在各自節(jié)點(diǎn)JVM內(nèi)實(shí)現(xiàn)的,針對(duì)設(shè)置的時(shí)間窗口內(nèi)請(qǐng)求的總次數(shù)和失敗的總次數(shù)進(jìn)行統(tǒng)計(jì),達(dá)到設(shè)定值,就認(rèn)為請(qǐng)求的服務(wù)有問題了,認(rèn)定其為異常服務(wù)。核心主要是以下兩個(gè)方法,其中l(wèi)oadExecuteHandlerRules方法主要是對(duì)流控規(guī)則的設(shè)定,我們可以通過Apollo或Nacos進(jìn)行動(dòng)態(tài)的修改,judge方法是對(duì)請(qǐng)求和失敗的攔截,判斷允許正常訪問,一旦攔截后就認(rèn)為是異常服務(wù),在內(nèi)存中進(jìn)行標(biāo)記記錄,后續(xù)請(qǐng)求通過異常執(zhí)行器執(zhí)行處理。

當(dāng)我們看到這兒會(huì)不會(huì)有疑問,問題服務(wù)在啥時(shí)候會(huì)恢復(fù)正常呢,難道服務(wù)出現(xiàn)一次問題,就一直被認(rèn)定為問題服務(wù)了?當(dāng)時(shí)不是的,我們也設(shè)計(jì)了類似熔斷器那樣的自動(dòng)恢復(fù)功能,在判斷為問題服務(wù)后會(huì)經(jīng)過一段時(shí)間的靜默期,靜默期內(nèi)所有對(duì)該服務(wù)的請(qǐng)求都走異常通知器的執(zhí)行流程,當(dāng)靜默期過后,此時(shí)到達(dá)了半熔斷期,就是如果訪問正常的次數(shù)達(dá)到一定值后,就會(huì)恢復(fù)為正常。

//加載執(zhí)行器的規(guī)則 durationInSec 時(shí)間窗口長度  requestCount  請(qǐng)求總量 failCount失敗總量
    public void loadExecuteHandlerRules(Long durationInSec,Long requestCount,Long failCount) {
        List<ParamFlowRule> rules = new ArrayList<>();
        //REQUEST_RESOURCE  請(qǐng)求資源 可自定義
        rules.add(ofParamFlowRule(REQUEST_RESOURCE, requestCount, durationInSec));
        //REQUEST_RESOURCE  失敗資源 可自定義
        rules.add(ofParamFlowRule(FAIl_RESOURCE, failCount, durationInSec));
        ParamFlowRuleManager.loadRules(rules);
    }

    public ParamFlowRule ofParamFlowRule(String resource, Long failCount, Long durationInSec) {
        ParamFlowRule rule = new ParamFlowRule();
        rule.setResource(FAIl_RESOURCE);
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule.setCount(failCount);
        rule.setDurationInSec(durationInSec);
        rule.setParamIdx(0);
        return rule;
    }
//key 請(qǐng)求的標(biāo)識(shí)key,可以是對(duì)應(yīng)某一服務(wù)的標(biāo)識(shí),reqSuc 請(qǐng)求是否成功,false是失敗,true是成功
    public static boolean judge(String key, boolean reqSuc) {
        return isBlock(REQUEST_RESOURCE, reqSuc, key) && isBlock(FAIl_RESOURCE, reqSuc, key);
    }

    public Boolean isBlock(String resource, boolean reqSuc, String key) {
        boolean block = false;
        Entry failEntry = null;
        try {
            failEntry = entry(resource, EntryType.IN, reqSuc ? 0 : 1, key);
        } catch (BlockException e) {
            block = true;
        } finally {
            if (failEntry != null) {
                failEntry.exit();
            }
        }
        return block;
    }

2.1.3 sentinel 滑動(dòng)窗口的實(shí)現(xiàn)原理(環(huán)形數(shù)組)

圖片圖片

根據(jù)傳入的時(shí)間窗口大小和數(shù)量,計(jì)算數(shù)組的數(shù)量,數(shù)組的下標(biāo)就是windowsId,windowsStart是每個(gè)數(shù)組的起始時(shí)間值。

例如:統(tǒng)計(jì) 1s 的請(qǐng)求量,設(shè)置兩個(gè)窗口,那么每個(gè)窗口對(duì)應(yīng)的id 就是0、1,相應(yīng)的時(shí)間范圍就是 0m-500ms,500ms-1000ms。如果當(dāng)前時(shí)間是 700ms,那么對(duì)應(yīng)的窗口 id=(700/500)%2=0, 對(duì)應(yīng)的 windowStart=700-(700%500)=200,對(duì)應(yīng)的起始就是 id 為 0 的窗口;如果當(dāng)前時(shí)間是 1200ms,對(duì)應(yīng)的窗口 id=(1200/500)%2=0;對(duì)應(yīng)的 windowStart=1200-(1200%500)=1000 大于 id=0 的起始時(shí)間,重置 id 為 0 的窗口起始值,id=0 的位置不變。

2.1.4 線程池的動(dòng)態(tài)調(diào)整

消息處理完成后,利用線程池進(jìn)行異步發(fā)送,線程池分為正常服務(wù)的線程池和異常服務(wù)的線程池,至于為啥設(shè)計(jì)不同的線程池,我們?cè)谙旅娣€(wěn)定性設(shè)計(jì)方面闡述。線程池核心參數(shù)的設(shè)定一般會(huì)根據(jù)任務(wù)類型和 CPU 核數(shù)進(jìn)行一個(gè)初始化的設(shè)定,后續(xù)我們一般會(huì)壓測(cè)來動(dòng)態(tài)的調(diào)整來滿足我們的目標(biāo)。那么我們?cè)鯓涌梢栽O(shè)計(jì)一個(gè)可以動(dòng)態(tài)調(diào)整的線程池呢?

一般我們可以通過 Apollo 或 Nacos 等統(tǒng)一配置來動(dòng)態(tài)修改線程池的參數(shù),但是線程池的阻塞隊(duì)列長度是不允許修改的,當(dāng)然我們可以自己自定義一個(gè)隊(duì)列來實(shí)現(xiàn)這樣的功能。接下來我們講述的這種設(shè)計(jì),是不用通過自定義阻塞隊(duì)列的方式去實(shí)現(xiàn)的。

ThreadPoolExecutor pool = new ThreadPoolExecutor(poolSize, poolSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

我們直接定義了一個(gè)無界的線程池,核心線程數(shù)和最大線程數(shù)相等,而且用的是默認(rèn)的丟棄策略,那么就有疑問了,這樣的線程池我們?cè)谑褂玫臅r(shí)候,會(huì)有內(nèi)存溢出和消息的丟失風(fēng)險(xiǎn),別著急,我們繼續(xù)往下看。

Notifier notifier = getNotifier();
   if (!notifier.isBusy()) {
        notifier.execute(msgContent);
    } 

  public boolean isBusy() {
      return notifyPool.getQueue().size() >= config.getMaxHandlerSize() * 2;
    }

在每次添加任務(wù)的時(shí)候會(huì)判斷線程池隊(duì)列中的任務(wù)是否達(dá)到設(shè)定的最大值,如果達(dá)到就不會(huì)繼續(xù)添加了,當(dāng)前線程池處于繁忙狀態(tài)了,后續(xù)可以利用 MQ 落庫,之后通過重試任務(wù)進(jìn)行發(fā)送了,也保證了永遠(yuǎn)不會(huì)觸發(fā)線程池的拒絕策略。

2.2 重試消息發(fā)送

圖片圖片

部分消息因?yàn)橄到y(tǒng)達(dá)到瓶頸處理不過來或某些消息發(fā)送失敗需要重試,這些消息都可以通過任務(wù)重試來進(jìn)行處理,當(dāng)然利用這種方式也可以實(shí)現(xiàn)延遲消息的發(fā)送。

實(shí)現(xiàn)這種重試的消息機(jī)制可以利用分布式定時(shí)任務(wù)調(diào)度框架,一般為了提高重試效率,會(huì)采用分片廣播這種方式,自己做好消息重復(fù)發(fā)送的控制,我們也可以利用調(diào)度線程池來實(shí)現(xiàn)。

public void init() {
        ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(taskRepository.size());
        for (Map.Entry<String, TaskHandler> entry : taskRepository.entrySet()) {
            final String taskName = entry.getKey();
            final TaskHandler handler = entry.getValue();
            scheduledService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 是否繁忙判斷
                        if (handler.isBusy()) {
                            return;
                        }
                        handleTask(taskName, handler);
                    } catch (Throwable e) {
                        logger.error(taskName + " task hanlder fail!", e);
                    }
                }
            }, 30, 5, TimeUnit.SECONDS);
        }
    }

每次進(jìn)行任務(wù)撈取進(jìn)行調(diào)度時(shí),會(huì)首先判斷下當(dāng)前 handler 是否繁忙,其實(shí)就是重試不同類型任務(wù)的線程池資源是否充足,如果不充足的話,即使撈取出來,也一直是排隊(duì)等待。

public void handTask(String taskName, TaskHandler handler) {
        Lock lock = LockFactory.getLock(taskName);
        List<ScheduleTaskEntity> taskList = null;
        try {
            if (lock.tryLock()) {
                taskList = getTaskList(taskName, handler);
            }
        } finally {
            lock.unlock();
        }
        if (taskList == null) return;
        handler.handleData(taskList);
    }

為了防止不同的節(jié)點(diǎn)處理相同的任務(wù)進(jìn)行了加鎖控制,每次撈取的任務(wù)量是根據(jù)不同任務(wù) handler 設(shè)置的量來確定的,撈取完成后發(fā)送至 MQ,然后采用線程池進(jìn)行發(fā)送處理。

2.2.1 ES與MySQL數(shù)據(jù)同步

由于發(fā)送消息的數(shù)據(jù)量,后臺(tái)在進(jìn)行數(shù)據(jù)查詢時(shí)主要是通過ES進(jìn)行查詢處理的,這就涉及到數(shù)據(jù)庫數(shù)據(jù)與ES數(shù)據(jù)一致性的問題。當(dāng)然也可以采用分庫分表或?qū)挶淼燃夹g(shù)進(jìn)行處理,分庫分表對(duì)一些非分片鍵的查詢可能不太友好。

圖片圖片

ES 更新完成后修改數(shù)據(jù)庫狀態(tài)為更新完成狀態(tài),若此時(shí)通知記錄表還有更新,就會(huì)將同步狀態(tài)初始化,若修改數(shù)據(jù)庫為init先于同步完成后的更新就會(huì)出現(xiàn)數(shù)據(jù)不一致的問題,所以每次同步時(shí)攜帶上數(shù)據(jù)庫中的update_time,大于等于db中的update_time才會(huì)更新完成(其實(shí)update_time就是一個(gè)版本號(hào))。

圖片圖片

ES按月滾動(dòng)建立索引,每月新建立的索引,標(biāo)簽都是hot,新增的數(shù)據(jù)都會(huì)放入hot節(jié)點(diǎn)上進(jìn)行存儲(chǔ),到了第二月,通過定時(shí)任務(wù)將上月索引的tag修改為cold,ES集群就會(huì)自動(dòng)將數(shù)據(jù)遷移到標(biāo)簽為cold節(jié)點(diǎn)上(cold節(jié)點(diǎn)的性能一般配置都比較低,對(duì)性能要求并不高)。

3 穩(wěn)定性的保障

上述一系列的設(shè)計(jì)是圍繞高性能進(jìn)行考慮的,當(dāng)然在穩(wěn)定性方面我們也不能忽略,下述幾方面也是我們?cè)诜€(wěn)定性方面的考慮。

3.1 流量突增

面對(duì)流量突增時(shí)做了兩層降級(jí)。當(dāng)流量緩慢增大時(shí),線程池繁忙后,利用MQ做了一次流量削峰、異步落庫,后續(xù)定時(shí)任務(wù)處理發(fā)送,發(fā)送的延時(shí)時(shí)間是0s;當(dāng)流量陡增,用sentinel進(jìn)行判斷,不經(jīng)任何判斷直接MQ削峰落庫,后續(xù)消費(fèi)是延遲消費(fèi)的,待資源空閑才進(jìn)行撈取處理。

3.2 問題服務(wù)的資源隔離

首先我們想想為啥要做問題服務(wù)的隔離呢,不做會(huì)有什么后果呢?設(shè)想一下如果不隔離,問題服務(wù)與正常服務(wù)采用同一線程池資源進(jìn)行處理,當(dāng)問題服務(wù)請(qǐng)求請(qǐng)求耗時(shí)時(shí)間較長,線程釋放慢,會(huì)導(dǎo)致大量正常服務(wù)的消息不能及時(shí)進(jìn)行處理,這樣就會(huì)導(dǎo)致問題服務(wù)影響到正常服務(wù)的消息處理,所以才需要做問題服務(wù)與正常服務(wù)的資源艙壁隔離。

3.3 第三方服務(wù)的保護(hù)

正常的第三方服務(wù)一般都會(huì)做限流降級(jí)設(shè)置,防止服務(wù)被擊垮。如果一些開發(fā)水平欠缺的服務(wù)沒有做,就需要我們進(jìn)行考慮了,一方面不能因?yàn)槲覀兊恼?qǐng)求量較大,影響到別人服務(wù),另一方面,我們的服務(wù)不能因?yàn)榈谌椒?wù)而引發(fā)問題,所以通常我們需要考慮進(jìn)行熔斷處置。

3.4 中間件的容錯(cuò)

在我們使用各種中間件時(shí),也應(yīng)該考慮的中間件的問題。比如公司MQ需要進(jìn)行擴(kuò)容升級(jí),會(huì)使MQ宕機(jī)數(shù)秒,針對(duì)這種問題的容錯(cuò),在進(jìn)行開發(fā)時(shí)也應(yīng)盡可能的考慮設(shè)計(jì)到。

3.5 完善的監(jiān)控體系

我們也應(yīng)該建立完善的監(jiān)控系統(tǒng),來保障服務(wù)的穩(wěn)定運(yùn)行,能在問題擴(kuò)散之前及時(shí)發(fā)現(xiàn)處理,能在問題發(fā)生后進(jìn)行快速的處理,能在后期優(yōu)化處理時(shí)提供輔助依據(jù)。

3.6 服務(wù)的雙活部署、彈性擴(kuò)縮容

在運(yùn)維層面,也應(yīng)該考慮服務(wù)不同機(jī)房的部署,以保證服務(wù)的可用性,為了應(yīng)對(duì)流量的變化同時(shí)也基于成本的考慮,也可以基于服務(wù)的綜合指標(biāo)進(jìn)行彈性擴(kuò)縮容。

4 總結(jié)

任何一個(gè)系統(tǒng)的設(shè)計(jì),我們都應(yīng)該從服務(wù)架構(gòu)、系統(tǒng)功能、穩(wěn)定性保障等方面去進(jìn)行考慮。如何具備良好的擴(kuò)展性與容錯(cuò)性,輕松應(yīng)對(duì)各種復(fù)雜多變的業(yè)務(wù)場(chǎng)景也是我們面臨的設(shè)計(jì)挑戰(zhàn)。當(dāng)然技術(shù)方案的設(shè)計(jì)從無萬全之策,亦不存在一勞永逸的‘銀彈’,所以需要結(jié)合具體的業(yè)務(wù)場(chǎng)景進(jìn)行自己的思考與設(shè)計(jì)。

關(guān)于作者趙培龍 采貨俠JAVA開發(fā)工程師

責(zé)任編輯:武曉燕 來源: 轉(zhuǎn)轉(zhuǎn)技術(shù)
相關(guān)推薦

2024-10-15 16:31:30

2020-01-17 11:00:23

流量系統(tǒng)架構(gòu)

2020-07-16 08:06:53

網(wǎng)關(guān)高性能計(jì)

2021-06-30 14:23:30

AMD

2021-02-02 08:32:46

日志系統(tǒng) 高性能

2016-05-03 16:00:30

Web系統(tǒng)容錯(cuò)性建設(shè)

2022-08-15 08:01:35

微服務(wù)框架RPC

2025-01-06 00:00:10

2022-05-12 14:34:14

京東數(shù)據(jù)

2014-03-19 14:34:06

JQuery高性能

2023-05-08 18:33:55

ES數(shù)據(jù)搜索

2019-05-21 09:40:47

Elasticsear高性能 API

2009-06-03 14:24:12

ibmdwWebSphere

2018-01-12 14:37:34

Java代碼實(shí)踐

2024-09-02 18:10:20

2011-08-10 09:27:06

IBM超級(jí)計(jì)算機(jī)藍(lán)水

2016-07-07 11:00:54

聯(lián)想

2020-08-17 08:18:51

Java

2024-09-25 16:10:05

2020-02-06 13:40:35

編程緩存優(yōu)化
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)