自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

圖解 Kafka 源碼之 Sender 線程架構(gòu)設(shè)計(jì)

開(kāi)發(fā) 架構(gòu)
今天主要聊聊「發(fā)送網(wǎng)絡(luò) I/O 的 Sender 線程的架構(gòu)設(shè)計(jì)」,深度剖析下消息是如何被發(fā)送出去的。

大家好,我是 華仔, 又跟大家見(jiàn)面了。

原文完整版在星球里面,如果感興趣可以掃文末二維碼加入。

上篇主要帶大家深度剖析了「號(hào)稱承載 Kafka 客戶端消息快遞倉(cāng)庫(kù) RecordAccmulator 的架構(gòu)設(shè)計(jì)」,消息被暫存到累加器中,今天主要聊聊「發(fā)送網(wǎng)絡(luò) I/O 的 Sender 線程的架構(gòu)設(shè)計(jì)」,深度剖析下消息是如何被發(fā)送出去的。

這篇文章干貨很多,希望你可以耐心讀完。

一、總的概述

通過(guò)「場(chǎng)景驅(qū)動(dòng)」的方式,來(lái)看看消息是如何從客戶端發(fā)送出去的。

在上篇中,我們知道了消息被暫存到 Deque<ProducerBatch> 的 batches 中,等「批次已滿」或者「有新批次被創(chuàng)建」后,喚醒 Sender 子線程將消息批量發(fā)送給 Kafka Broker 端。

接下來(lái)我們就來(lái)看看,「Sender 線程的架構(gòu)實(shí)現(xiàn)以及發(fā)送處理流程」,為了方便大家理解,所有的源碼只保留骨干。

二、Sender 線程架構(gòu)設(shè)計(jì)

在《圖解Kafka生產(chǎn)者初始化核心流程》這篇中我們知道 KafkaProducer 會(huì)啟動(dòng)一個(gè)后臺(tái)守護(hù)進(jìn)程,其線程名稱:kafka-producer-network-thread + "|" + clientId。

在 KafkaProducer.java 類有常量定義:NETWORK_THREAD_PREFIX,并啟動(dòng) 守護(hù)線程 KafkaThread 即 ioThread,如果不主動(dòng)關(guān)閉 Sender 線程會(huì)一直執(zhí)行下去。

github 源碼地址如下:

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java?

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java?

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java?

?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java?

public class KafkaProducer<K, V> implements Producer<K, V> {
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
// visible for testing
@SuppressWarnings("unchecked")
KafkaProducer(Map<String, Object> configs,Serializer<K> keySerializer,
Serializer<V> valueSerializer,ProducerMetadata metadata,
KafkaClient kafkaClient,ProducerInterceptors<K, V> interceptors,Time time) {
try {
...
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
...
log.debug("Kafka producer started");
} catch (Throwable t) {
...
}
}
}

從上面得出 Sender 類是一個(gè)線程類, 我們來(lái)看看 Sender 線程的重要字段和方法,并講解其是如何發(fā)送消息和處理消息響應(yīng)的。

1、關(guān)鍵字段

/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
*/
public class Sender implements Runnable {
/* the state of each nodes connection */
private final KafkaClient client; // 為 Sender 線程提供管理網(wǎng)絡(luò)連接進(jìn)行網(wǎng)絡(luò)讀寫(xiě)
/* the record accumulator that batches records */
private final RecordAccumulator accumulator; // 消息倉(cāng)庫(kù)累加器
/* the metadata for the client */
private final ProducerMetadata metadata; // 生產(chǎn)者元數(shù)據(jù)
/* the flag indicating whether the producer should guarantee the message order on the broker or not. */
private final boolean guaranteeMessageOrder; // 是否保證消息在 broker 端的順序性
/* the maximum request size to attempt to send to the server */
private final int maxRequestSize; //發(fā)送消息最大字節(jié)數(shù)。
/* the number of acknowledgements to request from the server */
private final short acks; // 生產(chǎn)者的消息發(fā)送確認(rèn)機(jī)制
/* the number of times to retry a failed request before giving up */
private final int retries; // 發(fā)送失敗后的重試次數(shù),默認(rèn)為0次

/* true while the sender thread is still running */
private volatile boolean running; // Sender 線程是否還在運(yùn)行中
/* true when the caller wants to ignore all unsent/inflight messages and force close. */
private volatile boolean forceClose; // 是否強(qiáng)制關(guān)閉,此時(shí)會(huì)忽略正在發(fā)送中的消息。
/* the max time to wait for the server to respond to the request*/
private final int requestTimeoutMs; // 等待服務(wù)端響應(yīng)的最大時(shí)間,默認(rèn)30s
/* The max time to wait before retrying a request which has failed */
private final long retryBackoffMs; // 失敗重試退避時(shí)間
/* current request API versions supported by the known brokers */
private final ApiVersions apiVersions; // 所有 node 支持的 api 版本
/* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
private final TransactionManager transactionManager; // 事務(wù)管理,這里忽略 后續(xù)會(huì)有專門(mén)一篇講解事務(wù)相關(guān)的
// A per-partition queue of batches ordered by creation time for tracking the in-flight batches
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches; // 正在執(zhí)行發(fā)送相關(guān)的消息批次集合, key為分區(qū),value是 list<ProducerBatch> 。

從該類屬性字段來(lái)看比較多,這里說(shuō)幾個(gè)關(guān)鍵字段:

  1. client:KafkaClient 類型,是一個(gè)接口類,Sender 線程主要用它來(lái)實(shí)現(xiàn)真正的網(wǎng)絡(luò)I/O,即 NetworkClient。該字段主要為 Sender 線程提供了網(wǎng)絡(luò)連接管理和網(wǎng)絡(luò)讀寫(xiě)操作能力。
  2. accumulator:RecordAccumulator類型,上一篇的內(nèi)容  圖解 Kafka 源碼之快遞倉(cāng)庫(kù) RecordAccumulator 架構(gòu)設(shè)計(jì),Sender 線程用它獲取待發(fā)送的 node 節(jié)點(diǎn)及批次消息等能力。
  3. metadata:ProducerMetadata類型,生產(chǎn)者元數(shù)據(jù)。因?yàn)榘l(fā)送消息時(shí)要知道分區(qū) Leader 在哪些節(jié)點(diǎn),以及節(jié)點(diǎn)的地址、主題分區(qū)的情況等。
  4. guaranteeMessageOrder:是否保證消息在 broker 端的順序性,參數(shù):max.in.flight.requests.per.connection。
  5. maxRequestSize:?jiǎn)蝹€(gè)請(qǐng)求發(fā)送消息最大字節(jié)數(shù),默認(rèn)為1M,它限制了生產(chǎn)者在單個(gè)請(qǐng)求發(fā)送的記錄數(shù),以避免發(fā)送大量請(qǐng)求。
  6. acks:生產(chǎn)者的消息發(fā)送確認(rèn)機(jī)制。有3個(gè)可選值:0,1,-1/all。
  7. retries:生產(chǎn)者發(fā)送失敗后的重試次數(shù)。默認(rèn)是0次。
  8. running:Sender線程是否還在運(yùn)行中。
  9. forceClose:是否強(qiáng)制關(guān)閉,此時(shí)會(huì)忽略正在發(fā)送中的消息。
  10. requestTimeoutMs:生產(chǎn)者發(fā)送請(qǐng)求后等待服務(wù)端響應(yīng)的最大時(shí)間。如果超時(shí)了且配置了重試次數(shù),會(huì)再次發(fā)送請(qǐng)求,待重試次數(shù)用完后在這個(gè)時(shí)間范圍內(nèi)返回響應(yīng)則認(rèn)為請(qǐng)求最終失敗,默認(rèn) 30 秒。
  11. retryBackoffMs:生產(chǎn)者在發(fā)送請(qǐng)求失敗后可能會(huì)重新發(fā)送失敗的請(qǐng)求,其目的就是防止重發(fā)過(guò)快造成服務(wù)端壓力過(guò)大。默認(rèn)100 ms。
  12. apiVersions:ApicVersions類對(duì)象,保存了每個(gè)node所支持的api版本。
  13. inFlightBatches:正在執(zhí)行發(fā)送相關(guān)的消息批次集合, key為分區(qū),value是 list<|ProducerBatch>。

2、關(guān)鍵方法

Sender 類的方法也不少,這里針對(duì)關(guān)鍵方法逐一講解下。

(1)run()

Sender 線程實(shí)現(xiàn)了 Runnable 接口,會(huì)不斷的調(diào)用 runOnce(),這是一個(gè)典型的循環(huán)事件機(jī)制。

/**
* The main run loop for the sender thread
*/
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// 這里使用 volatile boolean 類型的變量 running,判斷 Sender 線程是否在運(yùn)行狀態(tài)中。
// 1. 如果 Sender 線程在運(yùn)行狀態(tài)即 running=true,則一直循環(huán)調(diào)用 runOnce() 方法。
while (running) {
try {
// 將緩沖區(qū)的消息發(fā)送到 broker。
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// 2. 如果(沒(méi)有強(qiáng)制關(guān)閉 && ((消息累加器中還有剩余消息待發(fā)送 || 還有等待未響應(yīng)的消息 ) || 還有事務(wù)請(qǐng)求未完成)),則繼續(xù)發(fā)送剩下的消息。
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
// 繼續(xù)執(zhí)行將剩余的消息發(fā)送完畢
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// 3. 對(duì)進(jìn)行中的事務(wù)進(jìn)行中斷,則繼續(xù)發(fā)送剩下的消息。
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// 4. 如果強(qiáng)制關(guān)閉,則關(guān)閉事務(wù)管理器、終止消息的追加并清空未完成的批次
if (forceClose) {
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
// 關(guān)閉事務(wù)管理器
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
// 終止消息的追加并清空未完成的批次
this.accumulator.abortIncompleteBatches();
}
// 5. 關(guān)閉 NetworkClient
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

當(dāng) Sender 線程啟動(dòng)后會(huì)直接運(yùn)行 run() 方法,該方法在 4種情況下會(huì)一直循環(huán)調(diào)用去發(fā)送消息到 Broker。

(2)runOnce()

我們來(lái)看看這個(gè)執(zhí)行業(yè)務(wù)處理的方法,關(guān)于事務(wù)的部分后續(xù)專門(mén)文章講解。

/**
* Run a single iteration of sending
*/
void runOnce() {
if (transactionManager != null) {
... //事務(wù)處理方法 后續(xù)文章專門(mén)講解
}
// 1. 獲取當(dāng)前時(shí)間的時(shí)間戳。
long currentTimeMs = time.milliseconds();
// 2. 調(diào)用 sendProducerData 發(fā)送消息,但并非真正的發(fā)送,而是把消息緩存在 把消息緩存在 KafkaChannel 的 Send 字段里。下一篇會(huì)講解 NetworkClient。
long pollTimeout = sendProducerData(currentTimeMs);
// 3. 讀取消息實(shí)現(xiàn)真正的網(wǎng)絡(luò)發(fā)送
client.poll(pollTimeout, currentTimeMs);
}

該方法比較簡(jiǎn)單,主要做了3件事情:

  1. 獲取當(dāng)前時(shí)間的時(shí)間戳。
  2. 調(diào)用 sendProducerData 發(fā)送消息,但并非真正的發(fā)送,而是把消息緩存在 NetworkClient 的 Send 字段里。下一篇會(huì)講解 NetworkClient。
  3. 讀取 NetworkClient 的 send 字段消息實(shí)現(xiàn)真正的網(wǎng)絡(luò)發(fā)送。

(3)sendProducerData()

該方法主要是獲取已經(jīng)準(zhǔn)備好的節(jié)點(diǎn)上的批次數(shù)據(jù)并過(guò)濾過(guò)期的批次集合,最后暫存消息。

private long sendProducerData(long now) {
// 1. 獲取元數(shù)據(jù)
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
// 2. 獲取已經(jīng)準(zhǔn)備好的節(jié)點(diǎn)以及找不到 Leader 分區(qū)對(duì)應(yīng)的節(jié)點(diǎn)的主題
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
// 3. 如果主題 Leader 分區(qū)對(duì)應(yīng)的節(jié)點(diǎn)不存在,則強(qiáng)制更新元數(shù)據(jù)
if (!result.unknownLeaderTopics.isEmpty()) {
// 添加 topic 到?jīng)]有拉取到元數(shù)據(jù)的 topic 集合中,并標(biāo)識(shí)需要更新元數(shù)據(jù)
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);
...
// 針對(duì)這個(gè) topic 集合進(jìn)行元數(shù)據(jù)更新
this.metadata.requestUpdate();
}
// 4. 循環(huán) readyNodes 并檢查客戶端與要發(fā)送節(jié)點(diǎn)的網(wǎng)絡(luò)是否已經(jīng)建立好了
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// 檢查客戶端與要發(fā)送節(jié)點(diǎn)的網(wǎng)絡(luò)是否已經(jīng)建立好了
if (!this.client.ready(node, now)) {
// 移除對(duì)應(yīng)節(jié)點(diǎn)
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// create produce requests
// 5. 獲取上面返回的已經(jīng)準(zhǔn)備好的節(jié)點(diǎn)上要發(fā)送的 ProducerBatch 集合
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
// 6. 將從消息累加器中讀取的數(shù)據(jù)集,放入正在執(zhí)行發(fā)送相關(guān)的消息批次集合中
addToInflightBatches(batches);
// 7. 要保證消息的順序性,即參數(shù) max.in.flight.requests.per.cnotallow=1
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
// 對(duì) tp 進(jìn)行 mute,保證只有一個(gè) batch 正在發(fā)送
this.accumulator.mutePartition(batch.topicPartition);
}
}
// 重置下一批次的過(guò)期時(shí)間
accumulator.resetNextBatchExpiryTime();
// 8. 從正在執(zhí)行發(fā)送數(shù)據(jù)集合 inflightBatches 中獲取過(guò)期集合
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
// 9. 從 batches 中獲取過(guò)期集合
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
// 10. 從 inflightBatches 與 batches 中查找已過(guò)期的消息批次(ProducerBatch),判斷批次是否過(guò) 期是根據(jù)系統(tǒng)當(dāng)前時(shí)間與 ProducerBatch 創(chuàng)建時(shí)間之差是否超過(guò)120s,過(guò)期時(shí)間可以通過(guò)參數(shù) delivery.timeout.ms 設(shè)置。
expiredBatches.addAll(expiredInflightBatches);

// 如果過(guò)期批次不為空 則輸出對(duì)應(yīng)日志
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
// 11. 處理已過(guò)期的消息批次,通知該批消息發(fā)送失敗并返回給客戶端
for (ProducerBatch expiredBatch : expiredBatches) {
// 處理當(dāng)前過(guò)期ProducerBatch的回調(diào)結(jié)果 ProduceRequestResult,并且設(shè)置超時(shí)異常 new TimeoutException(errorMessage)
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
// 通知該批消息發(fā)送失敗并返回給客戶端
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
// ... 事務(wù)管理器的處理忽略
}
// 收集統(tǒng)計(jì)指標(biāo),后續(xù)會(huì)專門(mén)對(duì) Kafka 的 Metrics 進(jìn)行分析
sensors.updateProduceRequestMetrics(batches);

// 設(shè)置下一次的發(fā)送延時(shí)
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
// 12. 發(fā)送消息暫存到 NetworkClient send 字段里。
sendProduceRequests(batches, now);
return pollTimeout;
}

該方法主要做了12件事情,逐一說(shuō)明下:

  1. 首先獲取元數(shù)據(jù),這里主要是根據(jù)元數(shù)據(jù)的更新機(jī)制來(lái)保證數(shù)據(jù)的準(zhǔn)確性。
  2. 獲取已經(jīng)準(zhǔn)備好的節(jié)點(diǎn)。accumulator#reay() 方法會(huì)通過(guò)發(fā)送記錄對(duì)應(yīng)的節(jié)點(diǎn)和元數(shù)據(jù)進(jìn)行比較,返回結(jié)果中包括兩個(gè)重要的集合:「準(zhǔn)備好發(fā)送的節(jié)點(diǎn)集合 readyNodes」、「找不到 Leader 分區(qū)對(duì)應(yīng)節(jié)點(diǎn)的主題 unKnownLeaderTopic」。
  3. 如果主題 Leader 分區(qū)對(duì)應(yīng)的節(jié)點(diǎn)不存在,則強(qiáng)制更新元數(shù)據(jù)。
  4. 循環(huán) readyNodes 并檢查客戶端與要發(fā)送節(jié)點(diǎn)的網(wǎng)絡(luò)是否已經(jīng)建立好了。在 NetworkClient 中維護(hù)了客戶端與所有節(jié)點(diǎn)的連接,這樣就可以通過(guò)連接的狀態(tài)判斷是否連接正常。
  5. 獲取上面返回的已經(jīng)準(zhǔn)備好的節(jié)點(diǎn)上要發(fā)送的 ProducerBatch 集合。accumulator#drain() 方法就是將 「TopicPartition」-> 「ProducerBatch 集合」的映射關(guān)系轉(zhuǎn)換成 「Node 節(jié)點(diǎn)」->「ProducerBatch 集合」的映射關(guān)系,如下圖所示,這樣的話按照節(jié)點(diǎn)方式只需要2次就完成,大大減少網(wǎng)絡(luò)的開(kāi)銷(xiāo)。

圖片

  1. 將從消息累加器中讀取的數(shù)據(jù)集,放入正在執(zhí)行發(fā)送相關(guān)的消息批次集合中。
  2. 要保證消息的順序性,即參數(shù) max.in.flight.requests.per.cnotallow=1,會(huì)添加到 muted 集合,保證只有一個(gè) batch 在發(fā)送。
  3. 從正在執(zhí)行發(fā)送數(shù)據(jù)集合 inflightBatches 中獲取過(guò)期集合。
  4. 從 accumulator 累加器的 batches 中獲取過(guò)期集合。
  5. 從 inflightBatches 與 batches 中查找已過(guò)期的消息批次(ProducerBatch),判斷批次是否過(guò)期是根據(jù)系統(tǒng)當(dāng)前時(shí)間與 ProducerBatch 創(chuàng)建時(shí)間之差是否超過(guò)120s,過(guò)期時(shí)間可以通過(guò)參數(shù) delivery.timeout.ms 設(shè)置。
  6. 處理已過(guò)期的消息批次,通知該批消息發(fā)送失敗并返回給客戶端。
  7. 發(fā)送消息暫存到 NetworkClient send 字段里。

從上面源碼可以看出,SendProducerData 方法中調(diào)用到了 Sender 線程類中多個(gè)方法,這里就不一一講解了。

三、Sender 發(fā)送流程分析

通過(guò)前兩部分的源碼解讀和剖析,我們可以得出 Sender 線程的處理流程可以分為兩大部分:「發(fā)送請(qǐng)求」、「接收響應(yīng)結(jié)果」。

1、發(fā)送請(qǐng)求

從 runOnce 方法可以得出發(fā)送請(qǐng)求也分兩步:「消息預(yù)發(fā)送」、「真正的網(wǎng)絡(luò)發(fā)送」。

void runOnce() {
// 1. 把消息緩存在 KafkaChannel 的 Send 字段里。
long pollTimeout = sendProducerData(currentTimeMs);
// 2. 讀取消息實(shí)現(xiàn)真正的網(wǎng)絡(luò)發(fā)送
client.poll(pollTimeout, currentTimeMs);
}

2、接收響應(yīng)結(jié)果

等 Sender 線程收到 Broker 端的響應(yīng)結(jié)果后,會(huì)根據(jù)響應(yīng)結(jié)果分情況進(jìn)行處理。

3、時(shí)序圖

原文完整版在星球里面,如果感興趣可以掃文末二維碼加入。

四、總結(jié)

這里,我們一起來(lái)總結(jié)一下這篇文章的重點(diǎn)。

1、開(kāi)篇總述消息被暫存到 Deque<ProducerBatch> 的 batches 中,等「批次已滿」或者「有新批次被創(chuàng)建」后,喚醒 Sender 子線程將消息批量發(fā)送給 Kafka Broker 端,從而引出「Sender 線程」。

2、帶你深度剖析了「Sender 線程」 的發(fā)送消息以及響應(yīng)處理的相關(guān)方法。

3、最后帶你串聯(lián)了整個(gè)消息發(fā)送的流程,讓你有個(gè)更好的整體認(rèn)知。

責(zé)任編輯:姜華 來(lái)源: 華仔聊技術(shù)
相關(guān)推薦

2023-03-15 08:17:27

Kafka網(wǎng)絡(luò)通信組件

2023-12-26 08:16:56

Kafka緩存架構(gòu)客戶端

2022-03-29 15:10:22

架構(gòu)設(shè)計(jì)模型

2022-09-23 08:02:42

Kafka消息緩存

2019-09-20 08:54:38

KafkaBroker消息

2023-08-14 08:17:13

Kafka服務(wù)端

2021-11-01 17:17:13

Kafka 高并發(fā)場(chǎng)景

2024-03-14 08:33:13

kafka三高架構(gòu)Zookeeper

2015-06-02 04:17:44

架構(gòu)設(shè)計(jì)審架構(gòu)設(shè)計(jì)說(shuō)明書(shū)

2021-06-10 07:49:27

Kafka 架構(gòu)設(shè)計(jì)

2024-08-23 16:04:45

2012-05-30 09:43:45

業(yè)務(wù)邏輯層

2015-06-02 04:34:05

架構(gòu)設(shè)計(jì)

2023-04-13 08:23:28

軟件架構(gòu)設(shè)計(jì)

2014-05-19 10:08:36

IM系統(tǒng)架構(gòu)設(shè)計(jì)

2009-06-22 14:48:21

DRY架構(gòu)設(shè)計(jì)

2024-09-18 09:04:33

架構(gòu)模式查詢

2024-10-30 10:06:51

2021-12-07 07:32:09

kafka架構(gòu)原理

2023-08-16 12:34:16

同步備份異步備份
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)