自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

性能優(yōu)化之Hystrix請(qǐng)求合并&自實(shí)現(xiàn)簡(jiǎn)化版本

開(kāi)發(fā) 前端
在業(yè)務(wù)開(kāi)發(fā)過(guò)程中,存在這樣的場(chǎng)景:程序接收到數(shù)據(jù)后,調(diào)用其他接口再將數(shù)據(jù)轉(zhuǎn)發(fā)出去;如果接收一條轉(zhuǎn)發(fā)一條,效率是比較低的,所以一個(gè)思路是先將數(shù)據(jù)緩存起來(lái),緩存到一定數(shù)量后一次性轉(zhuǎn)發(fā)出去。

背景介紹

在業(yè)務(wù)開(kāi)發(fā)過(guò)程中,存在這樣的場(chǎng)景:程序接收到數(shù)據(jù)后,調(diào)用其他接口再將數(shù)據(jù)轉(zhuǎn)發(fā)出去;如果接收一條轉(zhuǎn)發(fā)一條,效率是比較低的,所以一個(gè)思路是先將數(shù)據(jù)緩存起來(lái),緩存到一定數(shù)量后一次性轉(zhuǎn)發(fā)出去。

有優(yōu)點(diǎn)就有缺點(diǎn),需要根據(jù)業(yè)務(wù)場(chǎng)景進(jìn)行考量:

  • 在QPS較小的情況下,達(dá)到閾值的等待時(shí)間較長(zhǎng),造成數(shù)據(jù)延遲較大
  • 在應(yīng)用發(fā)布的時(shí)候,緩存的數(shù)據(jù)存在丟失的可能性
  • 在應(yīng)用非正常down掉的情況下,緩存的數(shù)據(jù)存在丟失的可能性

下面內(nèi)容是對(duì)Hystrix請(qǐng)求合并及根據(jù)Hystrix請(qǐng)求合并原理自定義實(shí)現(xiàn)的簡(jiǎn)化版本。

Hystrix請(qǐng)求合并

什么是請(qǐng)求合并

Without Collapsing

without collapsing

With Collapsing

with collapsing

請(qǐng)求合并設(shè)計(jì)思路

design

Hystrix使用示例

示例采用Spring-Boot編寫(xiě),下面代碼拷貝到工程中可以直接運(yùn)行。

添加依賴(lài)

下面是spring與hystrix集成的依賴(lài)pom。

pom

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>

啟動(dòng)類(lèi)

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;

@EnableHystrix
@SpringBootApplication
public class Application {
public static void main(String[] args){
SpringApplication.run(Application.class, args);
}
}

使用示例

HystrixController

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@RestController
public class HystrixController {
@Resource
private HystrixService hystrixService;

@RequestMapping("/byid")
public Long byId(Long id) throws InterruptedException, ExecutionException {
Future<Long> future = hystrixService.byId(id);
return future.get();
}
}

HystrixService

import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.Future;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;

@Service
public class HystrixService {
@HystrixCollapser(batchMethod="byIds",scope= com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties={
@HystrixProperty(name="maxRequestsInBatch",value="10"),
@HystrixProperty(name="timerDelayInMilliseconds",value="1000")
})
public Future<Long> byId(Long id){
return null;
}

@HystrixCommand
public List<Long> byIds(List<Long> ids){
System.out.println(ids);
return ids;
}
}

測(cè)試類(lèi)

發(fā)送請(qǐng)求進(jìn)行驗(yàn)證。

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HystrixTest {

public static void main(String[] args) throws Exception{
CloseableHttpClient httpClient = HttpClients.custom().setMaxConnPerRoute(100).build();
String url = "http://localhost:8086/byid?id=";
ExecutorService executorService = Executors.newFixedThreadPool(20);

int requestCount = 20;
for(int i = 0;i < requestCount;i++){
final int id = i;
executorService.execute(new Runnable() {
@Override
public void run(){
try{
HttpGet httpGet = new HttpGet(url+ id);
HttpResponse response = httpClient.execute(httpGet);
System.out.println(response);
}catch (Exception e){
e.printStackTrace();
}
}
});
}

Thread.sleep(1000*10);
executorService.shutdown();
httpClient.close();
}
}

簡(jiǎn)化版本實(shí)現(xiàn)

由于Hystrix已不再維護(hù),同時(shí)考慮到Hystrix使用RxJava的學(xué)習(xí)門(mén)檻,根據(jù)HystrixCollapser設(shè)計(jì)思路及常見(jiàn)業(yè)務(wù)功能需求實(shí)現(xiàn)了一個(gè)簡(jiǎn)化版本。

實(shí)現(xiàn)

RequestCollapserFactory

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class RequestCollapserFactory {
private static RequestCollapserFactory factory;
private static final int MAXBATCHSIZE = 20;
private static final long DELAY = 1000L;
private static final long PERIOD = 500L;
private ConcurrentHashMap<String,RequestCollapser> collapsers;
private ScheduledExecutorService executor;

private RequestCollapserFactory(){
collapsers = new ConcurrentHashMap<>();
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r){
Thread thread = new Thread(r, "RequestCollapserTimer-" + counter.incrementAndGet());
thread.setDaemon(true);
return thread;
}
};
executor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), threadFactory);
}

public static RequestCollapserFactory getInstance(){
if(factory != null){
return factory;
}
synchronized (RequestCollapserFactory.class){
if(factory != null){
return factory;
}
factory = new RequestCollapserFactory();
}
return factory;
}

public RequestCollapser getRequestCollapser(String key,RequestBatch requestBatch){
return getRequestCollapser(key,requestBatch,MAXBATCHSIZE,DELAY,PERIOD);
}
public RequestCollapser getRequestCollapser(String key,RequestBatch requestBatch,int maxBatchSize){
return getRequestCollapser(key,requestBatch,maxBatchSize,DELAY,PERIOD);
}
public RequestCollapser getRequestCollapser(String key,RequestBatch requestBatch,int maxBatchSize,long delay, long period){
RequestCollapser collapser = collapsers.get(key);
if(collapser != null){
return collapser;
}

synchronized (collapsers){
collapser = collapsers.get(key);
if(collapser != null){
return collapser;
}
collapser = new RequestCollapser(requestBatch,maxBatchSize,delay,period,executor);
collapsers.put(key,collapser);
return collapser;
}
}
}

RequestCollapser

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RequestCollapser {
private int maxBatchSize;
private long delay;
private long period;
private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue();
private ScheduledExecutorService executor;
private RequestBatch requestBatch;
private AtomicBoolean timerscheduled = new AtomicBoolean();
protected RequestCollapser(RequestBatch requestBatch,int maxBatchSize,long delay,long period,ScheduledExecutorService executor){
if(requestBatch == null){
throw new IllegalArgumentException("requestBatch can not be null");
}
this.maxBatchSize = maxBatchSize;
this.delay = delay;
this.period = period;
this.executor = executor;
this.requestBatch = requestBatch;
}

public List<Object> submitRequest(Object obj,boolean ifFullThenBatchExecute){
if(timerscheduled.compareAndSet(false,true)){
this.startSchedule();
}
List<Object> objectList = null;
synchronized (queue){
if(obj instanceof Collection){
queue.addAll((Collection)obj);
}else {
queue.offer(obj);
}

if(queue.size() >= this.maxBatchSize){
objectList = new LinkedList<>();
queue.drainTo(objectList);
}
}

if(!ifFullThenBatchExecute){
return objectList;
}

this.doBatch(objectList);
return null;
}

private boolean doBatch(List<Object> objectList){
if(objectList == null){
return true;
}
try{
return requestBatch.batch(objectList);
}catch (Throwable t){
t.printStackTrace();
}
return false;
}

private void startSchedule(){
Runnable r = new Runnable() {
@Override
public void run(){
List<Object> objectList = null;
synchronized (queue){
if(queue.size() > 0) {
objectList = new LinkedList<>();
queue.drainTo(objectList);
}
}
doBatch(objectList);
}
};

this.executor.scheduleAtFixedRate(r,this.delay,this.period, TimeUnit.MILLISECONDS);
}
}

RequestBatch

import java.util.List;

public interface RequestBatch {
boolean batch(List<Object> objectList);
}

驗(yàn)證測(cè)試

RequestCollapserTest

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class RequestCollapserTest {
private static AtomicInteger counter = new AtomicInteger();
private static long delay = 1;
private static long period = 1;
private static int maxBatchSize = 20;
private static int requestCount = 50000;
private static RequestCollapserFactory factory = RequestCollapserFactory.getInstance();
private static RequestBatch requestBatch = new RequestBatch() {
@Override
public boolean batch(List<Object> objectList){
int size = objectList.size();
counter.addAndGet(size);
System.out.println(counter + ":::::" + size + ":::::" + objectList);
return true;
}
};

public static void main(String[] args) throws Exception{
//sync();
async();
}

public static void async() throws Exception{
ExecutorService executorService = Executors.newFixedThreadPool(20);
CountDownLatch countDownLatch = new CountDownLatch(requestCount);
for(int i = 0;i < requestCount;i++){
final int id = i;
executorService.execute(new Runnable() {
@Override
public void run(){
try{
RequestCollapser requestCollapser =
factory.getRequestCollapser("1",requestBatch,maxBatchSize,delay,period);
requestCollapser.submitRequest(id,true);
}catch (Exception e){
e.printStackTrace();
}
countDownLatch.countDown();
}
});
}

executorService.shutdown();
countDownLatch.await();
Thread.sleep(1000);
System.out.println(counter.get());
}

public static void sync() throws Exception{
for(int i = 0;i < requestCount;i++){
final int id = i;
RequestCollapser requestCollapser =
factory.getRequestCollapser("1",requestBatch,maxBatchSize,delay,period);
requestCollapser.submitRequest(id,true);
}
Thread.sleep(1000);
System.out.println(counter.get());
}
}


責(zé)任編輯:武曉燕 來(lái)源: 今日頭條
相關(guān)推薦

2023-02-03 15:16:42

SpringHystrix

2017-12-01 08:54:18

SpringCloudHystrix

2023-11-28 12:49:01

AI訓(xùn)練

2017-04-03 21:52:30

隔離線程池分布式

2010-05-17 15:50:06

2021-07-29 14:20:34

網(wǎng)絡(luò)優(yōu)化移動(dòng)互聯(lián)網(wǎng)數(shù)據(jù)存儲(chǔ)

2011-02-13 09:37:55

ASP.NET

2022-02-16 14:10:51

服務(wù)器性能優(yōu)化Linux

2021-11-29 11:13:45

服務(wù)器網(wǎng)絡(luò)性能

2009-12-17 15:59:44

VS2010簡(jiǎn)化版

2015-03-16 14:09:33

GoogleUbuntuDocker

2009-06-01 09:04:15

Windows 7微軟操作系統(tǒng)

2009-06-30 11:23:02

性能優(yōu)化

2018-01-09 16:56:32

數(shù)據(jù)庫(kù)OracleSQL優(yōu)化

2019-12-13 10:25:08

Android性能優(yōu)化啟動(dòng)優(yōu)化

2023-07-19 12:24:48

C++constexpr?語(yǔ)句

2013-02-20 14:32:37

Android開(kāi)發(fā)性能

2011-07-11 15:26:49

性能優(yōu)化算法

2025-01-20 09:09:59

2022-05-26 00:00:00

網(wǎng)絡(luò)請(qǐng)求合并優(yōu)化
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)