自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Disruptor廣播模式與執(zhí)行順序鏈源碼分析

開發(fā) 前端
消費者線程起來后,然后進入死循環(huán),持續(xù)不斷從生產者處批量獲取可用的序號,如果獲取到可用序號后,那么遍歷所有可用序號,然后調用eventHandler的onEvent方法消費數據,onEvent方法寫的是消費者的業(yè)務邏輯。

1.前言

本篇文章開始Disruptor的源碼分析,理解起來相對比較困難,特別是Disruptor的sequenceBarrier的理解,sequenceBarrier包括生產者與消費者之間的gatingSequence以及消費者與消費者之間的dependentSequence。此外,Disruptor源碼中的sequence變量也比較多,需要捋清楚各種sequence的含義。最后,建議小伙伴們動手調試理解,效果會更好。

2.Disruptor六邊形DEMO

分析源碼前,先來看看Disruptor六邊形執(zhí)行器鏈的DEMO。

public class LongEventMain
{
private static final int BUFFER_SIZE = 1024;
public static void main(String[] args) throws Exception
{
// 1,構建disruptor
final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
new LongEventFactory(),
BUFFER_SIZE,
Executors.newFixedThreadPool(5), // 【注意點】線程池需要保證足夠的線程:有多少個消費者就要有多少個線程,否則有些消費者將不會執(zhí)行,生產者可能也會一直阻塞下去
ProducerType.SINGLE,
new YieldingWaitStrategy()
);

EventHandler eventHandler1 = new LongEventHandler1();
EventHandler eventHandler2 = new LongEventHandler2();
EventHandler eventHandler3 = new LongEventHandler3();
EventHandler eventHandler4 = new LongEventHandler4();
EventHandler eventHandler5 = new LongEventHandler5();

// 方式1 構建串行執(zhí)行順序:
/*disruptor
.handleEventsWith(eventHandler1)
.handleEventsWith(eventHandler2)
.handleEventsWith(eventHandler3)
.handleEventsWith(eventHandler4)
.handleEventsWith(eventHandler5);*/

// 方式2 構建并行執(zhí)行順序
/*disruptor
.handleEventsWith(eventHandler1, eventHandler2, eventHandler3, eventHandler4, eventHandler5);*/

// 方式3 構建菱形執(zhí)行順序
/*disruptor.handleEventsWith(eventHandler1, eventHandler2)
.handleEventsWith(eventHandler3);*/

// 2,構建eventHandler執(zhí)行鏈
// 方式4 構建六邊形執(zhí)行順序
disruptor.handleEventsWith(eventHandler1, eventHandler3);
disruptor.after(eventHandler1).handleEventsWith(eventHandler2);
disruptor.after(eventHandler3).handleEventsWith(eventHandler4);
disruptor.after(eventHandler2, eventHandler4).handleEventsWith(eventHandler5);

// 3, 啟動disruptor即啟動線程池線程執(zhí)行BatchEventProcessor任務
disruptor.start();

// 4,生產者往ringBuffer生產數據并喚醒所有的消費者消費數據
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
bb.putLong(0, 666);
ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb);
}

static class LongEventTranslatorOneArg implements EventTranslatorOneArg<LongEvent, ByteBuffer> {
@Override
public void translateTo(LongEvent event, long sequence, ByteBuffer buffer) {
event.set(buffer.getLong(0));
}
}

static class LongEvent
{
private long value;

public void set(long value)
{
this.value = value;
}

public long get() {
return this.value;
}
}

static class LongEventFactory implements EventFactory<LongEvent>
{
@Override
public LongEvent newInstance()
{
return new LongEvent();
}
}

static class LongEventHandler1 implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("LongEventHandler1-" + event.get() + " executed by " + Thread.currentThread().getName());
}
}

static class LongEventHandler2 implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("LongEventHandler2-" + event.get() + " executed by " + Thread.currentThread().getName());
}
}

static class LongEventHandler3 implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("LongEventHandler3-" + event.get() + " executed by " + Thread.currentThread().getName());
}
}

static class LongEventHandler4 implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("LongEventHandler4-" + event.get() + " executed by " + Thread.currentThread().getName());
}
}

static class LongEventHandler5 implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("LongEventHandler5-" + event.get() + " executed by " + Thread.currentThread().getName());
}
}
}

3.初始化Disruptor實例

先來看下前面DEMO中的初始化Disruptor實例代碼:

// 1,構建disruptor
final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
new LongEventFactory(),
BUFFER_SIZE,
Executors.newFixedThreadPool(5), // 線程池需要保證足夠的線程
ProducerType.SINGLE,
new YieldingWaitStrategy()
);

這句代碼最終是給Disruptor的ringBuffer和executor屬性賦值:

// Disruptor.java
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
// 創(chuàng)建RingBuffer實例
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
executor);
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
this.ringBuffer = ringBuffer;
this.executor = executor;
}

那么RingBuffer實例又是如何創(chuàng)建的呢?我們來看下RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy)這句源碼:

// RingBuffer.java
public static <E> RingBuffer<E> create(
final ProducerType producerType,
final EventFactory<E> factory,
final int bufferSize,
final WaitStrategy waitStrategy)
{
switch (producerType)
{
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}

首先會根據producerType來創(chuàng)建不同的Producer,以創(chuàng)建SingleProducerSequencer實例為例進去源碼看下:

// RingBuffer.java
public static <E> RingBuffer<E> createSingleProducer(
final EventFactory<E> factory,
final int bufferSize,
final WaitStrategy waitStrategy)
{
// 1,創(chuàng)建SingleProducerSequencer實例
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
// 2,創(chuàng)建RingBuffer實例
return new RingBuffer<>(factory, sequencer);
}

3.1 創(chuàng)建SingleProducerSequencer實例

首先創(chuàng)建了SingleProducerSequencer實例,給SingleProducerSequencer實例的bufferSize和waitStrategy賦初值;

// AbstractSequencer.java
// SingleProducerSequencer父類
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}

此外,創(chuàng)建SingleProducerSequencer實例時還初始化了一個成員變量cursor:

protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

即給cursor賦值了一個Sequence實例對象,Sequence是標識RingBuffer環(huán)形數組的下標,同時生產者和消費者也會同時維護各自的Sequence。最重要的是,**Sequence通過填充CPU緩存行避免了偽共享帶來的性能損耗**,來看下其填充緩存行源碼:

// Sequence.java
class LhsPadding
{
// 左填充
protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
// Sequence值
protected volatile long value;
}

class RhsPadding extends Value
{
// 右填充
protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding
{
// ...
}

3.2 創(chuàng)建RingBuffer實例

然后核心是創(chuàng)建RingBuffer實例,看看最終創(chuàng)建RingBuffer實例源碼:

// RingBuffer.java
RingBufferFields( // RingBufferFields為RingBuffer父類
final EventFactory<E> eventFactory,
final Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();

if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}

this.indexMask = bufferSize - 1;
// 【重要特性】內存預加載,內存池機制
this.entries = (E[]) new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}

實例作為構造參數傳入給了RingBuffer實例的sequencer屬性賦初值,然后最重要的是在創(chuàng)建RingBuffer實例時,會為RingBuffer的環(huán)形數組提前填充Event對象,即內存池機制:

private void fill(final EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}

內存池機制好處:

  • 提前創(chuàng)建好復用的對象,減少程序運行時因為創(chuàng)建對象而浪費性能,其實也是一種空間換時間的思想;
  • 因為環(huán)形數組對象可復用,從而避免GC來提高性能。

4.構建執(zhí)行順序鏈

// 2,構建eventHandler執(zhí)行鏈:構建六邊形執(zhí)行順序
disruptor.handleEventsWith(eventHandler1, eventHandler3);
disruptor.after(eventHandler1).handleEventsWith(eventHandler2);
disruptor.after(eventHandler3).handleEventsWith(eventHandler4);
disruptor.after(eventHandler2, eventHandler4).handleEventsWith(eventHandler5);

再來看看Disruptor構建執(zhí)行順序鏈相關源碼:

先來看看disruptor.handleEventsWith(eventHandler1, eventHandler3);源碼:

// Disruptor.java
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
// 根據eventHandlers長度來創(chuàng)建多少個消費者Sequence實例,注意這個processorSequences是傳遞到EventHandlerGroup用于構建執(zhí)行順序鏈用的,
// 比如有執(zhí)行順序鏈:A->B,那么A的sequenct即processorSequences會作為B節(jié)點的barrierSequences即dependencySequence
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
// 新建了一個ProcessingSequenceBarrier實例返回
// ProcessingSequenceBarrier實例作用:序號屏障,通過追蹤生產者的cursorSequence和每個消費者( EventProcessor)
// 的sequence的方式來協(xié)調生產者和消費者之間的數據交換進度
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);// 如果構建執(zhí)行順序鏈比如A->B,那么barrierSequences是A消費者的sequence;如果是A,C->B,那么barrierSequences是A和C消費者的sequence

for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
// 有多少個eventHandlers就創(chuàng)建多少個BatchEventProcessor實例(消費者),
// 但需要注意的是同一批次的每個BatchEventProcessor實例共用同一個SequenceBarrier實例
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
// 將batchEventProcessor, eventHandler, barrier封裝成EventProcessorInfo實例并加入到ConsumerRepository相關集合
// ConsumerRepository作用:提供存儲機制關聯(lián)EventHandlers和EventProcessors
consumerRepository.add(batchEventProcessor, eventHandler, barrier); // // 如果構建執(zhí)行順序鏈比如A->B,那么B消費者也一樣會加入consumerRepository的相關集合
// 獲取到每個消費的消費sequece并賦值給processorSequences數組
// 即processorSequences[i]引用了BatchEventProcessor的sequence實例,
// 但processorSequences[i]又是構建生產者gatingSequence和消費者執(zhí)行器鏈dependentSequence的來源
processorSequences[i] = batchEventProcessor.getSequence();
}
// 總是拿執(zhí)行器鏈最后一個消費者的sequence作為生產者的gateingSequence
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
// 最終返回封裝了Disruptor、ConsumerRepository和消費者sequence數組processorSequences的EventHandlerGroup對象實例返回
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

構建Disruptor執(zhí)行順序鏈的核心邏輯就在這段源碼中,我們縷一縷核心邏輯:

  • 有多少個eventHandlers就創(chuàng)建多少個BatchEventProcessor實例(消費者),BatchEventProcessor消費者其實就是一個實現Runnable接口的線程實例;
  • 每個BatchEventProcessor實例(消費者)擁有前一個消費者的sequence作為其sequenceBarrier即dependentSequence;
  • 當前消費者的sequence通過EventHandlerGroup這個載體來傳遞給下一個消費者作為其sequenceBarrier即dependentSequence。

再來看看diruptor.after(eventHandler1)源碼:

// Disruptor.java
public final EventHandlerGroup<T> after(final EventHandler<T>... handlers)
{
// 獲取指定的EventHandler的消費者sequence并賦值給sequences數組,
// 然后重新新建一個EventHandlerGroup實例返回(封裝了前面的指定的消費者sequence被賦值
// 給了EventHandlerGroup的成員變量數組sequences,用于后面指定執(zhí)行順序用)
final Sequence[] sequences = new Sequence[handlers.length];
for (int i = 0, handlersLength = handlers.length; i < handlersLength; i++)
{
sequences[i] = consumerRepository.getSequenceFor(handlers[i]);
}

return new EventHandlerGroup<>(this, consumerRepository, sequences);
}

這段源碼做的事情也是將當前消費者sequence封裝進EventHandlerGroup,從而可以通過這個載體來傳遞給下一個消費者作為其sequenceBarrier即dependentSequence。

最終構建的最終sequence依賴關系如下圖,看到這個圖不禁讓我想起AQS的線程等待鏈即CLH鎖的變相實現,附上文章鏈接,有興趣的讀者可以比對理解。

5.啟動Disruptor實例

// 3, 啟動disruptor即啟動線程池線程執(zhí)行BatchEventProcessor任務
disruptor.start();

我們再來看看 disruptor.start()這句源碼:

// Disruptor.java
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
// 遍歷每一個BatchEventProcessor消費者(線程)實例,并把該消費者線程實例跑起來
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}

return ringBuffer;
}

其實這里做的事情無非就是遍歷每個消費者線程實例,然后啟動每個消費者線程實例BatchEventProcessor,其中BatchEventProcessor被封裝進ConsumerInfo實例。還沒生產數據就啟動消費線程的話,此時消費者會根據阻塞策略WaitStrategy進行阻塞。

6.生產消費數據

6.1 生產者生產數據

// 4,生產者往ringBuffer生產數據并喚醒所有的消費者消費數據
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
bb.putLong(0, 666);
ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb);

生產者生產數據的源碼在ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb);中。

// RingBuffer.java
public <A> void publishEvent(final EventTranslatorOneArg<E, A> translator, final A arg0)
{
// 【1】獲取下一個RingBuffer中需填充數據的event對象的序號,對應生產者
final long sequence = sequencer.next();
// 【2】轉換數據格式并生產數據并喚醒消費者
translateAndPublish(translator, sequence, arg0);
}

6.1.1 生產者獲取RingBuffer的sequence

先來看下單生產者獲取sequence的源碼:

// SingleProducerSequencer.java
public long next(final int n)
{
if (n < 1 || n > bufferSize)
{
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
// 總是拿到生產者已生產的當前序號
long nextValue = this.nextValue;
// 獲取要生產的下n個序號
long nextSequence = nextValue + n;
// 生產者總是先有bufferSize個坑可以填,所以nextSequence - bufferSize
long wrapPoint = nextSequence - bufferSize;
// 拿到上一次的GatingSequence,因為是緩存,這里不是最新的
long cachedGatingSequence = this.cachedValue;
// 如果生產者生產超過了消費者消費速度,那么這里自旋等待,這里的生產者生產的下標wrapPoint是已經繞了RingBuffer一圈的了哈
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence

long minSequence;
// 自旋等待,其中gatingSequences是前面構建執(zhí)行順序鏈時的最后一個消費者的sequence
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}

this.cachedValue = minSequence;
}
// 將獲取的nextSequence賦值給生產者當前值nextValue
this.nextValue = nextSequence;

return nextSequence;
}

這段源碼相對較難,我們縷一縷:

  • 生產者把第一圈RingBuffer的坑填完后,此時生產者進入RingBuffer第2圈,如果消費者消費速度過慢,此時生產者很可能會追上消費者,如果追上消費者那么就讓生產者自旋等待;
  • 第1點的如果消費者消費速度過慢,對于構建了一個過濾器鏈的消費者中,那么指的是哪個消費者呢?指的就是執(zhí)行器鏈最后執(zhí)行的那個消費者gatingSequences就是執(zhí)行器鏈最后執(zhí)行的那個消費者的sequence;這個gatingSequences其實就是防止生產者追趕消費者的sequenceBarrier;

  • 生產者總是先把第一圈RingBuffer填滿后,才會考慮追趕消費者的問題,因此才有wrapPoint > cachedGatingSequence的評判條件。

前面是單生產者獲取sequence的源碼,對于多生產者MultiProducerSequencer的源碼邏輯也是類似,只不過將生產者當前值cursor和cachedGatingSequence用了CAS操作而已,防止多線程問題。

6.1.2 生產者生產數據并喚醒消費者

再來看看 translateAndPublish(translator, sequence, arg0)源碼:

// RingBuffer.java
private <A> void translateAndPublish(final EventTranslatorOneArg<E, A> translator, final long sequence, final A arg0)
{
try
{
// 【1】將相應數據arg0轉換為相應的Eevent數據,其中get(sequence)會從RingBuffer數組對象池中取出一個對象,而非新建
translator.translateTo(get(sequence), sequence, arg0);
}
finally
{
// 【2】發(fā)布該序號說明已經生產完畢供消費者使用
sequencer.publish(sequence);
}
}



// SingleProducerSequencer.java
public void publish(final long sequence)
{
// 【1】給生產者cursor游標賦值新的sequence,說明該sequenc對應的對象數據已經填充(生產)完畢
cursor.set(sequence);// 這個cursor即生產者生產時移動的游標,是AbstractSequencer的成員變量
// 【2】根據阻塞策略將所有消費者喚醒
// 注意:這個waitStrategy實例是所有消費者和生產者共同引用的
waitStrategy.signalAllWhenBlocking();
}

生產者生產數據并喚醒消費者的注釋已經寫得很清楚了,這里需要注意的點:

  • cursor才是生產者生產數據的當前下標,消費者消費速度有無追趕上生產者就是拿消費者的消費sequence跟生產者的cursor比較的,因此生產者生產數據完成后需要給cursor賦值;
  • waitStrategy策略對象時跟消費者共用的,這樣才能線程間實現阻塞喚醒邏輯。

6.2 消費者消費數據

前面第4節(jié)啟動Disruptor實例中講到,其實就是開啟各個消費者實例BatchEventProcessor線程,我們看看其run方法中的核心邏輯即processEvents源碼:

// BatchEventProcessor.java
private void processEvents()
{
T event = null;
// nextSequence:消費者要消費的下一個序號
long nextSequence = sequence.get() + 1L; // 【重要】每一個消費者都是從0開始消費,各個消費者維護各自的sequence
// 消費者線程一直在while循環(huán)中不斷獲取生產者數據
while (true)
{
try
{
// 拿到當前生產者的生產序號
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
// 如果消費者要消費的下一個序號小于生產者的當前生產序號,那么消費者則進行消費
// 這里有一個亮點:就是消費者會一直循環(huán)消費直至到達當前生產者生產的序號
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 消費完后設置當前消費者的消費進度,這點很重要
// 【1】如果當前消費者是執(zhí)行鏈的最后一個消費者,那么其sequence則是生產者的gatingSequence,因為生產者就是拿要生產的下一個sequence跟gatingSequence做比較的哈
// 【2】如果當前消費者不是執(zhí)行器鏈的最后一個消費者,那么其sequence作為后面消費者的dependentSequence
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (running.get() != RUNNING)
{
break;
}
}
catch (final Throwable ex)
{
handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}

消費者線程起來后,然后進入死循環(huán),持續(xù)不斷從生產者處批量獲取可用的序號,如果獲取到可用序號后,那么遍歷所有可用序號,然后調用eventHandler的onEvent方法消費數據,onEvent方法寫的是消費者的業(yè)務邏輯。消費完后再設置當前消費者的消費進度,這點很重要,用于構建sequenceBarrier包括gatingSequence和dependentSequence。

下面再來看看消費者是怎么獲取可用的序號的,繼續(xù)看sequenceBarrier.waitFor(nextSequence)源碼:

// ProcessingSequenceBarrier.java

public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// availableSequence:獲取生產者生產后可用的序號
// sequence:消費者要消費的下一個序號
// cursorSequence:生產者生產數據時的當前序號
// dependentSequence:第一個消費者即前面不依賴任何消費者的消費者,dependentSequence就是生產者游標;
// 有依賴其他消費者的消費者,dependentSequence就是依賴的消費者的sequence
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

if (availableSequence < sequence)
{
return availableSequence;
}
// 這個主要是針對多生產者的情形
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

可以看到ProcessingSequenceBarrier封裝了WaitStrategy等待策略實例,此時消費者獲取下一批可用序號的邏輯又封裝在了WaitStrategy的waitFor方法中,以BlockingWaitStrategy為例來其實現邏輯:

// BlockingWaitStrategy.java

public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
// cursorSequence:生產者的序號
// 第一重條件判斷:如果消費者消費速度大于生產者生產速度(即消費者要消費的下一個數據已經大于生產者生產的數據時),那么消費者等待一下
if (cursorSequence.get() < sequence)
{
lock.lock();
try
{
while (cursorSequence.get() < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// 第一重條件判斷:自旋等待
// 即當前消費者線程要消費的下一個sequence大于其前面執(zhí)行鏈路(若有依賴關系)的任何一個消費者最小sequence(dependentSequence.get()),那么這個消費者要自旋等待,
// 直到前面執(zhí)行鏈路(若有依賴關系)的任何一個消費者最小sequence(dependentSequence.get())已經大于等于當前消費者的sequence時,說明前面執(zhí)行鏈路的消費者已經消費完了
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}

return availableSequence;
}

可以看到,消費者獲取下一批可用消費序號時,此時要經過兩重判斷:

  • 第一重判斷:消費者消費的序號不能超過當前生產者消費當前生產的序號,否則消費者就阻塞等待;當然,這里因為是BlockingWaitStrategy等待策略的實現,如果是其他策略,比如BusySpinWaitStrategy和YieldingWaitStrategy的話,這里消費者是不會阻塞等待的,而是自旋,因此這也是其無鎖化的實現了,但就是很耗CPU而已;
  • 第二重判斷:消費者消費的序號不能超過其前面依賴的消費消費的序號,否則其自旋等待。因為這里是消費者等消費者,按理說前面消費者應該會很快處理完,所以不用阻塞等待;但是消費者等待生產者的話,如果生產者沒生產數據的話,消費者還是自旋等待的話會比較浪費CPU,所以對于BlockingWaitStrategy策略,是阻塞等待了。

7.WaitStrategy等待策略

最后,再來看下WaitStrategy有哪些實現類:

可以看到消費者的WaitStrategy等待策略有8種實現類,可以分為有鎖和無鎖兩大類,然后每一種都有其適用的場合,沒有最好的WaitStrategy等待策略,只有適合自己應用場景的等待策略。因為其源碼不是很難,這里逐一分析。

責任編輯:武曉燕 來源: 源碼筆記
相關推薦

2021-09-08 10:47:33

Flink執(zhí)行流程

2021-11-26 17:17:43

Android廣播運行原理源碼分析

2022-05-10 08:47:00

JMeter作用域執(zhí)行順序

2010-04-16 09:27:18

Ocacle執(zhí)行計劃

2022-08-27 08:02:09

SQL函數語法

2021-04-15 09:17:01

SpringBootRocketMQ

2010-09-01 09:03:56

CSS優(yōu)先權

2021-09-13 15:40:37

區(qū)塊鏈教育技術

2009-06-16 10:51:14

Java源碼

2016-10-21 13:03:18

androidhandlerlooper

2010-08-04 13:33:52

路由器配置

2019-12-10 09:54:20

高德APP架構全鏈路

2016-11-29 09:38:06

Flume架構核心組件

2016-11-25 13:26:50

Flume架構源碼

2023-09-04 08:00:53

提交事務消息

2016-11-25 13:14:50

Flume架構源碼

2009-07-03 16:33:13

Tapestry函數執(zhí)

2022-01-21 08:50:15

Promise任務隊列前端

2021-05-07 13:42:58

區(qū)塊鏈互聯(lián)網技術

2012-05-16 13:45:24

Java構造器
點贊
收藏

51CTO技術棧公眾號