自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

使用 Spring Boot 和 Kafka Streams 進(jìn)行實(shí)時(shí)數(shù)據(jù)處理

開發(fā) 開發(fā)工具
Spring Boot 和 Apache Kafka Streams 是兩個(gè)強(qiáng)大的工具,它們使開發(fā)人員能夠創(chuàng)建可靠且可擴(kuò)展的實(shí)時(shí)數(shù)據(jù)處理應(yīng)用程序。

Spring Boot 和 Apache Kafka Streams 是兩個(gè)強(qiáng)大的工具,它們使開發(fā)人員能夠創(chuàng)建可靠且可擴(kuò)展的實(shí)時(shí)數(shù)據(jù)處理應(yīng)用程序。在這篇文章中,我們將了解 Spring Boot 和 Kafka Streams 如何協(xié)同工作,如何利用流處理來發(fā)揮應(yīng)用程序的優(yōu)勢。還將探索交互式查詢,這是一個(gè)相對較新且有趣的功能,為實(shí)時(shí)數(shù)據(jù)分析提供了新的機(jī)會(huì)。

安裝Kafka

Kafka可以從官方網(wǎng)站https://kafka.apache.org/downloads下載。一旦 Kafka 啟動(dòng)并運(yùn)行,就創(chuàng)建一個(gè)主題。

創(chuàng)建Spring Boot項(xiàng)目

創(chuàng)建一個(gè)新的 Spring Boot 項(xiàng)目,并且引入“Spring Web”和“Spring for Apache Kafka”兩個(gè)依賴項(xiàng)。

@SpringBootApplication
public class KafkaStreamsDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsDemoApplication.class, args);
    }
}

配置Kafka

接下來,在應(yīng)用程序的 application.properties 文件中配置 Kafka 創(chuàng)建的主題和代理地址。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest

創(chuàng)建 Kafka 流處理器

下一步是構(gòu)建一個(gè) Kafka Streams 處理器,從“my-topic”讀取消息并處理,然后將結(jié)果輸出到另一個(gè)主題。使用 KStream API 來處理邏輯,如下:

@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
            .mapValues(value -> value.toUpperCase())
            .to("output-topic");
}

交互式查詢

交互式查詢是 Kafka Streams 的創(chuàng)新新功能之一。借助此功能,可以立即查詢 Kafka Streams 應(yīng)用程序的狀態(tài)存儲(chǔ)。讓我們看看如何使用交互式查詢檢索存儲(chǔ)在狀態(tài)存儲(chǔ)中的大寫消息的數(shù)量。

@Autowiredprivate 
InteractiveQueryService interactiveQueryService;

@GetMapping("/messageCount")
public long getMessageCount() {
  ReadOnlyKeyValueStore<String, Long> store = interactiveQueryService.getQueryableStore("message-count-store", QueryableStoreTypes.keyValueStore());   
  return store.get("uppercase-message-count");
}

在此代碼中,我們使用 InteractiveQueryService 來獲取“message-count-store”的狀態(tài)存儲(chǔ)的句柄,然以查詢該存儲(chǔ)來獲取大寫消息的計(jì)數(shù)。

發(fā)送數(shù)據(jù)到Kafka

在實(shí)際應(yīng)用程序中,數(shù)據(jù)將從多個(gè)源發(fā)送到 Kafka。在本示例中,我們將使用一個(gè)簡單的 Kafka 生產(chǎn)者來與“my-topic”進(jìn)行通信。

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void produceMessage(String message) {
    kafkaTemplate.send("my-topic", message);
}

使用處理后的數(shù)據(jù)

使用 Kafka 消費(fèi)者最終從“output-topic”接收編輯后的數(shù)據(jù),如下:

@KafkaListener(topics = "output-topic", groupId = "my-group")
public void consume(String message) {
    System.out.println("Received: " + message);
}

總結(jié)

在本文中,我們了解了如何使用 Spring Boot 和 Kafka Streams 創(chuàng)建用于實(shí)時(shí)數(shù)據(jù)處理的應(yīng)用程序,并且引入了交互式查詢這一有趣的新功能。借助交互式查詢,可以通過處理實(shí)時(shí)數(shù)據(jù)以及實(shí)時(shí)查詢 Kafka Streams 應(yīng)用程序的狀態(tài)來創(chuàng)建交互式動(dòng)態(tài)應(yīng)用程序。

責(zé)任編輯:華軒 來源: 今日頭條
相關(guān)推薦

2023-11-21 08:11:48

Kafka的分區(qū)策略

2015-06-16 16:49:25

AWSKinesis實(shí)時(shí)數(shù)據(jù)處理

2023-09-26 09:29:08

Java數(shù)據(jù)

2022-11-09 10:26:48

智慧城市物聯(lián)網(wǎng)

2012-05-18 10:49:36

SAP大數(shù)據(jù)HANA

2021-07-29 08:00:00

開源數(shù)據(jù)技術(shù)

2023-09-27 15:34:48

數(shù)據(jù)編程

2019-08-19 14:24:39

數(shù)據(jù)分析Spark操作

2023-12-11 08:00:00

架構(gòu)FlinkDruid

2013-09-23 09:24:33

2021-07-08 09:51:18

MaxCompute SQL數(shù)據(jù)處理

2024-01-26 08:00:00

Python數(shù)據(jù)管道

2023-11-23 18:57:57

邊緣智能人工智能

2018-08-19 09:15:25

MongoDBGo 微服務(wù)

2019-08-21 09:48:37

數(shù)據(jù)處理

2023-05-25 08:24:46

Kafka大數(shù)據(jù)

2023-12-13 09:00:00

2012-09-06 11:34:15

IBMdw

2022-01-26 09:00:00

數(shù)據(jù)庫SnowparkSQL

2022-09-22 10:53:38

實(shí)時(shí)數(shù)據(jù)ML 模型
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號