Disruptor 有哪些典型的使用場景?
大家好,我是君哥。
Disruptor 是一款高性能的內(nèi)存有界隊列,它通過內(nèi)存預(yù)分配、無鎖并發(fā)、解決偽共享問題、使用 RingBuffer 取代阻塞隊列等措施來大幅提升隊列性能。
但開發(fā)者們往往對它的使用場景不太了解,到底應(yīng)該在哪些場景使用呢?今天咱們就來聊一聊 Disruptor 的使用場景。
Disruptor 是一個生產(chǎn)-消費模式的隊列,這里我們使用官網(wǎng)的示例,生產(chǎn)者發(fā)送一個 long 類型的變量,消費者收到消息后把變量打印出來。首先定義消息體:
public class LongEvent {
private long value;
public void set(long value)
{
this.value = value;
}
@Override
public String toString()
{
return "LongEvent{" + "value=" + value + '}';
}
}
為了讓 Disruptor 給消息預(yù)先分配內(nèi)存,定義一個 EventFactory,代碼如下:
public class LongEventFactory implements EventFactory<LongEvent>
{
@Override
public LongEvent newInstance()
{
return new LongEvent();
}
}
下面定義個消費者 LongEventHandler:
public class LongEventHandler implements EventHandler<LongEvent>
{
private String consumer;
public LongEventHandler(String consumer) {
this.consumer = consumer;
}
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("consumer: " + consumer + ",Event: " + event);
}
}
1.廣播場景
廣播場景在我們的開發(fā)工作中并不少見,比如系統(tǒng)收到上游系統(tǒng)的一個請求消息,然后把這個消息發(fā)送給多個下游系統(tǒng)來處理。Disruptor 支持廣播模式。比如消費者生產(chǎn)的消息由三個消費者來消費:
public class Broadcast {
public static void main(String[] args) throws InterruptedException {
int bufferSize = 1024;
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
disruptor.handleEventsWith(consumer1, consumer2, consumer3);
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
2.日志收集
再來看一個日志收集的例子。這里我們假設(shè)一個場景,業(yè)務(wù)系統(tǒng)集群有 3 個節(jié)點,每個節(jié)點打印的業(yè)務(wù)日志發(fā)送到 Disruptor,Disruptor 下游有 3 個消費者負(fù)責(zé)日志收集。
這里我們需要重新定義一個日志收集處理類,代碼如下:
public class LogCollectHandler implements WorkHandler<LongEvent> {
public LogCollectHandler(String consumer) {
this.consumer = consumer;
}
private String consumer;
@Override
public void onEvent(LongEvent event)
{
System.out.println("consumer: " + consumer + ",Event: " + event);
}
}
下面這個代碼是綁定消費者的代碼:
public static void main(String[] args) throws InterruptedException {
int bufferSize = 1024;
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
WorkHandler<LongEvent> consumer1 = new LogCollectHandler("consumer1");
WorkHandler<LongEvent> consumer2 = new LogCollectHandler("consumer2");
WorkHandler<LongEvent> consumer3 = new LogCollectHandler("consumer3");
disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);
disruptor.start();
}
需要注意的是,上面使用的是 Disruptor 的 handleEventsWithWorkerPool 方法,使用的消費者不是 EventHandler,而是 WorkHandler。消費者組里面的消費者如果是 WorkHandler,那消費者之間就是有競爭的,比如一個 Event 已經(jīng)被 consumer1 消費過,那就不再會被其他消費者消費了。消費者組里面的消費者如果是 EventHandler,那消費者之間是沒有競爭的,所有消息都會消費。
3.責(zé)任鏈
責(zé)任鏈這種設(shè)計模式我們都比較熟悉了,同一個對象的處理有多個不同的邏輯,每個邏輯作為一個節(jié)點組成責(zé)任鏈,比如收到一條告警消息,處理節(jié)點分為:給開發(fā)人員發(fā)送郵件、給運維人員發(fā)送短信、給業(yè)務(wù)人員發(fā)送 OA 消息。
Disruptor 支持鏈?zhǔn)教幚硐?,看下面的示例代碼:
public static void main(String[] args) throws InterruptedException {
int bufferSize = 1024;
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);
disruptor.start();
}
Disruptor 也支持多個并行責(zé)任鏈,下圖是 2 條責(zé)任鏈的場景:
這里給出一個示例代碼:
public static void main(String[] args) throws InterruptedException {
int bufferSize = 1024;
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");
EventHandler<LongEvent> consumer5 = new LongEventHandler("consumer5");
EventHandler<LongEvent> consumer6 = new LongEventHandler("consumer6");
disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);
disruptor.handleEventsWith(consumer4).then(consumer5).then(consumer6);
disruptor.start();
}
4.多任務(wù)協(xié)作
一個經(jīng)典的例子,我們在泡咖啡之前,需要燒水、洗被子、磨咖啡粉,這三個步驟可以并行,但是需要等著三步都完成之后,才可以泡咖啡。
當(dāng)然,這個例子可以用 Java 中的 CompletableFuture 來實現(xiàn),代碼如下:
public static void main(String[] args){
ExecutorService executor = ...;
CompletableFuture future1 = CompletableFuture.runAsync(() -> {
try {
washCup();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executor);
CompletableFuture future2 = CompletableFuture.runAsync(() -> {
try {
hotWater();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executor);
CompletableFuture future3 = CompletableFuture.runAsync(() -> {
try {
grindCoffee();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executor);
CompletableFuture.allOf(future1, future2, future3).thenAccept(
r -> {
System.out.println("泡咖啡");
}
);
System.out.println("我是主線程");
}
同樣,使用 Disruptor 也可以實現(xiàn)這個場景,看下面代碼:
public static void main(String[] args) throws InterruptedException {
int bufferSize = 1024;
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");
disruptor.handleEventsWith(consumer1, consumer2, consumer3).then(consumer4);
disruptor.start();
}
5.多消費者組
類比主流消息隊列的場景,Disruptor 也可以實現(xiàn)多消費者組的場景,組間并行消費互不影響,組內(nèi)消費者競爭消息,如下圖:
示例代碼如下:
public static void main(String[] args) throws InterruptedException {
int bufferSize = 1024;
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
WorkHandler<LongEvent> consumer1 = new LogWorkHandler("consumer1");
WorkHandler<LongEvent> consumer2 = new LogWorkHandler("consumer2");
WorkHandler<LongEvent> consumer3 = new LogWorkHandler("consumer3");
WorkHandler<LongEvent> consumer4 = new LogWorkHandler("consumer4");
WorkHandler<LongEvent> consumer5 = new LogWorkHandler("consumer5");
WorkHandler<LongEvent> consumer6 = new LogWorkHandler("consumer6");
disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);
disruptor.handleEventsWithWorkerPool(consumer4, consumer5, consumer6);
disruptor.start();
}
6.總結(jié)
通過消費者的靈活組合,Disruptor 的使用場景非常豐富。本文介紹了 Disruptor 的 5 個典型使用場景。在選型的時候,除了使用場景,更多地要考慮到 Disruptor 作為高性能內(nèi)存隊列的這個特點。