自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

圖解 Kafka 源碼之服務(wù)端啟動(dòng)流程

云計(jì)算 Kafka
從今天開始,我們來(lái)深度剖析 Kafka「Controller」的底層源碼實(shí)現(xiàn),這是 Controller 系列第一篇,我們先回過(guò)頭來(lái)繼續(xù)來(lái)深度聊聊「Kafka 服務(wù)端啟動(dòng)的流程」,看看 Kafka 服務(wù)端是如何啟動(dòng)的。

前面「八篇」文章通過(guò)「場(chǎng)景驅(qū)動(dòng)方式」帶你深度剖析了 Kafka「日志系統(tǒng)」源碼架構(gòu)設(shè)計(jì)的方方面面,從今天開始,我們來(lái)深度剖析 Kafka「Controller」的底層源碼實(shí)現(xiàn),這是 Controller 系列第一篇,我們先回過(guò)頭來(lái)繼續(xù)來(lái)深度聊聊「Kafka  服務(wù)端啟動(dòng)的流程」,看看 Kafka 服務(wù)端是如何啟動(dòng)的。

一、總體概述

在深入剖析Kafka「Controller」之前,我想你可能或多或少會(huì)有這樣的疑問:

Kafka  服務(wù)端都有哪些組件,這些組件又是通過(guò)哪個(gè)類來(lái)啟動(dòng)的呢?

這里我們通過(guò)啟動(dòng) Kafka 來(lái)了解,大家都知道,啟動(dòng) Kafka 可以執(zhí)行以下命令來(lái)啟動(dòng)

# 1、啟動(dòng) kafka 服務(wù)命令:
bin/kafka-server-start.sh config/server.properties &

那么今天就來(lái)看看通過(guò)這個(gè)腳本 KafkaServer 初始化了哪些組件。

二、kafka-server-start.sh

我們來(lái)看下里面的 shell 內(nèi)容,如下:

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1、注釋說(shuō)明該腳本的版權(quán)信息和使用許可。
if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
        exit 1
fi
# 2、檢查命令行參數(shù)的個(gè)數(shù),若小于 1 則輸出腳本的使用方法并退出。
base_dir=$(dirname $0)
# 3、獲取當(dāng)前腳本所在目錄的路徑,并將其賦值給 base_dir 變量。
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"
fi
# 4、檢查 KAFKA_LOG4J_OPTS 環(huán)境變量是否設(shè)置,若未設(shè)置則設(shè)置該變量的值。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
    export JMX_RMI_PORT="10000"
fi
# 5、檢查 KAFKA_HEAP_OPTS 環(huán)境變量是否設(shè)置,若未設(shè)置則設(shè)置該變量的值,并設(shè)置 JMX_PORT 和 JMX_RMI_PORT 環(huán)境變量的值,將 EXTRA_ARGS 變量的值設(shè)置為字符串 -name kafkaServer -loggc。
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
# 6、檢查命令行參數(shù)中 COMMAND 變量的值是否為 -daemon,若是則將 EXTRA_ARGS 變量的值添加 -daemon 選項(xiàng)。同時(shí)將命令行參數(shù)向左移一位,即從 $2 開始計(jì)算參數(shù)。
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
# 7、調(diào)用 $base_dir/kafka-run-class.sh 腳本并傳遞相應(yīng)的參數(shù)。其中 "@ 代表傳遞的為命令行參數(shù)。具體執(zhí)行的封裝在 Kafka 客戶端庫(kù)中的 kafka.Kafka 類。整個(gè)腳本的作用是啟動(dòng) Kafka 服務(wù)。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
esac
# 7、調(diào)用 $base_dir/kafka-run-class.sh 腳本并傳遞相應(yīng)的參數(shù)。其中 "@ 代表傳遞的為命令行參數(shù)。具體執(zhí)行的封裝在 Kafka 客戶端庫(kù)中的 kafka.Kafka 類。整個(gè)腳本的作用是啟動(dòng) Kafka 服務(wù)。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

這里我們重點(diǎn)來(lái)看 「第 7 步」,它底層執(zhí)行的是封裝在 Kafka 客戶端庫(kù)中的 kafka.Kafka 類。接下來(lái)我們來(lái)看下該類都做了什么。

三、kafka.Kafka 類

「Kafka.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:

https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。

從整體上來(lái)看,該類就 3 個(gè)方法,相對(duì)比較簡(jiǎn)單,我能來(lái)看下里面的重點(diǎn)。

這里我們通過(guò)「2.8.x」版本來(lái)講解,「2.7.x」還未增加 KafkaRaftServer 類。

1、getPropsFromArgs

def getPropsFromArgs(args: Array[String]): Properties = {
  // 創(chuàng)建一個(gè)命令行參數(shù)解析器
  val optionParser = new OptionParser(false)
  // 定義 --override 選項(xiàng),用于覆蓋 server.properties 文件中的屬性
  val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
    .withRequiredArg()
    .ofType(classOf[String])
   
  // 定義 --version 選項(xiàng),用于打印版本信息并退出
  optionParser.accepts("version", "Print version information and exit.")
  // 若沒有提供參數(shù)或者參數(shù)包含 --help 選項(xiàng),則打印用法并退出
  if (args.length == 0 || args.contains("--help")) {
    CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
  }
  // 若參數(shù)中包含 --version 選項(xiàng),則打印版本信息并退出
  if (args.contains("--version")) {
    CommandLineUtils.printVersionAndDie()
  }
  // 加載 server.properties 文件中的屬性到 Properties 對(duì)象中
  val props = Utils.loadProps(args(0))
  // 若提供了其他參數(shù),則解析這些參數(shù)
  if (args.length > 1) {
    // 解析參數(shù)中的選項(xiàng)和參數(shù)值
    val options = optionParser.parse(args.slice(1, args.length): _*)
    // 檢查是否有非選項(xiàng)參數(shù)
    if (options.nonOptionArguments().size() > 0) {
      CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
    }
    // 將解析得到的選項(xiàng)和參數(shù)值添加到 props 對(duì)象中
    props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
  }
  // 返回解析得到的屬性集合
  props
}

該函數(shù)的作用是從命令行參數(shù)中解析出屬性集合。它內(nèi)部使用了 OptionParser 類庫(kù)來(lái)解析命令行選項(xiàng),并從 server.properties 文件中加載屬性。

如果提供了 override 選項(xiàng),則它將覆蓋 server.properties 文件中的相應(yīng)屬性。函數(shù)返回一個(gè) Properties 對(duì)象,其中包含了解析得到的屬性。

如果沒有提供正確的命令行參數(shù)或者提供了 --help 或 --version 選項(xiàng),函數(shù)會(huì)打印幫助信息或版本信息并退出。

2、buildServer

private def buildServer(props: Properties): Server = {
    val config = KafkaConfig.fromProps(props, false)
    // 直接啟動(dòng)定時(shí)任務(wù)、網(wǎng)絡(luò)層、請(qǐng)求處理層
    if (config.requiresZookeeper) {
      new KafkaServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None,
        enableForwarding = false
      )
    } else {
      // 調(diào)用 BrokerServer 等來(lái)啟動(dòng)網(wǎng)絡(luò)層和請(qǐng)求處理層
      new KafkaRaftServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None
      )
    }
}

在 kafka 2.8.x 版本中 新增了 raft 協(xié)議之后將 BrokerServer、ControllServer 使用了單獨(dú)的文件來(lái)啟動(dòng)最終調(diào)用網(wǎng)絡(luò)層和請(qǐng)求處理層,如果還是使用 zk 的方式啟動(dòng)則是 KafkaServer 啟動(dòng)網(wǎng)絡(luò)層和請(qǐng)求處理層。

3、main

# 2.7.x 版本源碼
def main(args: Array[String]): Unit = {
  try {
    // 1、解析命令行參數(shù),獲得屬性集合
    val serverProps = getPropsFromArgs(args)
    // 2、從屬性集合創(chuàng)建 KafkaServerStartable 對(duì)象
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    try {
      // 如果不是 Windows 操作系統(tǒng),并且不是 IBM JDK,則注冊(cè) LoggingSignalHandler
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      // 如果注冊(cè) LoggingSignalHandler 失敗,則在日志中打印警告信息
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }
    // 3、添加 shutdown hook,用于在程序結(jié)束時(shí)執(zhí)行 KafkaServerStartable 的 shutdown 方法
    Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())
    // 4、啟動(dòng) KafkaServerStartable 實(shí)例
    kafkaServerStartable.startup()
    // 5、等待 KafkaServerStartable 實(shí)例終止
    kafkaServerStartable.awaitShutdown()
  }
  catch {
    // 如果有異常發(fā)生,則記錄日志并退出程序
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  // 6、正常終止程序
  Exit.exit(0)
}

該函數(shù)是 Kafka 服務(wù)進(jìn)程的入口,它是整個(gè) Kafka 運(yùn)行過(guò)程的驅(qū)動(dòng)程序。該函數(shù)首先通過(guò)調(diào)用 getPropsFromArgs 函數(shù)解析命令行參數(shù)并獲得屬性集合,然后使用這些屬性創(chuàng)建 KafkaServerStartable 實(shí)例。接著,它注冊(cè)一個(gè) shutdown hook,用于在程序終止時(shí)執(zhí)行 KafkaServerStartable 的 shutdown 方法。然后它啟動(dòng) KafkaServerStartable 實(shí)例,并等待該實(shí)例終止。如果發(fā)生異常,則記錄日志并退出程序。函數(shù)最后調(diào)用 Exit.exit 方法退出程序,返回 0 表示正常終止。

# 2.8.x 版本
def main(args: Array[String]): Unit = {
    // 獲取Kafka服務(wù)的配置信息
    val serverProps = getPropsFromArgs(args)
    // 根據(jù)配置信息構(gòu)建Kafka服務(wù)
    val server = buildServer(serverProps)
    try {
      // 注冊(cè)用于記錄日志的信號(hào)處理器(若實(shí)現(xiàn)失敗則退出)
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }
    // 掛載關(guān)閉處理器,用于捕獲終止信號(hào)和常規(guī)終止請(qǐng)求
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown() // 關(guān)閉Kafka服務(wù)
      catch {
        case _: Throwable =>
          fatal("Halting Kafka.") // 日志記錄致命錯(cuò)誤信息
          // 調(diào)用Exit.halt()強(qiáng)制退出,避免重復(fù)調(diào)用Exit.exit()引發(fā)死鎖
          Exit.halt(1)
      }
    })
    try server.startup() // 啟動(dòng)Kafka服務(wù)
    catch {
      case _: Throwable =>
        // 調(diào)用Exit.exit()設(shè)置退出狀態(tài)碼,KafkaServer.startup()會(huì)在拋出異常時(shí)調(diào)用shutdown()
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
    server.awaitShutdown() // 等待Kafka服務(wù)關(guān)閉
    Exit.exit(0) // 調(diào)用Exit.exit()設(shè)置退出狀態(tài)碼
}

這里最重要的是 「第 4 步」,調(diào)用 kafkaServerStartable.startup() 或者 server.startup() 來(lái)啟動(dòng) kafka。

這里我們還是以「ZK 模式」的方式來(lái)啟動(dòng),后面抽空再進(jìn)行對(duì) 「Raft 模式」啟動(dòng)進(jìn)行補(bǔ)充。

四、KafkaServerStartable

「KafkaServerStartable.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。

在 Scala 語(yǔ)言里,在一個(gè)源代碼文件中同時(shí)定義相同名字的 class 和 object 的用法被稱為伴生(Companion)。Class 對(duì)象被稱為伴生類,它和 Java 中的類是一樣的;而 Object 對(duì)象是一個(gè)單例對(duì)象,用于保存一些靜態(tài)變量或靜態(tài)方法。

這里我們主要來(lái)看下 Class 類代碼。

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
  // 創(chuàng)建 KafkaServer 實(shí)例
  // 構(gòu)造函數(shù)有兩個(gè)參數(shù) —— staticServerConfig 表示靜態(tài)服務(wù)器配置,reporters 表示 Kafka 指標(biāo)報(bào)告器。如果 threadNamePrefix 參數(shù)未用于構(gòu)造函數(shù),則默認(rèn)值為 None。threadNamePrefix 參數(shù)表示線程名稱前綴,用于調(diào)試和維護(hù)目的。
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)

  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
  // 啟動(dòng) KafkaServer
  // startup 方法嘗試啟動(dòng) Kafka 服務(wù)器。如果啟動(dòng) Kafka 服務(wù)器時(shí)發(fā)生異常,則記錄一條 fatal 錯(cuò)誤日志并退出程序。對(duì)于成功啟動(dòng)的 Kafka 服務(wù)器,它將開始監(jiān)聽客戶端連接,并在收到消息時(shí)執(zhí)行所需的操作。
  def startup(): Unit = {
    try server.startup()
    catch {
      // 如果出現(xiàn)異常,則記錄日志并退出程序
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }
  // 關(guān)閉 KafkaServer
  // shutdown 方法嘗試停止 Kafka 服務(wù)器。如果在停止服務(wù)器時(shí)出現(xiàn)異常,則記錄一條 fatal 錯(cuò)誤日志并強(qiáng)制退出程序。調(diào)用 shutdown 方法后,服務(wù)器將不再接受新的請(qǐng)求,并開始等待當(dāng)前進(jìn)行中的請(qǐng)求完成。當(dāng)所有處理中的請(qǐng)求都完成后,服務(wù)器將徹底停止。
  def shutdown(): Unit = {
    try server.shutdown()
    catch {
      // 如果出現(xiàn)異常,則記錄日志并強(qiáng)制退出程序
      case _: Throwable =>
        fatal("Halting Kafka.")
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Exit.halt(1)
    }
  }
  // setServerState 方法允許從 KafkaServerStartable 對(duì)象中設(shè)置 broker 狀態(tài)。如果自定義 KafkaServerStartable 對(duì)象想要引入新的狀態(tài),則此方法很有用。
  def setServerState(newState: Byte): Unit = {
    server.brokerState.newState(newState)
  }
  // 等待 KafkaServer 退出
  // awaitShutdown 方法等待 Kafka 服務(wù)器完全退出。在 Kafka 服務(wù)器執(zhí)行 shutdown 方法后,它將不再接受新的請(qǐng)求。但是,服務(wù)器可能仍在處理一些已經(jīng)接收的請(qǐng)求。awaitShutdown 方法將阻塞當(dāng)前線程,直到服務(wù)器徹底停止。
  def awaitShutdown(): Unit = server.awaitShutdown()
}

KafkaServerStartable 類是一個(gè)可啟動(dòng)和停止的 Kafka 服務(wù)器。類中的 server 成員變量是 KafkaServer 類的實(shí)例,它將在 KafkaServerStartable 類對(duì)象啟動(dòng)時(shí)創(chuàng)建。該類提供了啟動(dòng)和停止 Kafka 服務(wù)器的方法,以及設(shè)置 broker 狀態(tài)和等待 Kafka 服務(wù)器退出的方法。

跟本文有關(guān)系的是 「啟動(dòng)」方法,它調(diào)用了 KafkaServer#startup 方法進(jìn)行啟動(dòng)。

五、KafkaServer 類

Kafka 集群由多個(gè) Broker 節(jié)點(diǎn)構(gòu)成,每個(gè)節(jié)點(diǎn)上都運(yùn)行著一個(gè) Kafka 實(shí)例,這些實(shí)例之間基于 ZK 來(lái)發(fā)現(xiàn)彼此,并由集群控制器 KafkaController 統(tǒng)籌協(xié)調(diào)運(yùn)行,彼此之間基于 socket 連接進(jìn)行通信。

「KafkaServer.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。

KafkaServer 為 Kafka 的啟動(dòng)類,其中包含了 Kafka 的所有組件,如 KafkaController、groupCoordinator、replicaManager 等。

class KafkaServer(val config: KafkaConfig, //配置信息
time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //監(jiān)控上報(bào)
                  ) extends Logging with KafkaMetricsGroup {
  //標(biāo)識(shí)節(jié)點(diǎn)已經(jīng)啟動(dòng)完成
  private val startupComplete = new AtomicBoolean(false)
  //標(biāo)識(shí)節(jié)點(diǎn)正在執(zhí)行關(guān)閉操作
  private val isShuttingDown = new AtomicBoolean(false)
  //標(biāo)識(shí)節(jié)點(diǎn)正在執(zhí)行啟動(dòng)操作
  private val isStartingUp = new AtomicBoolean(false)
  //阻塞主線程等待 KafkaServer 的關(guān)閉
  private var shutdownLatch = new CountDownLatch(1)
  //日志上下文
  private var logContext: LogContext = null
  var metrics: Metrics = null
  //記錄節(jié)點(diǎn)的當(dāng)前狀態(tài)
  val brokerState: BrokerState = new BrokerState
  //API接口類,用于處理數(shù)據(jù)類請(qǐng)求
  var dataPlaneRequestProcessor: KafkaApis = null
  //API接口,用于處理控制類請(qǐng)求
  var controlPlaneRequestProcessor: KafkaApis = null
  //權(quán)限管理
  var authorizer: Option[Authorizer] = None
  //啟動(dòng)socket,監(jiān)聽9092端口,等待接收客戶端請(qǐng)求 
  var socketServer: SocketServer = null
  //數(shù)據(jù)類請(qǐng)求處理線程池
  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //命令類處理線程池
  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //日志管理器    
  var logDirFailureChannel: LogDirFailureChannel = null
  var logManager: LogManager = null
  //副本管理器
  var replicaManager: ReplicaManager = null
  //topic增刪管理器
  var adminManager: AdminManager = null
  //token管理器
  var tokenManager: DelegationTokenManager = null
  //動(dòng)態(tài)配置管理器
  var dynamicConfigHandlers: Map[String, ConfigHandler] = null
  var dynamicConfigManager: DynamicConfigManager = null
  var credentialProvider: CredentialProvider = null
  var tokenCache: DelegationTokenCache = null
  //分組協(xié)調(diào)器
  var groupCoordinator: GroupCoordinator = null
  //事務(wù)協(xié)調(diào)器
  var transactionCoordinator: TransactionCoordinator = null
  //集群控制器
  var kafkaController: KafkaController = null
  //定時(shí)任務(wù)調(diào)度器
  var kafkaScheduler: KafkaScheduler = null
  //集群分區(qū)狀態(tài)信息緩存
  var metadataCache: MetadataCache = null
  //配額管理器
  var quotaManagers: QuotaFactory.QuotaManagers = null
  //zk客戶端配置
  val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
  private var _zkClient: KafkaZkClient = null
  val correlationId: AtomicInteger = new AtomicInteger(0)
  val brokerMetaPropsFile = "meta.properties"
  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
  private var _clusterId: String = null
  private var _brokerTopicStats: BrokerTopicStats = null
  def clusterId: String = _clusterId
  // Visible for testing
  private[kafka] def zkClient = _zkClient
  private[kafka] def brokerTopicStats = _brokerTopicStats
  ....
}

1、startup

該類方法很多,我們這里只看 startup 啟動(dòng)方法,來(lái)看看其內(nèi)部都啟動(dòng)了哪些組件,來(lái)解決本文開頭提出的問題。

/**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup(): Unit = {
    try {
      info("starting")
      // 是否已關(guān)閉
      if (isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
      // 是否已啟動(dòng)
      if (startupComplete.get)
        return
      // 是否可以啟動(dòng)
      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) { // 設(shè)置broker狀態(tài)為Starting
        brokerState.newState(Starting)
        /* setup zookeeper */
        // 連接ZK,并創(chuàng)建根節(jié)點(diǎn)
        initZkClient(time)
        /* initialize features */
        _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
        if (config.isFeatureVersioningSupported) {
          _featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)
        }
        /* Get or create cluster_id */
        // 從ZK獲取或創(chuàng)建集群id,規(guī)則:UUID的mostSigBits、leastSigBits組合轉(zhuǎn)base64
        _clusterId = getOrGenerateClusterId(zkClient)
        info(s"Cluster ID = $clusterId")
        /* load metadata */
        // 獲取brokerId及l(fā)og存儲(chǔ)路徑,brokerId通過(guò)zk生成或者server.properties配置broker.id
        // 規(guī)則:/brokers/seqid的version值 + maxReservedBrokerId(默認(rèn)1000),保證唯一性
        val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs
        /* check cluster id */
        if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
          throw new InconsistentClusterIdException(
            s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
            s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")
        /* generate brokerId */
        config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
        logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
        // 配置logger
        this.logIdent = logContext.logPrefix
        // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
        // applied after DynamicConfigManager starts.
        // 初始化AdminZkClient,支持動(dòng)態(tài)修改配置 
        config.dynamicConfig.initialize(zkClient)
        /* start scheduler */
        // 初始化定時(shí)任務(wù)調(diào)度器
        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
        kafkaScheduler.startup()
        /* create and configure metrics */
        // 創(chuàng)建及配置監(jiān)控,默認(rèn)使用JMX及Yammer Metrics
        kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
        kafkaYammerMetrics.configure(config.originals)
        val jmxReporter = new JmxReporter()
        jmxReporter.configure(config.originals)
        val reporters = new util.ArrayList[MetricsReporter]
        reporters.add(jmxReporter)
        val metricConfig = KafkaServer.metricConfig(config)
        val metricsContext = createKafkaMetricsContext()
        metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
        /* register broker metrics */
        _brokerTopicStats = new BrokerTopicStats
        // 初始化配額管理器
        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
        notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
        // 用于保證kafka-log數(shù)據(jù)目錄的存在
        logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
        /* start log manager */
        // 啟動(dòng)日志管理器,kafka的消息以日志形式存儲(chǔ)
        logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
        // 啟動(dòng)日志清理、刷新、校驗(yàn)、恢復(fù)等的定時(shí)線程
        logManager.startup()
        metadataCache = new MetadataCache(config.brokerId)
        // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
        // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
        // SCRAM認(rèn)證方式的token緩存
        tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
        credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
        // Create and start the socket server acceptor threads so that the bound port is known.
        // Delay starting processors until the end of the initialization sequence to ensure
        // that credentials have been loaded before processing authentications.
        // 啟動(dòng)socket,監(jiān)聽9092端口,等待接收客戶端請(qǐng)求 
        socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup(startProcessingRequests = false)
        /* start replica manager */
        brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
        // 啟動(dòng)副本管理器,高可用相關(guān)
        replicaManager = createReplicaManager(isShuttingDown)
        replicaManager.startup()
        brokerToControllerChannelManager.start()
        // 將broker信息注冊(cè)到ZK上
        val brokerInfo = createBrokerInfo
        val brokerEpoch = zkClient.registerBroker(brokerInfo)
        // Now that the broker is successfully registered, checkpoint its metadata
        // 校驗(yàn) broker 信息
        checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))
        /* start token manager */
        // 啟動(dòng) token 管理器
        tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
        tokenManager.startup()
        /* start kafka controller */
        // 啟動(dòng)Kafka控制器,只有 Leader 會(huì)與ZK建連
        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
        kafkaController.startup()
        // admin管理器
        adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
        /* start group coordinator */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        // 啟動(dòng)集群群組協(xié)調(diào)器
        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
        groupCoordinator.startup()
        /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        // 啟動(dòng)事務(wù)協(xié)調(diào)器
        transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
        transactionCoordinator.startup()
        /* Get the authorizer and initialize it if one is specified.*/
        // ACL
        authorizer = config.authorizer
        authorizer.foreach(_.configure(config.originals))
        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
          case Some(authZ) =>
            authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
              ep -> cs.toCompletableFuture
            }
          case None =>
            brokerInfo.broker.endPoints.map { ep =>
              ep.toJava -> CompletableFuture.completedFuture[Void](null)
            }.toMap
        }
        // 創(chuàng)建拉取管理器
        val fetchManager = new FetchManager(Time.SYSTEM,
          new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
            KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
        /* start processing requests */
        // 初始化數(shù)據(jù)類請(qǐng)求的KafkaApis,負(fù)責(zé)數(shù)據(jù)類請(qǐng)求邏輯處理
        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
        // 初始化數(shù)據(jù)類請(qǐng)求處理的線程池  
        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
        socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
          // 初始化控制類請(qǐng)求的 KafkaApis
          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
            fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
          // 初始化控制類請(qǐng)求的線程池
          controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
            1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
        }
        Mx4jLoader.maybeLoad()
        /* Add all reconfigurables for config change notification before starting config handlers */
        config.dynamicConfig.addReconfigurables(this)
        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
                                                           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

        // Create the config manager. start listening to notifications
        // 啟動(dòng)動(dòng)態(tài)配置處理器
        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
        dynamicConfigManager.startup()
        // 啟動(dòng)請(qǐng)求處理線程
        socketServer.startProcessingRequests(authorizerFutures)
        // 更新broker狀態(tài)
        brokerState.newState(RunningAsBroker)
        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
 }

這里總結(jié)下該方法都啟動(dòng)了哪些組件:

  • initZkClient(time) 初始化 Zk。
  • kafkaScheduler  定時(shí)器。
  • logManager 日志模塊。
  • MetadataCache  元數(shù)據(jù)緩存。
  • socketServer 網(wǎng)絡(luò)服務(wù)器。
  • replicaManager 副本模塊。
  • kafkaController 控制器。
  • groupCoordinator 協(xié)調(diào)器用于和ConsumerCoordinator 交互
  • transactionCoordinator 事務(wù)相關(guān)
  • fetchManager  副本拉取管理器。
  • dynamicConfigManager 動(dòng)態(tài)配置管理器。

2、Broker 狀態(tài)

這個(gè)是在 2.7.x 版本之前的狀態(tài),在 2.8.x 之后版本進(jìn)行了重構(gòu)。

sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
  • NotRunning:初始狀態(tài),標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)未運(yùn)行。
  • Starting:標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)正在啟動(dòng)中。
  • RecoveringFromUncleanShutdown:標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)正在從上次非正常關(guān)閉中恢復(fù)。
  • RuningAsBroker:標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)啟動(dòng)成功,可以對(duì)外提供服務(wù)。
  • PendingControlledShutdown:標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)正在等待 controlled shutdown 操作完成。
  • BrokerShuttingDown:標(biāo)識(shí)當(dāng)前 broker 節(jié)點(diǎn)正在執(zhí)行 shutdown 操作。

這些就是 KafkaServer 中主要模塊的入口,接下來(lái)的文章會(huì)通過(guò)這些入口一一進(jìn)行分析。

六、總結(jié)

這里,我們一起來(lái)總結(jié)一下這篇文章的重點(diǎn)。

  • 文章開頭通過(guò)對(duì)「kafka-server-start.sh」內(nèi)容進(jìn)行剖析,引出了 「kafka.Kafka」類。
  • 在「kafka.Kafka」的 main 方法中調(diào)用了「KafkaServerStartable」嘗試啟動(dòng) Kafka 服務(wù)器。
  • 接著在 「KafkaServerStartable」的 startup 方法中調(diào)用了 「KafkaServer」的 startup 方法啟動(dòng)服務(wù)器需要的各種組件類。

下篇我們來(lái)深度剖析「Broker 啟動(dòng)集群如何感知」,大家期待,我們下期見。

責(zé)任編輯:姜華 來(lái)源: 華仔聊技術(shù)
相關(guān)推薦

2019-09-20 08:54:38

KafkaBroker消息

2022-09-23 08:02:42

Kafka消息緩存

2023-12-26 08:16:56

Kafka緩存架構(gòu)客戶端

2023-02-22 08:12:30

KafkaSender 線程

2022-05-08 17:53:38

Nacos服務(wù)端客戶端

2021-09-06 09:46:26

Dubbo 服務(wù)端開發(fā)

2021-06-11 06:54:34

Dubbo客戶端服務(wù)端

2017-03-03 09:10:41

2016-03-18 09:04:42

swift服務(wù)端

2023-03-15 08:17:27

Kafka網(wǎng)絡(luò)通信組件

2015-10-12 08:33:06

TCP網(wǎng)絡(luò)協(xié)議服務(wù)端

2021-08-10 20:41:33

AndroidApp流程

2021-04-16 08:54:03

CMS系統(tǒng)redisnode服務(wù)器

2022-03-06 12:15:38

NettyReactor線程

2012-03-02 10:38:33

MySQL

2013-03-25 10:08:44

PHPWeb

2019-09-23 10:47:52

Kafka架構(gòu)微服務(wù)

2022-08-22 08:45:57

Kafka網(wǎng)絡(luò)層源碼實(shí)現(xiàn)

2021-06-30 06:59:47

Zabbix Server服務(wù)端MySQL

2010-08-03 09:59:30

NFS服務(wù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)