自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

深度剖析 Kafka 日志保留與數(shù)據(jù)清理策略

開(kāi)發(fā)
本文從原理到源碼詳細(xì)分析了 Kafka 的日志保留和數(shù)據(jù)清理策略,Kafka 能夠在保證數(shù)據(jù)持久化的同時(shí),最大限度地利用存儲(chǔ)資源。。

Log 是Kafka的核心組件之一,用于持久化存儲(chǔ)消息,為了有效管理存儲(chǔ)空間和保證系統(tǒng)性能,Kafka 提供了日志保留和數(shù)據(jù)清理策略。這篇文章,我將詳細(xì)分析它們的工作原理。

一、日志保留

Kafka 的日志保留策略決定了消息在 Kafka 中存儲(chǔ)的時(shí)間長(zhǎng)度,保留策略可以基于時(shí)間或日志大小來(lái)配置。當(dāng)消息超過(guò)指定的保留時(shí)間或日志大小限制時(shí),Kafka 將自動(dòng)清理這些消息以釋放存儲(chǔ)空間。

1.日志保留配置

Kafka 提供了多種配置選項(xiàng)以控制日志保留策略:

  • log.retention.hours: 定義消息在日志中保留的時(shí)間(以小時(shí)為單位),默認(rèn)值為 168 小時(shí)(7 天)。
  • log.retention.minutes: 以分鐘為單位的保留時(shí)間。
  • log.retention.ms: 以毫秒為單位的保留時(shí)間。
  • log.retention.bytes: 定義每個(gè)日志分區(qū)允許使用的最大存儲(chǔ)空間,當(dāng)達(dá)到此限制時(shí),最早的消息將被刪除。

需要注意的是,時(shí)間和大小限制是互斥的,Kafka 將依據(jù)首先滿足的條件來(lái)清理日志。

2.日志清理策略

Kafka 提供兩種主要的日志清理策略:

  • 刪除策略(delete): 在達(dá)到保留期后刪除舊數(shù)據(jù)。
  • 壓縮策略(compact): 針對(duì)具有相同鍵的記錄,只保留最新版本。

默認(rèn)情況下,Kafka 使用刪除策略。日志清理策略可以通過(guò) log.cleanup.policy 配置,其中 delete 和 compact 都可以作為其值。

二、日志清理

Kafka 的日志清理是在后臺(tái)運(yùn)行的,它并不影響正常的讀寫(xiě)操作,日志清理策略主要包含刪除策略和壓縮策略 2種類型:

1.刪除策略

刪除策略是最簡(jiǎn)單的日志清理機(jī)制,Kafka 定期檢查日志分區(qū)的時(shí)間戳或大小,當(dāng)某個(gè)分區(qū)超過(guò)指定的保留時(shí)間或大小時(shí),系統(tǒng)會(huì)刪除該分區(qū)的舊日志段(Log Segment)。具體過(guò)程如下:

  • 檢查條件: Kafka 定期比較當(dāng)前時(shí)間與日志段創(chuàng)建時(shí)間的差值,或檢查日志分區(qū)的大小是否超過(guò)配置的限制。
  • 標(biāo)記刪除: 符合刪除條件的日志段被標(biāo)記為刪除。
  • 物理刪除: 在下一個(gè)清理周期中,Kafka 將實(shí)際刪除這些標(biāo)記的日志段以釋放磁盤(pán)空間。

2.壓縮策略

壓縮策略主要用于僅保留每個(gè)鍵的最新消息版本,它適用于更新頻繁的場(chǎng)景,例如數(shù)據(jù)庫(kù)變更日志。壓縮策略的工作流程如下:

  • 收集日志段: Kafka 定期掃描日志段,識(shí)別出需要壓縮的段。
  • 構(gòu)建索引: 為每個(gè)日志段構(gòu)建一個(gè)映射,記錄每個(gè)鍵的最新偏移量。
  • 合并日志段: 確定每個(gè)鍵的最新消息后,Kafka 將這些消息寫(xiě)入新的日志段。
  • 替換舊日志段: 新日志段生成后,Kafka 替換舊的日志段,并在下次清理時(shí)刪除舊段。

三、核心源碼分析

為了更深入理解 Kafka 的日志清理機(jī)制,接下來(lái)會(huì)分析幾個(gè)相關(guān)的核心源碼類:

1.LogCleaner 類

LogCleaner 是 Kafka 中負(fù)責(zé)日志壓縮(compaction)的核心組件之一,它的主要功能是定期掃描 Kafka 日志,并對(duì)其進(jìn)行壓縮,以確保每個(gè)鍵只保留最新的值。下面是對(duì) LogCleaner 源碼的詳細(xì)分析。

(1) LogCleaner 的基本結(jié)構(gòu)

LogCleaner 繼承自 ShutdownableThread,這意味著它是一個(gè)可以安全關(guān)閉的后臺(tái)線程,其主要職責(zé)是從需要壓縮的日志中清除冗余消息。

public class LogCleaner extends ShutdownableThread {
    // 主要成員變量
    private final CleanerConfig config;
    private final OffsetCheckpoint checkpoint;
    private final Time time;
    private final Cleaner cleaner;

    public LogCleaner(String name, CleanerConfig config, OffsetCheckpoint checkpoint, Time time) {
        super(name, true);
        this.config = config;
        this.checkpoint = checkpoint;
        this.time = time;
        this.cleaner = new Cleaner(config, time);
    }

    @Override
    public void doWork() {
        // 核心清理邏輯
    }

(2) 核心方法分析

① doWork()

doWork() 是 LogCleaner 的核心方法,它被定期調(diào)用以執(zhí)行日志壓縮任務(wù)。

@Override
public void doWork() {
    // 從清理隊(duì)列中獲取下一個(gè)需要清理的日志
    LogToClean logToClean = cleanerManager.grabFilthiestLog();
    if (logToClean != null) {
        try {
            // 執(zhí)行壓縮
            cleaner.clean(logToClean);
        } finally {
            // 釋放資源
            cleanerManager.doneCleaning(logToClean);
        }
    } else {
        // 如果沒(méi)有日志需要清理,則線程休眠一段時(shí)間
        time.sleep(config.backOffMs);
    }
}

該方法的主要步驟包括:

  • 從 cleanerManager 中獲取下一個(gè)需要清理的日志。
  • 調(diào)用 cleaner.clean() 方法對(duì)日志進(jìn)行壓縮。
  • 完成后,釋放資源并更新清理狀態(tài)。

② clean()

clean() 方法是 Cleaner 類中的一個(gè)重要方法,負(fù)責(zé)具體的日志壓縮操作。

public void clean(LogToClean logToClean) {
    // 獲取需要壓縮的日志段
    List<LogSegment> segments = logToClean.segments();
    
    // 創(chuàng)建一個(gè)新的日志段用于存儲(chǔ)壓縮后的數(shù)據(jù)
    LogSegment newSegment = new LogSegment(...);

    // 遍歷舊段,壓縮數(shù)據(jù)并寫(xiě)入新段
    for (LogSegment segment : segments) {
        // 讀取每個(gè)消息
        for (MessageAndOffset message : segment) {
            // 檢查是否是最新的消息
            if (isLatest(message)) {
                newSegment.append(message);
            }
        }
    }

    // 替換舊段
    logToClean.replaceSegments(newSegment);
}

clean() 方法的主要步驟包括:

  • 獲取需要壓縮的日志段。
  • 創(chuàng)建新的日志段以存儲(chǔ)壓縮后的數(shù)據(jù)。
  • 遍歷舊日志段,選出每個(gè)鍵的最新消息并寫(xiě)入新段。
  • 替換舊日志段為新段。

2.LogSegment 類

LogSegment 是 Kafka 中表示日志文件的基本單位。每個(gè) Kafka 主題分區(qū)由多個(gè)日志段(LogSegment)組成。每個(gè)日志段包括一個(gè)日志文件和一個(gè)索引文件。下面是對(duì) LogSegment 類的源碼分析,幫助理解其結(jié)構(gòu)和功能。

(1) LogSegment 的基本結(jié)構(gòu)

LogSegment 類位于 Kafka 的 log 包中,表示一個(gè)日志段。它包含兩個(gè)主要文件:數(shù)據(jù)文件(存儲(chǔ)消息)和索引文件(存儲(chǔ)消息的偏移量)。

public class LogSegment {
    private final File log;
    private final FileMessageSet messageSet;
    private final OffsetIndex index;
    private final TimeIndex timeIndex;
    private final long baseOffset;
    private final long created;
    private final AtomicLong nextOffset;
    private final AtomicLong nextTimeIndexEntry;
    // 其他成員變量和方法
}

(2) 核心構(gòu)造函數(shù)

LogSegment 的構(gòu)造函數(shù)負(fù)責(zé)初始化日志段的各個(gè)組件,包括數(shù)據(jù)文件和索引文件。

public LogSegment(File logFile,
                  FileMessageSet messageSet,
                  OffsetIndex offsetIndex,
                  TimeIndex timeIndex,
                  long baseOffset,
                  long created) {
    this.log = logFile;
    this.messageSet = messageSet;
    this.index = offsetIndex;
    this.timeIndex = timeIndex;
    this.baseOffset = baseOffset;
    this.created = created;
    this.nextOffset = new AtomicLong(baseOffset);
    this.nextTimeIndexEntry = new AtomicLong(baseOffset);
}

(3) 主要方法分析

① append()

append() 方法用于向日志段追加消息,它將消息寫(xiě)入數(shù)據(jù)文件,并在索引文件中記錄偏移量信息。

public void append(long offset, RecordBatch batch) {
    // 將消息追加到數(shù)據(jù)文件
    int physicalPosition = messageSet.append(batch);

    // 更新偏移量索引
    index.append(offset, physicalPosition);

    // 更新時(shí)間索引
    if (batch.maxTimestamp() > 0) {
        timeIndex.maybeAppend(batch.maxTimestamp(), offset);
    }

    // 更新下一個(gè)可用偏移量
    nextOffset.set(offset + 1);
}

② read()

read() 方法用于從日志段讀取消息,它根據(jù)給定的偏移量和大小,返回相應(yīng)的消息集合。

public FileMessageSet read(long startOffset, int maxSize) {
    // 計(jì)算讀取的起始位置和大小
    int startPosition = index.lookup(startOffset).position;
    return messageSet.read(startPosition, maxSize);
}

③ delete()

delete() 方法用于刪除日志段的物理文件,它會(huì)刪除數(shù)據(jù)文件和索引文件。

public void delete() {
    boolean deletedLog = log.delete();
    boolean deletedIndex = index.delete();
    boolean deletedTimeIndex = timeIndex.delete();
    if (!deletedLog || !deletedIndex || !deletedTimeIndex) {
        throw new KafkaException("Failed to delete log segment files.");
    }
}

四、優(yōu)化建議

Kafka 的日志清理機(jī)制可以通過(guò)多種配置進(jìn)行優(yōu)化,以適應(yīng)不同的業(yè)務(wù)需求。以下是一些常見(jiàn)的優(yōu)化建議:

  • 合理設(shè)置保留時(shí)間:根據(jù)數(shù)據(jù)的重要性和訪問(wèn)頻率,合理設(shè)置日志的保留時(shí)間。對(duì)于不常訪問(wèn)的數(shù)據(jù),可以適當(dāng)縮短保留時(shí)間,以節(jié)省存儲(chǔ)空間。
  • 調(diào)整日志段大小:通過(guò)設(shè)置 log.segment.bytes,可以控制每個(gè)日志段的大小。適當(dāng)?shù)娜罩径未笮】梢蕴岣咔謇硇?,避免頻繁的段切換。
  • 配置清理線程:Kafka 允許配置清理線程的數(shù)量和頻率。通過(guò) log.cleaner.threads 和 log.cleaner.interval.ms 配置,可以優(yōu)化清理線程的性能。

五、總結(jié)

本文,我們從原理到源碼詳細(xì)分析了 Kafka 的日志保留和數(shù)據(jù)清理策略,在日常工作種,通過(guò)合理配置和優(yōu)化這些策略,Kafka 能夠在保證數(shù)據(jù)持久化的同時(shí),最大限度地利用存儲(chǔ)資源。

責(zé)任編輯:趙寧寧 來(lái)源: 猿java
相關(guān)推薦

2024-08-07 10:54:27

MySQL日志策略

2010-02-05 15:33:29

Android JDK

2022-05-05 10:00:53

Kafka分區(qū)分配Linux

2010-05-20 18:05:38

2024-12-24 14:01:10

2022-11-07 09:25:02

Kafka存儲(chǔ)架構(gòu)

2011-11-21 15:04:30

2012-02-17 10:50:10

Java

2016-11-25 20:52:14

Linux

2024-07-29 00:01:00

RabbitMQ消息堆積

2022-09-27 18:56:28

ArrayList數(shù)組源代碼

2024-02-05 19:06:04

DartVMGC流程

2025-02-07 12:11:52

2025-01-02 10:19:18

2023-10-12 19:41:55

2017-06-23 18:25:51

kafka數(shù)據(jù)可靠性

2010-05-18 11:28:57

MySQL binlo

2013-07-02 10:08:46

爛代碼代碼優(yōu)化代碼清理

2018-10-29 13:07:15

HBase存儲(chǔ)遷移

2020-09-04 06:32:08

緩存數(shù)據(jù)庫(kù)接口
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)