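Environment: Spring Boot 2.6.14 + Spring Cloud 2021.0.5
Overview
Spring Cloud Circuit Breaker provides an abstraction across different circuit breaker implementations. It gives your application a consistent API and lets you, the developer, choose the circuit breaker implementation that best fits your application's needs.
Spring Cloud supports the following circuit breaker implementations: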
- Resilience4J
- Sentinel
- Spring Retry
Core concepts
To create a circuit breaker in code, use the CircuitBreakerFactory API. When a Spring Cloud Circuit Breaker starter is on the classpath, a bean implementing this API is created automatically. The following example shows how to use it:
@Service
public static class DemoControllerService {
    private RestTemplate rest;
    private CircuitBreakerFactory cbFactory;
    public DemoControllerService(RestTemplate rest, CircuitBreakerFactory cbFactory) {
        this.rest = rest;
        this.cbFactory = cbFactory;
    }
    public String slow() {
        return cbFactory.create("slow").run(() -> rest.getForObject("/slow", String.class), throwable -> "fallback");
    }
}
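That is the basic usage of a circuit breaker. CircuitBreakerFactory.create returns an instance of a class called CircuitBreaker. Its run method takes a Supplier and a Function. The Supplier is the code to wrap in the circuit breaker. The Function, the second argument of run, is the fallback that runs when the circuit breaker trips; it receives the Throwable that triggered it.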
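To make the run contract concrete, here is a minimal plain-Java sketch. `SimpleBreaker` and `PassThroughBreaker` are hypothetical names, not Spring Cloud types, and the sketch has no trip logic at all. It only shows that the Supplier result is returned on success and that the Function receives the exception on failure.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for the run(Supplier, Function) contract; not the Spring Cloud interface.
final class RunContractSketch {

    interface SimpleBreaker {
        <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback);
    }

    // Simplest possible implementation: no state and no trip logic, only the fallback path.
    static final class PassThroughBreaker implements SimpleBreaker {
        @Override
        public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
            try {
                return toRun.get();
            } catch (Exception e) {
                // The fallback is handed the exception that caused the failure.
                return fallback.apply(e);
            }
        }
    }

    static String callOk() {
        return new PassThroughBreaker().run(() -> "response", t -> "fallback");
    }

    static String callFailing() {
        return new PassThroughBreaker().run(() -> {
            throw new IllegalStateException("remote call failed");
        }, t -> "fallback: " + t.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(callOk());      // response
        System.out.println(callFailing()); // fallback: remote call failed
    }
}
```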
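How it works
The APIs seen above, such as CircuitBreaker (an interface) and CircuitBreakerFactory (an abstract class), are defined in the Spring Cloud Commons package. The concrete implementation has to be added separately, for example Resilience4J or Sentinel. This article uses Resilience4J.
First, add the Resilience4J auto-configuration starter: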
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>
Once this dependency is present, an implementation of CircuitBreakerFactory is auto-configured:
// Enabled by default; can be switched off with spring.cloud.circuitbreaker.resilience4j.enabled
@EnableConfigurationProperties(Resilience4JConfigurationProperties.class)
@ConditionalOnProperty(name = { "spring.cloud.circuitbreaker.resilience4j.enabled",
        "spring.cloud.circuitbreaker.resilience4j.blocking.enabled" }, matchIfMissing = true)
public class Resilience4JAutoConfiguration {
    @Autowired(required = false)
    private List<Customizer<Resilience4JCircuitBreakerFactory>> customizers = new ArrayList<>();
    @Bean
    @ConditionalOnMissingBean(CircuitBreakerFactory.class)
    public Resilience4JCircuitBreakerFactory resilience4jCircuitBreakerFactory(
            CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
            @Autowired(required = false) Resilience4jBulkheadProvider bulkheadProvider,
            Resilience4JConfigurationProperties resilience4JConfigurationProperties) {
        Resilience4JCircuitBreakerFactory factory = new Resilience4JCircuitBreakerFactory(circuitBreakerRegistry,
                timeLimiterRegistry, bulkheadProvider, resilience4JConfigurationProperties);
        customizers.forEach(customizer -> customizer.customize(factory));
        return factory;
    }
}
With the Resilience4JCircuitBreakerFactory above in place, application code can create circuit breakers through it.
The bean method above takes several important core classes as parameters:
- CircuitBreakerRegistry
Registers, configures, and creates CircuitBreaker instances.
- TimeLimiterRegistry
Registers and configures the time limit for each (CircuitBreaker) instance.
- Resilience4jBulkheadProvider
Provides the isolation mechanism. Internally it uses BulkheadRegistry and ThreadPoolBulkheadRegistry to register and configure the bulkhead and thread-pool bulkhead settings of each instance.
- Core class Resilience4JCircuitBreakerFactory
Every circuit-breaking or fallback operation obtains its CircuitBreaker instance from this factory:
public class Resilience4JCircuitBreakerFactory {
    private CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
    private TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.ofDefaults();
    private ExecutorService executorService = Executors.newCachedThreadPool();
    private Function<String, Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration> defaultConfiguration;
    public Resilience4JCircuitBreakerFactory(CircuitBreakerRegistry circuitBreakerRegistry,
            TimeLimiterRegistry timeLimiterRegistry, Resilience4jBulkheadProvider bulkheadProvider,
            Resilience4JConfigurationProperties resilience4JConfigurationProperties) {
        this.circuitBreakerRegistry = circuitBreakerRegistry;
        this.timeLimiterRegistry = timeLimiterRegistry;
        this.bulkheadProvider = bulkheadProvider;
        // Default configuration, used when no configuration was created for a specific instance id
        this.defaultConfiguration = id -> new Resilience4JConfigBuilder(id)
                .circuitBreakerConfig(this.circuitBreakerRegistry.getDefaultConfig())
                .timeLimiterConfig(this.timeLimiterRegistry.getDefaultConfig()).build();
        this.resilience4JConfigurationProperties = resilience4JConfigurationProperties;
    }
}
Creating a CircuitBreaker instance
public Resilience4JCircuitBreaker create(String id) {
    return create(id, id, this.executorService);
}
private Resilience4JCircuitBreaker create(String id, String groupName,
        ExecutorService circuitBreakerExecutorService) {
    // Look up the configuration for this instance id; use the default configuration if none exists.
    // Initially there is no configuration for any id.
    Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config = getConfigurations()
            .computeIfAbsent(id, defaultConfiguration);
    // spring.cloud.circuitbreaker.resilience4j.disableThreadPool, default false
    if (resilience4JConfigurationProperties.isDisableThreadPool()) {
        return new Resilience4JCircuitBreaker(id, groupName, config.getCircuitBreakerConfig(),
                config.getTimeLimiterConfig(), circuitBreakerRegistry, timeLimiterRegistry,
                Optional.ofNullable(circuitBreakerCustomizers.get(id)), bulkheadProvider);
    } else {
        // Create the CircuitBreaker instance. On first use, config.getCircuitBreakerConfig()
        // and config.getTimeLimiterConfig() return the default configuration.
        return new Resilience4JCircuitBreaker(id, groupName, config.getCircuitBreakerConfig(),
                config.getTimeLimiterConfig(), circuitBreakerRegistry, timeLimiterRegistry,
                circuitBreakerExecutorService, Optional.ofNullable(circuitBreakerCustomizers.get(id)), bulkheadProvider);
    }
}
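The per-id lookup with a default, as done by `computeIfAbsent(id, defaultConfiguration)` above, can be sketched in plain Java. Everything in this block is hypothetical (`ConfigLookupSketch`, `BreakerConfig`, and the default value 100 are illustrative assumptions, not Spring Cloud code). It only shows that an explicitly configured id keeps its own settings while any other id gets a configuration built by the default function.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model of the factory's per-id configuration lookup; not Spring Cloud code.
final class ConfigLookupSketch {

    // Simplified configuration holder carrying a single setting.
    static final class BreakerConfig {
        final String id;
        final int minimumNumberOfCalls;

        BreakerConfig(String id, int minimumNumberOfCalls) {
            this.id = id;
            this.minimumNumberOfCalls = minimumNumberOfCalls;
        }
    }

    private final Map<String, BreakerConfig> configurations = new ConcurrentHashMap<>();

    // Builds the configuration for ids that were never configured explicitly (100 is an assumed default).
    private final Function<String, BreakerConfig> defaultConfiguration = id -> new BreakerConfig(id, 100);

    void configure(String id, int minimumNumberOfCalls) {
        configurations.put(id, new BreakerConfig(id, minimumNumberOfCalls));
    }

    // Returns the stored configuration, or creates and stores the default one on first access.
    BreakerConfig resolve(String id) {
        return configurations.computeIfAbsent(id, defaultConfiguration);
    }

    public static void main(String[] args) {
        ConfigLookupSketch factory = new ConfigLookupSketch();
        factory.configure("lkk", 2);
        System.out.println(factory.resolve("lkk").minimumNumberOfCalls);   // 2
        System.out.println(factory.resolve("other").minimumNumberOfCalls); // 100
    }
}
```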
The Resilience4JCircuitBreaker instance
public class Resilience4JCircuitBreaker implements CircuitBreaker {
    public Resilience4JCircuitBreaker(String id, String groupName,
            io.github.resilience4j.circuitbreaker.CircuitBreakerConfig circuitBreakerConfig,
            TimeLimiterConfig timeLimiterConfig, CircuitBreakerRegistry circuitBreakerRegistry,
            TimeLimiterRegistry timeLimiterRegistry, ExecutorService executorService,
            Optional<Customizer<io.github.resilience4j.circuitbreaker.CircuitBreaker>> circuitBreakerCustomizer,
            Resilience4jBulkheadProvider bulkheadProvider) {
        this.id = id;
        this.groupName = groupName;
        this.circuitBreakerConfig = circuitBreakerConfig;
        this.registry = circuitBreakerRegistry;
        this.timeLimiterRegistry = timeLimiterRegistry;
        this.timeLimiterConfig = timeLimiterConfig;
        this.executorService = executorService;
        // ...
    }
    public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
        // ...
        // Both the CircuitBreaker and the TimeLimiter are looked up in their registry first;
        // the configuration passed in is used only when no entry exists yet.
        // Get the instance for this id from the CircuitBreakerRegistry; if it does not exist,
        // it is created with the current circuitBreakerConfig.
        io.github.resilience4j.circuitbreaker.CircuitBreaker defaultCircuitBreaker = registry.circuitBreaker(this.id,
                this.circuitBreakerConfig, tags);
        // If no instance exists for this id, the default timeLimiterConfig is applied
        TimeLimiter timeLimiter = timeLimiterRegistry.timeLimiter(id, timeLimiterConfig, tags);
        // ...
    }
}
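The run method above receives both an ExecutorService and a TimeLimiter, which suggests the general pattern of running the Supplier on a worker thread and bounding the wait. The following is a plain-Java sketch of that pattern only, under the assumption that a timeout is treated like any other failure and routed to the fallback. `TimeLimitedRunSketch` and its methods are hypothetical and do not reproduce the elided body of `Resilience4JCircuitBreaker.run`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of "run on an executor, wait with a timeout, fall back on failure".
final class TimeLimitedRunSketch {

    static <T> T run(ExecutorService executor, long timeoutMillis,
                     Supplier<T> toRun, Function<Throwable, T> fallback) {
        Callable<T> task = toRun::get;
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker that is still running
            return fallback.apply(e);
        } catch (ExecutionException e) {
            return fallback.apply(e.getCause()); // the Supplier itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.apply(e);
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static String fastCall() {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            return run(executor, 1000, () -> "ok", t -> "fallback");
        } finally {
            executor.shutdownNow();
        }
    }

    static String slowCall() {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            return run(executor, 100, () -> {
                sleep(2000); // longer than the 100 ms limit
                return "late";
            }, t -> "fallback: " + t.getClass().getSimpleName());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(fastCall()); // ok
        System.out.println(slowCall()); // fallback: TimeoutException
    }
}
```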
The CircuitBreakerRegistry
This registry registers the instance configurations declared in the configuration file and provides the default configuration.
@Import({CircuitBreakerConfigurationOnMissingBean.class, FallbackConfigurationOnMissingBean.class})
public class CircuitBreakerAutoConfiguration {}
CircuitBreakerConfigurationOnMissingBean is the class that registers the CircuitBreakerRegistry:
@Configuration
public class CircuitBreakerConfigurationOnMissingBean extends AbstractCircuitBreakerConfigurationOnMissingBean {
    public CircuitBreakerConfigurationOnMissingBean(CircuitBreakerConfigurationProperties circuitBreakerProperties) {
        super(circuitBreakerProperties);
    }
}
Property configuration
// All related settings are bound to this properties class
@ConfigurationProperties(prefix = "resilience4j.circuitbreaker")
public class CircuitBreakerProperties extends CircuitBreakerConfigurationProperties {}
AbstractCircuitBreakerConfigurationOnMissingBean
@Configuration
@Import({FallbackConfigurationOnMissingBean.class, SpelResolverConfigurationOnMissingBean.class})
public abstract class AbstractCircuitBreakerConfigurationOnMissingBean {
    protected final CircuitBreakerConfiguration circuitBreakerConfiguration;
    protected final CircuitBreakerConfigurationProperties circuitBreakerProperties;
    public AbstractCircuitBreakerConfigurationOnMissingBean(CircuitBreakerConfigurationProperties circuitBreakerProperties) {
        this.circuitBreakerProperties = circuitBreakerProperties;
        this.circuitBreakerConfiguration = new CircuitBreakerConfiguration(circuitBreakerProperties);
    }
    @Bean
    @ConditionalOnMissingBean
    public CircuitBreakerRegistry circuitBreakerRegistry(
            EventConsumerRegistry<CircuitBreakerEvent> eventConsumerRegistry,
            RegistryEventConsumer<CircuitBreaker> circuitBreakerRegistryEventConsumer,
            @Qualifier("compositeCircuitBreakerCustomizer") CompositeCustomizer<CircuitBreakerConfigCustomizer> compositeCircuitBreakerCustomizer) {
        // Create the registry instance
        return circuitBreakerConfiguration
                .circuitBreakerRegistry(eventConsumerRegistry, circuitBreakerRegistryEventConsumer, compositeCircuitBreakerCustomizer);
    }
}
CircuitBreakerConfiguration
@Configuration
public class CircuitBreakerConfiguration {
    private final CircuitBreakerConfigurationProperties circuitBreakerProperties;
    public CircuitBreakerConfiguration(CircuitBreakerConfigurationProperties circuitBreakerProperties) {
        this.circuitBreakerProperties = circuitBreakerProperties;
    }
    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry(
            EventConsumerRegistry<CircuitBreakerEvent> eventConsumerRegistry,
            RegistryEventConsumer<CircuitBreaker> circuitBreakerRegistryEventConsumer,
            @Qualifier("compositeCircuitBreakerCustomizer") CompositeCustomizer<CircuitBreakerConfigCustomizer> compositeCircuitBreakerCustomizer) {
        // Register every configuration under resilience4j.circuitbreaker.configs
        CircuitBreakerRegistry circuitBreakerRegistry = createCircuitBreakerRegistry(
                circuitBreakerProperties, circuitBreakerRegistryEventConsumer,
                compositeCircuitBreakerCustomizer);
        // ...
        // Register every configuration under resilience4j.circuitbreaker.instances
        initCircuitBreakerRegistry(circuitBreakerRegistry, compositeCircuitBreakerCustomizer);
        return circuitBreakerRegistry;
    }
}
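Every entry under instances above is registered in AbstractRegistry.entryMap; the named entries under configs are kept in the registry's separate configurations map.
Looking up the configuration for an instance id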
io.github.resilience4j.circuitbreaker.CircuitBreaker defaultCircuitBreaker = registry.circuitBreaker(this.id, this.circuitBreakerConfig, tags);
// InMemoryCircuitBreakerRegistry
public CircuitBreaker circuitBreaker(String name, CircuitBreakerConfig config, io.vavr.collection.Map<String, String> tags) {
    return computeIfAbsent(name, () -> CircuitBreaker.of(name, Objects.requireNonNull(config, CONFIG_MUST_NOT_BE_NULL), getAllTags(tags)));
}
// AbstractRegistry
protected E computeIfAbsent(String name, Supplier<E> supplier) {
    // Look the entry up in the map; create and register it only if it is absent
    return entryMap.computeIfAbsent(Objects.requireNonNull(name, NAME_MUST_NOT_BE_NULL), k -> {
        E entry = supplier.get();
        eventProcessor.processEvent(new EntryAddedEvent<>(entry));
        return entry;
    });
}
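One consequence of the computeIfAbsent lookup above is that an instance already registered (for example from the instances section of the configuration file) wins, and the configuration passed in by the caller is ignored. A small plain-Java model of that behaviour follows. `RegistrySketch` and `Breaker` are hypothetical names and the single `minimumNumberOfCalls` field stands in for a full configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a name-keyed registry; not the Resilience4j classes.
final class RegistrySketch {

    static final class Breaker {
        final String name;
        final int minimumNumberOfCalls;

        Breaker(String name, int minimumNumberOfCalls) {
            this.name = name;
            this.minimumNumberOfCalls = minimumNumberOfCalls;
        }
    }

    private final Map<String, Breaker> entryMap = new ConcurrentHashMap<>();

    // Stand-in for the "entry added" event published by the real registry.
    final List<String> addedEvents = new ArrayList<>();

    // The supplied setting is used only when no entry with this name exists yet.
    Breaker circuitBreaker(String name, int minimumNumberOfCallsIfAbsent) {
        return entryMap.computeIfAbsent(name, k -> {
            Breaker entry = new Breaker(k, minimumNumberOfCallsIfAbsent);
            addedEvents.add("added:" + k);
            return entry;
        });
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.circuitBreaker("lkk", 2); // registered at startup, e.g. from the instances section
        System.out.println(registry.circuitBreaker("lkk", 100).minimumNumberOfCalls);   // 2, existing entry wins
        System.out.println(registry.circuitBreaker("other", 100).minimumNumberOfCalls); // 100, created on demand
        System.out.println(registry.addedEvents); // [added:lkk, added:other]
    }
}
```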
The TimeLimiterRegistry
Its configuration mirrors that of CircuitBreakerRegistry and is not repeated here.
That covers how the Resilience4J implementation works.
Resilience4J configuration
resilience4j:
  circuitbreaker:
    #configs:
    #  default:
    #    minimumNumberOfCalls: 5 # default configuration
    instances:
      # name used for calls made through Feign
      DemoFeignformat:
        minimumNumberOfCalls: 5
      lkk:
        minimumNumberOfCalls: 2
  timelimiter:
    configs:
      # default configuration applied to any call
      default:
        timeoutDuration: 1s
    instances:
      DemoFeignformat:
        timeoutDuration: 1s
      DemoFeigndate3:
        timeoutDuration: 5s
      lkk:
        timeoutDuration: 1s
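To illustrate what minimumNumberOfCalls controls, here is a toy model in plain Java. It is not Resilience4j code: `MinimumCallsSketch` is a hypothetical class, the 50 percent failure-rate threshold is an assumption made for the demo, and the sliding window used by the real library is ignored. The point is only that the failure rate is not evaluated until the minimum number of calls has been recorded.

```java
// Toy model, not Resilience4j code: shows only the role of minimumNumberOfCalls.
final class MinimumCallsSketch {
    private final int minimumNumberOfCalls;
    private final double failureRateThreshold; // percent, assumed to be 50 in the demo
    private int calls;
    private int failures;
    private boolean open;

    MinimumCallsSketch(int minimumNumberOfCalls, double failureRateThreshold) {
        this.minimumNumberOfCalls = minimumNumberOfCalls;
        this.failureRateThreshold = failureRateThreshold;
    }

    // Record one call outcome and re-evaluate the state.
    void record(boolean success) {
        calls++;
        if (!success) {
            failures++;
        }
        // The failure rate is evaluated only once enough calls have been recorded.
        if (calls >= minimumNumberOfCalls && failures * 100.0 / calls >= failureRateThreshold) {
            open = true;
        }
    }

    boolean isOpen() {
        return open;
    }

    // Helper: does a breaker with the given minimum open after n consecutive failures?
    static boolean opensAfterFailures(int minimumNumberOfCalls, int consecutiveFailures) {
        MinimumCallsSketch breaker = new MinimumCallsSketch(minimumNumberOfCalls, 50.0);
        for (int i = 0; i < consecutiveFailures; i++) {
            breaker.record(false);
        }
        return breaker.isOpen();
    }

    public static void main(String[] args) {
        System.out.println(opensAfterFailures(2, 1)); // false: below the minimum of 2
        System.out.println(opensAfterFailures(2, 2)); // true
        System.out.println(opensAfterFailures(5, 4)); // false: below the minimum of 5
    }
}
```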
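The Feign instance names used above are derived as follows.
When the following configuration is enabled, the circuit breaker name is the Feign config key with every non-alphanumeric character removed: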
feign:
  circuitbreaker:
    enabled: true
    alphanumericIds:
      enabled: true
public class FeignClientFactoryBean {
    <T> T getTarget() {
        Feign.Builder builder = feign(context);
        // ...
    }
}
public final class FeignCircuitBreaker {
    public static final class Builder extends Feign.Builder {
        public Feign build(final FallbackFactory<?> nullableFallbackFactory) {
            // Calls are executed through FeignCircuitBreakerInvocationHandler
            super.invocationHandlerFactory((target, dispatch) -> new FeignCircuitBreakerInvocationHandler(circuitBreakerFactory,
                    feignClientName, target, dispatch, nullableFallbackFactory,
                    circuitBreakerGroupEnabled, circuitBreakerNameResolver));
            return super.build();
        }
    }
}
FeignCircuitBreakerInvocationHandler
class FeignCircuitBreakerInvocationHandler implements InvocationHandler {
    private final CircuitBreakerNameResolver circuitBreakerNameResolver;
    public Object invoke() {
        String circuitName = circuitBreakerNameResolver.resolveCircuitBreakerName(feignClientName, target, method);
        CircuitBreaker circuitBreaker = circuitBreakerGroupEnabled ? factory.create(circuitName, feignClientName) : factory.create(circuitName);
        // ...
    }
}
static class AlphanumericCircuitBreakerNameResolver extends DefaultCircuitBreakerNameResolver {
    @Override
    public String resolveCircuitBreakerName(String feignClientName, Target<?> target, Method method) {
        return super.resolveCircuitBreakerName(feignClientName, target, method).replaceAll("[^a-zA-Z0-9]", "");
    }
}
static class DefaultCircuitBreakerNameResolver implements CircuitBreakerNameResolver {
    @Override
    public String resolveCircuitBreakerName(String feignClientName, Target<?> target, Method method) {
        return Feign.configKey(target.type(), method);
    }
}
public abstract class Feign {
    // With alphanumericIds enabled, the resolver above removes every special character from this key
    public static String configKey(Class targetType, Method method) {
        StringBuilder builder = new StringBuilder();
        builder.append(targetType.getSimpleName());
        builder.append('#').append(method.getName()).append('(');
        for (Type param : method.getGenericParameterTypes()) {
            param = Types.resolve(targetType, targetType, param);
            builder.append(Types.getRawType(param).getSimpleName()).append(',');
        }
        if (method.getParameterTypes().length > 0) {
            builder.deleteCharAt(builder.length() - 1);
        }
        return builder.append(')').toString();
    }
}
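The naming rule can be reproduced with plain reflection. The sketch below is a simplified re-implementation for illustration: it uses raw parameter types instead of Feign's `Types.resolve`, and the `DemoFeign` interface with its `format` and `query` methods is a hypothetical client, chosen so that the first key matches the `DemoFeignformat` instance name used in the configuration.

```java
import java.lang.reflect.Method;

// Simplified, hypothetical re-implementation of the naming rule; not the Feign classes themselves.
final class CircuitNameSketch {

    // Hypothetical Feign client interface used only for the demonstration.
    interface DemoFeign {
        String format();
        String query(String id, Integer page);
    }

    // Builds "Type#method(ParamType,ParamType)" from raw parameter types.
    static String configKey(Class<?> targetType, Method method) {
        StringBuilder builder = new StringBuilder();
        builder.append(targetType.getSimpleName()).append('#').append(method.getName()).append('(');
        Class<?>[] params = method.getParameterTypes();
        for (int i = 0; i < params.length; i++) {
            if (i > 0) {
                builder.append(',');
            }
            builder.append(params[i].getSimpleName());
        }
        return builder.append(')').toString();
    }

    // Same regular expression as the alphanumeric resolver shown above.
    static String alphanumeric(String key) {
        return key.replaceAll("[^a-zA-Z0-9]", "");
    }

    static String keyFor(String methodName, Class<?>... parameterTypes) {
        try {
            return configKey(DemoFeign.class, DemoFeign.class.getMethod(methodName, parameterTypes));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(keyFor("format"));                                            // DemoFeign#format()
        System.out.println(alphanumeric(keyFor("format")));                              // DemoFeignformat
        System.out.println(alphanumeric(keyFor("query", String.class, Integer.class)));  // DemoFeignqueryStringInteger
    }
}
```

Note that parameter type names become part of the alphanumeric name, so a method with arguments needs an instance key that includes them.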
That covers how the Resilience4J-based implementation works and how it is configured.