輕量級的日志采集組件 Filebeat 講解與實戰(zhàn)操作
一、概述
Filebeat是一個輕量級的日志數(shù)據(jù)收集工具,屬于Elastic公司的Elastic Stack(ELK Stack)生態(tài)系統(tǒng)的一部分。它的主要功能是從各種來源收集日志數(shù)據(jù),將數(shù)據(jù)發(fā)送到Elasticsearch、Logstash或其他目標,以便進行搜索、分析和可視化。
以下是Filebeat的主要概述和特點:
- 輕量級:Filebeat是一個輕量級的代理,對系統(tǒng)資源的消耗非常低。它設計用于高性能和低延遲,可以在各種環(huán)境中運行,包括服務器、容器和虛擬機。
- 多源收集:Filebeat支持從各種來源收集數(shù)據(jù),包括日志文件、系統(tǒng)日志、Docker容器日志、Windows事件日志等。它具有多個輸入模塊,可以輕松配置用于不同數(shù)據(jù)源的數(shù)據(jù)收集。
- 模塊化:Filebeat采用模塊化的方式組織配置,每個輸入類型都可以作為一個模塊,易于擴展和配置。這使得添加新的數(shù)據(jù)源和日志格式變得更加簡單。
- 自動發(fā)現(xiàn):Filebeat支持自動發(fā)現(xiàn)服務,可以在容器化環(huán)境中自動識別新的容器和服務,并開始收集其日志數(shù)據(jù)。
- 安全性:Filebeat支持安全傳輸,可以使用TLS/SSL加密協(xié)議將數(shù)據(jù)安全地傳輸?shù)侥繕?。它還支持基于令牌的身份驗證。
- 數(shù)據(jù)處理:Filebeat可以對數(shù)據(jù)進行簡單的處理,如字段分割、字段重命名和數(shù)據(jù)過濾,以確保數(shù)據(jù)適合進一步處理和分析。
- 目標輸出:Filebeat可以將數(shù)據(jù)發(fā)送到多個目標,最常見的是將數(shù)據(jù)發(fā)送到Elasticsearch,以便進行全文搜索和分析。此外,還可以將數(shù)據(jù)發(fā)送到Logstash、Kafka等目標。
- 實時性:Filebeat可以以實時方式收集和傳輸數(shù)據(jù),確保日志數(shù)據(jù)及時可用于分析和可視化。
- 監(jiān)控和管理:Filebeat具有自身的監(jiān)控功能,可以監(jiān)視自身的狀態(tài)和性能,并與Elasticsearch、Kibana等工具集成,用于管理和監(jiān)控數(shù)據(jù)收集。
工作的流程圖如下:
圖片
Filebeat的采集原理的主要步驟
- 數(shù)據(jù)源檢測:
Filebeat首先配置要監(jiān)視的數(shù)據(jù)源,這可以是日志文件、系統(tǒng)日志、Docker容器日志、Windows事件日志等。Filebeat可以通過輸入模塊配置來定義數(shù)據(jù)源。
- 數(shù)據(jù)收集:
一旦數(shù)據(jù)源被定義,F(xiàn)ilebeat會定期輪詢這些數(shù)據(jù)源,檢查是否有新的數(shù)據(jù)產(chǎn)生。
如果有新數(shù)據(jù),F(xiàn)ilebeat將讀取數(shù)據(jù)并將其發(fā)送到后續(xù)處理階段。
數(shù)據(jù)處理:
Filebeat可以對采集到的數(shù)據(jù)進行一些簡單的處理,例如字段分割、字段重命名、數(shù)據(jù)解析等。這有助于確保數(shù)據(jù)格式適合進一步的處理和分析。
數(shù)據(jù)傳輸:
采集到的數(shù)據(jù)將被傳輸?shù)揭粋€或多個目標位置,通常是Elasticsearch、Logstash或Kafka等。
Filebeat可以配置多個輸出目標,以便將數(shù)據(jù)復制到多個地方以增加冗余或分發(fā)數(shù)據(jù)。
安全性和可靠性:
Filebeat支持安全傳輸,可以使用TLS/SSL協(xié)議對數(shù)據(jù)進行加密。它還具有數(shù)據(jù)重試機制,以確保數(shù)據(jù)能夠成功傳輸?shù)侥繕宋恢谩?/p>
數(shù)據(jù)目的地:
數(shù)據(jù)被傳輸?shù)侥繕宋恢煤?,可以被進一步處理、索引和分析。目標位置通常是Elasticsearch,用于全文搜索和分析,或者是Logstash用于進一步的數(shù)據(jù)處理和轉(zhuǎn)換,也可以是Kafka等其他消息隊列。
實時性和監(jiān)控:
Filebeat可以以實時方式監(jiān)視數(shù)據(jù)源,確保新數(shù)據(jù)能夠快速傳輸和處理。
Filebeat還可以與監(jiān)控工具集成,以監(jiān)控其自身的性能和狀態(tài),并將這些數(shù)據(jù)發(fā)送到監(jiān)控系統(tǒng)中。
總的來說,F(xiàn)ilebeat采集原理是通過輪詢監(jiān)視數(shù)據(jù)源,將新數(shù)據(jù)采集并發(fā)送到目標位置,同時確保數(shù)據(jù)的安全傳輸和可靠性。它提供了一種高效且靈活的方式來處理各種類型的日志和事件數(shù)據(jù),以便進行后續(xù)的分析和可視化。
二、Kafka 安裝
為了快速部署,這里選擇通過docker-compose部署,可以參考我這篇文章:【中間件】通過 docker-compose 快速部署 Kafka 保姆級教程
# 先安裝 zookeeper
git clone https://gitee.com/hadoop-bigdata/docker-compose-zookeeper.git
cd docker-compose-zookeeper
docker-compose -f docker-compose.yaml up -d
# 安裝kafka
git clone https://gitee.com/hadoop-bigdata/docker-compose-kafka.git
cd docker-compose-kafka
docker-compose -f docker-compose.yaml up -d
如果僅僅只是為測試也可以部署一個單機kafka官方下載地址:http://kafka.apache.org/downloads
### 1、下載kafka
wget https://downloads.apache.org/kafka/3.4.1/kafka_2.12-3.4.1.tgz --no-check-certificate
### 2、解壓
tar -xf kafka_2.12-3.4.1.tgz
### 3、配置環(huán)境變量
# ~/.bashrc添加如下內(nèi)容:
export PATH=$PATH:/opt/docker-compose-kafka/images/kafka_2.12-3.4.1/bin
### 4、配置zookeeper 新版Kafka已內(nèi)置了ZooKeeper,如果沒有其它大數(shù)據(jù)組件需要使用ZooKeeper的話,直接用內(nèi)置的會更方便維護。
# vi kafka_2.12-3.4.1/config/zookeeper.properties
#注釋掉
#maxClientCnxns=0
#設置連接參數(shù),添加如下配置
#為zk的基本時間單元,毫秒
tickTime=2000
#Leader-Follower初始通信時限 tickTime*10
initLimit=10
#Leader-Follower同步通信時限 tickTime*5
syncLimit=5
#設置broker Id的服務地址
#hadoop-node1對應于前面在hosts里面配置的主機映射,0是broker.id, 2888是數(shù)據(jù)同步和消息傳遞端口,3888是選舉端口
server.0=local-168-182-110:2888:3888
### 5、配置kafka
# vi kafka_2.12-3.4.1/config/server.properties
#添加以下內(nèi)容:
broker.id=0
listeners=PLAINTEXT://local-168-182-110:9092
# 上面容器的zookeeper
zookeeper.cnotallow=local-168-182-110:2181
# topic不存在的,kafka就會創(chuàng)建該topic。
#auto.create.topics.enable=true
### 6、啟動服務
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
./bin/kafka-server-start.sh -daemon config/server.properties
### 7、測試驗證
#創(chuàng)建topic
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --create --topic topic1 --partitions 8 --replication-factor 1
#列出所有topic
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --list
#列出所有topic的信息
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe
#列出指定topic的信息
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe --topic topic1
#生產(chǎn)者(消息發(fā)送程序)
kafka-console-producer.sh --broker-list local-168-182-110:9092 --topic topic1
#消費者(消息接收程序)
kafka-console-consumer.sh --bootstrap-server local-168-182-110:9092 --topic topic1
三、Filebeat 安裝
1)下載 Filebeat
官網(wǎng)地址:https://www.elastic.co/cn/downloads/past-releases#filebeat
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.6.2-linux-x86_64.tar.gz
tar -xf filebeat-7.6.2-linux-x86_64.tar.gz
2)Filebeat 配置參數(shù)講解
Filebeat的配置文件通常是YAML格式,包含各種配置參數(shù),用于定義數(shù)據(jù)源、輸出目標、數(shù)據(jù)處理和其他選項。以下是一些常見的Filebeat配置參數(shù)及其含義:
- filebeat.inputs:指定要監(jiān)視的數(shù)據(jù)源??梢耘渲枚鄠€輸入,每個輸入定義一個數(shù)據(jù)源。每個輸入包括以下參數(shù):
type:數(shù)據(jù)源的類型,例如日志文件、系統(tǒng)日志、Docker日志等。
paths:要監(jiān)視的文件路徑或者使用通配符指定多個文件。
enabled:是否啟用該輸入。
示例:
filebeat.inputs:
- type: log
paths:
- /var/log/*.log
- type: docker
enabled: true
- filebeat.modules:定義要加載的模塊,每個模塊用于解析特定類型的日志或事件數(shù)據(jù)。每個模塊包括以下參數(shù):
module:模塊名稱。
enabled:是否啟用模塊。
var:自定義模塊變量。
示例:
filebeat.modules:
- module: apache
access:
enabled: true
error:
enabled: true
- output.elasticsearch:指定將數(shù)據(jù)發(fā)送到Elasticsearch的配置參數(shù),包括Elasticsearch主機、索引名稱等。
hosts:Elasticsearch主機列表。
index:索引名稱模板。
username和password:用于身份驗證的用戶名和密碼。
pipeline:用于數(shù)據(jù)預處理的Ingest節(jié)點管道。
示例:
output.elasticsearch:
hosts: ["localhost:9200"]
index: "filebeat-%{[agent.version]}-%{+yyyy.MM.dd}"
username: "your_username"
password: "your_password"
- output.logstash:指定將數(shù)據(jù)發(fā)送到Logstash的配置參數(shù),包括Logstash主機和端口等。
hosts:Logstash主機列表。
index:索引名稱模板。
ssl:是否使用SSL/TLS加密傳輸數(shù)據(jù)。
示例:
output.logstash:
hosts: ["localhost:5044"]
index: "filebeat-%{[agent.version]}-%{+yyyy.MM.dd}"
ssl.enabled: true
- processors:定義對數(shù)據(jù)的預處理步驟,包括字段分割、重命名、添加字段等。
add_fields:添加字段到事件數(shù)據(jù)。
decode_json_fields:解碼JSON格式的字段。
drop_fields:刪除指定字段。
rename:重命名字段。
示例:
processors:
- add_fields:
target: "my_field"
value: "my_value"
- drop_fields:
fields: ["field1", "field2"]
- filebeat.registry.path:指定Filebeat用于跟蹤已經(jīng)讀取的文件和位置信息的注冊文件的路徑。
- filebeat.autodiscover:自動發(fā)現(xiàn)數(shù)據(jù)源,特別是用于容器化環(huán)境,配置自動檢測新容器的策略。
- logging.level:指定Filebeat的日志級別,可選項包括info、debug、warning等。
這些是 Filebeat 的一些常見配置參數(shù),具體的配置取決于您的使用場景和需求。您可以根據(jù)需要自定義配置文件,以滿足您的數(shù)據(jù)采集和處理需求。詳細的配置文檔可以在Filebeat官方文檔中找到。
3)filebeat.prospectors 推送kafka完整配置
這里主要用到幾個核心字段:filebeat.prospectors、processors、output.kafka
1、filebeat.prospectors
filebeat.prospectors:用于定義要監(jiān)視的數(shù)據(jù)源和采集規(guī)則。每個 prospector 包含一個或多個輸入規(guī)則,它們指定要監(jiān)視的文件或數(shù)據(jù)源以及如何采集和解析數(shù)據(jù)。
以下是一個示例 filebeat.prospectors 部分的配置:
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/*.log
exclude_files:
- "*.gz"
multiline.pattern: '^\['
multiline.negate: false
multiline.match: after
tags: ["tag1", "tag2"]
tail_files: true
fields:
app: myapp
env: production
在上述示例中,我們定義了一個 filebeat.prospectors 包含一個 type: log 的 prospector,下面是各個字段的解釋:
- type(必需):數(shù)據(jù)源的類型。在示例中,類型是 log,表示監(jiān)視普通文本日志文件。Filebeat支持多種類型,如 log、stdin、tcp、udp 等。
- enabled:是否啟用此 prospector。如果設置為 true,則啟用,否則禁用。默認為 true。
- paths(必需):要監(jiān)視的文件或文件模式,可以使用通配符指定多個文件。在示例中,F(xiàn)ilebeat將監(jiān)視 /var/log/ 目錄下的所有以 .log 結尾的文件。
- exclude_files:要排除的文件或文件模式列表。這里排除了所有以 .gz 結尾的文件??蛇x字段。
- multiline.pattern:多行日志的起始模式。如果您的日志事件跨越多行,此選項可用于合并多行日志事件。例如,設置為 'pattern' 將根據(jù)以 'pattern' 開頭的行來合并事件。
- multiline.negate:是否取反多行日志模式。如果設置為 true,則表示匹配不包含多行日志模式的行??蛇x字段,默認為 false。
- multiline.match:多行匹配模式,可以是 before(與上一行合并)或 after(與下一行合并)。如果設置為 before,則當前行與上一行合并為一個事件;如果設置為 after,則當前行與下一行合并為一個事件??蛇x字段,默認為 after。
- tags:為采集的事件添加標簽,以便后續(xù)的數(shù)據(jù)處理。標簽是一個字符串數(shù)組,可以包含多個標簽。在示例中,事件將被標記為 "tag1" 和 "tag2"。可選字段。
- tail_files:用于控制Filebeat是否應該跟蹤正在寫入的文件(tail文件)。當 tail_files 設置為 true 時,F(xiàn)ilebeat將監(jiān)視正在被寫入的文件,即使它們還沒有完成。這對于實時監(jiān)視日志文件非常有用,因為它允許Filebeat立即處理新的日志行。默認情況下,tail_files 是啟用的,因此只有在特殊情況下才需要顯式設置為 false。
- fields:為事件添加自定義字段。這是一個鍵值對,允許您添加額外的信息到事件中。在示例中,事件將包含 "app" 字段和 "env" 字段,分別設置為 "myapp" 和 "production"。可選字段。
這些字段允許您配置Filebeat以滿足特定的數(shù)據(jù)源和采集需求。您可以根據(jù)需要定義多個 prospector 來監(jiān)視不同類型的數(shù)據(jù)源,每個 prospector 可以包含不同的參數(shù)。通過靈活配置 filebeat.prospectors,F(xiàn)ilebeat可以適應各種日志和數(shù)據(jù)采集場景。
2、processors
processors 是Filebeat配置中的一個部分,用于定義在事件傳輸?shù)捷敵瞿繕酥皩κ录?shù)據(jù)進行預處理的操作。您可以使用 processors 來修改事件數(shù)據(jù)、添加字段、刪除字段,以及執(zhí)行其他自定義操作。以下是一些常見的 processors 配置示例和說明:
- 添加字段(Add Fields):可以使用 add_fields 處理器將自定義字段添加到事件中,以豐富事件的信息。例如,將應用程序名稱和環(huán)境添加到事件中:
processors:
- add_fields:
fields:
app: myapp
env: production
- 刪除字段(Drop Fields):使用 drop_fields 處理器可以刪除事件中的指定字段。以下示例刪除名為 "sensitive_data" 的字段:
processors:
- drop_fields:
fields: ["sensitive_data"]
- 解碼 JSON 字段(Decode JSON Fields):如果事件中包含JSON格式的字段,您可以使用 decode_json_fields 處理器將其解碼為結構化數(shù)據(jù)。以下示例將名為 "json_data" 的字段解碼為結構化數(shù)據(jù):
processors:
- decode_json_fields:
fields: ["json_data"]
target: ""
- 字段重命名(Rename Fields):可以使用 rename 處理器重命名事件中的字段。例如,將 "old_field" 重命名為 "new_field":
processors:
- rename:
fields:
- from: old_field
to: new_field
- 條件處理(Conditional Processing):使用 if 條件可以根據(jù)事件的特定字段或?qū)傩詠磉x擇是否應用某個處理器。以下示例根據(jù)事件中的 "log_level" 字段,僅在 "error" 日志級別時添加 "error" 標簽:
processors:
- add_tags:
tags: ["error"]
when:
equals:
log_level: "error"
- 多個處理器(Multiple Processors):您可以配置多個處理器,它們將按照順序依次應用于事件數(shù)據(jù)。例如,您可以先添加字段,然后刪除字段,最后重命名字段。
processors 部分允許您對事件數(shù)據(jù)進行復雜的處理和轉(zhuǎn)換,以適應特定的需求。您可以根據(jù)需要組合不同的處理器來執(zhí)行多個操作,以確保事件數(shù)據(jù)在傳輸?shù)捷敵瞿繕酥皾M足您的要求。
3、output.kafka
output.kafka 是Filebeat配置文件中的一個部分,用于配置將事件數(shù)據(jù)發(fā)送到Kafka消息隊列的相關設置。以下是 output.kafka 部分的常見參數(shù)及其解釋:
output.kafka:
hosts: ["kafka-broker1:9092", "kafka-broker2:9092"]
topic: "my-log-topic"
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
以下是各個參數(shù)的詳細解釋:
- hosts(必需):Kafka broker 的地址和端口列表。在示例中,我們指定了兩個Kafka broker:kafka-broker1:9092 和 kafka-broker2:9092。Filebeat將使用這些地址來連接到Kafka集群。
- topic(必需):要發(fā)送事件到的Kafka主題(topic)的名稱。在示例中,主題名稱為 "my-log-topic"。Filebeat將會將事件發(fā)送到這個主題。
- partition.round_robin:事件分區(qū)策略的配置。這里的配置是將事件平均分布到所有分區(qū),不僅僅是可達的分區(qū)。reachable_only 設置為 false,表示即使分區(qū)不可達也會發(fā)送數(shù)據(jù)。如果設置為 true,則只會發(fā)送到可達的分區(qū)。
- required_acks:Kafka的確認機制。指定要等待的確認數(shù),1 表示只需要得到一個分區(qū)的確認就認為消息已經(jīng)成功發(fā)送。更高的值表示更多的確認。通常,1 是常見的設置,因為它具有較低的延遲。
- compression:數(shù)據(jù)的壓縮方式。在示例中,數(shù)據(jù)被gzip壓縮。這有助于減小傳輸數(shù)據(jù)的大小,降低網(wǎng)絡帶寬的使用。
- max_message_bytes:Kafka消息的最大字節(jié)數(shù)。如果事件的大小超過此限制,F(xiàn)ilebeat會將事件拆分為多個消息。
以上是常見的 output.kafka 參數(shù),您可以根據(jù)您的Kafka集群配置和需求來調(diào)整這些參數(shù)。確保配置正確的Kafka主題和分區(qū)策略以滿足您的數(shù)據(jù)傳輸需求。同時,要確保Filebeat服務器可以連接到指定的Kafka broker地址。
以下是一個完整的Filebeat配置文件示例,其中包括了 filebeat.prospectors、processors 和 output.kafka 的配置部分,以用于從日志文件采集數(shù)據(jù)并將其發(fā)送到Kafka消息隊列:
4)filebeat.inputs 與 filebeat.prospectors區(qū)別
Filebeat 從 7.x 版本開始引入了新的配置方式 filebeat.inputs,以提供更靈活的輸入配置選項,同時保留了向后兼容性。以下是 filebeat.inputs 和 filebeat.prospectors 之間的主要區(qū)別:
- filebeat.inputs:
filebeat.inputs 是較新版本的配置方式,用于定義輸入配置。
允許您以更靈活的方式配置不同類型的輸入。您可以在配置文件中定義多個獨立的輸入塊,每個塊用于配置不同類型的輸入。
每個輸入塊可以包含多個字段,用于定制不同輸入類型的配置,如 type、enabled、paths、multiline 等。
使配置更具可讀性,因為每個輸入類型都有自己的配置塊。
示例:
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/app/*.log
- type: syslog
enabled: true
port: 514
protocol.udp: true
- filebeat.prospectors:
filebeat.prospectors 是舊版配置方式,用于定義輸入配置。
所有的輸入類型(如日志文件、系統(tǒng)日志、stdin 等)都需要放在同一個部分中。
需要在同一個配置塊中定義不同輸入類型的路徑等細節(jié)。
舊版配置方式,不如 filebeat.inputs 配置方式那么靈活和可讀性好。
以下是一些常見的 type 值以及它們的含義:
- log(常用):用于監(jiān)視和收集文本日志文件,例如應用程序日志。
- type: log
paths:
- /var/log/*.log
- stdin:用于從標準輸入(stdin)收集數(shù)據(jù)。
- type: stdin
- syslog:用于收集系統(tǒng)日志數(shù)據(jù),通常是通過UDP或TCP協(xié)議從遠程或本地 syslog 服務器接收。
- type: syslog
port: 514
protocol.udp: true
- filestream:用于收集 Windows 上的文件日志數(shù)據(jù)。
- type: filestream
enabled: true
- httpjson:用于通過 HTTP 請求從 JSON API 收集數(shù)據(jù)。
- type: httpjson
enabled: true
urls:
- http://example.com/api/data
- tcp 和 udp:用于通過 TCP 或 UDP 協(xié)議收集網(wǎng)絡數(shù)據(jù)。
- type: tcp
enabled: true
host: "localhost"
port: 12345
- type: udp
enabled: true
host: "localhost"
port: 12345
總的來說,filebeat.inputs 提供了更靈活的方式來配置不同類型的輸入,更容易組織和管理配置。如果您使用的是較新版本的 Filebeat,推薦使用 filebeat.inputs 配置方式。但對于向后兼容性,舊版的 filebeat.prospectors 仍然可以使用。
5)filebeat.yml 配置
filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/*.log
multiline.pattern: '^\['
multiline.negate: false
multiline.match: after
tail_files: true
fields:
app: myapp
env: production
topicname: my-log-topic
- type: log
enabled: true
paths:
- /var/log/messages
multiline.pattern: '^\['
multiline.negate: false
multiline.match: after
tail_files: true
fields:
app: myapp
env: production
topicname: my-log-topic
processors:
- add_fields:
fields:
app: myapp
env: production
- drop_fields:
fields: ["sensitive_data"]
output.kafka:
hosts: ["local-168-182-110:9092"]
#topic: "my-log-topic"
# 這里也可以應用上面filebeat.prospectors.fields的值
topic: '%{[fields][topicname]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
6)啟動 Filebeat 服務
nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &
# -e 將啟動信息輸出到屏幕上
# filebeat本身運行的日志默認位置${install_path}/logs/filebeat
要修改filebeat的日子路徑,可以添加一下內(nèi)容在filebeat.yml配置文件:
#logging.level :debug 日志級別
path.logs: /var/log/
使用 systemctl 啟動 filebeat
# vi /usr/lib/systemd/system/filebeat.service
[Unit]
Descriptinotallow=filebeat server daemon
Documentatinotallow=/opt/filebeat-7.6.2-linux-x86_64/filebeat -help
Wants=network-online.target
After=network-online.target
[Service]
User=root
Group=root
Envirnotallow="BEAT_CONFIG_OPTS=-c /opt/filebeat-7.6.2-linux-x86_64/filebeat.yml"
ExecStart=/opt/filebeat-7.6.2-linux-x86_64/filebeat $BEAT_CONFIG_OPTS
Restart=always
[Install]
WantedBy=multi-user.target
【溫馨提示】記得更換自己的 filebeat 目錄。
systemctl 啟動 filebeat 服務
#刷新一下配置文件
systemctl daemon-reload
# 啟動
systemctl start filebeat
# 查看狀態(tài)
systemctl status filebeat
# 查看進程
ps -ef|grep filebeat
# 查看日志
vi logs/filebeat
7)檢測日志是否已經(jīng)采集到 kafka
# 設置環(huán)境變量
export KAFKA_HOME=/opt/docker-compose-kafka/images/kafka_2.12-3.4.1
# 查看topic列表
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server local-168-182-110:9092 --list
# 查看topic列表詳情
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe
# 指定topic
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe --topic my-log-topic
# 查看kafka數(shù)據(jù)
${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic my-log-topic --bootstrap-server local-168-182-110:9092
#上述命令會連接到指定的Kafka集群并打印my_topic主題上的所有消息。如果要查看特定數(shù)量的最新消息,則應將“--from-beginning”添加到命令中。
# 在較高版本的 Kafka 中(例如 Kafka 2.4.x 和更高版本),消費者默認需要明確指定要消費的分區(qū)。
#以下是查看特定最新消息數(shù)量的示例:
${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic my-log-topic --bootstrap-server local-168-182-110:9092 --from-beginning --max-messages 10 --partition 0
# 查看kafka數(shù)據(jù)量,在較高版本的 Kafka 中(例如 Kafka 2.4.x 和更高版本),消費者默認需要明確指定要消費的分區(qū)。
${KAFKA_HOME}/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list local-168-182-110:9092 --topic my-log-topic --time -1
# 消費數(shù)據(jù)查看數(shù)據(jù),這里指定一個分區(qū)
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server local-168-182-110:9092 --topic my-log-topic --partition 0 --offset 100
# 也可以通過消費組消費,可以不指定分區(qū)
${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic my-log-topic --bootstrap-server local-168-182-110:9092 --from-beginning --group my-group
這將返回主題 <topic_name> 的分區(qū)和偏移量信息,您可以根據(jù)這些信息計算出數(shù)據(jù)量。