自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

高性能無鎖隊列 Disruptor 核心原理分析及其在i主題業(yè)務中的應用

移動開發(fā)
本文首先介紹了 Disruptor 高性能內存隊列的基本概念、使用 Demo、高性能原理及源碼分析,最后通過兩個例子介紹了 Disruptor 在i主題業(yè)務中的應用。

一、i主題及 Disruptor 簡介

i主題是 vivo 旗下的一款主題商店 app,用戶可以通過下載主題、壁紙、字體等,實現(xiàn)對手機界面風格的一鍵更換和自定義。

Disruptor 是英國外匯交易公司 LMAX 開發(fā)的一個高性能的內存隊列(用于系統(tǒng)內部線程間傳遞消息,不同于 RocketMQ、Kafka 這種分布式消息隊列),基于 Disruptor 開發(fā)的系統(tǒng)單線程能支撐每秒600萬訂單。目前,包括 Apache Storm、Camel、Log4j 2在內的很多知名項目都應用了 Disruptor 以獲取高性能。在 vivo 內部它也有不少應用,比如自定義監(jiān)控中使用 Disruptor 隊列來暫存通過監(jiān)控 SDK 上報的監(jiān)控數(shù)據(jù),i主題中也使用它來統(tǒng)計本地內存指標數(shù)據(jù)。

接下來從 Disruptor 和 JDK 內置隊列的對比、Disruptor 核心概念、Disruptor 使用Demo、Disruptor 核心源碼、Disruptor 高性能原理、Disruptor 在 i主題業(yè)務中的應用幾個角度來介紹 Disruptor。

二、和 JDK 中內置的隊列對比

下面來看下 JDK 中內置的隊列和 Disruptor 的對比。隊列的底層實現(xiàn)一般分為三種:數(shù)組、鏈表和堆,其中堆一般是為了實現(xiàn)帶有優(yōu)先級特性的隊列,暫不考慮。另外,像 ConcurrentLinkedQueue 、LinkedTransferQueue 屬于無界隊列,在穩(wěn)定性要求特別高的系統(tǒng)中,為了防止生產(chǎn)者速度過快,導致內存溢出,只能選擇有界隊列。這樣 JDK 中剩下可選的線程安全的隊列還有ArrayBlockingQueue 

和 LinkedBlockingQueue。

由于 LinkedBlockingQueue 是基于鏈表實現(xiàn)的,由于鏈表存儲的數(shù)據(jù)在內存里不連續(xù),對于高速緩存并不友好,而且 LinkedBlockingQueue 是加鎖的,性能較差。ArrayBlockingQueue 有同樣的問題,它也需要加鎖,另外,ArrayBlockingQueue 存在偽共享問題,也會導致性能變差。而今天要介紹的 Disruptor 是基于數(shù)組的有界無鎖隊列,符合空間局部性原理,可以很好的利用 CPU 的高速緩存,同時它避免了偽共享,大大提升了性能。

圖片

三、Disruptor 核心概念

如下圖,從數(shù)據(jù)流轉的角度先對 Disruptor 有一個直觀的概念。Disruptor 支持單(多)生產(chǎn)者、單(多)消費者模式。消費時支持廣播消費(HandlerA 會消費處理所有消息,HandlerB 也會消費處理所有消息)、集群消費(HandlerA 和 HandlerB 各消費部分消息),HandlerA 和HandlerB 消費完成后會把消息交給 HandlerC 繼續(xù)處理。

圖片

下面結合 Disruptor 官方的架構圖介紹下 Disruptor 的核心概念:

  • RingBuffer:前文說 Disruptor 是一個高性能內存內存隊列,而 RingBuffer 就是該內存隊列的數(shù)據(jù)結構,它是一個環(huán)形數(shù)組,是承載數(shù)據(jù)的載體。
  • Producer:Disruptor 是典型的生產(chǎn)者消費者模型。因此生產(chǎn)者是 Disruptor 編程模型中的核心組成,可以是單生產(chǎn)者,也可以多生產(chǎn)者。
  • Event:具體的數(shù)據(jù)實體,生產(chǎn)者生產(chǎn) Event ,存入 RingBuffer,消費者從 RingBuffer 中消費它進行邏輯處理。
  • Event Handler:開發(fā)者需要實現(xiàn) EventHandler 接口定義消費者處理邏輯。
  • Wait Strategy:等待策略,定義了當消費者無法從 RingBuffer 獲取數(shù)據(jù)時,如何等待。
  • Event Processor:事件循環(huán)處理器,EventProcessor 繼承了 Runnable 接口,它的子類實現(xiàn)了 run 方法,內部有一個 while 循環(huán),不斷嘗試從 RingBuffer 中獲取數(shù)據(jù),交給 EventHandler 去處理。
  • Sequence:RingBuffer 是一個數(shù)組,Sequence (序號)就是用來標記生產(chǎn)者數(shù)據(jù)生產(chǎn)到哪了,消費者數(shù)據(jù)消費到哪了。
  • Sequencer:分為單生產(chǎn)者和多生產(chǎn)者兩種實現(xiàn),生產(chǎn)者發(fā)布數(shù)據(jù)時需要先申請下可用序號,Sequencer 就是用來協(xié)調申請序號的。
  • Sequence Barrier:見下文分析。

圖片

四、Disruptor使用Demo

4.1 定義 Event

Event 是具體的數(shù)據(jù)實體,生產(chǎn)者生產(chǎn) Event ,存入 RingBuffer,消費者從 RingBuffer 中消費它進行邏輯處理。Event 就是一個普通的 Java 對象,無需實現(xiàn) Disruptor 內定義的接口。

public class OrderEvent {
    private long value;
 
    public long getValue() {
        return value;
    }
 
    public void setValue(long value) {
        this.value = value;
    }
}

4.2 定義 EventFactory

用于創(chuàng)建 Event 對象。

public class OrderEventFactory implements EventFactory<OrderEvent> {
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}

4.3 定義生產(chǎn)者

可以看到,生成者主要是持有 RingBuffer 對象進行數(shù)據(jù)的發(fā)布。這里有幾個點需要注意:

  • RingBuffer 內部維護了一個 Object 數(shù)組(也就是真正存儲數(shù)據(jù)的容器),在 RingBuffer 初始化時該 Object 數(shù)組就已經(jīng)使用 EventFactory 初始化了一些空 Event,后續(xù)就不需要在運行時來創(chuàng)建了,提高性能。因此這里通過 RingBuffer 獲取指定序號得到的是一個空對象,需要對它進行賦值后,才能進行發(fā)布。
  • 這里通過 RingBuffer 的 next 方法獲取可用序號,如果 RingBuffer 空間不足會阻塞。
  • 通過 next 方法獲取序號后,需要確保接下來使用 publish 方法發(fā)布數(shù)據(jù)。
public class OrderEventProducer {
 
    private RingBuffer<OrderEvent> ringBuffer;
     
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
     
    public void sendData(ByteBuffer data) {
        // 1、在生產(chǎn)者發(fā)送消息的時候, 首先需要從我們的ringBuffer里面獲取一個可用的序號
        long sequence = ringBuffer.next();
        try {
            //2、注意此時獲取的OrderEvent對象是一個沒有被賦值的空對象
            OrderEvent event = ringBuffer.get(sequence);
            //3、進行實際的賦值處理
            event.setValue(data.getLong(0));           
        } finally {
            //4、 提交發(fā)布操作
            ringBuffer.publish(sequence);          
        }
    }
}

4.4 定義消費者

消費者可以實現(xiàn) EventHandler 接口,定義自己的處理邏輯。

public class OrderEventHandler implements EventHandler<OrderEvent> {
 
    public void onEvent(OrderEvent event,
                        long sequence,
                        boolean endOfBatch) throws Exception {
        System.out.println("消費者: " + event.getValue());
    }
}

4.5 主流程

  • 首先初始化一個 Disruptor 對象,Disruptor 有多個重載的構造函數(shù)。支持傳入 EventFactory 、ringBufferSize (需要是2的冪次方)、executor(用于執(zhí)行EventHandler 的事件處理邏輯,一個 EventHandler 對應一個線程,一個線程只服務于一個 EventHandler )、生產(chǎn)者模式(支持單生產(chǎn)者、多生產(chǎn)者)、阻塞等待策略。在創(chuàng)建 Disruptor 對象時,內部會創(chuàng)建好指定 size 的 RingBuffer 對象。
  • 定義 Disruptor 對象之后,可以通過該對象添加消費者 EventHandler。
  • 啟動 Disruptor,會將第2步添加的 EventHandler 消費者封裝成 EventProcessor(實現(xiàn)了 Runnable 接口),提交到構建 Disruptor 時指定的 executor 對象中。由于 EventProcessor 的 run 方法是一個 while 循環(huán),不斷嘗試從RingBuffer 中獲取數(shù)據(jù)。因此可以說一個 EventHandler 對應一個線程,一個線程只服務于一個EventHandler。
  • 拿到 Disruptor 持有的 RingBuffer,然后就可以創(chuàng)建生產(chǎn)者,通過該RingBuffer就可以發(fā)布生產(chǎn)數(shù)據(jù)了,然后 EventProcessor 中啟動的任務就可以消費到數(shù)據(jù),交給 EventHandler 去處理了。
public static void main(String[] args) {
    OrderEventFactory orderEventFactory = new OrderEventFactory();
    int ringBufferSize = 4;
    ExecutorService executor = Executors.newFixedThreadPool(1);
 
    /**
     * 1. 實例化disruptor對象
       1) eventFactory: 消息(event)工廠對象
       2) ringBufferSize: 容器的長度
       3) executor:
       4) ProducerType: 單生產(chǎn)者還是多生產(chǎn)者
       5) waitStrategy: 等待策略
     */
    Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
                                                        ringBufferSize,
                                                        executor,
                                                        ProducerType.SINGLE,
                                                        new BlockingWaitStrategy());
 
    // 2. 添加消費者的監(jiān)聽
    disruptor.handleEventsWith(new OrderEventHandler());
 
    // 3. 啟動disruptor
    disruptor.start();
 
    // 4. 獲取實際存儲數(shù)據(jù)的容器: RingBuffer
    RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
 
    OrderEventProducer producer = new OrderEventProducer(ringBuffer);
 
    ByteBuffer bb = ByteBuffer.allocate(8);
 
    for (long i = 0; i < 5; i++) {
        bb.putLong(0, i);
        producer.sendData(bb);
    }
 
    disruptor.shutdown();
    executor.shutdown();
}

五、Disruptor 源碼分析

本文分析時以單(多)生產(chǎn)者、單消費者為例進行分析。

5.1 創(chuàng)建 Disruptor

首先是通過傳入的參數(shù)創(chuàng)建 RingBuffer,將創(chuàng)建好的 RingBuffer 與傳入的 executor 交給 Disruptor 對象持有。

public Disruptor(
    final EventFactory<T> eventFactory,
    final int ringBufferSize,
    final Executor executor,
    final ProducerType producerType,
    final WaitStrategy waitStrategy){
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
         executor);
}

接下來分析 RingBuffer 的創(chuàng)建過程,分為單生產(chǎn)者與多生產(chǎn)者。

public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy){
        switch (producerType){
            case SINGLE:
                // 單生產(chǎn)者
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                // 多生產(chǎn)者
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
}

不論是單生產(chǎn)者還是多生產(chǎn)者,最終都會創(chuàng)建一個 RingBuffer 對象,只是傳給 RingBuffer 的 Sequencer 對象不同??梢钥吹?,RingBuffer 內部最終創(chuàng)建了一個Object 數(shù)組來存儲 Event 數(shù)據(jù)。這里有幾點需要注意:

  • RingBuffer 是用數(shù)組實現(xiàn)的,在創(chuàng)建該數(shù)組后緊接著調用 fill 方法調用 EventFactory 工廠方法為數(shù)組中的元素進行初始化,后續(xù)在使用這些元素時,直接通過下標獲取并給對應的屬性賦值,這樣就避免了 Event 對象的反復創(chuàng)建,避免頻繁 GC。
  • RingBuffe 的數(shù)組中的元素是在初始化時一次性全部創(chuàng)建的,所以這些元素的內存地址大概率是連續(xù)的。消費者在消費時,是遵循空間局部性原理的。消費完第一個Event 時,很快就會消費第二個 Event,而在消費第一個 Event 時,CPU 會把內存中的第一個 Event 的后面的 Event 也加載進 Cache 中,這樣當消費第二個 Event時,它已經(jīng)在 CPU Cache 中了,所以就不需要從內存中加載了,這樣可以大大提升性能。
public static <E> RingBuffer<E> createSingleProducer(
    EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy){
     
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize,
                                                                    waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}
RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer){
        // 省略部分代碼...
         
        // 額外創(chuàng)建2個填充空間的大小, 首尾填充, 避免數(shù)組的有效載荷和其它成員加載到同一緩存行
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
}
 
private void fill(EventFactory<E> eventFactory){
    for (int i = 0; i < bufferSize; i++){
        // BUFFER_PAD + i為真正的數(shù)組索引
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

5.2 添加消費者

添加消費者的核心代碼如下所示,核心就是為將一個 EventHandler 封裝成 BatchEventProcessor,

然后添加到 consumerRepository 中,后續(xù)啟動 Disruptor 時,會遍歷 consumerRepository 中的所有 BatchEventProcessor(實現(xiàn)了 Runnable 接口),將 BatchEventProcessor 任務提交到線程池中。

public final EventHandlerGroup<T> handleEventsWith(
                                    final EventHandler<? super T>... handlers){
    // 通過disruptor對象直接調用handleEventsWith方法時傳的是空的Sequence數(shù)組
    return createEventProcessors(new Sequence[0], handlers);
}
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers) {
 
    // 收集添加的消費者的序號
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    // 本批次消費由于添加在同一個節(jié)點之后, 因此共享該屏障
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
 
    // 為每個EventHandler創(chuàng)建一個BatchEventProcessor
    for (int i = 0, eventHandlersLength = eventHandlers.length;
                    i < eventHandlersLength; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
 
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
 
        if (exceptionHandler != null){
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }
 
        // 添加到消費者信息倉庫中
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }
 
    // 更新網(wǎng)關序列(生產(chǎn)者只需要關注所有的末端消費者節(jié)點的序列)
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
 
    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

創(chuàng)建完 Disruptor 對象之后,可以通過 Disruptor 對象添加 EventHandler,這里有一需要注意:通過 Disruptor 對象直接調用 handleEventsWith 方法時傳的是空的 Sequence 數(shù)組,這是什么意思?可以看到 createEventProcessors 方法接收該空 Sequence 數(shù)組的字段名是 barrierSequences,翻譯成中文就是柵欄序號。怎么理解這個字段?

比如通過如下代碼給 Disruptor 添加了兩個handler,記為 handlerA 和 handlerB,這種是串行消費,對于一個 Event,handlerA 消費完后才能輪到 handlerB 去消費。對于 handlerA 來說,它沒有前置消費者(生成者生產(chǎn)到哪里,消費者就可以消費到哪里),因此它的 barrierSequences 是一個空數(shù)組。而對于 handlerB 來說,它的前置消費者是 handlerA,因此它的 barrierSequences 就是A的消費進度,也就是說 handlerB 的消費進度是要小于 handlerA 的消費進度的。

圖片


disruptor.handleEventsWith(handlerA).handleEventsWith(handlerB);

如果是通過如下方式添加的 handler,則 handlerA 和handlerB 會消費所有 Event 數(shù)據(jù),類似 MQ 消息中的廣播消費,而 handlerC 的 barrierSequences 數(shù)組就是包含了 handlerA 的消費進度和 handlerB 的消費進度,這也是為什么 barrierSequences 是一個數(shù)組,后續(xù) handlerC 在消費數(shù)據(jù)時,會取A和B消費進度的較小值進行判斷,比如A消費到進度6,B消費到進度4,那么C只能去消費下標為3的數(shù)據(jù),這也是 barrierSequences 的含義。

disruptor.handleEventsWith(handlerA, handlerB).handleEventsWith(handlerC);


圖片


5.3 啟動 Disruptor

Disruptor的啟動邏輯比較簡潔,就是遍歷consumerRepository 中收集的 EventProcessor(實現(xiàn)了Runnable接口),將它提交到創(chuàng)建 Disruptor 時指定的executor 中,EventProcessor 的 run 方法會啟動一個while 循環(huán),不斷嘗試從 RingBuffer 中獲取數(shù)據(jù)進行消費。

disruptor.start();
public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }
 
    return ringBuffer;
}
 
public void start(final Executor executor) {
    executor.execute(eventprocessor);
}

5.4 發(fā)布數(shù)據(jù)

在分析 Disruptor 的發(fā)布數(shù)據(jù)的源碼前,先來回顧下發(fā)布數(shù)據(jù)的整體流程。

  • 調用 next 方法獲取可用序號,該方法可能會阻塞。
  • 通過上一步獲得的序號從 RingBuffer 中獲取對應的 Event,因為 RingBuffer 中所有的 Event 在初始化時已經(jīng)創(chuàng)建好了,這里獲取的只是空對象。
  • 因此接下來需要對該空對象進行業(yè)務賦值。
  • 調用 next 方法需要在 finally 方法中進行最終的發(fā)布,標記該序號數(shù)據(jù)已實際生產(chǎn)完成。
public void sendData(ByteBuffer data) {
    long sequence = ringBuffer.next();
    try {
        OrderEvent event = ringBuffer.get(sequence);
        event.setValue(data.getLong(0));           
    } finally {
        ringBuffer.publish(sequence);          
    }
}

5.4.1 獲取序號

next 方法默認申請一個序號。nextValue 表示已分配的序號,nextSequence 表示在此基礎上再申請n個序號(此處n為1),cachedValue 表示緩存的消費者的最小消費進度。

假設有一個 size 為8的 RingBuffer,當前下標為6的數(shù)據(jù)已經(jīng)發(fā)布好(nextValue為6),消費者一直未開啟消費(cachedValue 和 

cachedGatingSequence 為-1),此時生產(chǎn)者想繼續(xù)發(fā)布數(shù)據(jù),調用 next() 方法申請獲取序號為7的位置(nextSequence為7),計算得到的 wrapPoint 為7-8=-1,此時 wrapPoint 等于 

cachedGatingSequence,可以繼續(xù)發(fā)布數(shù)據(jù),如左圖。最后將 nextValue 賦值為7,表示序號7的位置已經(jīng)被生產(chǎn)者占用了。

接著生產(chǎn)者繼續(xù)調用 next() 方法申請序號為0的數(shù)據(jù),此時 nextValue為7,nextSequence 為8,wrapPoint 等于0,由于消費者遲遲未消費

(cachedGatingSequence為-1),此時 wrapPoint 大于了 cachedGatingSequence,因此 next 方法的if判斷成立,會調用 LockSupport.parkNanos 阻塞等待消費者進行消費。其中 getMinimumSequence 方法是獲取多個消費者的最小消費進度。


圖片

public long next() {
    return next(1);
}
public long next(int n) {
 
    /**
     * 已分配的序號的緩存(已分配到這里), 初始-1. 可以看該方法的返回值nextSequence,
     * 接下來生產(chǎn)者就會該往該位置寫數(shù)據(jù), 它賦值給了nextValue, 所以下一次調用next方
     * 法時, nextValue位置就是表示已經(jīng)生產(chǎn)好了數(shù)據(jù), 接下來要申請nextSequece的數(shù)據(jù)
     */
    long nextValue = this.nextValue;
 
    // 本次申請分配的序號
    long nextSequence = nextValue + n;
 
    // 構成環(huán)路的點:環(huán)形緩沖區(qū)可能追尾的點 = 等于本次申請的序號-環(huán)形緩沖區(qū)大小
    // 如果該序號大于最慢消費者的進度, 那么表示追尾了, 需要等待
    long wrapPoint = nextSequence - bufferSize;
 
    // 上次緩存的最小網(wǎng)關序號(消費最慢的消費者的進度)
    long cachedGatingSequence = this.cachedValue;
 
    // wrapPoint > cachedGatingSequence 表示生產(chǎn)者追上消費者產(chǎn)生環(huán)路(追尾), 即緩沖區(qū)已滿,
    // 此時需要獲取消費者們最新的進度, 以確定是否隊列滿
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        // 插入StoreLoad內存屏障/柵欄, 保證可見性。
        // 因為publish使用的是set()/putOrderedLong, 并不保證其他消費者能及時看見發(fā)布的數(shù)據(jù)
        // 當我再次申請更多的空間時, 必須保證消費者能消費發(fā)布的數(shù)據(jù)
        cursor.setVolatile(nextValue);
 
        long minSequence;
        // minSequence是多個消費者的最小序號, 要等所有消費者消費完了才能繼續(xù)生產(chǎn)
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences,
                                                                  nextValue))) {
            LockSupport.parkNanos(1L);
        }
 
        // 緩存生產(chǎn)者們最新的消費進度
        this.cachedValue = minSequence;
    }
 
    // 這里只寫了緩存, 并未寫volatile變量, 因為只是預分配了空間但是并未被發(fā)布數(shù)據(jù),
    // 不需要讓其他消費者感知到。消費者只會感知到真正被發(fā)布的序號
    this.nextValue = nextSequence;
 
    return nextSequence;
}

5.4.2 根據(jù)序號獲取 Event

直接通過 Unsafe 工具類獲取指定序號的 Event 對象,此時獲取的是空對象,因此接下來需要對該 Event 對象進行業(yè)務賦值,賦值完成后調用 publish 方法進行最終的數(shù)據(jù)發(fā)布。

OrderEvent event = ringBuffer.get(sequence);
public E get(long sequence) {
    return elementAt(sequence);
}
protected final E elementAt(long sequence) {
    return (E) UNSAFE.getObject(entries,
                                REF_ARRAY_BASE +
                                ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

5.4.3 發(fā)布數(shù)據(jù)

生產(chǎn)者獲取到可用序號后,首先對該序號處的空 Event 對象進行業(yè)務賦值,接著調用 RingBuffer 的 publish 方法發(fā)布數(shù)據(jù),RingBuffer 會委托給其持有的 sequencer(單生產(chǎn)者和多生產(chǎn)者對應不同的 sequencer)對象進行真正發(fā)布。單生產(chǎn)者的發(fā)布邏輯比較簡單,更新下 cursor 進度(cursor 表示生產(chǎn)者的生產(chǎn)進度,該位置已實際發(fā)布數(shù)據(jù),而 next 方法中的 nextSequence 表示生產(chǎn)者申請的最大序號,可能還未實際發(fā)布數(shù)據(jù)),接著喚醒等待的消費者。

waitStrategy 有不同的實現(xiàn),因此喚醒邏輯也不盡相同,如采用 BusySpinWaitStrategy 策略時,消費者獲取不到數(shù)據(jù)時自旋等待,然后繼續(xù)判斷是否有新數(shù)據(jù)可以消費了,因此 BusySpinWaitStrategy 策略的 signalAllWhenBlocking 就是一個空實現(xiàn),啥也不做。

ringBuffer.publish(sequence);
public void publish(long sequence) {
    sequencer.publish(sequence);
}
public void publish(long sequence) {
    // 更新生產(chǎn)者進度
    cursor.set(sequence);
    // 喚醒等待的消費者
    waitStrategy.signalAllWhenBlocking();
}

5.4.4 消費數(shù)據(jù)

前面提到,Disruptor 啟動時,會將封裝 EventHandler 的EventProcessor(此處以 BatchEventProcessor 為例)提交到線程池中運行,BatchEventProcessor 的 run 方法會調用 processEvents 方法不斷嘗試從 RingBuffer 中獲取數(shù)據(jù)進行消費,下面分析下 processEvents 的邏輯(代碼做了精簡)。它會開啟一個 while 循環(huán),調用 sequenceBarrier.waitFor 方法獲取最大可用的序號,比如獲取序號一節(jié)所提的,生產(chǎn)者持續(xù)生產(chǎn),消費者一直未消費,此時生產(chǎn)者已經(jīng)將整個 RingBuffer 數(shù)據(jù)都生產(chǎn)滿了,生產(chǎn)者無法再繼續(xù)生產(chǎn),生產(chǎn)者此時會阻塞。假設這時候消費者開始消費,因此 nextSequence 為0,而 

availableSequence 為7,此時消費者可以批量消費,將這8條已生產(chǎn)者的數(shù)據(jù)全部消費完,消費完成后更新下消費進度。更新消費進度后,生產(chǎn)者通過 Util.getMinimumSequence 方法就可以感知到最新的消費進度,從而不再阻塞,繼續(xù)發(fā)布數(shù)據(jù)了。

private void processEvents() {
    T event = null;
 
    // sequence記錄消費者的消費進度, 初始為-1
    long nextSequence = sequence.get() + 1L;
 
    // 死循環(huán),因此不會讓出線程,需要獨立的線程(每一個EventProcessor都需要獨立的線程)
    while (true) {
        // 通過屏障獲取到的最大可用序號
        final long availableSequence = sequenceBarrier.waitFor(nextSequence);
 
        // 批量消費
        while (nextSequence <= availableSequence) {
            event = dataProvider.get(nextSequence);
            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
 
        // 更新消費進度(批量消費, 每次消費只更新一次Sequence, 減少性能消耗)
        sequence.set(availableSequence);
    }
}

下面分析下 SequenceBarrier 的 waitFor 方法。首先它會調用 waitStrategy 的 waitFor 方法獲取最大可用序號,以 BusySpinWaitStrategy 策略為例,它的 waitFor 方法的三個參數(shù)的含義分別是:

  • sequence:消費者期望獲得的序號,也就是當前消費者已消費的進度+1
  • cursor:當前生產(chǎn)者的生成進度
  • dependentSequence:消費者依賴的前置消費者的消費進度。該字段是在添加 EventHandler,創(chuàng)建
    BatchEventProcessor 時創(chuàng)建的。如果當前消費者沒有前置依賴的消費者,那么它只需要關心生產(chǎn)者的進度,生產(chǎn)者生產(chǎn)到哪里,它就可以消費到哪里,因此 dependentSequence 就是 cursor。而如果當前消費者有前置依賴的消費者,那么dependentSequence就是
    FixedSequenceGroup(dependentSequences)。

因為 dependentSequence 分為兩種情況,所以 waitFor 的邏輯也可以分為兩種情況討論:

  • 當前消費者無前置消費者:假設 cursor 為6,也就是序號為6的數(shù)據(jù)已經(jīng)發(fā)布了數(shù)據(jù),此時傳入的sequence為6,則waitFor方法可以直接返回availableSequence(6),可以正常消費。序號為6的數(shù)據(jù)消費完成后,消費者繼續(xù)調用 waitFor 獲取數(shù)據(jù),傳入的 sequence為7,而此時 availableSequence 還是未6,因此消費者需要自旋等待。當生產(chǎn)者繼續(xù)發(fā)布數(shù)據(jù)后,因為 dependentSequence 持有的就是生產(chǎn)者的生成進度,因此消費者可以感知到,繼續(xù)消費。
  • 當前消費者有前置消費者:假設 cursor 為6,當前消費者C有兩個前置依賴的消費者A(消費進度為5)、B(消費進度為4),那么此時 availableSequence
    (FixedSequenceGroup實例,它的 get 方法是獲取A、B的最小值,也就是4)為4。如果當前消費者C期望消費下標為4的數(shù)據(jù),則可以正常消費,但是消費下標為5的數(shù)據(jù)就不行了,它需要等待它的前置消費者B消費完進度為5的數(shù)據(jù)后才能繼續(xù)消費。

在 waitStrategy 的 waitFor 方法返回,得到最大可用的序號 availableSequence 后,最后需要再調用下 sequencer 的 

getHighestPublishedSequence 獲取真正可用的最大序號,這和生產(chǎn)者模型有關系,如果是單生產(chǎn)者,因為數(shù)據(jù)是連續(xù)發(fā)布的,直接返回傳入的 availableSequence。而如果是多生產(chǎn)者,因為多生產(chǎn)者是有多個線程在生產(chǎn)數(shù)據(jù),發(fā)布的數(shù)據(jù)是不連續(xù)的,因此需要通過 

getHighestPublishedSequence 方法獲取已發(fā)布的且連續(xù)的最大序號,因為獲取序號進行消費時需要是順序的,不能跳躍。

public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException {
    /**
     * sequence: 消費者期望獲取的序號
     * cursorSequence: 生產(chǎn)者的序號
     * dependentSequence: 消費者需要依賴的序號
     */
    long availableSequence = waitStrategy.waitFor(sequence,
                                                  cursorSequence,
                                                  dependentSequence, this);
 
    if (availableSequence < sequence) {
        return availableSequence;
    }
 
    // 目標sequence已經(jīng)發(fā)布了, 這里獲取真正的最大序號(和生產(chǎn)者模型有關)
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
public long waitFor(
    final long sequence, Sequence cursor, final Sequence dependentSequence,
    final SequenceBarrier barrier) throws AlertException, InterruptedException {
    long availableSequence;
 
    // 確保該序號已經(jīng)被我前面的消費者消費(協(xié)調與其他消費者的關系)
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
        // 自旋等待
        ThreadHints.onSpinWait();
    }
 
    return availableSequence;
}

六、Disruptor 高性能原理分析

6.1 空間預分配

前文分析源碼時介紹到,RingBuffer 內部維護了一個 Object 數(shù)組(也就是真正存儲數(shù)據(jù)的容器),在 RingBuffer 初始化時該 Object 數(shù)組就已經(jīng)使用EventFactory 初始化了一些空 Event,后續(xù)就不需要在運行時來創(chuàng)建了,避免頻繁GC。

另外,RingBuffe 的數(shù)組中的元素是在初始化時一次性全部創(chuàng)建的,所以這些元素的內存地址大概率是連續(xù)的。消費者在消費時,是遵循空間局部性原理的。消費完第一個Event 時,很快就會消費第二個 Event,而在消費第一個 Event 時,CPU 會把內存中的第一個 Event 的后面的 Event 也加載進 Cache 中,這樣當消費第二個 Event 時,它已經(jīng)在 CPU Cache 中了,所以就不需要從內存中加載了,這樣也可以大大提升性能。

6.2、避免偽共享

6.2.1 一個偽共享的例子

如下代碼所示,定義了一個 Pointer 類,它有2個 long 類型的成員變量x、y,然后在 main 方法中其中2個線程分別對同一個 Pointer 對象的x和y自增 100000000 次,最后統(tǒng)計下方法耗時,在我本機電腦上測試多次,平均約為3600ms。

public class Pointer {
 
    volatile long x;
 
    volatile long y;
 
    @Override
    public String toString() {
        return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]")
                .add("x=" + x)
                .add("y=" + y)
                .toString();
    }
}
public static void main(String[] args) throws InterruptedException {
    Pointer pointer = new Pointer();
 
    int num = 100000000;
    long start = System.currentTimeMillis();
 
    Thread t1 = new Thread(() -> {
        for(int i = 0; i < num; i++){
            pointer.x++;
        }
    });
 
    Thread t2 = new Thread(() -> {
        for(int i = 0; i < num; i++){
            pointer.y++;
        }
    });
 
    t1.start();
    t2.start();
    t1.join();
    t2.join();
 
    System.out.println(System.currentTimeMillis() - start);
    System.out.println(pointer);
}

接著將 Pointer 類修改如下:在變量x和y之間插入7個 long 類型的變量,僅此而已,接著繼續(xù)通過上述的 main 方法統(tǒng)計耗時,平均約為500ms??梢钥吹?,修改前的耗時是修改后(避免了偽共享)的7倍多。那么什么是偽共享,為什么避免了偽共享能有這么大的性能提升呢?

public class Pointer {
 
    volatile long x;
 
    long p1, p2, p3, p4, p5, p6, p7;
 
    volatile long y;
 
    @Override
    public String toString() {
        return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]")
                .add("x=" + x)
                .add("y=" + y)
                .toString();
    }
}

6.2.2、避免偽共享為什么可以提升性能

內存的訪問速度是遠遠慢于 CPU 的,為了高效利用 CPU,在 CPU 和內存之間加了緩存,稱為 CPU Cache。為了提高性能,需要更多地從 CPU Cache 里獲取數(shù)據(jù),而不是從內存中獲取數(shù)據(jù)。CPU Cache 加載內存里的數(shù)據(jù),是以緩存行(通常為64字節(jié))為單位加載的。Java 的 long 類型是8字節(jié),因此一個緩存行可以存放8個 long 類型的變量。

但是,這種加載帶來了一個壞處,如上述例子所示,假設有一個 long 類型的變量x,另外還有一個 long 類型的變量y緊挨著它,那么當加載x時也會加載y。如果此時 CPU Core1 的線程在對x進行修改,另一個 CPU Core2 的線程卻在對y進行讀取。當前者修改x時,會把x和y同時加載到 CPU Core1 對應的 CPU Cache 中,更新完后x和其它所有包含x的緩存行都將失效。而當 CPU Core2 的線程讀取y時,發(fā)現(xiàn)這個緩存行已經(jīng)失效了,需要從主內存中重新加載。

這就是偽共享,x和y不相干,但是卻因為x的更新導致需要重新從主內存讀取,拖慢了程序性能。解決辦法之一就是如上述示例中所做,在x和y之間填充7個 long 類型的變量,保證x和y不會被加載到同一個緩存行中去。Java8 中也增加了新的注解@Contended(JVM加上啟動參數(shù)-XX:-RestrictContended 才會生效),也可以避免偽共享。

圖片

6.2.3、Disruptor 中使用偽共享的場景

Disruptor 中使用 Sequence 類的 value 字段來表示生產(chǎn)/消費進度,可以看到在該字段前后各填充了7個 long 類型的變量,來避免偽共享。另外,向 RingBuffer 內部的數(shù)組、

SingleProducerSequencer 等也使用了該技術。

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}
 
class Value extends LhsPadding {
    protected volatile long value;
}
 
class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

6.3、無鎖

生產(chǎn)者生產(chǎn)數(shù)據(jù)時,需要入隊。消費者消費數(shù)據(jù)時,需要出隊。入隊時,不能覆蓋沒有消費的元素。出隊時,不能讀取沒有寫入的元素。因此,Disruptor 中需要維護一個入隊索引(生產(chǎn)者數(shù)據(jù)生產(chǎn)到哪里,對應 AbstractSequencer 中的 cursor )和一個出隊索引(所有消費者中消費進度最小的序號)。

Disruptor 中最復雜的是入隊操作,下面以多生產(chǎn)者(MultiProducerSequencer)的 next(n) 方法(申請n個序號)為例分析下 Disruptor 是如何實現(xiàn)無鎖操作的。代碼如下所示,判斷下是否有足夠的序號(空余位置),如果沒有,就讓出 CPU 使用權,然后重新判斷。如果有,則使用 CAS 設置 cursor(入隊索引)。

public long next(int n) {
    do {
        // cursor類似于入隊索引, 指的是上次生產(chǎn)到這里
        current = cursor.get();
        // 目標是再生產(chǎn)n個
        next = current + n;
 
        // 前文分析過, 用于判斷消費者是否已經(jīng)追上生產(chǎn)進度, 生產(chǎn)者能否申請到n個序號
        long wrapPoint = next - bufferSize;
        // 獲取緩存的上一次的消費進度
        long cachedGatingSequence = gatingSequenceCache.get();
 
        // 第一步:空間不足就繼續(xù)等待
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            // 重新計算下所有消費者里的最小消費進度
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
 
            // 依然沒有足夠的空間, 讓出CPU使用權
            if (wrapPoint > gatingSequence) {
                LockSupport.parkNanos(1);
                continue;
            }
 
            // 更新下最新的最小的消費進度
            gatingSequenceCache.set(gatingSequence);
        }
        // 第二步:看見空間足夠時嘗試CAS競爭空間
        else if (cursor.compareAndSet(current, next)) {
            break;
        }
    } while (true);
 
    return next;
}

6.4、支持批量消費定義 Event

這個比較好理解,在前文分析消費數(shù)據(jù)的邏輯時介紹了,消費者會獲取下最大可用的序號,然后批量消費這些消息。

七、Disruptor 在i主題業(yè)務中的使用

很多開源項目都使用了 Disruptor,比如日志框架 Log4j2 使用它來實現(xiàn)異步日志。HBase、Storm 等項目中也使用了到了 Disruptor。vivo 的 i主題業(yè)務也使用了 Disruptor,下面簡單介紹下它的2個使用場景。

7.1、監(jiān)控數(shù)據(jù)上報

業(yè)務監(jiān)控系統(tǒng)對于企業(yè)來說非常重要,可以幫助企業(yè)及時發(fā)現(xiàn)和解決問題,可以方便的檢測業(yè)務指標數(shù)據(jù),改進業(yè)務決策,從而保證業(yè)務的可持續(xù)發(fā)展。i主題使用 Disruptor(多生產(chǎn)者單消費者)來暫存待上報的業(yè)務指標數(shù)據(jù),然后有定時任務不斷提取數(shù)據(jù)上報到監(jiān)控平臺,如下圖所示。


圖片


7.2、本地緩存 key 統(tǒng)計分析

i主題業(yè)務中大量使用了本地緩存,為了統(tǒng)計本地緩存中key 的個數(shù)(去重)以及每種緩存模式 key 的數(shù)量,考慮使用 Disruptor 來暫存并消費處理數(shù)據(jù)。因為業(yè)務代碼里很多地方涉及到本地緩存的訪問,也就是說,生產(chǎn)者是多線程的??紤]到消費處理比較簡單,而如果使用多線程消費的話又涉及到加鎖同步,因此消費者采用單線程模式。

整體流程如下圖所示,首先在緩存訪問工具類中增加緩存訪問統(tǒng)計上報的調用,緩存訪問數(shù)據(jù)進入到 RingBuffer 后,單線程消費者使用 HyperLogLog 來去重統(tǒng)計不同 key的個數(shù),使用正則匹配來統(tǒng)計每種模式key的數(shù)量。然后有異步任務定時獲取統(tǒng)計結果,進行展示。

需要注意的是,因為 RingBuffer 隊列大小是固定的,如果生產(chǎn)者生產(chǎn)過快而消費者消費不過來,如果使用 next 方法申請序號,如果剩余空間不夠會導致生產(chǎn)者阻塞,因此建議使用 tryPublishEvent 方法去發(fā)布數(shù)據(jù),它內部是使用 tryNext 方法申請序號,該方法如果申請不到可用序號會拋出異常,這樣生產(chǎn)者感知到了就可以做兼容處理,而不是阻塞等待。

圖片


八、使用建議

  • Disruptor 是基于生產(chǎn)者消費者模式,如果生產(chǎn)快消費慢,就會導致生產(chǎn)者無法寫入數(shù)據(jù)。因此,不建議在 Disruptor 消費線程中處理耗時較長的業(yè)務。
  • 一個 EventHandler 對應一個線程,一個線程只服務于一個 EventHandler。Disruptor 需要為每一個
    EventHandler(EventProcessor) 創(chuàng)建一個線程。因此在創(chuàng)建 Disruptor 時不推薦傳入指定的線程池,而是由 Disruptor 自身根據(jù) EventHandler 數(shù)量去創(chuàng)建對應的線程。
  • 生產(chǎn)者調用 next 方法申請序號時,如果獲取不到可用序號會阻塞,這一點需要注意。推薦使用 tryPublishEvent 方法,生產(chǎn)者在申請不到可用序號時會立即返回,不會阻塞業(yè)務線程。
  • 如果使用 next 方法申請可用序號,需要確保在 finally 方法中調用 publish 真正發(fā)布數(shù)據(jù)。
  • 合理設置等待策略。消費者在獲取不到數(shù)據(jù)時會根據(jù)設置的等待策略進行等待,BlockingWaitStrategry 是最低效的策略,但其對 CPU消耗最小。YieldingWaitStrategy 有著較低的延遲、較高的吞吐量,以及較高 CPU 占用率。當 CPU 數(shù)量足夠時,可以使用該策略。

九、總結

本文首先通過對比 JDK 中內置的線程安全的隊列和Disruptor 的特點,引入了高性能無鎖內存隊列 Disruptor。接著介紹了 Disruptor 的核心概念和基本使用,使讀者對 Disruptor 建立起初步的認識。接著從源碼和原理角度介紹了 Disruptor 的核心實現(xiàn)以及高性能原理(空間預分配、避免偽共享、無鎖、支持批量消費)。其次,結合i主題業(yè)務介紹了 Disruptor 在實踐中的應用。最后,基于上述原理分析及應用實戰(zhàn),總結了一些 Disruptor 最佳實踐策略。


參考文章:

https://time.geekbang.org/column/article/132477

https://lmax-exchange.github.io/disruptor/


責任編輯:龐桂玉 來源: vivo互聯(lián)網(wǎng)技術
相關推薦

2022-12-09 08:40:56

高性能內存隊列

2022-06-09 08:36:56

高性能Disruptor模式

2025-03-05 08:37:05

2024-10-30 15:43:56

2022-03-24 10:23:51

時間輪方法任務

2023-05-08 14:56:00

Kafka高可靠高性能

2021-06-21 17:00:05

云計算Hologres云原生

2017-09-01 15:21:18

Raft算法CMQ應用

2017-09-01 15:49:41

Raft算法CMQ

2017-01-17 09:38:52

ZooKeeperHadoopHBase

2021-04-21 15:21:37

技術架構高并發(fā)基礎源碼解析

2020-03-13 07:40:36

Plato數(shù)據(jù)分析

2022-04-07 17:30:31

Flutter攜程火車票渲染

2024-09-06 07:55:42

2022-06-28 08:42:03

磁盤kafka高性能

2022-06-30 08:04:16

Redis分布式鎖Redisson

2025-01-15 07:54:02

2022-11-11 08:55:29

RoCE技術應用

2012-05-08 13:36:55

2013-06-06 13:10:44

HashMap無鎖
點贊
收藏

51CTO技術棧公眾號