自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

開源數(shù)據(jù)收集引擎 Logstash 講解和示例講解

開源
Logstash 是 Elastic Stack 中的一個(gè)重要組件,與 Elasticsearch 和 Kibana 配合使用,可以構(gòu)建強(qiáng)大的實(shí)時(shí)日志和數(shù)據(jù)分析解決方案。它為組織提供了強(qiáng)大的數(shù)據(jù)采集和處理工具,用于監(jiān)控、分析和可視化大規(guī)模數(shù)據(jù)。

一、概述

Logstash 是一個(gè)開源的數(shù)據(jù)收集和日志處理工具,它是 Elastic Stack(ELK Stack)的一部分,用于從各種數(shù)據(jù)源中采集、轉(zhuǎn)換和傳輸數(shù)據(jù),以幫助分析和可視化大規(guī)模數(shù)據(jù)。Logstash 通常與 Elasticsearch 和 Kibana 一起使用,以實(shí)現(xiàn)實(shí)時(shí)日志分析和監(jiān)控。

以下是 Logstash 的主要功能和特點(diǎn):

  • 數(shù)據(jù)采集:Logstash 可以從多種數(shù)據(jù)源中采集數(shù)據(jù),包括日志文件、數(shù)據(jù)文件、消息隊(duì)列、數(shù)據(jù)庫、網(wǎng)絡(luò)流量等。它支持多種輸入插件,以適應(yīng)不同數(shù)據(jù)源的需要。
  • 數(shù)據(jù)轉(zhuǎn)換:Logstash 具有強(qiáng)大的數(shù)據(jù)轉(zhuǎn)換功能,可以對(duì)采集的數(shù)據(jù)進(jìn)行過濾、解析、轉(zhuǎn)換和豐富操作。它使用過濾插件來對(duì)數(shù)據(jù)執(zhí)行各種操作,包括正則表達(dá)式解析、字段拆分、數(shù)據(jù)脫敏、時(shí)間戳生成等。
  • 多通道數(shù)據(jù)處理:Logstash 允許將數(shù)據(jù)流式傳輸?shù)讲煌耐ǖ?,以滿足不同的需求。通道可以是 Elasticsearch、Kafka、RabbitMQ 等,或者您可以定義自定義輸出插件。
  • 數(shù)據(jù)過濾和插件:Logstash 有豐富的插件生態(tài)系統(tǒng),包括輸入插件、過濾插件和輸出插件。這些插件可以根據(jù)特定需求來配置和擴(kuò)展,以適應(yīng)各種數(shù)據(jù)處理任務(wù)。
  • 實(shí)時(shí)數(shù)據(jù)處理:Logstash 具有實(shí)時(shí)數(shù)據(jù)處理能力,可以將數(shù)據(jù)從源頭到目的地以實(shí)時(shí)或近實(shí)時(shí)的方式傳遞。這使得它適用于日志監(jiān)控、安全分析、性能監(jiān)控等實(shí)時(shí)應(yīng)用。
  • 可伸縮性:Logstash 可以與多個(gè)Logstash 實(shí)例一起部署,以實(shí)現(xiàn)數(shù)據(jù)采集和處理的橫向擴(kuò)展。這有助于應(yīng)對(duì)大規(guī)模數(shù)據(jù)需求。
  • 易于配置:Logstash 使用簡單的配置文件(通常是YAML格式)來定義數(shù)據(jù)流的處理過程。配置文件非常直觀,易于理解和維護(hù)。
  • 社區(qū)和支持:Logstash 是一個(gè)廣泛采用的開源項(xiàng)目,擁有活躍的社區(qū)支持和大量的文檔資源。

Logstash 是 Elastic Stack 中的一個(gè)重要組件,與 Elasticsearch 和 Kibana 配合使用,可以構(gòu)建強(qiáng)大的實(shí)時(shí)日志和數(shù)據(jù)分析解決方案。它為組織提供了強(qiáng)大的數(shù)據(jù)采集和處理工具,用于監(jiān)控、分析和可視化大規(guī)模數(shù)據(jù)。

官方文檔:

二、Logstash 架構(gòu)

圖片圖片

Logstash 包含3個(gè)主要部分: 輸入(inputs),過濾器(filters)和輸出(outputs)

Logstash的事件(logstash將數(shù)據(jù)流中等每一條數(shù)據(jù)稱之為一個(gè)event)處理流水線有三個(gè)主要角色完成:inputs –> filters –> outputs。

  • inpust:必須,負(fù)責(zé)產(chǎn)生事件(Inputs generate events),常用:File、syslog、redis、kakfa、beats(如:Filebeats);官方文檔:https://www.elastic.co/guide/en/logstash/7.17/input-plugins.html
  • filters:可選,負(fù)責(zé)數(shù)據(jù)處理與轉(zhuǎn)換(filters modify them),常用:grok、mutate、drop、clone、geoip;官網(wǎng)文檔:https://www.elastic.co/guide/en/logstash/7.17/filter-plugins.html
  • outpus:必須,負(fù)責(zé)數(shù)據(jù)輸出(outputs ship them elsewhere),常用:elasticsearch、file、graphite、kakfa、statsd;官方文檔:https://www.elastic.co/guide/en/logstash/7.17/output-plugins.html

二、ElasticSearch 部署

這里可以選擇以下部署方式:

  • 通過docker-compose部署:通過 docker-compose 快速部署 Elasticsearch 和 Kibana 保姆級(jí)教程
  • on k8s 部署:ElasticSearch+Kibana on K8s 講解與實(shí)戰(zhàn)操作(版本7.17.3)

這里我選擇 docker-compose 部署方式。

1)部署 docker

# 安裝yum-config-manager配置工具
yum -y install yum-utils

# 建議使用阿里云yum源:(推薦)
#yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

# 安裝docker-ce版本
yum install -y docker-ce
# 啟動(dòng)并開機(jī)啟動(dòng)
systemctl enable --now docker
docker --version

2)部署 docker-compose

curl -SL https://github.com/docker/compose/releases/download/v2.16.0/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose

chmod +x /usr/local/bin/docker-compose
docker-compose --version

3)創(chuàng)建網(wǎng)絡(luò)

# 創(chuàng)建
docker network create bigdata

# 查看
docker network ls

4)修改 Linux 句柄數(shù)和最大線程數(shù)

#查看當(dāng)前最大句柄數(shù)
sysctl -a | grep vm.max_map_count
#修改句柄數(shù)
vi /etc/sysctl.conf
vm.max_map_count=262144

#臨時(shí)生效,修改后需要重啟才能生效,不想重啟可以設(shè)置臨時(shí)生效
sysctl -w vm.max_map_count=262144

#修改后需要重新登錄生效
vi /etc/security/limits.conf

# 添加以下內(nèi)容
* soft nofile 65535
* hard nofile 65535
* soft nproc 4096
* hard nproc 4096

# 重啟服務(wù),-h 立刻重啟,默認(rèn)間隔一段時(shí)間才會(huì)開始重啟
reboot -h now

5)下載部署包開始部署

# 這里選擇 docker-compose 部署方式
git clone https://gitee.com/hadoop-bigdata/docker-compose-es-kibana.git

cd docker-compose-es-kibana

chmod -R 777 es kibana

docker-compose -f docker-compose.yaml up -d

docker-compose ps

三、Logstash 部署與配置講解

1)下載Logstash安裝包

訪問官方網(wǎng)站 https://www.elastic.co/downloads/logstash ,下載相應(yīng)版本的zip文件。

wget https://artifacts.elastic.co/downloads/logstash/logstash-8.11.1-linux-x86_64.tar.gz

2)解壓安裝包文件

tar -xf logstash-8.11.1-linux-x86_64.tar.gz

3)不同場(chǎng)景測(cè)試

1)測(cè)試1:采用標(biāo)準(zhǔn)的輸入和輸出

cd logstash-8.11.1
# 測(cè)試,采用標(biāo)準(zhǔn)的輸入和輸出,#codec=>rubydebug,解析轉(zhuǎn)換類型:ruby
# codec各類型講解:https://www.elastic.co/guide/en/logstash/7.9/codec-plugins.html
./bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'

# 輸入:
hello
# 輸出:
{
         "event" => {
        "original" => "hello"
    },
          "host" => {
        "hostname" => "local-168-182-110"
    },
      "@version" => "1",
    "@timestamp" => 2023-11-19T02:31:02.485073839Z,
       "message" => "hello"
}

圖片圖片

2)測(cè)試2:使用配置文件 +標(biāo)準(zhǔn)輸入輸出

配置文件:config/logstash-1.conf

input { 
	stdin { }
}
 
output {
   stdout { codec => rubydebug }
}

啟動(dòng)服務(wù)

./bin/logstash -f ./config/logstash-1.conf

3)測(cè)試3:配置文件+file輸入 +標(biāo)準(zhǔn)的屏幕輸出

配置文件:./config/logstash-2.conf

input {
  file {
    path => "/var/log/messages"
  }
}
output {
  stdout {
    codec=>rubydebug
  }
}

啟動(dòng)服務(wù)

./bin/logstash -f ./config/logstash-2.conf

圖片圖片

4)測(cè)試4:配置文件+文件輸入+kafka輸出

kafka 部署,可以參考我以下幾篇文章:

  • 【云原生】zookeeper + kafka on k8s 環(huán)境部署
  • 【中間件】通過 docker-compose 快速部署 Kafka 保姆級(jí)教程

配置文件:./config/logstash-3.conf

input {
  file {
    path => "/var/log/messages"
 
  }
}
output {
  kafka {
    bootstrap_servers => "192.168.182.110:9092"
    topic_id => "messages"
  }
}

啟動(dòng)服務(wù)

./bin/logstash -f ./config/logstash-3.conf

消費(fèi) kafka 數(shù)據(jù)

docker exec -it kafka-node1 bash

./bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic messages  --from-beginning

圖片圖片

5)測(cè)試5:配置文件+filebeat端口輸入+標(biāo)準(zhǔn)輸出

filebeat 部署,可以參考我以下幾篇文章:

  • 輕量級(jí)的日志采集組件 Filebeat 講解與實(shí)戰(zhàn)操作
  • Filebeat on k8s 日志采集實(shí)戰(zhàn)操作

服務(wù)器產(chǎn)生日志(filebeat)---》logstash服務(wù)器

配置文件:./config/logstash-4.conf

input {
  beats {
    port => 5044
 
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

啟動(dòng)服務(wù)

./bin/logstash -f ./config/logstash-4.conf

啟動(dòng)后會(huì)在本機(jī)啟動(dòng)一個(gè)5044端口,不要和系統(tǒng)已啟動(dòng)的端口沖突即可,配合測(cè)試我們?cè)?nbsp;filebeat 服務(wù)器上修改配置文件。

filebeat 配置文件內(nèi)容:filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
# ------------------------------ Logstash Output -------------------------------
output.logstash:
  hosts: ["192.168.182.110:5044"]

啟動(dòng) filebeat

./filebeat -e -c filebeat.yml

6)測(cè)試6:配置文件+filebeat端口輸入+輸出到kafka

服務(wù)器產(chǎn)生日志(filebeat)---> logstash服務(wù)器---->kafka服務(wù)器

配置文件:./config/logstash-5.conf

input {
  beats {
    port => 5044
 
  }
}
output {
  kafka {
  bootstrap_servers => "192.168.182.110:9092"
  topic_id => "messages"
 
  }
}

啟動(dòng)服務(wù)

./bin/logstash -f ./config/logstash-5.conf

7)測(cè)試7:filebeat數(shù)據(jù)采集+kafka讀取當(dāng)輸入+logstash處理+輸出到 ES

服務(wù)器產(chǎn)生日志(filebeat)---> kafka服務(wù)器__抽取數(shù)據(jù)___> logstash服務(wù)器---->ES

圖片圖片

logstash的配置:./config/logstash-6.conf

input {
    kafka {
    bootstrap_servers => "10.82.192.110:9092"
    topics => ["messages"]
    }
}
output {
    elasticsearch {
    hosts => ["10.82.192.110:9200"]
    index => "messageslog-%{+YYYY-MM-dd}"
  }
}

filebeat.yml output.kafka 配置:

# ------------------------------ KAFKA Output -------------------------------
output.kafka:
  eanbled: true
  hosts: ["10.82.192.110:9092"]
  version: "2.0.1"
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000

使用 systemctl 啟動(dòng) filebeat

# vi /usr/lib/systemd/system/filebeat.service

[Unit]
Descriptinotallow=filebeat server daemon
Documentatinotallow=/opt/filebeat-7.6.2-linux-x86_64/filebeat -help
Wants=network-online.target
After=network-online.target
 
[Service]
User=root
Group=root
Envirnotallow="BEAT_CONFIG_OPTS=-c /opt/filebeat-7.6.2-linux-x86_64/filebeat.yml"
ExecStart=/opt/filebeat-7.6.2-linux-x86_64/filebeat $BEAT_CONFIG_OPTS
Restart=always
 
[Install]
WantedBy=multi-user.target

使用 systemctl 啟動(dòng) logstash

# vi /usr/lib/systemd/system/logstash.service

[Unit]
Descriptinotallow=logstash

[Service]
User=root
ExecStart=/opt/logstash-8.11.1/bin/logstash -f /opt/logstash-8.11.1/config/logstash-6.conf
Restart=always

[Install]
WantedBy=multi-user.target

啟動(dòng)服務(wù)

systemctl start logstash
systemctl status logstash

四、Logstash filter常用插件

負(fù)責(zé)數(shù)據(jù)處理與轉(zhuǎn)換(filters modify them),常用:grok、mutate、drop、clone、geoip;官網(wǎng)文檔:https://www.elastic.co/guide/en/logstash/7.17/filter-plugins.html

1)使用grok內(nèi)置的正則案例

grok 插件:Grok是將非結(jié)構(gòu)化日志數(shù)據(jù)解析為結(jié)構(gòu)化和可查詢內(nèi)容的好方法,底層原理是基于正則匹配任意文本格式

此工具非常適合syslog日志、apache和其他Web服務(wù)器日志、mysql日志,以及一般來說,任何通常為人類而不是計(jì)算機(jī)消費(fèi)編寫的日志格式。

grok內(nèi)置了120種匹配模式,也可以自定義匹配模式:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

filebeat配置:filebeat.yml

##
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages

output.logstash:
  #指定logstash監(jiān)聽的IP和端口
  hosts: ["192.168.182.110:5044"]

logstash 配置:stdin-grok-stout.conf

cat >> stdin-grok-stout.conf << EOF
input {
  #監(jiān)聽的類型
  beats {
  #監(jiān)聽的本地端口
    port => 5044
  }
}

filter{
  grok{ 
   #match => { "message" => "%{COMBINEDAPACHELOG}" } 
   #上面的"COMBINEDAPACHELOG"變量官方github上已經(jīng)廢棄,建議使用下面的匹配模式 
   #參考地址:https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/httpd
   match => { "message" => "%{HTTPD_COMBINEDLOG}" }
  }

}

output {
  stdout {}

  elasticsearch {
    #定義es集群的主機(jī)地址
    hosts => ["192.168.182.110:9200"]
    #定義索引名稱
    index => "hqt-application-pro-%{+YYYY.MM.dd}"
  }
}
EOF

2)使用grok自定義的正則案例

參考官網(wǎng)地址:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html

配置如下:

cat >> stdin-grok_custom_patterns-stdout.conf << EOF
input {
 stdin {}
}

filter {
  grok {
    #指定模式匹配的目錄,可以使用絕對(duì)路徑
    #在./patterns目錄下隨便創(chuàng)建一文件,并寫入以下匹配模式
    # ORDER_ID [\u4e00-\u9fa5]{10,11}:[0-9A-F]{10,11}
    patterns_dir => ["./patterns"]
    #匹配模式
    #測(cè)試數(shù)據(jù)為:app_name:gotone-payment-api,client_ip:,context:,docker_name:,env:dev,exception:,extend1:,level:INFO,line:-1,log_message:com.gotone.paycenter.controller.task.PayCenterJobHandler.queryPayOrderTask-request:[\\],log_time:2022-11-23 00:00:00.045,log_type:applicationlog,log_version:1.0.0,本次成交的訂單編號(hào)為:BEF25A72965,parent_span_id:,product_line:,server_ip:,server_name:gotone-payment-api-c86658cb7-tc8k5,snooper:,span:0,span_id:,stack_message:,threadId:104,trace_id:,user_log_type:
    match => { "message" => "%{ORDER_ID:test_order_id}" }
  }
}

output {
  stdout {}
}
EOF

3)filter插件通用字段案例(添加/刪除字段、tag)

原有字段(nginx的json解析日志)

配置如下:

cat >> stdin-remove_add_field-stout.conf << EOF
input {
  beats {
    port => 5044
  }
}

filter {
  mutate {
    #移除指定的字段,使用逗號(hào)分隔
    remove_field => [ "tags","agent","input","log","ecs","version","@version","ident","referrer","auth" ]


    #添加指定的字段,使用逗號(hào)分隔
    #"%{clientip}"使用%可以將已有字段的值當(dāng)作變量使用
    add_field => {
     "app_name" => "nginx"
     "test_clientip" => "clientip---->%{clientip}"
    }


    #添加tag
    add_tag => [ "linux","web","nginx","test" ]


    #移除tag
    remove_tag => [ "linux","test" ]

  }
}

output {
  stdout {}
}
EOF

4)date 插件修改寫入ES的時(shí)間案例

測(cè)試日志:如下是我們要收集的一條json格式的日志

{"app_name":"gotone-payment-api","client_ip":"","context":"","docker_name":"","env":"dev","exception":"","extend1":"","level":"INFO","line":68,"log_message":"現(xiàn)代金控支付查詢->調(diào)用入?yún){}]","log_time":"2022-11-23 00:00:00.051","log_type":"applicationlog","log_version":"1.0.0","method_name":"com.gotone.paycenter.dao.third.impl.modernpay.ModernPayApiAbstract.getModernPayOrderInfo","parent_span_id":"","product_line":"","server_ip":"","server_name":"gotone-payment-api-c86658cb7-tc8k5","snooper":"","span":0,"span_id":"","stack_message":"","threadId":104,"trace_id":"gotone-payment-apib4a65777-ce6b-4bcc-8aef-71a7cfffaf2c","user_log_type":""}

配置如下:

cat >> stdin-date-es.conf << EOF
input {
  file {
    #指定收集的路徑
    path => "/var/log/messages"
  }
}


filter {

  json {
  #JSON解析器 可以將json形式的數(shù)據(jù)轉(zhuǎn)換為logstash實(shí)際的數(shù)據(jù)結(jié)構(gòu)(根據(jù)key:value拆分成字段形式)
    source => "message"
  }


  date {
    #匹配時(shí)間字段并解析
    match => [ "log_time", "yyyy-MM-dd HH:mm:ss.SSS" ]
    #將匹配到的時(shí)間字段解析后存儲(chǔ)到目標(biāo)字段,默認(rèn)字段為"@timestamp"
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }

}

output {
  stdout {}

  elasticsearch {
    #定義es集群的主機(jī)地址
    hosts => ["192.168.182.110:9200"]
    #定義索引名稱
    index => "hqt-application-pro-%{+YYYY.MM.dd}"
  }
}
EOF

5)geoip分析原IP地址位置案例

測(cè)試數(shù)據(jù)為:nginx的json格式日志

{"@timestamp":"2022-12-18T03:27:10+08:00","host":"10.0.24.2","clientip":"114.251.122.178","SendBytes":4833,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"43.143.242.47","uri":"/index.html","domain":"43.143.242.47","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","status":"200"}

配置如下:

cat >> beats-geoip-stdout.conf << EOF
input {
  file {
    #指定收集的路徑
    path => "/var/log/test.log"
  }
}

filter {
  json {
  #JSON解析器 可以將json形式的數(shù)據(jù)轉(zhuǎn)換為logstash實(shí)際的數(shù)據(jù)結(jié)構(gòu)(根據(jù)key:value拆分成字段形式)
    source => "message"
  }

  geoip {
    #指定基于哪個(gè)字段分析IP地址
    source => "client_ip"
    #指定IP地址分析模塊所使用的數(shù)據(jù)庫,默認(rèn)為GeoLite2-City.mmdb(這里必須再次指定以下,否則不會(huì)顯示城市)
    database => "/hqtbj/hqtwww/logstash_workspace/data/plugins/filters/geoip/CC/GeoLite2-City.mmdb"
    #如果期望查看指定的字段,則可以在這里配置,若不配置,表示顯示所有的查詢字段
    #fields => ["city_name","country_name","ip"]
    #指定geoip的輸出字段,當(dāng)有多個(gè)IP地址需要分析時(shí)(例如源IP和目的IP),則該字段非常有效
    #target => "test-geoip-nginx"
  }

}

output {
  stdout {}
}
EOF

GeoLite2-City.mmdb 下載:https://dev.maxmind.com/geoip/geolite2-free-geolocation-data

圖片圖片

7)mutate組件常用案例

mutate 測(cè)試數(shù)據(jù) python 腳本:

cat >> generate_log.py << EOF
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : oldboyedu-linux80
import datetime
import random
import logging
import time
import sys
LOG_FORMAT = "%(levelname)s %(asctime)s [com.oldboyedu.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# 配置root的logging.Logger實(shí)例的基本配置
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=sys.argv[1], filemode='a',)
actions = ["瀏覽??", "評(píng)論商品", "加?收藏", "加?購物?", "提交訂單", "使?優(yōu)惠券", "領(lǐng)取優(yōu)惠券", "搜索", "查看訂單", "付款", "清空購物?"]
while True: 
    time.sleep(random.randint(1, 5))
    user_id = random.randint(1, 10000)
# 對(duì)?成的浮點(diǎn)數(shù)保留2位有效數(shù)字.
    price = round(random.uniform(15000, 30000),2)
    action = random.choice(actions)
    svip = random.choice([0,1])
    logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action,svip,price))
EOF


# python generate_log.py  /tmp/app.log

8)logstash的多if分支案例

配置如下:

cat >> homework-to-es.conf << EOF
input {
  beats {
    type => "test-nginx-applogs"
    port => 5044
  } 
  file {
    type => "test-product-applogs"
    path => "/tmp/app.logs"
  }
  beats {
    type => "test-dw-applogs"
    port => 8888
  }
  file { 
    type => "test-payment-applogs"
    path => "/tmp/payment.log"
  } 
}


filter {
  if [type] == "test-nginx-applogs"{
    mutate {
      remove_field => [ "tags","agent","input","log","ecs","version","@version","ident","referrer","auth","xff","referer","upstreamtime","upstreamhost","tcp_xff"]
    }
    geoip {
     source => "clientip"
     database => "/hqtbj/hqtwww/logstash_workspace/data/plugins/filters/geoip/CC/GeoLite2-City.mmdb"
    }
    useragent {
     source => "http_user_agent"
    }
  } 

  if [type] == "test-product-applogs" {
    mutate {
     split => { "message" => "|" }
    }
    mutate {
      add_field => {
        "user_id" => "%{[message][1]}"
        "action" => "%{[message][2]}"
        "svip" => "%{[message][3]}"
        "price" => "%{[message][4]}"
      }
    }
    mutate {
      convert => {
      "user_id" => "integer"
      "svip" => "boolean"
      "price" => "float"
      }
    }
  } 

  if [type] in [ "test-dw-applogs","test-payment-applogs" ] {
    json {
      source => "message"
    }
    date {
      match => [ "log_time", "yyyy-MM-dd HH:mm:ss.SSS" ]
      target => "@timestamp"
    }
  }
}


output {
  stdout {}
  if [type] == "test-nginx-applogs" { 
    elasticsearch {
      hosts => ["192.168.182.110:9200"]
      index => "test-nginx-logs-%{+YYYY.MM.dd}" 
    }
  }

  if [type] == "test-product-applogs" {
    elasticsearch {
      hosts => ["192.168.182.110:9200"]
      index => "test-product-applogs-%{+YYYY.MM.dd}"    
    }
  }

  if [type] in [ "test-dw-applogs","test-payment-applogs" ] {
    elasticsearch {
      hosts => ["192.168.182.110:9200"]
      index => "test-center-applogs-%{+YYYY.MM.dd}"
    }
  }
}
EOF

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)與云原生技術(shù)分享
相關(guān)推薦

2023-02-24 07:42:30

Java動(dòng)態(tài)代理

2010-01-28 16:01:18

Android Jni

2023-05-03 22:09:02

Hive分區(qū)工具,

2022-06-27 10:26:37

枚舉Java

2009-12-24 13:51:49

WPF文檔打印

2018-04-16 10:12:46

Linux命令gunzip

2010-04-30 16:19:17

Unix內(nèi)核

2024-02-19 16:23:11

2023-05-09 07:46:32

2009-09-29 14:03:14

Hibernate數(shù)據(jù)

2009-12-14 11:33:59

Ruby正則表達(dá)式

2009-12-10 15:09:46

PHP搜索引擎類

2009-11-30 14:08:42

PHP字符串原理

2009-12-03 16:39:09

phpCB批量轉(zhuǎn)換

2023-05-06 07:15:59

Hive內(nèi)置函數(shù)工具

2009-11-30 17:11:53

PHP函數(shù)preg_g

2010-03-05 15:01:29

Python解析XML

2009-11-09 09:23:10

WCF數(shù)據(jù)契約

2020-11-03 10:16:24

Hive數(shù)據(jù)傾斜Hive SQL

2011-03-23 09:31:42

LAMP安裝LAMP配置
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)