作者 | 蔡柱梁
審校 | 重樓
目標
- 了解 Kafka 的重要概念
- 搭建 Kafka 服務端
- 使用SpringBoot 實現(xiàn)簡單的 Demo
1 了解 Kafka 的重要概念
Kafka 是使用 Scala 語言開發(fā)的一個多分區(qū)、多副本且基于 ZooKeeper 協(xié)調(diào)的分布式消息系統(tǒng)。目前,它的定位是一個分布式流式處理平臺。
Kafka 在我們工作中最常扮演的三個角色:
- 消息系統(tǒng)
Kafka 和傳統(tǒng)的消息中間件一樣具有系統(tǒng)解耦、冗余存儲、流量削峰、異步通信等功能。
- 存儲系統(tǒng)
Kafka 會將消息持久化到磁盤,并且有多副本機制,有效降低了數(shù)據(jù)丟失的風險。有時,我們也可以使用它來存儲數(shù)據(jù),只需要把對應的數(shù)據(jù)保留策略設置成為“永久”即可。
- 流式處理平臺
Kafka 不僅為很多流式處理框架(如:Storm、Spark、Flink 等)提供了可靠的數(shù)據(jù)來源,還提供了一個完整的流式處理類庫。
1.1 基本概念
上圖(圖出自于《深入理解Kafka核心設計與實踐原理》)體現(xiàn)了 Kafka 的整體架構(gòu),Producer 發(fā)送消息,Kafka 將元數(shù)據(jù)存儲在 ZK 中并交由ZK 管理,Consumer 通過拉模式獲取消息。
- Producer
生產(chǎn)者,消息的投遞方,負責創(chuàng)建消息并投遞到 Kafka 中。
- Broker
Kafka 服務實例
- Consumer
消費者,處理消息的一方
上面的概念都是物理層面上的,但是在實際使用過程中還有很多邏輯方面的定義,這些概念也是需要了解的。如果不了解,就算勉強寫出了代碼,但是自己還是一臉懵不知道自己都定義了什么,它們都有什么意義,估計離生產(chǎn)故障就不遠了。
接下來我們再去了解三個重要的邏輯概念:
- Topic(主題)
生產(chǎn)者創(chuàng)建消息是要發(fā)送給特定的主題的,而消費者拉取消息也是要指定主題的。消息就是通過主題來歸類的。
- Partition(分區(qū))
一個Topic 可以有多個 Patition,而一個 Partition 只屬于一個 Topic。同一個 Topic 下,不同 Partition 存儲的消息是不同的。
- Offset(偏移量)
Kafka 的消息是可以持久化并反復消費的,這是因為在每個分區(qū)中,當有消息寫入就會像追加日志那樣順序?qū)懭耄樞騃O的寫入性能是十分好的),通過Offset 來記錄對應消息所在的位置。因此,Offset 是消息在 Partition 中的唯一標識,并且能看出同一個 Partition 內(nèi)的消息的先后順序,我們稱之為 “Kafka 保證消息在分區(qū)內(nèi)是有序的”。
為了更好,更直觀體現(xiàn)上面三者的關系,我們先一起看下圖(圖出自于《深入理解Kafka核心設計與實踐原理》)
該圖展示了一個擁有4個 Partition 的 Topic,而分區(qū)里面的阿拉伯數(shù)字就是 Offset(也表示著一條消息),虛線部分代表新消息可以插入的位置。每條消息在發(fā)送到 Broker 之前,會先計算當前消息應該發(fā)送到哪個 Partition。因此,只要我們設置合理,消息可以均勻地分配在不同的 Partition 上,當發(fā)現(xiàn)請求數(shù)量激增時,我們也可以考慮通過適當增加 Partition(Broker 也要增加)的方式,從而降低每個 Broker 的 I/O 壓力。
另外,為了降低消息丟失的風險,Kafka 為 Partition 引進了多副本(Replica)機制,通過增加副本數(shù)量來提高容災能力。副本之間采用的是“一主多從”的設計,其中 Leader 負責讀寫請求,F(xiàn)ollower 則僅負責同步 Leader 的消息(這種設計方式,大家應該要意識到會存在同步滯后的問題),并且副本處于不同的 Broker 中,當 Leader 出現(xiàn)故障(一般是因為其所在的 Broker 出現(xiàn)故障導致的)時,就從 Follower 中重新選舉出新的 Leader 提供服務。當選出新的 Leader 并恢復服務后,Consumer 可以通過之前自己保存的 Offset 來繼續(xù)拉取消息消費。
結(jié)合到目前為止我們所知道的知識點,一起看下 4 個 Broker 的 Kafka 集群中,某一個 Topic 有三個 Partition,其副本因子為 3(副本因子為3就是每個 Partition 有 3 個副本,一個 Leader,兩個 Follower)的架構(gòu)圖(圖出自于《深入理解Kafka核心設計與實踐原理》)。
1.2 Message 與 Partition
在 1.1 小節(jié)中,我們已經(jīng)知道一條消息只會存在一個 Partition中(只管 Leader,不管 Follower),而 Offset 則是消息在 Partition 中的唯一標識。而在本章節(jié),我們將一起更深入地了解消息與 Partition 的關系,還有副本間同步數(shù)據(jù)所衍生的一些概念。
上面有提到 Kafka 的多副本機制是 Leader 提供讀寫,而 Flower 是需要同步 Leader 的數(shù)據(jù)的,那么具體是怎樣的呢?請看下圖(單主題單分區(qū)3副本):
當Producer 不斷往 Leader 寫入消息時,F(xiàn)lower 會不斷去 Leader 拉取消息,但是每臺機器的性能會有出入,所以同步也有差異,正如上圖這般。對于 Consumer 而言,只有 HW 之前的消息是可見可拉取消費的,這樣做有個好處就是當發(fā)生故障轉(zhuǎn)移時,Consumer 的 Offset 也不會發(fā)生數(shù)組越界的問題。這種做法是 Kafka 權(quán)衡利弊后給出的數(shù)據(jù)可靠性與性能平衡的方案,即不采取同步復制(性能差,對于高并發(fā)場景是災難般的設計),也不采取異步復制(完全異步,數(shù)據(jù)丟失問題突出)。
當然,對于Producer 而言就是消息丟失了,有時我們需要確保消息百分百投遞,這樣不就有問題了嗎?不急,Kafka 可以在 Producer 的配置上配置 acks=-1 + min.insync.replicas=n(n 大于 1),這樣配置后,只有消息被寫入所有副本后,Kafka 服務端才會返回 ack 給 Producer。
下面來梳理下上面提及的幾個概念:
- HW(Heigh Watermark)
它標識了Consumer 可以拉取消息的最高水位,客戶端拉取的 Offset 必須小于 HW。
- LEO(Log End Offset)
這個標記位標識下一條寫入的消息應該存放的位置。
- AR(Assigned Rplicas)
所有副本的統(tǒng)稱
- ISR(In-Syns Rplicas)
與Leader 保持一定程度同步的 Flower 集合。這個一定程度指的是在可容忍滯后范圍內(nèi),這個可容忍范圍可以通過配置修改。
- OSR(Out-of-Sync Rplicas)
同步滯后超過了容忍范圍的Flower 集合。
2 搭建 Kafka 服務端
這里僅以單節(jié)點為例,不配置集群。
2.1 安裝 ZooKeeper
在第一章節(jié),我們知道 Kafka 會將元數(shù)據(jù)交由 ZK 管理,所以我們要先安裝好 ZK。
1.首先檢查自己的Linux 是否安裝好了 yum 工具
rpm -qa|grep yum
使用 yum 安裝好 wget
2.下載 ZK
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
3.解壓
tar -zxvf zookeeper-3.4.6.tar.gz
4.為ZK 創(chuàng)建存放數(shù)據(jù)和日志的文件夾
mkdir data
mkdir logs
5.修改ZK 配置文件
cd conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
修改配置內(nèi)容具體如下:
# ZooKeeper 服務器心跳時間,單位:毫秒
tickTime=2000
# 投票選舉新 Leader 的初始化時間
initLimit=10
# Leader 與 Flower 心跳檢測最大容忍時間,響應超過 syncLimit*tickTime,就剔除 Flower
syncLimit=5
# 存放數(shù)據(jù)的文件夾
dataDir=/root/zookeeper-3.4.6/data
# 存放日志的文件夾
dataLogDir=/root/zookeeper-3.4.6/logs
# ZooKeeper提供給接入客戶端的連接端口
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
接著,到 /root/zookeeper-3.4.6/data 創(chuàng)建文件 myid(如果部署的是集群,那么這個 myid 必需唯一,不能重復)。
cat > myid
vi myid
具體如下:
6.配置環(huán)境變量
vi /etc/profile
export ZOOKEEPER_HOME=/root/zookeeper-3.4.6
export PATH=$PATH:$ZOOKEEPER_HOME/bin
再執(zhí)行 source /etc/profile
至此,ZooKeeper 已經(jīng)配置好了,我們可以啟動看下是否有問題。
2.2 安裝 Kafka
1.到官網(wǎng)下載安裝包
2.使用 psftp 上傳到服務器
# put dir remoteDir
put D:\downloads\kafka_2.13-3.5.0.tgz /root/kafka_2.13-3.5.0.tgz
3.解壓
tar -zxvf kafka_2.13-3.5.0.tgz
4.修改配置
cd kafka_2.13-3.5.0cd config/
由于 server.properties 比較大,就不全部貼上來了,只貼我修改的部分:
# 是Broker的標識,因此在集群中必需唯一
broker.id=0
# Broker 對外服務地址(我這里vmware的ip是192.168.226.140)
listeners=PLAINTEXT://192.168.226.140:9092
# 實際工作中,會分內(nèi)網(wǎng)外網(wǎng),當有需要提供給外部客戶端使用時,我們一般 listeners 配置內(nèi)網(wǎng)供 Broker 之間通信使用,而 advertised.listeners 配置走外網(wǎng)給接入的客戶端使用
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 存放消息日志文件地址
log.dirs=/root/kafka_2.13-3.5.0/logs
# ZK 的訪問路徑,我這里因為 ZK 和 Kafka 放在了同一個服務器上,所以就使用了 localhost
zookeeper.connect=localhost:2181
5.修改環(huán)境變量
vi /etc/profile
export KAFKA_HOME=/root/kafka_2.13-3.5.0
export PATH=$PATH:$KAFKA_HOME/bin
再執(zhí)行 source /etc/profile
6.進入bin目錄,啟動 Broker
kafka-server-start.sh ../config/server.properties &
ps -ef|grep kafka 看下進程,但是是否已經(jīng)可以使用,要通過發(fā)送消息和消費消息來驗證。
3 使用 Spring Boot 實現(xiàn)簡單的 Demo
下面是示例代碼:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<!-- spring boot3.0+ 只支持jdk17,如果使用1.8出現(xiàn)包沖突需要自己處理 -->
<version>2.7.12</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example.czl</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-kafka</name>
<description>spring boot集成kafka demo</description>
<properties>
<java.version>1.8</java.version>
<mybatis-plus.version>3.5.3.1</mybatis-plus.version>
<velocity-engine-core.version>2.3</velocity-engine-core.version>
<lombok.version>1.18.26</lombok.version>
<guava.version>31.1-jre</guava.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>scala-reflect</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- mybatis-plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-engine-core</artifactId>
<version>${velocity-engine-core.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
<!-- 對于一些特殊的依賴指定特定版本 -->
<!--<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.0</version>
</dependency>
</dependencies>
</dependencyManagement>-->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
application:
name: spring-boot-kafka
profiles:
active: dev
server:
port: 8080
application-dev.yml
spring:
datasource:
url: "jdbc:mysql://***:***/***?useSSL=false&useUnicode=true&characterEncoding=utf8&ApplicationName=spring-boot-demo&serverTimezone=UTC&allowMultiQueries=true"
username: "***"
password: "***"
kafka:
bootstrap-servers: "192.168.226.140:9092" # 訪問Kafka服務端的地址
consumer:
group-id: ${spring.application.name}-${spring.profiles.active} # 一條消息只會被訂閱了該主題的同一個分組內(nèi)的一個消費者消費
mybatis-plus:
configuration:
# 打印sql
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_PATH_HOME" value="./logs/spring-boot-kafka"/>
<property name="LOG_LEVEL" value="INFO"/>
<!-- 日志文件布局 -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36}\(%L\) - [%X{traceId}] %msg%n</pattern>
</encoder>
<!-- 按時間大小歸檔日志 -->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<FileNamePattern>${LOG_PATH_HOME}/log.%d{yyyy-MM-dd}.%i.log</FileNamePattern>
<maxFileSize>200MB</maxFileSize>
</rollingPolicy>
</appender>
<!-- 控制臺日志布局 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36}\(%L\) - [%X{traceId}] %msg%n</Pattern>
</encoder>
</appender>
<logger name="org.springframework.web.filter.CommonsRequestLoggingFilter" level="INFO"/>
<logger name="org.springframework" level="INFO"/>
<logger name="com.czl.demo" level="${LOG_LEVEL}"/>
<root level="${LOG_LEVEL}">
<appender-ref ref="FILE"/>
<appender-ref ref="STDOUT"/>
</root>
</configuration>
ProducerDemo
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @author CaiZhuliang
* @date 2023/6/18
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ProducerDemo {
private final KafkaTemplate<String, String> kafkaTemplate;
/**
* 發(fā)送消息
* @param topic 主題
* @param msg 消息
* @param callback 鉤子
*/
public void send(String topic, String msg, ListenableFutureCallback<SendResult<String, String>> callback) {
log.info("發(fā)送Kafka消息 - topic : {}, msg : {}", topic, msg);
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);
if (null != callback) {
future.addCallback(callback);
}
}
}
ConsumerDemo
package com.example.czl.kafka.kafka.producer.consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @author CaiZhuliang
* @date 2023/6/18
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ConsumerDemo {
@KafkaListener(topics = "test-topic-1")
public void receivingMsg(String msg) {
log.info("接收到Kafka消息 - msg : {}", msg);
}
}
TestController
package com.example.czl.kafka.controller;
import com.example.czl.kafka.kafka.producer.ProducerDemo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author CaiZhuliang
* @date 2023/6/18
*/
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/test")
public class TestController {
private final ProducerDemo producerDemo;
@GetMapping("/send/kafka_msg")
public Long sendMsg(String msg) {
log.info("測試發(fā)送kafka消息 - msg : {}", msg);
producerDemo.send("test-topic-1", msg, null);
return System.currentTimeMillis();
}
}
postman請求測試如下:
控制臺信息如下:
作者介紹
蔡柱梁,51CTO社區(qū)編輯,從事Java后端開發(fā)8年,做過傳統(tǒng)項目廣電BOSS系統(tǒng),后投身互聯(lián)網(wǎng)電商,負責過訂單,TMS,中間件等。