
Big Data Hadoop: Hands-On with the Apache Hudi Data Lake (Integrating Spark and Flink with Hudi)


I. Overview

Hudi (Hadoop Upserts Deletes and Incrementals), Hudi for short, is a streaming data lake platform that supports fast updates over massive datasets. It ships with a built-in table format, a transactional storage layer, a set of table services, data services (out-of-the-box ingestion tools), and complete operations and monitoring tooling. It can land data into HDFS or cloud storage (S3) with very low latency. Its defining features are record-level upserts and deletes, together with support for incremental queries.

GitHub: https://github.com/apache/hudi

Official documentation: https://hudi.apache.org/cn/docs/overview

For more background on the Apache Hudi data lake, you can also refer to my article: Big Data Hadoop — Apache Hudi, a New-Generation Streaming Data Lake Platform.


II. Hudi CLI

Once Hudi has been built, the shell can be started with cd hudi-cli && ./hudi-cli.sh. A Hudi table resides at a location on DFS called the basePath, and we need this path in order to connect to the table. Hudi manages the table internally, tracking all of its metadata in the .hoodie subfolder.

[Screenshot: the packages produced by the build]

# Start the Hudi CLI
./hudi-cli/hudi-cli.sh


III. Using Spark with Hudi

Hudi, as a streaming data lake platform, helps manage the data: it relies on HDFS for storage, and here we use Spark to operate on the data.


For Hadoop installation, see my article: Big Data Hadoop — Principles, Installation, and Hands-On (HDFS + YARN + MapReduce). For Hadoop HA installation, see: Big Data Hadoop — Hadoop 3.3.4 HA (High Availability) Principles and Implementation (QJM). For the Spark environment, see: Big Data Hadoop — The Spark Compute Engine.

1) Spark test

cd $SPARK_HOME
hdfs dfs -mkdir /tmp/
hdfs dfs -put README.md /tmp/
hdfs dfs -text /tmp/README.md

# Start spark-shell
./bin/spark-shell --master local[2]

val datasRDD = sc.textFile("/tmp/README.md")
// count the lines
datasRDD.count()
// read the first line
datasRDD.first()
val dataframe = spark.read.textFile("/tmp/README.md")
dataframe.printSchema
dataframe.show(10,false)


2) Using Spark with Hudi

Official example: https://hudi.apache.org/docs/quick-start-guide/

To operate on Hudi table data from the spark-shell command line, the relevant dependency bundles must be added when launching spark-shell, as follows:

1. Start spark-shell

[Option 1] Download the required jars online

### Start spark-shell and use it to work with the Hudi data lake
### Option 1
./bin/spark-shell \
--master local[2] \
--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

### The command above needs internet access: it downloads the required jars (three in total) locally via Ivy and adds them to the CLASSPATH.

[Option 2] Use jars that have already been downloaded (offline)

### Option 2: pass local jars with --jars
cd /opt/apache
wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.3.0/spark-avro_2.12-3.3.0.jar

cd $SPARK_HOME
./bin/spark-shell \
--master local[2] \
--jars /opt/apache/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar,/opt/apache/hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/unused-1.0.0.jar,/opt/apache/spark-avro_2.12-3.3.0.jar \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"

2. Import Spark and Hudi packages

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

3. Define variables

val tableName = "hudi_trips_cow"
// store on HDFS
val basePath = "hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow"
// or store locally
// val basePath = "file:///tmp/hudi_trips_cow"

4. Generate sample trip data

// Build a DataGenerator object and generate 10 sample trip records
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))

Here, DataGenerator produces test data that we will use in the subsequent steps.
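
Each element of inserts is a JSON string describing one trip (fields such as uuid, rider, driver, fare, ts and partitionpath). As a quick, optional check you can peek at one record before converting the list:

// print the first generated record (a plain JSON string)
println(inserts.get(0))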

5. Convert the generated list into a DataFrame

// convert to a DataFrame
val df = spark.read.json(spark.sparkContext.parallelize(inserts,2))

// inspect the schema
df.printSchema()
// inspect the data
df.show()
// select specific columns
df.select("rider","begin_lat","begin_lon","driver","end_lat","end_lon","fare","partitionpath","ts","uuid").show(10,truncate=false)

6. Write the data to Hudi

Save the data into a Hudi table. Since Hudi was originally built on top of Spark, Spark SQL supports Hudi as a data source: specify the source with format("hudi") and set the relevant options when saving. Note that Hudi does not really store the data itself; it manages the data.

df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)

Key write options:

  • getQuickstartWriteConfigs: sets the shuffle parallelism used when writing/updating data into Hudi.
  • PRECOMBINE_FIELD_OPT_KEY: the field used to merge records that share the same key (the latest value wins), here the event time ts.
  • RECORDKEY_FIELD_OPT_KEY: the unique id of each record; multiple fields are supported.
  • PARTITIONPATH_FIELD_OPT_KEY: the field used as the partition path when storing the data.
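
These options are what drive Hudi's record-level upserts. As a rough illustration, new values for existing record keys can be written back with save mode Append, and Hudi updates the matching records instead of appending duplicates. This is a minimal sketch following the Hudi quick-start pattern, reusing the dataGen, tableName and basePath defined above:

// generateUpdates produces new values for some of the records created by generateInserts above
val updates = convertToStringList(dataGen.generateUpdates(10))
val updateDF = spark.read.json(spark.sparkContext.parallelize(updates, 2))

updateDF.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

The only difference from the initial write is the save mode: Append lets Hudi upsert on the record key "uuid" (using "ts" to pick the winning value), whereas Overwrite recreates the table.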

[Screenshot: the Hudi table files in local storage]

[Screenshot: the Hudi table files in HDFS]
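
To read the table back, and to try the incremental queries mentioned in the overview, something along the following lines can be run in the same spark-shell session. This is a sketch based on the Hudi quick-start; the temporary view name hudi_trips_snapshot and the selected columns are just illustrative, and the query options are written out as plain string keys:

// Snapshot query: load the current view of the table and register it as a temp view
val tripsSnapshotDF = spark.read.format("hudi").load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select uuid, rider, fare, partitionpath, ts from hudi_trips_snapshot where fare > 20.0").show(false)

// Incremental query: return only records committed after beginTime.
// Commit times come from the _hoodie_commit_time metadata column Hudi adds to every record.
// This assumes at least two commits exist (for example the initial write plus an upsert).
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).collect()
val beginTime = commits(commits.length - 2)

val tripsIncrementalDF = spark.read.format("hudi").
option("hoodie.datasource.query.type", "incremental").
option("hoodie.datasource.read.begin.instanttime", beginTime).
load(basePath)
tripsIncrementalDF.select("_hoodie_commit_time", "uuid", "rider", "fare").show(false)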

IV. Using Flink with Hudi

Official example: https://hudi.apache.org/docs/flink-quick-start-guide

1) Start a Flink cluster

Download: http://flink.apache.org/downloads.html

### 1. Download the package
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
tar -xf flink-1.14.6-bin-scala_2.12.tgz
export FLINK_HOME=/opt/apache/flink-1.14.6

### 2. Set HADOOP_CLASSPATH
# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export HADOOP_CONF_DIR='/opt/apache/hadoop/etc/hadoop'

### 3. Start a single-node Flink standalone cluster
# Before starting, raise the slot count in conf/flink-conf.yaml (the default is 1; set it to 4):
# taskmanager.numberOfTaskSlots: 4
cd $FLINK_HOME
./bin/start-cluster.sh

# Verify the cluster is working
./bin/flink run examples/batch/WordCount.jar


2) Start the Flink SQL client

# [Option 1] point the SQL client at the Hudi Flink bundle jar
./bin/sql-client.sh embedded -j ../hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle-0.12.0.jar shell

# [Option 2] alternatively, place the jar under $FLINK_HOME/lib and start the client without -j
./bin/sql-client.sh embedded shell

3) Insert data

-- sets up the result mode to tableau to show the results directly in the CLI
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
);

-- insert a single row
INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');

-- insert multiple rows using VALUES
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');


[Screenshot: the table directory on HDFS]

4) Query data (batch query)

select * from t1;


5) Update data

-- this would update the record with key 'id1'
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

6) Streaming query

First, create a table t2 that maps onto table t1 above (it points at the same path), with options set so that the table is read as a stream:

  • read.streaming.enabled set to true, meaning the table is read in streaming mode;
  • read.streaming.check-interval sets how often the source checks for new commits, here 4 seconds;
  • table.type sets the table type to MERGE_ON_READ.

CREATE TABLE t2(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- this option enable the streaming read
'read.start-commit' = '20210316134557', -- specifies the start commit instant time
'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);

-- Then query the table in stream mode
select * from t2;

Note: when querying, you may run into the following error:

[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

[Fix] Add hadoop-mapreduce-client-core-xxx.jar and hive-exec-xxx.jar to the Flink lib directory:

cp /opt/apache/hadoop-3.3.2/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.2.jar $FLINK_HOME/lib
cp ./hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/hive-exec-2.3.1-core.jar $FLINK_HOME/lib


For integrating Hive with Hudi, start with the official documentation: https://hudi.apache.org/docs/syncing_metastore/#flink-setup
