如何在Flink 1.9中使用 Hive?
Apache Flink 從 1.9.0 版本開始增加了與 Hive 集成的功能,用戶可以通過 Flink 來訪問 Hive 的元數(shù)據(jù),以及讀寫 Hive 中的表。本文將主要從項(xiàng)目的設(shè)計(jì)架構(gòu)、最新進(jìn)展、使用說明等方面來介紹這一功能。
Flink on Hive 介紹
SQL 是大數(shù)據(jù)領(lǐng)域中的重要應(yīng)用場景,為了完善 Flink 的生態(tài),發(fā)掘 Flink 在批處理方面的潛力,我們決定增強(qiáng) FlinkSQL 的功能,從而讓用戶能夠通過 Flink 完成更多的任務(wù)。
Hive 是大數(shù)據(jù)領(lǐng)域最早出現(xiàn)的 SQL 引擎,發(fā)展至今有著豐富的功能和廣泛的用戶基礎(chǔ)。之后出現(xiàn)的 SQL 引擎,如 Spark SQL、Impala 等,都在一定程度上提供了與 Hive 集成的功能,從而方便用戶使用現(xiàn)有的數(shù)據(jù)倉庫、進(jìn)行作業(yè)遷移等。因此我們認(rèn)為提供與 Hive 交互的能力對于 FlinkSQL 也是非常重要的。
設(shè)計(jì)架構(gòu)
與 Hive 集成主要包含了元數(shù)據(jù)和實(shí)際表數(shù)據(jù)的訪問,因此我們會(huì)從這兩方面介紹一下該項(xiàng)目的架構(gòu)。
元數(shù)據(jù)
為了訪問外部系統(tǒng)的元數(shù)據(jù),F(xiàn)link 提供了 ExternalCatalog 的概念。但是目前 ExternalCatalog 的定義非常不完整,基本處于不可用的狀態(tài)。因此,我們提出了一套全新的 Catalog 接口來取代現(xiàn)有的 ExternalCatalog。新的 Catalog 能夠支持?jǐn)?shù)據(jù)庫、表、分區(qū)等多種元數(shù)據(jù)對象;允許在一個(gè)用戶 Session 中維護(hù)多個(gè) Catalog 實(shí)例,從而同時(shí)訪問多個(gè)外部系統(tǒng);并且 Catalog 以可插拔的方式接入 Flink,允許用戶提供自定義的實(shí)現(xiàn)。下圖展示了新的 Catalog API 的總體架構(gòu)。
創(chuàng)建 TableEnvironment 的時(shí)候會(huì)同時(shí)創(chuàng)建一個(gè) CatalogManager,負(fù)責(zé)管理不同的 Catalog 實(shí)例。TableEnvironment 通過 Catalog 來為 Table API 和 SQL Client 用戶提供元數(shù)據(jù)服務(wù)。
目前 Catalog 有兩個(gè)實(shí)現(xiàn),GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元數(shù)據(jù)管理機(jī)制,將所有元數(shù)據(jù)保存在內(nèi)存中。而 HiveCatalog 會(huì)與一個(gè) Hive Metastore 的實(shí)例連接,提供元數(shù)據(jù)持久化的能力。要使用 Flink 與 Hive 進(jìn)行交互,用戶需要配置一個(gè) HiveCatalog,并通過 HiveCatalog 訪問 Hive 中的元數(shù)據(jù)。另一方面,HiveCatalog 也可以用來處理 Flink 自身的元數(shù)據(jù),在這種場景下,HiveCatalog 僅將 Hive Metastore 作為持久化存儲(chǔ)使用,寫入 Hive Metastore 中的元數(shù)據(jù)并不一定是 Hive 所支持的格式。一個(gè) HiveCatalog 實(shí)例可以同時(shí)支持這兩種模式,用戶無需為管理 Hive 和 Flink 的元數(shù)據(jù)創(chuàng)建不同的實(shí)例。
另外,我們設(shè)計(jì)了 HiveShim 來支持不同版本的 Hive Metastore。目前支持的 Hive 版本包括 2.3.4 和 1.2.1。
表數(shù)據(jù)
我們提供了 Hive Data Connector 來讀寫 Hive 的表數(shù)據(jù)。Hive Data Connector 盡可能的復(fù)用了 Hive 本身的 Input/Output Format 和 SerDe 等類,這樣做的好處一方面是減少了代碼重復(fù),更重要的是可以最大程度的保持與 Hive 的兼容,即 Flink 寫入的數(shù)據(jù) Hive 可以正常讀取,并且反之亦然。
與 HiveCatalog 類似的,Hive Data Connector 目前支持的 Hive 版本也是 2.3.4 和 1.2.1。
項(xiàng)目進(jìn)展
Flink 與 Hive 集成的功能會(huì)在 1.9.0 版本中作為試用功能發(fā)布,用戶可以通過 Table API 或者 SQL Client 的模式與 Hive 進(jìn)行交互。下面列出的是在 1.9.0 中已經(jīng)支持的功能:
- 提供簡單的 DDL 來讀取 Hive 元數(shù)據(jù),比如 show databases、show tables、describe table 等。
- 可通過 Catalog API 來修改 Hive 元數(shù)據(jù),如 create table、drop table 等。
- 讀取 Hive 數(shù)據(jù),支持分區(qū)表和非分區(qū)表。
- 寫 Hive 數(shù)據(jù),支持非分區(qū)表。
- 支持 Text、ORC、Parquet、SequenceFile 等文件格式。
- 支持調(diào)用用戶在 Hive 中創(chuàng)建的 UDF。
由于是試用功能,因此還有一些方面不夠完善,下面列出的是在 1.9.0 中缺失的功能:
- 不支持INSERT OVERWRITE。
- 不支持寫分區(qū)表。
- 不支持ACID表。
- 不支持Bucket表。
- 不支持View。
部分?jǐn)?shù)據(jù)類型不支持,包括Decimal、Char、Varchar、Date、Time、Timestamp、Interval、Union等。
如何應(yīng)用
添加依賴
使用 Flink 與 Hive 集成的功能,用戶首先需要添加相應(yīng)的依賴。如果是使用 SQL Client,則需要將依賴的 jar 添加到 Flink 的 lib 目錄中;如果使用 Table API,則需要將相應(yīng)的依賴添加到項(xiàng)目中(如pom.xml)。
如上文所述,目前支持的 Hive 版本包括 2.3.4 和 1.2.1,下表列出的是針對不同版本所需的依賴。
其中 flink-shaded-hadoop-2-uber 包含了 Hive 對于 Hadoop 的依賴。如果不用 Flink 提供的包,用戶也可以將集群中使用的 Hadoop 包添加進(jìn)來,不過需要保證添加的 Hadoop 版本與 Hive 所依賴的版本是兼容的(Hive 2.3.4 依賴的 Hadoop 版本是 2.7.2;Hive 1.2.1 依賴的 Hadoop 版本是 2.6.0)。
依賴的 Hive 包(即 hive-exec 和 hive-metastore)也可以使用用戶集群中 Hive 所提供的 jar 包,詳情請見支持不同的 Hive 版本。
配置 HiveCatalog
要與 Hive 交互,必須使用 HiveCatalog,下面介紹一下如何配置 HiveCatalog。
SQL Client
使用 SQL Client 時(shí),用戶需要在 sql-client-defaults.yaml 中指定自己所需的 Catalog,在 sql-client-defaults.yaml 的“catalogs”列表中可以指定一個(gè)或多個(gè) Catalog 實(shí)例。以下的示例展示了如何指定一個(gè) HiveCatalog:
- catalogs:
- # A typical catalog definition looks like:
- - name: myhive
- type: hive
- hive-conf-dir: /path/to/hive_conf_dir
- hive-version: 2.3.4
其中 name 是用戶給每個(gè) Catalog 實(shí)例指定的名字, Catalog 名字和 DB 名字構(gòu)成了 FlinkSQL 中元數(shù)據(jù)的命名空間,因此需要保證每個(gè) Catalog 的名字是唯一的。type 表示 Catalog 的類型,對于 HiveCatalog 而言,type 應(yīng)該指定為 hive。hive-conf-dir 用于讀取 Hive 的配置文件,用戶可以將其設(shè)定為集群中 Hive 的配置文件目錄。hive-version 用于指定所使用的 Hive 版本,可以設(shè)定為 2.3.4 或者 1.2.1。
指定了 HiveCatalog 以后,用戶就可以啟動(dòng) sql-client,并通過以下命令驗(yàn)證 HiveCatalog 已經(jīng)正確加載。
- Flink SQL> show catalogs;
- default_catalog
- myhive
- Flink SQL> use catalog myhive;
其中 show catalogs 會(huì)列出加載的所有 Catalog 實(shí)例。需要注意的是,除了用戶在sql-client-defaults.yaml 文件中配置的 Catalog 以外,F(xiàn)linkSQL 還會(huì)自動(dòng)加載一個(gè) GenericInMemoryCatalog 實(shí)例作為內(nèi)置的 Catalog,該內(nèi)置 Catalog 默認(rèn)名字為 default_catalog。
使用 use catalog 可以設(shè)定用戶 Session 當(dāng)前的 Catalog。用戶在 SQL 語句中訪問元數(shù)據(jù)對象(如 DB、Table 等)時(shí),如果不指定 Catalog 名字,則 FlinkSQL 會(huì)在當(dāng)前 Catalog 中進(jìn)行查找。
Table API
下面的代碼展示了如何通過 TableAPI 來創(chuàng)建 HiveCatalog,并注冊到 TableEnvironment。
- String name = "myhive";
- String defaultDatabase = "default";
- String hiveConfDir = "/path/to/hive_conf_dir";
- String version = "2.3.4";
- TableEnvironment tableEnv = …; // create TableEnvironment
- HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase,
- hiveConfDir, version);
- tableEnv.registerCatalog(name, hiveCatalog);
- tableEnv.useCatalog(name);
將 HiveCatalog 注冊到 TableEnvironment 以后,就可以在通過 TableEnvironment 提交 SQL 的時(shí)候訪問 HiveCatalog 中的元數(shù)據(jù)了。與 SQL Client 類似, TableEnvironment 也提供了 useCatalog 接口讓用戶設(shè)定當(dāng)前 Catalog。
讀寫 Hive 表
設(shè)置好 HiveCatalog 以后就可以通過 SQL Client 或者 Table API 來讀寫 Hive 中的表了。
SQL Client
假設(shè) Hive 中已經(jīng)有一張名為 src 的表,我們可以用以下的 SQL 語句來讀寫這張表。
- Flink SQL> describe src;
- root
- |-- key: STRING
- |-- value: STRING
- Flink SQL> select * from src;
- key value
- 100 val_100
- 298 val_298
- 9 val_9
- 341 val_341
- 498 val_498
- 146 val_146
- 458 val_458
- 362 val_362
- 186 val_186
- …… ……
- Flink SQL> insert into src values ('newKey','newVal');
Table API
類似的,也可以通過 Table API 來讀寫上面提到的這張表。下面的代碼展示了如何實(shí)現(xiàn)這一操作。
- TableEnvironment tableEnv = …; // create TableEnvironment
- tableEnv.registerCatalog("myhive", hiveCatalog);
- // set myhive as current catalog
- tableEnv.useCatalog("myhive");
- Table src = tableEnv.sqlQuery("select * from src");
- // write src into a sink or do further analysis
- ……
- tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')");
- tableEnv.execute("insert into src");
支持不同的 Hive 版本
Flink 1.9.0 中支持的 Hive 版本是 2.3.4 和 1.2.1,目前我們只針對這兩個(gè)版本進(jìn)行了測試。使用 SQL Client 時(shí),如果用戶沒有在 sql-client-defaults.yaml 文件中指定 Hive 版本,我們會(huì)自動(dòng)檢測 classpath 中的 Hive 版本。如果檢測到的 Hive 版本不是 2.3.4 或 1.2.1 就會(huì)報(bào)錯(cuò)。
借助 Hive 兼容性的保證,其它不同的小版本也比較可能是可以正常工作的。因此,如果用戶使用的 Hive 小版本與我們所支持的不同,可以指定一個(gè)支持的版本來試用與 Hive 集成的功能。比如用戶使用的 Hive 版本是 2.3.3,可以在 sql-client-defaults.yaml 文件或者代碼中將 Hive 版本指定為 2.3.4。
執(zhí)行模式與 Planner 的選擇
Flink 1.9.0 中 Hive 的 TableSink 只能在 batch 模式下工作,因此如果用戶想要使用 Hive 的 TableSink,需要將執(zhí)行模式設(shè)置為 batch。
Flink 1.9.0 增加了新的 blink planner,由于 blink planner 相比于原來的 planner 功能更加全面,因此我們建議在使用 FlinkSQL 與 Hive 集成時(shí)使用 blink planner。后續(xù)新的功能也可能會(huì)只支持 blink planner。
使用 SQL Client 時(shí)可以像這樣在 sql-client-defaults.yaml 中指定執(zhí)行模式和 planner:
- execution:
- # select the implementation responsible for planning table programs
- # possible values are 'old' (used by default) or 'blink'
- planner: blink
- # 'batch' or 'streaming' execution
- type: batch
對應(yīng)的 Table API 的寫法如下:
- EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
- TableEnvironment tableEnv = TableEnvironment.create(settings);
后期規(guī)劃
我們會(huì)在 Flink 后續(xù)版本中進(jìn)一步完善與 Hive 集成的功能,預(yù)計(jì)會(huì)在 1.10.0 版本中實(shí)現(xiàn) Production-Ready。我們在后續(xù)版本中計(jì)劃開展的工作包括:
- 更完整的數(shù)據(jù)類型支持
- 支持寫分區(qū)表,包括靜態(tài)和動(dòng)態(tài)分區(qū)
- 支持 INSERT OVERWRITE
- 支持 View
- 更完整的 DDL、DML 的支持
- 支持 Hive 的 TableSink 在 streaming 模式下工作,以便用戶將流式數(shù)據(jù)寫入到 Hive 中
- 測試并支持更多的 Hive 版本
- 支持 Bucket 表
- 性能測試與優(yōu)化