Logstash configuration files
1. Installing Logstash
Installation is straightforward; just follow the official documentation: https://www.elastic.co/guide/en/logstash/current/installing-logstash.html
    # rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
    # vim /etc/yum.repos.d/logstash.repo
    [logstash-6.x]
    name=Elastic repository for 6.x packages
    baseurl=https://artifacts.elastic.co/packages/6.x/yum
    gpgcheck=1
    gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
    enabled=1
    autorefresh=1
    type=rpm-md

    # sudo yum install logstash
    # ln -s /usr/share/logstash/bin/logstash /usr/bin/logstash   # optionally symlink the executable so the logstash command can be run directly
2. Logstash configuration files explained
logstash.yml — the main configuration file
    # cat /etc/logstash/logstash.yml | grep -v ^#
    path.data: /data/logstash                  # data storage path
    path.config: /etc/logstash/conf.d/*.conf   # pipeline configuration directory
    path.logs: /var/log/logstash               # log output path

    # mkdir -p /data/logstash                  # create the data directory
    # chown logstash.logstash /data/logstash   # grant ownership to the logstash user
jvm.options — JVM-related settings: the minimum and maximum heap size, the garbage collector, and so on.
    -Xms256m   # minimum heap size
    -Xmx256m   # maximum heap size

startup.options — parameters related to how Logstash itself is launched.
Pipeline configuration files are written under /etc/logstash/conf.d/ and end in .conf.
3. Logstash pipeline configuration
A Logstash pipeline contains two required elements, input and output, and one optional element, filter.
Events are read from the input source, (parsed and processed by the filter), and written from the output to the target store (Elasticsearch or something else).
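A minimal sketch of that three-stage layout (the filter body is just a placeholder comment; any real parsing plugins would go there):

    input {
        stdin {}                        # event source
    }
    filter {
        # optional: parse/enrich events here (grok, date, mutate, ...)
    }
    output {
        stdout { codec => rubydebug }   # event destination
    }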
Run a minimal Logstash pipeline to test it:
    # logstash -e 'input { stdin {} } output { stdout {} }'

Once you see the message "Successfully started Logstash API endpoint {:port=>9600}", Logstash has started successfully; now type whatever you want to test.
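For example, typing a line and pressing Enter echoes it back as a structured event (a hypothetical session; stdout defaults to the rubydebug codec, and the exact fields vary by version):

    hello world
    {
           "message" => "hello world",
          "@version" => "1",
        "@timestamp" => 2018-06-26T02:30:00.000Z,
              "host" => "localhost"
    }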
This is only a test event; in production you normally write the configuration into a file and then start Logstash with it.
For example, to process nginx logs, first create a configuration file named nginx_access.conf under /etc/logstash/conf.d:
    # cat nginx_access.conf
    input {
        file {
            path => "/var/log/nginx/access.log"
            start_position => "beginning"
            type => "nginx_access_log"
        }
    }

    filter {
        grok {
            match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"(?:-|%{DATA:referrer})\" \"%{DATA:user_agent}\" (?:%{IP:proxy}|-) %{DATA:upstream_addr} %{NUMBER:upstream_request_time:float} %{NUMBER:upstream_response_time:float}" }
            match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:proxy}\"" }
        }
        if [request] {
            urldecode {
                field => "request"
            }
            ruby {
                init => "@kname = ['url_path','url_arg']"
                code => "
                    new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
                    event.append(new_event)
                "
            }
            if [url_arg] {
                ruby {
                    init => "@kname = ['key', 'value']"
                    code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
                }
            }
        }
        geoip {
            source => "clientip"
        }
        useragent {
            source => "user_agent"
            target => "ua"
            remove_field => "user_agent"
        }
        date {
            match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
            locale => "en"
        }
        mutate {
            remove_field => ["message","timestamp","request","url_arg"]
        }
    }

    output {
        elasticsearch {
            hosts => "localhost:9200"
            index => "nginx-access-log-%{+YYYY.MM.dd}"
        }
    #    stdout {
    #        codec => rubydebug
    #    }
    }
To verify that a configuration file is written correctly, start Logstash in test mode first:
    /usr/share/logstash/bin/logstash -t -f /etc/logstash/conf.d/nginx_access.conf   # test the configuration file
    Configuration OK
    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_access.conf      # run Logstash with this pipeline
Or start it as a system service:

    # systemctl start logstash
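To confirm the service came up, check its status and tail the Logstash log (logstash-plain.log is the usual default log file name under the path.logs directory configured earlier; adjust if yours differs):

    # systemctl status logstash
    # tail -f /var/log/logstash/logstash-plain.log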
Logstash configuration in detail
input plugins — let Logstash read from a specific event source.
Docs: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
The event source can be stdin (typed on screen), a file, or Elasticsearch, Filebeat, Kafka, Redis, and others.
- stdin — standard input
- file — read data from a file

    file {
        path => ['/var/log/nginx/access.log']   # path(s) of the file to read
        type => 'nginx_access_log'
        start_position => "beginning"
    }
    # path can use globs such as /var/log/*.log or /var/log/**/*.log
    # type is a common option, used to activate filters
    # start_position chooses where Logstash starts reading the file: "beginning" or "end"
    # Other common options such as discover_interval, exclude, sincedb_path, sincedb_write_interval are covered in the official docs.
- syslog — read system log messages from the network as events

    syslog {
        port => "514"
        type => "syslog"
    }
    # port: the port to listen on (listens on both TCP and UDP 514)

    # Reading from syslog requires configuring rsyslog first:
    # cat /etc/rsyslog.conf     add the line:
    *.* @172.17.128.200:514     # send logs to this address/port; Logstash listens there and reads each new log line
    # service rsyslog restart   # restart the log service
- beats — receive events from Elastic Beats

    beats {
        port => 5044   # the port to listen on
    }
    # There are other options such as host.

    # Reading from a Beat requires configuring the Beat first so it ships its output to Logstash:
    # vim /etc/filebeat/filebeat.yml
    ..........
    output.logstash:
      hosts: ["localhost:5044"]
- kafka — read data from a Kafka topic as events

    kafka {
        bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
        topics => ["access_log"]
        group_id => "logstash-file"
        codec => "json"
    }

    kafka {
        bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
        topics => ["weixin_log","user_log"]
        codec => "json"
    }

    # bootstrap_servers: list of Kafka broker URLs used to establish the initial connection to the cluster
    # topics: list of Kafka topics to subscribe to
    # group_id: identifier of the consumer group this consumer belongs to; defaults to "logstash".
    #           Messages from one topic are distributed across all Logstash instances sharing the same group_id.
    # codec: common option, the codec used to decode incoming data
There are many more input plugin types; see the official documentation for how to configure them.
filter plugins — perform intermediate processing on events
- grok — parse text and give it structure: turn unstructured log data into structured, queryable fields via regular expressions.

    grok {
        match => { "message" => "^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$" }
    }
    # Matches nginx log lines such as:
    # 203.202.254.16 - - [22/Jun/2018:16:12:54 +0800] "GET / HTTP/1.1" 200 3700 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7"
    # 220.181.18.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
- Note that grok can take multiple match patterns; if the first fails to match, the next one is tried. For example:

    grok {
        match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
        match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URI:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
    }
grok syntax: %{SYNTAX:SEMANTIC}, i.e. %{pattern_name:custom_field_name}

The project ships many ready-made grok patterns that can be used directly: https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns
grok debug tool: http://grokdebug.herokuapp.com
Regular-expression debugging tool: https://www.debuggex.com/
Writing patterns takes a fair amount of regex knowledge; a reference: https://www.jb51.net/tools/zhengze.html

Custom patterns: (?<field_name>the pattern)
For example, to match 2018/06/27 14:00:54:

    (?<datetime>\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d)

which yields: "datetime": "2018/06/27 14:00:54"
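Put together, a minimal sketch of that custom pattern inside a grok filter (GREEDYDATA is a stock pattern; the msg field name is just for illustration):

    filter {
        grok {
            # capture a 2018/06/27 14:00:54 style timestamp, then the rest of the line
            match => { "message" => "(?<datetime>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) %{GREEDYDATA:msg}" }
        }
    }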
- date — date parsing: parse a date out of a field and store it in @timestamp

    # [2018-07-04 17:43:35,503]
    grok {
        match => { "message" => "%{DATA:raw_datetime}" }
    }
    date {
        match => ["raw_datetime","YYYY-MM-dd HH:mm:ss,SSS"]
        remove_field => ["raw_datetime"]
    }
    # Stores raw_datetime into @timestamp, then deletes raw_datetime.

    # 24/Jul/2018:18:15:05 +0800
    date {
        match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
    }
- mutate — field manipulation: rename, delete, replace, and modify fields.
- convert — type conversion. Supported types: integer, float, integer_eu, float_eu, string, and boolean.

    filter {
        mutate {
            # convert => ["response","integer","bytes","float"]   # array form of the conversion
            convert => {"message" => "integer"}
        }
    }
    # Test ------->
    {
              "host" => "localhost",
           "message" => 123,   # no quotes: now an int
        "@timestamp" => 2018-06-26T02:51:08.651Z,
          "@version" => "1"
    }
- split — split a string into an array on a separator

    mutate {
        split => {"message" => ","}
    }
    # ----------> aaa,bbb
    {
        "@timestamp" => 2018-06-26T02:40:19.678Z,
          "@version" => "1",
              "host" => "localhost",
           "message" => [
            [0] "aaa",
            [1] "bbb"
        ]
    }
    # 192,128,1,100
    {
              "host" => "localhost",
           "message" => [
            [0] "192",
            [1] "128",
            [2] "1",
            [3] "100"
        ],
        "@timestamp" => 2018-06-26T02:45:17.877Z,
          "@version" => "1"
    }
- merge — merge fields: array with array, or string with string

    filter {
        mutate {
            add_field => {"field1" => "value1"}
        }
        mutate {
            split => {"message" => "."}        # split the message field on "."
        }
        mutate {
            merge => {"message" => "field1"}   # append field1 into the message field
        }
    }
    # ---------------> abc
    {
           "message" => [
            [0] "abc",
            [1] "value1"
        ],
        "@timestamp" => 2018-06-26T03:38:57.114Z,
            "field1" => "value1",
          "@version" => "1",
              "host" => "localhost"
    }
    # abc,.123
    {
           "message" => [
            [0] "abc,",
            [1] "123",
            [2] "value1"
        ],
        "@timestamp" => 2018-06-26T03:38:57.114Z,
            "field1" => "value1",
          "@version" => "1",
              "host" => "localhost"
    }
- rename — rename a field

    filter {
        mutate {
            rename => {"message" => "info"}
        }
    }
    # --------> 123
    {
        "@timestamp" => 2018-06-26T02:56:00.189Z,
              "info" => "123",
          "@version" => "1",
              "host" => "localhost"
    }
- remove_field — remove fields

    mutate {
        remove_field => ["message","datetime"]
    }
- join — join an array with a separator; does nothing if the field is not an array

    mutate {
        split => {"message" => ":"}
    }
    mutate {
        join => {"message" => ","}
    }
    # ------> abc:123
    {
        "@timestamp" => 2018-06-26T03:55:41.426Z,
           "message" => "abc,123",
              "host" => "localhost",
          "@version" => "1"
    }
    # aa:cc
    {
        "@timestamp" => 2018-06-26T03:55:47.501Z,
           "message" => "aa,cc",
              "host" => "localhost",
          "@version" => "1"
    }
- gsub — replace field values with a regex or string; only works on strings

    mutate {
        gsub => ["message","/","_"]   # replace / with _
    }
    # ------> a/b/c/
    {
          "@version" => "1",
           "message" => "a_b_c_",
              "host" => "localhost",
        "@timestamp" => 2018-06-26T06:20:10.811Z
    }
- update — update a field; if the field does not exist, nothing happens

    mutate {
        add_field => {"field1" => "value1"}
    }
    mutate {
        update => {"field1" => "v1"}
        update => {"field2" => "v2"}   # field2 does not exist, so nothing happens
    }
    # ---------------->
    {
        "@timestamp" => 2018-06-26T06:26:28.870Z,
            "field1" => "v1",
              "host" => "localhost",
          "@version" => "1",
           "message" => "a"
    }
- replace — update a field; if the field does not exist, it is created

    mutate {
        add_field => {"field1" => "value1"}
    }
    mutate {
        replace => {"field1" => "v1"}
        replace => {"field2" => "v2"}
    }
    # ---------------------->
    {
           "message" => "1",
              "host" => "localhost",
        "@timestamp" => 2018-06-26T06:28:09.915Z,
            "field2" => "v2",   # field2 did not exist, so it was created
          "@version" => "1",
            "field1" => "v1"
    }
- geoip — add geographical information about an IP address, using data from the Maxmind GeoLite2 database

    geoip {
        source => "clientip"
        database => "/tmp/GeoLiteCity.dat"
    }
- ruby — the ruby plugin can run arbitrary Ruby code

    filter {
        urldecode {
            field => "message"
        }
        ruby {
            init => "@kname = ['url_path','url_arg']"
            code => "
                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('?'))])
                event.append(new_event)
            "
        }
        if [url_arg] {
            kv {
                source => "url_arg"
                field_split => "&"
                target => "url_args"
                remove_field => ["url_arg","message"]
            }
        }
    }
    # The ruby block splits the message field on "?" into url_path and url_arg.
    # --------------------> www.test.com?test
    {
           "url_arg" => "test",
              "host" => "localhost",
          "url_path" => "www.test.com",
           "message" => "www.test.com?test",
          "@version" => "1",
        "@timestamp" => 2018-06-26T07:31:04.887Z
    }
    # www.test.com?title=elk&content=學習elk
    {
          "url_args" => {
              "title" => "elk",
            "content" => "學習elk"
        },
              "host" => "localhost",
          "url_path" => "www.test.com",
          "@version" => "1",
        "@timestamp" => 2018-06-26T07:33:54.507Z
    }
- urldecode — decode URL-encoded fields; useful for fixing garbled non-ASCII characters in URLs

    urldecode {
        field => "message"
    }
    # field: the field the urldecode filter should decode; defaults to "message"
    # charset (optional): the character encoding the filter uses; defaults to UTF-8
- kv — split a string into key/value pairs on a given separator

    kv {
        prefix => "url_"          # prefix added to each parsed key
        target => "url_ags"       # field to put the parsed key-value pairs into
        source => "message"       # the field to split
        field_split => "&"        # the separator
        remove_field => "message"
    }
    # --------------------------> a=1&b=2&c=3
    {
              "host" => "localhost",
           "url_ags" => {
            "url_c" => "3",
            "url_a" => "1",
            "url_b" => "2"
        },
          "@version" => "1",
        "@timestamp" => 2018-06-26T07:07:24.557Z
    }
- useragent — add information about the user agent, such as family, operating system, version, and device

    if [agent] != "-" {
        useragent {
            source => "agent"
            target => "ua"
            remove_field => "agent"
        }
    }
    # The if statement means the plugin only runs when the agent field is not "-".
    # source is required: the field to parse.
    # target puts the useragent information under the ua field; if unset, the fields are written at the top level of the event.
Logstash comparison operators
Equality and comparison: ==, !=, <, >, <=, >=
Regex: =~, !~ (checks a pattern on the right against a string value on the left)
Inclusion: in, not in
Supported boolean operators: and, or, nand, xor
Supported unary operator: !
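A minimal sketch of these operators inside filter conditionals (the field names assume the nginx grok fields from earlier; the tags are just for illustration):

    filter {
        # comparison plus boolean operator: tag slow server errors
        if [response] >= 500 and [request_time] > 1.0 {
            mutate { add_tag => ["slow_error"] }
        }
        # regex match on the request path
        if [url_path] =~ /^\/api\// {
            mutate { add_field => { "channel" => "api" } }
        }
        # membership test
        if [verb] in ["GET", "HEAD"] {
            mutate { add_tag => ["read_only"] }
        }
    }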
output plugins — send events to a specific destination.
- stdout — standard output: print events to the screen

    output {
        stdout {
            codec => "rubydebug"
        }
    }
- file — write events to a file

    file {
        path => "/data/logstash/%{host}/%{application}"
        codec => line { format => "%{message}" }
    }
- kafka — send events to Kafka

    kafka {
        bootstrap_servers => "localhost:9092"
        topic_id => "test_topic"   # required: the topic to produce messages to
    }
- elasticsearch — store logs in Elasticsearch

    elasticsearch {
        hosts => "localhost:9200"
        index => "nginx-access-log-%{+YYYY.MM.dd}"
    }
    # index: the index events are written to. Creating one index per day makes it easy to drop old data and search logs by time.
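Building on the index-per-day idea, a minimal sketch of routing different event types to separate indices with a conditional (the type value matches the nginx file input earlier; everything else is illustrative):

    output {
        if [type] == "nginx_access_log" {
            elasticsearch {
                hosts => "localhost:9200"
                index => "nginx-access-log-%{+YYYY.MM.dd}"
            }
        } else {
            elasticsearch {
                hosts => "localhost:9200"
                index => "other-log-%{+YYYY.MM.dd}"
            }
        }
    }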
One more thing: codec plugins, the encoders/decoders.
A codec is essentially a stream filter that can run as part of an input or output plugin; the stdout output above, for example, uses the rubydebug codec.
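As another example, a minimal sketch of the json codec decoding one JSON object per line as an input is read (the path is hypothetical):

    input {
        file {
            path => "/var/log/app/app.json"   # hypothetical file with one JSON object per line
            codec => "json"                   # decode each line into event fields
        }
    }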
- multiline codec plugin — merge multiple lines into one event; needed for stack traces and other logs that contain newlines

    input {
        stdin {
            codec => multiline {
                pattern => "pattern, a regexp"  # lines matching this regex are handled according to the two options below
                negate => "true" or "false"     # defaults to false: act on lines that match the pattern; if true, act on lines that do NOT match
                what => "previous" or "next"    # merge the matched line into the previous or the next line
            }
        }
    }

    codec => multiline {
        pattern => "^\s"
        what => "previous"
    }
    # Merge any line starting with whitespace into the previous line.

    codec => multiline {
        # Grok pattern names are valid! :)
        pattern => "^%{TIMESTAMP_ISO8601} "
        negate => true
        what => "previous"
    }
    # Merge any line that does not start with this timestamp format into the previous line.

    codec => multiline {
        pattern => "\\$"
        what => "next"
    }
    # Merge any line ending with a backslash into the next line.
Reposted from: https://www.cnblogs.com/xiaobaozi-95/p/9214307.html