使用Flume 部署和管理可擴(kuò)展的Web 服務(wù)
機(jī)器生成的日志數(shù)據(jù)對于查找各種硬件和軟件故障的根源至關(guān)重要。來自該日志數(shù)據(jù)的信息可提供改進(jìn)系統(tǒng)架構(gòu)、減緩系統(tǒng)退化和改善正常運(yùn)行時(shí)間方面的反饋。最近,一些企業(yè)開始使用這些日志數(shù)據(jù)獲取業(yè)務(wù)洞察。在使用一個(gè)容錯(cuò)的架構(gòu)時(shí),F(xiàn)lume 是一個(gè)擁有高效收集、聚合和轉(zhuǎn)移大量日志數(shù)據(jù)的分布式服務(wù)。本文將介紹如何部署 Flume,以及如何將它與 Hadoop 集群和簡單的分布式 Web 服務(wù)結(jié)合使用。
Flume 架構(gòu)
Flume 是一項(xiàng)分布式、可靠的、容易使用的服務(wù),用于收集、聚合從許多來源傳來的大量流事件數(shù)據(jù)并將它們轉(zhuǎn)移到一個(gè)中央數(shù)據(jù)存儲中。
圖 1. Flume 架構(gòu)
Flume 事件可定義為一個(gè)擁有工作負(fù)載(字節(jié))和一個(gè)可選的字符串屬性集的數(shù)據(jù)流單元。Flume 代理是一個(gè)托管組件的 JVM 進(jìn)程,事件通過該進(jìn)程從外部來源流到下一個(gè)目標(biāo)(躍點(diǎn))。
InfoSphere® BigInsights™ 支持以較低的延遲持續(xù)分析和存儲流數(shù)據(jù)。InfoSphere Streams 可用于配置上述代理和收集器進(jìn)程(參見 參考資料)。Flume 也可用于在一個(gè)遠(yuǎn)程位置收集數(shù)據(jù),而且可在 InfoSphere BigInsights 服務(wù)器上配置一個(gè)收集器,將數(shù)據(jù)存儲在分布式文件系統(tǒng) (DFS) 上。但是,在本文中,我們會同時(shí)將 Flume 用作代理和收集器進(jìn)程,并使用一個(gè) Hadoop 分布式文件系統(tǒng) (HDFS) 集群作為存儲。
數(shù)據(jù)流模型
一個(gè) Flume 代理有三個(gè)主要組成部分:來源、通道和接收器 (sink)。來源 使用了外部來源(比如 Web 服務(wù))傳送給它的事件。外部來源以一種可識別的格式將事件發(fā)送給 Flume。當(dāng) Flume 來源收到事件后,它會將這些事件存儲在一個(gè)或多個(gè)通道 中。通道是一種被動存儲,它將事件保留到被 Flume 接收器 使用為止。例如,一個(gè)文件通道使用了本地文件系統(tǒng);接收器從通道提取事件,并將它放在一個(gè)外部存儲庫(比如 HDFS)中,或者將它轉(zhuǎn)發(fā)到流中下一個(gè) Flume 代理(下一個(gè)躍點(diǎn))的 Flume 來源;給定代理中的來源和接收器與暫存在通道中的事件同步運(yùn)行。
來源可針對不同的用途而使用不同的格式。例如,Avro Flume 來源可用于從 Avro 客戶端接收 Avro 事件。Avro 來源形成了一半的 Flume 的分層集合支持。在內(nèi)部,這個(gè)來源使用了 Avro 的 NettyTransceiver 監(jiān)聽和處理事件。它可與內(nèi)置 AvroSink 配套使用,共同創(chuàng)建分層集合拓?fù)浣Y(jié)構(gòu)。Flume 使用的其他流行的網(wǎng)絡(luò)流包括 Thrift、Syslog 和 Netcat。
Avro
Apache 的 Avro 是一種數(shù)字序列化格式。它是一個(gè)基于 RPC 的框架,被 Apache 項(xiàng)目(比如 Flume 和 Hadoop)廣泛用于數(shù)據(jù)存儲和通信。Avro 框架的用途是提供豐富的數(shù)據(jù)結(jié)構(gòu)、一種緊湊而又快速的二進(jìn)制數(shù)據(jù)格式,以及與動態(tài)語言(比如 C++、Java™、Perl 和 Python)的簡單集成。Avro 使用 JSON 作為其接口描述語言 (Interface Description Language, IDL),以指定數(shù)據(jù)類型和協(xié)議。
Avro 依賴于一種與數(shù)據(jù)存儲在一起的模式。因?yàn)闆]有每個(gè)值的開銷,這實(shí)現(xiàn)了輕松而又快速的序列化。在遠(yuǎn)程過程調(diào)用 (RPC) 期間,該模式會在客戶端-服務(wù)器握手期間交換。使用 Avro,字段之間的通信很容易得到解決,因?yàn)樗褂昧?JSON。
可靠性、可恢復(fù)性和多躍點(diǎn)流
Flume 使用一種事務(wù)型設(shè)計(jì)來確保事件交付的可靠性。事務(wù)型設(shè)計(jì)相當(dāng)于將每個(gè)事件當(dāng)作一個(gè)事務(wù)來對待,事件暫存在每個(gè)代理上的一個(gè)通道中。每個(gè)事件傳送到流中的下一個(gè)代理(比如來源欄)或終端存儲庫(比如 HDFS)。事件被存儲在下一個(gè)代理的通道中或終端存儲庫中后,就會從上一個(gè)通道中刪除,以便在收到存儲確認(rèn)之前維護(hù)一個(gè)最新事件隊(duì)列。這個(gè)過程通過來源和接收器完成,它們將存儲或檢索信息封裝在通道提供的一個(gè)事務(wù)中。這可以確保為 Flume 中的單躍點(diǎn)消息傳送語義提供了端到端的流可靠性。
可恢復(fù)性通過通道中的暫存事件來維護(hù),用于管理故障恢復(fù)。 Flume 支持一種受本地文件系統(tǒng)支持的持久性的文件通道(基本上用于在永久存儲上維護(hù)狀態(tài))。如果使用一個(gè)持久性的文件通道,任何丟失的事件(在發(fā)生崩潰或系統(tǒng)故障時(shí))都可以恢復(fù)。還有一個(gè)內(nèi)存通道將事件存儲在內(nèi)存中的一個(gè)隊(duì)列中,這么做更快,但在事件進(jìn)程結(jié)束時(shí),仍留在內(nèi)存通道內(nèi)的所有事件都無法恢復(fù)。
Flume 還允許用戶構(gòu)建多躍點(diǎn)流,事件會經(jīng)歷多個(gè)代理,然后才會到達(dá)最終的目標(biāo)。對于多躍點(diǎn)流,來自上一個(gè)躍點(diǎn)的接收器和來自下一個(gè)躍點(diǎn)的來源都會運(yùn)行自己的事務(wù)進(jìn)程,以確保數(shù)據(jù)安全地存儲在下一個(gè)躍點(diǎn)的通道中。
圖 2. 多躍點(diǎn)流
#p#
系統(tǒng)架構(gòu)
本節(jié)將討論如何使用 Flume 設(shè)置一個(gè)可擴(kuò)展的 Web 服務(wù)。出于此目的,我們需要使用代碼來讀取 RSS 提要。我們還需要配置 Flume 代理和收集器來接收 RSS 數(shù)據(jù),并將它們存儲在 HDFS 中。
Flume 代理配置存儲在一個(gè)本地配置文件中。這類似于一個(gè) Java 屬性文件,并且被存儲為一個(gè)文本文件。可在同一個(gè)配置文件中指定一個(gè)或多個(gè)代理的配置。配置文件包含一個(gè)代理中每個(gè)來源、接收器和通道的屬性,以及它們?nèi)绾芜B接在一起來形成數(shù)據(jù)流。
Avro 來源需要一個(gè)主機(jī)名(IP 地址)和端口號來接收數(shù)據(jù)。內(nèi)存通道可能擁有最大隊(duì)列大小(容量)限制,HDFS 接收器需要知道文件系統(tǒng) URI 和路徑才能創(chuàng)建文件。Avro 接收器可以是一個(gè)轉(zhuǎn)發(fā)接收器 (avro-forward-sink),它可以轉(zhuǎn)發(fā)到下一個(gè) Flume 代理。
我們的想法是創(chuàng)建一個(gè)微型的 Flume 分布式提要(日志事件)收集系統(tǒng)。我們將使用代理作為節(jié)點(diǎn),它們從一個(gè) RSS 提要閱讀器獲取數(shù)據(jù)(在本例中為 RSS 體驗(yàn))。這些代理將這些提要傳遞到一個(gè)收集器節(jié)點(diǎn),后者負(fù)責(zé)將這些提要存儲到一個(gè) HDFS 集群中。在本例中,我們將使用兩個(gè) Flume 代理節(jié)點(diǎn),一個(gè) Flume 收集器節(jié)點(diǎn)和一個(gè)包含三個(gè)節(jié)點(diǎn)的 HDFS 集群。表 1 描述了代理和收集器節(jié)點(diǎn)的來源和接收器。
表 1. 代理和收集器節(jié)點(diǎn)的來源和接收器
圖 3 給出了我們的多躍點(diǎn)系統(tǒng)的架構(gòu)概述,該系統(tǒng)包含兩個(gè)代理節(jié)點(diǎn)、一個(gè)收集器節(jié)點(diǎn)和一個(gè) HDFS 集群。RSS Web 提要(參見下面的代碼)是兩個(gè)代理的 Avro 來源,它將提要存儲在一個(gè)內(nèi)存通道中。當(dāng)提要在兩個(gè)代理的內(nèi)存通道中積累時(shí),Avro 接收器開始將這些事件發(fā)送到收集器節(jié)點(diǎn)的 Avro 來源。收集器還使用一個(gè)內(nèi)存通道和一個(gè) HDFS 接收器將這些提要轉(zhuǎn)儲到 HDFS 集群中。參見下圖,了解代理和收集器配置。
圖 3. 多躍點(diǎn)系統(tǒng)的架構(gòu)概述
讓我們來看一下如何使用 Flume 啟動一個(gè)簡單的新聞閱讀器服務(wù)。以下 Java 代碼描述了一個(gè)從 BBC 讀取 RSS Web 來源的 RSS 閱讀器。您可能已經(jīng)知道,RSS 是一個(gè) Web 提要格式系列,用于以一種標(biāo)準(zhǔn)化格式發(fā)布頻繁更新的網(wǎng)站,比如博客文章、新聞提要、音頻和視頻 。RSS 使用一種發(fā)布-訂閱模型來定期檢查訂閱的提要中的更新。
#p#
下面的 Java 代碼使用 Java 的 Net 和 Javax XML API 讀取 W3C 文檔中一個(gè) URL 來源的內(nèi)容,處理該信息,然后將該信息寫入到 Flume 通道中。
清單 1. Java 代碼 (RSSReader.java)
- import java.net.URL;
- import javax.xml.parsers.DocumentBuilder;
- import javax.xml.parsers.DocumentBuilderFactory;
- import org.w3c.dom.CharacterData;
- import org.w3c.dom.Document;
- import org.w3c.dom.Element;
- import org.w3c.dom.Node;
- import org.w3c.dom.NodeList;
- public class RSSReader {
- private static RSSReader instance = null;
- private RSSReader() {
- }
- public static RSSReader getInstance() {
- if(instance == null) {
- instance = new RSSReader();
- }
- return instance;
- }
- public void writeNews() {
- try {
- DocumentBuilder builder = DocumentBuilderFactory.newInstance().
- newDocumentBuilder();
- URL u = new URL("http://feeds.bbci.co.uk/news/world/rss.xml
- ?edition=uk#");
- Document doc = builder.parse(u.openStream());
- NodeList nodes = doc.getElementsByTagName("item");
- for(int i=0;i
- Element element = (Element)nodes.item(i);
- System.out.println("Title: " + getElementValue(element,"title"));
- System.out.println("Link: " + getElementValue(element,"link"));
- System.out.println("Publish Date: " + getElementValue(element,"pubDate"));
- System.out.println("author: " + getElementValue(element,"dc:creator"));
- System.out.println("comments: " + getElementValue(element,"wfw:comment"));
- System.out.println("description: " + getElementValue(element,"description"));
- System.out.println();
- }
- } catch(Exception ex) {
- ex.printStackTrace();
- }
- }
- private String getCharacterDataFromElement(Element e) {
- try {
- Node child = e.getFirstChild();
- if(child instanceof CharacterData) {
- CharacterData cd = (CharacterData) child;
- return cd.getData();
- }
- } catch(Exception ex) {
- }
- return "";
- }
- protected float getFloat(String value) {
- if(value != null && !value.equals("")) {
- return Float.parseFloat(value);
- }
- return 0;
- }
- protected String getElementValue(Element parent,String label) {
- return getCharacterDataFromElement((Element)parent.getElements
- ByTagName(label).item(0));
- }
- public static void main(String[] args) {
- RSSReader reader = RSSReader.getInstance();
- reader.writeNews();
- }
- }
下面的代碼清單給出了兩個(gè)代理(10.0.0.1 和 10.0.0.2)和一個(gè)收集器 (10.0.0.3) 的樣例配置文件。這些配置文件定義了來源、通道和接收器的語義。對于每種來源類型,我們還需要定義類型、命令、標(biāo)準(zhǔn)錯(cuò)誤行為和故障選項(xiàng)。對于每個(gè)通道,我們需要定義通道類型。還必須定義容量(通道中存儲的最大事件數(shù))和事務(wù)容量(對于每個(gè)事務(wù),通道將從一個(gè)來源獲取或提供給一個(gè)接收器的最大事件數(shù))。類似地,對于每種接收器類型,我們需要定義類型、主機(jī)名(事件接收者的 IP 地址)和端口。對于 HDFS 接收器,我們提供了到達(dá) HDFS 標(biāo)頭名稱節(jié)點(diǎn)的目錄路徑。
清單 2 顯示了示例配置文件 10.0.0.1.
清單 2. 代理 1 配置(10.0.0.1 上的 flume-conf.properties)
- # The configuration file needs to define the sources,
- # the channels and the sinks.
- # Sources, channels and sinks are defined per agent,
- # in this case called 'agent'
- agent.sources = reader
- agent.channels = memoryChannel
- agent.sinks = avro-forward-sink
- # For each one of the sources, the type is defined
- agent.sources.reader.type = exec
- agent.sources.reader.command = tail -f /var/log/flume-ng/source.txt
- # stderr is simply discarded, unless logStdErr=true
- # If the process exits for any reason, the source also exits and will produce no
- # further data.
- agent.sources.reader.logStdErr = true
- agent.sources.reader.restart = true
- # The channel can be defined as follows.
- agent.sources.reader.channels = memoryChannel
- # Each sink's type must be defined
- agent.sinks.avro-forward-sink.type = avro
- agent.sinks.avro-forward-sink.hostname = 10.0.0.3
- agent.sinks.avro-forward-sink.port = 60000
- #Specify the channel the sink should use
- agent.sinks.avro-forward-sink.channel = memoryChannel
- # Each channel's type is defined.
- agent.channels.memoryChannel.type = memory
- # Other config values specific to each type of channel(sink or source)
- # can be defined as well
- # In this case, it specifies the capacity of the memory channel
- agent.channels.memoryChannel.capacity = 10000
- agent.channels.memoryChannel.transactionCapacity = 100
清單 3 顯示了示例配置文件 10.0.0.2。
清單 3. 代理 2 配置(10.0.0.2 上的 flume-conf.properties)
- agent.sources = reader
- agent.channels = memoryChannel
- agent.sinks = avro-forward-sink
- # For each one of the sources, the type is defined
- agent.sources.reader.type = exec
- agent.sources.reader.command = tail -f /var/log/flume-ng/source.txt
- # stderr is simply discarded, unless logStdErr=true
- # If the process exits for any reason, the source also exits and will produce
- # no further data.
- agent.sources.reader.logStdErr = true
- agent.sources.reader.restart = true
- # The channel can be defined as follows.
- agent.sources.reader.channels = memoryChannel
- # Each sink's type must be defined
- agent.sinks.avro-forward-sink.type = avro
- agent.sinks.avro-forward-sink.hostname = 10.0.0.3
- agent.sinks.avro-forward-sink.port = 60000
- #Specify the channel the sink should use
- agent.sinks.avro-forward-sink.channel = memoryChannel
- # Each channel's type is defined.
- agent.channels.memoryChannel.type = memory
- # Other config values specific to each type of channel(sink or source)
- # can be defined as well
- # In this case, it specifies the capacity of the memory channel
- agent.channels.memoryChannel.capacity = 10000
- agent.channels.memoryChannel.transactionCapacity = 100
清單 4 顯示了收集器配置文件 10.0.0.3。
清單 4. 收集器配置(10.0.0.3 上的 flume-conf.properties)
- Collector configuration (flume-conf.properties on 10.0.0.3):
- # The configuration file needs to define the sources,
- # the channels and the sinks.
- # Sources, channels and sinks are defined per agent,
- # in this case called 'agent'
- agent.sources = avro-collection-source
- agent.channels = memoryChannel
- agent.sinks = hdfs-sink
- # For each one of the sources, the type is defined
- agent.sources.avro-collection-source.type = avro
- agent.sources.avro-collection-source.bind = 10.0.0.3
- agent.sources.avro-collection-source.port = 60000
- # The channel can be defined as follows.
- agent.sources.avro-collection-source.channels = memoryChannel
- # Each sink's type must be defined
- agent.sinks.hdfs-sink.type = hdfs
- agent.sinks.hdfs-sink.hdfs.path = hdfs://10.0.10.1:8020/flume
- #Specify the channel the sink should use
- agent.sinks.hdfs-sink.channel = memoryChannel
- # Each channel's type is defined.
- agent.channels.memoryChannel.type = memory
- # Other config values specific to each type of channel(sink or source)
- # can be defined as well
- # In this case, it specifies the capacity of the memory channel
- agent.channels.memoryChannel.capacity = 10000
#p#
后續(xù)步驟
現(xiàn)在我們已擁有讀取 RSS 提要的代碼,并知道如何配置 Flume 代理和收集器,我們可通過三個(gè)步驟設(shè)置整個(gè)系統(tǒng)。
步驟 1
編譯的 Java 代碼應(yīng)作為一個(gè)后臺進(jìn)程執(zhí)行,以保持運(yùn)行。
清單 5. 編譯的 Java 代碼
- $ javac RSSReader.java
- $ java -cp /root/RSSReader RSSReader > /var/log/flume-ng/source.txt &
步驟 2
想啟動代理之前,您需要使用 $FLUME_HOME/conf/ 目錄下提供的模板來修改配置文件。在修改配置文件后,可使用以下命令啟動代理。
清單 6 顯示了啟動節(jié)點(diǎn) 1 上的代理的命令。
清單 6. 啟動節(jié)點(diǎn) 1 上的代理
- Agent node 1 (on 10.0.0.1):
- $ $FLUME_HOME/bin/flume-ng agent -n agent1 -c conf -f
- $FLUME_HOME/conf/flume-conf.properties
清單 7 顯示了啟動節(jié)點(diǎn) 2 上的代理的命令。
清單 7. 啟動節(jié)點(diǎn) 2 上的代理
- Agent node 2 (on 10.0.0.2):
- $ $FLUME_HOME/bin/flume-ng agent -n agent2 -c conf -f
- $FLUME_HOME/conf/flume-conf.properties
在這里,$FLUME_HOME 被定義為一個(gè)環(huán)境變量(bash 或 .bashrc),它指向 Flume 的主目錄(例如 /home/user/flume-1.4/)。
步驟 3
清單 8 啟動收集器。值得注意的是,配置文件負(fù)責(zé)節(jié)點(diǎn)的行為方式,比如它是代理還是收集器。
清單 8. 收集器節(jié)點(diǎn)(10.0.0.3 上)
- $ $FLUME_HOME/bin/flume-ng agent -n collector -c conf -f
- $FLUME_HOME/conf/flume-conf.properties
結(jié)束語
在本文中,我們介紹了 Flume,一個(gè)用于高效收集大量日志數(shù)據(jù)的、分布式的、可靠的服務(wù)。我們介紹了如何根據(jù)需要使用 Flume 來部署單躍點(diǎn)和多躍點(diǎn)流。我們還介紹了一個(gè)部署多躍點(diǎn)新聞聚合器 Web 服務(wù)的詳細(xì)示例。在該示例中,我們使用了 Avro 代理讀取 RSS 提要,并使用一個(gè) HDFS 收集器存儲新聞提要。Flume 可用于構(gòu)建可擴(kuò)展的分布式系統(tǒng)來收集大量數(shù)據(jù)流。