大數(shù)據(jù)查詢——HBase 讀寫設(shè)計(jì)與實(shí)踐
背景介紹
本項(xiàng)目主要解決 check 和 opinion2 張歷史數(shù)據(jù)表(歷史數(shù)據(jù)是指當(dāng)業(yè)務(wù)發(fā)生過程中的完整中間流程和結(jié)果數(shù)據(jù))的在線查詢。原實(shí)現(xiàn)基于 Oracle 提供存儲(chǔ)查詢服務(wù),隨著數(shù)據(jù)量的不斷增加,在寫入和讀取過程中面臨性能問題,且歷史數(shù)據(jù)僅供業(yè)務(wù)查詢參考,并不影響實(shí)際流程,從系統(tǒng)結(jié)構(gòu)上來說,放在業(yè)務(wù)鏈條上游比較重。本項(xiàng)目將其置于下游數(shù)據(jù)處理 Hadoop 分布式平臺(tái)來實(shí)現(xiàn)此需求。下面列一些具體的需求指標(biāo):
- 數(shù)據(jù)量:目前 check 表的累計(jì)數(shù)據(jù)量為 5000w+ 行,11GB;opinion 表的累計(jì)數(shù)據(jù)量為 3 億 +,約 100GB。每日增量約為每張表 50 萬 + 行,只做 insert,不做 update。
- 查詢要求:check 表的主鍵為 id(Oracle 全局 id),查詢鍵為 check_id,一個(gè) check_id 對(duì)應(yīng)多條記錄,所以需返回對(duì)應(yīng)記錄的 list; opinion 表的主鍵也是 id,查詢鍵是 bussiness_no 和 buss_type,同理返回 list。單筆查詢返回 List 大小約 50 條以下,查詢頻率為 100 筆 / 天左右,查詢響應(yīng)時(shí)間 2s。
技術(shù)選型
從數(shù)據(jù)量及查詢要求來看,分布式平臺(tái)上具備大數(shù)據(jù)量存儲(chǔ),且提供實(shí)時(shí)查詢能力的組件*** HBase。根據(jù)需求做了初步的調(diào)研和評(píng)估后,大致確定 HBase 作為主要存儲(chǔ)組件。將需求拆解為寫入和讀取 HBase 兩部分。
讀取 HBase 相對(duì)來說方案比較確定,基本根據(jù)需求設(shè)計(jì) RowKey,然后根據(jù) HBase 提供的豐富 API(get,scan 等)來讀取數(shù)據(jù),滿足性能要求即可。
寫入 HBase 的方法大致有以下幾種:
- Java 調(diào)用 HBase 原生 API,HTable.add(List(Put))。
- MapReduce 作業(yè),使用 TableOutputFormat 作為輸出。
- Bulk Load,先將數(shù)據(jù)按照 HBase 的內(nèi)部數(shù)據(jù)格式生成持久化的 HFile 文件,然后復(fù)制到合適的位置并通知 RegionServer ,即完成海量數(shù)據(jù)的入庫。其中生成 Hfile 這一步可以選擇 MapReduce 或 Spark。
本文采用第 3 種方式,Spark + Bulk Load 寫入 HBase。該方法相對(duì)其他 2 種方式有以下優(yōu)勢:
- BulkLoad 不會(huì)寫 WAL,也不會(huì)產(chǎn)生 flush 以及 split。
- 如果我們大量調(diào)用 PUT 接口插入數(shù)據(jù),可能會(huì)導(dǎo)致大量的 GC 操作。除了影響性能之外,嚴(yán)重時(shí)甚至可能會(huì)對(duì) HBase 節(jié)點(diǎn)的穩(wěn)定性造成影響,采用 BulkLoad 無此顧慮。
- 過程中沒有大量的接口調(diào)用消耗性能。
- 可以利用 Spark 強(qiáng)大的計(jì)算能力。
圖示如下:

設(shè)計(jì)
環(huán)境信息
- Hadoop 2.5-2.7
- HBase 0.98.6
- Spark 2.0.0-2.1.1
- Sqoop 1.4.6
表設(shè)計(jì)
本段的重點(diǎn)在于討論 HBase 表的設(shè)計(jì),其中 RowKey 是最重要的部分。為了方便說明問題,我們先來看看數(shù)據(jù)格式。以下以 check 舉例,opinion 同理。
check 表(原表字段有 18 個(gè),為方便描述,本文截選 5 個(gè)字段示意)

如上圖所示,主鍵為 id,32 位字母和數(shù)字隨機(jī)組成,業(yè)務(wù)查詢字段 check_id 為不定長字段(不超過 32 位),字母和數(shù)字組成,同一 check_id 可能對(duì)應(yīng)多條記錄,其他為相關(guān)業(yè)務(wù)字段。眾所周知,HBase 是基于 RowKey 提供查詢,且要求 RowKey 是唯一的。RowKey 的設(shè)計(jì)主要考慮的是數(shù)據(jù)將怎樣被訪問。初步來看,我們有 2 種設(shè)計(jì)方法。
- 拆成 2 張表,一張表 id 作為 RowKey,列為 check 表對(duì)應(yīng)的各列;另一張表為索引表,RowKey 為 check_id,每一列對(duì)應(yīng)一個(gè) id。查詢時(shí),先找到 check_id 對(duì)應(yīng)的 id list,然后根據(jù) id 找到對(duì)應(yīng)的記錄。均為 HBase 的 get 操作。
- 將本需求可看成是一個(gè)范圍查詢,而不是單條查詢。將 check_id 作為 RowKey 的前綴,后面跟 id。查詢時(shí)設(shè)置 Scan 的 startRow 和 stopRow,找到對(duì)應(yīng)的記錄 list。
***種方法優(yōu)點(diǎn)是表結(jié)構(gòu)簡單,RowKey 容易設(shè)計(jì),缺點(diǎn)為 1)數(shù)據(jù)寫入時(shí),一行原始數(shù)據(jù)需要寫入到 2 張表,且索引表寫入前需要先掃描該 RowKey 是否存在,如果存在,則加入一列,否則新建一行,2)讀取的時(shí)候,即便是采用 List, 也至少需要讀取 2 次表。第二種設(shè)計(jì)方法,RowKey 設(shè)計(jì)較為復(fù)雜,但是寫入和讀取都是一次性的。綜合考慮,我們采用第二種設(shè)計(jì)方法。
RowKey 設(shè)計(jì)
熱點(diǎn)問題
HBase 中的行是以 RowKey 的字典序排序的,其熱點(diǎn)問題通常發(fā)生在大量的客戶端直接訪問集群的一個(gè)或極少數(shù)節(jié)點(diǎn)。默認(rèn)情況下,在開始建表時(shí),表只會(huì)有一個(gè) region,并隨著 region 增大而拆分成更多的 region,這些 region 才能分布在多個(gè) regionserver 上從而使負(fù)載均分。對(duì)于我們的業(yè)務(wù)需求,存量數(shù)據(jù)已經(jīng)較大,因此有必要在一開始就將 HBase 的負(fù)載均攤到每個(gè) regionserver,即做 pre-split。常見的防治熱點(diǎn)的方法為加鹽,hash 散列,自增部分(如時(shí)間戳)翻轉(zhuǎn)等。
RowKey 設(shè)計(jì)
Step1:確定預(yù)分區(qū)數(shù)目,創(chuàng)建 HBase Table
不同的業(yè)務(wù)場景及數(shù)據(jù)特點(diǎn)確定數(shù)目的方式不一樣,我個(gè)人認(rèn)為應(yīng)該綜合考慮數(shù)據(jù)量大小和集群大小等因素。比如 check 表大小約為 11G,測試集群大小為 10 臺(tái)機(jī)器,hbase.hregion.max.filesize=3G(當(dāng) region 的大小超過這個(gè)數(shù)時(shí),將拆分為 2 個(gè)),所以初始化時(shí)盡量使得一個(gè) region 的大小為 1~2G(不會(huì)一上來就 split),region 數(shù)據(jù)分到 11G/2G=6 個(gè),但為了充分利用集群資源,本文中 check 表劃分為 10 個(gè)分區(qū)。如果數(shù)據(jù)量為 100G,且不斷增長,集群情況不變,則 region 數(shù)目增大到 100G/2G=50 個(gè)左右較合適。Hbase check 表建表語句如下:

其中,Column Family =‘f’,越短越好。
COMPRESSION => 'SNAPPY',HBase 支持 3 種壓縮 LZO, GZIP and Snappy。GZIP 壓縮率高,但是耗 CPU。后兩者差不多,Snappy 稍微勝出一點(diǎn),cpu 消耗的比 GZIP 少。一般在 IO 和 CPU 均衡下,選擇 Snappy。
DATA_BLOCK_ENCODING => 'FAST_DIFF',本案例中 RowKey 較為接近,通過以下命令查看 key 長度相對(duì) value 較長。


Step2:RowKey 組成
Salt
讓數(shù)據(jù)均衡的分布到各個(gè) Region 上,結(jié)合 pre-split,我們對(duì)查詢鍵即 check 表的 check_id 求 hashcode 值,然后 modulus(numRegions) 作為前綴,注意補(bǔ)齊數(shù)據(jù)。

說明:如果數(shù)據(jù)量達(dá)上百 G 以上,則 numRegions 自然到 2 位數(shù),則 salt 也為 2 位。
Hash 散列
因?yàn)?check_id 本身是不定長的字符數(shù)字串,為使數(shù)據(jù)散列化,方便 RowKey 查詢和比較,我們對(duì) check_id 采用 SHA1 散列化,并使之 32 位定長化。
唯一性
以上 salt+hash 作為 RowKey 前綴,加上 check 表的主鍵 id 來保障 RowKey 唯一性。綜上,check 表的 RowKey 設(shè)計(jì)如下:(check_id=A208849559)

為增強(qiáng)可讀性,中間還可以加上自定義的分割符,如’+’,’|’等。
以上設(shè)計(jì)能保證對(duì)每次查詢而言,其 salt+hash 前綴值是確定的,并且落在同一個(gè) region 中。需要說明的是 HBase 中 check 表的各列同數(shù)據(jù)源 Oracle 中 check 表的各列存儲(chǔ)。
WEB 查詢?cè)O(shè)計(jì)
RowKey 設(shè)計(jì)與查詢息息相關(guān),查詢方式?jīng)Q定 RowKey 設(shè)計(jì),反之基于以上 RowKey 設(shè)計(jì),查詢時(shí)通過設(shè)置 Scan 的 [startRow,stopRow], 即可完成掃描。以查詢 check_id=A208849559 為例,根據(jù) RowKey 的設(shè)計(jì)原則,對(duì)其進(jìn)行 salt+hash 計(jì)算,得前綴。

代碼實(shí)現(xiàn)關(guān)鍵流程
Spark write to HBase
Step0: prepare work
因?yàn)槭菑纳嫌蜗到y(tǒng)承接的業(yè)務(wù)數(shù)據(jù),存量數(shù)據(jù)采用 sqoop 抽到 hdfs;增量數(shù)據(jù)每日以文件的形式從 ftp 站點(diǎn)獲取。因?yàn)闃I(yè)務(wù)數(shù)據(jù)字段中包含一些換行符,且 sqoop1.4.6 目前只支持單字節(jié),所以本文選擇’0x01’作為列分隔符,’0x10’作為行分隔符。
Step1: Spark read hdfs text file

SparkContext.textfile() 默認(rèn)行分隔符為”n”,此處我們用“0x10”,需要在 Configuration 中配置。應(yīng)用配置,我們調(diào)用 newAPIHadoopFile 方法來讀取 hdfs 文件,返回 JavaPairRDD ,其中 LongWritable 和 Text 分別為 Hadoop 中的 Long 類型和 String 類型(所有 Hadoop 數(shù)據(jù)類型和 java 的數(shù)據(jù)類型都很相像,除了它們是針對(duì)網(wǎng)絡(luò)序列化而做的特殊優(yōu)化)。我們需要的數(shù)據(jù)文件放在 pairRDD 的 value 中,即 Text 指代。為后續(xù)處理方便,可將 JavaPairRDD 轉(zhuǎn)換為 JavaRDD< String >。
Step2: Transfer and sort RDD
1)將 avaRDD< String>轉(zhuǎn)換成 JavaPairRDD,其中參數(shù)依次表示為,RowKey,col,value。做這樣轉(zhuǎn)換是因?yàn)? HBase 的基本原理是基于 RowKey 排序的,并且當(dāng)采用 bulk load 方式將數(shù)據(jù)寫入多個(gè)預(yù)分區(qū)(region)時(shí),要求 Spark 各 partition 的數(shù)據(jù)是有序的,RowKey,column family(cf),col name 均需要有序。在本案例中因?yàn)橹挥幸粋€(gè)列簇,所以將 RowKey 和 col name 組織出來為 Tuple2 格式的 key。請(qǐng)注意原本數(shù)據(jù)庫中的一行記錄(n 個(gè)字段),此時(shí)會(huì)被拆成 n 行。
2)基于 JavaPairRDD進(jìn)行 RowKey,col 的二次排序。如果不做排序,會(huì)報(bào)以下異常:
3)將數(shù)據(jù)組織成 HFile 要求的 JavaPairRDD hfileRDD。
Step3:create hfile and bulk load to HBase
1)主要調(diào)用 saveAsNewAPIHadoopFile 方法:

2)hfilebulk load to HBase

注:如果集群開啟了 kerberos,step4 需要放置在 ugi.doAs()方法中,在進(jìn)行如下驗(yàn)證后實(shí)現(xiàn)

訪問 HBase 集群的 60010 端口 web,可以看到 region 分布情況。

Read from HBase
本文基于 spring boot 框架來開發(fā) web 端訪問 HBase 內(nèi)數(shù)據(jù)。
use connection pool(使用連接池)
創(chuàng)建連接是一個(gè)比較重的操作,在實(shí)際 HBase 工程中,我們引入連接池來共享 zk 連接,meta 信息緩存,region server 和 master 的連接。

也可以通過以下方法,覆蓋默認(rèn)線程池。

process query
Step1: 根據(jù)查詢條件,確定 RowKey 前綴
根據(jù) 3.3 RowKey 設(shè)計(jì)介紹,HBase 的寫和讀都遵循該設(shè)計(jì)規(guī)則。此處我們采用相同的方法,將 web 調(diào)用方傳入的查詢條件,轉(zhuǎn)化成對(duì)應(yīng)的 RowKey 前綴。例如,查詢 check 表傳遞過來的 check_id=A208849559,生成前綴 7+7c9498b4a83974da56b252122b9752bf。
Step2:確定 scan 范圍
A208849559 對(duì)應(yīng)的查詢結(jié)果數(shù)據(jù)即在 RowKey 前綴為 7+7c9498b4a83974da56b252122b9752bf 對(duì)應(yīng)的 RowKey 及 value 中。

Step3:查詢結(jié)果組成返回對(duì)象
遍歷 ResultScanner 對(duì)象,將每一行對(duì)應(yīng)的數(shù)據(jù)封裝成 table entity,組成 list 返回。
測試
從原始數(shù)據(jù)中隨機(jī)抓取 1000 個(gè) check_id,用于模擬測試,連續(xù)發(fā)起 3 次請(qǐng)求數(shù)為 2000(200 個(gè)線程并發(fā),循環(huán) 10 次),平均響應(yīng)時(shí)間為 51ms,錯(cuò)誤率為 0。


如上圖,經(jīng)歷 N 次累計(jì)測試后,各個(gè) region 上的 Requests 數(shù)較為接近,符合負(fù)載均衡設(shè)計(jì)之初。
踩坑記錄
1、kerberos 認(rèn)證問題
如果集群開啟了安全認(rèn)證,那么在進(jìn)行 Spark 提交作業(yè)以及訪問 HBase 時(shí),均需要進(jìn)行 kerberos 認(rèn)證。
本文采用 yarn cluster 模式,像提交普通作業(yè)一樣,可能會(huì)報(bào)以下錯(cuò)誤。

定位到 HbaseKerberos.java:18,代碼如下:

這是因?yàn)?executor 在進(jìn)行 HBase 連接時(shí),需要重新認(rèn)證,通過 --keytab 上傳的 tina.keytab 并未被 HBase 認(rèn)證程序塊獲取到,所以認(rèn)證的 keytab 文件需要另外通過 --files 上傳。示意如下

其中 tina.keytab.hbase 是將 tina.keytab 復(fù)制并重命名而得。因?yàn)?Spark 不允許同一個(gè)文件重復(fù)上傳。
2、序列化

解決方法一:
如果 sc 作為類的成員變量,在方法中被引用,則加 transient 關(guān)鍵字,使其不被序列化。
解決方法二:
將 sc 作為方法參數(shù)傳遞,同時(shí)使涉及 RDD 操作的類 implements Serializable。 代碼中采用第二種方法。詳見代碼。
3、批量請(qǐng)求測試

或者

查看下面 issue 以及一次排查問題的過程,可能是 open file 超過限制。
- https://github.com/spring-projects/spring-boot/issues/1106
- http://mp.weixin.qq.com/s/34GVlaYDOdY1OQ9eZs-iXg
使用 ulimit-a 查看每個(gè)用戶默認(rèn)打開的文件數(shù)為 1024。
在系統(tǒng)文件 /etc/security/limits.conf 中修改這個(gè)數(shù)量限制,在文件中加入以下內(nèi)容, 即可解決問題。
- soft nofile 65536
- hard nofile 65536