自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flume-接入Hive數(shù)倉搭建流程

大數(shù)據(jù)
實(shí)時(shí)流接入數(shù)倉,基本在大公司都會(huì)有,在Flume1.8以后支持taildir source, 其有許多特點(diǎn),而被廣泛使用。本文以開源Flume流為例,介紹流接入HDFS ,后面在其上面建立ods層外表。

實(shí)時(shí)流接入數(shù)倉,基本在大公司都會(huì)有,在Flume1.8以后支持taildir source, 其有以下幾個(gè)特點(diǎn),而被廣泛使用:

  1. 使用正則表達(dá)式匹配目錄中的文件名
  2. 監(jiān)控的文件中,一旦有數(shù)據(jù)寫入,F(xiàn)lume就會(huì)將信息寫入到指定的Sink
  3. 高可靠,不會(huì)丟失數(shù)據(jù)
  4. 不會(huì)對(duì)跟蹤文件有任何處理,不會(huì)重命名也不會(huì)刪除
  5. 不支持Windows,不能讀二進(jìn)制文件。支持按行讀取文本文件

本文以開源Flume流為例,介紹流接入HDFS ,后面在其上面建立ods層外表。

1.1 taildir source配置

  1. a1.sources.r1.type = TAILDIR 
  2. a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json 
  3. a1.sources.r1.filegroups = f1 
  4. a1.sources.r1.filegroups.f1 =/opt/hoult/servers/logs/start/.*log 

1.2 hdfs sink 配置

  1. a1.sinks.k1.type = hdfs 
  2. a1.sinks.k1.hdfs.path = /user/data/logs/start/logs/start/%Y-%m-%d/ 
  3. a1.sinks.k1.hdfs.filePrefix = startlog. 
  4. # 配置文件滾動(dòng)方式(文件大小32M) 
  5. a1.sinks.k1.hdfs.rollSize = 33554432 
  6. a1.sinks.k1.hdfs.rollCount = 0 
  7. a1.sinks.k1.hdfs.rollInterval = 0 
  8. a1.sinks.k1.hdfs.idleTimeout = 0 
  9. a1.sinks.k1.hdfs.minBlockReplicas = 1 
  10. # 向hdfs上刷新的event的個(gè)數(shù) 
  11. a1.sinks.k1.hdfs.batchSize = 100 
  12. # 使用本地時(shí)間 
  13. a1.sinks.k1.hdfs.useLocalTimeStamp = true  

1.3 Agent的配置

 

  1. a1.sources = r1 
  2. a1.sinks = k1 
  3. a1.channels = c1 
  4. # taildir source 
  5. a1.sources.r1.type = TAILDIR 
  6. a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json 
  7. a1.sources.r1.filegroups = f1 
  8. a1.sources.r1.filegroups.f1 = /user/data/logs/start/.*log 
  9. # memorychannel 
  10. a1.channels.c1.type = memory 
  11. a1.channels.c1.capacity = 100000 
  12. a1.channels.c1.transactionCapacity = 2000 
  13. # hdfs sink 
  14. a1.sinks.k1.type = hdfs 
  15. a1.sinks.k1.hdfs.path = /opt/hoult/servers/logs/start/%Y-%m-%d/ 
  16. a1.sinks.k1.hdfs.filePrefix = startlog. 
  17. # 配置文件滾動(dòng)方式(文件大小32M) 
  18. a1.sinks.k1.hdfs.rollSize = 33554432 
  19. a1.sinks.k1.hdfs.rollCount = 0 
  20. a1.sinks.k1.hdfs.rollInterval = 0 
  21. a1.sinks.k1.hdfs.idleTimeout = 0 
  22. a1.sinks.k1.hdfs.minBlockReplicas = 1 
  23. # 向hdfs上刷新的event的個(gè)數(shù) 
  24. a1.sinks.k1.hdfs.batchSize = 1000 
  25. # 使用本地時(shí)間 
  26. a1.sinks.k1.hdfs.useLocalTimeStamp = true 
  27. # Bind the source and sink to the channel 
  28. a1.sources.r1.channels = c1 
  29. a1.sinks.k1.channel = c1  

/opt/hoult/servers/conf/flume-log2hdfs.conf

1.4 啟動(dòng)

 

  1. flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.roog.logger=INFO,console 
  2.  
  3. export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote" 
  4. # 要想使配置文件生效,還要在命令行中指定配置文件目錄 
  5. flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.roog.logger=INFO,console 

要$FLUME_HOME/conf/flume-env.sh加下面的參數(shù),否則會(huì)報(bào)錯(cuò)誤如下:

1.5 使用自定義攔截器解決Flume Agent替換本地時(shí)間為日志里面的時(shí)間戳

使用netcat source → logger sink來測(cè)試

 

  1. # a1是agent的名稱。source、channel、sink的名稱分別為:r1 c1 k1 
  2. a1.sources = r1 
  3. a1.channels = c1 
  4. a1.sinks = k1 
  5. # source 
  6. a1.sources.r1.type = netcat 
  7. a1.sources.r1.bind = linux121 
  8. a1.sources.r1.port = 9999 
  9. a1.sources.r1.interceptors = i1 
  10. a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder 
  11. # channel 
  12. a1.channels.c1.type = memory 
  13. a1.channels.c1.capacity = 10000 
  14. a1.channels.c1.transactionCapacity = 100 
  15. # sink 
  16. a1.sinks.k1.type = logger 
  17. # source、channel、sink之間的關(guān)系 
  18. a1.sources.r1.channels = c1 
  19. a1.sinks.k1.channel = c1  

攔截器主要代碼如下:

 

  1. public class CustomerInterceptor implements Interceptor { 
  2.     private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd"); 
  3.  
  4.     @Override 
  5.     public void initialize() { 
  6.  
  7.     } 
  8.  
  9.     @Override 
  10.     public Event intercept(Event event) { 
  11.         // 獲得body的內(nèi)容 
  12.         String eventBody = new String(event.getBody(), Charsets.UTF_8); 
  13.         // 獲取header的內(nèi)容 
  14.         Map<String, String> headerMap = event.getHeaders(); 
  15.         final String[] bodyArr = eventBody.split("\\s+"); 
  16.         try { 
  17.             String jsonStr = bodyArr[6]; 
  18.             if (Strings.isNullOrEmpty(jsonStr)) { 
  19.                 return null
  20.             } 
  21.             // 將 string 轉(zhuǎn)成 json 對(duì)象 
  22.             JSONObject jsonObject = JSON.parseObject(jsonStr); 
  23.             String timestampStr = jsonObject.getString("time"); 
  24.             //將timestamp 轉(zhuǎn)為時(shí)間日期類型(格式 :yyyyMMdd) 
  25.             long timeStamp = Long.valueOf(timestampStr); 
  26.             String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault())); 
  27.             headerMap.put("logtime"date); 
  28.             event.setHeaders(headerMap); 
  29.         } catch (Exception e) { 
  30.             headerMap.put("logtime""unknown"); 
  31.             event.setHeaders(headerMap); 
  32.         } 
  33.         return event; 
  34.  
  35.     } 
  36.  
  37.     @Override 
  38.     public List<Event> intercept(List<Event> events) { 
  39.         List<Event> out = new ArrayList<>(); 
  40.         for (Event event : events) { 
  41.             Event outEvent = intercept(event); 
  42.             if (outEvent != null) { 
  43.                 out.add(outEvent); 
  44.             } 
  45.         } 
  46.         return out
  47.     } 
  48.  
  49.     @Override 
  50.     public void close() { 
  51.  
  52.     } 
  53.  
  54.     public static class Builder implements Interceptor.Builder { 
  55.         @Override 
  56.         public Interceptor build() { 
  57.             return new CustomerInterceptor(); 
  58.         } 
  59.  
  60.         @Override 
  61.         public void configure(Context context) { 
  62.         } 
  63.     } 

啟動(dòng)

 

  1. flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf -name a1 -Dflume.roog.logger=INFO,console 
  2. ## 測(cè)試 
  3. telnet linux121 9999  

吳邪,小三爺,混跡于后臺(tái),大數(shù)據(jù),人工智能領(lǐng)域的小菜鳥。

責(zé)任編輯:未麗燕 來源: segmentfault.com
相關(guān)推薦

2023-08-07 01:25:39

2022-01-02 23:02:16

數(shù)據(jù)中臺(tái)選型

2021-01-31 23:54:23

數(shù)倉模型

2022-08-22 17:46:56

虛擬數(shù)倉Impala

2021-01-04 05:42:48

數(shù)倉模型設(shè)計(jì)

2022-07-26 15:38:58

數(shù)據(jù)倉數(shù)據(jù)治理數(shù)據(jù)團(tuán)隊(duì)

2023-01-03 17:43:39

網(wǎng)易郵箱數(shù)倉

2022-11-25 10:07:12

數(shù)倉數(shù)據(jù)流開發(fā)

2021-08-02 17:24:37

數(shù)字化

2021-12-02 08:41:30

數(shù)倉建模設(shè)計(jì)

2022-03-01 17:16:16

數(shù)倉建模ID Mapping

2023-11-23 16:53:56

數(shù)據(jù)倉庫大數(shù)據(jù)

2022-02-18 09:02:04

數(shù)據(jù)倉庫治理

2022-01-13 10:45:48

數(shù)倉對(duì)象主題域

2022-08-01 15:58:48

數(shù)據(jù)倉庫架構(gòu)數(shù)據(jù)

2021-11-30 20:12:00

FTP系統(tǒng)

2021-08-11 07:53:22

數(shù)倉維度建模

2022-12-08 10:16:58

數(shù)據(jù)模型

2021-01-05 05:30:30

數(shù)倉維度SCD

2023-02-20 07:33:47

Teradata數(shù)據(jù)倉庫
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)