使用Apache Kafka創(chuàng)建事件驅動的Spring Boot微服務
當今技術潮流中,事件驅動的微服務成為了一種轉型的力量,其中的微服務模塊通過事件實現(xiàn)無縫通信,提高系統(tǒng)的可擴展性、可適應性和敏捷性。消息隊列在事件驅動架構中起著重要作用,Apache Kafka 作為高性能、可擴展和可靠的消息隊列系統(tǒng),被廣泛應用于實時數(shù)據(jù)流處理和事件驅動架構中,因此成為了事件驅動架構中的核心技術之一。
本文介紹如何使用 Apache Kafka 構建事件驅動的微服務架構。
1 事件驅動架構簡介
事件驅動架構(EDA)是一種軟件設計模式,它使系統(tǒng)內部的組件通過生成和消費事件來相互通信。在這種架構中,事件表示系統(tǒng)內發(fā)生的重要事件,并可以在其他組件中觸發(fā)相應的操作。這種方法可以實現(xiàn)松散耦合的系統(tǒng),提高系統(tǒng)的可擴展性,并能夠快速響應實時變化。
2 Apache Kafka 簡介
Apache Kafka 是一個分布式、容錯的消息系統(tǒng),可以處理大量數(shù)據(jù)和實時流。它采用發(fā)布-訂閱模型,生產者將消息發(fā)布到主題中,消費者通過訂閱這些主題來接收消息。Kafka 的持久存儲和副本機制確保了數(shù)據(jù)的可靠性和容錯能力,是構建實時數(shù)據(jù)流處理和事件驅動架構的理想選擇。
3 設置環(huán)境
在深入研究構建微服務之前,先確保已經設置了有效的工作環(huán)境。需要:
- Java 開發(fā)工具包(JDK)
- Gradle(如果您使用 gradle-wrapper,則無需安裝系統(tǒng)級 gradle)
- Docker
- 運行實例的 Apache Kafka
- 您喜歡的代碼編輯器(例如 IntelliJ IDEA、Eclipse、VSCode)
4 具體步驟
4.1 步驟 1:設置 Spring Boot 項目
使用 Spring Initializer (https://start.spring.io/)創(chuàng)建一個新的 Spring Boot 項目,確保包含必要的依賴項。在項目中集成 Spring Web 以管理 Web 功能,并集成 Spring Boot DevTools 以提高開發(fā)效率。選擇 Java 21 作為開發(fā)環(huán)境,以兼容新的字符串模板和 Java 虛擬線程。
創(chuàng)建和保存項目文件,然后使用集成開發(fā)環(huán)境(IDE)打開它們。
在開始項目實現(xiàn)之前,為了保證最佳性能,可以在 application.properties 文件中啟用 Spring Boot 虛擬線程。通過這個配置,在處理 HTTP 連接時,可以在默認線程執(zhí)行器中使用虛擬線程。這樣可以提高應用程序的響應速度和處理能力,特別是在高并發(fā)場景下。
spring.threads.virtual.enabled=true
4.2 步驟 2:實現(xiàn)文本生產者微服務
在此微服務中,將處理文本數(shù)據(jù)的上傳,并將其發(fā)布到名為 TEXT_DATA 的 Apache Kafka 主題中。
在 application.properties 中添加所需的 Apache Kafka 屬性
# src/main/resources/application.properties
spring.kafka.bootstrap-servers=localhost:9092
創(chuàng)建所需的組件:
TextDataProducer 類:
package com.example.kafkaapp;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.Period;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import java.util.stream.Stream;
import static java.lang.StringTemplate.STR;
@Component
public class TextDataProducer {
Logger logger = Logger.getLogger(getClass().getName());
// 主題配置常量
private final static int PARTITION_COUNT = 8;
private final static String TOPIC = "TEXT-DATA";
private final static short REPLICATION_FACTOR = 1;
private final KafkaTemplate<String, String> kafkaTemplate;
public TextDataProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Autowired
public void configureTopic(KafkaAdmin kafkaAdmin) {
kafkaAdmin.createOrModifyTopics(new NewTopic(TOPIC, PARTITION_COUNT, REPLICATION_FACTOR));
}
private void sendTextMessage(String text, int lineIndex) {
if (text == null || text.isEmpty()) {
return;
}
// 將 Link 消息發(fā)送到主題,根據(jù)行索引在分區(qū)上分發(fā)
kafkaTemplate.send(TOPIC, "KEY-" + (lineIndex % PARTITION_COUNT), text);
}
public void sendContentOf(File file) {
Instant before = Instant.now();
try (Stream<String> lines = Files.lines(file.toPath())) {
AtomicInteger counter = new AtomicInteger(0);
lines.forEach(line -> sendTextMessage(line, counter.getAndIncrement()));
Instant after = Instant.now();
Duration duration = Duration.between(before, after);
logger.info(STR."Streamed \{counter.get()} lines in \{duration.toMillis()} millisecond");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
- 該類負責生產文本數(shù)據(jù)并將其發(fā)送到名為 TEXT-DATA 的 Kafka 主題。
- 它使用 @Component 進行注解,使其成為 Spring 管理的組件。
- 它具有主題配置的常量,例如 PARTITION_COUNT、TOPIC 和 REPLICATION_FACTOR。
- configureTopic 方法使用 KafkaAdmin 配置 Kafka 主題。它使用指定的設置創(chuàng)建或修改主題。
- sendTextMessage 方法將文本消息發(fā)送到 Kafka 主題,并根據(jù)行索引在分區(qū)上分布消息。
TextDataProducer 類的最重要部分是 sendContentOf(File file)方法。此方法負責逐行讀取給定文件的內容,并將每行發(fā)送到名為 TEXT-DATA 的 Kafka 主題。
TextDataController 類
package com.example.kafkaapp;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
@RestController
public class TextDataController {
private final TextDataProducer producer;
public TextDataController(TextDataProducer producer) {
this.producer = producer;
}
@PostMapping("/upload")
public Optional<String> uploadTextFile(@RequestParam("file") MultipartFile file) throws IOException {
Path tempFile = Files.createTempFile(file.getOriginalFilename(), null);
file.transferTo(tempFile);
Thread.ofVirtual().start(() -> producer.sendContentOf(tempFile.toFile()));
return Optional.of(tempFile.toString());
}
}
- 該類是 Spring REST 控制器,用于處理 HTTP 請求。
- 它通過構造函數(shù)將 TextDataProducer bean 注入進來。
- uploadTextFile 方法被映射用于處理 /upload 的 POST 請求。它接受一個 multipart 文件,并上傳該文件。
- 它創(chuàng)建一個臨時文件,將上傳的文件內容轉移到該文件中,使用 TextDataProducer 將臨時文件的內容發(fā)送到 Kafka,并返回臨時文件的路徑。
TextProducerApplication 類
package com.example.kafkaapp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TextProducerApplication {
public static void main(String[] args) {
SpringApplication.run(TextProducerApplication.class, args);
}
}
- 是 Spring Boot 應用程序的入口點。
- 它使用 @SpringBootApplication 注解,該注解組合了 @Configuration、@EnableAutoConfiguration 和 @ComponentScan。
- main 方法啟動 Spring Boot 應用程序。
實現(xiàn)了文本生產者后,可以使用以下命令運行項目:
./gradlew bootRun
使用 Rest 客戶端(如 Postman 或 SwaggerUI)將示例文本文件發(fā)布到 http://localhost:8080/upload 端點。此外,也可以在 IntelliJ IDEA 中使用以下原始 HTTP 命令發(fā)布文件:
### 發(fā)送包含文本和文件字段的表單
POST http://localhost:8080/upload HTTP/1.1
Content-Type: multipart/form-data; boundary=boundary
--boundary
Content-Disposition: form-data; name="file"; filename="shakespeares.txt"
// 將上傳 "input.txt "文件
< /Users/mustafaguc/Desktop/kafka-demo-content/shakespeares.txt
--boundary
4.3 步驟 3:docker化 TextData 生產者
創(chuàng)建 Dockerfile,將微服務打包成 Docker 鏡像
FROM openjdk:21-slim
WORKDIR /app
COPY build/libs/text-producer-1.0.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
在構建 Docker 鏡像之前,首先需要使用 Gradle 構建微服務:
./gradlew assemble
成功構建單個微服務 jar 文件后,就可以在項目根目錄中構建 Docker 鏡像:
docker build . -t text-producer
構建成功后,運行以下命令啟動 Docker 鏡像:
docker run -it -p 8080:8080 text-producer
運行上述命令后,應該會看到以下日志:
c.e.kafkaapp.TextProducerApplication : Starting TextProducerApplication using Java 21.0.1 with PID 4871 (/Users/mustafaguc/projects/java/text-producer/build/classes/java/main started by mustafaguc in /Users/mustafaguc/projects/java/text-producer)
c.e.kafkaapp.TextProducerApplication : No active profile set, falling back to 1 default profile: "default"
o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port 8080 (http)
o.apache.catalina.core.StandardService : Starting service [Tomcat]
o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/10.1.17]
o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 307 ms
o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
........
o.a.k.clients.admin.AdminClientConfig : These configurations '[sasl.jaas.config, idompotence.enabled]' were supplied but are not used yet.
o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1708262321726
o.a.kafka.common.utils.AppInfoParser : App info kafka.admin.client for adminclient-1 unregistered
o.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
o.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
o.apache.kafka.common.metrics.Metrics : Metrics reporters closed
o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8080 (http) with context path ''
c.e.kafkaapp.TextProducerApplication : Started TextProducerApplication in 0.764 seconds (process running for 0.976)
目前,已經實現(xiàn)了生產者部分。我們將在下篇文章中介紹消費者和聚合器微服務。