分布式限流,你想知道的都在這里
前言
在一個(gè)高并發(fā)系統(tǒng)中對(duì)流量的把控是非常重要的,當(dāng)巨大的流量直接請(qǐng)求到我們的服務(wù)器上沒多久就可能造成接口不可用,不處理的話甚至?xí)斐烧麄€(gè)應(yīng)用不可用。
比如最近就有個(gè)這樣的需求,我作為客戶端要向kafka生產(chǎn)數(shù)據(jù),而kafka的消費(fèi)者則再源源不斷的消費(fèi)數(shù)據(jù),并將消費(fèi)的數(shù)據(jù)全部請(qǐng)求到web服務(wù)器,雖說做了負(fù)載(有4臺(tái)web服務(wù)器)但業(yè)務(wù)數(shù)據(jù)的量也是巨大的,每秒鐘可能有上萬條數(shù)據(jù)產(chǎn)生。如果生產(chǎn)者直接生產(chǎn)數(shù)據(jù)的話極有可能把web服務(wù)器拖垮。
對(duì)此就必須要做限流處理,每秒鐘生產(chǎn)一定限額的數(shù)據(jù)到kafka,這樣就能極大程度的保證web的正常運(yùn)轉(zhuǎn)。
其實(shí)不管處理何種場(chǎng)景,本質(zhì)都是降低流量保證應(yīng)用的高可用。
常見算法
對(duì)于限流常見有兩種算法:
- 漏桶算法
- 令牌桶算法
漏桶算法比較簡(jiǎn)單,就是將流量放入桶中,漏桶同時(shí)也按照一定的速率流出,如果流量過快的話就會(huì)溢出(漏桶并不會(huì)提高流出速率)。溢出的流量則直接丟棄。
如下圖所示:

這種做法簡(jiǎn)單粗暴。
漏桶算法雖說簡(jiǎn)單,但卻不能應(yīng)對(duì)實(shí)際場(chǎng)景,比如突然暴增的流量。
這時(shí)就需要用到令牌桶算法:
令牌桶會(huì)以一個(gè)恒定的速率向固定容量大小桶中放入令牌,當(dāng)有流量來時(shí)則取走一個(gè)或多個(gè)令牌。當(dāng)桶中沒有令牌則將當(dāng)前請(qǐng)求丟棄或阻塞。
相比之下令牌桶可以應(yīng)對(duì)一定的突發(fā)流量。
RateLimiter實(shí)現(xiàn)
對(duì)于令牌桶的代碼實(shí)現(xiàn),可以直接使用Guava包中的RateLimiter。
- @Override
- public BaseResponse<UserResVO> getUserByFeignBatch(@RequestBody UserReqVO userReqVO) {
- //調(diào)用遠(yuǎn)程服務(wù)
- OrderNoReqVO vo = new OrderNoReqVO() ;
- vo.setReqNo(userReqVO.getReqNo());
- RateLimiter limiter = RateLimiter.create(2.0) ;
- //批量調(diào)用
- for (int i = 0 ;i< 10 ; i++){
- double acquire = limiter.acquire();
- logger.debug("獲取令牌成功!,消耗=" + acquire);
- BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNo(vo);
- logger.debug("遠(yuǎn)程返回:"+JSON.toJSONString(orderNo));
- }
- UserRes userRes = new UserRes() ;
- userRes.setUserId(123);
- userRes.setUserName("張三");
- userRes.setReqNo(userReqVO.getReqNo());
- userRes.setCode(StatusEnum.SUCCESS.getCode());
- userRes.setMessage("成功");
- return userRes ;
- }
詳見此。
調(diào)用結(jié)果如下:
代碼可以看出以每秒向桶中放入兩個(gè)令牌,請(qǐng)求一次消耗一個(gè)令牌。所以每秒鐘只能發(fā)送兩個(gè)請(qǐng)求。按照?qǐng)D中的時(shí)間來看也確實(shí)如此(返回值是獲取此令牌所消耗的時(shí)間,差不多也是每500ms一個(gè))。
使用RateLimiter有幾個(gè)值得注意的地方:
允許先消費(fèi),后付款,意思就是它可以來一個(gè)請(qǐng)求的時(shí)候一次性取走幾個(gè)或者是剩下所有的令牌甚至多取,但是后面的請(qǐng)求就得為上一次請(qǐng)求買單,它需要等待桶中的令牌補(bǔ)齊之后才能繼續(xù)獲取令牌。
總結(jié)
針對(duì)于單個(gè)應(yīng)用的限流 RateLimiter 夠用了,如果是分布式環(huán)境可以借助 Redis 來完成。
來做演示。
在 Order 應(yīng)用提供的接口中采取了限流。首先是配置了限流工具的 Bean:
- @Configuration
- public class RedisLimitConfig {
- @Value("${redis.limit}")
- private int limit;
- @Autowired
- private JedisConnectionFactory jedisConnectionFactory;
- @Bean
- public RedisLimit build() {
- RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection();
- JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection();
- RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
- .limit(limit)
- .build();
- return redisLimit;
- }
- }
接著在 Controller 使用組件:
- @Autowired
- private RedisLimit redisLimit ;
- @Override
- @CheckReqNo
- public BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) {
- BaseResponse<OrderNoResVO> res = new BaseResponse();
- //限流
- boolean limit = redisLimit.limit();
- if (!limit){
- res.setCode(StatusEnum.REQUEST_LIMIT.getCode());
- res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage());
- return res ;
- }
- res.setReqNo(orderNoReq.getReqNo());
- if (null == orderNoReq.getAppId()){
- throw new SBCException(StatusEnum.FAIL);
- }
- OrderNoResVO orderNoRes = new OrderNoResVO() ;
- orderNoRes.setOrderId(DateUtil.getLongTime());
- res.setCode(StatusEnum.SUCCESS.getCode());
- res.setMessage(StatusEnum.SUCCESS.getMessage());
- res.setDataBody(orderNoRes);
- return res ;
- }
為了方便使用,也提供了注解:
- @Override
- @ControllerLimit
- public BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) {
- BaseResponse<OrderNoResVO> res = new BaseResponse();
- // 業(yè)務(wù)邏輯
- return res ;
- }
該注解攔截了 http 請(qǐng)求,會(huì)再請(qǐng)求達(dá)到閾值時(shí)直接返回。
普通方法也可使用:
- @CommonLimit
- public void doSomething(){}
會(huì)在調(diào)用達(dá)到閾值時(shí)拋出異常。
為了模擬并發(fā),在 User 應(yīng)用中開啟了 10 個(gè)線程調(diào)用 Order(限流次數(shù)為5) 接口(也可使用專業(yè)的并發(fā)測(cè)試工具 JMeter 等)。
- @Override
- public BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) {
- //調(diào)用遠(yuǎn)程服務(wù)
- OrderNoReqVO vo = new OrderNoReqVO();
- vo.setAppId(1L);
- vo.setReqNo(userReq.getReqNo());
- for (int i = 0; i < 10; i++) {
- executorService.execute(new Worker(vo, orderServiceClient));
- }
- UserRes userRes = new UserRes();
- userRes.setUserId(123);
- userRes.setUserName("張三");
- userRes.setReqNo(userReq.getReqNo());
- userRes.setCode(StatusEnum.SUCCESS.getCode());
- userRes.setMessage("成功");
- return userRes;
- }
- private static class Worker implements Runnable {
- private OrderNoReqVO vo;
- private OrderServiceClient orderServiceClient;
- public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) {
- this.vo = vo;
- this.orderServiceClient = orderServiceClient;
- }
- @Override
- public void run() {
- BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo);
- logger.info("遠(yuǎn)程返回:" + JSON.toJSONString(orderNo));
- }
- }
為了驗(yàn)證分布式效果啟動(dòng)了兩個(gè) Order 應(yīng)用。
效果如下:
實(shí)現(xiàn)原理
實(shí)現(xiàn)原理其實(shí)很簡(jiǎn)單。既然要達(dá)到分布式全局限流的效果,那自然需要一個(gè)第三方組件來記錄請(qǐng)求的次數(shù)。
其中 Redis 就非常適合這樣的場(chǎng)景。
- 每次請(qǐng)求時(shí)將當(dāng)前時(shí)間(精確到秒)作為 Key 寫入到 Redis 中,超時(shí)時(shí)間設(shè)置為 2 秒,Redis 將該 Key 的值進(jìn)行自增。
- 當(dāng)達(dá)到閾值時(shí)返回錯(cuò)誤。
- 寫入 Redis 的操作用 Lua 腳本來完成,利用 Redis 的單線程機(jī)制可以保證每個(gè) Redis 請(qǐng)求的原子性。
Lua 腳本如下:
--lua 下標(biāo)從 1 開始-- 限流 keylocal key = KEYS[1]-- 限流大小local limit = tonumber(ARGV[1])-- 獲取當(dāng)前流量大小local curentLimit = tonumber(redis.call('get', key) or "0")if curentLimit + 1 > limit then -- 達(dá)到限流大小 返回 return 0;else -- 沒有達(dá)到閾值 value + 1 redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 2) return curentLimit + 1end
Java 中的調(diào)用邏輯:
- --lua 下標(biāo)從 1 開始
- -- 限流 key
- local key = KEYS[1]
- -- 限流大小
- local limit = tonumber(ARGV[1])
- -- 獲取當(dāng)前流量大小
- local curentLimit = tonumber(redis.call('get', key) or "0")
- if curentLimit + 1 > limit then
- -- 達(dá)到限流大小 返回
- return 0;
- else
- -- 沒有達(dá)到閾值 value + 1
- redis.call("INCRBY", key, 1)
- redis.call("EXPIRE", key, 2)
- return curentLimit + 1
- end
所以只需要在需要限流的地方調(diào)用該方法對(duì)返回值進(jìn)行判斷即可達(dá)到限流的目的。
當(dāng)然這只是利用 Redis 做了一個(gè)粗暴的計(jì)數(shù)器,如果想實(shí)現(xiàn)類似于上文中的令牌桶算法可以基于 Lua 自行實(shí)現(xiàn)。
Builder 構(gòu)建器
在設(shè)計(jì)這個(gè)組件時(shí)想盡量的提供給使用者清晰、可讀性、不易出錯(cuò)的 API。
比如***步,如何構(gòu)建一個(gè)限流對(duì)象。
最常用的方式自然就是構(gòu)造函數(shù),如果有多個(gè)域則可以采用重疊構(gòu)造器的方式:
- public A(){}
- public A(int a){}
- public A(int a,int b){}
缺點(diǎn)也是顯而易見的:如果參數(shù)過多會(huì)導(dǎo)致難以閱讀,甚至如果參數(shù)類型一致的情況下客戶端顛倒了順序,但不會(huì)引起警告從而出現(xiàn)難以預(yù)測(cè)的結(jié)果。
第二種方案可以采用 JavaBean 模式,利用 setter 方法進(jìn)行構(gòu)建:
- A a = new A();
- a.setA(a);
- a.setB(b);
這種方式清晰易讀,但卻容易讓對(duì)象處于不一致的狀態(tài),使對(duì)象處于線程不安全的狀態(tài)。
所以這里采用了第三種創(chuàng)建對(duì)象的方式,構(gòu)建器:
- public class RedisLimit {
- private JedisCommands jedis;
- private int limit = 200;
- private static final int FAIL_CODE = 0;
- /**
- * lua script
- */
- private String script;
- private RedisLimit(Builder builder) {
- this.limit = builder.limit ;
- this.jedis = builder.jedis ;
- buildScript();
- }
- /**
- * limit traffic
- * @return if true
- */
- public boolean limit() {
- String key = String.valueOf(System.currentTimeMillis() / 1000);
- Object result = null;
- if (jedis instanceof Jedis) {
- result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
- } else if (jedis instanceof JedisCluster) {
- result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
- } else {
- //throw new RuntimeException("instance is error") ;
- return false;
- }
- if (FAIL_CODE != (Long) result) {
- return true;
- } else {
- return false;
- }
- }
- /**
- * read lua script
- */
- private void buildScript() {
- script = ScriptUtil.getScript("limit.lua");
- }
- /**
- * the builder
- * @param <T>
- */
- public static class Builder<T extends JedisCommands>{
- private T jedis = null ;
- private int limit = 200;
- public Builder(T jedis){
- this.jedis = jedis ;
- }
- public Builder limit(int limit){
- this.limit = limit ;
- return this;
- }
- public RedisLimit build(){
- return new RedisLimit(this) ;
- }
- }
- }
這樣客戶端在使用時(shí):
- RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
- .limit(limit)
- .build();
更加的簡(jiǎn)單直接,并且避免了將創(chuàng)建過程分成了多個(gè)子步驟。
這在有多個(gè)構(gòu)造參數(shù),但又不是必選字段時(shí)很有作用。
因此順便將分布式鎖的構(gòu)建器方式也一并更新了:
https://github.com/crossoverJie/distributed-redis-tool#features
API
從上文可以看出,使用過程就是調(diào)用 limit 方法。
- //限流
- boolean limit = redisLimit.limit();
- if (!limit){
- //具體限流邏輯
- }
為了減少侵入性,也為了簡(jiǎn)化客戶端提供了兩種注解方式。
@ControllerLimit
該注解可以作用于 @RequestMapping 修飾的接口中,并會(huì)在限流后提供限流響應(yīng)。
實(shí)現(xiàn)如下:
- @Component
- public class WebIntercept extends WebMvcConfigurerAdapter {
- private static Logger logger = LoggerFactory.getLogger(WebIntercept.class);
- @Autowired
- private RedisLimit redisLimit;
- @Override
- public void addInterceptors(InterceptorRegistry registry) {
- registry.addInterceptor(new CustomInterceptor())
- .addPathPatterns("/**");
- }
- private class CustomInterceptor extends HandlerInterceptorAdapter {
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
- Object handler) throws Exception {
- if (redisLimit == null) {
- throw new NullPointerException("redisLimit is null");
- }
- if (handler instanceof HandlerMethod) {
- HandlerMethod method = (HandlerMethod) handler;
- ControllerLimit annotation = method.getMethodAnnotation(ControllerLimit.class);
- if (annotation == null) {
- //skip
- return true;
- }
- boolean limit = redisLimit.limit();
- if (!limit) {
- logger.warn("request has bean limit");
- response.sendError(500, "request limit");
- return false;
- }
- }
- return true;
- }
- }
- }
其實(shí)就是實(shí)現(xiàn)了 SpringMVC 中的攔截器,并在攔截過程中判斷是否有使用注解,從而調(diào)用限流邏輯。
前提是應(yīng)用需要掃描到該類,讓 Spring 進(jìn)行管理。
- @ComponentScan(value = "com.crossoverjie.distributed.intercept")
@CommonLimit
當(dāng)然也可以在普通方法中使用。實(shí)現(xiàn)原理則是 Spring AOP (SpringMVC 的攔截器本質(zhì)也是 AOP)。
- @Aspect
- @Component
- @EnableAspectJAutoProxy(proxyTargetClass = true)
- public class CommonAspect {
- private static Logger logger = LoggerFactory.getLogger(CommonAspect.class);
- @Autowired
- private RedisLimit redisLimit ;
- @Pointcut("@annotation(com.crossoverjie.distributed.annotation.CommonLimit)")
- private void check(){}
- @Before("check()")
- public void before(JoinPoint joinPoint) throws Exception {
- if (redisLimit == null) {
- throw new NullPointerException("redisLimit is null");
- }
- boolean limit = redisLimit.limit();
- if (!limit) {
- logger.warn("request has bean limit");
- throw new RuntimeException("request has bean limit") ;
- }
- }
- }
很簡(jiǎn)單,也是在攔截過程中調(diào)用限流。
當(dāng)然使用時(shí)也得掃描到該包:
- @ComponentScan(value = "com.crossoverjie.distributed.intercept")
總結(jié)
限流在一個(gè)高并發(fā)大流量的系統(tǒng)中是保護(hù)應(yīng)用的一個(gè)利器,成熟的方案也很多,希望對(duì)剛了解這一塊的朋友提供一些思路。