引入了 Disruptor 后,系統(tǒng)性能大幅提升!
大家好,我是君哥。
Disruptor 是一個(gè)很受歡迎的內(nèi)存消息隊(duì)列,它源于 LMAX 對(duì)并發(fā)、性能和非阻塞算法的研究。今天一起來(lái)學(xué)習(xí)一下這個(gè)消息隊(duì)列。
簡(jiǎn)介
對(duì)于主流的分布式消息隊(duì)列來(lái)說(shuō),一般會(huì)包含 Producer、Broker、Consumer、注冊(cè)中心等模塊。比如 RocketMQ 架構(gòu)如下:
圖片
Disruptor 并不是分布式消息隊(duì)列,它是一款內(nèi)存消息隊(duì)列,因此架構(gòu)上跟分布式消息隊(duì)列有很大差別。下面是一張 LMAX 使用 Disruptor 的案例圖:
圖片
我們介紹一下 Disruptor 架構(gòu)中的核心概念。
1.1 Ring Buffer
Ring Buffer 通常被認(rèn)為是 Disruptor 的最主要的設(shè)計(jì),但是從 3.0 版本開(kāi)始,Ring Buffer 只負(fù)責(zé)存儲(chǔ)和更新經(jīng)過(guò) Disruptor 的數(shù)據(jù)。在一些高級(jí)的使用場(chǎng)景,它甚至完全可以被用戶(hù)替換。
1.2 Sequence
Disruptor 使用 Sequence 來(lái)識(shí)別特定組件的位置。每個(gè) Consumer(也就是事件處理器)都像 Disruptor 一樣持有一個(gè) Sequence。并發(fā)相關(guān)的核心代碼依賴(lài) Sequence 的自增值,因此 Sequence 具有跟 AtomicLong 相似的特性,事實(shí)上唯一的不同就是不同的 Sequence 之間不存在偽共享問(wèn)題。
偽共享:CPU 緩存是以緩存行為單位進(jìn)行加載和存儲(chǔ),CPU 每次從主存中拉取數(shù)據(jù)時(shí),會(huì)把相鄰的數(shù)據(jù)也存入同一個(gè)緩存行。即使多個(gè)線(xiàn)程操作的是同一緩存行中不同的變量,只要有一個(gè)線(xiàn)程修改了緩存行中的某一個(gè)變量值,該緩存行就會(huì)被標(biāo)記為無(wú)效,需要重新從主從中加載。在多線(xiàn)程環(huán)境下,頻繁地重新加載緩存行,會(huì)嚴(yán)重影響了程序執(zhí)行效率。
1.3 Sequencer
Sequencer 是 Disrupter 的真正核心,有單個(gè)生產(chǎn)者和多個(gè)生產(chǎn)者兩種實(shí)現(xiàn)(SingleProducerSequencer 和 MultiProducerSequencer)。為了讓數(shù)據(jù)在生產(chǎn)者和消費(fèi)者之間快速、準(zhǔn)確地傳輸,它們都實(shí)現(xiàn)了所有并發(fā)算法。
1.4 Sequence Barrier
Sequencer 生成一個(gè) Sequence Barrier,它包含由 Sequencer 生成的 Sequence 和消費(fèi)者擁有的 Sequence 的引用。Sequence Barrier 決定是否有事件給消費(fèi)者處理。
1.5 Wait Strategy
消費(fèi)者怎樣等待事件的到來(lái)。
1.6 Event Processor
主要負(fù)責(zé)循環(huán)處理來(lái)自 Disruptor 事件,它擁有消費(fèi)者 Sequence 的所有權(quán)。有一個(gè)單獨(dú)的實(shí)現(xiàn)類(lèi) BatchEventProcessor,這個(gè)類(lèi)擁有高效的事件循環(huán)處理能力并且處理完成后可以回調(diào)實(shí)現(xiàn) EventHandler 接口的用戶(hù)。
1.7 Event Handler
由用戶(hù)來(lái)實(shí)現(xiàn)并且代表 Disruptor 消費(fèi)者的接口。
2 Disruptor 特性
2.1 多播事件
多播事件是 Disruptor 區(qū)別于其他隊(duì)列的最大差異。其他隊(duì)列都是一個(gè)事件消息只能被單個(gè)消費(fèi)者消費(fèi),而 Disruptor 如果有多個(gè)消費(fèi)者監(jiān)聽(tīng),則可以將所有事件消息發(fā)送給所有消費(fèi)者。
在前面 LMAX 使用 Disruptor 的案例圖中,有 JournalConsumer、ReplicationConsumer 和 ApplicationConsumer 三個(gè)消費(fèi)者監(jiān)聽(tīng)了 Disruptor,這三個(gè)消費(fèi)者將收到來(lái)了 Disruptor 的所有消息。
2.2 消費(fèi)者依賴(lài)關(guān)系圖
為了支持并發(fā)處理在實(shí)際業(yè)務(wù)場(chǎng)景中的需要,有時(shí)消費(fèi)者直接需要做協(xié)調(diào)。再回到前面 LMAX 使用 Disruptor 的案例,在 journalling 和 replication 這兩個(gè)消費(fèi)者處理完成之前,有必要阻止業(yè)務(wù)邏輯消費(fèi)者開(kāi)始處理。我們稱(chēng)這個(gè)特征為“gating”(或者更準(zhǔn)確地說(shuō),該特征是“gating”的一種形式)。
首先,確保生產(chǎn)者數(shù)量不會(huì)超過(guò)消費(fèi)者。這通過(guò)調(diào)用 RingBuffer.addGatingConsumers()來(lái)將相關(guān)消費(fèi)者添加到 Disruptor。其次,消費(fèi)者依賴(lài)關(guān)系的實(shí)現(xiàn)是通過(guò)構(gòu)建一個(gè) SequenceBarrier,SequenceBarrier 擁有需要在它前面完成處理邏輯的消費(fèi)者的 Sequence。
就拿前面 LMAX 使用 Disruptor 的案例來(lái)說(shuō),ApplicationConsumer 的 SequenceBarrier 擁有 JournalConsumer 和 ReplicationConsumer 這 2 個(gè)消費(fèi)者的 Sequence,所以 ApplicationConsumer 對(duì) JournalConsumer 和 ReplicationConsumer 的依賴(lài)關(guān)系可以從 SequenceBarrier 到 Sequence 的連接中看到。
Sequencer 和下游消費(fèi)者的關(guān)系也需要注意。Sequencer 的一個(gè)角色就是發(fā)布的事件消息不能超出 Ring Buffer。這就要求下游消費(fèi)者的 Sequence 不能小于 Ring Buffer 的 Sequence,也不能小于 Ring Buffer 的大小。
上面圖中,因?yàn)?ApplicationConsumer 的 Sequence 必須要保證小于等于 JournalConsumer 和 ReplicationConsumer 的 Sequence,因此 Sequencer 只需要關(guān)心 ApplicationConsumer 的 Sequence。
2.3 內(nèi)存預(yù)分配
Disruptor 的目標(biāo)是低延遲,因此減少或者去除內(nèi)存分配是必要的。在基于 Java 的系統(tǒng)中,目標(biāo)是減少 STW 次數(shù)。
為了支持這一點(diǎn),用戶(hù)可以在 Disruptor 中預(yù)分配事件所需的內(nèi)存。在預(yù)分配內(nèi)存時(shí),用戶(hù)提供的 EventFactory 將對(duì) Ring Buffer 的所有元素進(jìn)行調(diào)用。當(dāng)生產(chǎn)者向 Disruptor 發(fā)送新的事件消息時(shí),Disruptor 的 API 允許用戶(hù)使用構(gòu)造好的對(duì)象,他們可以調(diào)用對(duì)象的方法或者更新對(duì)象的字段。Disruptor 需要確保并發(fā)安全。
2.4 無(wú)鎖并發(fā)
Disruptor 實(shí)現(xiàn)低延遲的另一個(gè)關(guān)鍵方法時(shí)使用無(wú)鎖算法,通過(guò)使用內(nèi)存屏障和 CAS 來(lái)實(shí)現(xiàn)內(nèi)存可見(jiàn)性和正確性。Disruptor 唯一使用鎖的地方就是在 BlockingWaitStrategy。
3 調(diào)優(yōu)選項(xiàng)
雖然大多數(shù)場(chǎng)景下 Disruptor 可以表現(xiàn)出優(yōu)秀的性能,但是仍然有一些調(diào)優(yōu)參數(shù)可以改進(jìn) Disruptor 的性能。
3.1 單個(gè)/多個(gè)生產(chǎn)者
Disruptor<LongEvent> disruptor = new Disruptor(
factory,
bufferSize,
DaemonThreadFactory.INSTANCE,
ProducerType.SINGLE,
new BlockingWaitStrategy()
);
上面是 disruptor 的構(gòu)造函數(shù),ProducerType.SINGLE 表示創(chuàng)建單生產(chǎn)者的 Sequencer,ProducerType.MULTI 表示創(chuàng)建多生產(chǎn)者的 Sequencer。
在并發(fā)系統(tǒng)中提高系統(tǒng)性能的最好方式是遵循單寫(xiě)原則。下面是官方的一個(gè) disruptor 吞吐量測(cè)試結(jié)果,測(cè)試環(huán)境是 i7 Sandy Bridge MacBook Air。
單生產(chǎn)者:
圖片
多生產(chǎn)者:
圖片
3.2 等待策略
- BlockingWaitStrategy
disruptor 的默認(rèn)等待策略是 BlockingWaitStrategy,這種策略使用鎖和喚醒鎖的 Condition 變量。
- SleepingWaitStrategy
跟 BlockingWaitStrategy 策略類(lèi)似,他是通過(guò) LockSupport.parkNanos(1) 方法來(lái)實(shí)現(xiàn)等待,不需要給 Condition 變量發(fā)送信號(hào)來(lái)喚醒等待。
主要適用于對(duì)延時(shí)要求不高的場(chǎng)景,比如異步打印日志。
- YieldingWaitStrategy
YieldingWaitStrategy 策略使用 Busy spin(不釋放 CPU 資源,通過(guò)循環(huán)檢查條件直到條件滿(mǎn)足為止)技術(shù)來(lái)等待 sequence 增長(zhǎng)到一個(gè)合適的值。在循環(huán)內(nèi)部會(huì)調(diào)用 Thread#yield() 方法允許其他排隊(duì)線(xiàn)程去執(zhí)行。
這種策略主要用于通過(guò)消耗 CPU 來(lái)實(shí)現(xiàn)低延遲的場(chǎng)景。當(dāng) EventHandler 數(shù)量消息邏輯 CPU 核數(shù)并且對(duì)延遲要求較高時(shí),可以考慮這種等待策略。
- BusySpinWaitStrategy
BusySpinWaitStrategy 是性能最高的等待策略,它適用于低延遲系統(tǒng),但是對(duì)部署環(huán)境要求很高。
這種等待策略的唯一適用場(chǎng)景是當(dāng) EventHandler 數(shù)量消息邏輯 CPU 核數(shù)并且超線(xiàn)程被禁用。
4 官方示例
下面是一個(gè)官方示例。這個(gè)例子比較簡(jiǎn)單,就是生產(chǎn)者向消費(fèi)者發(fā)送一個(gè) long 類(lèi)型的值。
- 首先定義一個(gè) Event。
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
@Override
public String toString()
{
return "LongEvent{" + "value=" + value + '}';
}
}
- 為了能讓 Disruptor 預(yù)分配內(nèi)存,這里定義一個(gè) LongEventFactory。
public class LongEventFactory implements EventFactory<LongEvent>
{
@Override
public LongEvent newInstance()
{
return new LongEvent();
}
}
- 創(chuàng)建一個(gè)消費(fèi)者來(lái)處理事件
public class LongEventHandler implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
- 編寫(xiě)發(fā)送事件消息的邏輯
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.examples.longevent.LongEvent;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
int bufferSize = 1024;
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith((event, sequence, endOfBatch) ->
System.out.println("Event: " + event));
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
5 總結(jié)
作為一款高性能的內(nèi)存隊(duì)列,Disruptor 有不少優(yōu)秀的設(shè)計(jì)思想值得我們學(xué)習(xí),比如內(nèi)存預(yù)分配、無(wú)鎖并發(fā)。同時(shí)它的使用非常簡(jiǎn)單,推薦大家使用。