自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Springboot整合Kafka Stream實時統(tǒng)計數(shù)據(jù)

開發(fā) 前端 Kafka
Kafka Streams是一個客戶端類庫,用于處理和分析存儲在Kafka中的數(shù)據(jù)。它建立在流式處理的一些重要的概念之上:如何區(qū)分事件時間和處理時間、Windowing的支持、簡單高效的管理和實時查詢應(yīng)用程序狀態(tài)。

[[417927]]

環(huán)境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka Stream介紹

Kafka在0.10版本推出了Stream API,提供了對存儲在Kafka內(nèi)的數(shù)據(jù)進行流式處理和分析的能力。

流式計算一般被用來和批量計算做比較。批量計算往往有一個固定的數(shù)據(jù)集作為輸入并計算結(jié)果。而流式計算的輸入往往是“無界”的(Unbounded Data),持續(xù)輸入的,即永遠(yuǎn)拿不到全量數(shù)據(jù)去做計算;同時,計算結(jié)果也是持續(xù)輸出的,只能拿到某一個時刻的結(jié)果,而不是最終的結(jié)果。

Kafka Streams是一個客戶端類庫,用于處理和分析存儲在Kafka中的數(shù)據(jù)。它建立在流式處理的一些重要的概念之上:如何區(qū)分事件時間和處理時間、Windowing的支持、簡單高效的管理和實時查詢應(yīng)用程序狀態(tài)。

Kafka Streams的門檻非常低:和編寫一個普通的Kafka消息處理程序沒有太大的差異,可以通過多進程部署來完成擴容、負(fù)載均衡、高可用(Kafka Consumer的并行模型)。

Kafka Streams的一些特點:

  • 被設(shè)計成一個簡單的、輕量級的客戶端類庫,能夠被集成到任何Java應(yīng)用中
  • 除了Kafka之外沒有任何額外的依賴,利用Kafka的分區(qū)模型支持水平擴容和保證順序性
  • 通過可容錯的狀態(tài)存儲實現(xiàn)高效的狀態(tài)操作(windowed joins and aggregations)
  • 支持exactly-once語義
  • 支持紀(jì)錄級的處理,實現(xiàn)毫秒級的延遲
  • 提供High-Level的Stream DSL和Low-Level的Processor API

Stream Processing Topology流處理拓?fù)?/h3>
  • 流是Kafka Streams提供的最重要的抽象:它表示一個無限的、不斷更新的數(shù)據(jù)集。流是不可變數(shù)據(jù)記錄的有序、可重放和容錯序列,其中數(shù)據(jù)記錄定義為鍵值對。
  • Stream Processing Application是使用了Kafka Streams庫的應(yīng)用程序。它通過processor topologies定義計算邏輯,其中每個processor topology都是多個stream processor(節(jié)點)通過stream組成的圖。
  • A stream processor 是處理器拓?fù)渲械墓?jié)點;它表示一個處理步驟,通過每次從拓?fù)渲械纳嫌翁幚砥鹘邮找粋€輸入記錄,將其操作應(yīng)用于該記錄,來轉(zhuǎn)換流中的數(shù)據(jù),并且隨后可以向其下游處理器生成一個或多個輸出記錄。

有兩種特殊的processor:

Source Processor 源處理器是一種特殊類型的流處理器,它沒有任何上游處理器。它通過使用來自一個或多個kafka topic的記錄并將其轉(zhuǎn)發(fā)到其下游處理器,從而從一個或多個kafka topic生成其拓?fù)涞妮斎肓鳌?/p>

Sink Processor 接收器處理器是一種特殊類型的流處理器,沒有下游處理器。它將從其上游處理器接收到的任何記錄發(fā)送到指定的kafka topic。

Springboot整合Kafka Stream實時統(tǒng)計數(shù)據(jù)

相關(guān)的核心概念查看如下鏈接

Springboot整合Kafka Stream實時統(tǒng)計數(shù)據(jù)

下面演示Kafka Stream 在Springboot中的應(yīng)用

依賴

  1. <dependency> 
  2.   <groupId>org.springframework.boot</groupId> 
  3.   <artifactId>spring-boot-starter-web</artifactId> 
  4.   </dependency> 
  5. <dependency> 
  6.   <groupId>org.springframework.kafka</groupId> 
  7.   <artifactId>spring-kafka</artifactId> 
  8. </dependency> 
  9. <dependency> 
  10.   <groupId>org.apache.kafka</groupId> 
  11.   <artifactId>kafka-streams</artifactId> 
  12. </dependency> 

配置

  1. server: 
  2.   port: 9090 
  3. spring: 
  4.   application: 
  5.     name: kafka-demo 
  6.   kafka: 
  7.     streams: 
  8.       application-id: ${spring.application.name
  9.       properties: 
  10.         spring.json.trusted.packages: '*' 
  11.     bootstrap-servers: 
  12.     - localhost:9092 
  13.     - localhost:9093 
  14.     - localhost:9094 
  15.     producer: 
  16.       acks: 1 
  17.       retries: 10 
  18.       key-serializer: org.apache.kafka.common.serialization.StringSerializer 
  19.       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer 
  20.       properties: 
  21.         spring.json.trusted.packages: '*' 
  22.     consumer: 
  23.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
  24.       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer 
  25.       enable-auto-commitfalse 
  26.       group-id: ConsumerTest 
  27.       auto-offset-reset: latest 
  28.       properties: 
  29.         session.timeout.ms: 12000 
  30.         heartbeat.interval.ms: 3000 
  31.         max.poll.records: 100 
  32.         spring.json.trusted.packages: '*' 
  33.     listener: 
  34.       ack-mode: manual-immediate 
  35.       type: batch 
  36.       concurrency: 8 
  37.     properties: 
  38.       max.poll.interval.ms: 300000 

消息發(fā)送

  1. @Service 
  2. public class MessageSend { 
  3.   @Resource 
  4.   private KafkaTemplate<String, Message> kafkaTemplate ; 
  5.   public void sendMessage2(Message message) { 
  6.     kafkaTemplate.send(new ProducerRecord<String, Message>("test", message)).addCallback(result -> { 
  7.       System.out.println("執(zhí)行成功..." + Thread.currentThread().getName()) ; 
  8.     }, ex -> { 
  9.       System.out.println("執(zhí)行失敗") ; 
  10.       ex.printStackTrace() ; 
  11.     }) ; 
  12.   } 

消息監(jiān)聽

  1. @KafkaListener(topics = {"test"}) 
  2. public void listener2(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) { 
  3.   for (ConsumerRecord<String, Message> record : records) { 
  4.     System.out.println(this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ; 
  5.   } 
  6.   try { 
  7.     TimeUnit.SECONDS.sleep(0) ; 
  8.   } catch (InterruptedException e) { 
  9.     e.printStackTrace(); 
  10.   } 
  11.   ack.acknowledge() ; 
  12.      
  13. @KafkaListener(topics = {"demo"}) 
  14. public void listenerDemo(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) { 
  15.   for (ConsumerRecord<String, Message> record : records) { 
  16.     System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ; 
  17.   } 
  18.   ack.acknowledge() ; 

Kafka Stream處理

消息轉(zhuǎn)換并轉(zhuǎn)發(fā)其它Topic

  1. @Bean 
  2. public KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) { 
  3.   KStream<Object, Object> stream = streamsBuilder.stream("test"); 
  4.   stream.map((key, value) -> { 
  5.     System.out.println("原始消息內(nèi)容:" + new String((byte[]) value, Charset.forName("UTF-8"))) ; 
  6.     return new KeyValue<>(key"{\"title\": \"123123\", \"message\": \"重新定義內(nèi)容\"}".getBytes(Charset.forName("UTF-8"))) ; 
  7.   }).to("demo") ; 
  8.   return stream; 

執(zhí)行結(jié)果:

Springboot整合Kafka Stream實時統(tǒng)計數(shù)據(jù)

Stream對象處理

  1. @Bean 
  2. public KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.map((key, value) -> { 
  8.     value.setTitle("XXXXXXX") ; 
  9.     return new KeyValue<>(key, value) ; 
  10.   }).to("demo", Produced.with(Serdes.String(), jsonSerde)) ; 
  11.   return stream; 

執(zhí)行結(jié)果:

Springboot整合Kafka Stream實時統(tǒng)計數(shù)據(jù)

分組處理

  1. @Bean 
  2. public KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .count() 
  15.   .toStream().print(Printed.toSysOut()); 
  16.   return stream; 

執(zhí)行結(jié)果:

Springboot整合Kafka Stream實時統(tǒng)計數(shù)據(jù)

聚合

  1. @Bean 
  2. public KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .aggregate(() -> 0L, (key, value ,aggValue) -> { 
  15.     System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; 
  16.     return aggValue + 1 ; 
  17.   }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long())) 
  18.   .toStream().print(Printed.toSysOut()); 
  19.   return stream; 

執(zhí)行結(jié)果:

Springboot整合Kafka Stream實時統(tǒng)計數(shù)據(jù)

Filter過濾數(shù)據(jù)

  1. @Bean 
  2. public KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .aggregate(() -> 0L, (key, value ,aggValue) -> { 
  15.     System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; 
  16.     return aggValue + 1 ; 
  17.   }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long())) 
  18.   .toStream() 
  19.   .filter((key, value) -> !"2".equals(key)) 
  20.   .print(Printed.toSysOut()); 
  21.   return stream; 

執(zhí)行結(jié)果:

Springboot整合Kafka Stream實時統(tǒng)計數(shù)據(jù)

過濾Key不等于"2"

分支多流處理

  1. @Bean 
  2. public KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   // 分支,多流處理 
  8.   KStream<String, Message>[] arrStream = stream.branch( 
  9.     (key, value) -> "男".equals(value.getSex()),  
  10.     (key, value) -> "女".equals(value.getSex())); 
  11.   Stream.of(arrStream).forEach(as -> { 
  12.     as.foreach((key, message) -> { 
  13.       System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message) ; 
  14.     }); 
  15.   }); 
  16.   return stream; 

執(zhí)行結(jié)果:

Springboot整合Kafka Stream實時統(tǒng)計數(shù)據(jù)

多字段分組

不能使用多個selectKey,后面的會覆蓋前面的

  1. @Bean 
  2. public KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream 
  8.   .selectKey(new KeyValueMapper<String, Message, String>() { 
  9.     @Override 
  10.     public String apply(String key, Message value) { 
  11.       System.out.println(Thread.currentThread().getName()) ; 
  12.       return value.getTime() + " | " + value.getOrgCode() ; 
  13.     } 
  14.   }) 
  15.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  16.   .count() 
  17.   .toStream().print(Printed.toSysOut()); 
  18.   return stream; 

執(zhí)行結(jié)果:

Springboot整合Kafka Stream實時統(tǒng)計數(shù)據(jù)

 

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2020-04-24 09:01:23

網(wǎng)絡(luò)安全數(shù)據(jù)泄露黑客

2011-10-09 10:33:12

2015-07-29 11:21:13

JavaScript統(tǒng)計數(shù)據(jù)

2023-07-18 10:43:14

物聯(lián)網(wǎng)IOT

2010-04-21 11:27:55

Oracle數(shù)據(jù)庫

2009-12-24 08:51:08

Linux用戶

2014-12-31 10:56:57

Windows PhoWP

2019-06-27 05:00:26

物聯(lián)網(wǎng)統(tǒng)計數(shù)據(jù)IOT

2022-10-25 09:11:47

物聯(lián)網(wǎng)IoT工業(yè)物聯(lián)網(wǎng)

2018-10-22 06:27:32

網(wǎng)絡(luò)犯罪數(shù)據(jù)泄露攻擊

2022-10-26 15:17:58

數(shù)字存儲數(shù)據(jù)中心

2020-05-09 22:54:48

物聯(lián)網(wǎng)安全物聯(lián)網(wǎng)IOT

2024-03-06 08:03:09

2022-04-28 07:31:41

Springkafka數(shù)據(jù)量

2022-06-20 09:49:25

Linux發(fā)行版

2019-07-22 05:01:38

物聯(lián)網(wǎng)IOT技術(shù)

2022-06-20 14:19:55

FedoraEPELLinux

2024-05-29 12:26:27

2020-11-06 22:48:01

物聯(lián)網(wǎng)數(shù)據(jù)技術(shù)

2021-02-18 16:10:03

物聯(lián)網(wǎng)工業(yè)4.0人工智能
點贊
收藏

51CTO技術(shù)棧公眾號