自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink SQL 實(shí)戰(zhàn):HBase 的結(jié)合應(yīng)用

大數(shù)據(jù)
本文主要介紹 HBase 和 Flink SQL 的結(jié)合使用。HBase 作為 Google 發(fā)表 Big Table 論文的開源實(shí)現(xiàn)版本,是一種分布式列式存儲(chǔ)的數(shù)據(jù)庫,構(gòu)建在 HDFS 之上的 NoSQL 數(shù)據(jù)庫,非常適合大規(guī)模實(shí)時(shí)查詢,因此 HBase 在實(shí)時(shí)計(jì)算領(lǐng)域使用非常廣泛。

本文主要介紹 HBase 和 Flink SQL 的結(jié)合使用。HBase 作為 Google 發(fā)表 Big Table 論文的開源實(shí)現(xiàn)版本,是一種分布式列式存儲(chǔ)的數(shù)據(jù)庫,構(gòu)建在 HDFS 之上的 NoSQL 數(shù)據(jù)庫,非常適合大規(guī)模實(shí)時(shí)查詢,因此 HBase 在實(shí)時(shí)計(jì)算領(lǐng)域使用非常廣泛??梢詫?shí)時(shí)寫 HBase,也可以利用 buckload 一把把離線 Job 生成 HFile Load 到HBase 表中。而當(dāng)下 Flink SQL 的火熱程度不用多說,F(xiàn)link SQL 也為 HBase 提供了 connector,因此 HBase 與 Flink SQL 的結(jié)合非常有必要實(shí)踐實(shí)踐。

當(dāng)然,本文假設(shè)用戶有一定的 HBase 知識(shí)基礎(chǔ),不會(huì)詳細(xì)去介紹 HBase 的架構(gòu)和原理,本文著重介紹 HBase 和 Flink 在實(shí)際場(chǎng)景中的結(jié)合使用。主要分為兩種場(chǎng)景,第一種場(chǎng)景:HBase 作為維表與 Flink Kafka table 做 temporal table join 的場(chǎng)景;第二種場(chǎng)景:Flink SQL 做計(jì)算之后的結(jié)果寫到 HBase 表,供其他用戶查詢的場(chǎng)景。因此,本文介紹的內(nèi)容如下所示:

  • HBase 環(huán)境準(zhǔn)備
  • 數(shù)據(jù)準(zhǔn)備
  • HBase 作為維度表進(jìn)行 temporal table join的場(chǎng)景
  • Flink SQL 做計(jì)算寫 HBase 的場(chǎng)景
  • 總結(jié)

01 HBase 環(huán)境準(zhǔn)備

由于沒有測(cè)試的 HBase 環(huán)境以及為了避免污染線上 Hbase 環(huán)境。因此,自己 build一個(gè) Hbase docker image(大家可以 docker pull guxinglei/myhbase 拉到本地),是基于官方干凈的 ubuntu imgae 之上安裝了 Hbase 2.2.0 版本以及 JDK1.8 版本。

啟動(dòng)容器,暴露 Hbase web UI 端口以及內(nèi)置 zk 端口,方便我們從 web 頁面看信息以及創(chuàng)建 Flink Hbase table 需要 zk 的鏈接信息。

  1. docker run -it --network=host -p 2181:2181 -p 60011:60011 docker.io/guxinglei/myhbase:latest bash 

 

進(jìn)入容器,啟動(dòng) HBase 集群,以及啟動(dòng) rest server,后續(xù)方便我們用 REST API 來讀取 Flink SQL 寫進(jìn) HBase 的數(shù)據(jù)。

  1. # 啟動(dòng)hbase 集群bin/start-hbase.sh# 后臺(tái)啟動(dòng)restServerbin/hbase-daemon.sh start rest -p 8000 

 

02 數(shù)據(jù)準(zhǔn)備

由于 HBase 環(huán)境是自己臨時(shí)搞的單機(jī)服務(wù),里面沒有數(shù)據(jù),需要往里面寫點(diǎn)數(shù)據(jù)供后續(xù)示例用。在 Flink SQL 實(shí)戰(zhàn)系列第二篇中介紹了如何注冊(cè) Flink Mysql table,我們可以將廣告位表抽取到 HBase 表中,用來做維度表,進(jìn)行 temporal table join。因此,我們需要在 HBase 中創(chuàng)建一張表,同時(shí)還需要?jiǎng)?chuàng)建 Flink HBase table, 這兩張表通過 Flink SQL 的 HBase connector 關(guān)聯(lián)起來。

在容器中啟動(dòng) HBase shell,創(chuàng)建一張名為 dim_hbase 的 HBase 表,建表語句如下所示:

  1. # 在hbase shell創(chuàng)建 hbase表 
  2. hbase(main):002:0> create 'dim_hbase','cf' 
  3. Created table dim_hbase 
  4. Took 1.3120 seconds 
  5. => Hbase::Table - dim_hbase 

 

在 Flink 中創(chuàng)建 Flink HBase table,建表語句如下所示:

  1. # 注冊(cè) Flink Hbase table 
  2. DROP TABLE IF EXISTS flink_rtdw.demo.hbase_dim_table; 
  3. CREATE TABLE flink_rtdw.demo.hbase_dim_table ( 
  4.   rowkey STRING, 
  5.   cf ROW < adspace_name STRING >, 
  6.   PRIMARY KEY (rowkey) NOT ENFORCED 
  7. WITH ( 
  8. 'connector' = 'hbase-1.4'
  9. 'table-name' = 'dim_hbase'
  10. 'sink.buffer-flush.max-rows' = '1000'
  11. 'zookeeper.quorum' = 'localhost:2181' 
  12. ); 

Flink MySQL table 和 Flink HBase table 已經(jīng)創(chuàng)建好了,就可以寫抽取數(shù)據(jù)到HBase 的 SQL job 了,SQL 語句以及 job 狀態(tài)如下所示:

  1. # 抽取Mysql數(shù)據(jù)到Hbase表中 
  2.  
  3.  
  4. insert into 
  5.   hbase_dim_table 
  6. select 
  7. CAST (ID as VARCHAR), 
  8. ROW(name
  9. from 
  10.   mysql_dim_table; 

 

03 HBase 作為維表與 Kafka做 temporal join 的場(chǎng)景

在 Flink SQL join 中,維度表的 join 一定繞不開的,比如訂單金額 join 匯率表,點(diǎn)擊流 join 廣告位的明細(xì)表等等,使用場(chǎng)景非常廣泛。那么作為分布式數(shù)據(jù)庫的 HBase 比 MySQL 作為維度表用作維度表 join 更有優(yōu)勢(shì)。在 Flink SQL 實(shí)戰(zhàn)系列第二篇中,我們注冊(cè)了廣告的點(diǎn)擊流,將 Kafka topic 注冊(cè) Flink Kafka Table,同時(shí)也介紹了 temporal table join 在 Flink SQL 中的使用;那么本節(jié)中將會(huì)介紹 HBase 作為維度表來使用,上面小節(jié)中已經(jīng)將數(shù)據(jù)抽取到 Hbase 中了,我們直接寫 temporal table join 計(jì)算邏輯即可。

作為廣告點(diǎn)擊流的 Flink Kafa table 與 作為廣告位的 Flink HBase table 通過廣告位 Id 進(jìn)行 temporal table join,輸出廣告位 ID 和廣告位中文名字,SQL join 邏輯如下所示:

  1. select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId, 
  2.        hbase_dim_table.cf.adspace_name as publisher_adspace_name 
  3. from adsdw_dwd_max_click_mobileapp 
  4. left join hbase_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTime 
  5. on cast(adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as string) = hbase_dim_table.rowkey; 

temporal table join job 提交 Flink 集群上的狀態(tài)以及 join 結(jié)果如下所示:

04 計(jì)算結(jié)果 sink 到 HBase 作為結(jié)果的場(chǎng)景

上面小節(jié)中,HBase 作為維度表用作 temporal table join 是非常常見的場(chǎng)景,實(shí)際上 HBase 作為存儲(chǔ)計(jì)算結(jié)果也是非常常見的場(chǎng)景,畢竟 Hbase 作為分布式數(shù)據(jù)庫,底層存儲(chǔ)是擁有多副本機(jī)制的 HDFS,維護(hù)簡(jiǎn)單,擴(kuò)容方便, 實(shí)時(shí)查詢快,而且提供各種客戶端方便下游使用存儲(chǔ)在 HBase 中的數(shù)據(jù)。那么本小節(jié)就介紹 Flink SQL 將計(jì)算結(jié)果寫到 HBase,并且通過 REST API 查詢計(jì)算結(jié)果的場(chǎng)景。

進(jìn)入容器中,在 HBase 中新建一張 HBase 表,一個(gè) column family 就滿足需求,建表語句如下所示:

  1. # 注冊(cè)hbase sink table 
  2. create 'dwa_hbase_click_report','cf' 

 

建立好 HBase 表之后,我們需要在 Flink SQL 創(chuàng)建一張 Flink HBase table,這個(gè)時(shí)候我們需要明確 cf 這個(gè) column famaly 下面 column 字段,在 Flink SQL實(shí)戰(zhàn)第二篇中,已經(jīng)注冊(cè)好了作為點(diǎn)擊流的 Flink Kafka table,因此本節(jié)中,將會(huì)計(jì)算點(diǎn)擊流的 uv 和點(diǎn)擊數(shù),因此兩個(gè) column 分別為 uv 和 click_count,建表語句如下所示:

  1. # 注冊(cè) Flink Hbase table 
  2. DROP TABLE IF EXISTS flink_rtdw.demo.dwa_hbase_click_report; 
  3. CREATE TABLE flink_rtdw.demo.dwa_hbase_click_report ( 
  4.   rowkey STRING, 
  5.   cf ROW < uv BIGINT, click_count BIGINT >, 
  6.   PRIMARY KEY (rowkey) NOT ENFORCED 
  7. WITH ( 
  8. 'connector' = 'hbase-1.4'
  9. 'table-name' = 'dwa_hbase_click_report'
  10. 'sink.buffer-flush.max-rows' = '1000'
  11. 'zookeeper.quorum' = 'hostname:2181' 
  12. ); 

 

前面點(diǎn)擊流的 Flink Kafka table 和存儲(chǔ)計(jì)算結(jié)果的 HBase table 和 Flink HBase table 已經(jīng)準(zhǔn)備了,我們將做一個(gè)1分鐘的翻轉(zhuǎn)窗口計(jì)算 uv 和點(diǎn)擊數(shù),并且將計(jì)算結(jié)果寫到 HBase 中。對(duì) HBase 了解的人應(yīng)該知道,rowkey 的設(shè)計(jì)對(duì) hbase regoin 的分布有著非常重要的影響,基于此我們的 rowkey 是使用 Flink SQL 內(nèi)置的 reverse 函數(shù)進(jìn)行廣告位 Id 進(jìn)行反轉(zhuǎn)和窗口啟始時(shí)間做 concat,因此,SQL 邏輯語句如下所示:

  1. INSERT INTO dwa_hbase_click_report 
  2. SELECT 
  3. CONCAT(REVERSE(CAST(publisher_adspace_adspaceId AS STRING)) , 
  4. '_'
  5. CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING) 
  6.   ) as rowkey,  
  7. ROW(COUNT(DISTINCT audience_mvid) , COUNT(audience_behavior_click_creative_impressionId)) as cf 
  8. FROM 
  9.   adsdw_dwd_max_click_mobileapp 
  10. WHERE publisher_adspace_adspaceId IS NOT NULL AND audience_mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULL 
  11. GROUP BY 
  12.   TUMBLE(ets, INTERVAL '1' MINUTE), 
  13.   publisher_adspace_adspaceId; 

 

SQL job 提交之后的狀態(tài)以及結(jié)果 check 如下所示:

上述 SQL job 已經(jīng)成功的將結(jié)算結(jié)果寫到 HBase 中了。對(duì)于線上的 HBase 服務(wù)來講,很多同事不一定有 HBase 客戶端的權(quán)限,從而也不能通過 HBase shell 讀取數(shù)據(jù);另外作為線上報(bào)表服務(wù)顯然不可能通過 HBase shell 來通過查詢數(shù)據(jù)。因此,在實(shí)時(shí)報(bào)表場(chǎng)景中,數(shù)據(jù)開發(fā)工程師將數(shù)據(jù)寫入 HBase, 前端工程師通過 REST API 來讀取數(shù)據(jù)。前面我們已經(jīng)啟動(dòng)了 HBase rest server 進(jìn)程,我們可以通 rest 服務(wù)提供讀取 HBase 里面的數(shù)據(jù)。

我們先 get 一條剛剛寫到 HBase 中的數(shù)據(jù)看看,如下所示:

下面我們開始通過 REST API 來查詢 HBase 中的數(shù)據(jù),第一步,執(zhí)行如下語句拿到 scannerId;首先需要將要查詢的 rowkey 進(jìn)行 base64 編碼才能使用,后面需要將結(jié)果進(jìn)行 base64 解碼

rowkey base64 編碼前:0122612_1606295280000 base64 編碼之后:MDEyMjYxMl8xNjA2Mjk1MjgwMDAw

  1. curl -vi -X PUT \ 
  2.          -H "Accept: text/xml" \ 
  3.          -H "Content-Type: text/xml" \ 
  4.          -d '<Scanner startRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw" endRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw"></Scanner>' \ 
  5. "http://hostname:8000/dwa_hbase_click_report/scanner" 

 

第二步,執(zhí)行如下語句根據(jù)上條語句返回的 scannerID 查詢數(shù)據(jù),可以看到返回的結(jié)果:

  1. curl -vi -X GET \ 
  2.          -H "Accept: application/json" \ 
  3. "http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5" 

 

第三步,查詢完畢之后,執(zhí)行如下語句刪除該 scannerId:

  1. curl -vi -X DELETE \ 
  2.          -H "Accept: text/xml" \ 
  3. "http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5" 

 

五. 總結(jié)

在本篇文章中,我們介紹了 HBase 和 Flink SQL 的結(jié)合使用比較廣泛兩種的場(chǎng)景:作為維度表用以及存儲(chǔ)計(jì)算結(jié)果;同時(shí)使用 REST API 對(duì) HBase 中的數(shù)據(jù)進(jìn)行查詢,對(duì)于查詢用戶來說,避免直接暴露 HBase 的 zk,同時(shí)將 rest server 和 HBase 集群解耦。

作者簡(jiǎn)介

余敖,360 數(shù)據(jù)開發(fā)高級(jí)工程師,目前專注于基于 Flink 的實(shí)時(shí)數(shù)倉建設(shè)與平臺(tái)化工作。對(duì) Flink、Kafka、Hive、Spark 等進(jìn)行數(shù)據(jù) ETL 和數(shù)倉開發(fā)有豐富的經(jīng)驗(yàn)。

責(zé)任編輯:未麗燕 來源: Flink 中文社區(qū)
相關(guān)推薦

2019-05-05 09:03:06

HBase大數(shù)據(jù)存儲(chǔ)數(shù)據(jù)存儲(chǔ)

2024-11-08 16:13:43

Python開發(fā)

2025-04-27 01:05:00

AI智能日志

2010-08-06 10:49:46

RIP路由

2010-12-14 11:30:11

2022-06-06 08:51:56

PandasSQLPython

2014-07-14 13:36:23

HBase實(shí)戰(zhàn)

2021-12-09 06:59:24

FlinkSQL 開發(fā)

2023-10-24 20:32:40

大數(shù)據(jù)

2025-03-26 02:00:00

C#Docker開發(fā)

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-05-15 09:57:59

Flink SQL時(shí)間語義

2022-04-22 09:05:12

蔚來汽車Flink實(shí)時(shí)數(shù)倉

2009-06-04 20:38:15

MyEclipseWeblogicWeb應(yīng)用管理

2015-06-02 10:36:42

大數(shù)據(jù)

2020-11-20 07:51:02

JavaSPI機(jī)制

2012-03-29 13:56:58

HBase數(shù)據(jù)庫

2017-05-22 08:05:46

HBase阿里搜索實(shí)踐

2013-04-26 15:13:26

Ted YuHBase大數(shù)據(jù)全球技術(shù)峰會(huì)

2014-11-19 14:30:33

hbasenosql數(shù)據(jù)庫bigtable
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)