大數(shù)據(jù)Hadoop之—Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作
一、概述
Hudi(Hadoop Upserts Deletes and Incrementals),簡(jiǎn)稱Hudi,是一個(gè)流式數(shù)據(jù)湖平臺(tái),支持對(duì)海量數(shù)據(jù)快速更新,內(nèi)置表格式,支持事務(wù)的存儲(chǔ)層、 一系列表服務(wù)、數(shù)據(jù)服務(wù)(開(kāi)箱即用的攝取工具)以及完善的運(yùn)維監(jiān)控工具,它可以以極低的延遲將數(shù)據(jù)快速存儲(chǔ)到HDFS或云存儲(chǔ)(S3)的工具,最主要的特點(diǎn)支持記錄級(jí)別的插入更新(Upsert)和刪除,同時(shí)還支持增量查詢。
GitHub地址:https://github.com/apache/hudi
官方文檔:https://hudi.apache.org/cn/docs/overview
關(guān)于Apache Hudi 數(shù)據(jù)湖 也可以參考我這篇文章:大數(shù)據(jù)Hadoop之——新一代流式數(shù)據(jù)湖平臺(tái) Apache Hudi
二、Hudi CLI
構(gòu)建hudi后,可以通過(guò)cd hudi cli&&./hudi-cli.sh啟動(dòng)shell。一個(gè)hudi表駐留在DFS上的一個(gè)稱為basePath的位置,我們需要這個(gè)位置才能連接到hudi表。Hudi庫(kù)有效地在內(nèi)部管理此表,使用.hoodie子文件夾跟蹤所有元數(shù)據(jù)。
編譯生成的包如下:
三、Spark 與 Hudi 整合使用
Hudi 流式數(shù)據(jù)湖平臺(tái),協(xié)助管理數(shù)據(jù),借助HDFS文件系統(tǒng)存儲(chǔ)數(shù)據(jù),使用Spark操作數(shù)據(jù)。
1)Spark 測(cè)試
2)Spark 與 Hudi 整合使用
官方示例:https://hudi.apache.org/docs/quick-start-guide/在spark-shell命令行,對(duì)Hudi表數(shù)據(jù)進(jìn)行操作,需要運(yùn)行spark-shell命令是,添加相關(guān)的依賴包,命令如下:
- 啟動(dòng)spark-shell
【第一種方式】在線聯(lián)網(wǎng)下載相關(guān)jar包
【第二種方式】離線使用已經(jīng)下載好的jar包。
- 導(dǎo)入park及Hudi相關(guān)包
- 定義變量
- 模擬生成Trip乘車數(shù)據(jù)
其中,DataGenerator可以用于生成測(cè)試數(shù)據(jù),用來(lái)完成后續(xù)操作。
- 將模擬數(shù)據(jù)List轉(zhuǎn)換為DataFrame數(shù)據(jù)集
- 將數(shù)據(jù)寫(xiě)入到hudi
本地存儲(chǔ)
HDFS 存儲(chǔ)
四、Flink 與 Hudi 整合使用
官方示例:https://hudi.apache.org/docs/flink-quick-start-guide
1)啟動(dòng)flink集群
下載地址:http://flink.apache.org/downloads.html
2) 啟動(dòng)flink SQL 客戶端
3)添加數(shù)據(jù)
HDFS上查看
4)查詢數(shù)據(jù)(批式查詢)
5)更新數(shù)據(jù)
6)Streaming Query(流式查詢)
首先創(chuàng)建表t2,設(shè)置相關(guān)屬性,以流的方式查詢讀取,映射到上面表:t1。
- read.streaming.enabled設(shè)置為true,表明通過(guò)streaming的方式讀取表數(shù)據(jù);
- read.streaming.check-interval指定了source監(jiān)控新的commits的間隔時(shí)間4s;
- table.type設(shè)置表類型為 MERGE_ON_READ;
注意:查看可能會(huì)遇到如下錯(cuò)誤:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException:
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
【解決】添加hadoop-mapreduce-client-core-xxx.jar和hive-exec-xxx.jar到Flink lib中。