自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink常見維表Join方案,收藏學(xué)習(xí)開發(fā)很有用!

大數(shù)據(jù)
由于維表是一張不斷變化的表(靜態(tài)表只是動(dòng)態(tài)表的一種特例)。那如何 JOIN 一張不斷變化的表呢?如果用傳統(tǒng)的 JOIN 語(yǔ)法來(lái)表達(dá)維表 JOIN,是不完整的。因?yàn)榫S表是一直在更新變化的,如果用這個(gè)語(yǔ)法那么關(guān)聯(lián)上的是哪個(gè)時(shí)刻的維表呢?我們是不知道的,結(jié)果是不確定的。

本文轉(zhuǎn)載自微信公眾號(hào)「大數(shù)據(jù)左右手」,作者左右 。轉(zhuǎn)載本文請(qǐng)聯(lián)系大數(shù)據(jù)左右手公眾號(hào)。

前言

實(shí)時(shí)數(shù)倉(cāng),難免會(huì)遇到j(luò)oin維表的業(yè)務(wù)?,F(xiàn)總結(jié)幾種方案,供各位看官選擇:

  • 查找關(guān)聯(lián)(同步,異步)
  • 狀態(tài)編程,預(yù)加載數(shù)據(jù)到狀態(tài)中,按需取
  • 冷熱數(shù)據(jù)
  • 廣播維表
  • Temporal Table Join
  • Lookup Table Join

其中中間留下兩個(gè)問題,供大家思考,可留言一起討論?

查找關(guān)聯(lián)

查找關(guān)聯(lián)就是在主流數(shù)據(jù)中直接訪問外部數(shù)據(jù)(mysql,redis,impala ...)去根據(jù)主鍵或者某種關(guān)鍵條件去關(guān)聯(lián)取值。

適合: 維表數(shù)據(jù)量大,但是主數(shù)據(jù)不大的業(yè)務(wù)實(shí)時(shí)計(jì)算。

缺點(diǎn):數(shù)據(jù)量大的時(shí)候,會(huì)給外部數(shù)據(jù)源庫(kù)帶來(lái)很大的壓力,因?yàn)槟硹l數(shù)據(jù)都需要關(guān)聯(lián)。

同步

訪問數(shù)據(jù)庫(kù)是同步調(diào)用,導(dǎo)致 subtak 線程會(huì)被阻塞,影響吞吐量

  1. import com.alibaba.fastjson.{JSON, JSONArray, JSONObject} 
  2. import com.wang.stream.env.{FlinkStreamEnv, KafkaSourceEnv} 
  3. import org.apache.flink.api.common.functions.FlatMapFunction 
  4. import org.apache.flink.api.common.serialization.SimpleStringSchema 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer 
  7. import org.apache.flink.util.Collector 
  8.  
  9.  def analyses(): Unit ={ 
  10.     val env: StreamExecutionEnvironment = FlinkStreamEnv.get() 
  11.     KafkaSourceEnv.getKafkaSourceStream(env,List("test")) 
  12.       .map(JSON.parseObject(_)) 
  13.       .filter(_!=null
  14.       .flatMap( 
  15.         new FlatMapFunction[JSONObject,String] { 
  16.           override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = { 
  17.             // 如果topic就一張表,不用區(qū)分,如果多張表,可以通過database 與 table 區(qū)分,放到下一步去處理 
  18.             // 表的名字 
  19.             val databaseName:String = jSONObject.getString("database"
  20.             // 表的名字 
  21.             val tableName:String = jSONObject.getString("table"
  22.             // 數(shù)據(jù)操作類型 INSERT UPDATE DELETE 
  23.             val operationType:String = jSONObject.getString("type"
  24.             // 主體數(shù)據(jù) 
  25.             val tableData: JSONArray = jSONObject.getJSONArray("data"
  26.             // old 值 
  27.             val old: JSONArray = jSONObject.getJSONArray("old"
  28.             // canal json 可能存在批處理出現(xiàn)data數(shù)據(jù)多條 
  29.             for (i <- 0 until tableData.size()) { 
  30.               val data: String = tableData.get(i).toString 
  31.               val nObject: JSONObject = JSON.parseObject(data) 
  32.               val orderId: AnyRef = nObject.get("order_id"
  33.               // 下面寫(mysql,redis或者h(yuǎn)base)的連接,利用api 通過orderId查找 
  34.                
  35.               // 最后封裝數(shù)據(jù)格式 就是join所得 
  36.               collector.collect(null
  37.             } 
  38.           } 
  39.         } 
  40.       ) 
  41.       .addSink( 
  42.         new FlinkKafkaProducer[String]( 
  43.           ""
  44.           ""
  45.           new SimpleStringSchema() 
  46.         ) 
  47.       ) 
  48.     env.execute(""

異步

AsyncIO 可以并發(fā)地處理多個(gè)請(qǐng)求,很大程度上減少了對(duì) subtask 線程的阻塞。

  1. def analyses(): Unit ={ 
  2.     val env: StreamExecutionEnvironment = FlinkStreamEnv.get() 
  3.     val source: DataStream[String] = KafkaSourceEnv.getKafkaSourceStream(env, List("test")) 
  4.       .map(JSON.parseObject(_)) 
  5.       .filter(_ != null
  6.       .flatMap( 
  7.         new FlatMapFunction[JSONObject, String] { 
  8.           override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = { 
  9.             // 如果topic就一張表,不用區(qū)分,如果多張表,可以通過database 與 table 區(qū)分,放到下一步去處理 
  10.             // 表的名字 
  11.             val databaseName: String = jSONObject.getString("database"
  12.             // 表的名字 
  13.             val tableName: String = jSONObject.getString("table"
  14.             // 數(shù)據(jù)操作類型 INSERT UPDATE DELETE 
  15.             val operationType: String = jSONObject.getString("type"
  16.             // 主體數(shù)據(jù) 
  17.             val tableData: JSONArray = jSONObject.getJSONArray("data"
  18.             // old 值 
  19.             val old: JSONArray = jSONObject.getJSONArray("old"
  20.             // canal json 可能存在批處理出現(xiàn)data數(shù)據(jù)多條 
  21.             for (i <- 0 until tableData.size()) { 
  22.               val data: String = tableData.get(i).toString 
  23.               collector.collect(data) 
  24.             } 
  25.           } 
  26.         } 
  27.       ) 
  28.        
  29.     AsyncDataStream.unorderedWait( 
  30.       source, 
  31.       new RichAsyncFunction[String,String] {//自定義的數(shù)據(jù)源異步處理類 
  32.         override def open(parameters: Configuration): Unit = { 
  33.           // 初始化 
  34.         } 
  35.         override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = { 
  36.            
  37.           // 將數(shù)據(jù)搜集 
  38.           resultFuture.complete(null
  39.        } 
  40.  
  41.         override def close(): Unit = { 
  42.           // 關(guān)閉 
  43.         } 
  44.     }, 
  45.     1000,//異步超時(shí)時(shí)間 
  46.     TimeUnit.MILLISECONDS,//時(shí)間單位 
  47.     100)//最大異步并發(fā)請(qǐng)求數(shù)量 
  48.      .addSink( 
  49.         new FlinkKafkaProducer[String]( 
  50.           ""
  51.           ""
  52.           new SimpleStringSchema() 
  53.         ) 
  54.       ) 
  55.  
  56.     env.execute(""
  57.   } 

狀態(tài)編程,預(yù)加載數(shù)據(jù)到狀態(tài)中,按需取

首先把維表數(shù)據(jù)初始化到state中,設(shè)置好更新時(shí)間,定時(shí)去把維表。

優(yōu)點(diǎn):flink 自己維護(hù)狀態(tài)數(shù)據(jù),"榮辱與共",不需要頻繁鏈接外部數(shù)據(jù)源,達(dá)到解耦。

缺點(diǎn):不適合大的維表和變化大的維表。

  1. .keyBy(_._1) 
  2. .process( 
  3.   new KeyedProcessFunction[String,(String,String,String,String,String), String]{ 
  4.     private var mapState:MapState[String,Map[String,String]] = _ 
  5.     private var first: Boolean = true 
  6.      
  7.     override def open(parameters: Configuration): Unit = { 
  8.       val config: StateTtlConfig = StateTtlConfig 
  9.         .newBuilder(org.apache.flink.api.common.time.Time.minutes(5)) 
  10.         .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) 
  11.         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) 
  12.         .build() 
  13.       val join = new MapStateDescriptor[String,Map[String,String]]("join",classOf[String],classOf[Map[String,String]]) 
  14.       join.enableTimeToLive(config) 
  15.       mapState = getRuntimeContext.getMapState(join
  16.     } 
  17.     override def processElement( 
  18.                                  in: (String, String, String, String, String), 
  19.                                  context: KeyedProcessFunction[String, (String, String, String, String, String),String]#Context, 
  20.                                  collector: Collector[String]): Unit = { 
  21.       // 加載維表 
  22.       if(first){ 
  23.         first = false 
  24.         val time: Long = System.currentTimeMillis() 
  25.         getSmallDimTableInfo() 
  26.         // 設(shè)置好更新時(shí)間,定時(shí)去把維表 
  27.         context.timerService().registerProcessingTimeTimer(time + 86400000) 
  28.       } 
  29.        
  30.       // 數(shù)據(jù)處理,過來(lái)一條條數(shù)據(jù),然后按照自己的業(yè)務(wù)邏輯去取維表的數(shù)據(jù)即可 
  31.        
  32.       // 然后封裝 放到collect中 
  33.       collector.collect(null
  34.     } 
  35.  
  36.     override def onTimer( 
  37.                           timestamp: Long, 
  38.                           ctx: KeyedProcessFunction[String, (String, String, String, String, String),String]#OnTimerContext, 
  39.                           out: Collector[String]): Unit = { 
  40.       println("觸發(fā)器執(zhí)行"
  41.       mapState.clear() 
  42.       getSmallDimTableInfo() 
  43.       println(mapState) 
  44.       ctx.timerService().registerProcessingTimeTimer(timestamp + 86400000) 
  45.     } 
  46.     def getSmallDimTableInfo(): Unit ={ 
  47.  
  48.       // 加載 字典數(shù)據(jù) 
  49.       val select_dictionary="select dic_code,pre_dictionary_id,dic_name from xxxx" 
  50.       val dictionary: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(select_dictionary, null
  51.       dictionary.foreach(item=>{ 
  52.         mapState.put("dic_dictionary_"+item.get("pre_dictionary_id").toString,item) 
  53.       }) 
  54.  
  55.     } 
  56.   } 
  57. .filter(_!=null
  58. .addSink( 
  59.   new FlinkKafkaProducer[String]( 
  60.     ""
  61.     ""
  62.     new SimpleStringSchema() 
  63.   ) 
  64. v.execute(""

思考下:直接定義一個(gè)Map集合這樣的優(yōu)缺點(diǎn)是什么?可以留言說(shuō)出自己的看法?

冷熱數(shù)據(jù)

思想:先去狀態(tài)去取,如果沒有,去外部查詢,同時(shí)去存到狀態(tài)里面。StateTtlConfig 的過期時(shí)間可以設(shè)置短點(diǎn)。

優(yōu)點(diǎn):中庸取值方案,熱備常用數(shù)據(jù)到內(nèi)存,也避免了數(shù)據(jù)join相對(duì)過多外部數(shù)據(jù)源。

缺點(diǎn):也不能一勞永逸解決某些問題,熱備數(shù)據(jù)過多,或者冷數(shù)據(jù)過大,都會(huì)對(duì)state 或者 外部數(shù)據(jù)庫(kù)造成壓力。

  1. .filter(_._1 != null
  2. .keyBy(_._1) 
  3. .process( 
  4.   new KeyedProcessFunction[String,(String,String,String,String,String), String]{ 
  5.     private var mapState:MapState[String,Map[String,String]] = _ 
  6.     private var first: Boolean = true 
  7.     override def open(parameters: Configuration): Unit = { 
  8.       val config: StateTtlConfig = StateTtlConfig 
  9.         .newBuilder(org.apache.flink.api.common.time.Time.days(1)) 
  10.         .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) 
  11.         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) 
  12.         .build() 
  13.       val join = new MapStateDescriptor[String,Map[String,String]]("join",classOf[String],classOf[Map[String,String]]) 
  14.       join.enableTimeToLive(config) 
  15.       mapState = getRuntimeContext.getMapState(join
  16.     } 
  17.     override def processElement( 
  18.                                  in: (String, String, String, String, String), 
  19.                                  context: KeyedProcessFunction[String, (String, String, String, String, String),String]#Context, 
  20.                                  collector: Collector[String]): Unit = { 
  21.  
  22.       // 數(shù)據(jù)處理,過來(lái)一條條數(shù)據(jù),然后按照自己的業(yè)務(wù)邏輯先去mapState去找,如果沒有再去 外部去找 
  23.       if (mapState.contains("xx_id")){ 
  24.         // 如果存在就取 
  25.  
  26.       }else
  27.         // 如果不存在去外部拿,然后放到mapState中 
  28.         val dim_sql="select dic_code,pre_dictionary_id,dic_name from xxxx where id=xx_id" 
  29.         val dim: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(dim_sql, null
  30.         mapState.put("xx_id",null
  31.       } 
  32.       // 然后封裝 放到collect中 
  33.       collector.collect(null
  34.     } 
  35.   } 

廣播維表

比如上面提到的字典表,每一個(gè)Task都需要這份數(shù)據(jù),那么需要join這份數(shù)據(jù)的時(shí)候就可以使用廣播維表。

  1. val dimStream=env.addSource(MysqlSource) 
  2.  
  3. // 廣播狀態(tài) 
  4. val broadcastStateDesc=new MapStateDescriptor[String,String]("broadcaststate", BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(Long.class, Dim.class)) 
  5.  
  6. // 廣播流 
  7. val broadStream=dimStream.broadcast() 
  8.  
  9. // 主數(shù)據(jù)流 
  10. val mainConsumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), kafkaConfig) 
  11.  
  12. val mainStream=env.addSource(mainConsumer) 
  13.  
  14. // 廣播狀態(tài)與維度表關(guān)聯(lián) 
  15. val connectedStream=mainStream.connect(broadStream).map(..User(id,name)).key(_.1) 
  16.  
  17. connectedStream.process(new KeyedBroadcastProcessFunction[String,User,Map[Long,Dim],String] { 
  18.  
  19.      override def processElement(value: User, ctx: KeyedBroadcastProcessFunction[String,User,Map[Long,Dim],String]#ReadOnlyContext, out: Collector[String]): Unit = { 
  20.       // 取到數(shù)據(jù)就可以愉快的玩耍了 
  21.      val state=ctx.getBroadcastState(broadcastStateDesc) 
  22.        xxxxxx 
  23.          
  24.   } 

「思考:」 如果把維表流也通過實(shí)時(shí)監(jiān)控binlog到kafka,當(dāng)維度數(shù)據(jù)發(fā)生變化時(shí),更新放到狀態(tài)中,這種方式,是不是更具有時(shí)效性呢?

(1)通過canal把變更binlog方式發(fā)送到kafka中。

(2)數(shù)據(jù)流定義成為廣播流,廣播到數(shù)據(jù)到主數(shù)據(jù)流中。

(3)定義一個(gè)廣播狀態(tài)存儲(chǔ)數(shù)據(jù),在主數(shù)據(jù)進(jìn)行查找匹配,符合要求則join成功。

Temporal Table Join(FlinkSQL與Flink Table API)

由于維表是一張不斷變化的表(靜態(tài)表只是動(dòng)態(tài)表的一種特例)。那如何 JOIN 一張不斷變化的表呢?如果用傳統(tǒng)的 JOIN 語(yǔ)法來(lái)表達(dá)維表 JOIN,是不完整的。因?yàn)榫S表是一直在更新變化的,如果用這個(gè)語(yǔ)法那么關(guān)聯(lián)上的是哪個(gè)時(shí)刻的維表呢?我們是不知道的,結(jié)果是不確定的。所以 Flink SQL 的維表 JOIN 語(yǔ)法引入了Temporal Table 的標(biāo)準(zhǔn)語(yǔ)法,用來(lái)聲明關(guān)聯(lián)的是維表哪個(gè)時(shí)刻的快照。

普通關(guān)聯(lián)會(huì)一直保留關(guān)聯(lián)雙側(cè)的數(shù)據(jù),數(shù)據(jù)也就會(huì)一直膨脹,直到撐爆內(nèi)存導(dǎo)致任務(wù)失敗,Temporal Join則可以定期清理過期數(shù)據(jù),在合理的內(nèi)存配置下即可避免內(nèi)存溢出。

Event Time Temporal Join

語(yǔ)法

  1. SELECT [column_list] 
  2. FROM table1 [AS <alias1>] 
  3. [LEFTJOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>] 
  4. ON table1.column-name1 = table2.column-name1 

使用事件時(shí)間屬性(即行時(shí)間屬性),可以檢索過去某個(gè)時(shí)間點(diǎn)的鍵值。這允許在一個(gè)共同的時(shí)間點(diǎn)連接兩個(gè)表。

舉例

假設(shè)我們有一個(gè)訂單表,每個(gè)訂單都有不同貨幣的價(jià)格。為了將此表正確地規(guī)范化為單一貨幣,每個(gè)訂單都需要與下訂單時(shí)的適當(dāng)貨幣兌換率相結(jié)合。

  1. CREATE TABLE orders ( 
  2.     order_id    STRING, 
  3.     price       DECIMAL(32,2), 
  4.     currency    STRING, 
  5.     order_time  TIMESTAMP(3), 
  6.     WATERMARK FOR order_time AS order_time 
  7. WITH (/* ... */); 
  8.  
  9.  
  10. CREATE TABLE currency_rates ( 
  11.     currency STRING, 
  12.     conversion_rate DECIMAL(32, 2), 
  13.     update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL 
  14.     WATERMARK FOR update_time AS update_time, 
  15.     PRIMARY KEY(currency) NOT ENFORCED 
  16. WITH ( 
  17.    'connector' = 'upsert-kafka'
  18.    /* ... */ 
  19. ); 
  20.  
  21. event-time temporal join需要temporal join條件的等價(jià)條件中包含的主鍵 
  22.  
  23. SELECT  
  24.      order_id, 
  25.      price, 
  26.      currency, 
  27.      conversion_rate, 
  28.      order_time, 
  29. FROM orders 
  30. LEFT JOIN currency_rates FOR SYSTEM TIME AS OF orders.order_time 
  31. ON orders.currency = currency_rates.currency 

Processing Time Temporal Join

處理時(shí)間時(shí)態(tài)表連接使用處理時(shí)間屬性將行與外部版本表中鍵的最新版本相關(guān)聯(lián)。

根據(jù)定義,使用processing-time屬性,連接將始終返回給定鍵的最新值??梢詫⒉檎冶砜醋魇且粋€(gè)簡(jiǎn)單的HashMap,它存儲(chǔ)來(lái)自構(gòu)建端的所有記錄。這種連接的強(qiáng)大之處在于,當(dāng)在Flink中無(wú)法將表具體化為動(dòng)態(tài)表時(shí),它允許Flink直接針對(duì)外部系統(tǒng)工作。

使用FOR SYSTEM_TIME AS OF table1.proctime表示當(dāng)左邊表的記錄與右邊的維表join時(shí),只匹配當(dāng)前處理時(shí)間維表所對(duì)應(yīng)的的快照數(shù)據(jù)。

Lookup Table Join

Lookup Join 通常用于通過連接外部表(維度表)補(bǔ)充信息,要求一個(gè)表具有處理時(shí)間屬性,另一個(gè)表使 Lookup Source Connector。

JDBC 連接器可以用在時(shí)態(tài)表關(guān)聯(lián)中作為一個(gè)可 lookup 的 source (又稱為維表)。用到的語(yǔ)法是 Temporal Joins 的語(yǔ)法。

  1. s""
  2.         |CREATE TABLE users( 
  3.         |id int
  4.         |name string, 
  5.         |PRIMARY KEY (id) NOT ENFORCED 
  6.         |) 
  7.         |WITH ( 
  8.         |'connector' = 'jdbc'
  9.         |'url' = 'xxxx'
  10.         |'driver'='$DRIVER_CLASS_NAME'
  11.         |'table-name'='$tableName'
  12.         |'lookup.cache.max-rows'='100'
  13.         |'lookup.cache.ttl'='30s' 
  14.         |) 
  15.         |""".stripMargin 
  16.          
  17.          
  18. s""
  19.        |CREATE TABLE car( 
  20.        |`id`   bigint , 
  21.        |`user_id` bigint
  22.        |`proctime` as PROCTIME() 
  23.        |) 
  24.        |WITH ( 
  25.        |    'connector' = 'kafka'
  26.        |    'topic' = '$topic'
  27.        |    'scan.startup.mode' = 'latest-offset'
  28.        |    'properties.bootstrap.servers' = '$KAFKA_SERVICE'
  29.        |    'properties.group.id' = 'indicator'
  30.        |    'format' = 'canal-json' 
  31.        |) 
  32.        |""".stripMargin 
  33.         
  34.         
  35.         
  36.     SELECT 
  37.         mc.user_id user_id, 
  38.         count(1) AS `value` 
  39.     FROM car mc 
  40.         inner join users FOR SYSTEM_TIME AS OF mc.proctime as u on mc.user_id=s.id 
  41.     group by mc.user_id 

總結(jié)

總體來(lái)講,關(guān)聯(lián)維表有四個(gè)基礎(chǔ)的方式:

(1)查找外部數(shù)據(jù)源關(guān)聯(lián)

(2)預(yù)加載維表關(guān)聯(lián)(內(nèi)存,狀態(tài))

(3)冷熱數(shù)據(jù)儲(chǔ)備(算是1和2的結(jié)合使用)

(4)維表變更日志關(guān)聯(lián)(廣播也好,其他方式的流關(guān)聯(lián)也好)

「同時(shí)考慮:」 吞吐量,時(shí)效性,外部數(shù)據(jù)源的負(fù)載,內(nèi)存資源,解耦性等等方面。

四種join方式不存在絕對(duì)的一勞永逸,更多的是針對(duì)業(yè)務(wù)場(chǎng)景在各指標(biāo)上的權(quán)衡取舍,因此看官需要結(jié)合場(chǎng)景來(lái)選擇適合的。

 

責(zé)任編輯:武曉燕 來(lái)源: 大數(shù)據(jù)左右手
相關(guān)推薦

2021-07-13 10:02:52

Pandas函數(shù)Linux

2013-08-15 09:52:45

開發(fā)框架開發(fā)工具開發(fā)腳本

2015-10-27 11:02:06

Web開發(fā)CSS 庫(kù)

2016-12-14 20:53:04

Linuxgcc命令行

2016-12-14 19:19:19

Linuxgcc命令行

2014-06-13 11:26:53

CSS庫(kù)Web開發(fā)

2023-03-06 10:42:34

CSS前端

2013-07-12 09:45:16

PHP功能

2021-06-29 10:50:30

Python函數(shù)文件

2023-09-07 16:28:46

JavaScrip

2013-08-23 09:28:37

GitGit 命令

2011-05-10 08:47:55

開發(fā)者HTML 5W3C

2021-11-30 23:30:45

sql 性能異步

2022-06-29 09:09:38

Python代碼

2022-08-23 09:01:02

HTMLWeb

2017-10-25 16:22:58

OpenStack操作Glance

2011-05-16 08:37:56

JavaScript庫(kù)

2020-11-18 11:14:27

運(yùn)維架構(gòu)技術(shù)

2022-07-13 12:53:59

數(shù)據(jù)存儲(chǔ)

2020-03-06 08:35:45

GitHub設(shè)計(jì)瀏覽器
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)