Flume to Hive Data Warehouse: Ingestion Setup
Real-time stream ingestion into a data warehouse is standard practice at most large companies. Since Flume 1.7, the taildir source has been available, and it is widely used because of the following characteristics:
- Matches file names in a directory with regular expressions
- As soon as data is written to a monitored file, Flume forwards it to the configured sink
- Highly reliable: it does not lose data
- Leaves the tracked files untouched: it neither renames nor deletes them
- Does not support Windows and cannot read binary files; it reads text files line by line
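The reliability above comes from the positionFile: the taildir source persists the byte offset of every tracked file as JSON, so after a restart it resumes exactly where it left off instead of re-reading or skipping data. A typical entry looks like the following (the inode and offset values here are hypothetical):

```json
[{"inode": 2496989, "pos": 1024, "file": "/opt/hoult/servers/logs/start/start.log"}]
```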
This article uses open-source Flume as an example to walk through streaming ingestion into HDFS; an ODS-layer external table is then built on top of the landed data.
1.1 taildir source configuration

```
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
```
1.2 hdfs sink configuration

```
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# File roll policy (roll by size only, 32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 100
# Use the agent's local time for the timestamp header
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```
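Because useLocalTimeStamp = true, the sink expands the %Y-%m-%d escape in hdfs.path from the timestamp header of each event (here, the agent's local clock). A minimal sketch of that expansion, using the directory layout above and a hypothetical epoch-millisecond timestamp, pinned to UTC so the result is deterministic:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionPathDemo {
    // Expand the %Y-%m-%d part of hdfs.path from an epoch-millisecond
    // timestamp. UTC is pinned here for a deterministic result; the sink
    // itself uses the zone of the agent host.
    static String partitionDir(long epochMillis) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")
                .withZone(ZoneOffset.UTC);
        return "/user/data/logs/start/" + fmt.format(Instant.ofEpochMilli(epochMillis)) + "/";
    }

    public static void main(String[] args) {
        // Hypothetical event timestamp: 2020-08-01T00:00:00Z
        System.out.println(partitionDir(1596240000000L)); // /user/data/logs/start/2020-08-01/
    }
}
```

This is why one HDFS directory per day appears under /user/data/logs/start/, which is exactly the layout the daily ODS partition expects.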
1.3 Agent configuration

```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# File roll policy (roll by size only, 32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
# Use the agent's local time for the timestamp header
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

Save the configuration above as /opt/hoult/servers/conf/flume-log2hdfs.conf.
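One constraint worth checking in this file: the sink's hdfs.batchSize (1000) must not exceed the channel's transactionCapacity (2000), because each sink batch is committed in a single channel transaction. A small, hypothetical sanity check over the two property names used above:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ChannelSizeCheck {
    // Returns true when the sink's batch fits inside one channel transaction.
    static boolean batchFitsTransaction(Properties p) {
        long txn = Long.parseLong(p.getProperty("a1.channels.c1.transactionCapacity").trim());
        long batch = Long.parseLong(p.getProperty("a1.sinks.k1.hdfs.batchSize").trim());
        return batch <= txn;
    }

    public static void main(String[] args) throws IOException {
        Properties p = new Properties();
        // The two values from the agent configuration above.
        p.load(new StringReader(
                "a1.channels.c1.transactionCapacity = 2000\n"
                + "a1.sinks.k1.hdfs.batchSize = 1000\n"));
        System.out.println(batchFitsTransaction(p)); // true
    }
}
```

If batchSize were raised above transactionCapacity, the sink's transactions would fail at runtime even though the file parses cleanly.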
1.4 Startup

```
flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console
```

The command above alone is not enough: the following parameter must be added to $FLUME_HOME/conf/flume-env.sh, otherwise the agent reports an error at startup:

```
export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"
```

For flume-env.sh to take effect, the configuration directory must also be specified on the command line with --conf:

```
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console
```
1.5 Using a custom interceptor to replace the agent's local time with the timestamp inside the log

Test it with a netcat source → logger sink:
```
# a1 is the agent name; the source, channel and sink are r1, c1 and k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux121
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
The main interceptor code is as follows:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class CustomerInterceptor implements Interceptor {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Read the event body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // Read the event headers
        Map<String, String> headerMap = event.getHeaders();
        final String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            if (Strings.isNullOrEmpty(jsonStr)) {
                return null;
            }
            // Parse the JSON portion of the log line
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getString("time");
            // Convert the epoch-millisecond timestamp to a yyyyMMdd date string
            long timeStamp = Long.parseLong(timestampStr);
            String date = formatter.format(
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            headerMap.put("logtime", "unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
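The core of the interceptor is turning the log's epoch-millisecond time field into a yyyyMMdd header value that the HDFS sink can partition by. That conversion can be exercised on its own; the timestamp below is hypothetical, and UTC is pinned here for a deterministic result (the interceptor itself uses ZoneId.systemDefault()):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class LogtimeDemo {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Same conversion the interceptor performs when it sets the
    // "logtime" header from the log's "time" field.
    static String toLogtime(long epochMillis) {
        return FORMATTER.format(
                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        // Hypothetical "time" value: 2020-08-01T00:00:00Z
        System.out.println(toLogtime(1596240000000L)); // 20200801
    }
}
```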
Startup:

```
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf --name a1 -Dflume.root.logger=INFO,console
# test
telnet linux121 9999
```
Wu Xie, a.k.a. Xiao San Ye, a rookie wandering the fields of backend development, big data, and AI.