Hive 如何快速拉取大批量數(shù)據(jù)
用hive來(lái)做數(shù)倉(cāng)類操作,或者大數(shù)據(jù)的運(yùn)算,是沒(méi)有疑問(wèn)的,至少在你沒(méi)有更多選擇之前。
當(dāng)我們要hive來(lái)做類似于大批量數(shù)據(jù)的select時(shí),也許問(wèn)題就會(huì)發(fā)生了變化。
1. 通用解決方案之分頁(yè)
首先,我們要基于一個(gè)事實(shí),就是沒(méi)有哪個(gè)數(shù)據(jù)庫(kù)可以無(wú)限制的提供我們select任意數(shù)據(jù)量的數(shù)據(jù)。比如常用的 mysql, oracle, 一般你select 10w左右的數(shù)據(jù)量時(shí)已經(jīng)非常厲害了。而我們的解決方法也比較簡(jiǎn)單,那就是分頁(yè)獲取,比如我一頁(yè)取1w條,直到取完為止。同樣,因?yàn)閔ive基于都支持sql92協(xié)議,所以你也可以同樣的方案去解決大數(shù)據(jù)量的問(wèn)題。
分頁(yè)的解決方案會(huì)有什么問(wèn)題?首先,我們要明白分頁(yè)是如何完成的,首先數(shù)據(jù)庫(kù)server會(huì)根據(jù)條件運(yùn)算出所有或部分符合條件的數(shù)據(jù)(取決是否有額外的排序),然后再根據(jù)分頁(yè)偏移信息,獲取相應(yīng)的數(shù)據(jù)。所以,一次次的分頁(yè),則必定涉及到一次次的數(shù)據(jù)運(yùn)算。這在小數(shù)據(jù)量的情況下是可以接受的,因?yàn)橛?jì)算機(jī)的高速運(yùn)轉(zhuǎn)能力。但是當(dāng)數(shù)據(jù)量大到一定程序時(shí),就不行了。比如我們停滯了許多年的大數(shù)據(jù)領(lǐng)域解決方案就是很好的證明。
本文基于hive處理數(shù)據(jù),也就是說(shuō)數(shù)據(jù)量自然也是大到了一定的級(jí)別,那么用分頁(yè)也許就不好解決問(wèn)題了。比如,單次地運(yùn)算也許就是3-5分鐘(基于分布式并行計(jì)算系統(tǒng)能力),當(dāng)你要select 100w數(shù)據(jù)時(shí),如果用一頁(yè)1w的運(yùn)算,那么就是100次來(lái)回,1次3-5分鐘,100次就是5-8小時(shí)的時(shí)間,這就完全jj了。誰(shuí)能等這么長(zhǎng)時(shí)間?這樣處理的最終結(jié)果就是,業(yè)務(wù)被砍掉,等著財(cái)務(wù)結(jié)賬了。
所以,我們得改變點(diǎn)什么!
2. 使用hive-jdbc
jdbc本身不算啥,只是一個(gè)連接協(xié)議。但它的好處在于,可以維持長(zhǎng)連接。這個(gè)連接有個(gè)好處,就是server可以隨時(shí)輸出數(shù)據(jù),而client端則可以隨時(shí)處理數(shù)據(jù)。這就給了我們一個(gè)機(jī)會(huì),即比如100w的數(shù)據(jù)運(yùn)算好之后,server只需源源不斷的輸出結(jié)果,而client端則源源不斷地接收處理數(shù)據(jù)。
所以,我們解決方案是,基于hive-jdbc, 不使用分頁(yè),而全量獲取數(shù)據(jù)即可。這給我們帶來(lái)莫大的好處,即一次運(yùn)算即可。比如1次運(yùn)算3-5分鐘,那么總共的運(yùn)算也就是3-5分鐘。
看起來(lái)不錯(cuò),解決了重復(fù)運(yùn)算的問(wèn)題。好似萬(wàn)事大吉了。
具體實(shí)現(xiàn)就是引入幾個(gè)hive-jdbc的依賴,然后提交查詢,依次獲取結(jié)果即可。樣例如下:
- <!-- pom 依賴 -->
- <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-jdbc</artifactId>
- <version>2.3.4</version>
- </dependency>
- --
- // 測(cè)試hive-jdbc
- import java.sql.Connection;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.sql.Statement;
- import java.sql.DriverManager;
- public class HiveJdbcTest {
- private static Connection conn = getConnnection();
- private static PreparedStatement ps;
- private static ResultSet rs;
- // 獲取所有數(shù)據(jù)
- public static void getAll(String tablename) {
- String sql="select * from " + tablename;
- System.out.println(sql);
- try {
- ps = prepare(conn, sql);
- rs = ps.executeQuery();
- int columns = rs.getMetaData().getColumnCount();
- while(rs.next()) {
- for(int i=1;i<=columns;i++) {
- System.out.print(rs.getString(i));
- System.out.print("\t\t");
- }
- System.out.println();
- }
- }
- catch (SQLException e) {
- e.printStackTrace();
- }
- }
- // 測(cè)試
- public static void main(String[] args) {
- String tablename="t1";
- HiveJdbcTest.getAll(tablename);
- }
- private static String driverName = "org.apache.hive.jdbc.HiveDriver";
- private static String url = "jdbc:hive2://127.0.0.1:10000/";
- private static Connection conn;
- // 連接hive庫(kù)
- public static Connection getConnnection() {
- try {
- Class.forName(driverName);
- conn = DriverManager.getConnection(url, "hive", "123");
- }
- catch(ClassNotFoundException e) {
- e.printStackTrace();
- }
- catch (SQLException e) {
- e.printStackTrace();
- }
- return conn;
- }
- public static PreparedStatement prepare(Connection conn, String sql) {
- PreparedStatement ps = null;
- try {
- ps = conn.prepareStatement(sql);
- }
- catch (SQLException e) {
- e.printStackTrace();
- }
- return ps;
- }
- }
樣例代碼,無(wú)需糾結(jié)。簡(jiǎn)單的jdbc操作樣板。總體來(lái)說(shuō)就是,不帶分頁(yè)的接收全量數(shù)據(jù)。
但是,這個(gè)會(huì)有什么問(wèn)題?同樣,小數(shù)據(jù)量時(shí)無(wú)任何疑問(wèn),但當(dāng)數(shù)據(jù)量足夠大時(shí),每一次的數(shù)據(jù)接收,都需要一次網(wǎng)絡(luò)通信請(qǐng)示,且都是單線程的。我們假設(shè)接受一條數(shù)據(jù)花費(fèi)1ms, 那么接收1000條數(shù)就是1s, 6k條數(shù)據(jù)就是1min。36w條數(shù)據(jù)就是1h, 額,后面就無(wú)需再算了。同樣是不可估量的時(shí)間消耗。(實(shí)際情況也許會(huì)好點(diǎn),因?yàn)闀?huì)有buffer緩沖的存在)
為什么會(huì)這樣呢?運(yùn)算量已經(jīng)減小了,但是這網(wǎng)絡(luò)通信量,我們又能如何?實(shí)際上,問(wèn)題不在于網(wǎng)絡(luò)通信問(wèn)題,而在于我們使用這種方式,使我們從并行計(jì)算轉(zhuǎn)到了串行計(jì)算的過(guò)程了。因?yàn)橹挥袉吸c(diǎn)的數(shù)據(jù)接收,所以只能將數(shù)據(jù)匯集處理。從而就是一個(gè)串行化的東西了。
所以,我們更多應(yīng)該從并行這一層面去解決問(wèn)題。
3. 基于臨時(shí)表實(shí)現(xiàn)
要解決并行變串行的問(wèn)題,最根本的辦法就是避免一條條讀取數(shù)據(jù)。而要避免這個(gè)問(wèn)題,一個(gè)很好想到的辦法就是使用臨時(shí)表,繞開(kāi)自己代碼的限制。讓大數(shù)據(jù)集群自行處理并行計(jì)算問(wèn)題,這是個(gè)不錯(cuò)的想法。
但具體如何做呢?我們面臨至少這么幾個(gè)問(wèn)題:
- 如何將數(shù)據(jù)寫入臨時(shí)表?
- 寫入臨時(shí)表的數(shù)據(jù)如何取回?是否存在瓶頸問(wèn)題?
- 臨時(shí)表后續(xù)如何處理?
我們一個(gè)個(gè)問(wèn)題來(lái),第1個(gè),如何寫臨時(shí)表問(wèn)題:我們可以選擇先創(chuàng)建一個(gè)臨時(shí)表,然后再使用insert into select ... from ... 的方式寫入,但這種方式非常費(fèi)力,首先你得固化下臨時(shí)表的數(shù)據(jù)結(jié)構(gòu),其次你要處理多次寫入問(wèn)題??雌饋?lái)不是最好的辦法。幸好,hive中或者相關(guān)數(shù)據(jù)庫(kù)產(chǎn)品都提供了另一種更方便的建臨時(shí)表的方法: create table xxx as select ... from ... 你只需要使用一個(gè)語(yǔ)句就可以將結(jié)果寫入到臨時(shí)表了。但需要注意的是,我們創(chuàng)建時(shí),需要指定好我們需要的格式,否則最終結(jié)果也許不是我們想要的,比如我們需要使用','分隔數(shù)據(jù)而非tab, 我們需要使用 text 形式的數(shù)據(jù),而非壓縮的二進(jìn)制格式。
以下是個(gè)使用樣例:
- -- 外部使用 create table 包裹
- CREATE TABLE tmp_2020110145409001
- ROW FORMAT DELIMITED
- FIELDS TERMINATED BY ','
- STORED AS TEXTFILE as
- -- 具體的業(yè)務(wù)select sql
- select t1.*, t2.* from test t1 left join test2 t2 on t1.id = t2.t_id
- ;
如此,我們就得到所需的結(jié)果了。以上結(jié)果,在hive中表現(xiàn)為一個(gè)臨時(shí)表。而其背后則是一個(gè)個(gè)切分的文件,以','號(hào)分隔的文本文件,且會(huì)按照hive的默認(rèn)存儲(chǔ)目錄存放。(更多具體語(yǔ)法請(qǐng)查詢官網(wǎng)資料)
接下來(lái),我們要解決第2個(gè)問(wèn)題:如何將數(shù)據(jù)取回?這個(gè)問(wèn)題也不難,首先,現(xiàn)在結(jié)果已經(jīng)有了,我們可以一行行地讀取返回,就像前面一樣。但這時(shí)已經(jīng)沒(méi)有了數(shù)據(jù)運(yùn)算,應(yīng)該會(huì)好很多。但明顯還是不夠好,我們?nèi)匀恍枰磸?fù)的網(wǎng)絡(luò)通信。我們知道,hive存儲(chǔ)的背后,是一個(gè)個(gè)切分的文件,如果我們能夠?qū)⒃撐募苯酉螺d下來(lái),那將會(huì)是非常棒的事。不錯(cuò),最好的辦法就是,直接下載hive的數(shù)據(jù)文件,hive會(huì)存儲(chǔ)目錄下,以類似于 part_0000, part_0001... 之類的文件存放。
那么,我們?nèi)绾尾拍芟螺d到這些文件呢?hive是基于hadoop的,所以,很明顯我們要回到這個(gè)問(wèn)題,基于hadoop去獲取這些文件。即 hdfs 獲取,命令如下:
- // 查看所有分片數(shù)據(jù)文件列表
- hdfs dfs -ls hdfs://xx/hive/mydb.db/*
- // 下載所有數(shù)據(jù)文件到 /tmp/local_hdfs 目錄
- hdfs dfs -get hdfs://xx/hive/mydb.db/* /tmp/local_hdfs
我們可以通過(guò)以上命令,將數(shù)據(jù)文件下載到本地,也可以hdfs的jar包,使用 hdfs-client 進(jìn)行下載。優(yōu)缺點(diǎn)是:使用cli的方式簡(jiǎn)單穩(wěn)定但依賴于服務(wù)器環(huán)境,而使用jar包的方式則部署方便但需要自己寫更多代碼保證穩(wěn)定性。各自選擇即可。
最后,我們還剩下1個(gè)問(wèn)題:如何處理臨時(shí)表的問(wèn)題?hive目前尚不支持設(shè)置表的生命周期(阿里云的maxcompute則只是一個(gè) lifecycle 選項(xiàng)的問(wèn)題),所以,需要自行清理文件。這個(gè)問(wèn)題的實(shí)現(xiàn)方式很多,比如你可以自行記錄這些臨時(shí)表的創(chuàng)建時(shí)間、位置、過(guò)期時(shí)間,然后再每天運(yùn)行腳本清理表即可。再簡(jiǎn)單點(diǎn)就是你可以直接通過(guò)表名進(jìn)行清理,比如你以年月日作為命令開(kāi)頭,那么你可以根據(jù)這日期刪除臨時(shí)表即可。如:
- -- 列舉表名
- show tables like 'dbname.tmp_20201101*';
- -- 刪除具體表名
- drop table dbname.tmp_2020110100001 ;
至此,我們的所有問(wèn)題已解決??偨Y(jié)下:首先使用臨時(shí)表并行地將結(jié)果寫入;其次通過(guò)hdfs將文件快速下載到本地即可;最后需要定時(shí)清理臨時(shí)表;這樣,你就可以高效,無(wú)限制的為用戶拉取大批量數(shù)據(jù)了。
不過(guò)需要注意的是,我們的步驟從1個(gè)步驟變成了3個(gè)步驟,增加了復(fù)雜度。(實(shí)際上你可能還會(huì)處理更多的問(wèn)題,比如元數(shù)據(jù)信息的對(duì)應(yīng)問(wèn)題)復(fù)雜度增加的最大問(wèn)題就在于,它會(huì)帶來(lái)更多的問(wèn)題,所以我們一定要善于處理好這些問(wèn)題,否則將會(huì)帶來(lái)一副作用。