自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

如何在Flink 1.9中使用 Hive?

大數(shù)據(jù)
Apache Flink 從 1.9.0 版本開始增加了與 Hive 集成的功能,用戶可以通過 Flink 來訪問 Hive 的元數(shù)據(jù),以及讀寫 Hive 中的表。本文將主要從項(xiàng)目的設(shè)計(jì)架構(gòu)、最新進(jìn)展、使用說明等方面來介紹這一功能。

Apache Flink 從 1.9.0 版本開始增加了與 Hive 集成的功能,用戶可以通過 Flink 來訪問 Hive 的元數(shù)據(jù),以及讀寫 Hive 中的表。本文將主要從項(xiàng)目的設(shè)計(jì)架構(gòu)、最新進(jìn)展、使用說明等方面來介紹這一功能。

如何在Flink 1.9中使用 Hive?

Flink on Hive 介紹

SQL 是大數(shù)據(jù)領(lǐng)域中的重要應(yīng)用場景,為了完善 Flink 的生態(tài),發(fā)掘 Flink 在批處理方面的潛力,我們決定增強(qiáng) FlinkSQL 的功能,從而讓用戶能夠通過 Flink 完成更多的任務(wù)。

Hive 是大數(shù)據(jù)領(lǐng)域最早出現(xiàn)的 SQL 引擎,發(fā)展至今有著豐富的功能和廣泛的用戶基礎(chǔ)。之后出現(xiàn)的 SQL 引擎,如 Spark SQL、Impala 等,都在一定程度上提供了與 Hive 集成的功能,從而方便用戶使用現(xiàn)有的數(shù)據(jù)倉庫、進(jìn)行作業(yè)遷移等。因此我們認(rèn)為提供與 Hive 交互的能力對于 FlinkSQL 也是非常重要的。

設(shè)計(jì)架構(gòu)

與 Hive 集成主要包含了元數(shù)據(jù)和實(shí)際表數(shù)據(jù)的訪問,因此我們會(huì)從這兩方面介紹一下該項(xiàng)目的架構(gòu)。

元數(shù)據(jù)

為了訪問外部系統(tǒng)的元數(shù)據(jù),F(xiàn)link 提供了 ExternalCatalog 的概念。但是目前 ExternalCatalog 的定義非常不完整,基本處于不可用的狀態(tài)。因此,我們提出了一套全新的 Catalog 接口來取代現(xiàn)有的 ExternalCatalog。新的 Catalog 能夠支持?jǐn)?shù)據(jù)庫、表、分區(qū)等多種元數(shù)據(jù)對象;允許在一個(gè)用戶 Session 中維護(hù)多個(gè) Catalog 實(shí)例,從而同時(shí)訪問多個(gè)外部系統(tǒng);并且 Catalog 以可插拔的方式接入 Flink,允許用戶提供自定義的實(shí)現(xiàn)。下圖展示了新的 Catalog API 的總體架構(gòu)。

如何在 Flink 1.9 中使用 Hive?

創(chuàng)建 TableEnvironment 的時(shí)候會(huì)同時(shí)創(chuàng)建一個(gè) CatalogManager,負(fù)責(zé)管理不同的 Catalog 實(shí)例。TableEnvironment 通過 Catalog 來為 Table API 和 SQL Client 用戶提供元數(shù)據(jù)服務(wù)。

目前 Catalog 有兩個(gè)實(shí)現(xiàn),GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元數(shù)據(jù)管理機(jī)制,將所有元數(shù)據(jù)保存在內(nèi)存中。而 HiveCatalog 會(huì)與一個(gè) Hive Metastore 的實(shí)例連接,提供元數(shù)據(jù)持久化的能力。要使用 Flink 與 Hive 進(jìn)行交互,用戶需要配置一個(gè) HiveCatalog,并通過 HiveCatalog 訪問 Hive 中的元數(shù)據(jù)。另一方面,HiveCatalog 也可以用來處理 Flink 自身的元數(shù)據(jù),在這種場景下,HiveCatalog 僅將 Hive Metastore 作為持久化存儲(chǔ)使用,寫入 Hive Metastore 中的元數(shù)據(jù)并不一定是 Hive 所支持的格式。一個(gè) HiveCatalog 實(shí)例可以同時(shí)支持這兩種模式,用戶無需為管理 Hive 和 Flink 的元數(shù)據(jù)創(chuàng)建不同的實(shí)例。

另外,我們設(shè)計(jì)了 HiveShim 來支持不同版本的 Hive Metastore。目前支持的 Hive 版本包括 2.3.4 和 1.2.1。

表數(shù)據(jù)

我們提供了 Hive Data Connector 來讀寫 Hive 的表數(shù)據(jù)。Hive Data Connector 盡可能的復(fù)用了 Hive 本身的 Input/Output Format 和 SerDe 等類,這樣做的好處一方面是減少了代碼重復(fù),更重要的是可以最大程度的保持與 Hive 的兼容,即 Flink 寫入的數(shù)據(jù) Hive 可以正常讀取,并且反之亦然。

與 HiveCatalog 類似的,Hive Data Connector 目前支持的 Hive 版本也是 2.3.4 和 1.2.1。

項(xiàng)目進(jìn)展

Flink 與 Hive 集成的功能會(huì)在 1.9.0 版本中作為試用功能發(fā)布,用戶可以通過 Table API 或者 SQL Client 的模式與 Hive 進(jìn)行交互。下面列出的是在 1.9.0 中已經(jīng)支持的功能:

  • 提供簡單的 DDL 來讀取 Hive 元數(shù)據(jù),比如 show databases、show tables、describe table 等。
  • 可通過 Catalog API 來修改 Hive 元數(shù)據(jù),如 create table、drop table 等。
  • 讀取 Hive 數(shù)據(jù),支持分區(qū)表和非分區(qū)表。
  • 寫 Hive 數(shù)據(jù),支持非分區(qū)表。
  • 支持 Text、ORC、Parquet、SequenceFile 等文件格式。
  • 支持調(diào)用用戶在 Hive 中創(chuàng)建的 UDF。

由于是試用功能,因此還有一些方面不夠完善,下面列出的是在 1.9.0 中缺失的功能:

  • 不支持INSERT OVERWRITE。
  • 不支持寫分區(qū)表。
  • 不支持ACID表。
  • 不支持Bucket表。
  • 不支持View。

部分?jǐn)?shù)據(jù)類型不支持,包括Decimal、Char、Varchar、Date、Time、Timestamp、Interval、Union等。

如何應(yīng)用

添加依賴

使用 Flink 與 Hive 集成的功能,用戶首先需要添加相應(yīng)的依賴。如果是使用 SQL Client,則需要將依賴的 jar 添加到 Flink 的 lib 目錄中;如果使用 Table API,則需要將相應(yīng)的依賴添加到項(xiàng)目中(如pom.xml)。

如上文所述,目前支持的 Hive 版本包括 2.3.4 和 1.2.1,下表列出的是針對不同版本所需的依賴。

 

如何在 Flink 1.9 中使用 Hive?

其中 flink-shaded-hadoop-2-uber 包含了 Hive 對于 Hadoop 的依賴。如果不用 Flink 提供的包,用戶也可以將集群中使用的 Hadoop 包添加進(jìn)來,不過需要保證添加的 Hadoop 版本與 Hive 所依賴的版本是兼容的(Hive 2.3.4 依賴的 Hadoop 版本是 2.7.2;Hive 1.2.1 依賴的 Hadoop 版本是 2.6.0)。

依賴的 Hive 包(即 hive-exec 和 hive-metastore)也可以使用用戶集群中 Hive 所提供的 jar 包,詳情請見支持不同的 Hive 版本。

配置 HiveCatalog

要與 Hive 交互,必須使用 HiveCatalog,下面介紹一下如何配置 HiveCatalog。

SQL Client

使用 SQL Client 時(shí),用戶需要在 sql-client-defaults.yaml 中指定自己所需的 Catalog,在 sql-client-defaults.yaml 的“catalogs”列表中可以指定一個(gè)或多個(gè) Catalog 實(shí)例。以下的示例展示了如何指定一個(gè) HiveCatalog:

 

  1. catalogs: 
  2. # A typical catalog definition looks like
  3.   - name: myhive 
  4.     type: hive 
  5. hive-conf-dir: /path/to/hive_conf_dir 
  6. hive-version: 2.3.4 

其中 name 是用戶給每個(gè) Catalog 實(shí)例指定的名字, Catalog 名字和 DB 名字構(gòu)成了 FlinkSQL 中元數(shù)據(jù)的命名空間,因此需要保證每個(gè) Catalog 的名字是唯一的。type 表示 Catalog 的類型,對于 HiveCatalog 而言,type 應(yīng)該指定為 hive。hive-conf-dir 用于讀取 Hive 的配置文件,用戶可以將其設(shè)定為集群中 Hive 的配置文件目錄。hive-version 用于指定所使用的 Hive 版本,可以設(shè)定為 2.3.4 或者 1.2.1。

指定了 HiveCatalog 以后,用戶就可以啟動(dòng) sql-client,并通過以下命令驗(yàn)證 HiveCatalog 已經(jīng)正確加載。

 

  1. Flink SQL> show catalogs; 
  2. default_catalog 
  3. myhive 
  4.  
  5. Flink SQL> use catalog myhive; 

其中 show catalogs 會(huì)列出加載的所有 Catalog 實(shí)例。需要注意的是,除了用戶在sql-client-defaults.yaml 文件中配置的 Catalog 以外,F(xiàn)linkSQL 還會(huì)自動(dòng)加載一個(gè) GenericInMemoryCatalog 實(shí)例作為內(nèi)置的 Catalog,該內(nèi)置 Catalog 默認(rèn)名字為 default_catalog。

使用 use catalog 可以設(shè)定用戶 Session 當(dāng)前的 Catalog。用戶在 SQL 語句中訪問元數(shù)據(jù)對象(如 DB、Table 等)時(shí),如果不指定 Catalog 名字,則 FlinkSQL 會(huì)在當(dāng)前 Catalog 中進(jìn)行查找。

Table API

下面的代碼展示了如何通過 TableAPI 來創(chuàng)建 HiveCatalog,并注冊到 TableEnvironment。

 

  1. String name = "myhive"
  2. String defaultDatabase = "default"
  3. String hiveConfDir = "/path/to/hive_conf_dir"
  4. String version = "2.3.4"
  5.  
  6. TableEnvironment tableEnv = …; // create TableEnvironment 
  7. HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, 
  8. hiveConfDir, version); 
  9. tableEnv.registerCatalog(name, hiveCatalog); 
  10. tableEnv.useCatalog(name); 

將 HiveCatalog 注冊到 TableEnvironment 以后,就可以在通過 TableEnvironment 提交 SQL 的時(shí)候訪問 HiveCatalog 中的元數(shù)據(jù)了。與 SQL Client 類似, TableEnvironment 也提供了 useCatalog 接口讓用戶設(shè)定當(dāng)前 Catalog。

讀寫 Hive 表

設(shè)置好 HiveCatalog 以后就可以通過 SQL Client 或者 Table API 來讀寫 Hive 中的表了。

SQL Client

假設(shè) Hive 中已經(jīng)有一張名為 src 的表,我們可以用以下的 SQL 語句來讀寫這張表。

 

  1. Flink SQL> describe src; 
  2. root 
  3.  |-- key: STRING 
  4.  |-- value: STRING 
  5.  
  6.  
  7. Flink SQL> select * from src; 
  8.  
  9.                       key                     value 
  10.                        100                   val_100 
  11.                        298                   val_298 
  12.                          9                     val_9 
  13.                        341                   val_341 
  14.                        498                   val_498 
  15.                        146                   val_146 
  16.                        458                   val_458 
  17.                        362                   val_362 
  18.                        186                   val_186 
  19.                        ……                   …… 
  20.  
  21. Flink SQL> insert into src values ('newKey','newVal'); 

Table API

類似的,也可以通過 Table API 來讀寫上面提到的這張表。下面的代碼展示了如何實(shí)現(xiàn)這一操作。

 

  1. TableEnvironment tableEnv = …; // create TableEnvironment 
  2. tableEnv.registerCatalog("myhive", hiveCatalog); 
  3. // set myhive as current catalog 
  4. tableEnv.useCatalog("myhive"); 
  5.  
  6. Table src = tableEnv.sqlQuery("select * from src"); 
  7. // write src into a sink or do further analysis 
  8. …… 
  9.  
  10. tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')"); 
  11. tableEnv.execute("insert into src"); 

支持不同的 Hive 版本

Flink 1.9.0 中支持的 Hive 版本是 2.3.4 和 1.2.1,目前我們只針對這兩個(gè)版本進(jìn)行了測試。使用 SQL Client 時(shí),如果用戶沒有在 sql-client-defaults.yaml 文件中指定 Hive 版本,我們會(huì)自動(dòng)檢測 classpath 中的 Hive 版本。如果檢測到的 Hive 版本不是 2.3.4 或 1.2.1 就會(huì)報(bào)錯(cuò)。

借助 Hive 兼容性的保證,其它不同的小版本也比較可能是可以正常工作的。因此,如果用戶使用的 Hive 小版本與我們所支持的不同,可以指定一個(gè)支持的版本來試用與 Hive 集成的功能。比如用戶使用的 Hive 版本是 2.3.3,可以在 sql-client-defaults.yaml 文件或者代碼中將 Hive 版本指定為 2.3.4。

執(zhí)行模式與 Planner 的選擇

Flink 1.9.0 中 Hive 的 TableSink 只能在 batch 模式下工作,因此如果用戶想要使用 Hive 的 TableSink,需要將執(zhí)行模式設(shè)置為 batch。

Flink 1.9.0 增加了新的 blink planner,由于 blink planner 相比于原來的 planner 功能更加全面,因此我們建議在使用 FlinkSQL 與 Hive 集成時(shí)使用 blink planner。后續(xù)新的功能也可能會(huì)只支持 blink planner。

使用 SQL Client 時(shí)可以像這樣在 sql-client-defaults.yaml 中指定執(zhí)行模式和 planner:

 

  1. execution: 
  2.   # select the implementation responsible for planning table programs 
  3.   # possible values are 'old' (used by defaultor 'blink' 
  4.   planner: blink 
  5.   # 'batch' or 'streaming' execution 
  6.   type: batch 

對應(yīng)的 Table API 的寫法如下:

 

  1. EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); 
  2. TableEnvironment tableEnv = TableEnvironment.create(settings); 

后期規(guī)劃

我們會(huì)在 Flink 后續(xù)版本中進(jìn)一步完善與 Hive 集成的功能,預(yù)計(jì)會(huì)在 1.10.0 版本中實(shí)現(xiàn) Production-Ready。我們在后續(xù)版本中計(jì)劃開展的工作包括:

  • 更完整的數(shù)據(jù)類型支持
  • 支持寫分區(qū)表,包括靜態(tài)和動(dòng)態(tài)分區(qū)
  • 支持 INSERT OVERWRITE
  • 支持 View
  • 更完整的 DDL、DML 的支持
  • 支持 Hive 的 TableSink 在 streaming 模式下工作,以便用戶將流式數(shù)據(jù)寫入到 Hive 中
  • 測試并支持更多的 Hive 版本
  • 支持 Bucket 表
  • 性能測試與優(yōu)化

 

責(zé)任編輯:未麗燕 來源: 阿里云棲社區(qū)
相關(guān)推薦

2022-12-08 08:00:00

.NET?7BitArray數(shù)據(jù)執(zhí)行

2019-08-26 09:20:29

Windows 10虛擬桌面Windows

2016-08-11 10:43:56

2011-08-10 09:31:41

Hibernateunion

2021-03-09 07:27:40

Kafka開源分布式

2015-08-27 09:46:09

swiftAFNetworkin

2021-06-09 09:36:18

DjangoElasticSearLinux

2022-05-17 08:25:10

TypeScript接口前端

2022-06-23 08:00:53

PythonDateTime模塊

2024-01-18 08:37:33

socketasyncio線程

2019-09-16 19:00:48

Linux變量

2014-07-02 09:47:06

SwiftCocoaPods

2020-04-09 10:18:51

Bash循環(huán)Linux

2024-09-06 11:34:15

RustAI語言

2020-11-30 11:55:07

Docker命令Linux

2020-11-05 11:16:06

Apache CassCassandra虛擬表

2014-05-15 11:22:17

Windows SerNIC Teaming

2020-01-07 09:50:41

Windows 10上帝模式Windows

2025-03-21 09:58:59

Python數(shù)據(jù)類型安全

2021-09-10 10:30:22

Java代碼
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號