一篇帶給你Sentinel 流控原理
我們?cè)陧?xiàng)目中添加 Spring Cloud Sentinel 依賴添加后 spring-cloud-starter-alibaba-sentinel 在 Spring-Boot 啟動(dòng)的過(guò)程中回去初始化 spring.factories 中的配置信息,如:SentinelWebAutoConfiguration 、SentinelAutoConfiguration 等配置文件來(lái)初始化
再講代碼之前我先聲明一下我的版本號(hào)sentinel 1.8.0 。后續(xù)的所有內(nèi)容均基于該版本進(jìn)行
@ResoureSetinel 工作原理
配置流控規(guī)則我們最簡(jiǎn)單的方式就是通過(guò) @ResoureSetinel 的方式來(lái)管理,該注解可以直接定義流控規(guī)則、降級(jí)規(guī)則。下面是一個(gè)簡(jiǎn)單的使用例子:
- @SentinelResource(value = "ResOrderGet",
- fallback = "fallback",
- fallbackClass = SentinelResourceExceptionHandler.class,
- blockHandler = "blockHandler",
- blockHandlerClass = SentinelResourceExceptionHandler.class
- )
- @GetMapping("/order/get/{id}")
- public CommonResult<StockModel> getStockDetails(@PathVariable Integer id) {
- StockModel stockModel = new StockModel();
- stockModel.setCode("STOCK==>1000");
- stockModel.setId(id);
- return CommonResult.success(stockModel);
- }
如果大家熟悉 Spring 相關(guān)的組件大家都可以想到,這里多半是通過(guò)Spring Aop. 的方式來(lái)攔截 getStockDetails 方法。我們先看看SentinelAutoConfiguration 配置文件,我們可以找到 SentinelResourceAspect Bean 的定義方法。
- @Bean
- @ConditionalOnMissingBean
- public SentinelResourceAspect sentinelResourceAspect() {
- return new SentinelResourceAspect();
- }
讓后我們?cè)賮?lái)看看 SentinelResourceAspect 具體是怎么處理的,源碼如下:
- // 定義 Pointcut
- @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
- public void sentinelResourceAnnotationPointcut() {
- }
- // Around 來(lái)對(duì)被標(biāo)記 @SentinelResource 注解的方法進(jìn)行處理
- @Around("sentinelResourceAnnotationPointcut()")
- public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
- Method originMethod = resolveMethod(pjp);
- // 獲取注解信息
- SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
- // 獲取資源名稱
- String resourceName = getResourceName(annotation.value(), originMethod);
- EntryType entryType = annotation.entryType();
- int resourceType = annotation.resourceType();
- Entry entry = null;
- try {
- // 執(zhí)行 entry
- entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
- // 執(zhí)行業(yè)務(wù)方法
- Object result = pjp.proceed();
- // 返回
- return result;
- } catch (BlockException ex) {
- // 處理 BlockException
- return handleBlockException(pjp, annotation, ex);
- } catch (Throwable ex) {
- Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
- // The ignore list will be checked first.
- if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
- throw ex;
- }
- if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
- traceException(ex);
- // 處理降級(jí)
- return handleFallback(pjp, annotation, ex);
- }
- // No fallback function can handle the exception, so throw it out.
- throw ex;
- }
- }
我們總結(jié)一下, @SentinelResource 的執(zhí)行過(guò)程, 首先是通過(guò) Aop 進(jìn)行攔截,然后通過(guò) SphU.entry 執(zhí)行對(duì)應(yīng)的流控規(guī)則,最后調(diào)用業(yè)務(wù)方法。如果觸發(fā)流控規(guī)則首先處理流控異常 BlockException 然后在判斷是否有服務(wù)降級(jí)的處理,如果有就調(diào)用 fallback 方法。通過(guò) handleBlockException 、handleFallback 進(jìn)行處理。
責(zé)任鏈模式處理流控
通過(guò)上面的梳理,我們知道對(duì)于流控的過(guò)程,核心處理方法就是 SphU.entry 。在這個(gè)方法中其實(shí)主要就是初始化流控 Solt 和執(zhí)行 Solt. 在這個(gè)過(guò)程中會(huì)對(duì):簇點(diǎn)定義、流量控制、熔斷降級(jí)、系統(tǒng)白名單等頁(yè)面功能進(jìn)行處理。
1. 初始化責(zé)任鏈
下面是初始化 Solt 的核心代碼在 SphU.entryWithPriority
- // 刪減部分代碼
- private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
- throws BlockException {
- // 初始化責(zé)任鏈
- ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
- Entry e = new CtEntry(resourceWrapper, chain, context);
- try {
- // 執(zhí)行 entry
- chain.entry(context, resourceWrapper, null, count, prioritized, args);
- } catch (BlockException e1) {
- e.exit(count, args);
- // 異常拋出,讓 SentinelResourceAspect.invokeResourceWithSentinel 統(tǒng)一處理
- throw e1;
- } catch (Throwable e1) {
- // This should not happen, unless there are errors existing in Sentinel internal.
- RecordLog.info("Sentinel unexpected exception", e1);
- }
- return e;
- }
通過(guò) lookProcessChain 方法我逐步的查找,我們可以看到最終的責(zé)任鏈初始化類,默認(rèn)是 DefaultSlotChainBuilder
- public class DefaultSlotChainBuilder implements SlotChainBuilder {
- @Override
- public ProcessorSlotChain build() {
- ProcessorSlotChain chain = new DefaultProcessorSlotChain();
- // Note: the instances of ProcessorSlot should be different, since they are not stateless.
- // 通過(guò) SPI 去加載所有的 ProcessorSlot 實(shí)現(xiàn),通過(guò) Order 排序
- List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
- for (ProcessorSlot slot : sortedSlotList) {
- if (!(slot instanceof AbstractLinkedProcessorSlot)) {
- RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
- continue;
- }
- // 添加到 chain 尾部
- chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
- }
- return chain;
- }
- }
2. 責(zé)任鏈的處理過(guò)程
我們可以通過(guò)斷點(diǎn)的方式來(lái)查看在 sortedSlotList 集合中所有的 solt 順序如下圖所示:
我們可以通過(guò)如下的順序進(jìn)行逐個(gè)的簡(jiǎn)單的分析一下
- NodeSelectorSolt
- CusterBuilderSolt
- LogSlot
- StatisicSlot
- AuthoritySolt
- SystemSolts
- ParamFlowSolt
- FlowSolt
- DegradeSlot
對(duì)于 Sentinel 的 Slot 流控協(xié)作流程可以參考官方給出的文檔, 如下圖所示:
FlowSolt 流控
通過(guò) NodeSelectorSolt、CusterBuilderSolt、StatisicSlot 等一系列的請(qǐng)求數(shù)據(jù)處理,在 FlowSolt會(huì)進(jìn)入流控規(guī)則,所有的 Solt 都會(huì)執(zhí)行 entry 方法, 如下所示
- // FlowSolt 的 entry 方法
- @Override
- public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
- boolean prioritized, Object... args) throws Throwable {
- // 檢查流量
- checkFlow(resourceWrapper, context, node, count, prioritized);
- fireEntry(context, resourceWrapper, node, count, prioritized, args);
- }
在后續(xù)的流程中,會(huì)執(zhí)進(jìn)行判斷具體的流控策略,默認(rèn)是快速失敗,會(huì)執(zhí)行 DefaultController 方法。
- // DefaultController
- @Override
- public boolean canPass(Node node, int acquireCount, boolean prioritized) {
- // 當(dāng)前資源的調(diào)用次數(shù)
- int curCount = avgUsedTokens(node);
- // 當(dāng)前資源的調(diào)用次數(shù) + 1 > 當(dāng)前閾值
- if (curCount + acquireCount > count) {
- // 刪減比分代碼
- // 不通過(guò)
- return false;
- }
- // 通過(guò)
- return true;
- }
- private int avgUsedTokens(Node node) {
- if (node == null) {
- return DEFAULT_AVG_USED_TOKENS;
- }
- return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
- }
如果上面返回不通過(guò)會(huì)回到,那么會(huì)拋出 FlowException
- public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
- Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
- if (ruleProvider == null || resource == null) {
- return;
- }
- Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
- if (rules != null) {
- for (FlowRule rule : rules) {
- if (!canPassCheck(rule, context, node, count, prioritized)) {
- // 流控規(guī)則不通過(guò),會(huì)拋出 FlowException
- throw new FlowException(rule.getLimitApp(), rule);
- }
- }
- }
- }
然后會(huì)在 StatisticSlot 中增加統(tǒng)計(jì)信息, 最后會(huì)拋出給 SentinelResourceAspect 進(jìn)行處理,完成流控功能。我們?cè)賮?lái)看看這個(gè)異常信息,如果是BlockException 異常,會(huì)進(jìn)入 handleBlockException 方法處理, 如果是其他的業(yè)務(wù)異常首先會(huì)判斷是否有配置 fallback 處理如果有,就調(diào)用 handleFallback 沒(méi)有就繼續(xù)往外拋,至此完成流控功能
- try {
- entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
- Object result = pjp.proceed();
- return result;
- } catch (BlockException ex) {
- return handleBlockException(pjp, annotation, ex);
- } catch (Throwable ex) {
- Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
- // The ignore list will be checked first.
- if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
- throw ex;
- }
- if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
- traceException(ex);
- return handleFallback(pjp, annotation, ex);
- }
- // No fallback function can handle the exception, so throw it out.
- throw ex;
- }
DegradeSlot 降級(jí)
斷路器的作用是當(dāng)某些資源一直出現(xiàn)故障時(shí),將觸發(fā)斷路器。斷路器不會(huì)繼續(xù)訪問(wèn)已經(jīng)發(fā)生故障的資源,而是攔截請(qǐng)求并返回故障信號(hào)。
Sentinel 在 DegradeSlot 這個(gè) Slot 中實(shí)現(xiàn)了熔斷降級(jí)的功能,它有三個(gè)狀態(tài) OPEN 、HALF_OPEN、CLOSED 以ResponseTimeCircuitBreaker RT 響應(yīng)時(shí)間維度來(lái)分析, 斷路器工作的過(guò)程。下面是一個(gè)標(biāo)準(zhǔn)斷路器的工作流程:
在 Sentinel 實(shí)現(xiàn)的源碼過(guò)程如下圖所示:
Sentinel 通過(guò) Web 攔截器
Sentinel 在默認(rèn)情況下, 不使用 @ResourceSentinel 注解實(shí)現(xiàn)流控的時(shí)候, Sentinel 通過(guò)攔截器進(jìn)行流控實(shí)現(xiàn)的。初始化類在 SentinelWebAutoConfiguration 它實(shí)現(xiàn)了 WebMvcConfigurer 接口,在 sentinelWebInterceptor 方法初始化 SentinelWebInterceptor 等 Bean。
- @Bean
- @ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
- matchIfMissing = true)
- public SentinelWebInterceptor sentinelWebInterceptor(
- SentinelWebMvcConfig sentinelWebMvcConfig) {
- return new SentinelWebInterceptor(sentinelWebMvcConfig);
- }
我們?cè)? SentinelWebInterceptor 的核心方法 preHandle 中處理,這里面我們又可以看到 SphU.entry 熟悉的過(guò)程調(diào)用流控的責(zé)任鏈。由于邏輯都類似,此處不再多說(shuō)。代碼如下:
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
- throws Exception {
- try {
- String resourceName = getResourceName(request);
- if (StringUtil.isEmpty(resourceName)) {
- return true;
- }
- if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
- return true;
- }
- // Parse the request origin using registered origin parser.
- String origin = parseOrigin(request);
- String contextName = getContextName(request);
- ContextUtil.enter(contextName, origin);
- Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
- request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
- return true;
- } catch (BlockException e) {
- try {
- handleBlockException(request, response, e);
- } finally {
- ContextUtil.exit();
- }
- return false;
- }
- }
參考文檔
https://github.com/alibaba/Sentinel/wiki
https://martinfowler.com/bliki/CircuitBreaker.html