Flink常見維表Join方案,收藏學(xué)習(xí)開發(fā)很有用!
本文轉(zhuǎn)載自微信公眾號(hào)「大數(shù)據(jù)左右手」,作者左右 。轉(zhuǎn)載本文請(qǐng)聯(lián)系大數(shù)據(jù)左右手公眾號(hào)。
前言
實(shí)時(shí)數(shù)倉(cāng),難免會(huì)遇到j(luò)oin維表的業(yè)務(wù)?,F(xiàn)總結(jié)幾種方案,供各位看官選擇:
- 查找關(guān)聯(lián)(同步,異步)
- 狀態(tài)編程,預(yù)加載數(shù)據(jù)到狀態(tài)中,按需取
- 冷熱數(shù)據(jù)
- 廣播維表
- Temporal Table Join
- Lookup Table Join
其中中間留下兩個(gè)問題,供大家思考,可留言一起討論?
查找關(guān)聯(lián)
查找關(guān)聯(lián)就是在主流數(shù)據(jù)中直接訪問外部數(shù)據(jù)(mysql,redis,impala ...)去根據(jù)主鍵或者某種關(guān)鍵條件去關(guān)聯(lián)取值。
適合: 維表數(shù)據(jù)量大,但是主數(shù)據(jù)不大的業(yè)務(wù)實(shí)時(shí)計(jì)算。
缺點(diǎn):數(shù)據(jù)量大的時(shí)候,會(huì)給外部數(shù)據(jù)源庫(kù)帶來(lái)很大的壓力,因?yàn)槟硹l數(shù)據(jù)都需要關(guān)聯(lián)。
同步
訪問數(shù)據(jù)庫(kù)是同步調(diào)用,導(dǎo)致 subtak 線程會(huì)被阻塞,影響吞吐量
- import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
- import com.wang.stream.env.{FlinkStreamEnv, KafkaSourceEnv}
- import org.apache.flink.api.common.functions.FlatMapFunction
- import org.apache.flink.api.common.serialization.SimpleStringSchema
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
- import org.apache.flink.util.Collector
- def analyses(): Unit ={
- val env: StreamExecutionEnvironment = FlinkStreamEnv.get()
- KafkaSourceEnv.getKafkaSourceStream(env,List("test"))
- .map(JSON.parseObject(_))
- .filter(_!=null)
- .flatMap(
- new FlatMapFunction[JSONObject,String] {
- override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = {
- // 如果topic就一張表,不用區(qū)分,如果多張表,可以通過database 與 table 區(qū)分,放到下一步去處理
- // 表的名字
- val databaseName:String = jSONObject.getString("database")
- // 表的名字
- val tableName:String = jSONObject.getString("table")
- // 數(shù)據(jù)操作類型 INSERT UPDATE DELETE
- val operationType:String = jSONObject.getString("type")
- // 主體數(shù)據(jù)
- val tableData: JSONArray = jSONObject.getJSONArray("data")
- // old 值
- val old: JSONArray = jSONObject.getJSONArray("old")
- // canal json 可能存在批處理出現(xiàn)data數(shù)據(jù)多條
- for (i <- 0 until tableData.size()) {
- val data: String = tableData.get(i).toString
- val nObject: JSONObject = JSON.parseObject(data)
- val orderId: AnyRef = nObject.get("order_id")
- // 下面寫(mysql,redis或者h(yuǎn)base)的連接,利用api 通過orderId查找
- // 最后封裝數(shù)據(jù)格式 就是join所得
- collector.collect(null)
- }
- }
- }
- )
- .addSink(
- new FlinkKafkaProducer[String](
- "",
- "",
- new SimpleStringSchema()
- )
- )
- env.execute("")
異步
AsyncIO 可以并發(fā)地處理多個(gè)請(qǐng)求,很大程度上減少了對(duì) subtask 線程的阻塞。
- def analyses(): Unit ={
- val env: StreamExecutionEnvironment = FlinkStreamEnv.get()
- val source: DataStream[String] = KafkaSourceEnv.getKafkaSourceStream(env, List("test"))
- .map(JSON.parseObject(_))
- .filter(_ != null)
- .flatMap(
- new FlatMapFunction[JSONObject, String] {
- override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = {
- // 如果topic就一張表,不用區(qū)分,如果多張表,可以通過database 與 table 區(qū)分,放到下一步去處理
- // 表的名字
- val databaseName: String = jSONObject.getString("database")
- // 表的名字
- val tableName: String = jSONObject.getString("table")
- // 數(shù)據(jù)操作類型 INSERT UPDATE DELETE
- val operationType: String = jSONObject.getString("type")
- // 主體數(shù)據(jù)
- val tableData: JSONArray = jSONObject.getJSONArray("data")
- // old 值
- val old: JSONArray = jSONObject.getJSONArray("old")
- // canal json 可能存在批處理出現(xiàn)data數(shù)據(jù)多條
- for (i <- 0 until tableData.size()) {
- val data: String = tableData.get(i).toString
- collector.collect(data)
- }
- }
- }
- )
- AsyncDataStream.unorderedWait(
- source,
- new RichAsyncFunction[String,String] {//自定義的數(shù)據(jù)源異步處理類
- override def open(parameters: Configuration): Unit = {
- // 初始化
- }
- override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
- // 將數(shù)據(jù)搜集
- resultFuture.complete(null)
- }
- override def close(): Unit = {
- // 關(guān)閉
- }
- },
- 1000,//異步超時(shí)時(shí)間
- TimeUnit.MILLISECONDS,//時(shí)間單位
- 100)//最大異步并發(fā)請(qǐng)求數(shù)量
- .addSink(
- new FlinkKafkaProducer[String](
- "",
- "",
- new SimpleStringSchema()
- )
- )
- env.execute("")
- }
狀態(tài)編程,預(yù)加載數(shù)據(jù)到狀態(tài)中,按需取
首先把維表數(shù)據(jù)初始化到state中,設(shè)置好更新時(shí)間,定時(shí)去把維表。
優(yōu)點(diǎn):flink 自己維護(hù)狀態(tài)數(shù)據(jù),"榮辱與共",不需要頻繁鏈接外部數(shù)據(jù)源,達(dá)到解耦。
缺點(diǎn):不適合大的維表和變化大的維表。
- .keyBy(_._1)
- .process(
- new KeyedProcessFunction[String,(String,String,String,String,String), String]{
- private var mapState:MapState[String,Map[String,String]] = _
- private var first: Boolean = true
- override def open(parameters: Configuration): Unit = {
- val config: StateTtlConfig = StateTtlConfig
- .newBuilder(org.apache.flink.api.common.time.Time.minutes(5))
- .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
- .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
- .build()
- val join = new MapStateDescriptor[String,Map[String,String]]("join",classOf[String],classOf[Map[String,String]])
- join.enableTimeToLive(config)
- mapState = getRuntimeContext.getMapState(join)
- }
- override def processElement(
- in: (String, String, String, String, String),
- context: KeyedProcessFunction[String, (String, String, String, String, String),String]#Context,
- collector: Collector[String]): Unit = {
- // 加載維表
- if(first){
- first = false
- val time: Long = System.currentTimeMillis()
- getSmallDimTableInfo()
- // 設(shè)置好更新時(shí)間,定時(shí)去把維表
- context.timerService().registerProcessingTimeTimer(time + 86400000)
- }
- // 數(shù)據(jù)處理,過來(lái)一條條數(shù)據(jù),然后按照自己的業(yè)務(wù)邏輯去取維表的數(shù)據(jù)即可
- // 然后封裝 放到collect中
- collector.collect(null)
- }
- override def onTimer(
- timestamp: Long,
- ctx: KeyedProcessFunction[String, (String, String, String, String, String),String]#OnTimerContext,
- out: Collector[String]): Unit = {
- println("觸發(fā)器執(zhí)行")
- mapState.clear()
- getSmallDimTableInfo()
- println(mapState)
- ctx.timerService().registerProcessingTimeTimer(timestamp + 86400000)
- }
- def getSmallDimTableInfo(): Unit ={
- // 加載 字典數(shù)據(jù)
- val select_dictionary="select dic_code,pre_dictionary_id,dic_name from xxxx"
- val dictionary: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(select_dictionary, null)
- dictionary.foreach(item=>{
- mapState.put("dic_dictionary_"+item.get("pre_dictionary_id").toString,item)
- })
- }
- }
- )
- .filter(_!=null)
- .addSink(
- new FlinkKafkaProducer[String](
- "",
- "",
- new SimpleStringSchema()
- )
- )
- v.execute("")
思考下:直接定義一個(gè)Map集合這樣的優(yōu)缺點(diǎn)是什么?可以留言說(shuō)出自己的看法?
冷熱數(shù)據(jù)
思想:先去狀態(tài)去取,如果沒有,去外部查詢,同時(shí)去存到狀態(tài)里面。StateTtlConfig 的過期時(shí)間可以設(shè)置短點(diǎn)。
優(yōu)點(diǎn):中庸取值方案,熱備常用數(shù)據(jù)到內(nèi)存,也避免了數(shù)據(jù)join相對(duì)過多外部數(shù)據(jù)源。
缺點(diǎn):也不能一勞永逸解決某些問題,熱備數(shù)據(jù)過多,或者冷數(shù)據(jù)過大,都會(huì)對(duì)state 或者 外部數(shù)據(jù)庫(kù)造成壓力。
- .filter(_._1 != null)
- .keyBy(_._1)
- .process(
- new KeyedProcessFunction[String,(String,String,String,String,String), String]{
- private var mapState:MapState[String,Map[String,String]] = _
- private var first: Boolean = true
- override def open(parameters: Configuration): Unit = {
- val config: StateTtlConfig = StateTtlConfig
- .newBuilder(org.apache.flink.api.common.time.Time.days(1))
- .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
- .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
- .build()
- val join = new MapStateDescriptor[String,Map[String,String]]("join",classOf[String],classOf[Map[String,String]])
- join.enableTimeToLive(config)
- mapState = getRuntimeContext.getMapState(join)
- }
- override def processElement(
- in: (String, String, String, String, String),
- context: KeyedProcessFunction[String, (String, String, String, String, String),String]#Context,
- collector: Collector[String]): Unit = {
- // 數(shù)據(jù)處理,過來(lái)一條條數(shù)據(jù),然后按照自己的業(yè)務(wù)邏輯先去mapState去找,如果沒有再去 外部去找
- if (mapState.contains("xx_id")){
- // 如果存在就取
- }else{
- // 如果不存在去外部拿,然后放到mapState中
- val dim_sql="select dic_code,pre_dictionary_id,dic_name from xxxx where id=xx_id"
- val dim: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(dim_sql, null)
- mapState.put("xx_id",null)
- }
- // 然后封裝 放到collect中
- collector.collect(null)
- }
- }
- )
廣播維表
比如上面提到的字典表,每一個(gè)Task都需要這份數(shù)據(jù),那么需要join這份數(shù)據(jù)的時(shí)候就可以使用廣播維表。
- val dimStream=env.addSource(MysqlSource)
- // 廣播狀態(tài)
- val broadcastStateDesc=new MapStateDescriptor[String,String]("broadcaststate", BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(Long.class, Dim.class))
- // 廣播流
- val broadStream=dimStream.broadcast()
- // 主數(shù)據(jù)流
- val mainConsumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), kafkaConfig)
- val mainStream=env.addSource(mainConsumer)
- // 廣播狀態(tài)與維度表關(guān)聯(lián)
- val connectedStream=mainStream.connect(broadStream).map(..User(id,name)).key(_.1)
- connectedStream.process(new KeyedBroadcastProcessFunction[String,User,Map[Long,Dim],String] {
- override def processElement(value: User, ctx: KeyedBroadcastProcessFunction[String,User,Map[Long,Dim],String]#ReadOnlyContext, out: Collector[String]): Unit = {
- // 取到數(shù)據(jù)就可以愉快的玩耍了
- val state=ctx.getBroadcastState(broadcastStateDesc)
- xxxxxx
- }
- }
「思考:」 如果把維表流也通過實(shí)時(shí)監(jiān)控binlog到kafka,當(dāng)維度數(shù)據(jù)發(fā)生變化時(shí),更新放到狀態(tài)中,這種方式,是不是更具有時(shí)效性呢?
(1)通過canal把變更binlog方式發(fā)送到kafka中。
(2)數(shù)據(jù)流定義成為廣播流,廣播到數(shù)據(jù)到主數(shù)據(jù)流中。
(3)定義一個(gè)廣播狀態(tài)存儲(chǔ)數(shù)據(jù),在主數(shù)據(jù)進(jìn)行查找匹配,符合要求則join成功。
Temporal Table Join(FlinkSQL與Flink Table API)
由于維表是一張不斷變化的表(靜態(tài)表只是動(dòng)態(tài)表的一種特例)。那如何 JOIN 一張不斷變化的表呢?如果用傳統(tǒng)的 JOIN 語(yǔ)法來(lái)表達(dá)維表 JOIN,是不完整的。因?yàn)榫S表是一直在更新變化的,如果用這個(gè)語(yǔ)法那么關(guān)聯(lián)上的是哪個(gè)時(shí)刻的維表呢?我們是不知道的,結(jié)果是不確定的。所以 Flink SQL 的維表 JOIN 語(yǔ)法引入了Temporal Table 的標(biāo)準(zhǔn)語(yǔ)法,用來(lái)聲明關(guān)聯(lián)的是維表哪個(gè)時(shí)刻的快照。
普通關(guān)聯(lián)會(huì)一直保留關(guān)聯(lián)雙側(cè)的數(shù)據(jù),數(shù)據(jù)也就會(huì)一直膨脹,直到撐爆內(nèi)存導(dǎo)致任務(wù)失敗,Temporal Join則可以定期清理過期數(shù)據(jù),在合理的內(nèi)存配置下即可避免內(nèi)存溢出。
Event Time Temporal Join
語(yǔ)法
- SELECT [column_list]
- FROM table1 [AS <alias1>]
- [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
- ON table1.column-name1 = table2.column-name1
使用事件時(shí)間屬性(即行時(shí)間屬性),可以檢索過去某個(gè)時(shí)間點(diǎn)的鍵值。這允許在一個(gè)共同的時(shí)間點(diǎn)連接兩個(gè)表。
舉例
假設(shè)我們有一個(gè)訂單表,每個(gè)訂單都有不同貨幣的價(jià)格。為了將此表正確地規(guī)范化為單一貨幣,每個(gè)訂單都需要與下訂單時(shí)的適當(dāng)貨幣兌換率相結(jié)合。
- CREATE TABLE orders (
- order_id STRING,
- price DECIMAL(32,2),
- currency STRING,
- order_time TIMESTAMP(3),
- WATERMARK FOR order_time AS order_time
- ) WITH (/* ... */);
- CREATE TABLE currency_rates (
- currency STRING,
- conversion_rate DECIMAL(32, 2),
- update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL
- WATERMARK FOR update_time AS update_time,
- PRIMARY KEY(currency) NOT ENFORCED
- ) WITH (
- 'connector' = 'upsert-kafka',
- /* ... */
- );
- event-time temporal join需要temporal join條件的等價(jià)條件中包含的主鍵
- SELECT
- order_id,
- price,
- currency,
- conversion_rate,
- order_time,
- FROM orders
- LEFT JOIN currency_rates FOR SYSTEM TIME AS OF orders.order_time
- ON orders.currency = currency_rates.currency
Processing Time Temporal Join
處理時(shí)間時(shí)態(tài)表連接使用處理時(shí)間屬性將行與外部版本表中鍵的最新版本相關(guān)聯(lián)。
根據(jù)定義,使用processing-time屬性,連接將始終返回給定鍵的最新值??梢詫⒉檎冶砜醋魇且粋€(gè)簡(jiǎn)單的HashMap,它存儲(chǔ)來(lái)自構(gòu)建端的所有記錄。這種連接的強(qiáng)大之處在于,當(dāng)在Flink中無(wú)法將表具體化為動(dòng)態(tài)表時(shí),它允許Flink直接針對(duì)外部系統(tǒng)工作。
使用FOR SYSTEM_TIME AS OF table1.proctime表示當(dāng)左邊表的記錄與右邊的維表join時(shí),只匹配當(dāng)前處理時(shí)間維表所對(duì)應(yīng)的的快照數(shù)據(jù)。
Lookup Table Join
Lookup Join 通常用于通過連接外部表(維度表)補(bǔ)充信息,要求一個(gè)表具有處理時(shí)間屬性,另一個(gè)表使 Lookup Source Connector。
JDBC 連接器可以用在時(shí)態(tài)表關(guān)聯(lián)中作為一個(gè)可 lookup 的 source (又稱為維表)。用到的語(yǔ)法是 Temporal Joins 的語(yǔ)法。
- s"""
- |CREATE TABLE users(
- |id int,
- |name string,
- |PRIMARY KEY (id) NOT ENFORCED
- |)
- |WITH (
- |'connector' = 'jdbc',
- |'url' = 'xxxx',
- |'driver'='$DRIVER_CLASS_NAME',
- |'table-name'='$tableName',
- |'lookup.cache.max-rows'='100',
- |'lookup.cache.ttl'='30s'
- |)
- |""".stripMargin
- s"""
- |CREATE TABLE car(
- |`id` bigint ,
- |`user_id` bigint,
- |`proctime` as PROCTIME()
- |)
- |WITH (
- | 'connector' = 'kafka',
- | 'topic' = '$topic',
- | 'scan.startup.mode' = 'latest-offset',
- | 'properties.bootstrap.servers' = '$KAFKA_SERVICE',
- | 'properties.group.id' = 'indicator',
- | 'format' = 'canal-json'
- |)
- |""".stripMargin
- SELECT
- mc.user_id user_id,
- count(1) AS `value`
- FROM car mc
- inner join users FOR SYSTEM_TIME AS OF mc.proctime as u on mc.user_id=s.id
- group by mc.user_id
總結(jié)
總體來(lái)講,關(guān)聯(lián)維表有四個(gè)基礎(chǔ)的方式:
(1)查找外部數(shù)據(jù)源關(guān)聯(lián)
(2)預(yù)加載維表關(guān)聯(lián)(內(nèi)存,狀態(tài))
(3)冷熱數(shù)據(jù)儲(chǔ)備(算是1和2的結(jié)合使用)
(4)維表變更日志關(guān)聯(lián)(廣播也好,其他方式的流關(guān)聯(lián)也好)
「同時(shí)考慮:」 吞吐量,時(shí)效性,外部數(shù)據(jù)源的負(fù)載,內(nèi)存資源,解耦性等等方面。
四種join方式不存在絕對(duì)的一勞永逸,更多的是針對(duì)業(yè)務(wù)場(chǎng)景在各指標(biāo)上的權(quán)衡取舍,因此看官需要結(jié)合場(chǎng)景來(lái)選擇適合的。