深度解析 Raft 協(xié)議與 KRaft 實戰(zhàn)演示
一、Raft 協(xié)議是什么?
Raft 協(xié)議是一種分布式一致性算法,它用于在分布式系統(tǒng)中的多個節(jié)點之間達成一致性。Raft 協(xié)議的目標是提供一種相對簡單、易于理解和實現(xiàn)的方法,以確保在網(wǎng)絡分區(qū)、節(jié)點故障等情況下,系統(tǒng)仍然能夠保持一致性和可用性。
圖片
應用服務對于請求的處理流程圖:
圖片
以下是 Raft 協(xié)議的核心架構組件和流程:
1、節(jié)點角色:
- Leader:負責管理整個集群,處理客戶端請求,發(fā)起日志復制,以及觸發(fā)新的選舉。
- Follower:被動節(jié)點,接收并復制 Leader 的日志條目,響應 Leader 的心跳和日志復制請求。
- Candidate:當 Follower 在選舉超時時間內未收到 Leader 的心跳時,它會變成 Candidate 并發(fā)起選舉。
圖片
當一個節(jié)點啟動的時候,需要將自身節(jié)點信息注冊到集群中Leader節(jié)點
2、領導者選舉(Leader Election):
- 當集群啟動或 Leader 失效時,F(xiàn)ollower 會等待一段時間(隨機化超時時間)后變成 Candidate。
- Candidate 發(fā)起選舉,向其他節(jié)點發(fā)送請求投票(RequestVote RPC)。
- 如果 Candidate 獲得大多數(shù)節(jié)點的投票,它就成為新的 Leader。
3、日志復制(Log Replication):
- Leader 處理客戶端請求,將每個請求作為新的日志條目追加到其日志中。
- Leader 向其他節(jié)點發(fā)送 AppendEntries RPC 來復制日志條目。
- 當日志條目被復制到大多數(shù)節(jié)點時,Leader 將這些條目標記為已提交,并通知 Follower 應用這些更改。
4、日志壓縮(Log Compaction):
- 為了減少日志的大小,Raft 允許 Leader 刪除那些已經(jīng)被大多數(shù)節(jié)點復制并提交的日志條目。
5、安全性和一致性:
- Raft 確保在任何時候,只有當前任期的日志條目可以被提交。通過領導者的選舉機制和日志復制策略,Raft 保證了集群狀態(tài)的一致性。
6、成員變更(Membership Changes):
- Raft 允許在不停機的情況下更改集群的成員。
- Leader 可以向 Follower 發(fā)送配置更改的日志條目,這些更改在被復制和提交后生效。
7、心跳和超時:
- Leader 定期向 Follower 發(fā)送心跳(Heartbeat)以維持其領導地位。
- Follower 在未收到心跳的情況下會觸發(fā)新的選舉。
8、日志一致性:
- Raft 通過確保所有已提交的日志條目在集群中的所有節(jié)點上都是一致的,來維護一致性。
Raft 協(xié)議的架構設計強調了簡單性和易于理解,同時提供了強大的一致性和容錯能力。這種設計使得 Raft 成為了許多分布式系統(tǒng)和數(shù)據(jù)庫的首選一致性算法。
角色轉換這幅圖是領袖、候選人和群眾的角色切換圖,我先簡單總結一下:
- 群眾 -> 候選人:當開始選舉,或者“選舉超時”時
- 候選人 -> 候選人:當“選舉超時”,或者開始新的“任期”
- 候選人 -> 領袖:獲取大多數(shù)投票時
- 候選人 -> 群眾:其它節(jié)點成為領袖,或者開始新的“任期”
- 領袖 -> 群眾:發(fā)現(xiàn)自己的任期ID比其它節(jié)點分任期ID小時,會自動放棄領袖位置
圖片
Raft 協(xié)議通過這些機制解決了分布式系統(tǒng)中的一致性問題,特別是在領導者選舉和日志復制方面。它被廣泛應用于各種分布式系統(tǒng)和服務中,例如 etcd(一個分布式鍵值存儲系統(tǒng)),它被用作 Kubernetes 的后端存儲。Raft 協(xié)議的設計使得它在實際應用中既高效又可靠。
二、Raft 協(xié)議應用場景
Raft 協(xié)議作為一種分布式一致性算法,被廣泛應用于需要在多個節(jié)點間保持數(shù)據(jù)一致性的分布式系統(tǒng)場景中。以下是一些典型的 Raft 協(xié)議應用場景:
1、分布式存儲系統(tǒng):
Raft 協(xié)議被用于分布式存儲系統(tǒng)中,以確保數(shù)據(jù)在多個節(jié)點間的一致性和可用性。例如,分布式鍵值存儲(如 etcd、Consul)和分布式數(shù)據(jù)庫(如 TiKV)都采用了 Raft 協(xié)議。
2、配置管理服務:
在配置管理服務中,Raft 用于確保集群中的所有節(jié)點都能訪問到最新的配置信息。例如,Consul 提供了一個服務發(fā)現(xiàn)和配置的工具,它使用 Raft 來保證配置的一致性。
3、服務發(fā)現(xiàn)和注冊:
服務發(fā)現(xiàn)和注冊系統(tǒng)(如 etcd)使用 Raft 來維護服務實例的注冊信息,確保客戶端能夠發(fā)現(xiàn)和連接到正確的服務實例。
4、分布式鎖服務:
分布式鎖服務需要在多個節(jié)點間協(xié)調資源的訪問,Raft 協(xié)議可以幫助實現(xiàn)一個高可用和一致性的分布式鎖。
5、分布式任務調度:
在分布式任務調度系統(tǒng)中,Raft 可以用來選舉任務調度器的領導者,確保任務分配的一致性和順序執(zhí)行。
6、分布式狀態(tài)機:
Raft 協(xié)議可以用來構建分布式狀態(tài)機,其中每個節(jié)點都維護一個狀態(tài)機的副本,Raft 保證這些狀態(tài)機的狀態(tài)一致。
7、分布式日志系統(tǒng):
分布式日志系統(tǒng)(如 Apache Kafka)可以使用 Raft 來保證日志數(shù)據(jù)在多個副本之間的一致性。
8、集群管理:
在集群管理工具中,Raft 可以用于選舉集群領導者,管理集群狀態(tài),以及處理集群成員的加入和退出。
9、分布式事務:
雖然 Raft 本身不直接處理分布式事務,但它可以作為分布式事務協(xié)議的一部分,用于保證事務日志的一致性。
Raft 協(xié)議因其易于理解和實現(xiàn),以及在實踐中的高效性和可靠性,成為了構建分布式系統(tǒng)時的首選一致性算法之一。在這些應用場景中,Raft 協(xié)議幫助系統(tǒng)在面對網(wǎng)絡分區(qū)、節(jié)點故障等分布式系統(tǒng)常見問題時,仍然能夠保持數(shù)據(jù)的一致性和系統(tǒng)的可用性。
三、Kafka Raft(KRaft)
Kafka Raft(KRaft)與 Apache ZooKeeper 是兩種不同的分布式協(xié)調服務,它們在 Kafka 集群中扮演著不同的角色。以下是 KRaft 與 ZooKeeper 的對比:
1、依賴性:
- ZooKeeper:在 KRaft 出現(xiàn)之前,Kafka 嚴重依賴于 ZooKeeper 來管理集群的元數(shù)據(jù),如 broker 注冊、主題分區(qū)、控制器選舉等。
- KRaft:KRaft 是 Kafka 內部實現(xiàn)的一致性協(xié)議,它允許 Kafka 集群在不依賴 ZooKeeper 的情況下運行,從而簡化了 Kafka 的架構。
2、一致性協(xié)議:
- ZooKeeper:使用 ZAB(ZooKeeper Atomic Broadcast)協(xié)議,它是一個為分布式系統(tǒng)提供一致性服務的協(xié)議。
- KRaft:基于 Raft 一致性協(xié)議,它提供了一種更易于理解和實現(xiàn)的領導者選舉和日志復制機制。
3、性能和可伸縮性:
- ZooKeeper:在大型集群中,ZooKeeper 可能會成為性能瓶頸,因為它需要處理大量的客戶端請求和維護復雜的會話狀態(tài)。
- KRaft:KRaft 旨在提高 Kafka 的性能和可伸縮性,通過內部管理元數(shù)據(jù),減少了對外部協(xié)調服務的依賴。
4、部署和管理:
- ZooKeeper:部署和維護 ZooKeeper 集群需要額外的工作,包括配置、監(jiān)控和故障恢復。
- KRaft:由于 KRaft 集成在 Kafka 中,部署和管理 Kafka 集群變得更加簡單,不再需要單獨的 ZooKeeper 集群。
5、可靠性和可用性:
- ZooKeeper:ZooKeeper 提供了強一致性保證,但在選舉過程中可能會有短暫的不可用性。
- KRaft:KRaft 同樣提供了強一致性保證,并且通過內部的控制器集群(Controller Quorum)來提高系統(tǒng)的可靠性和可用性。
6、未來發(fā)展:
- ZooKeeper:隨著 KRaft 的引入,Kafka 社區(qū)逐漸減少了對 ZooKeeper 的依賴,這可能會影響 ZooKeeper 在 Kafka 生態(tài)系統(tǒng)中的地位。
- KRaft:KRaft 是 Kafka 未來發(fā)展的方向,它標志著 Kafka 朝著更輕量級、更易于管理的方向發(fā)展。
KRaft 模式的主要優(yōu)勢包括:
- 去中心化:Kafka 集群不再依賴于外部的 ZooKeeper 集群,簡化了部署和運維。
- 性能提升:由于不再需要與 ZooKeeper 進行通信,Kafka 集群的性能得到了提升。
- 擴展性:KRaft 模式允許 Kafka 集群更靈活地擴展,不再受到 ZooKeeper 集群規(guī)模的限制。
- 一致性和可用性:Raft 協(xié)議確保了即使在部分控制器節(jié)點失敗的情況下,集群的元數(shù)據(jù)仍然能夠保持一致性和可用性。
- 簡化的故障恢復:在 KRaft 模式下,Kafka 集群的故障恢復過程更加簡單和直接。
KRaft 模式在 Kafka 3.3.1 版本中被標記為可以在生產(chǎn)環(huán)境中使用。這意味著 Kafka 用戶現(xiàn)在可以選擇 KRaft 模式來部署他們的 Kafka 集群,以獲得更好的性能和更簡單的運維體驗。然而,需要注意的是,KRaft 模式目前仍然是一個相對較新的功能,因此在生產(chǎn)環(huán)境中使用時,建議密切關注 Kafka 社區(qū)的更新和最佳實踐。
四、基于KRaft 協(xié)議部署Kafka(不依賴與Zookeeper)
關于更多為啥會拋棄Zookeeper的原因可以參考我這篇文章:為何Kafka在2.8版本開始會“拋棄”Zookeeper?
首先來看一下KRaft在系統(tǒng)架構層面和之前的版本有什么區(qū)別。KRaft模式提出來去zookeeper后的kafka整體架構入下圖是前后架構圖對比:
圖片
1)下載 Kafka
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
2)配置修改
修改kafka目錄下的config/kraft/server.properties文件。三個服務器都需要修改。特別注意:每個服務器(broker)上的配置里的node.id必須是數(shù)字,并且不能重復。
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present. See config/kraft/README.md for details.
#
############################# Server Basics #############################
# The role of this server. Setting this puts us in KRaft mode
# 節(jié)點角色(修改)
process.roles=broker,controller
# The node id associated with this instance's roles
# 節(jié)點ID,和節(jié)點所承擔的角色想關聯(lián)(修改)
node.id=1
# The connect string for the controller quorum
# 配置標識有哪些節(jié)點是 **Quorum** 的投票者節(jié)點
controller.quorum.voters=1@192.168.182.110:9093,2@192.168.182.111:9093,3@192.168.182.112:9093
############################# Socket Server Settings #############################
# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://:9092
# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
# 這里我修改了日志文件的路徑,默認是在/tmp目錄下的
log.dirs=/data/kraft-combined-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitinotallow=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
三個broker的配置基本都和上面的配置一樣,不同的地方就是node.id:
kraft1:node.id=1
kraft2:node.id=2
kraft3:node.id=3
另外還有兩處需要修改。
- controller.quorum.voters=1@kraft1:9093,2@kraft2:9093,3@kraft3:9093【以逗號分隔的{id}@{host}:{port}投票者列表。例如:1@localhost:9092,2@localhost:9093,3@localhost:9094】
- log.dirs=/home/vagrant/kraft-combined-logs【日志路徑,默認是/temp下的文件下,生產(chǎn)環(huán)境不要使用,因為linux會清理/tmp目錄下的文件,會造成數(shù)據(jù)丟失】
Process.Roles:
每個Kafka服務器現(xiàn)在都有一個新的配置項,叫做Process.Roles, 這個參數(shù)可以有以下值:
- 如果Process.Roles = Broker, 服務器在KRaft模式中充當 Broker。
- 如果Process.Roles = Controller, 服務器在KRaft模式下充當 Controller。
- 如果Process.Roles = Broker,Controller,服務器在KRaft模式中同時充當 Broker 和Controller。
- 如果process.roles 沒有設置。那么集群就假定是運行在ZooKeeper模式下。
如前所述,目前不能在不重新格式化目錄的情況下在ZooKeeper模式和KRaft模式之間來回轉換。同時充當Broker和Controller的節(jié)點稱為“組合”節(jié)點。
對于簡單的場景,組合節(jié)點更容易運行和部署,可以避免多進程運行時,JVM帶來的相關的固定內存開銷。關鍵的缺點是,控制器將較少地與系統(tǒng)的其余部分隔離。例如,如果代理上的活動導致內存不足,則服務器的控制器部分不會與該OOM條件隔離。
Quorum Voters
- 系統(tǒng)中的所有節(jié)點都必須設置 controller.quorum.voters 配置。這個配置標識有哪些節(jié)點是 Quorum 的投票者節(jié)點。所有想成為控制器的節(jié)點都需要包含在這個配置里面。這類似于在使用ZooKeeper時,使用ZooKeeper.connect配置時必須包含所有的ZooKeeper服務器。
- 然而,與ZooKeeper配置不同的是,controller.quorum.voters 配置需要包含每個節(jié)點的id。格式為: id1@host1:port1,id2@host2:port2。
3)生成集群ID
隨便找一個服務器,進入kafka目錄,使用kafka-storage.sh生成一個uuid,一個集群只能有一個uuid?。?!
./bin/kafka-storage.sh random-uuid
# 這個ID就可以作為集群的ID
# AxAUvePAQ364y4mxggF35w
4)用 kafka-storage.sh 格式化存儲數(shù)據(jù)的目錄
三個機器上都需要執(zhí)行
#./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
./bin/kafka-storage.sh format -t AxAUvePAQ364y4mxggF35w -c config/kraft/server.properties
5)用bin/kafka-server-start.sh 啟動Kafka Server
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
6)測試驗證
./bin/kafka-topics.sh --create --topic kafkaraftTest --partitions 1 --replication-factor 1 --bootstrap-server 192.168.182.110:9092
查看topic
./bin/kafka-topics.sh --list --bootstrap-server 192.168.182.110:9092
./bin/kafka-topics.sh --describe --topic kafkaraftTest --bootstrap-server 192.168.182.110:9092
圖片