圖解 Kafka 源碼之服務(wù)端啟動(dòng)流程
前面「八篇」文章通過(guò)「場(chǎng)景驅(qū)動(dòng)方式」帶你深度剖析了 Kafka「日志系統(tǒng)」源碼架構(gòu)設(shè)計(jì)的方方面面,從今天開始,我們來(lái)深度剖析 Kafka「Controller」的底層源碼實(shí)現(xiàn),這是 Controller 系列第一篇,我們先回過(guò)頭來(lái)繼續(xù)來(lái)深度聊聊「Kafka 服務(wù)端啟動(dòng)的流程」,看看 Kafka 服務(wù)端是如何啟動(dòng)的。
一、總體概述
在深入剖析Kafka「Controller」之前,我想你可能或多或少會(huì)有這樣的疑問:
Kafka 服務(wù)端都有哪些組件,這些組件又是通過(guò)哪個(gè)類來(lái)啟動(dòng)的呢?
這里我們通過(guò)啟動(dòng) Kafka 來(lái)了解,大家都知道,啟動(dòng) Kafka 可以執(zhí)行以下命令來(lái)啟動(dòng):
# 1、啟動(dòng) kafka 服務(wù)命令:
bin/kafka-server-start.sh config/server.properties &
那么今天就來(lái)看看通過(guò)這個(gè)腳本 KafkaServer 初始化了哪些組件。
二、kafka-server-start.sh
我們來(lái)看下里面的 shell 內(nèi)容,如下:
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1、注釋說(shuō)明該腳本的版權(quán)信息和使用許可。
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
# 2、檢查命令行參數(shù)的個(gè)數(shù),若小于 1 則輸出腳本的使用方法并退出。
base_dir=$(dirname $0)
# 3、獲取當(dāng)前腳本所在目錄的路徑,并將其賦值給 base_dir 變量。
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"
fi
# 4、檢查 KAFKA_LOG4J_OPTS 環(huán)境變量是否設(shè)置,若未設(shè)置則設(shè)置該變量的值。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
export JMX_RMI_PORT="10000"
fi
# 5、檢查 KAFKA_HEAP_OPTS 環(huán)境變量是否設(shè)置,若未設(shè)置則設(shè)置該變量的值,并設(shè)置 JMX_PORT 和 JMX_RMI_PORT 環(huán)境變量的值,將 EXTRA_ARGS 變量的值設(shè)置為字符串 -name kafkaServer -loggc。
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
# 6、檢查命令行參數(shù)中 COMMAND 變量的值是否為 -daemon,若是則將 EXTRA_ARGS 變量的值添加 -daemon 選項(xiàng)。同時(shí)將命令行參數(shù)向左移一位,即從 $2 開始計(jì)算參數(shù)。
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
# 7、調(diào)用 $base_dir/kafka-run-class.sh 腳本并傳遞相應(yīng)的參數(shù)。其中 "@ 代表傳遞的為命令行參數(shù)。具體執(zhí)行的封裝在 Kafka 客戶端庫(kù)中的 kafka.Kafka 類。整個(gè)腳本的作用是啟動(dòng) Kafka 服務(wù)。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
esac
# 7、調(diào)用 $base_dir/kafka-run-class.sh 腳本并傳遞相應(yīng)的參數(shù)。其中 "@ 代表傳遞的為命令行參數(shù)。具體執(zhí)行的封裝在 Kafka 客戶端庫(kù)中的 kafka.Kafka 類。整個(gè)腳本的作用是啟動(dòng) Kafka 服務(wù)。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
這里我們重點(diǎn)來(lái)看 「第 7 步」,它底層執(zhí)行的是封裝在 Kafka 客戶端庫(kù)中的 kafka.Kafka 類。接下來(lái)我們來(lái)看下該類都做了什么。
三、kafka.Kafka 類
「Kafka.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:
https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。
從整體上來(lái)看,該類就 3 個(gè)方法,相對(duì)比較簡(jiǎn)單,我能來(lái)看下里面的重點(diǎn)。
這里我們通過(guò)「2.8.x」版本來(lái)講解,「2.7.x」還未增加 KafkaRaftServer 類。
1、getPropsFromArgs
def getPropsFromArgs(args: Array[String]): Properties = {
// 創(chuàng)建一個(gè)命令行參數(shù)解析器
val optionParser = new OptionParser(false)
// 定義 --override 選項(xiàng),用于覆蓋 server.properties 文件中的屬性
val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
.withRequiredArg()
.ofType(classOf[String])
// 定義 --version 選項(xiàng),用于打印版本信息并退出
optionParser.accepts("version", "Print version information and exit.")
// 若沒有提供參數(shù)或者參數(shù)包含 --help 選項(xiàng),則打印用法并退出
if (args.length == 0 || args.contains("--help")) {
CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
}
// 若參數(shù)中包含 --version 選項(xiàng),則打印版本信息并退出
if (args.contains("--version")) {
CommandLineUtils.printVersionAndDie()
}
// 加載 server.properties 文件中的屬性到 Properties 對(duì)象中
val props = Utils.loadProps(args(0))
// 若提供了其他參數(shù),則解析這些參數(shù)
if (args.length > 1) {
// 解析參數(shù)中的選項(xiàng)和參數(shù)值
val options = optionParser.parse(args.slice(1, args.length): _*)
// 檢查是否有非選項(xiàng)參數(shù)
if (options.nonOptionArguments().size() > 0) {
CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
}
// 將解析得到的選項(xiàng)和參數(shù)值添加到 props 對(duì)象中
props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
}
// 返回解析得到的屬性集合
props
}
該函數(shù)的作用是從命令行參數(shù)中解析出屬性集合。它內(nèi)部使用了 OptionParser 類庫(kù)來(lái)解析命令行選項(xiàng),并從 server.properties 文件中加載屬性。
如果提供了 override 選項(xiàng),則它將覆蓋 server.properties 文件中的相應(yīng)屬性。函數(shù)返回一個(gè) Properties 對(duì)象,其中包含了解析得到的屬性。
如果沒有提供正確的命令行參數(shù)或者提供了 --help 或 --version 選項(xiàng),函數(shù)會(huì)打印幫助信息或版本信息并退出。
2、buildServer
private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, false)
// 直接啟動(dòng)定時(shí)任務(wù)、網(wǎng)絡(luò)層、請(qǐng)求處理層
if (config.requiresZookeeper) {
new KafkaServer(
config,
Time.SYSTEM,
threadNamePrefix = None,
enableForwarding = false
)
} else {
// 調(diào)用 BrokerServer 等來(lái)啟動(dòng)網(wǎng)絡(luò)層和請(qǐng)求處理層
new KafkaRaftServer(
config,
Time.SYSTEM,
threadNamePrefix = None
)
}
}
在 kafka 2.8.x 版本中 新增了 raft 協(xié)議之后將 BrokerServer、ControllServer 使用了單獨(dú)的文件來(lái)啟動(dòng)最終調(diào)用網(wǎng)絡(luò)層和請(qǐng)求處理層,如果還是使用 zk 的方式啟動(dòng)則是 KafkaServer 啟動(dòng)網(wǎng)絡(luò)層和請(qǐng)求處理層。
3、main
# 2.7.x 版本源碼
def main(args: Array[String]): Unit = {
try {
// 1、解析命令行參數(shù),獲得屬性集合
val serverProps = getPropsFromArgs(args)
// 2、從屬性集合創(chuàng)建 KafkaServerStartable 對(duì)象
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
try {
// 如果不是 Windows 操作系統(tǒng),并且不是 IBM JDK,則注冊(cè) LoggingSignalHandler
if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
new LoggingSignalHandler().register()
} catch {
// 如果注冊(cè) LoggingSignalHandler 失敗,則在日志中打印警告信息
case e: ReflectiveOperationException =>
warn("Failed to register optional signal handler that logs a message when the process is terminated " +
s"by a signal. Reason for registration failure is: $e", e)
}
// 3、添加 shutdown hook,用于在程序結(jié)束時(shí)執(zhí)行 KafkaServerStartable 的 shutdown 方法
Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())
// 4、啟動(dòng) KafkaServerStartable 實(shí)例
kafkaServerStartable.startup()
// 5、等待 KafkaServerStartable 實(shí)例終止
kafkaServerStartable.awaitShutdown()
}
catch {
// 如果有異常發(fā)生,則記錄日志并退出程序
case e: Throwable =>
fatal("Exiting Kafka due to fatal exception", e)
Exit.exit(1)
}
// 6、正常終止程序
Exit.exit(0)
}
該函數(shù)是 Kafka 服務(wù)進(jìn)程的入口,它是整個(gè) Kafka 運(yùn)行過(guò)程的驅(qū)動(dòng)程序。該函數(shù)首先通過(guò)調(diào)用 getPropsFromArgs 函數(shù)解析命令行參數(shù)并獲得屬性集合,然后使用這些屬性創(chuàng)建 KafkaServerStartable 實(shí)例。接著,它注冊(cè)一個(gè) shutdown hook,用于在程序終止時(shí)執(zhí)行 KafkaServerStartable 的 shutdown 方法。然后它啟動(dòng) KafkaServerStartable 實(shí)例,并等待該實(shí)例終止。如果發(fā)生異常,則記錄日志并退出程序。函數(shù)最后調(diào)用 Exit.exit 方法退出程序,返回 0 表示正常終止。
# 2.8.x 版本
def main(args: Array[String]): Unit = {
// 獲取Kafka服務(wù)的配置信息
val serverProps = getPropsFromArgs(args)
// 根據(jù)配置信息構(gòu)建Kafka服務(wù)
val server = buildServer(serverProps)
try {
// 注冊(cè)用于記錄日志的信號(hào)處理器(若實(shí)現(xiàn)失敗則退出)
if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
new LoggingSignalHandler().register()
} catch {
case e: ReflectiveOperationException =>
warn("Failed to register optional signal handler that logs a message when the process is terminated " +
s"by a signal. Reason for registration failure is: $e", e)
}
// 掛載關(guān)閉處理器,用于捕獲終止信號(hào)和常規(guī)終止請(qǐng)求
Exit.addShutdownHook("kafka-shutdown-hook", {
try server.shutdown() // 關(guān)閉Kafka服務(wù)
catch {
case _: Throwable =>
fatal("Halting Kafka.") // 日志記錄致命錯(cuò)誤信息
// 調(diào)用Exit.halt()強(qiáng)制退出,避免重復(fù)調(diào)用Exit.exit()引發(fā)死鎖
Exit.halt(1)
}
})
try server.startup() // 啟動(dòng)Kafka服務(wù)
catch {
case _: Throwable =>
// 調(diào)用Exit.exit()設(shè)置退出狀態(tài)碼,KafkaServer.startup()會(huì)在拋出異常時(shí)調(diào)用shutdown()
fatal("Exiting Kafka.")
Exit.exit(1)
}
server.awaitShutdown() // 等待Kafka服務(wù)關(guān)閉
Exit.exit(0) // 調(diào)用Exit.exit()設(shè)置退出狀態(tài)碼
}
這里最重要的是 「第 4 步」,調(diào)用 kafkaServerStartable.startup() 或者 server.startup() 來(lái)啟動(dòng) kafka。
這里我們還是以「ZK 模式」的方式來(lái)啟動(dòng),后面抽空再進(jìn)行對(duì) 「Raft 模式」啟動(dòng)進(jìn)行補(bǔ)充。
四、KafkaServerStartable
「KafkaServerStartable.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:
https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。
在 Scala 語(yǔ)言里,在一個(gè)源代碼文件中同時(shí)定義相同名字的 class 和 object 的用法被稱為伴生(Companion)。Class 對(duì)象被稱為伴生類,它和 Java 中的類是一樣的;而 Object 對(duì)象是一個(gè)單例對(duì)象,用于保存一些靜態(tài)變量或靜態(tài)方法。
這里我們主要來(lái)看下 Class 類代碼。
class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
// 創(chuàng)建 KafkaServer 實(shí)例
// 構(gòu)造函數(shù)有兩個(gè)參數(shù) —— staticServerConfig 表示靜態(tài)服務(wù)器配置,reporters 表示 Kafka 指標(biāo)報(bào)告器。如果 threadNamePrefix 參數(shù)未用于構(gòu)造函數(shù),則默認(rèn)值為 None。threadNamePrefix 參數(shù)表示線程名稱前綴,用于調(diào)試和維護(hù)目的。
private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)
def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
// 啟動(dòng) KafkaServer
// startup 方法嘗試啟動(dòng) Kafka 服務(wù)器。如果啟動(dòng) Kafka 服務(wù)器時(shí)發(fā)生異常,則記錄一條 fatal 錯(cuò)誤日志并退出程序。對(duì)于成功啟動(dòng)的 Kafka 服務(wù)器,它將開始監(jiān)聽客戶端連接,并在收到消息時(shí)執(zhí)行所需的操作。
def startup(): Unit = {
try server.startup()
catch {
// 如果出現(xiàn)異常,則記錄日志并退出程序
case _: Throwable =>
// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
Exit.exit(1)
}
}
// 關(guān)閉 KafkaServer
// shutdown 方法嘗試停止 Kafka 服務(wù)器。如果在停止服務(wù)器時(shí)出現(xiàn)異常,則記錄一條 fatal 錯(cuò)誤日志并強(qiáng)制退出程序。調(diào)用 shutdown 方法后,服務(wù)器將不再接受新的請(qǐng)求,并開始等待當(dāng)前進(jìn)行中的請(qǐng)求完成。當(dāng)所有處理中的請(qǐng)求都完成后,服務(wù)器將徹底停止。
def shutdown(): Unit = {
try server.shutdown()
catch {
// 如果出現(xiàn)異常,則記錄日志并強(qiáng)制退出程序
case _: Throwable =>
fatal("Halting Kafka.")
// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
Exit.halt(1)
}
}
// setServerState 方法允許從 KafkaServerStartable 對(duì)象中設(shè)置 broker 狀態(tài)。如果自定義 KafkaServerStartable 對(duì)象想要引入新的狀態(tài),則此方法很有用。
def setServerState(newState: Byte): Unit = {
server.brokerState.newState(newState)
}
// 等待 KafkaServer 退出
// awaitShutdown 方法等待 Kafka 服務(wù)器完全退出。在 Kafka 服務(wù)器執(zhí)行 shutdown 方法后,它將不再接受新的請(qǐng)求。但是,服務(wù)器可能仍在處理一些已經(jīng)接收的請(qǐng)求。awaitShutdown 方法將阻塞當(dāng)前線程,直到服務(wù)器徹底停止。
def awaitShutdown(): Unit = server.awaitShutdown()
}
KafkaServerStartable 類是一個(gè)可啟動(dòng)和停止的 Kafka 服務(wù)器。類中的 server 成員變量是 KafkaServer 類的實(shí)例,它將在 KafkaServerStartable 類對(duì)象啟動(dòng)時(shí)創(chuàng)建。該類提供了啟動(dòng)和停止 Kafka 服務(wù)器的方法,以及設(shè)置 broker 狀態(tài)和等待 Kafka 服務(wù)器退出的方法。
跟本文有關(guān)系的是 「啟動(dòng)」方法,它調(diào)用了 KafkaServer#startup 方法進(jìn)行啟動(dòng)。
五、KafkaServer 類
Kafka 集群由多個(gè) Broker 節(jié)點(diǎn)構(gòu)成,每個(gè)節(jié)點(diǎn)上都運(yùn)行著一個(gè) Kafka 實(shí)例,這些實(shí)例之間基于 ZK 來(lái)發(fā)現(xiàn)彼此,并由集群控制器 KafkaController 統(tǒng)籌協(xié)調(diào)運(yùn)行,彼此之間基于 socket 連接進(jìn)行通信。
「KafkaServer.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:
https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。
KafkaServer 為 Kafka 的啟動(dòng)類,其中包含了 Kafka 的所有組件,如 KafkaController、groupCoordinator、replicaManager 等。
class KafkaServer(val config: KafkaConfig, //配置信息
time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //監(jiān)控上報(bào)
) extends Logging with KafkaMetricsGroup {
//標(biāo)識(shí)節(jié)點(diǎn)已經(jīng)啟動(dòng)完成
private val startupComplete = new AtomicBoolean(false)
//標(biāo)識(shí)節(jié)點(diǎn)正在執(zhí)行關(guān)閉操作
private val isShuttingDown = new AtomicBoolean(false)
//標(biāo)識(shí)節(jié)點(diǎn)正在執(zhí)行啟動(dòng)操作
private val isStartingUp = new AtomicBoolean(false)
//阻塞主線程等待 KafkaServer 的關(guān)閉
private var shutdownLatch = new CountDownLatch(1)
//日志上下文
private var logContext: LogContext = null
var metrics: Metrics = null
//記錄節(jié)點(diǎn)的當(dāng)前狀態(tài)
val brokerState: BrokerState = new BrokerState
//API接口類,用于處理數(shù)據(jù)類請(qǐng)求
var dataPlaneRequestProcessor: KafkaApis = null
//API接口,用于處理控制類請(qǐng)求
var controlPlaneRequestProcessor: KafkaApis = null
//權(quán)限管理
var authorizer: Option[Authorizer] = None
//啟動(dòng)socket,監(jiān)聽9092端口,等待接收客戶端請(qǐng)求
var socketServer: SocketServer = null
//數(shù)據(jù)類請(qǐng)求處理線程池
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
//命令類處理線程池
var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
//日志管理器
var logDirFailureChannel: LogDirFailureChannel = null
var logManager: LogManager = null
//副本管理器
var replicaManager: ReplicaManager = null
//topic增刪管理器
var adminManager: AdminManager = null
//token管理器
var tokenManager: DelegationTokenManager = null
//動(dòng)態(tài)配置管理器
var dynamicConfigHandlers: Map[String, ConfigHandler] = null
var dynamicConfigManager: DynamicConfigManager = null
var credentialProvider: CredentialProvider = null
var tokenCache: DelegationTokenCache = null
//分組協(xié)調(diào)器
var groupCoordinator: GroupCoordinator = null
//事務(wù)協(xié)調(diào)器
var transactionCoordinator: TransactionCoordinator = null
//集群控制器
var kafkaController: KafkaController = null
//定時(shí)任務(wù)調(diào)度器
var kafkaScheduler: KafkaScheduler = null
//集群分區(qū)狀態(tài)信息緩存
var metadataCache: MetadataCache = null
//配額管理器
var quotaManagers: QuotaFactory.QuotaManagers = null
//zk客戶端配置
val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
private var _zkClient: KafkaZkClient = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
private var _clusterId: String = null
private var _brokerTopicStats: BrokerTopicStats = null
def clusterId: String = _clusterId
// Visible for testing
private[kafka] def zkClient = _zkClient
private[kafka] def brokerTopicStats = _brokerTopicStats
....
}
1、startup
該類方法很多,我們這里只看 startup 啟動(dòng)方法,來(lái)看看其內(nèi)部都啟動(dòng)了哪些組件,來(lái)解決本文開頭提出的問題。
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup(): Unit = {
try {
info("starting")
// 是否已關(guān)閉
if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
// 是否已啟動(dòng)
if (startupComplete.get)
return
// 是否可以啟動(dòng)
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) { // 設(shè)置broker狀態(tài)為Starting
brokerState.newState(Starting)
/* setup zookeeper */
// 連接ZK,并創(chuàng)建根節(jié)點(diǎn)
initZkClient(time)
/* initialize features */
_featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
if (config.isFeatureVersioningSupported) {
_featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)
}
/* Get or create cluster_id */
// 從ZK獲取或創(chuàng)建集群id,規(guī)則:UUID的mostSigBits、leastSigBits組合轉(zhuǎn)base64
_clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
/* load metadata */
// 獲取brokerId及l(fā)og存儲(chǔ)路徑,brokerId通過(guò)zk生成或者server.properties配置broker.id
// 規(guī)則:/brokers/seqid的version值 + maxReservedBrokerId(默認(rèn)1000),保證唯一性
val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs
/* check cluster id */
if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
throw new InconsistentClusterIdException(
s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")
/* generate brokerId */
config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
// 配置logger
this.logIdent = logContext.logPrefix
// initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
// applied after DynamicConfigManager starts.
// 初始化AdminZkClient,支持動(dòng)態(tài)修改配置
config.dynamicConfig.initialize(zkClient)
/* start scheduler */
// 初始化定時(shí)任務(wù)調(diào)度器
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()
/* create and configure metrics */
// 創(chuàng)建及配置監(jiān)控,默認(rèn)使用JMX及Yammer Metrics
kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
kafkaYammerMetrics.configure(config.originals)
val jmxReporter = new JmxReporter()
jmxReporter.configure(config.originals)
val reporters = new util.ArrayList[MetricsReporter]
reporters.add(jmxReporter)
val metricConfig = KafkaServer.metricConfig(config)
val metricsContext = createKafkaMetricsContext()
metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
// 初始化配額管理器
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
// 用于保證kafka-log數(shù)據(jù)目錄的存在
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
/* start log manager */
// 啟動(dòng)日志管理器,kafka的消息以日志形式存儲(chǔ)
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
// 啟動(dòng)日志清理、刷新、校驗(yàn)、恢復(fù)等的定時(shí)線程
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
// SCRAM認(rèn)證方式的token緩存
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
// 啟動(dòng)socket,監(jiān)聽9092端口,等待接收客戶端請(qǐng)求
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startProcessingRequests = false)
/* start replica manager */
brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
// 啟動(dòng)副本管理器,高可用相關(guān)
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
brokerToControllerChannelManager.start()
// 將broker信息注冊(cè)到ZK上
val brokerInfo = createBrokerInfo
val brokerEpoch = zkClient.registerBroker(brokerInfo)
// Now that the broker is successfully registered, checkpoint its metadata
// 校驗(yàn) broker 信息
checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))
/* start token manager */
// 啟動(dòng) token 管理器
tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
tokenManager.startup()
/* start kafka controller */
// 啟動(dòng)Kafka控制器,只有 Leader 會(huì)與ZK建連
kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
kafkaController.startup()
// admin管理器
adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
// 啟動(dòng)集群群組協(xié)調(diào)器
groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
groupCoordinator.startup()
/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
// 啟動(dòng)事務(wù)協(xié)調(diào)器
transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
// ACL
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
case Some(authZ) =>
authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
ep -> cs.toCompletableFuture
}
case None =>
brokerInfo.broker.endPoints.map { ep =>
ep.toJava -> CompletableFuture.completedFuture[Void](null)
}.toMap
}
// 創(chuàng)建拉取管理器
val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
/* start processing requests */
// 初始化數(shù)據(jù)類請(qǐng)求的KafkaApis,負(fù)責(zé)數(shù)據(jù)類請(qǐng)求邏輯處理
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
// 初始化數(shù)據(jù)類請(qǐng)求處理的線程池
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
// 初始化控制類請(qǐng)求的 KafkaApis
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
// 初始化控制類請(qǐng)求的線程池
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
}
Mx4jLoader.maybeLoad()
/* Add all reconfigurables for config change notification before starting config handlers */
config.dynamicConfig.addReconfigurables(this)
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
// Create the config manager. start listening to notifications
// 啟動(dòng)動(dòng)態(tài)配置處理器
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
// 啟動(dòng)請(qǐng)求處理線程
socketServer.startProcessingRequests(authorizerFutures)
// 更新broker狀態(tài)
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
info("started")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
這里總結(jié)下該方法都啟動(dòng)了哪些組件:
- initZkClient(time) 初始化 Zk。
- kafkaScheduler 定時(shí)器。
- logManager 日志模塊。
- MetadataCache 元數(shù)據(jù)緩存。
- socketServer 網(wǎng)絡(luò)服務(wù)器。
- replicaManager 副本模塊。
- kafkaController 控制器。
- groupCoordinator 協(xié)調(diào)器用于和ConsumerCoordinator 交互
- transactionCoordinator 事務(wù)相關(guān)
- fetchManager 副本拉取管理器。
- dynamicConfigManager 動(dòng)態(tài)配置管理器。
2、Broker 狀態(tài)
這個(gè)是在 2.7.x 版本之前的狀態(tài),在 2.8.x 之后版本進(jìn)行了重構(gòu)。
sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
- NotRunning:初始狀態(tài),標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)未運(yùn)行。
- Starting:標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)正在啟動(dòng)中。
- RecoveringFromUncleanShutdown:標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)正在從上次非正常關(guān)閉中恢復(fù)。
- RuningAsBroker:標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)啟動(dòng)成功,可以對(duì)外提供服務(wù)。
- PendingControlledShutdown:標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)正在等待 controlled shutdown 操作完成。
- BrokerShuttingDown:標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)正在執(zhí)行 shutdown 操作。
這些就是 KafkaServer 中主要模塊的入口,接下來(lái)的文章會(huì)通過(guò)這些入口一一進(jìn)行分析。
六、總結(jié)
這里,我們一起來(lái)總結(jié)一下這篇文章的重點(diǎn)。
- 文章開頭通過(guò)對(duì)「kafka-server-start.sh」內(nèi)容進(jìn)行剖析,引出了 「kafka.Kafka」類。
- 在「kafka.Kafka」的 main 方法中調(diào)用了「KafkaServerStartable」嘗試啟動(dòng) Kafka 服務(wù)器。
- 接著在 「KafkaServerStartable」的 startup 方法中調(diào)用了 「KafkaServer」的 startup 方法啟動(dòng)服務(wù)器需要的各種組件類。
下篇我們來(lái)深度剖析「Broker 啟動(dòng)集群如何感知」,大家期待,我們下期見。