一種基于布隆過濾器的大表計(jì)算優(yōu)化方法
問題背景
在大數(shù)據(jù)行業(yè)內(nèi),尤其是數(shù)倉建設(shè)中,一直有一個(gè)繞不開的難題,就是大表的分析計(jì)算(這里的大表指億級以上)。特別是大表之間的 Join 分析,對任何公司數(shù)據(jù)部門都是一個(gè)挑戰(zhàn)!
主要有以下挑戰(zhàn):
- 由于數(shù)據(jù)量大,分析計(jì)算時(shí)會耗費(fèi)更多 CPU、內(nèi)存和 IO,占用大量的集群資源。
- 由于數(shù)據(jù)量大,分析計(jì)算過程緩慢,擠占其它任務(wù)資源使用,從而影響數(shù)倉整體任務(wù)產(chǎn)出時(shí)間。
- 由于數(shù)據(jù)量大,長時(shí)間占用資源,會造成該任務(wù)在時(shí)間、資源和財(cái)務(wù)各方面成本巨大。
當(dāng)前業(yè)內(nèi)流行的優(yōu)化方案
1.增加集群資源
優(yōu)點(diǎn):簡單粗暴,對業(yè)務(wù)和數(shù)據(jù)開發(fā)人員友好,不用調(diào)整。
缺點(diǎn):費(fèi)錢,看你公司是否有錢。
2.采用增量計(jì)算
優(yōu)點(diǎn):可以在不大幅增加計(jì)算集群成本的情況下,完成日常計(jì)算任務(wù)。
缺點(diǎn):對數(shù)據(jù)和業(yè)務(wù)都有一定要求,數(shù)據(jù)一般要求是日志類數(shù)據(jù)。或者具有一定的生命周期數(shù)據(jù)(歷史數(shù)據(jù)可歸檔)。
問題場景和 Spark 算法分析
Spark 經(jīng)典算法 SortMergeJoin(以大表間的 Join 分析為例)。
- 對兩張表分別進(jìn)行 Shuffle 重分區(qū),之后將相同Key的記錄分到對應(yīng)分區(qū),每個(gè)分區(qū)內(nèi)的數(shù)據(jù)在 Join 之前都要進(jìn)行排序,這一步對應(yīng) Exchange 節(jié)點(diǎn)和 Sort 節(jié)點(diǎn)。也就是 Spark 的 Sort Merge Shuffle 過程。
- 遍歷流式表,對每條記錄都采用順序查找的方式從查找表中搜索,每遇到一條相同的 Key 就進(jìn)行 Join 關(guān)聯(lián)。每次處理完一條記錄,只需從上一次結(jié)束的位置開始繼續(xù)查找。
該算法也可以簡化流程為: Map 一> Shuffle 一> Sort 一> Merge 一> Reduce
該算法的性能瓶頸主要在 Sort Merge Shuffle 階段(紅色流程部分),數(shù)據(jù)量越大,資源要求越高,性能越低。
大表問題思考
大數(shù)據(jù)計(jì)算優(yōu)化思路,核心無非就三條:增加計(jì)算資源;減少被計(jì)算數(shù)據(jù)量;優(yōu)化計(jì)算算法。其中前兩條是我們普通人最常用的方法。
兩個(gè)大表的 Join ,是不是真的每天都有大量的數(shù)據(jù)有變更呢?如果是的話,那我們的業(yè)務(wù)就應(yīng)該思考一下是否合理了。
其實(shí)在我們的日常實(shí)踐場景中,大部分是兩個(gè)表里面的數(shù)據(jù)每天只有少量(十萬百萬至千萬級)數(shù)據(jù)隨機(jī)變化,大部分?jǐn)?shù)據(jù)是不變的。
說到這里,很多人的第一想法是,我們增加分區(qū),按數(shù)據(jù)是否有變化進(jìn)行區(qū)分,計(jì)算有變化的(今日有更新的業(yè)務(wù)數(shù)據(jù)),合并未變化的(昨日計(jì)算完成的歷史數(shù)據(jù)),不就可以解決問題了。其實(shí)這個(gè)想法存在以下問題:
- 由于每個(gè)表的數(shù)據(jù)是隨機(jī)變化的,那就存在,第一個(gè)表中變化的數(shù)據(jù)在第二個(gè)表中是未變的,反之亦然(見圖片示例)。并且可能后續(xù)計(jì)算還有第三個(gè)表、第四個(gè)表等等呢?這種分區(qū)是難以構(gòu)建的。
- 變化的數(shù)據(jù)如果是百萬至千萬級,那這里也是一個(gè)較大規(guī)模的數(shù)據(jù)量了,既要關(guān)聯(lián)計(jì)算變化的,也要關(guān)聯(lián)計(jì)算未變化的,這里的計(jì)算成本也很大。
圖片
問題讀到這里,如果我們分別把表 A、表 B 的有變化記錄的關(guān)聯(lián)主鍵取出來合并在一起,形成一個(gè)數(shù)組變量。計(jì)算的時(shí)候用這個(gè)變量分別從表 A 和表 B 中過濾出有變化的數(shù)據(jù)進(jìn)行計(jì)算,并從未變化的表(昨日計(jì)算完成的歷史數(shù)據(jù))中過濾出不存在的(即未變化歷史結(jié)果數(shù)據(jù))。這樣兩份數(shù)據(jù)簡單合并到一起,不就是表 A 和表 B 全量 Join 計(jì)算的結(jié)果了嗎!
那什么樣的數(shù)組可以輕易的存下這百萬千萬級的數(shù)據(jù)量呢?我們第一個(gè)想到的答案: 布隆過濾器!
使用布隆過濾器的優(yōu)化方案
- 構(gòu)建布隆過濾器:分別讀取表 A 和表 B 中有變化的數(shù)據(jù)的關(guān)聯(lián)主鍵。
- 使用布隆過濾器:分別過濾表 A 和表 B 中的數(shù)據(jù)(即關(guān)聯(lián)主鍵命中布隆過濾器),然后進(jìn)行 join 分析。
- 使用布隆過濾器:從未變化的表(昨日計(jì)算完成的歷史數(shù)據(jù))中過濾出數(shù)據(jù)(即沒有命中布隆過濾器)。
- 合并 2、 3 步驟的數(shù)據(jù)結(jié)果。
也許這里有人會有疑惑,不是說布隆過濾器是命中并不代表一定存在,不命中才代表一定不存在!其實(shí)這個(gè)命中不代表一定存在,是一個(gè)極少量概率問題,即極少量沒有更新的數(shù)據(jù)也會命中布隆過濾器,從而參與了接下來的數(shù)據(jù)計(jì)算,實(shí)際上只要所有變化的數(shù)據(jù)能命中即可。這個(gè)不影響它已經(jīng)幫我買過濾了絕大部分不需要計(jì)算的數(shù)據(jù)。
回看我們的 Spark 經(jīng)典算法 SortMergeJoin,我們可以看出,該方案是在 Map 階段就過濾了數(shù)據(jù),大大減少了數(shù)據(jù)量的,提升了計(jì)算效率,減少了計(jì)算資源使用!
Spark 函數(shù) Java 代碼實(shí)現(xiàn)
大家可以根據(jù)需要參考、修改和優(yōu)化,有更好的實(shí)現(xiàn)方式歡迎大家分享交流。
程序流程圖
圖片
Spark 函數(shù) Java 代碼實(shí)現(xiàn)。
package org.example;
import org.apache.curator.shaded.com.google.common.hash.BloomFilter;
import org.apache.curator.shaded.com.google.common.hash.Funnels;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.api.java.*;
import org.apache.spark.SparkConf;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.util.RamUsageEstimator;
/**
* add by chengwansheng
*/
class MyBloomFilter {
private BloomFilter bloomFilter;
public MyBloomFilter(BloomFilter b) {
bloomFilter = b;
}
public BloomFilter getBloomFilter() {
return bloomFilter;
}
}
public class BloomUdf implements UDF2<Object, String, Boolean> {
//最大記錄限制,安全起見
private static int maxSize = 50000000;
//布隆過濾器是否開啟配置, 1 開啟,0 關(guān)閉
private static int udfBloomFilterEnable;
//布隆過濾器是否開啟參數(shù),默認(rèn)開啟
private static String bloomFilterConfKey = "spark.myudf.bloom.enable";
//加配置配置參數(shù),目前不起作用??
static {
SparkConf sparkConf = new SparkConf();
udfBloomFilterEnable = sparkConf.getInt(bloomFilterConfKey, 1);
System.out.println("the spark.myudf.bloom.enable value " + udfBloomFilterEnable);
}
//布隆過濾器列表,支持多個(gè)布隆過濾器
private static ConcurrentHashMap<String, MyBloomFilter> bloomFilterMap = new ConcurrentHashMap<>();
/**
* 布隆過濾器核心構(gòu)建方法
* 通過讀取表的 hdfs 文件信息,構(gòu)建布隆過濾器
* 一個(gè) jvm 只加載一次
* @param key
* @param path
* @throws IOException
*/
private synchronized static void buildBloomFilter(String key, String path) throws IOException {
if (!bloomFilterMap.containsKey(key)) {
BloomFilter bloomFilter;
Configuration cnotallow=new Configuration();
FileSystem hdfs=FileSystem.get(conf);
Path pathDf=new Path(path);
FileStatus[] stats=hdfs.listStatus(pathDf);
//獲取記錄總數(shù)
long sum = 0;
for (int i=0; i<stats.length; i++){
InputStream inputStream=hdfs.open(stats[i].getPath());
InputStreamReader inputStreamReader= new InputStreamReader(inputStream);
BufferedReader reader=new BufferedReader(inputStreamReader);
sum = sum + reader.lines().count();
}
if(sum > maxSize) {
//如果數(shù)據(jù)量大于期望值,則將布隆過濾器置空(即布隆過濾器不起作用)
System.out.println("the max number is " + maxSize + ", but target num is too big, the " + key + " bloom will be invalid");
bloomFilter = null;
} else {
//默認(rèn) 1000 W,超過取樣本數(shù)據(jù) 2 倍的量。這里取 2 倍是為了提高布隆過濾器的效果, 2 倍是一個(gè)比較合適的值
long exceptSize = sum*2>10000000?sum*2:10000000;
bloomFilter = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), (int) exceptSize);
for (int i=0; i<stats.length; i++){
//打印每個(gè)文件路徑
System.out.println(stats[i].getPath().toString());
//讀取每個(gè)文件
InputStream inputStream=hdfs.open(stats[i].getPath());
InputStreamReader inputStreamReader= new InputStreamReader(inputStream);
BufferedReader reader=new BufferedReader(inputStreamReader);
String line="";
while((line=reader.readLine())!=null){
bloomFilter.put(line);
}
}
}
MyBloomFilter myBloomFilter = new MyBloomFilter(bloomFilter);
bloomFilterMap.put(key, myBloomFilter);
System.out.println("the bloom " + key + " size is " + RamUsageEstimator.humanSizeOf(bloomFilter) + " num " + sum);
}
}
/**
* 核心調(diào)用方法
* 參數(shù) s :被過濾的參數(shù)
* 參數(shù) key :需要構(gòu)建的布隆過濾器,此處是庫名 + 表名稱,即 db_name.table_name
* @param s
* @param key
* @return
* @throws Exception
*/
@Override
public Boolean call(Object s, String key) throws Exception {
//如果 spark.myudf.bloom.enable 參數(shù)配置為 0 ,則布隆過濾器失效,直接返回 true
if (udfBloomFilterEnable == 0) {
return true;
}
if (!bloomFilterMap.containsKey(key)) {
String[] table_array = key.split("\\.");
if (table_array.length != 2) {
String msg = "the key is invalid: " + key + ", must like db_name.table_name";
System.out.println(msg);
throw new IOException(msg);
}
String dbName = table_array[0];
String tableName = table_array[1];
String path = "/hive/" + dbName + ".db/" + tableName;
System.out.println(path);
//構(gòu)建布隆過濾器
buildBloomFilter(key, path);
}
if (!bloomFilterMap.containsKey(key)) {
String msg = "not found bloom filter " + key;
System.out.println(msg);
throw new IOException(msg);
}
BloomFilter bloomFilter = bloomFilterMap.get(key).getBloomFilter();
if (bloomFilter == null) {
//如果數(shù)據(jù)量大于期望值,則直接返回真,即布隆過濾器不起作用
return true;
} else {
return bloomFilter.mightContain(String.valueOf(s));
}
}
}
使用示例演示
表信息和數(shù)據(jù)準(zhǔn)備。
--建表數(shù)據(jù)
create table default.A (
item_id bigint comment '商品ID',
item_name string comment '商品名稱',
item_price bigint comment '商品價(jià)格',
create_time timestamp comment '創(chuàng)建時(shí)間',
update_time timestamp comment '創(chuàng)建時(shí)間'
)
create table default.B (
item_id bigint comment '商品ID',
sku_id bigint comment 'skuID',
sku_price bigint comment '商品價(jià)格',
create_time timestamp comment '創(chuàng)建時(shí)間',
update_time timestamp comment '創(chuàng)建時(shí)間'
)
create table default.ot (
item_id bigint comment '商品ID',
sku_id bigint comment 'skuID',
sku_price bigint comment '商品價(jià)格',
item_price bigint comment '商品價(jià)格'
) PARTITIONED BY (pt string COMMENT '分區(qū)字段')
--準(zhǔn)備數(shù)據(jù)
insert overwrite table default.A
values
(1,'測試1',101,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(2,'測試2',102,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(3,'測試2',103,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(4,'測試2',104,'2023-03-25 08:00:00','2023-04-22 08:00:00'),
(5,'測試2',105,'2023-03-25 08:00:00','2023-04-22 08:00:00');
insert overwrite table default.B
values
(1,11,201,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(1,12,202,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(1,13,203,'2023-04-22 08:00:00','2023-04-22 08:00:00'),
(2,21,211,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(2,22,212,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(4,42,212,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(5,51,251,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(5,52,252,'2023-04-22 08:00:00','2023-04-22 08:00:00'),
(5,53,253,'2023-04-22 08:00:00','2023-04-22 08:00:00');
insert overwrite table default.ot partition(pt='20230421')
values
(1,11,201,101),
(1,12,202,101),
(2,21,211,102),
(2,22,212,102),
(4,42,212,114),
(5,51,251,110);
原來處理的 SQL 語句。
insert overwrite table default.ot partition(pt='20230422')
select B.item_id
,B.sku_id
,B.sku_price
,A.item_price
from B
left join A on(A.item_id=B.item_id)
使用布隆過濾器的 SQL(Java 函數(shù)導(dǎo)入 Spark,函數(shù)名為 “bloom_filter”)。
--構(gòu)建布隆過濾器
drop table if exists tmp.tmp_primary_key;
create table tmp.tmp_primary_key stored as TEXTFILE as
select item_id
from (
select item_id
from default.A
where update_time>='2023-04-22'
union all
select item_id
from default.B
where update_time>='2023-04-22'
) where length(item_id)>0
group by item_id;
--增量數(shù)據(jù)計(jì)算
insert overwrite table default.ot partition(pt='20230422')
select B.item_id
,B.sku_id
,B.sku_price
,A.item_price
from default.B
left join default.A on(A.item_id=B.item_id and bloom_filter(A.item_id, "tmp.tmp_primary_key"))
where bloom_filter(B.item_id, "tmp.tmp_primary_key")
union all
--合并歷史未變更數(shù)據(jù)
select item_id
,sku_id
,sku_price
,item_price
from default.ot
where not bloom_filter(item_id, "tmp.tmp_primary_key")
and pt='20230421'
從上面代碼可以看出,使用布隆過濾器的 SQL,核心業(yè)務(wù)邏輯代碼只是在原來全量計(jì)算的邏輯中增加了過濾條件而已,使用起來還是比較方便的。
實(shí)測效果
以我司的 “dim.dim_itm_sku_info_detail_d” 和 “dim.dim_itm_info_detail_d” 任務(wù)為例,使用引擎 Spark2。
圖片
總結(jié)
從理論分析和實(shí)測效果來看,使用布隆過濾器的解決方案可以大幅提升任務(wù)的性能,并減少集群資源的使用。
該方案不僅適用大表間 Join 分析計(jì)算,也適用大表相關(guān)的其它分析計(jì)算需求,核心思想就是計(jì)算有必要的數(shù)據(jù),排除沒必要數(shù)據(jù),減小無效的計(jì)算損耗。