Flume Architecture and Source Code Analysis: Core Components (Part 1)
First, every core component implements the org.apache.flume.lifecycle.LifecycleAware interface:
```java
public interface LifecycleAware {
  public void start();
  public void stop();
  public LifecycleState getLifecycleState();
}
```
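start is called to initialize a component, both when Flume starts up and when a component is (re)initialized. stop is called when the component is shut down, for example when the agent stops or a component fails. getLifecycleState returns the component's lifecycle state, which is one of IDLE, START, STOP and ERROR.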
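To make the lifecycle concrete, here is a minimal sketch of a component moving through these states. The enum and interface are local re-declarations that only mirror the shape of Flume's LifecycleState and LifecycleAware so the example compiles without Flume on the classpath; DemoComponent is hypothetical and models only the normal IDLE, START, STOP path (ERROR is not used).

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins mirroring Flume's LifecycleState / LifecycleAware (not the real classes).
enum LifecycleState { IDLE, START, STOP, ERROR }

interface LifecycleAware {
  void start();
  void stop();
  LifecycleState getLifecycleState();
}

// Hypothetical component that records every state it passes through.
class DemoComponent implements LifecycleAware {
  private LifecycleState state = LifecycleState.IDLE; // not started yet
  final List<LifecycleState> history = new ArrayList<>();

  DemoComponent() {
    history.add(state);
  }

  @Override
  public void start() {
    // a real component would open sockets, threads or files here
    state = LifecycleState.START;
    history.add(state);
  }

  @Override
  public void stop() {
    // a real component would release those resources here
    state = LifecycleState.STOP;
    history.add(state);
  }

  @Override
  public LifecycleState getLifecycleState() {
    return state;
  }
}

class LifecycleDemo {
  public static void main(String[] args) {
    DemoComponent c = new DemoComponent();
    c.start();
    c.stop();
    System.out.println(c.history); // [IDLE, START, STOP]
  }
}
```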
If a component needs configuration, for example some properties to be set, it can implement the org.apache.flume.conf.Configurable interface:
```java
public interface Configurable {
  public void configure(Context context);
}
```
Flume calls configure to apply the component's configuration before it starts the component.
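The configure-then-start contract can be sketched as follows. SimpleContext is a hypothetical stand-in for Flume's org.apache.flume.Context, and DemoNetcatLikeSource is a made-up component; the point is only that configure() reads and validates settings, and start() relies on them having been read.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.flume.Context: the properties of one component.
class SimpleContext {
  private final Map<String, String> params;

  SimpleContext(Map<String, String> params) {
    this.params = new HashMap<>(params);
  }

  String getString(String key, String defaultValue) {
    return params.getOrDefault(key, defaultValue);
  }

  int getInteger(String key, int defaultValue) {
    String v = params.get(key);
    return v == null ? defaultValue : Integer.parseInt(v.trim());
  }
}

// Hypothetical source: configure() only reads settings, start() uses them.
class DemoNetcatLikeSource {
  private String bind;
  private int port;
  private boolean started;

  void configure(SimpleContext context) {
    bind = context.getString("bind", "0.0.0.0"); // optional, with a default
    port = context.getInteger("port", -1);       // required
    if (port <= 0) {
      throw new IllegalArgumentException("port must be set");
    }
  }

  void start() {
    if (bind == null) {
      throw new IllegalStateException("configure() must run before start()");
    }
    started = true; // a real source would bind a socket to bind:port here
  }

  String endpoint() { return bind + ":" + port; }

  boolean isStarted() { return started; }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("port", "44444");
    DemoNetcatLikeSource source = new DemoNetcatLikeSource();
    source.configure(new SimpleContext(props)); // configuration first
    source.start();                             // then start
    System.out.println(source.endpoint());      // 0.0.0.0:44444
  }
}
```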
1. Source
A Source collects log data. It can be implemented in one of two ways, polling or event-driven. The Source interface is:
```java
public interface Source extends LifecycleAware, NamedComponent {
  public void setChannelProcessor(ChannelProcessor channelProcessor);
  public ChannelProcessor getChannelProcessor();
}
```
Source extends LifecycleAware and otherwise only provides a setter and a getter for its ChannelProcessor, which means all of a Source's logic has to live in LifecycleAware's start and stop methods. The ChannelProcessor, introduced in an earlier article, filters the log stream and selects and dispatches to Channels.
Sources are created by a SourceFactory; the default is DefaultSourceFactory. It first looks up a built-in implementation in the enum org.apache.flume.conf.source.SourceType (for example, exec resolves to org.apache.flume.source.ExecSource); if no match is found, it treats the type as a class name and creates it directly with Class.forName(className).
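The lookup order (built-in alias first, then reflection) can be sketched as below. BuiltinType plays the role of SourceType, but it maps to JDK classes purely so the sketch runs without Flume; DemoFactory is hypothetical and is not DefaultSourceFactory's actual code.

```java
import java.util.Locale;

// Hypothetical alias table, playing the role of SourceType.
enum BuiltinType {
  LIST("java.util.ArrayList"),
  MAP("java.util.HashMap");

  final String className;

  BuiltinType(String className) {
    this.className = className;
  }
}

class DemoFactory {
  static Object create(String type) {
    String className = type;
    try {
      // 1. built-in alias, the way "exec" resolves to ExecSource via SourceType
      className = BuiltinType.valueOf(type.toUpperCase(Locale.ENGLISH)).className;
    } catch (IllegalArgumentException notAnAlias) {
      // 2. not an alias: treat the type string as a fully qualified class name
    }
    try {
      Class<?> clazz = Class.forName(className);
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Unable to create " + type, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(create("list").getClass().getName());                 // java.util.ArrayList
    System.out.println(create("java.util.LinkedList").getClass().getName()); // java.util.LinkedList
  }
}
```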
Sources come in two flavors: PollableSource (polling) and EventDrivenSource (event-driven).
Flume ships several PollableSource implementations. JMSSource, for example, actively pulls messages by calling javax.jms.MessageConsumer.receive(pollTimeout).
Flume also ships several EventDrivenSource implementations. NetcatSource and HttpSource, for example, are event-driven, meaning they wait passively for data. HttpSource starts an embedded Jetty server as its servlet container and receives messages through FlumeHTTPServlet.
Flume uses a SourceRunner to drive a Source:
```java
public class EventDrivenSourceRunner extends SourceRunner {
  private LifecycleState lifecycleState;

  public EventDrivenSourceRunner() {
    lifecycleState = LifecycleState.IDLE; // idle until started
  }

  @Override
  public void start() {
    Source source = getSource();                        // the Source being driven
    ChannelProcessor cp = source.getChannelProcessor(); // its channel processor
    cp.initialize();                                    // initialize the channel processor first
    source.start();                                     // then start the Source
    lifecycleState = LifecycleState.START;              // mark this runner as started
  }

  @Override
  public void stop() {
    Source source = getSource();
    source.stop();                                      // stop the Source first
    ChannelProcessor cp = source.getChannelProcessor();
    cp.close();                                         // then close the channel processor
    lifecycleState = LifecycleState.STOP;               // mark this runner as stopped
  }
}
```
This runner shows the start-up order: (1) initialize the ChannelProcessor, which in turn initializes its interceptor (filter) chain; (2) start the Source and update the runner's own state.
```java
public class PollableSourceRunner extends SourceRunner {
  @Override
  public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize(); // same first two steps as EventDrivenSourceRunner
    source.start();
    // then start a dedicated thread that polls the source in a loop
    runner = new PollingRunner();
    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;
    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" +
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();
    lifecycleState = LifecycleState.START;
  }
}
```
PollableSourceRunner performs the same initialization, but additionally starts a PollingRunner thread whose job is to poll the Source for data:
```java
@Override
public void run() {
  while (!shouldStop.get()) { // loop until the runner is asked to stop
    counterGroup.incrementAndGet("runner.polls");
    try {
      // poll once via PollableSource.process(), then check whether it asked to back off
      if (source.process().equals(PollableSource.Status.BACKOFF)) {
        counterGroup.incrementAndGet("runner.backoffs");
        // back off: sleep consecutiveBackoffs * increment, capped at the max interval, then retry
        Thread.sleep(Math.min(
            counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * source.getBackOffSleepIncrement(),
            source.getMaxBackOffSleepInterval()));
      } else {
        counterGroup.set("runner.backoffs.consecutive", 0L); // success resets the streak
      }
    } catch (InterruptedException e) {
      // interrupted while backing off; the loop re-checks shouldStop
    } catch (EventDeliveryException e) {
      // process() could not deliver its events; keep polling
    }
  }
}
```
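The back-off arithmetic in this loop is easy to check by hand. The sketch below replays a hypothetical sequence of process() results and computes the sleep each would trigger, sleep = min(consecutive × increment, max), resetting the counter on READY. BackoffDemo is illustrative only: it neither sleeps nor calls a real PollableSource.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffDemo {
  enum Status { READY, BACKOFF }

  // Returns the sleep (ms) the polling loop would take after each result; 0 means no sleep.
  static List<Long> sleeps(List<Status> results, long increment, long max) {
    List<Long> out = new ArrayList<>();
    long consecutive = 0;
    for (Status s : results) {
      if (s == Status.BACKOFF) {
        consecutive++; // like incrementAndGet("runner.backoffs.consecutive")
        out.add(Math.min(consecutive * increment, max));
      } else {
        consecutive = 0; // like set("runner.backoffs.consecutive", 0L)
        out.add(0L);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Status> r = List.of(Status.BACKOFF, Status.BACKOFF, Status.BACKOFF,
        Status.READY, Status.BACKOFF);
    System.out.println(sleeps(r, 1000, 2500)); // [1000, 2000, 2500, 0, 1000]
  }
}
```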
At start-up, Flume checks whether a Source is a PollableSource or an EventDrivenSource and wraps it in a PollableSourceRunner or an EventDrivenSourceRunner accordingly.
Take HttpSource as an example: it receives messages through FlumeHTTPServlet and then does the following:
```java
List<Event> events = Collections.emptyList(); // start with an empty list
// first, extract the Events from the HTTP request
events = handler.getEvents(request);
// then hand them to the ChannelProcessor
getChannelProcessor().processEventBatch(events);
```
That completes the basic Source flow: a Source listens for and collects log data, then hands it to the ChannelProcessor.
2. Channel
A Channel connects Sources and Sinks: Sources write log events into a Channel and Sinks consume them from it. The Channel therefore decouples Sources from Sinks, allowing many-to-many wiring and letting Sources and Sinks run asynchronously with respect to each other.
As we saw, a Source hands the events it collects to a ChannelProcessor, so we start with the ChannelProcessor. It depends on three components:
```java
private final ChannelSelector selector;          // Channel selector
private final InterceptorChain interceptorChain; // interceptor chain
private ExecutorService execService;             // executor for optional Channels; single-threaded by default
```
Next, let's look at how it processes an Event:
```java
public void processEvent(Event event) {
  event = interceptorChain.intercept(event); // run the interceptor chain first
  if (event == null) {
    return; // an interceptor dropped the event
  }
  List<Event> events = new ArrayList<Event>(1);
  events.add(event);
  // required Channels from the selector: each write must succeed and runs in a transaction
  List<Channel> requiredChannels = selector.getRequiredChannels(event);
  for (Channel reqChannel : requiredChannels) {
    executeChannelTransaction(reqChannel, events, false);
  }
  // optional Channels: failures are ignored and do not affect the other Channels
  List<Channel> optionalChannels = selector.getOptionalChannels(event);
  for (Channel optChannel : optionalChannels) {
    execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));
  }
}
```
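A stripped-down model of this routing, assuming events are plain Strings and each channel is just a put callback (SimpleChannel and RoutingDemo are hypothetical names, not Flume classes). It keeps the three behaviors described above: a null from the interceptor drops the event, a failure on a required channel propagates to the caller, and optional channels are written on a single-thread executor with failures ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

class RoutingDemo {
  interface SimpleChannel {
    void put(String event);
  }

  static void processEvent(String event, UnaryOperator<String> interceptor,
      List<SimpleChannel> required, List<SimpleChannel> optional, ExecutorService exec) {
    String e = interceptor.apply(event);
    if (e == null) {
      return; // dropped by the interceptor
    }
    for (SimpleChannel c : required) {
      c.put(e); // an exception here reaches the caller (the Source)
    }
    for (SimpleChannel c : optional) {
      exec.submit(() -> {
        try {
          c.put(e);
        } catch (RuntimeException ignored) {
          // optional channel: the failure is ignored
        }
      });
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> c1 = new ArrayList<>();
    List<String> c2 = new ArrayList<>();
    SimpleChannel ch1 = c1::add;
    SimpleChannel ch2 = c2::add;
    SimpleChannel failing = ev -> { throw new IllegalStateException("channel full"); };
    UnaryOperator<String> dropDebug = ev -> ev.startsWith("DEBUG") ? null : ev;
    ExecutorService exec = Executors.newSingleThreadExecutor();
    processEvent("INFO up", dropDebug, List.of(ch1), List.of(ch2, failing), exec);
    processEvent("DEBUG noise", dropDebug, List.of(ch1), List.of(ch2), exec);
    exec.shutdown();
    exec.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println(c1 + " " + c2); // [INFO up] [INFO up]
  }
}
```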
It also provides processEventBatch for batches of events. For the transaction handling itself, see executeChannelTransaction; the overall model resembles JDBC transactions (begin, then commit or roll back, then close):
```java
private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {
  // 1. get the Channel's transaction
  Transaction tx = channel.getTransaction();
  Preconditions.checkNotNull(tx, "Transaction object must not be null");
  try {
    // 2. begin the transaction
    tx.begin();
    // 3. put the whole batch on the Channel
    for (Event event : batch) {
      channel.put(event);
    }
    // 4. commit on success
    tx.commit();
  } catch (Throwable t) {
    // 5. roll back on any failure
    tx.rollback();
    if (t instanceof Error) {
      LOG.error("Error while writing to channel: " +
          channel, t);
      throw (Error) t;
    } else if (!isOptional) { // only required Channels propagate; optional-channel exceptions are ignored
      throw new ChannelException("Unable to put batch on required " +
          "channel: " + channel, t);
    }
  } finally {
    // 6. finally, close the transaction
    tx.close();
  }
}
```
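To see why the begin/put/commit/rollback/close sequence matters, here is a toy single-threaded channel in which puts are staged per transaction and only become visible on commit. ToyChannel is hypothetical (Flume's MemoryChannel additionally handles takes, capacity accounting and concurrency); unlike the code above, it catches RuntimeException rather than Throwable and rethrows it directly instead of wrapping it in ChannelException.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ToyChannel {
  private final Deque<String> queue = new ArrayDeque<>();
  private final int capacity;

  ToyChannel(int capacity) {
    this.capacity = capacity;
  }

  class Tx {
    private final List<String> staged = new ArrayList<>();
    private boolean open;

    void begin() { open = true; }

    void put(String event) {
      if (!open) throw new IllegalStateException("begin() not called");
      staged.add(event); // not visible to consumers yet
    }

    void commit() {
      if (queue.size() + staged.size() > capacity) {
        throw new IllegalStateException("channel full");
      }
      queue.addAll(staged); // publish the whole batch at once
      staged.clear();
    }

    void rollback() { staged.clear(); } // nothing staged becomes visible

    void close() { open = false; }
  }

  Tx getTransaction() { return new Tx(); }

  int size() { return queue.size(); }

  // Same control flow as executeChannelTransaction, minus logging.
  static void putBatch(ToyChannel ch, List<String> batch, boolean isOptional) {
    Tx tx = ch.getTransaction();
    try {
      tx.begin();
      for (String e : batch) {
        tx.put(e);
      }
      tx.commit();
    } catch (RuntimeException t) {
      tx.rollback();
      if (!isOptional) {
        throw t; // required channel: report the failure to the caller
      }
    } finally {
      tx.close();
    }
  }

  public static void main(String[] args) {
    ToyChannel ch = new ToyChannel(3);
    putBatch(ch, List.of("a", "b"), false);
    putBatch(ch, List.of("c", "d"), true); // exceeds capacity: rolled back and ignored
    System.out.println(ch.size()); // 2
  }
}
```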
An Interceptor filters Events: it receives an Event, filters or transforms it, and returns the resulting Event, or null to drop it. The interface is:
```java
public interface Interceptor {
  public void initialize();
  public Event intercept(Event event);
  public List<Event> intercept(List<Event> events);
  public void close();
}
```
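initialize and close start and shut down the interceptor, and the intercept methods filter or transform Events. For example, HostInterceptor looks up the local host's IP address and, by default, adds it to the Event under the header key host.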
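The two things an interceptor can do, enrich an event or drop it by returning null, can be sketched with minimal stand-ins. SimpleEvent, SimpleInterceptor and InterceptorChainDemo are hypothetical; hostHeader imitates HostInterceptor's default of writing the address under the host header, but takes the address as a parameter instead of resolving the local host, so the output is deterministic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins, not the Flume Event / Interceptor types.
class SimpleEvent {
  final Map<String, String> headers = new HashMap<>();
  final String body;

  SimpleEvent(String body) {
    this.body = body;
  }
}

interface SimpleInterceptor {
  SimpleEvent intercept(SimpleEvent event); // null means "drop"
}

class InterceptorChainDemo {
  static SimpleEvent applyChain(List<SimpleInterceptor> chain, SimpleEvent event) {
    for (SimpleInterceptor i : chain) {
      event = i.intercept(event);
      if (event == null) {
        return null; // later interceptors never see a dropped event
      }
    }
    return event;
  }

  // Imitates HostInterceptor's default: store the host address under "host".
  static SimpleInterceptor hostHeader(String hostAddress) {
    return e -> {
      e.headers.put("host", hostAddress);
      return e;
    };
  }

  static SimpleInterceptor dropEmpty() {
    return e -> e.body.isEmpty() ? null : e;
  }

  public static void main(String[] args) {
    List<SimpleInterceptor> chain = List.of(dropEmpty(), hostHeader("192.168.0.10"));
    SimpleEvent kept = applyChain(chain, new SimpleEvent("hello"));
    System.out.println(kept.headers);                           // {host=192.168.0.10}
    System.out.println(applyChain(chain, new SimpleEvent(""))); // null
  }
}
```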
Next comes the ChannelSelector, which is created as follows:
```java
// read the ChannelSelector configuration, e.g. agent.sources.s1.selector.type = replicating
ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
// create it over the Channels attached to the Source, e.g. agent.sources.s1.channels = c1 c2
ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);
```
Flume provides two ChannelSelector implementations by default, replicating and multiplexing.
The default is ReplicatingChannelSelector, which copies every received event to each Channel. MultiplexingChannelSelector instead chooses the Channel(s) to use based on the value of a header in the Event.
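Both selection policies reduce to small functions over the event headers. This sketch is hypothetical: channels are identified by name, headers are a Map, the header name state and the CZ/US mapping are example values, and the optional-channel support of the real multiplexing selector is left out.

```java
import java.util.List;
import java.util.Map;

class SelectorDemo {
  // Replicating: every configured channel receives a copy of the event.
  static List<String> replicating(List<String> allChannels, Map<String, String> headers) {
    return allChannels;
  }

  // Multiplexing: map the value of one header to channels,
  // falling back to the default channels when the value is missing or unmapped.
  static List<String> multiplexing(String headerName, Map<String, List<String>> mapping,
      List<String> defaults, Map<String, String> headers) {
    String value = headers.get(headerName);
    if (value == null) {
      return defaults;
    }
    return mapping.getOrDefault(value, defaults);
  }

  public static void main(String[] args) {
    Map<String, List<String>> mapping = Map.of("CZ", List.of("c1"), "US", List.of("c2", "c3"));
    System.out.println(replicating(List.of("c1", "c2", "c3"), Map.of()));                     // [c1, c2, c3]
    System.out.println(multiplexing("state", mapping, List.of("c4"), Map.of("state", "US"))); // [c2, c3]
    System.out.println(multiplexing("state", mapping, List.of("c4"), Map.of("state", "XX"))); // [c4]
  }
}
```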
The Channel is where Events are staged in transit: Sources publish Events into it and Sinks consume Events from it. The Channel interface provides the following methods for moving Events along:
```java
public interface Channel extends LifecycleAware, NamedComponent {
  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  public Transaction getTransaction();
}
```
put publishes an Event, take consumes one, and getTransaction provides transaction support. Flume ships several Channel implementations by default; they will be covered in a separate article.
3. Sink
A Sink consumes Events from a Channel and forwards them to the collection/aggregation tier or to the storage tier. The Sink interface is:
```java
public interface Sink extends LifecycleAware, NamedComponent {
  public void setChannel(Channel channel);
  public Channel getChannel();
  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }
}
```
Like Source, Sink extends LifecycleAware and provides a getter and setter for its Channel. It adds a process method that performs the consumption and returns a status, either READY or BACKOFF.
Sinks are likewise created by a SinkFactory, with DefaultSinkFactory as the default. Given hdfs, for example, it first looks up the enum org.apache.flume.conf.sink.SinkType and finds the default implementation org.apache.flume.sink.hdfs.HDFSEventSink; if no built-in type matches, it creates the Sink reflectively with Class.forName(className).
Sinks can also be grouped: several Sinks are aggregated into one group, represented internally by SinkGroup. That raises the question of how to schedule multiple Sinks. Flume delegates this to a SinkProcessor and provides two strategies by default, failover and load balancing.
A SinkGroup collects several Sinks and passes them to SinkProcessorFactory, which creates the SinkProcessor. The strategy is selected by configuration such as agent.sinkgroups.g1.processor.type = load_balance.
SinkProcessor has the following implementations:
DefaultSinkProcessor: the default implementation, used when there is only a single Sink.
FailoverSinkProcessor: the failover implementation:
```java
public Status process() throws EventDeliveryException {
  long now = System.currentTimeMillis();
  // 1. check whether the Sink at the head of the failed queue has finished its back-off wait
  while (!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
    // 2. if so, take the first Sink off the failed queue
    FailedSink cur = failedSinks.poll();
    Status s;
    try {
      s = cur.getSink().process(); // 3. try to process with this Sink
      if (s == Status.READY) { // 4. it succeeded
        liveSinks.put(cur.getPriority(), cur.getSink()); // 4.1 return it to the live Sinks
        activeSink = liveSinks.get(liveSinks.lastKey()); // highest priority becomes active
      } else {
        failedSinks.add(cur); // 4.2 not READY (BACKOFF): put it back on the failed queue
      }
      return s;
    } catch (Exception e) {
      cur.incFails(); // 5. on an exception, count the failure and requeue the Sink
      failedSinks.add(cur);
    }
  }
  Status ret = null;
  while (activeSink != null) { // 6. no failed Sink could process: fall back to the live Sinks
    try {
      ret = activeSink.process();
      return ret;
    } catch (Exception e) { // 7. on failure, move the active Sink to the failed queue and fail over
      activeSink = moveActiveToDeadAndGetNext();
    }
  }
  throw new EventDeliveryException("All sinks failed to process, " +
      "nothing left to failover to");
}
```
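The failover logic above can be modeled in a few dozen lines, assuming each Sink is a BooleanSupplier that returns false where the real process() would throw, and assuming the current time is passed in explicitly. FailoverDemo is hypothetical: it uses a fixed one-second penalty instead of the exponential one described next, and it ignores the BACKOFF status.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.function.BooleanSupplier;

class FailoverDemo {
  static final long PENALTY_MS = 1000; // fixed retry delay, unlike the real exponential penalty

  static final class SinkRef {
    final int priority;
    final String name;
    final BooleanSupplier process; // true = delivered, false = "process() threw"
    long refresh;                  // earliest time to retry after a failure

    SinkRef(int priority, String name, BooleanSupplier process) {
      this.priority = priority;
      this.name = name;
      this.process = process;
    }
  }

  private final TreeMap<Integer, SinkRef> live = new TreeMap<>(); // highest key = active
  private final PriorityQueue<SinkRef> failed =
      new PriorityQueue<>(Comparator.comparingLong((SinkRef s) -> s.refresh));

  void addSink(int priority, String name, BooleanSupplier process) {
    live.put(priority, new SinkRef(priority, name, process));
  }

  private void markFailed(SinkRef s, long now) {
    s.refresh = now + PENALTY_MS; // set before enqueueing so the heap order stays valid
    failed.add(s);
  }

  // Returns the name of the Sink that handled this call.
  String process(long now) {
    // retry failed Sinks whose back-off has expired; a recovered Sink handles this call
    while (!failed.isEmpty() && failed.peek().refresh < now) {
      SinkRef cur = failed.poll();
      if (cur.process.getAsBoolean()) {
        live.put(cur.priority, cur);
        return cur.name;
      }
      markFailed(cur, now);
    }
    // otherwise use the highest-priority live Sink, failing over on errors
    while (!live.isEmpty()) {
      SinkRef active = live.lastEntry().getValue();
      if (active.process.getAsBoolean()) {
        return active.name;
      }
      live.remove(active.priority);
      markFailed(active, now);
    }
    throw new IllegalStateException("All sinks failed to process, nothing left to failover to");
  }

  public static void main(String[] args) {
    boolean[] primaryUp = {false};
    FailoverDemo p = new FailoverDemo();
    p.addSink(10, "primary", () -> primaryUp[0]);
    p.addSink(5, "backup", () -> true);
    System.out.println(p.process(0));    // backup  (primary fails, parked until t=1000)
    primaryUp[0] = true;
    System.out.println(p.process(500));  // backup  (primary still backing off)
    System.out.println(p.process(1500)); // primary (retried after its refresh time)
  }
}
```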
The failed queue is a priority queue ordered by each Sink's refresh time, which is computed as follows:
```java
refresh = System.currentTimeMillis()
    + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
```
Here maxPenalty is the maximum wait time (30 s by default), and (1 << sequentialFailures) * FAILURE_PENALTY makes the wait grow exponentially with the number of consecutive failures; FAILURE_PENALTY is 1 s.
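Plugging numbers into the formula shows where the cap takes over. The sketch uses the defaults stated above (FAILURE_PENALTY = 1000 ms, maxPenalty = 30000 ms) and shifts a long (1L) rather than an int so that large failure counts cannot overflow; PenaltyDemo is a hypothetical helper.

```java
class PenaltyDemo {
  static final long FAILURE_PENALTY = 1000; // ms, as stated in the text

  // Delay added to "now" to get the refresh time after sequentialFailures failures.
  static long delay(int sequentialFailures, long maxPenalty) {
    return Math.min(maxPenalty, (1L << sequentialFailures) * FAILURE_PENALTY);
  }

  public static void main(String[] args) {
    for (int n = 0; n <= 6; n++) {
      System.out.println(n + " -> " + delay(n, 30_000) + " ms");
    }
    // 0 -> 1000, 1 -> 2000, 2 -> 4000, 3 -> 8000, 4 -> 16000, 5 -> 30000 (capped), 6 -> 30000
  }
}
```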
LoadBalanceSinkProcessor: load-balances across Sinks. It delegates the choice to a SinkSelector, analogous to ChannelSelector, which is picked at start-up from configuration such as agent.sinkgroups.g1.processor.selector = random. Two selectors are provided by default: round_robin (the default) and random.
LoadBalanceSinkProcessor balances load as follows:
```java
public Status process() throws EventDeliveryException {
  Status status = null;
  // 1. ask the selector for an iterator that defines the order in which Sinks are tried
  Iterator<Sink> sinkIterator = selector.createSinkIterator();
  while (sinkIterator.hasNext()) {
    Sink sink = sinkIterator.next();
    try {
      // 2. try Sinks in the selector's order; the first success ends this round of balancing
      status = sink.process();
      break;
    } catch (Exception ex) {
      // 3. on failure, notify the selector so it can apply its back-off policy
      selector.informSinkFailed(sink);
      LOGGER.warn("Sink failed to consume event. "
          + "Attempting next sink if available.", ex);
    }
  }
  if (status == null) {
    throw new EventDeliveryException("All configured sinks have failed");
  }
  return status;
}
```
The key questions here are how the iterator is created and how failure back-off is handled. Let's first look at RoundRobinSinkSelector, which is built on the generic RoundRobinOrderSelector:
```java
public Iterator<T> createIterator() {
  // 1. get the indices of the live Sinks
  List<Integer> activeIndices = getIndexList();
  int size = activeIndices.size();
  // 2. if the saved start position is past the end (the live set shrank), restart from the head
  if (nextHead >= size) {
    nextHead = 0;
  }
  // 3. take this call's start position and advance nextHead for the next call
  int begin = nextHead++;
  if (nextHead == activeIndices.size()) {
    nextHead = 0;
  }
  // 4. iterate from that position as a ring: with 5 Sinks and start 3,
  //    the order is 3, 4, 0, 1, 2, which implements round robin
  int[] indexOrder = new int[size];
  for (int i = 0; i < size; i++) {
    indexOrder[i] = activeIndices.get((begin + i) % size);
  }
  // indexOrder is the iteration order; getObjects() returns the Sinks
  return new SpecificOrderIterator<T>(indexOrder, getObjects());
}
```
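The index arithmetic of createIterator() can be exercised on its own. This sketch (RoundRobinDemo is hypothetical) returns the visiting order as an int[] instead of wrapping it in a SpecificOrderIterator, and keeps nextHead as instance state so successive calls rotate the starting point; the fourth call over five live Sinks reproduces the 3, 4, 0, 1, 2 order from the comment.

```java
import java.util.Arrays;
import java.util.List;

class RoundRobinDemo {
  private int nextHead = 0;

  int[] nextOrder(List<Integer> activeIndices) {
    int size = activeIndices.size();
    if (nextHead >= size) {
      nextHead = 0; // the live set shrank: restart from the head
    }
    int begin = nextHead++;
    if (nextHead == size) {
      nextHead = 0;
    }
    int[] order = new int[size];
    for (int i = 0; i < size; i++) {
      order[i] = activeIndices.get((begin + i) % size); // wrap around like a ring
    }
    return order;
  }

  public static void main(String[] args) {
    RoundRobinDemo rr = new RoundRobinDemo();
    List<Integer> active = List.of(0, 1, 2, 3, 4);
    for (int call = 0; call < 4; call++) {
      System.out.println(Arrays.toString(rr.nextOrder(active)));
    }
    // [0, 1, 2, 3, 4]  [1, 2, 3, 4, 0]  [2, 3, 4, 0, 1]  [3, 4, 0, 1, 2]
  }
}
```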
getIndexList is implemented as follows:
```java
protected List<Integer> getIndexList() {
  long now = System.currentTimeMillis();
  List<Integer> indexList = new ArrayList<Integer>();
  int i = 0;
  for (T obj : stateMap.keySet()) {
    if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {
      indexList.add(i);
    }
    i++;
  }
  return indexList;
}
```
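isShouldBackOff() indicates whether back-off is enabled. When it is disabled, every Sink is considered alive and is retried on every call. It is enabled with agent.sinkgroups.g1.processor.backoff = true and defaults to false. restoreTime, like refresh above, marks the end of a failed Sink's back-off wait; the algorithm is similar, so we won't repeat it here.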
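The effect of the backoff flag on getIndexList() can be sketched with explicit restore times. ActiveIndexDemo is hypothetical: the restoreTime array is filled in by hand rather than computed by informSinkFailed, and Sink indices stand in for the Sinks themselves.

```java
import java.util.ArrayList;
import java.util.List;

class ActiveIndexDemo {
  // Same filter as getIndexList(): without back-off every index is live;
  // with back-off, an index is skipped until its restoreTime has passed.
  static List<Integer> activeIndices(long[] restoreTime, boolean backoff, long now) {
    List<Integer> out = new ArrayList<>();
    for (int i = 0; i < restoreTime.length; i++) {
      if (!backoff || restoreTime[i] < now) {
        out.add(i);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    long[] restore = {0L, 5_000L, 0L}; // Sink 1 failed recently and may retry after t=5000
    System.out.println(activeIndices(restore, false, 1_000)); // [0, 1, 2]
    System.out.println(activeIndices(restore, true, 1_000));  // [0, 2]
    System.out.println(activeIndices(restore, true, 6_000));  // [0, 1, 2]
  }
}
```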
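So when is a Sink actually invoked? Analogous to SourceRunner, Sinks are driven by a SinkRunner that polls in a loop: it repeatedly calls the SinkProcessor, which consumes messages from the Channel by calling a Sink to forward them. As described above, the SinkProcessor decides which Sink in the group handles each call (default, failover, or load-balancing strategy).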
SinkRunner is implemented as follows:
```java
public void start() {
  SinkProcessor policy = getPolicy();
  policy.start();
  runner = new PollingRunner();
  runner.policy = policy;
  runner.counterGroup = counterGroup;
  runner.shouldStop = new AtomicBoolean();
  runnerThread = new Thread(runner);
  runnerThread.setName("SinkRunner-PollingRunner-" +
      policy.getClass().getSimpleName());
  runnerThread.start();
  lifecycleState = LifecycleState.START;
}
```
It obtains the SinkProcessor and starts it, then starts a polling thread. The PollingRunner thread polls for messages; its core is:
```java
public void run() {
  while (!shouldStop.get()) { // loop until asked to stop
    try {
      if (policy.process().equals(Sink.Status.BACKOFF)) { // nothing processed: back off
        counterGroup.incrementAndGet("runner.backoffs");
        Thread.sleep(Math.min(
            counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * backoffSleepIncrement, maxBackoffSleep)); // sleep for the back-off interval
      } else {
        counterGroup.set("runner.backoffs.consecutive", 0L);
      }
    } catch (Exception e) {
      try {
        Thread.sleep(maxBackoffSleep); // on an exception, wait the maximum back-off time
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```
The structure mirrors PollableSourceRunner: all processing is delegated to the SinkProcessor, which in turn calls a Sink's process method. Take LoggerSink as an example:
```java
@Override
public Status process() throws EventDeliveryException {
  Status result = Status.READY;
  Channel channel = getChannel();
  // 1. get the transaction
  Transaction transaction = channel.getTransaction();
  Event event = null;
  try {
    // 2. begin the transaction
    transaction.begin();
    // 3. take an Event from the Channel
    event = channel.take();
    if (event != null) {
      if (logger.isInfoEnabled()) {
        logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
      }
    } else {
      // 4. the Channel is empty: return BACKOFF so the runner sleeps
      //    instead of spinning in a tight loop and burning CPU
      result = Status.BACKOFF;
    }
    // 5. commit on success
    transaction.commit();
  } catch (Exception ex) {
    // 6. roll back on failure
    transaction.rollback();
    throw new EventDeliveryException("Failed to log event: " + event, ex);
  } finally {
    // 7. close the transaction
    transaction.close();
  }
  return result;
}
```
Some Sinks support batching, for example RollingFileSink:
```java
// 1. begin the transaction
// 2. take and write up to batchSize events
for (int i = 0; i < batchSize; i++) {
  event = channel.take();
  if (event != null) {
    sinkCounter.incrementEventDrainAttemptCount();
    eventAttemptCounter++;
    serializer.write(event);
  }
}
// 3. commit (or roll back on failure), then close the transaction
```
The Sink defines a batch size and processes the whole batch inside a single transaction.
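A self-contained sketch of the batching loop, assuming the channel is a Deque whose pollFirst() plays the role of take() (returning null when empty). Stopping early on an empty channel and reporting BACKOFF when nothing was taken are choices of this hypothetical BatchSinkDemo, made in the spirit of LoggerSink's BACKOFF-on-empty behavior; transaction handling is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class BatchSinkDemo {
  enum Status { READY, BACKOFF }

  final List<String> written = new ArrayList<>();

  // begin/commit/rollback/close would wrap this loop exactly as in the LoggerSink example.
  Status process(Deque<String> channel, int batchSize) {
    int taken = 0;
    for (int i = 0; i < batchSize; i++) {
      String event = channel.pollFirst(); // like channel.take(): null when empty
      if (event == null) {
        break; // channel drained: stop this batch early
      }
      written.add(event); // stands in for serializer.write(event)
      taken++;
    }
    return taken == 0 ? Status.BACKOFF : Status.READY;
  }

  public static void main(String[] args) {
    Deque<String> channel = new ArrayDeque<>(List.of("e1", "e2", "e3"));
    BatchSinkDemo sink = new BatchSinkDemo();
    System.out.println(sink.process(channel, 2) + " " + sink.written); // READY [e1, e2]
    System.out.println(sink.process(channel, 2) + " " + sink.written); // READY [e1, e2, e3]
    System.out.println(sink.process(channel, 2) + " " + sink.written); // BACKOFF [e1, e2, e3]
  }
}
```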
[This is an original article by 51CTO columnist Zhang Kaitao. Author's WeChat official account: Kaitao's Blog, id: kaitao-1234567]