自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink實(shí)時(shí)計(jì)算topN熱榜

開發(fā) 架構(gòu)
為了統(tǒng)計(jì)每個(gè)窗口下活躍的用戶,我們需要再次按窗口進(jìn)行分組,這里根據(jù)UserViewCount中的windowEnd進(jìn)行keyBy()操作。

[[386499]]

本文轉(zhuǎn)載自微信公眾號(hào)「Java大數(shù)據(jù)與數(shù)據(jù)倉(cāng)庫(kù)」,作者柯廣。轉(zhuǎn)載本文請(qǐng)聯(lián)系Java大數(shù)據(jù)與數(shù)據(jù)倉(cāng)庫(kù)公眾號(hào)。

topN的常見應(yīng)用場(chǎng)景,最熱商品購(gòu)買量,最高人氣作者的閱讀量等等。

1. 用到的知識(shí)點(diǎn)

  • Flink創(chuàng)建kafka數(shù)據(jù)源;
  • 基于 EventTime 處理,如何指定 Watermark;
  • Flink中的Window,滾動(dòng)(tumbling)窗口與滑動(dòng)(sliding)窗口;
  • State狀態(tài)的使用;
  • ProcessFunction 實(shí)現(xiàn) TopN 功能;

2. 案例介紹

通過用戶訪問日志,計(jì)算最近一段時(shí)間平臺(tái)最活躍的幾位用戶topN。

  • 創(chuàng)建kafka生產(chǎn)者,發(fā)送測(cè)試數(shù)據(jù)到kafka;
  • 消費(fèi)kafka數(shù)據(jù),使用滑動(dòng)(sliding)窗口,每隔一段時(shí)間更新一次排名;

3. 數(shù)據(jù)源

這里使用kafka api發(fā)送測(cè)試數(shù)據(jù)到kafka,代碼如下:

  1. @Data 
  2. @NoArgsConstructor 
  3. @AllArgsConstructor 
  4. @ToString 
  5. public class User { 
  6.  
  7.     private long id; 
  8.     private String username; 
  9.     private String password
  10.     private long timestamp
  11.  
  12. Map<String, String> config = Configuration.initConfig("commons.xml"); 
  13.  
  14. @Test 
  15. public void sendData() throws InterruptedException { 
  16.     int cnt = 0; 
  17.  
  18.     while (cnt < 200){ 
  19.         User user = new User(); 
  20.         user.setId(cnt); 
  21.         user.setUsername("username" + new Random().nextInt((cnt % 5) + 2)); 
  22.         user.setPassword("password" + cnt); 
  23.         user.setTimestamp(System.currentTimeMillis()); 
  24.         Future<RecordMetadata> future = KafkaUtil.sendDataToKafka(config.get("kafka-topic"), String.valueOf(cnt), JSON.toJSONString(user)); 
  25.         while (!future.isDone()){ 
  26.             Thread.sleep(100); 
  27.         } 
  28.         try { 
  29.             RecordMetadata recordMetadata = future.get(); 
  30.             System.out.println(recordMetadata.offset()); 
  31.         } catch (InterruptedException e) { 
  32.             e.printStackTrace(); 
  33.         } catch (ExecutionException e) { 
  34.             e.printStackTrace(); 
  35.         } 
  36.         System.out.println("發(fā)送消息:" + cnt + "******" + user.toString()); 
  37.         cnt = cnt + 1; 
  38.     } 

這里通過隨機(jī)數(shù)來擾亂username,便于使用戶名大小不一,讓結(jié)果更加明顯。KafkaUtil是自己寫的一個(gè)kafka工具類,代碼很簡(jiǎn)單,主要是平時(shí)做測(cè)試方便。

4. 主要程序

創(chuàng)建一個(gè)main程序,開始編寫代碼。

創(chuàng)建flink環(huán)境,關(guān)聯(lián)kafka數(shù)據(jù)源。

  1. Map<String, String> config = Configuration.initConfig("commons.xml"); 
  2.  
  3. Properties kafkaProps = new Properties(); 
  4. kafkaProps.setProperty("zookeeper.connect", config.get("kafka-zookeeper")); 
  5. kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport")); 
  6. kafkaProps.setProperty("group.id", config.get("kafka-groupid")); 
  7.  
  8. StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); 

EventTime 與 Watermark

  1. senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

設(shè)置屬性senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),表示按照數(shù)據(jù)時(shí)間字段來處理,默認(rèn)是TimeCharacteristic.ProcessingTime

  1. /** The time characteristic that is used if none other is set. */ 
  2. private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime; 

這個(gè)屬性必須設(shè)置,否則后面,可能窗口結(jié)束無法觸發(fā),導(dǎo)致結(jié)果無法輸出。取值有三種:

  • ProcessingTime:事件被處理的時(shí)間。也就是由flink集群機(jī)器的系統(tǒng)時(shí)間來決定。
  • EventTime:事件發(fā)生的時(shí)間。一般就是數(shù)據(jù)本身攜帶的時(shí)間。
  • IngestionTime:攝入時(shí)間,數(shù)據(jù)進(jìn)入flink流的時(shí)間,跟ProcessingTime還是有區(qū)別的;

指定好使用數(shù)據(jù)的實(shí)際時(shí)間來處理,接下來需要指定flink程序如何get到數(shù)據(jù)的時(shí)間字段,這里使用調(diào)用DataStream的assignTimestampsAndWatermarks方法,抽取時(shí)間和設(shè)置watermark。

  1. senv.addSource( 
  2.         new FlinkKafkaConsumer010<>( 
  3.                 config.get("kafka-topic"), 
  4.                 new SimpleStringSchema(), 
  5.                 kafkaProps 
  6.         ) 
  7. ).map(x ->{ 
  8.     return JSON.parseObject(x, User.class); 
  9. }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<User>(Time.milliseconds(1000)) { 
  10.     @Override 
  11.     public long extractTimestamp(User element) { 
  12.         return element.getTimestamp(); 
  13.     } 
  14. }) 

前面給出的代碼中可以看出,由于發(fā)送到kafka的時(shí)候,將User對(duì)象轉(zhuǎn)換為json字符串了,這里使用的是fastjson,接收過來可以轉(zhuǎn)化為JsonObject來處理,我這里還是將其轉(zhuǎn)化為User對(duì)象JSON.parseObject(x, User.class),便于處理。

這里考慮到數(shù)據(jù)可能亂序,使用了可以處理亂序的抽象類BoundedOutOfOrdernessTimestampExtractor,并且實(shí)現(xiàn)了唯一的一個(gè)沒有實(shí)現(xiàn)的方法extractTimestamp,亂序數(shù)據(jù),會(huì)導(dǎo)致數(shù)據(jù)延遲,在構(gòu)造方法中傳入了一個(gè)Time.milliseconds(1000),表明數(shù)據(jù)可以延遲一秒鐘。比如說,如果窗口長(zhǎng)度是10s,0~10s的數(shù)據(jù)會(huì)在11s的時(shí)候計(jì)算,此時(shí)watermark是10,才會(huì)觸發(fā)計(jì)算,也就是說引入watermark處理亂序數(shù)據(jù),最多可以容忍0~t這個(gè)窗口的數(shù)據(jù),最晚在t+1時(shí)刻到來。

 

具體關(guān)于watermark的講解可以參考這篇文章

https://blog.csdn.net/qq_39657909/article/details/106081543

窗口統(tǒng)計(jì)

業(yè)務(wù)需求上,通??赡苁且粋€(gè)小時(shí),或者過去15分鐘的數(shù)據(jù),5分鐘更新一次排名,這里為了演示效果,窗口長(zhǎng)度取10s,每次滑動(dòng)(slide)5s,即5秒鐘更新一次過去10s的排名數(shù)據(jù)。

  1. .keyBy("username"
  2. .timeWindow(Time.seconds(10), Time.seconds(5)) 
  3. .aggregate(new CountAgg(), new WindowResultFunction()) 

我們使用.keyBy("username")對(duì)用戶進(jìn)行分組,使用.timeWindow(Time size, Time slide)對(duì)每個(gè)用戶做滑動(dòng)窗口(10s窗口,5s滑動(dòng)一次)。然后我們使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數(shù)據(jù),減少 state 的存儲(chǔ)壓力。較之.apply(WindowFunction wf)會(huì)將窗口中的數(shù)據(jù)都存儲(chǔ)下來,最后一起計(jì)算要高效地多。aggregate()方法的第一個(gè)參數(shù)用于

這里的CountAgg實(shí)現(xiàn)了AggregateFunction接口,功能是統(tǒng)計(jì)窗口中的條數(shù),即遇到一條數(shù)據(jù)就加一。

  1. public class CountAgg implements AggregateFunction<User, Long, Long>{ 
  2.     @Override 
  3.     public Long createAccumulator() { 
  4.         return 0L; 
  5.     } 
  6.  
  7.     @Override 
  8.     public Long add(User value, Long accumulator) { 
  9.         return accumulator + 1; 
  10.     } 
  11.  
  12.     @Override 
  13.     public Long getResult(Long accumulator) { 
  14.         return accumulator; 
  15.     } 
  16.  
  17.     @Override 
  18.     public Long merge(Long a, Long b) { 
  19.         return a + b; 
  20.     } 

.aggregate(AggregateFunction af, WindowFunction wf) 的第二個(gè)參數(shù)WindowFunction將每個(gè) key每個(gè)窗口聚合后的結(jié)果帶上其他信息進(jìn)行輸出。我們這里實(shí)現(xiàn)的WindowResultFunction將用戶名,窗口,訪問量封裝成了UserViewCount進(jìn)行輸出。

  1. private static class WindowResultFunction implements WindowFunction<Long, UserViewCount, Tuple, TimeWindow> { 
  2.  
  3.  
  4.     @Override 
  5.     public void apply(Tuple key, TimeWindow window, Iterable<Long> input, Collector<UserViewCount> out) throws Exception { 
  6.         Long count = input.iterator().next(); 
  7.         out.collect(new UserViewCount(((Tuple1<String>)key).f0, window.getEnd(), count)); 
  8.     } 
  9.  
  10. @Data 
  11. @NoArgsConstructor 
  12. @AllArgsConstructor 
  13. @ToString 
  14. public static class UserViewCount { 
  15.     private String userName; 
  16.     private long windowEnd; 
  17.     private long viewCount; 
  18.  

TopN計(jì)算最活躍用戶

為了統(tǒng)計(jì)每個(gè)窗口下活躍的用戶,我們需要再次按窗口進(jìn)行分組,這里根據(jù)UserViewCount中的windowEnd進(jìn)行keyBy()操作。然后使用 ProcessFunction 實(shí)現(xiàn)一個(gè)自定義的 TopN 函數(shù) TopNHotItems 來計(jì)算點(diǎn)擊量排名前3名的用戶,并將排名結(jié)果格式化成字符串,便于后續(xù)輸出。

  1. .keyBy("windowEnd"
  2. .process(new TopNHotUsers(3)) 
  3. .print(); 

ProcessFunction 是 Flink 提供的一個(gè) low-level API,用于實(shí)現(xiàn)更高級(jí)的功能。它主要提供了定時(shí)器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我們將利用 timer 來判斷何時(shí)收齊了某個(gè) window 下所有用戶的訪問數(shù)據(jù)。由于 Watermark 的進(jìn)度是全局的,在 processElement 方法中,每當(dāng)收到一條數(shù)據(jù)(ItemViewCount),我們就注冊(cè)一個(gè) windowEnd+1 的定時(shí)器(Flink 框架會(huì)自動(dòng)忽略同一時(shí)間的重復(fù)注冊(cè))。windowEnd+1 的定時(shí)器被觸發(fā)時(shí),意味著收到了windowEnd+1的 Watermark,即收齊了該windowEnd下的所有用戶窗口統(tǒng)計(jì)值。我們?cè)?onTimer() 中處理將收集的所有商品及點(diǎn)擊量進(jìn)行排序,選出 TopN,并將排名信息格式化成字符串后進(jìn)行輸出。

這里我們還使用了 ListState 來存儲(chǔ)收到的每條 UserViewCount 消息,保證在發(fā)生故障時(shí),狀態(tài)數(shù)據(jù)的不丟失和一致性。ListState 是 Flink 提供的類似 Java List 接口的 State API,它集成了框架的 checkpoint 機(jī)制,自動(dòng)做到了 exactly-once 的語(yǔ)義保證。

  1. private static class TopNHotUsers extends KeyedProcessFunction<Tuple, UserViewCount, String> { 
  2.  
  3.     private int topSize; 
  4.     private ListState<UserViewCount> userViewCountListState; 
  5.  
  6.     public TopNHotUsers(int topSize) { 
  7.         this.topSize = topSize; 
  8.     } 
  9.  
  10.     @Override 
  11.     public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { 
  12.         super.onTimer(timestamp, ctx, out); 
  13.         List<UserViewCount> userViewCounts = new ArrayList<>(); 
  14.         for(UserViewCount userViewCount : userViewCountListState.get()) { 
  15.             userViewCounts.add(userViewCount); 
  16.         } 
  17.  
  18.         userViewCountListState.clear(); 
  19.  
  20.         userViewCounts.sort(new Comparator<UserViewCount>() { 
  21.             @Override 
  22.             public int compare(UserViewCount o1, UserViewCount o2) { 
  23.                 return (int)(o2.viewCount - o1.viewCount); 
  24.             } 
  25.         }); 
  26.  
  27.         // 將排名信息格式化成 String, 便于打印 
  28.         StringBuilder result = new StringBuilder(); 
  29.         result.append("====================================\n"); 
  30.         result.append("時(shí)間: ").append(new Timestamp(timestamp-1)).append("\n"); 
  31.         for (int i = 0; i < topSize; i++) { 
  32.             UserViewCount currentItem = userViewCounts.get(i); 
  33.             // No1:  商品ID=12224  瀏覽量=2413 
  34.             result.append("No").append(i).append(":"
  35.                     .append("  用戶名=").append(currentItem.userName) 
  36.                     .append("  瀏覽量=").append(currentItem.viewCount) 
  37.                     .append("\n"); 
  38.         } 
  39.         result.append("====================================\n\n"); 
  40.  
  41.         Thread.sleep(1000); 
  42.  
  43.         out.collect(result.toString()); 
  44.  
  45.     } 
  46.  
  47.     @Override 
  48.     public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { 
  49.         super.open(parameters); 
  50.         ListStateDescriptor<UserViewCount> userViewCountListStateDescriptor = new ListStateDescriptor<>( 
  51.                 "user-state"
  52.                 UserViewCount.class 
  53.         ); 
  54.         userViewCountListState = getRuntimeContext().getListState(userViewCountListStateDescriptor); 
  55.  
  56.     } 
  57.  
  58.     @Override 
  59.     public void processElement(UserViewCount value, Context ctx, Collector<String> out) throws Exception { 
  60.         userViewCountListState.add(value); 
  61.         ctx.timerService().registerEventTimeTimer(value.windowEnd + 1000); 
  62.     } 

結(jié)果輸出

可以看到,每隔5秒鐘更新輸出一次數(shù)據(jù)。

 

參考

 

http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/

 

責(zé)任編輯:武曉燕 來源: Java大數(shù)據(jù)與數(shù)據(jù)倉(cāng)庫(kù)
相關(guān)推薦

2021-06-06 13:10:12

FlinkPvUv

2021-07-16 10:55:45

數(shù)倉(cāng)一體Flink SQL

2015-07-31 10:35:18

實(shí)時(shí)計(jì)算

2022-12-29 09:13:02

實(shí)時(shí)計(jì)算平臺(tái)

2019-06-27 09:12:43

FlinkStorm框架

2016-12-28 14:27:24

大數(shù)據(jù)Apache Flin搜索引擎

2015-08-31 14:27:52

2015-10-09 13:42:26

hbase實(shí)時(shí)計(jì)算

2019-11-21 09:49:29

架構(gòu)運(yùn)維技術(shù)

2021-06-03 08:10:30

SparkStream項(xiàng)目Uv

2017-09-26 09:35:22

2019-02-18 15:23:21

馬蜂窩MESLambda

2021-07-05 10:48:42

大數(shù)據(jù)實(shí)時(shí)計(jì)算

2011-10-28 09:05:09

2021-03-10 14:04:10

大數(shù)據(jù)計(jì)算技術(shù)

2016-11-02 09:02:56

交通大數(shù)據(jù)計(jì)算

2017-01-15 13:45:20

Docker大數(shù)據(jù)京東

2020-09-10 17:41:14

ClickHouse數(shù)據(jù)引擎

2022-08-24 09:19:03

美團(tuán)計(jì)算

2022-11-10 08:48:20

開源數(shù)據(jù)湖Arctic
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)