自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flume架構(gòu)與源碼分析-核心組件分析-1

開(kāi)發(fā) 開(kāi)發(fā)工具
start方法在整個(gè)Flume啟動(dòng)時(shí)或者初始化組件時(shí)都會(huì)調(diào)用start方法進(jìn)行組件初始化,F(xiàn)lume組件出現(xiàn)異常停止時(shí)會(huì)調(diào)用stop,getLifecycleState返回組件的生命周期狀態(tài),有IDLE, START, STOP, ERROR四個(gè)狀態(tài)。

[[177130]]

首先所有核心組件都會(huì)實(shí)現(xiàn)org.apache.flume.lifecycle.LifecycleAware接口:

Java代碼

  1. public interface LifecycleAware {   
  2.   public void start();   
  3.   public void stop();   
  4.   public LifecycleState getLifecycleState();   
  5. }   

start方法在整個(gè)Flume啟動(dòng)時(shí)或者初始化組件時(shí)都會(huì)調(diào)用start方法進(jìn)行組件初始化,F(xiàn)lume組件出現(xiàn)異常停止時(shí)會(huì)調(diào)用stop,getLifecycleState返回組件的生命周期狀態(tài),有IDLE, START, STOP, ERROR四個(gè)狀態(tài)。

如果開(kāi)發(fā)的組件需要配置,如設(shè)置一些屬性;可以實(shí)現(xiàn)org.apache.flume.conf.Configurable接口:

Java代碼

  1. public interface Configurable {   
  2.    public void configure(Context context);   
  3. }   

Flume在啟動(dòng)組件之前會(huì)調(diào)用configure來(lái)初始化組件一些配置。

1、Source

Source用于采集日志數(shù)據(jù),有兩種實(shí)現(xiàn)方式:輪訓(xùn)拉取和事件驅(qū)動(dòng)機(jī)制;Source接口如下:

Java代碼

  1. public interface Source extends LifecycleAware, NamedComponent {   
  2.   public void setChannelProcessor(ChannelProcessor channelProcessor);   
  3.   public ChannelProcessor getChannelProcessor();   
  4. }    

Source接口首先繼承了LifecycleAware接口,然后只提供了ChannelProcessor的setter和getter接口,也就是說(shuō)它的的所有邏輯的實(shí)現(xiàn)應(yīng)該在LifecycleAware接口的start和stop中實(shí)現(xiàn);ChannelProcessor之前介紹過(guò)用來(lái)進(jìn)行日志流的過(guò)濾和Channel的選擇及調(diào)度。

而Source是通過(guò)SourceFactory工廠創(chuàng)建,默認(rèn)提供了DefaultSourceFactory,其首先通過(guò)Enum類型org.apache.flume.conf.source.SourceType查找默認(rèn)實(shí)現(xiàn),如exec,則找到org.apache.flume.source.ExecSource實(shí)現(xiàn),如果找不到直接Class.forName(className)創(chuàng)建。

Source提供了兩種機(jī)制: PollableSource(輪訓(xùn)拉取)和EventDrivenSource(事件驅(qū)動(dòng)):

PollableSource默認(rèn)提供了如下實(shí)現(xiàn):

比如JMSSource實(shí)現(xiàn)使用javax.jms.MessageConsumer.receive(pollTimeout)主動(dòng)去拉取消息。

EventDrivenSource默認(rèn)提供了如下實(shí)現(xiàn):

比如NetcatSource、HttpSource就是事件驅(qū)動(dòng),即被動(dòng)等待;比如HttpSource就是內(nèi)部啟動(dòng)了一個(gè)內(nèi)嵌的Jetty啟動(dòng)了一個(gè)Servlet容器,通過(guò)FlumeHTTPServlet去接收消息。

Flume提供了SourceRunner用來(lái)啟動(dòng)Source的流轉(zhuǎn):

Java代碼

  1. public class EventDrivenSourceRunner extends SourceRunner {   
  2.   private LifecycleState lifecycleState;   
  3.   public EventDrivenSourceRunner() {   
  4.       lifecycleState = LifecycleState.IDLE; //啟動(dòng)之前是空閑狀態(tài)   
  5.   }   
  6.    
  7.   @Override   
  8.   public void start() {   
  9.     Source source = getSource(); //獲取Source   
  10.     ChannelProcessor cp = source.getChannelProcessor(); //Channel處理器   
  11.     cp.initialize(); //初始化Channel處理器   
  12.     source.start();  //啟動(dòng)Source   
  13.     lifecycleState = LifecycleState.START; //本組件狀態(tài)改成啟動(dòng)狀態(tài)   
  14.   }   
  15.   @Override   
  16.   public void stop() {   
  17.     Source source = getSource(); //先停Source   
  18.     source.stop();   
  19.     ChannelProcessor cp = source.getChannelProcessor();   
  20.     cp.close();//再停Channel處理器   
  21.     lifecycleState = LifecycleState.STOP; //本組件狀態(tài)改成停止?fàn)顟B(tài)   
  22.   }   
  23. }    

從本組件也可以看出:1、首先要初始化ChannelProcessor,其實(shí)現(xiàn)時(shí)初始化過(guò)濾器鏈;2、接著啟動(dòng)Source并更改本組件的狀態(tài)。

Java代碼

  1. public class PollableSourceRunner extends SourceRunner {   
  2.  @Override   
  3.  public void start() {   
  4.   PollableSource source = (PollableSource) getSource();   
  5.   ChannelProcessor cp = source.getChannelProcessor();   
  6.   cp.initialize();   
  7.   source.start();   
  8.    
  9.   runner = new PollingRunner();   
  10.   runner.source = source;   
  11.   runner.counterGroup = counterGroup;   
  12.   runner.shouldStop = shouldStop;   
  13.    
  14.   runnerThread = new Thread(runner);   
  15.   runnerThread.setName(getClass().getSimpleName() + "-" +    
  16.       source.getClass().getSimpleName() + "-" + source.getName());   
  17.   runnerThread.start();    
  18.    
  19.   lifecycleState = LifecycleState.START;   
  20.  }   
  21. }    

而PollingRunner首先初始化組件,但是又啟動(dòng)了一個(gè)線程PollingRunner,其作用就是輪訓(xùn)拉取數(shù)據(jù):

Java代碼

  1. @Override   
  2.   public void run() {   
  3.     while (!shouldStop.get()) { //如果沒(méi)有停止,則一直在死循環(huán)運(yùn)行   
  4.       counterGroup.incrementAndGet("runner.polls");   
  5.    
  6.       try {   
  7.         //調(diào)用PollableSource的process方法進(jìn)行輪訓(xùn)拉取,然后判斷是否遇到了失敗補(bǔ)償   
  8.         if (source.process().equals(PollableSource.Status.BACKOFF)) {/   
  9.           counterGroup.incrementAndGet("runner.backoffs");   
  10.    
  11.           //失敗補(bǔ)償時(shí)暫停線程處理,等待超時(shí)時(shí)間之后重試   
  12.           Thread.sleep(Math.min(   
  13.               counterGroup.incrementAndGet("runner.backoffs.consecutive")   
  14.               * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));   
  15.         } else {   
  16.           counterGroup.set("runner.backoffs.consecutive", 0L);   
  17.         }   
  18.       } catch (InterruptedException e) {   
  19.                 }   
  20.       }   
  21.     }   
  22.   }   
  23. }    

Flume在啟動(dòng)時(shí)會(huì)判斷Source是PollableSource還是EventDrivenSource來(lái)選擇使用PollableSourceRunner還是EventDrivenSourceRunner。

比如HttpSource實(shí)現(xiàn),其通過(guò)FlumeHTTPServlet接收消息然后:

Java代碼

  1. List<Event> events = Collections.emptyList(); //create empty list   
  2. //首先從請(qǐng)求中獲取Event   
  3. events = handler.getEvents(request);   
  4. //然后交給ChannelProcessor進(jìn)行處理   
  5. getChannelProcessor().processEventBatch(events);    

到此基本的Source流程就介紹完了,其作用就是監(jiān)聽(tīng)日志,采集,然后交給ChannelProcessor進(jìn)行處理。

2、Channel

Channel用于連接Source和Sink,Source生產(chǎn)日志發(fā)送到Channel,Sink從Channel消費(fèi)日志;也就是說(shuō)通過(guò)Channel實(shí)現(xiàn)了Source和Sink的解耦,可以實(shí)現(xiàn)多對(duì)多的關(guān)聯(lián),和Source、Sink的異步化。

之前Source采集到日志后會(huì)交給ChannelProcessor處理,那么接下來(lái)我們先從ChannelProcessor入手,其依賴三個(gè)組件:

Java代碼

  1. private final ChannelSelector selector;  //Channel選擇器   
  2. private final InterceptorChain interceptorChain; //攔截器鏈   
  3. private ExecutorService execService; //用于實(shí)現(xiàn)可選Channel的ExecutorService,默認(rèn)是單線程實(shí)現(xiàn)    

接下來(lái)看下其是如何處理Event的:

Java代碼

  1. public void processEvent(Event event) {   
  2.   event = interceptorChain.intercept(event); //首先進(jìn)行攔截器鏈過(guò)濾   
  3.   if (event == null) {   
  4.     return;   
  5.   }   
  6.   List<Event> events = new ArrayList<Event>(1);   
  7.   events.add(event);   
  8.    
  9.   //通過(guò)Channel選擇器獲取必須成功處理的Channel,然后事務(wù)中執(zhí)行   
  10.   List<Channel> requiredChannels = selector.getRequiredChannels(event);   
  11.   for (Channel reqChannel : requiredChannels) {    
  12.     executeChannelTransaction(reqChannel, events, false);   
  13.   }   
  14.    
  15.   //通過(guò)Channel選擇器獲取可選的Channel,這些Channel失敗是可以忽略,不影響其他Channel的處理   
  16.   List<Channel> optionalChannels = selector.getOptionalChannels(event);   
  17.   for (Channel optChannel : optionalChannels) {   
  18.     execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));   
  19.   }   
  20. }    

另外內(nèi)部還提供了批處理實(shí)現(xiàn)方法processEventBatch;對(duì)于內(nèi)部事務(wù)實(shí)現(xiàn)的話可以參考executeChannelTransaction方法,整體事務(wù)機(jī)制類似于JDBC:

Java代碼

  1. private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {   
  2.   //1、獲取Channel上的事務(wù)   
  3.   Transaction tx = channel.getTransaction();   
  4.   Preconditions.checkNotNull(tx, "Transaction object must not be null");   
  5.   try {   
  6.     //2、開(kāi)啟事務(wù)   
  7.     tx.begin();   
  8.     //3、在Channel上執(zhí)行批量put操作   
  9.     for (Event event : batch) {   
  10.       channel.put(event);   
  11.     }   
  12.     //4、成功后提交事務(wù)   
  13.     tx.commit();   
  14.   } catch (Throwable t) {   
  15.     //5、異常后回滾事務(wù)   
  16.     tx.rollback();   
  17.     if (t instanceof Error) {   
  18.        LOG.error("Error while writing to channel: " +   
  19.            channel, t);   
  20.        throw (Error) t;   
  21.     } else if(!isOptional) {//如果是可選的Channel,異常忽略   
  22.        throw new ChannelException("Unable to put batch on required " +   
  23.              "channel: " + channel, t);   
  24.     }   
  25.   } finally {   
  26.     //***關(guān)閉事務(wù)   
  27.     tx.close();   
  28.   }   
  29. }   

Interceptor用于過(guò)濾Event,即傳入一個(gè)Event然后進(jìn)行過(guò)濾加工,然后返回一個(gè)新的Event,接口如下:

Java代碼

  1. public interface Interceptor {   
  2.     public void initialize();   
  3.     public Event intercept(Event event);   
  4.     public List<Event> intercept(List<Event> events);   
  5.     public void close();   
  6. }    

可以看到其提供了initialize和close方法用于啟動(dòng)和關(guān)閉;intercept方法用于過(guò)濾或加工Event。比如HostInterceptor攔截器用于獲取本機(jī)IP然后默認(rèn)添加到Event的字段為host的Header中。

接下來(lái)就是ChannelSelector選擇器了,其通過(guò)如下方式創(chuàng)建:

Java代碼

  1. //獲取ChannelSelector配置,比如agent.sources.s1.selector.type = replicating   
  2. ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();   
  3. //使用Source關(guān)聯(lián)的Channel創(chuàng)建,比如agent.sources.s1.channels = c1 c2   
  4. ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);    

ChannelSelector默認(rèn)提供了兩種實(shí)現(xiàn):復(fù)制和多路復(fù)用:

默認(rèn)實(shí)現(xiàn)是復(fù)制選擇器ReplicatingChannelSelector,即把接收到的消息復(fù)制到每一個(gè)Channel;多路復(fù)用選擇器MultiplexingChannelSelector會(huì)根據(jù)Event Header中的參數(shù)進(jìn)行選擇,以此來(lái)選擇使用哪個(gè)Channel。

而Channel是Event中轉(zhuǎn)的地方,Source發(fā)布Event到Channel,Sink消費(fèi)Channel的Event;Channel接口提供了如下接口用來(lái)實(shí)現(xiàn)Event流轉(zhuǎn):

Java代碼

  1. public interface Channel extends LifecycleAware, NamedComponent {   
  2.   public void put(Event event) throws ChannelException;   
  3.   public Event take() throws ChannelException;   
  4.   public Transaction getTransaction();   
  5. }    

put用于發(fā)布Event,take用于消費(fèi)Event,getTransaction用于事務(wù)支持。默認(rèn)提供了如下Channel的實(shí)現(xiàn):

對(duì)于Channel的實(shí)現(xiàn)我們后續(xù)單獨(dú)章節(jié)介紹。

3、Sink

Sink從Channel消費(fèi)Event,然后進(jìn)行轉(zhuǎn)移到收集/聚合層或存儲(chǔ)層。Sink接口如下所示:

Java代碼

  1. public interface Sink extends LifecycleAware, NamedComponent {   
  2.   public void setChannel(Channel channel);   
  3.   public Channel getChannel();   
  4.   public Status process() throws EventDeliveryException;   
  5.   public static enum Status {   
  6.     READY, BACKOFF   
  7.   }   
  8. }    

類似于Source,其首先繼承了LifecycleAware,然后提供了Channel的getter/setter方法,并提供了process方法進(jìn)行消費(fèi),此方法會(huì)返回消費(fèi)的狀態(tài),READY或BACKOFF。

Sink也是通過(guò)SinkFactory工廠來(lái)創(chuàng)建,其也提供了DefaultSinkFactory默認(rèn)工廠,比如傳入hdfs,會(huì)先查找Enum org.apache.flume.conf.sink.SinkType,然后找到相應(yīng)的默認(rèn)處理類org.apache.flume.sink.hdfs.HDFSEventSink,如果沒(méi)找到默認(rèn)處理類,直接通過(guò)Class.forName(className)進(jìn)行反射創(chuàng)建。

我們知道Sink還提供了分組功能,用于把多個(gè)Sink聚合為一組進(jìn)行使用,內(nèi)部提供了SinkGroup用來(lái)完成這個(gè)事情。此時(shí)問(wèn)題來(lái)了,如何去調(diào)度多個(gè)Sink,其內(nèi)部使用了SinkProcessor來(lái)完成這個(gè)事情,默認(rèn)提供了故障轉(zhuǎn)移和負(fù)載均衡兩個(gè)策略。

首先SinkGroup就是聚合多個(gè)Sink為一組,然后將多個(gè)Sink傳給SinkProcessorFactory進(jìn)行創(chuàng)建SinkProcessor,而策略是根據(jù)配置文件中配置的如agent.sinkgroups.g1.processor.type = load_balance來(lái)選擇的。

SinkProcessor提供了如下實(shí)現(xiàn):

DefaultSinkProcessor:默認(rèn)實(shí)現(xiàn),用于單個(gè)Sink的場(chǎng)景使用。

FailoverSinkProcessor:故障轉(zhuǎn)移實(shí)現(xiàn):

Java代碼

  1. public Status process() throws EventDeliveryException {   
  2.   Long now = System.currentTimeMillis();   
  3.     //1、首先檢查失敗隊(duì)列的頭部的Sink是否已經(jīng)過(guò)了失敗補(bǔ)償?shù)却龝r(shí)間了   
  4.   while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {   
  5.     //2、如果可以使用了,則從失敗Sink隊(duì)列獲取隊(duì)列***個(gè)Sink   
  6.     FailedSink cur = failedSinks.poll();   
  7.     Status s;   
  8.     try {   
  9.       s = cur.getSink().process(); //3、使用此Sink進(jìn)行處理   
  10.       if (s  == Status.READY) { //4、如果處理成功   
  11.         liveSinks.put(cur.getPriority(), cur.getSink()); //4.1、放回存活Sink隊(duì)列   
  12.         activeSink = liveSinks.get(liveSinks.lastKey());   
  13.       } else {   
  14.         failedSinks.add(cur); //4.2、如果此時(shí)不是READY,即BACKOFF期間,再次放回失敗隊(duì)列   
  15.       }   
  16.       return s;   
  17.     } catch (Exception e) {   
  18.       cur.incFails(); //5、如果遇到異常了,則增加失敗次數(shù),并放回失敗隊(duì)列   
  19.       failedSinks.add(cur);   
  20.     }   
  21.   }   
  22.    
  23.   Status ret = null;   
  24.   while(activeSink != null) { //6、此時(shí)失敗隊(duì)列中沒(méi)有Sink能處理了,那么需要使用存活Sink隊(duì)列進(jìn)行處理   
  25.     try {   
  26.       ret = activeSink.process();   
  27.       return ret;   
  28.     } catch (Exception e) { //7、處理失敗進(jìn)行轉(zhuǎn)移到失敗隊(duì)列   
  29.       activeSink = moveActiveToDeadAndGetNext();   
  30.     }   
  31.   }   
  32.    
  33.   throw new EventDeliveryException("All sinks failed to process, " +   
  34.       "nothing left to failover to");   
  35. }   

失敗隊(duì)列是一個(gè)優(yōu)先級(jí)隊(duì)列,使用refresh屬性排序,而refresh是通過(guò)如下機(jī)制計(jì)算的:

Java代碼

  1. refresh = System.currentTimeMillis() 
  2. + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY); 

其中maxPenalty是***等待時(shí)間,默認(rèn)30s,而(1 << sequentialFailures) * FAILURE_PENALTY)用于實(shí)現(xiàn)指數(shù)級(jí)等待時(shí)間遞增, FAILURE_PENALTY是1s。

LoadBalanceSinkProcessor:用于實(shí)現(xiàn)Sink的負(fù)載均衡,其通過(guò)SinkSelector進(jìn)行實(shí)現(xiàn),類似于ChannelSelector。LoadBalanceSinkProcessor在啟動(dòng)時(shí)會(huì)根據(jù)配置,如agent.sinkgroups.g1.processor.selector = random進(jìn)行選擇,默認(rèn)提供了兩種選擇器:

LoadBalanceSinkProcessor使用如下機(jī)制進(jìn)行負(fù)載均衡:

Java代碼

  1. public Status process() throws EventDeliveryException {   
  2.   Status status = null;   
  3.   //1、使用選擇器創(chuàng)建相應(yīng)的迭代器,也就是用來(lái)選擇Sink的迭代器   
  4.   Iterator<Sink> sinkIterator = selector.createSinkIterator();   
  5.   while (sinkIterator.hasNext()) {   
  6.     Sink sink = sinkIterator.next();   
  7.     try {   
  8.       //2、選擇器迭代Sink進(jìn)行處理,如果成功直接break掉這次處理,此次負(fù)載均衡就算完成了   
  9.       status = sink.process();   
  10.       break;   
  11.     } catch (Exception ex) {   
  12.       //3、失敗后會(huì)通知選擇器,采取相應(yīng)的失敗退避補(bǔ)償算法進(jìn)行處理   
  13.       selector.informSinkFailed(sink);   
  14.       LOGGER.warn("Sink failed to consume event. "   
  15.           + "Attempting next sink if available.", ex);   
  16.     }   
  17.   }   
  18.   if (status == null) {   
  19.     throw new EventDeliveryException("All configured sinks have failed");   
  20.   }   
  21.   return status;   
  22. }    

如上的核心就是怎么創(chuàng)建迭代器,如何進(jìn)行失敗退避補(bǔ)償處理,首先我們看下RoundRobinSinkSelector實(shí)現(xiàn),其內(nèi)部是通過(guò)通用的RoundRobinOrderSelector選擇器實(shí)現(xiàn):

Java代碼

  1. public Iterator<T> createIterator() {   
  2.   //1、獲取存活的Sink索引,   
  3.   List<Integer> activeIndices = getIndexList();   
  4.   int size = activeIndices.size();   
  5.   //2、如果上次記錄的下一個(gè)存活Sink的位置超過(guò)了size,那么從隊(duì)列頭重新開(kāi)始計(jì)數(shù)   
  6.   if (nextHead >= size) {   
  7.     nextHead = 0;   
  8.   }   
  9.   //3、獲取本次使用的起始位置   
  10.   int begin = nextHead++;   
  11.   if (nextHead == activeIndices.size()) {   
  12.     nextHead = 0;   
  13.   }   
  14.   //4、從該位置開(kāi)始迭代,其實(shí)現(xiàn)類似于環(huán)形隊(duì)列,比如整個(gè)隊(duì)列是5,起始位置是3,則按照 3、4、0、1、2的順序進(jìn)行輪訓(xùn),實(shí)現(xiàn)了輪訓(xùn)算法    
  15.   int[] indexOrder = new int[size];   
  16.   for (int i = 0; i < size; i++) {   
  17.     indexOrder[i] = activeIndices.get((begin + i) % size);   
  18.   }   
  19.   //indexOrder是迭代順序,getObjects返回相關(guān)的Sinks;   
  20.   return new SpecificOrderIterator<T>(indexOrder, getObjects());   
  21. }    

getIndexList實(shí)現(xiàn)如下:

Java代碼

  1. protected List<Integer> getIndexList() {   
  2.   long now = System.currentTimeMillis();   
  3.   List<Integer> indexList = new ArrayList<Integer>();   
  4.   int i = 0;   
  5.   for (T obj : stateMap.keySet()) {   
  6.     if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {   
  7.       indexList.add(i);   
  8.     }   
  9.     i++;   
  10.   }   
  11.   return indexList;   
  12. }   

isShouldBackOff()表示是否開(kāi)啟退避算法支持,如果不開(kāi)啟,則認(rèn)為每個(gè)Sink都是存活的,每次都會(huì)重試,通過(guò)agent.sinkgroups.g1.processor.backoff = true配置開(kāi)啟,默認(rèn)false;restoreTime和之前介紹的refresh一樣,是退避補(bǔ)償?shù)却龝r(shí)間,算法類似,就不多介紹了。

那么什么時(shí)候調(diào)用Sink進(jìn)行消費(fèi)呢?其類似于SourceRunner,Sink提供了SinkRunner進(jìn)行輪訓(xùn)拉取處理,SinkRunner會(huì)輪訓(xùn)調(diào)度SinkProcessor消費(fèi)Channel的消息,然后調(diào)用Sink進(jìn)行轉(zhuǎn)移。SinkProcessor之前介紹過(guò),其負(fù)責(zé)消息復(fù)制/路由。

SinkRunner實(shí)現(xiàn)如下:

Java代碼

  1. public void start() {   
  2.   SinkProcessor policy = getPolicy();   
  3.   policy.start();   
  4.   runner = new PollingRunner();   
  5.   runner.policy = policy;   
  6.   runner.counterGroup = counterGroup;   
  7.   runner.shouldStop = new AtomicBoolean();   
  8.   runnerThread = new Thread(runner);   
  9.   runnerThread.setName("SinkRunner-PollingRunner-" +   
  10.       policy.getClass().getSimpleName());   
  11.   runnerThread.start();   
  12.   lifecycleState = LifecycleState.START;   
  13. }    

即獲取SinkProcessor然后啟動(dòng)它,接著啟動(dòng)輪訓(xùn)線程去處理。PollingRunner線程負(fù)責(zé)輪訓(xùn)消息,核心實(shí)現(xiàn)如下:

Java代碼

  1. public void run() {   
  2.   while (!shouldStop.get()) { //如果沒(méi)有停止   
  3.     try {   
  4.       if (policy.process().equals(Sink.Status.BACKOFF)) {//如果處理失敗了,進(jìn)行退避補(bǔ)償處理   
  5.         counterGroup.incrementAndGet("runner.backoffs");   
  6.         Thread.sleep(Math.min(   
  7.             counterGroup.incrementAndGet("runner.backoffs.consecutive")   
  8.             * backoffSleepIncrement, maxBackoffSleep)); //暫停退避補(bǔ)償設(shè)定的超時(shí)時(shí)間   
  9.       } else {   
  10.         counterGroup.set("runner.backoffs.consecutive", 0L);   
  11.       }   
  12.     } catch (Exception e) {   
  13.       try {   
  14.         Thread.sleep(maxBackoffSleep); //如果遇到異常則等待***退避時(shí)間   
  15.       } catch (InterruptedException ex) {   
  16.         Thread.currentThread().interrupt();   
  17.       }   
  18.     }   
  19.   }   
  20. }    

整體實(shí)現(xiàn)類似于PollableSourceRunner實(shí)現(xiàn),整體處理都是交給SinkProcessor完成的。SinkProcessor會(huì)輪訓(xùn)Sink的process方法進(jìn)行處理;此處以LoggerSink為例:

Java代碼

  1. @Override   
  2. public Status process() throws EventDeliveryException {   
  3.   Status result = Status.READY;   
  4.   Channel channel = getChannel();   
  5.   //1、獲取事務(wù)   
  6.   Transaction transaction = channel.getTransaction();   
  7.   Event event = null;   
  8.    
  9.   try {   
  10.     //2、開(kāi)啟事務(wù)   
  11.     transaction.begin();   
  12.     //3、從Channel獲取Event   
  13.     event = channel.take();   
  14.     if (event != null) {   
  15.       if (logger.isInfoEnabled()) {   
  16.         logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));   
  17.       }   
  18.     } else {//4、如果Channel中沒(méi)有Event,則默認(rèn)進(jìn)入故障補(bǔ)償機(jī)制,即防止死循環(huán)造成CPU負(fù)載高   
  19.       result = Status.BACKOFF;   
  20.     }   
  21.     //5、成功后提交事務(wù)   
  22.     transaction.commit();   
  23.   } catch (Exception ex) {   
  24.     //6、失敗后回滾事務(wù)   
  25.     transaction.rollback();   
  26.     throw new EventDeliveryException("Failed to log event: " + event, ex);   
  27.   } finally {   
  28.     //7、關(guān)閉事務(wù)   
  29.     transaction.close();   
  30.   }   
  31.   return result;   
  32. }    

Sink中一些實(shí)現(xiàn)是支持批處理的,比如RollingFileSink:

Java代碼

  1. //1、開(kāi)啟事務(wù)   
  2. //2、批處理   
  3. for (int i = 0; i < batchSize; i++) {   
  4.   event = channel.take();   
  5.   if (event != null) {   
  6.     sinkCounter.incrementEventDrainAttemptCount();   
  7.     eventAttemptCounter++;   
  8.     serializer.write(event);   
  9.   }   
  10. }   
  11. //3、提交/回滾事務(wù)、關(guān)閉事務(wù)   

定義一個(gè)批處理大小然后在事務(wù)中執(zhí)行批處理。

【本文是51CTO專欄作者張開(kāi)濤的原創(chuàng)文章,作者微信公眾號(hào):開(kāi)濤的博客,id:kaitao-1234567】

責(zé)任編輯:武曉燕 來(lái)源: 開(kāi)濤的博客
相關(guān)推薦

2016-11-29 09:38:06

Flume架構(gòu)核心組件

2016-11-25 13:14:50

Flume架構(gòu)源碼

2016-11-29 16:59:46

Flume架構(gòu)源碼

2022-06-07 10:33:29

Camera組件鴻蒙

2011-04-29 13:40:37

MongoDBCommand

2015-04-24 09:33:11

Cloud Found組件分析PaaS

2021-09-05 07:35:58

lifecycleAndroid組件原理

2009-12-31 15:55:06

ADO.NET結(jié)構(gòu)

2016-10-21 13:03:18

androidhandlerlooper

2021-09-08 10:47:33

Flink執(zhí)行流程

2019-10-08 10:01:22

Kafka應(yīng)用場(chǎng)景架構(gòu)

2015-08-11 15:52:52

大數(shù)據(jù)數(shù)據(jù)分析

2022-07-17 06:51:22

Vite 3.0前端

2022-01-05 08:53:13

Spring原理分析MVC

2025-01-13 00:13:59

VSCode架構(gòu)依賴注入

2011-03-15 11:33:18

iptables

2017-05-04 22:30:17

Zuul過(guò)濾器微服務(wù)

2009-12-11 09:42:54

Linux內(nèi)核源碼進(jìn)程調(diào)度

2011-05-26 10:05:48

MongoDB

2014-08-26 11:11:57

AsyncHttpCl源碼分析
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)