如何對(duì) Elasticsearch、Filebeat、Logstash、Kibana 深度巡檢?
在運(yùn)維和開發(fā)中,確保 Elasticsearch、Filebeat、Logstash、Kibana(簡(jiǎn)稱 EFLK)集群的穩(wěn)定運(yùn)行至關(guān)重要。
本文將詳細(xì)介紹一套深度巡檢方案,包括各組件的監(jiān)控方法、健康狀態(tài)檢查、性能指標(biāo)監(jiān)控,以及一些關(guān)鍵的 DSL 查詢示例,幫助大家全面掌握集群狀態(tài),及時(shí)發(fā)現(xiàn)潛在問題,優(yōu)化 EFLK 的運(yùn)行。
一、Elasticsearch 深度巡檢方案
1. 集群健康檢查
使用 _cluster/health API 獲取集群的整體健康狀況:
GET _cluster/health
返回的關(guān)鍵字段:
圖片
- status:集群狀態(tài)(green、yellow、red)
- number_of_nodes:節(jié)點(diǎn)數(shù)量
- active_primary_shards:活躍的主分片數(shù)
- active_shards:活躍的總分片數(shù)
- unassigned_shards:未分配的分片數(shù)量
2. 節(jié)點(diǎn)性能監(jiān)控
使用 _nodes/stats API 獲取各節(jié)點(diǎn)的性能指標(biāo):
GET _nodes/stats
重點(diǎn)關(guān)注:
圖片
- indices.docs.count:文檔數(shù)量
- indices.store.size_in_bytes:索引大小
- jvm.mem.heap_used_percent:JVM 堆內(nèi)存使用率
- os.cpu.percent:CPU 使用率
- fs.total.available_in_bytes:磁盤可用空間
3. 分片狀態(tài)監(jiān)控
使用 _cat/shards API 查看所有分片的詳細(xì)信息:
GET _cat/shards?v&h=index,shard,prirep,state,unassigned.reason
重點(diǎn)檢查 unassigned.reason 字段,確保沒有未分配的分片。如果發(fā)現(xiàn)未分配的分片,可以使用以下命令重新分配:
圖片
POST /_cluster/reroute
{
"commands": [
{
"allocate_stale_primary": {
"index": "your-index",
"shard": 0,
"node": "node-name",
"accept_data_loss": true
}
}
]
}
4. 索引狀態(tài)巡檢
定期檢查索引的健康狀態(tài),特別是大型或活躍的索引:
GET _cat/indices?v&h=index,health,status,pri,rep,docs.count,store.size
圖片
查看以下字段:
- health:索引健康狀態(tài)
- status:索引狀態(tài)
- pri:主分片數(shù)量
- rep:副本分片數(shù)量
- docs.count:文檔數(shù)量
- store.size:索引大小
5. 集群性能分析(使用 Profile 查詢)
為了深入分析查詢性能,可以在查詢時(shí)使用 profile 參數(shù),返回每個(gè)查詢階段的執(zhí)行時(shí)間,幫助定位性能瓶頸:
GET /your-index/_search?pretty
{
"profile": true,
"query": {
"match": {
"field": "value"
}
}
}
返回結(jié)果中包含每個(gè)查詢的執(zhí)行時(shí)間,識(shí)別最耗時(shí)的部分(如分片級(jí)別操作)。
kibana 的 Search Profile 工具要用好!
圖片
二、Filebeat 巡檢方案
1. Filebeat 配置檢查
確保 Filebeat 正確安裝并運(yùn)行:
systemctl status filebeat
檢查配置文件 /etc/filebeat/filebeat.yml,確保以下關(guān)鍵部分:
- 輸入源(如日志文件、系統(tǒng)日志)配置正確
- 輸出目標(biāo)(Elasticsearch 或 Logstash)配置正確
2. Filebeat 日志檢查
查看 Filebeat 日志文件(通常位于 /var/log/filebeat/filebeat)以確保沒有錯(cuò)誤信息:
tail -f /var/log/filebeat/filebeat
常見錯(cuò)誤:
- 無法連接到 Elasticsearch 或 Logstash
- 由于權(quán)限問題無法讀取日志文件
3. Filebeat 配置測(cè)試
測(cè)試 Filebeat 配置文件的正確性:
filebeat test config
使用 -e 參數(shù)啟動(dòng) Filebeat,輸出調(diào)試日志:
filebeat -e
三、Logstash 巡檢方案
1. Logstash 進(jìn)程檢查
檢查 Logstash 是否正常運(yùn)行:
systemctl status logstash
2. Logstash 管道配置檢查
檢查 Logstash 的管道配置,通常位于 /etc/logstash/conf.d/ 下:
cat /etc/logstash/conf.d/*.conf
確保以下部分配置正確:
- 輸入(如 Filebeat、Kafka)
- 過濾(如 Grok 解析、日期處理)
- 輸出(如 Elasticsearch、文件)
3. Logstash 日志檢查
查看 Logstash 的日志文件(通常位于 /var/log/logstash/):
tail -f /var/log/logstash/logstash-plain.log
重點(diǎn)檢查:
- 連接失?。ㄈ鐭o法連接到 Elasticsearch)
- 解析錯(cuò)誤(如 Grok 模式不匹配)
四、Kibana 巡檢方案
1. Kibana 進(jìn)程狀態(tài)檢查
檢查 Kibana 服務(wù)是否正常運(yùn)行:
systemctl status kibana
2. Kibana 配置檢查
檢查 Kibana 配置文件(通常位于 /config/kibana.yml):
cat /config/kibana.yml
確保以下配置正確:
- elasticsearch.hosts: ["http://localhost:9200"](或集群地址)
- server.host: 0.0.0.0(確保 Kibana 可被外部訪問)
3. Kibana 日志檢查
查看 Kibana 的日志文件(通常位于 /logs/kibana.log):
tail -f logs/kibana.log
重點(diǎn)檢查:
圖片
- 無法連接 Elasticsearch
- Kibana 啟動(dòng)失敗
4. Kibana UI 巡檢
登錄 Kibana 界面,檢查以下功能是否正常:
- Discover:能否正常搜索和查看日志數(shù)據(jù)
- Dashboards:儀表板是否正常顯示
- Visualizations:可視化圖表是否加載正常
五、DSL 查詢示例
為了更深入的巡檢,可以使用一些常見的 DSL 查詢來檢查系統(tǒng)的日志和性能。
1. 查詢慢查詢?nèi)罩?/h3>
查找慢查詢,定位性能瓶頸:
GET /_search
{
"query": {
"range": {
"@timestamp": {
"gte": "now-1d/d",
"lt": "now/d"
}
}
},
"sort": [
{
"took": {
"order": "desc"
}
}
],
"size": 10
}
Elasticsearch高級(jí)調(diào)優(yōu)方法論之——根治慢查詢!為什么Elasticsearch查詢變得這么慢了?
2. 查詢錯(cuò)誤日志
如果懷疑系統(tǒng)有錯(cuò)誤,可以查詢錯(cuò)誤日志:
Elasticsearch 日志能否把全部請(qǐng)求打印出來?
GET /filebeat-*/_search
{
"query": {
"match": {
"message": "error"
}
}
}
3. 查詢特定節(jié)點(diǎn)性能問題
根據(jù)節(jié)點(diǎn)的 ID 或名稱查詢?cè)摴?jié)點(diǎn)上的所有日志:
GET /_search
{
"query": {
"term": {
"host.name": {
"value": "node-1"
}
}
}
}
六、企業(yè)級(jí)參考:自動(dòng)化監(jiān)控 Elasticsearch 集群
為了持續(xù)監(jiān)控 Elasticsearch 集群的健康狀態(tài)、CPU、內(nèi)存、負(fù)載、磁盤使用率等指標(biāo),可以使用 Python 腳本結(jié)合定時(shí)任務(wù)實(shí)現(xiàn)自動(dòng)化采集。我早期項(xiàng)目是借助 shell 腳本實(shí)現(xiàn),原理一致
1. Elasticsearch 指標(biāo)采集 Python 腳本
1.1 準(zhǔn)備工作
確保安裝了 requests 庫(kù):
pip install requests
1.2 腳本代碼(可運(yùn)行版本)import json
from datetime import datetime
import configparser
import warnings
from elasticsearch import Elasticsearch
warnings.filterwarnings("ignore")
# 初始化 Elasticsearch 客戶端
def init_es_client(config_path='./conf/config.ini'):
"""初始化并返回Elasticsearch客戶端"""
config = configparser.ConfigParser()
config.read(config_path)
es_host = config.get('elasticsearch', 'ES_HOST')
es_user = config.get('elasticsearch', 'ES_USER')
es_password = config.get('elasticsearch', 'ES_PASSWORD')
es = Elasticsearch(
hosts=[es_host],
basic_auth=(es_user, es_password),
verify_certs=False,
ca_certs='conf/http_ca.crt'
)
return es
# 日志文件配置
LOG_FILE = 'elasticsearch_metrics.log'
# 使用初始化的客戶端
es = init_es_client()
# 獲取集群健康狀況
def get_cluster_health():
return es.cluster.health().body # 獲取字典格式的數(shù)據(jù)
# 獲取節(jié)點(diǎn)統(tǒng)計(jì)信息
def get_node_stats():
return es.nodes.stats().body # 獲取字典格式的數(shù)據(jù)
# 獲取集群指標(biāo)
def get_cluster_metrics():
metrics = {}
cluster_health = get_cluster_health()
metrics['cluster_health'] = cluster_health
node_stats = get_node_stats()
nodes = node_stats.get('nodes', {})
metrics['nodes'] = {}
for node_id, node_info in nodes.items():
node_name = node_info.get('name')
metrics['nodes'][node_name] = {
'cpu_usage': node_info['os']['cpu']['percent'],
'load_average': node_info['os']['cpu'].get('load_average', {}).get('1m'),
'memory_used': node_info['os']['mem']['used_percent'],
'heap_used': node_info['jvm']['mem']['heap_used_percent'],
'disk_available': node_info['fs']['total']['available_in_bytes'] / (1024 ** 3),
'disk_total': node_info['fs']['total']['total_in_bytes'] / (1024 ** 3),
'disk_usage_percent': 100 - (
node_info['fs']['total']['available_in_bytes'] * 100 / node_info['fs']['total']['total_in_bytes']
)
}
return metrics
# 記錄集群狀態(tài)到日志文件
def log_metrics():
metrics = get_cluster_metrics()
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(LOG_FILE, 'a') as f:
f.write(f"Timestamp: {timestamp}\n")
f.write(json.dumps(metrics, indent=4))
f.write('\n\n')
# 主函數(shù)
if __name__ == "__main__":
log_metrics()
print("Elasticsearch cluster metrics logged successfully.")
1.2 腳本代碼(可運(yùn)行版本)import json
from datetime import datetime
import configparser
import warnings
from elasticsearch import Elasticsearch
warnings.filterwarnings("ignore")
# 初始化 Elasticsearch 客戶端
def init_es_client(config_path='./conf/config.ini'):
"""初始化并返回Elasticsearch客戶端"""
config = configparser.ConfigParser()
config.read(config_path)
es_host = config.get('elasticsearch', 'ES_HOST')
es_user = config.get('elasticsearch', 'ES_USER')
es_password = config.get('elasticsearch', 'ES_PASSWORD')
es = Elasticsearch(
hosts=[es_host],
basic_auth=(es_user, es_password),
verify_certs=False,
ca_certs='conf/http_ca.crt'
)
return es
# 日志文件配置
LOG_FILE = 'elasticsearch_metrics.log'
# 使用初始化的客戶端
es = init_es_client()
# 獲取集群健康狀況
def get_cluster_health():
return es.cluster.health().body # 獲取字典格式的數(shù)據(jù)
# 獲取節(jié)點(diǎn)統(tǒng)計(jì)信息
def get_node_stats():
return es.nodes.stats().body # 獲取字典格式的數(shù)據(jù)
# 獲取集群指標(biāo)
def get_cluster_metrics():
metrics = {}
cluster_health = get_cluster_health()
metrics['cluster_health'] = cluster_health
node_stats = get_node_stats()
nodes = node_stats.get('nodes', {})
metrics['nodes'] = {}
for node_id, node_info in nodes.items():
node_name = node_info.get('name')
metrics['nodes'][node_name] = {
'cpu_usage': node_info['os']['cpu']['percent'],
'load_average': node_info['os']['cpu'].get('load_average', {}).get('1m'),
'memory_used': node_info['os']['mem']['used_percent'],
'heap_used': node_info['jvm']['mem']['heap_used_percent'],
'disk_available': node_info['fs']['total']['available_in_bytes'] / (1024 ** 3),
'disk_total': node_info['fs']['total']['total_in_bytes'] / (1024 ** 3),
'disk_usage_percent': 100 - (
node_info['fs']['total']['available_in_bytes'] * 100 / node_info['fs']['total']['total_in_bytes']
)
}
return metrics
# 記錄集群狀態(tài)到日志文件
def log_metrics():
metrics = get_cluster_metrics()
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(LOG_FILE, 'a') as f:
f.write(f"Timestamp: {timestamp}\n")
f.write(json.dumps(metrics, indent=4))
f.write('\n\n')
# 主函數(shù)
if __name__ == "__main__":
log_metrics()
print("Elasticsearch cluster metrics logged successfully.")
1.3 關(guān)鍵指標(biāo)說明
- cluster_health:集群健康狀態(tài)(green、yellow、red)
- cpu_usage:每個(gè)節(jié)點(diǎn)的 CPU 使用率
- load_average:系統(tǒng)負(fù)載平均值
- memory_used:每個(gè)節(jié)點(diǎn)的內(nèi)存使用率
- heap_used:JVM 堆內(nèi)存使用率
- disk_available:磁盤可用空間(GB)
- disk_total:磁盤總空間(GB)
- disk_usage_percent:磁盤使用率
日志將以 JSON 格式保存,記錄每次獲取到的集群和節(jié)點(diǎn)狀態(tài)。(如下圖所示)
圖片
2. 定時(shí)執(zhí)行任務(wù)腳本
為了每天早上6點(diǎn)自動(dòng)執(zhí)行腳本,可以使用 cron 設(shè)置定時(shí)任務(wù)。
2.1 編輯定時(shí)任務(wù)
假設(shè)腳本位于 /home/user/scripts/es_metrics.py,使用以下命令編輯 crontab:
crontab -e
添加以下行:
0 6 * * * /usr/bin/python3 /home/user/scripts/es_metrics.py >> /home/user/scripts/es_metrics_cron.log 2>&1
解釋:
- 0 6 * * *:每天早上6點(diǎn)執(zhí)行
- /usr/bin/python3:Python 3 的完整路徑,需根據(jù)實(shí)際情況調(diào)整
- >> /home/user/scripts/es_metrics_cron.log 2>&1:將輸出和錯(cuò)誤信息重定向到日志文件
七、結(jié)論
通過以上深度巡檢方案,能夠全面掌握 EFLK 各組件的健康狀態(tài)和性能指標(biāo)。定期巡檢和自動(dòng)化監(jiān)控有助于及時(shí)發(fā)現(xiàn)潛在問題,保障集群的穩(wěn)定運(yùn)行。
- Elasticsearch:重點(diǎn)關(guān)注集群健康、節(jié)點(diǎn)性能、分片狀態(tài)和索引狀況
- Filebeat、Logstash、Kibana:檢查服務(wù)狀態(tài)、配置文件和日志,確保數(shù)據(jù)采集和展示正常
持續(xù)的監(jiān)控和優(yōu)化,才能讓我們的 EFLK 集群始終保持最佳狀態(tài)。
當(dāng)然,我們推薦優(yōu)先借助監(jiān)控指標(biāo)工具 Prometheus、Zabbix、Grafana 等巡查。