自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

新聞 前端
Sentinel 是阿里中間件團(tuán)隊(duì)開(kāi)源的,面向分布式服務(wù)架構(gòu)的輕量級(jí)高可用流量控制組件,主要以流量為切入點(diǎn),從流量控制、熔斷降級(jí)、系統(tǒng)負(fù)載保護(hù)等多個(gè)維度來(lái)幫助用戶保護(hù)服務(wù)的穩(wěn)定性。

Sentinel 是阿里中間件團(tuán)隊(duì)開(kāi)源的,面向分布式服務(wù)架構(gòu)的輕量級(jí)高可用流量控制組件,主要以流量為切入點(diǎn),從流量控制、熔斷降級(jí)、系統(tǒng)負(fù)載保護(hù)等多個(gè)維度來(lái)幫助用戶保護(hù)服務(wù)的穩(wěn)定性。

大家可能會(huì)問(wèn):Sentinel 和之前常用的熔斷降級(jí)庫(kù) Netflix Hystrix 有什么異同呢?Sentinel官網(wǎng)有一個(gè)對(duì)比的文章,這里摘抄一個(gè)總結(jié)的表格,具體的對(duì)比可以點(diǎn)此 鏈接 查看。

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

從對(duì)比的表格可以看到,Sentinel比Hystrix在功能性上還要強(qiáng)大一些,本文讓我們一起來(lái)了解下Sentinel的源碼,揭開(kāi)Sentinel的神秘面紗。

項(xiàng)目結(jié)構(gòu)

將Sentinel的源碼fork到自己的github庫(kù)中,接著把源碼clone到本地,然后開(kāi)始源碼閱讀之旅吧。

首先我們看一下Sentinel項(xiàng)目的整個(gè)結(jié)構(gòu):

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理
  • sentinel-core 核心模塊,限流、降級(jí)、系統(tǒng)保護(hù)等都在這里實(shí)現(xiàn)
  • sentinel-dashboard 控制臺(tái)模塊,可以對(duì)連接上的sentinel客戶端實(shí)現(xiàn)可視化的管理
  • sentinel-transport 傳輸模塊,提供了基本的監(jiān)控服務(wù)端和客戶端的API接口,以及一些基于不同庫(kù)的實(shí)現(xiàn)
  • sentinel-extension 擴(kuò)展模塊,主要對(duì)DataSource進(jìn)行了部分?jǐn)U展實(shí)現(xiàn)
  • sentinel-adapter 適配器模塊,主要實(shí)現(xiàn)了對(duì)一些常見(jiàn)框架的適配
  • sentinel-demo 樣例模塊,可參考怎么使用sentinel進(jìn)行限流、降級(jí)等
  • sentinel-benchmark 基準(zhǔn)測(cè)試模塊,對(duì)核心代碼的精確性提供基準(zhǔn)測(cè)試

運(yùn)行樣例

基本上每個(gè)框架都會(huì)帶有樣例模塊,有的叫example,有的叫demo,sentinel也不例外。

那我們從sentinel的demo中找一個(gè)例子運(yùn)行下看看大致的情況吧,上面說(shuō)過(guò)了sentinel主要的核心功能是做限流、降級(jí)和系統(tǒng)保護(hù),那我們就從“限流”開(kāi)始看sentinel的實(shí)現(xiàn)原理吧。

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

可以看到sentinel-demo模塊中有很多不同的樣例,我們找到basic模塊下的flow包,這個(gè)包下面就是對(duì)應(yīng)的限流的樣例,但是限流也有很多種類型的限流,我們就找根據(jù)qps限流的類看吧,其他的限流方式原理上都大差不差。

  1. public class FlowQpsDemo { 
  2.  
  3. private static final String KEY = "abc"
  4.  
  5. private static AtomicInteger pass = new AtomicInteger(); 
  6.  
  7. private static AtomicInteger block = new AtomicInteger(); 
  8.  
  9. private static AtomicInteger total = new AtomicInteger(); 
  10.  
  11. private static volatile boolean stop = false
  12.  
  13. private static final int threadCount = 32
  14.  
  15. private static int seconds = 30
  16.  
  17. public static void main(String[] args) throws Exception { 
  18.  
  19. initFlowQpsRule(); 
  20. tick(); 
  21.  
  22. // first make the system run on a very low condition 
  23.  
  24. simulateTraffic(); 
  25.  
  26. System.out.println("===== begin to do flow control"); 
  27.  
  28. System.out.println("only 20 requests per second can pass"); 
  29.  
  30.  
  31. private static void initFlowQpsRule() { 
  32.  
  33. List<FlowRule> rules = new ArrayList<FlowRule>(); 
  34.  
  35. FlowRule rule1 = new FlowRule(); 
  36.  
  37. rule1.setResource(KEY); 
  38.  
  39. // set limit qps to 20 
  40.  
  41. rule1.setCount(20); 
  42.  
  43. // 設(shè)置限流類型:根據(jù)qps 
  44.  
  45. rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); 
  46.  
  47. rule1.setLimitApp("default"); 
  48.  
  49. rules.add(rule1); 
  50.  
  51. // 加載限流的規(guī)則 
  52.  
  53. FlowRuleManager.loadRules(rules); 
  54.  
  55.  
  56. private static void simulateTraffic() { 
  57.  
  58. for (int i = 0; i < threadCount; i++) { 
  59.  
  60. Thread t = new Thread(new RunTask()); 
  61.  
  62. t.setName("simulate-traffic-Task"); 
  63.  
  64. t.start(); 
  65.  
  66.  
  67.  
  68. private static void tick() { 
  69.  
  70. Thread timer = new Thread(new TimerTask()); 
  71.  
  72. timer.setName("sentinel-timer-task"); 
  73.  
  74. timer.start(); 
  75.  
  76.  
  77. static class TimerTask implements Runnable { 
  78.  
  79. @Override 
  80.  
  81. public void run() { 
  82.  
  83. long start = System.currentTimeMillis(); 
  84.  
  85. System.out.println("begin to statistic!!!"); 
  86.  
  87. long oldTotal = 0
  88.  
  89. long oldPass = 0
  90.  
  91. long oldBlock = 0
  92.  
  93. while (!stop) { 
  94.  
  95. try { 
  96.  
  97. TimeUnit.SECONDS.sleep(1); 
  98.  
  99. catch (InterruptedException e) { 
  100.  
  101.  
  102. long globalTotal = total.get(); 
  103.  
  104. long oneSecondTotal = globalTotal - oldTotal; 
  105.  
  106. oldTotal = globalTotal; 
  107.  
  108. long globalPass = pass.get(); 
  109.  
  110. long oneSecondPass = globalPass - oldPass; 
  111.  
  112. oldPass = globalPass; 
  113.  
  114. long globalBlock = block.get(); 
  115.  
  116. long oneSecondBlock = globalBlock - oldBlock; 
  117.  
  118. oldBlock = globalBlock; 
  119.  
  120. System.out.println(seconds + " send qps is: " + oneSecondTotal); 
  121.  
  122. System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal 
  123.  
  124. ", pass:" + oneSecondPass 
  125.  
  126. ", block:" + oneSecondBlock); 
  127.  
  128. if (seconds-- <= 0) { 
  129.  
  130. stop = true
  131.  
  132.  
  133.  
  134. long cost = System.currentTimeMillis() - start; 
  135.  
  136. System.out.println("time cost: " + cost + " ms"); 
  137.  
  138. System.out.println("total:" + total.get() + ", pass:" + pass.get() 
  139.  
  140. ", block:" + block.get()); 
  141.  
  142. System.exit(0); 
  143.  
  144.  
  145.  
  146. static class RunTask implements Runnable { 
  147.  
  148. @Override 
  149.  
  150. public void run() { 
  151.  
  152. while (!stop) { 
  153.  
  154. Entry entry = null
  155.  
  156. try { 
  157.  
  158. entry = SphU.entry(KEY); 
  159.  
  160. // token acquired, means pass 
  161.  
  162. pass.addAndGet(1); 
  163.  
  164. catch (BlockException e1) { 
  165.  
  166. block.incrementAndGet(); 
  167.  
  168. catch (Exception e2) { 
  169.  
  170. // biz exception 
  171.  
  172. finally { 
  173.  
  174. total.incrementAndGet(); 
  175.  
  176. if (entry != null) { 
  177.  
  178. entry.exit(); 
  179.  
  180.  
  181.  
  182. Random random2 = new Random(); 
  183.  
  184. try { 
  185.  
  186. TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); 
  187.  
  188. catch (InterruptedException e) { 
  189.  
  190. // ignore 
  191.  
  192.  
  193.  
  194.  
  195.  

執(zhí)行上面的代碼后,打印出如下的結(jié)果:

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

可以看到,上面的結(jié)果中,pass的數(shù)量和我們的預(yù)期并不相同,我們預(yù)期的是每秒允許pass的請(qǐng)求數(shù)是20個(gè),但是目前有很多pass的請(qǐng)求數(shù)是超過(guò)20個(gè)的。

原因是,我們這里測(cè)試的代碼使用了多線程,注意看 threadCount 的值,一共有32個(gè)線程來(lái)模擬,而在RunTask的run方法中執(zhí)行資源保護(hù)時(shí),即在 SphU.entry 的內(nèi)部是沒(méi)有加鎖的,所以就會(huì)導(dǎo)致在高并發(fā)下,pass的數(shù)量會(huì)高于20。

可以用下面這個(gè)模型來(lái)描述下,有一個(gè)TimeTicker線程在做統(tǒng)計(jì),每1秒鐘做一次。有N個(gè)RunTask線程在模擬請(qǐng)求,被訪問(wèn)的business code被資源key保護(hù)著,根據(jù)規(guī)則,每秒只允許20個(gè)請(qǐng)求通過(guò)。

由于pass、block、total等計(jì)數(shù)器是全局共享的,而多個(gè)RunTask線程在執(zhí)行SphU.entry申請(qǐng)獲取entry時(shí),內(nèi)部沒(méi)有鎖保護(hù),所以會(huì)存在pass的個(gè)數(shù)超過(guò)設(shè)定的閾值。

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

那為了證明在單線程下限流的正確性與可靠性,那我們的模型就應(yīng)該變成了這樣:

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

那接下來(lái)我把 threadCount 的值改為1,只有一個(gè)線程來(lái)執(zhí)行這個(gè)方法,看下具體的限流結(jié)果,執(zhí)行上面的代碼后打印的結(jié)果如下:

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

可以看到pass數(shù)基本上維持在20,但是***次統(tǒng)計(jì)的pass值還是超過(guò)了20。這又是什么原因?qū)е碌哪兀?/p>

其實(shí)仔細(xì)看下Demo中的代碼可以發(fā)現(xiàn),模擬請(qǐng)求是用的一個(gè)線程,統(tǒng)計(jì)結(jié)果是用的另外一個(gè)線程,統(tǒng)計(jì)線程每1秒鐘統(tǒng)計(jì)一次結(jié)果,這兩個(gè)線程之間是有時(shí)間上的誤差的。從TimeTicker線程打印出來(lái)的時(shí)間戳可以看出來(lái),雖然每隔一秒進(jìn)行統(tǒng)計(jì),但是當(dāng)前打印時(shí)的時(shí)間和上一次的時(shí)間還是有誤差的,不完全是1000ms的間隔。

要真正驗(yàn)證每秒限制20個(gè)請(qǐng)求,保證數(shù)據(jù)的精準(zhǔn)性,需要做基準(zhǔn)測(cè)試,這個(gè)不是本篇文章的重點(diǎn),有興趣的同學(xué)可以去了解下jmh,sentinel中的基準(zhǔn)測(cè)試也是通過(guò)jmh做的。

深入原理

通過(guò)一個(gè)簡(jiǎn)單的示例程序,我們了解了sentinel可以對(duì)請(qǐng)求進(jìn)行限流,除了限流外,還有降級(jí)和系統(tǒng)保護(hù)等功能。那現(xiàn)在我們就撥開(kāi)云霧,深入源碼內(nèi)部去一窺sentinel的實(shí)現(xiàn)原理吧。

首先從入口開(kāi)始: SphU.entry() 。這個(gè)方法會(huì)去申請(qǐng)一個(gè)entry,如果能夠申請(qǐng)成功,則說(shuō)明沒(méi)有被限流,否則會(huì)拋出BlockException,表面已經(jīng)被限流了。

從 SphU.entry() 方法往下執(zhí)行會(huì)進(jìn)入到 Sph.entry() ,Sph的默認(rèn)實(shí)現(xiàn)類是 CtSph ,在CtSph中最終會(huì)執(zhí)行到 entry(ResourceWrapperresourceWrapper,intcount,Object...args)throwsBlockException 這個(gè)方法。

我們來(lái)看一下這個(gè)方法的具體實(shí)現(xiàn):

  1. public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { 
  2.  
  3. Context context = ContextUtil.getContext(); 
  4.  
  5. if (context instanceof NullContext) { 
  6.  
  7. // Init the entry only. No rule checking will occur. 
  8.  
  9. return new CtEntry(resourceWrapper, null, context); 
  10.  
  11.  
  12. if (context == null) { 
  13.  
  14. context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); 
  15.  
  16.  
  17. // Global switch is close, no rule checking will do. 
  18.  
  19. if (!Constants.ON) { 
  20.  
  21. return new CtEntry(resourceWrapper, null, context); 
  22.  
  23.  
  24. // 獲取該資源對(duì)應(yīng)的SlotChain 
  25.  
  26. ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); 
  27.  
  28. /* 
  29.  
  30. * Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no 
  31.  
  32. * rule checking will be done. 
  33.  
  34. */ 
  35.  
  36. if (chain == null) { 
  37.  
  38. return new CtEntry(resourceWrapper, null, context); 
  39.  
  40.  
  41. Entry e = new CtEntry(resourceWrapper, chain, context); 
  42.  
  43. try { 
  44.  
  45. // 執(zhí)行Slot的entry方法 
  46.  
  47. chain.entry(context, resourceWrapper, null, count, args); 
  48.  
  49. catch (BlockException e1) { 
  50.  
  51. e.exit(count, args); 
  52.  
  53. // 拋出BlockExecption 
  54.  
  55. throw e1; 
  56.  
  57. catch (Throwable e1) { 
  58.  
  59. RecordLog.info("Sentinel unexpected exception", e1); 
  60.  
  61.  
  62. return e; 
  63.  

這個(gè)方法可以分為以下幾個(gè)部分:

  • 1.對(duì)參數(shù)和全局配置項(xiàng)做檢測(cè),如果不符合要求就直接返回了一個(gè)CtEntry對(duì)象,不會(huì)再進(jìn)行后面的限流檢測(cè),否則進(jìn)入下面的檢測(cè)流程。
  • 2.根據(jù)包裝過(guò)的資源對(duì)象獲取對(duì)應(yīng)的SlotChain
  • 3.執(zhí)行SlotChain的entry方法
  • 3.1.如果SlotChain的entry方法拋出了BlockException,則將該異常繼續(xù)向上拋出
  • 3.2.如果SlotChain的entry方法正常執(zhí)行了,則***會(huì)將該entry對(duì)象返回
  • 4.如果上層方法捕獲了BlockException,則說(shuō)明請(qǐng)求被限流了,否則請(qǐng)求能正常執(zhí)行

其中比較重要的是第2、3兩個(gè)步驟,我們來(lái)分解一下這兩個(gè)步驟。

創(chuàng)建SlotChain

首先看一下lookProcessChain的方法實(shí)現(xiàn):

  1. private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { 
  2.  
  3. ProcessorSlotChain chain = chainMap.get(resourceWrapper); 
  4.  
  5. if (chain == null) { 
  6.  
  7. synchronized (LOCK) { 
  8.  
  9. chain = chainMap.get(resourceWrapper); 
  10.  
  11. if (chain == null
  12.  
  13. // Entry size limit. 
  14.  
  15. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { 
  16.  
  17. return null
  18.  
  19.  
  20. // 具體構(gòu)造chain的方法 
  21.  
  22. chain = Env.slotsChainbuilder.build(); 
  23.  
  24. Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1); 
  25.  
  26. newMap.putAll(chainMap); 
  27.  
  28. newMap.put(resourceWrapper, chain); 
  29.  
  30. chainMap = newMap; 
  31.  
  32.  
  33.  
  34.  
  35. return chain; 
  36.  

該方法使用了一個(gè)HashMap做了緩存,key是資源對(duì)象。這里加了鎖,并且做了 doublecheck 。具體構(gòu)造chain的方法是通過(guò): Env.slotsChainbuilder.build() 這句代碼創(chuàng)建的。那就進(jìn)入這個(gè)方法看看吧。

  1. public ProcessorSlotChain build() { 
  2.  
  3. ProcessorSlotChain chain = new DefaultProcessorSlotChain(); 
  4.  
  5. chain.addLast(new NodeSelectorSlot()); 
  6.  
  7. chain.addLast(new ClusterBuilderSlot()); 
  8.  
  9. chain.addLast(new LogSlot()); 
  10.  
  11. chain.addLast(new StatisticSlot()); 
  12.  
  13. chain.addLast(new SystemSlot()); 
  14.  
  15. chain.addLast(new AuthoritySlot()); 
  16.  
  17. chain.addLast(new FlowSlot()); 
  18.  
  19. chain.addLast(new DegradeSlot()); 
  20.  
  21. return chain; 
  22.  

Chain是鏈條的意思,從build的方法可看出,ProcessorSlotChain是一個(gè)鏈表,里面添加了很多個(gè)Slot。具體的實(shí)現(xiàn)需要到DefaultProcessorSlotChain中去看。

  1. public class DefaultProcessorSlotChain extends ProcessorSlotChain { 
  2.  
  3. AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { 
  4.  
  5. @Override 
  6.  
  7. public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) 
  8.  
  9. throws Throwable { 
  10.  
  11. super.fireEntry(context, resourceWrapper, t, count, args); 
  12.  
  13.  
  14. @Override 
  15.  
  16. public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { 
  17.  
  18. super.fireExit(context, resourceWrapper, count, args); 
  19.  
  20.  
  21. }; 
  22.  
  23. AbstractLinkedProcessorSlot<?> end = first; @Override public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(first.getNext()); first.setNext(protocolProcessor); if (end == first) { end = protocolProcessor; } } @Override 
  24. public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) 
  25.  
  26. end.setNext(protocolProcessor); 
  27.  
  28. end = protocolProcessor; 
  29.  
  30.  

DefaultProcessorSlotChain中有兩個(gè)AbstractLinkedProcessorSlot類型的變量:first和end,這就是鏈表的頭結(jié)點(diǎn)和尾節(jié)點(diǎn)。

創(chuàng)建DefaultProcessorSlotChain對(duì)象時(shí),首先創(chuàng)建了首節(jié)點(diǎn),然后把首節(jié)點(diǎn)賦值給了尾節(jié)點(diǎn),可以用下圖表示:

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

將***個(gè)節(jié)點(diǎn)添加到鏈表中后,整個(gè)鏈表的結(jié)構(gòu)變成了如下圖這樣:

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

將所有的節(jié)點(diǎn)都加入到鏈表中后,整個(gè)鏈表的結(jié)構(gòu)變成了如下圖所示:

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

這樣就將所有的Slot對(duì)象添加到了鏈表中去了,每一個(gè)Slot都是繼承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一種責(zé)任鏈的設(shè)計(jì),每個(gè)對(duì)象中都有一個(gè)next屬性,指向的是另一個(gè)AbstractLinkedProcessorSlot對(duì)象。其實(shí)責(zé)任鏈模式在很多框架中都有,比如Netty中是通過(guò)pipeline來(lái)實(shí)現(xiàn)的。

知道了SlotChain是如何創(chuàng)建的了,那接下來(lái)就要看下是如何執(zhí)行Slot的entry方法的了。

執(zhí)行SlotChain的entry方法

lookProcessChain方法獲得的ProcessorSlotChain的實(shí)例是DefaultProcessorSlotChain,那么執(zhí)行chain.entry方法,就會(huì)執(zhí)行DefaultProcessorSlotChain的entry方法,而DefaultProcessorSlotChain的entry方法是這樣的:

  1. @Override 
  2.  
  3. public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) 
  4.  
  5. throws Throwable { 
  6.  
  7. first.transformEntry(context, resourceWrapper, t, count, args); 
  8.  

也就是說(shuō),DefaultProcessorSlotChain的entry實(shí)際是執(zhí)行的first屬性的transformEntry方法。

而transformEntry方法會(huì)執(zhí)行當(dāng)前節(jié)點(diǎn)的entry方法,在DefaultProcessorSlotChain中first節(jié)點(diǎn)重寫(xiě)了entry方法,具體如下:

  1. @Override 
  2.  
  3. public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) 
  4.  
  5. throws Throwable { 
  6.  
  7. super.fireEntry(context, resourceWrapper, t, count, args); 
  8.  

first節(jié)點(diǎn)的entry方法,實(shí)際又是執(zhí)行的super的fireEntry方法,那繼續(xù)把目光轉(zhuǎn)移到fireEntry方法,具體如下:

  1. @Override 
  2.  
  3. public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) 
  4.  
  5. throws Throwable { 
  6.  
  7. if (next != null) { 
  8.  
  9. next.transformEntry(context, resourceWrapper, obj, count, args); 
  10.  
  11.  

從這里可以看到,從fireEntry方法中就開(kāi)始傳遞執(zhí)行entry了,這里會(huì)執(zhí)行當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)transformEntry方法,上面已經(jīng)分析過(guò)了,transformEntry方法會(huì)觸發(fā)當(dāng)前節(jié)點(diǎn)的entry,也就是說(shuō)fireEntry方法實(shí)際是觸發(fā)了下一個(gè)節(jié)點(diǎn)的entry方法。具體的流程如下圖所示:

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

從圖中可以看出,從最初的調(diào)用Chain的entry()方法,轉(zhuǎn)變成了調(diào)用SlotChain中Slot的entry()方法。從上面的分析可以知道,SlotChain中的***個(gè)Slot節(jié)點(diǎn)是NodeSelectorSlot。

執(zhí)行Slot的entry方法

現(xiàn)在可以把目光轉(zhuǎn)移到SlotChain中的***個(gè)節(jié)點(diǎn)NodeSelectorSlot的entry方法中去了,具體的代碼如下:

  1. @Override 
  2.  
  3. public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) 
  4.  
  5. throws Throwable { 
  6.  
  7. DefaultNode node = map.get(context.getName()); 
  8.  
  9. if (node == null) { 
  10.  
  11. synchronized (this) { 
  12.  
  13. node = map.get(context.getName()); 
  14.  
  15. if (node == null) { 
  16.  
  17. node = Env.nodeBuilder.buildTreeNode(resourceWrapper, null); 
  18.  
  19. HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size()); 
  20.  
  21. cacheMap.putAll(map); 
  22.  
  23. cacheMap.put(context.getName(), node); 
  24.  
  25. map = cacheMap; 
  26.  
  27.  
  28. // Build invocation tree 
  29.  
  30. ((DefaultNode)context.getLastNode()).addChild(node); 
  31.  
  32.  
  33.  
  34. context.setCurNode(node); 
  35.  
  36. // 由此觸發(fā)下一個(gè)節(jié)點(diǎn)的entry方法 
  37.  
  38. fireEntry(context, resourceWrapper, node, count, args); 
  39.  

從代碼中可以看到,NodeSelectorSlot節(jié)點(diǎn)做了一些自己的業(yè)務(wù)邏輯處理,具體的大家可以深入源碼繼續(xù)追蹤,這里大概的介紹下每種Slot的功能職責(zé):

  • NodeSelectorSlot 負(fù)責(zé)收集資源的路徑,并將這些資源的調(diào)用路徑,以樹(shù)狀結(jié)構(gòu)存儲(chǔ)起來(lái),用于根據(jù)調(diào)用路徑來(lái)限流降級(jí);
  • ClusterBuilderSlot 則用于存儲(chǔ)資源的統(tǒng)計(jì)信息以及調(diào)用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流,降級(jí)的依據(jù);
  • StatistcSlot 則用于記錄,統(tǒng)計(jì)不同緯度的 runtime 信息;
  • FlowSlot 則用于根據(jù)預(yù)設(shè)的限流規(guī)則,以及前面 slot 統(tǒng)計(jì)的狀態(tài),來(lái)進(jìn)行限流;
  • AuthorizationSlot 則根據(jù)黑白名單,來(lái)做黑白名單控制;
  • DegradeSlot 則通過(guò)統(tǒng)計(jì)信息,以及預(yù)設(shè)的規(guī)則,來(lái)做熔斷降級(jí);
  • SystemSlot 則通過(guò)系統(tǒng)的狀態(tài),例如 load1 等,來(lái)控制總的入口流量;

執(zhí)行完業(yè)務(wù)邏輯處理后,調(diào)用了fireEntry()方法,由此觸發(fā)了下一個(gè)節(jié)點(diǎn)的entry方法。此時(shí)我們就知道了sentinel的責(zé)任鏈就是這樣傳遞的:每個(gè)Slot節(jié)點(diǎn)執(zhí)行完自己的業(yè)務(wù)后,會(huì)調(diào)用fireEntry來(lái)觸發(fā)下一個(gè)節(jié)點(diǎn)的entry方法。

所以可以將上面的圖完整了,具體如下:

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

至此就通過(guò)SlotChain完成了對(duì)每個(gè)節(jié)點(diǎn)的entry()方法的調(diào)用,每個(gè)節(jié)點(diǎn)會(huì)根據(jù)創(chuàng)建的規(guī)則,進(jìn)行自己的邏輯處理,當(dāng)統(tǒng)計(jì)的結(jié)果達(dá)到設(shè)置的閾值時(shí),就會(huì)觸發(fā)限流、降級(jí)等事件,具體是拋出BlockException異常。

總結(jié)

sentinel主要是基于7種不同的Slot形成了一個(gè)鏈表,每個(gè)Slot都各司其職,自己做完分內(nèi)的事之后,會(huì)把請(qǐng)求傳遞給下一個(gè)Slot,直到在某一個(gè)Slot中***規(guī)則后拋出BlockException而終止。

前三個(gè)Slot負(fù)責(zé)做統(tǒng)計(jì),后面的Slot負(fù)責(zé)根據(jù)統(tǒng)計(jì)的結(jié)果結(jié)合配置的規(guī)則進(jìn)行具體的控制,是Block該請(qǐng)求還是放行。

控制的類型也有很多可選項(xiàng):根據(jù)qps、線程數(shù)、冷啟動(dòng)等等。

然后基于這個(gè)核心的方法,衍生出了很多其他的功能:

  • 1、dashboard控制臺(tái),可以可視化的對(duì)每個(gè)連接過(guò)來(lái)的sentinel客戶端 (通過(guò)發(fā)送heartbeat消息)進(jìn)行控制,dashboard和客戶端之間通過(guò)http協(xié)議進(jìn)行通訊。
  • 2、規(guī)則的持久化,通過(guò)實(shí)現(xiàn)DataSource接口,可以通過(guò)不同的方式對(duì)配置的規(guī)則進(jìn)行持久化,默認(rèn)規(guī)則是在內(nèi)存中的
  • 3、對(duì)主流的框架進(jìn)行適配,包括servlet,dubbo,rRpc等

Dashboard控制臺(tái)

sentinel-dashboard是一個(gè)單獨(dú)的應(yīng)用,通過(guò)spring-boot進(jìn)行啟動(dòng),主要提供一個(gè)輕量級(jí)的控制臺(tái),它提供機(jī)器發(fā)現(xiàn)、單機(jī)資源實(shí)時(shí)監(jiān)控、集群資源匯總,以及規(guī)則管理的功能。

我們只需要對(duì)應(yīng)用進(jìn)行簡(jiǎn)單的配置,就可以使用這些功能。

1 啟動(dòng)控制臺(tái)

1.1 下載代碼并編譯控制臺(tái)

  • 下載 控制臺(tái) 工程
  • 使用以下命令將代碼打包成一個(gè) fat jar: mvn cleanpackage

1.2 啟動(dòng)

使用如下命令啟動(dòng)編譯后的控制臺(tái):

$ java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -jar target/sentinel-dashboard.jar

上述命令中我們指定了一個(gè)JVM參數(shù), -Dserver.port=8080 用于指定 Spring Boot 啟動(dòng)端口為 8080。

2 客戶端接入控制臺(tái)

控制臺(tái)啟動(dòng)后,客戶端需要按照以下步驟接入到控制臺(tái)。

2.1 引入客戶端jar包

通過(guò) pom.xml 引入 jar 包:

  1. <dependency> 
  2.  
  3. <groupId>com.alibaba.csp</groupId> 
  4.  
  5. <artifactId>sentinel-transport-simple-http</artifactId> 
  6.  
  7. <version>x.y.z</version> 
  8.  
  9. </dependency> 

2.2 配置啟動(dòng)參數(shù)

啟動(dòng)時(shí)加入 JVM 參數(shù) -Dcsp.sentinel.dashboard.server=consoleIp:port 指定控制臺(tái)地址和端口。若啟動(dòng)多個(gè)應(yīng)用,則需要通過(guò) -Dcsp.sentinel.api.port=xxxx 指定客戶端監(jiān)控 API 的端口(默認(rèn)是 8719)。

除了修改 JVM 參數(shù),也可以通過(guò)配置文件取得同樣的效果。更詳細(xì)的信息可以參考 啟動(dòng)配置項(xiàng)。

2.3 觸發(fā)客戶端初始化

確保客戶端有訪問(wèn)量,Sentinel 會(huì)在客戶端***調(diào)用的時(shí)候進(jìn)行初始化,開(kāi)始向控制臺(tái)發(fā)送心跳包。

sentinel-dashboard是一個(gè)獨(dú)立的web應(yīng)用,可以接受客戶端的連接,然后與客戶端之間進(jìn)行通訊,他們之間使用http協(xié)議進(jìn)行通訊。他們之間的關(guān)系如下圖所示:

限流降級(jí)神器,帶你解讀阿里巴巴開(kāi)源 Sentinel 實(shí)現(xiàn)原理

dashboard

dashboard啟動(dòng)后會(huì)等待客戶端的連接,具體的做法是在 MachineRegistryController 中有一個(gè) receiveHeartBeat 的方法,客戶端發(fā)送心跳消息,就是通過(guò)http請(qǐng)求這個(gè)方法。

dashboard接收到客戶端的心跳消息后,會(huì)把客戶端的傳遞過(guò)來(lái)的ip、port等信息封裝成一個(gè) MachineInfo對(duì)象,然后將該對(duì)象通過(guò) MachineDiscovery 接口的 addMachine 方法添加到一個(gè)ConcurrentHashMap中保存起來(lái)。

這里會(huì)有問(wèn)題,因?yàn)榭蛻舳说男畔⑹潜4嬖赿ashboard的內(nèi)存中的,所以當(dāng)dashboard應(yīng)用重啟后,之前已經(jīng)發(fā)送過(guò)來(lái)的客戶端信息都會(huì)丟失掉。

client

client在啟動(dòng)時(shí),會(huì)通過(guò)CommandCenterInitFunc選擇一個(gè),并且只選擇一個(gè)CommandCenter進(jìn)行啟動(dòng)。

啟動(dòng)之前會(huì)通過(guò)spi的方式掃描獲取到所有的CommandHandler的實(shí)現(xiàn)類,然后將所有的CommandHandler注冊(cè)到一個(gè)HashMap中去,待后期使用。

PS:考慮一下,為什么CommandHandler不需要做持久化,而是直接保存在內(nèi)存中。

注冊(cè)完CommandHandler之后,緊接著就啟動(dòng)CommandCenter了,目前CommandCenter有兩個(gè)實(shí)現(xiàn)類:

  • SimpleHttpCommandCenter 通過(guò)ServerSocket啟動(dòng)一個(gè)服務(wù)端,接受socket連接
  • NettyHttpCommandCenter 通過(guò)Netty啟動(dòng)一個(gè)服務(wù)端,接受channel連接

CommandCenter啟動(dòng)后,就等待dashboard發(fā)送消息過(guò)來(lái)了,當(dāng)接收到消息后,會(huì)把消息通過(guò)具體的CommandHandler進(jìn)行處理,然后將處理的結(jié)果返回給dashboard。

這里需要注意的是,dashboard給client發(fā)送消息是通過(guò)異步的httpClient進(jìn)行發(fā)送的,在HttpHelper類中。

但是詭異的是,既然通過(guò)異步發(fā)送了,又通過(guò)一個(gè)CountDownLatch來(lái)等待消息的返回,然后獲取結(jié)果,那這樣不就失去了異步的意義的嗎?具體的代碼如下:

  1. private String httpGetContent(String url) { final HttpGet httpGet = new HttpGet(url); final CountDownLatch latch = new CountDownLatch(1); 
  2. final AtomicReference<String> reference = new AtomicReference<>(); 
  3.  
  4. httpclient.execute(httpGet, new FutureCallback<HttpResponse>() { 
  5.  
  6. @Override 
  7.  
  8. public void completed(final HttpResponse response) { 
  9.  
  10. try { 
  11.  
  12. reference.set(getBody(response)); 
  13.  
  14. catch (Exception e) { 
  15.  
  16. logger.info("httpGetContent " + url + " error:", e); 
  17.  
  18. finally { 
  19.  
  20. latch.countDown(); 
  21.  
  22.  
  23.  
  24. @Override 
  25.  
  26. public void failed(final Exception ex) { 
  27.  
  28. latch.countDown(); 
  29.  
  30. logger.info("httpGetContent " + url + " failed:", ex); 
  31.  
  32.  
  33. @Override 
  34.  
  35. public void cancelled() { 
  36.  
  37. latch.countDown(); 
  38.  
  39.  
  40. }); 
  41.  
  42. try { 
  43.  
  44. latch.await(5, TimeUnit.SECONDS); 
  45.  
  46. catch (Exception e) { 
  47.  
  48. logger.info("wait http client error:", e); 
  49.  
  50.  
  51. return reference.get(); 
  52.  

主流框架的適配

sentinel也對(duì)一些主流的框架進(jìn)行了適配,使得在使用主流框架時(shí),也可以享受到sentinel的保護(hù)。目前已經(jīng)支持的適配器包括以下這些:

  • Web Servlet
  • Dubbo
  • Spring Boot / Spring Cloud
  • gRPC
  • Apache RocketMQ

其實(shí)做適配就是通過(guò)那些主流框架的擴(kuò)展點(diǎn),然后在擴(kuò)展點(diǎn)上加入sentinel限流降級(jí)的代碼即可。拿Servlet的適配代碼看一下,具體的代碼是:

  1. public class CommonFilter implements Filter { 
  2.  
  3. @Override 
  4.  
  5. public void init(FilterConfig filterConfig) { 
  6.  
  7.  
  8. @Override 
  9.  
  10. public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) 
  11.  
  12. throws IOException, ServletException 
  13.  
  14. HttpServletRequest sRequest = (HttpServletRequest)request; 
  15.  
  16. Entry entry = null
  17.  
  18. try { 
  19.  
  20. // 根據(jù)請(qǐng)求生成的資源 
  21.  
  22. String target = FilterUtil.filterTarget(sRequest); 
  23.  
  24. target = WebCallbackManager.getUrlCleaner().clean(target); 
  25.  
  26. // “申請(qǐng)”該資源 
  27.  
  28. ContextUtil.enter(target); 
  29.  
  30. entry = SphU.entry(target, EntryType.IN); 
  31.  
  32. // 如果能成功“申請(qǐng)”到資源,則說(shuō)明未被限流 
  33.  
  34. // 則將請(qǐng)求放行 
  35.  
  36. chain.doFilter(request, response); 
  37.  
  38. catch (BlockException e) { 
  39.  
  40. // 否則如果捕獲了BlockException異常,說(shuō)明請(qǐng)求被限流了 
  41.  
  42. // 則將請(qǐng)求重定向到一個(gè)默認(rèn)的頁(yè)面 
  43.  
  44. HttpServletResponse sResponse = (HttpServletResponse)response; 
  45.  
  46. WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse); 
  47.  
  48. catch (IOException e2) { 
  49.  
  50. // 省略部分代碼 
  51.  
  52. finally { 
  53.  
  54. if (entry != null) { 
  55.  
  56. entry.exit(); 
  57.  
  58.  
  59. ContextUtil.exit(); 
  60.  
  61.  
  62.  
  63. @Override 
  64.  
  65. public void destroy() { 
  66.  
  67.  

通過(guò)Servlet的Filter進(jìn)行擴(kuò)展,實(shí)現(xiàn)一個(gè)Filter,然后在doFilter方法中對(duì)請(qǐng)求進(jìn)行限流控制,如果請(qǐng)求被限流則將請(qǐng)求重定向到一個(gè)默認(rèn)頁(yè)面,否則將請(qǐng)求放行給下一個(gè)Filter。

規(guī)則持久化,動(dòng)態(tài)化

Sentinel 的理念是開(kāi)發(fā)者只需要關(guān)注資源的定義,當(dāng)資源定義成功,可以動(dòng)態(tài)增加各種流控降級(jí)規(guī)則。

Sentinel 提供兩種方式修改規(guī)則:

  • 通過(guò) API 直接修改 ( loadRules)
  • 通過(guò) DataSource適配不同數(shù)據(jù)源修改

通過(guò) API 修改比較直觀,可以通過(guò)以下三個(gè) API 修改不同的規(guī)則:

FlowRuleManager.loadRules(List<FlowRule> rules); // 修改流控規(guī)則

DegradeRuleManager.loadRules(List<DegradeRule> rules); // 修改降級(jí)規(guī)則

SystemRuleManager.loadRules(List<SystemRule> rules); // 修改系統(tǒng)規(guī)則

DataSource 擴(kuò)展

上述 loadRules() 方法只接受內(nèi)存態(tài)的規(guī)則對(duì)象,但應(yīng)用重啟后內(nèi)存中的規(guī)則就會(huì)丟失,更多的時(shí)候規(guī)則***能夠存儲(chǔ)在文件、數(shù)據(jù)庫(kù)或者配置中心中。

DataSource 接口給我們提供了對(duì)接任意配置源的能力。相比直接通過(guò) API 修改規(guī)則,實(shí)現(xiàn) DataSource 接口是更加可靠的做法。

官方推薦通過(guò)控制臺(tái)設(shè)置規(guī)則后將規(guī)則推送到統(tǒng)一的規(guī)則中心,用戶只需要實(shí)現(xiàn) DataSource 接口,來(lái)監(jiān)聽(tīng)規(guī)則中心的規(guī)則變化,以實(shí)時(shí)獲取變更的規(guī)則。

DataSource 拓展常見(jiàn)的實(shí)現(xiàn)方式有:

  • 拉模式:客戶端主動(dòng)向某個(gè)規(guī)則管理中心定期輪詢拉取規(guī)則,這個(gè)規(guī)則中心可以是 SQL、文件,甚至是 VCS 等。這樣做的方式是簡(jiǎn)單,缺點(diǎn)是無(wú)法及時(shí)獲取變更;
  • 推模式:規(guī)則中心統(tǒng)一推送,客戶端通過(guò)注冊(cè)監(jiān)聽(tīng)器的方式時(shí)刻監(jiān)聽(tīng)變化,比如使用 Nacos、Zookeeper 等配置中心。這種方式有更好的實(shí)時(shí)性和一致性保證。

至此,sentinel的基本情況都已經(jīng)分析了,更加詳細(xì)的內(nèi)容,可以繼續(xù)閱讀源碼來(lái)研究。

責(zé)任編輯:張燕妮 來(lái)源: 頭條科技
相關(guān)推薦

2024-09-06 13:53:28

2019-01-28 10:10:36

開(kāi)源技術(shù) 趨勢(shì)

2023-04-26 09:16:17

2018-12-14 11:00:18

2021-03-16 08:31:59

微服務(wù)Sentinel雪崩效應(yīng)

2010-06-28 10:43:47

2022-08-22 08:07:45

DruidMySQL密碼

2013-08-22 09:41:52

阿里巴巴去IOE王堅(jiān)

2022-04-06 08:14:49

云原生混部系統(tǒng)開(kāi)源

2011-12-28 15:26:16

Spring\Dubb

2019-02-01 11:16:55

阿里巴巴Java開(kāi)源

2014-03-17 10:24:22

阿里云物聯(lián)網(wǎng)美的

2023-03-29 09:42:32

2013-08-22 09:36:45

阿里巴巴王堅(jiān)阿里云

2019-08-15 10:25:02

代碼開(kāi)發(fā)工具

2009-02-27 10:46:32

DBA筆試題阿里巴巴

2021-05-14 07:45:07

Sentinel 接口限流

2020-11-10 09:00:31

阿里巴巴技術(shù)開(kāi)源

2022-03-21 08:30:13

開(kāi)源模型訓(xùn)練預(yù)測(cè)引擎

2023-06-20 08:10:00

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)