Elasticsearch Pipeline in Detail
Table of Contents
- Ingest Node
- About Ingest Node
- About Pipelines and Processors
- Defining a Pipeline
- The Simulate Pipeline API
- Accessing Data in a Pipeline
- Processor Types in Detail
- Append Processor
- Convert Processor
- Date Processor
- Date Index Name Processor
- Fail Processor
- Foreach Processor
- Grok Processor
- Gsub Processor
- Join Processor
- JSON Processor
- KV Processor
- Lowercase Processor
- Remove Processor
- Rename Processor
- Script Processor
- Set Processor
- Split Processor
- Sort Processor
- Trim Processor
- Uppercase Processor
- Dot Expander Processor
- Custom Processors
Note
This article assumes the reader already has some Elasticsearch experience and a passing familiarity with the Pipeline concept.
Ingest Node
About Ingest Node
An ingest node (preprocessing node) is a node role that Elasticsearch names after its function. Whether a given node in the cluster is an ingest node is controlled by the following setting in elasticsearch.yml:
node.ingest: false
Setting node.ingest to false declares that the node is not an ingest node and has no preprocessing capability. By default, every Elasticsearch node is an ingest node; that is, every node in the cluster can preprocess documents.
So what is preprocessing?
Anyone who has used Logstash to process logs knows that we generally do not load raw logs into an Elasticsearch cluster directly; instead we refine the raw entries before storing them. Logstash, for example, supports many parsers such as json, kv, and date, the classic one being grok. We will not describe Logstash's parsers in detail here; the point is simply that we sometimes need Logstash to transform the data being loaded into Elasticsearch, and conceptually that transformation can fairly be called "preprocessing". The preprocessing discussed here is the same idea.
Put another way, before Elasticsearch offered the ingest node concept, transforming data prior to storing it in Elasticsearch meant relying on Logstash or a custom plugin. Starting with Elasticsearch 5.x, part of Logstash's functionality is built into Elasticsearch itself; this feature is called Ingest, and a node with this capability is called an ingest node.
See Filter plugins to learn more about Logstash's parsers.
To transform documents before they are written to Elasticsearch without involving Logstash, for example setting a default value for a field, renaming a field, or running a script for more complex logic, we must understand two basic concepts: Pipelines and Processors.
See Ingest Node to learn more about both concepts; below is only a brief introduction.
About Pipelines and Processors
A pipeline is a well-known concept, and Elasticsearch adopted the term precisely because it is immediately intuitive to anyone with engineering experience. Since I mainly develop in Java, an analogy with Java's Stream works well: the two are very similar in function and concept. We routinely operate on the data in a Stream with map, peek, reduce, count, and so on, and these operations are, behaviorally, transformations of the data. A pipeline is the same: it transforms the data flowing through it (generally documents), for example changing the value of a document field or changing a field's type, as mentioned above. Elasticsearch abstracts each such transformation step and calls it a Processor, and it defines many processor types to standardize document operations: set, append, date, join, json, kv, and so on. The individual processor types are covered later in this article.
Defining a Pipeline
Defining a pipeline is straightforward; the official documentation gives this example:
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
This request to the _ingest/pipeline endpoint defines a pipeline with the ID my-pipeline-id. The request body contains two key elements:
- description: describes what the pipeline does
- processors: defines the list of processors; here a single set processor assigns the value "bar" to the field named "foo"
For more on defining pipelines, see: Ingest APIs
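Once stored, the pipeline can be applied to an ordinary index request via the pipeline URL parameter; a quick sketch (the index, type, and document here are made up for illustration):
PUT my-index/my-type/1?pipeline=my-pipeline-id
{
  "title": "some document"
}
The document passes through my-pipeline-id before being stored, so the indexed _source also contains "foo": "bar".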
The Simulate Pipeline API
Since Elasticsearch offers preprocessing, it can hardly treat it as a black box. To help developers better understand and use preprocessing, the official documentation provides an API for testing these operations, called the Simulate Pipeline API. For any serious exploration of pipelines and processors, we can hardly do without it.
Here is a fairly simple Simulate Pipeline API example:
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} }
    // ...
  ]
}
In this example no pipeline is named in the request URL, so one has to be defined inline in the request body. Alternatively, an existing pipeline can be referenced by ID in the URL, as below:
POST _ingest/pipeline/my-pipeline-id/_simulate
{
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} }
    // ...
  ]
}
The documents listed under "docs" are then run through that pipeline and its processors. Here is a more concrete example:
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "foo": "bar" } },
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "foo": "rab" } }
  ]
}
Since the request URL names no pipeline, the pipeline and its processors are defined inline. The example is simple: it sets the field named "field2" to "_value" on every document passing through, and the documents are the ones given under "docs". The response is:
{
  "docs": [
    {
      "doc": {
        "_id": "id",
        "_index": "index",
        "_type": "type",
        "_source": { "field2": "_value", "foo": "bar" },
        "_ingest": { "timestamp": "2017-05-04T22:30:03.187Z" }
      }
    },
    {
      "doc": {
        "_id": "id",
        "_index": "index",
        "_type": "type",
        "_source": { "field2": "_value", "foo": "rab" },
        "_ingest": { "timestamp": "2017-05-04T22:30:03.188Z" }
      }
    }
  ]
}
The response shows that after passing through the pipeline (that is, after being transformed by its processors), each document differs from the original in two ways:
- _source has gained a new field "field2" with the value "_value"
- an _ingest section records the timestamp at which the document was processed
Accessing Data in a Pipeline
If we could only define a pipeline but not access its context, designing pipelines would clearly be pointless. Unlike Logstash, whose runtime is separate from Elasticsearch so the two cannot share a context, a pipeline is natively embedded in Elasticsearch and naturally forms a context with it. That context is what makes pipelines truly useful, and the most direct piece of it is the document itself: inside a pipeline (more precisely, inside its processors) we can directly access the information of the document flowing through, such as its fields and its metadata.
The following examples show how to access this context from within processors.
Accessing fields in the source document
{
  "set": {
    "field": "my_field",
    "value": 582.1
  }
}
This directly references the field named "my_field" on the documents passing through the pipeline and sets its value to 582.1 on each of them.
Alternatively, source fields can be accessed through the _source prefix, as below:
{
  "set": {
    "field": "_source.my_field",
    "value": 582.1
  }
}
Accessing document metadata fields
Every document carries metadata fields such as _id, _index, and _type, and processors can access these directly too, as in the following example:
{
  "set": {
    "field": "_id",
    "value": "1"
  }
}
This accesses the document's _id field directly and sets its value to "1".
Accessing ingest metadata fields
The official term is ingest metadata field; I render it loosely as "transient meta field", because that captures the intent: these fields are held only temporarily, and once the documents have been processed they are not returned with the corresponding bulk or index responses. Here is an example of how to access them:
{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}
This example adds a field named "received" to the document, holding the timestamp from the ingest metadata, accessed as {{_ingest.timestamp}}. The double braces should look familiar: this kind of expression generally appears exactly where a context is available.
Processor Types in Detail
- Append Processor
- Convert Processor
- Date Processor
- Date Index Name Processor
- Fail Processor
- Foreach Processor
- Grok Processor
- Gsub Processor
- Join Processor
- JSON Processor
- KV Processor
- Lowercase Processor
- Remove Processor
- Rename Processor
- Script Processor
- Set Processor
- Split Processor
- Sort Processor
- Trim Processor
- Uppercase Processor
- Dot Expander Processor
Append Processor
As the name suggests, this processor appends values to a field. When loading raw documents into Elasticsearch, we sometimes need to add some custom, extra information to certain fields.
Usage:
{
  "append": {
    "field": "field1",
    "value": ["item2", "item3", "item4"]
  }
}
Suppose we import a batch of products from a third party, and for some reason every product in this batch must carry a "家居" (home furnishing) tag marking its category. A product may of course have more than one category, so while importing these documents we need to append "家居" to the existing tag field. That is exactly what the append processor is for.
The following example runs through the Simulate Pipeline API:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "append a 家居 tag",
    "processors": [
      {
        "append": {
          "field": "tag",
          "value": ["家居"]
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000001", "tag": "衣柜" } },
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000002", "tag": "沙發(fā)" } },
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000003" } }
  ]
}
The response:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "code": "000001", "tag": ["衣柜", "家居"] },
        "_ingest": { "timestamp": "2017-11-24T13:19:55.555Z" }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "code": "000002", "tag": ["沙發(fā)", "家居"] },
        "_ingest": { "timestamp": "2017-11-24T13:19:55.555Z" }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "code": "000003", "tag": ["家居"] },
        "_ingest": { "timestamp": "2017-11-24T13:19:55.555Z" }
      }
    }
  ]
}
After processing, every document's tag field has gained a "家居" entry, and documents that lacked the field have had it created.
Convert Processor
This processor converts the type of a document field before the document is written. For example, if a document has a string "price" field that we want stored as a float, the convert processor handles it:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "convert the string price field to a float",
    "processors": [
      {
        "convert": {
          "field": "price",
          "type": "float"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000001", "tag": "衣柜", "price": "999.8" } },
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000002", "tag": "沙發(fā)", "price": "899.8" } },
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000003", "price": "799.8" } }
  ]
}
The test result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "code": "000001", "tag": "衣柜", "price": 999.8 },
        "_ingest": { "timestamp": "2017-11-26T10:56:47.663Z" }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "code": "000002", "tag": "沙發(fā)", "price": 899.8 },
        "_ingest": { "timestamp": "2017-11-26T10:56:47.664Z" }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "code": "000003", "price": 799.8 },
        "_ingest": { "timestamp": "2017-11-26T10:56:47.664Z" }
      }
    }
  ]
}
In the new documents the price field is no longer a string but a float.
Note that the convert processor currently supports only the following target types: integer, float, string, boolean, and auto.
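A small sketch of the auto type (hypothetical field and values): auto asks Elasticsearch to pick the most specific type it can parse from the string.
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "let Elasticsearch infer the type",
    "processors": [
      { "convert": { "field": "flag", "type": "auto" } }
    ]
  },
  "docs": [
    { "_source": { "flag": "true" } },
    { "_source": { "flag": "42" } }
  ]
}
Here "true" should come back as the boolean true and "42" as the number 42; a string that matches no other type stays a string.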
For more on this processor, see: https://www.elastic.co/guide/en/elasticsearch/reference/current/convert-processor.html
Date Processor
The date processor, as the name implies, converts a date field in the source document into a timestamp field that Elasticsearch recognizes (by default @timestamp), and that timestamp field is added to the document. (In practice the feature is somewhat limited: the converted value can only be in ISO 8601 format and cannot be emitted in any other format, so it sees little use.)
Here is an example using the date processor:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "convert into a date field of the specified format",
    "processors": [
      {
        "date": {
          "field": "date",
          "formats": ["UNIX"],
          "timezone": "Asia/Shanghai"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000001", "tag": "衣柜", "price": "999.8", "date": "1511696379" } }
  ]
}
This converts the original string-represented date (ultimately into ISO format). The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "date": "1511696379",
          "code": "000001",
          "@timestamp": "2017-11-26T19:39:39.000+08:00",
          "price": "999.8",
          "tag": "衣柜"
        },
        "_ingest": { "timestamp": "2017-11-26T12:01:16.787Z" }
      }
    }
  ]
}
Note the new default field @timestamp; its name can be changed with the target_field parameter.
Why can it only produce ISO-formatted dates? The official docs say nothing about this, so we have to look at the source code:
public void execute(IngestDocument ingestDocument) {
    String value = ingestDocument.getFieldValue(field, String.class);
    DateTime dateTime = null;
    Exception lastException = null;
    for (Function<String, DateTime> dateParser : dateParsers) {
        try {
            dateTime = dateParser.apply(value);
        } catch (Exception e) {
            // try the next parser and keep track of the exceptions
            lastException = ExceptionsHelper.useOrSuppress(lastException, e);
        }
    }
    if (dateTime == null) {
        throw new IllegalArgumentException("unable to parse date [" + value + "]", lastException);
    }
    ingestDocument.setFieldValue(targetField, ISODateTimeFormat.dateTime().print(dateTime));
}
In the last line, when the target field is set, the formatter is hard-coded to ISODateTimeFormat. I suspect a configurable pattern will come eventually.
Date Index Name Processor
The date index name processor counts as one of the more powerful processors: its purpose is to route each document passing through it into an index whose name follows a specified date format, provided it is used as the official documentation describes.
We first create a pipeline of this type:
curl -XPUT 'localhost:9200/_ingest/pipeline/monthlyindex?pretty' -H 'Content-Type: application/json' -d'
{
  "description": "monthly date-time index naming",
  "processors" : [
    {
      "date_index_name" : {
        "field" : "date1",
        "index_name_prefix" : "myindex-",
        "date_rounding" : "M"
      }
    }
  ]
}
'
Then we create a document that uses this pipeline:
curl -XPUT 'localhost:9200/myindex/type/1?pipeline=monthlyindex&pretty' -H 'Content-Type: application/json' -d'
{
  "date1" : "2016-04-25T12:02:01.789Z"
}
'
The result:
{
  "_index" : "myindex-2016-04-01",
  "_type" : "type",
  "_id" : "1",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "created" : true
}
The request above does not put the document into the myindex index; it lands in myindex-2016-04-01 instead.
This is the strength of the date index name processor: a document can be routed to an index chosen from one of its date fields. Combined with index templates, this enables powerful log collection setups, such as writing logs into Elasticsearch by month or by day; see the sketch below.
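A minimal daily variant (a sketch; the pipeline ID dailyindex, the applog- prefix, and the template body are assumptions for illustration) rounds to the day with date_rounding "d" and pairs the pipeline with an index template matching the generated index names:
PUT _ingest/pipeline/dailyindex
{
  "description": "daily date-time index naming",
  "processors": [
    {
      "date_index_name": {
        "field": "date1",
        "index_name_prefix": "applog-",
        "date_rounding": "d"
      }
    }
  ]
}

PUT _template/applog
{
  "template": "applog-*",
  "settings": { "number_of_shards": 1 }
}
Documents indexed with ?pipeline=dailyindex then land in per-day indices such as applog-2016-04-25, each automatically picking up the template's settings and mappings.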
To learn more about using the date index name processor, see: Date Index Name Processor
Fail Processor
This processor is simple: when a document passes through the pipeline, the error message specified by the pipeline is returned to the requester.
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "fail processor",
    "processors" : [
      { "fail": { "message": "an error message" } }
    ]
  },
  "docs": [
    { "_index": "myindex", "_type": "type", "_id": "id", "_source": { "id": "1" } }
  ]
}
The result:
{
  "docs": [
    {
      "error": {
        "root_cause": [
          {
            "type": "exception",
            "reason": "java.lang.IllegalArgumentException: org.elasticsearch.ingest.common.FailProcessorException: an error message",
            "header": { "processor_type": "fail" }
          }
        ],
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: org.elasticsearch.ingest.common.FailProcessorException: an error message",
        "caused_by": {
          "type": "illegal_argument_exception",
          "reason": "org.elasticsearch.ingest.common.FailProcessorException: an error message",
          "caused_by": {
            "type": "fail_processor_exception",
            "reason": "an error message"
          }
        },
        "header": { "processor_type": "fail" }
      }
    }
  ]
}
Foreach Processor
A foreach processor handles array fields: every element in the array is run through the same inner processor. For example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "foreach processor",
    "processors" : [
      {
        "foreach": {
          "field": "values",
          "processor": {
            "uppercase": { "field": "_ingest._value" }
          }
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "id": "1", "values": ["hello", "world", "felayman"] } }
  ]
}
In this example the values field of the document is an array, and each element is fed through the foreach processor's shared uppercase processor (referenced as _ingest._value), ensuring every element of the field undergoes the same operation.
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "id": "1", "values": ["HELLO", "WORLD", "FELAYMAN"] },
        "_ingest": { "_value": null, "timestamp": "2017-11-27T08:55:17.496Z" }
      }
    }
  ]
}
Every processor has more depth than we cover here; the aim is only to introduce basic usage.
For details, see: Foreach Processor
Grok Processor
The grok processor counts among the most practical processors and is frequently used to slice up log lines; anyone who has used Logstash knows how powerful grok is. We will not cover detailed grok syntax here, only show in passing that an ingest node pipeline in Elasticsearch offers the same grok capability as Logstash.
An example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "grok processor",
    "processors" : [
      {
        "grok": {
          "field": "message",
          "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"]
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "55.3.244.1 GET /index.html 15824 0.043" } }
  ]
}
The log line in this example might come from nginx, with the format:
55.3.244.1 GET /index.html 15824 0.043
The corresponding grok pattern is:
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
After the grok processor, this text-format log line is parsed into a standard document that can take full advantage of Elasticsearch's search and aggregation features, which the raw text could not.
The response:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "duration": "0.043",
          "request": "/index.html",
          "method": "GET",
          "bytes": "15824",
          "client": "55.3.244.1",
          "message": "55.3.244.1 GET /index.html 15824 0.043"
        },
        "_ingest": { "timestamp": "2017-11-27T09:07:40.782Z" }
      }
    }
  ]
}
Each captured segment has become a field of the document.
For a detailed introduction to the grok processor, see: Grok Processor
Gsub Processor
The gsub processor solves problems peculiar to strings. Say we want to rewrite string dates from the "yyyy-MM-dd HH:mm:ss" format into "yyyy/MM/dd HH:mm:ss"; the gsub processor can do that, and it does so with regular expressions.
For example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "gsub processor",
    "processors" : [
      {
        "gsub": {
          "field": "message",
          "pattern": "-",
          "replacement": "/"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "2017-11-27 00:00:00" } }
  ]
}
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "message": "2017/11/27 00:00:00" },
        "_ingest": { "timestamp": "2017-11-27T09:15:05.502Z" }
      }
    }
  ]
}
The date in the message field has been rewritten into the format we wanted.
For more on the gsub processor, see Gsub Processor
Join Processor
The join processor takes an array-typed field from the source document and joins its values into a single string with a given separator.
An example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "join processor",
    "processors": [
      {
        "join": {
          "field": "message",
          "separator": "、、、、"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": ["hello", "world", "felayman"] } }
  ]
}
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "message": "hello、、、、world、、、、felayman" },
        "_ingest": { "timestamp": "2017-11-27T09:25:41.260Z" }
      }
    }
  ]
}
The message field, originally an array, has become the string "hello、、、、world、、、、felayman" after the join processor.
For more on the join processor, see: Join Processor
JSON Processor
The JSON processor also works on string fields: text that is valid JSON (for example a JSON-serialized object) is parsed back into a real JSON structure, as in this example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "json processor",
    "processors": [
      { "json": { "field": "message" } }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "{\"foo\": 2000}" } }
  ]
}
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "message": { "foo": 2000 } },
        "_ingest": { "timestamp": "2017-11-27T09:30:28.546Z" }
      }
    }
  ]
}
The message field, previously a string, is now a proper JSON object.
For more on the JSON processor, see: JSON Processor
KV Processor
The KV processor parses key-value strings, e.g. formats like K=V, K:V, or K->V.
An example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "kv processor",
    "processors": [
      {
        "kv": {
          "field": "message",
          "field_split": " ",
          "value_split": ":"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "ip:127.0.0.1" } }
  ]
}
Here the message field's original value is "ip:127.0.0.1", and we want it parsed so that the document gains a standalone ip field holding 127.0.0.1.
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "message": "ip:127.0.0.1", "ip": "127.0.0.1" },
        "_ingest": { "timestamp": "2017-11-27T09:46:48.715Z" }
      }
    }
  ]
}
For more on the KV processor, see: KV Processor
Lowercase Processor
The lowercase processor is another string-only processor; as the name says, it converts a string to lowercase.
For example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "lowercase processor",
    "processors": [
      { "lowercase": { "field": "message" } }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "HELLO,WORLD,FELAYMAN." } }
  ]
}
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "message": "hello,world,felayman." },
        "_ingest": { "timestamp": "2017-11-27T09:49:58.564Z" }
      }
    }
  ]
}
The message field's value is now entirely lowercase after the lowercase processor.
For more on the lowercase processor, see: Lowercase Processor
Remove Processor
The remove processor deletes fields from the source document before it is written.
For example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "remove processor",
    "processors": [
      { "remove": { "field": "extra_field" } }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "extra_field": "extra_field", "message": " hello, felayman." } }
  ]
}
After the remove processor runs, the extra_field field no longer exists.
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "message": " hello, felayman." },
        "_ingest": { "timestamp": "2017-11-27T09:58:46.038Z" }
      }
    }
  ]
}
Rename Processor
The rename processor renames a field of the document before the document is written to Elasticsearch.
For example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "rename processor",
    "processors": [
      {
        "rename": {
          "field": "old_name",
          "target_field": "new_name"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "old_name": "old_name" } }
  ]
}
The old_name field of the source document is renamed to new_name.
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "new_name": "old_name" },
        "_ingest": { "timestamp": "2017-11-27T10:00:48.694Z" }
      }
    }
  ]
}
Script Processor
The script processor may well be the most powerful of all, because it exposes Elasticsearch's full scripting capability.
Scripting itself is beyond the scope of this article; for details, see: Script
Let's look at a script inside a script processor:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "script processor",
    "processors": [
      {
        "script": {
          "lang": "painless",
          "inline": "ctx.viewCount = (ctx.viewCount) * 10"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "viewCount": 100 } }
  ]
}
Here the script multiplies the viewCount field of the source document by ten. The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "viewCount": 1000 },
        "_ingest": { "timestamp": "2017-11-27T10:07:56.112Z" }
      }
    }
  ]
}
Set Processor
The set processor covers two situations:
- the target field does not exist yet, in which case it is created with the given value
- the target field already exists, in which case its value is overwritten (unless that is disabled; see the sketch after the example)
An example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "set processor",
    "processors": [
      {
        "set": {
          "field": "category",
          "value": "家居"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "id": "0001" } }
  ]
}
Here every document run through the set processor gains a "家居" category.
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "id": "0001", "category": "家居" },
        "_ingest": { "timestamp": "2017-11-27T10:12:05.306Z" }
      }
    }
  ]
}
Split Processor
The split processor turns a string separated by a given delimiter into an array-typed field.
For example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "split processor",
    "processors": [
      {
        "split": {
          "field": "message",
          "separator": "-"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "hello-world" } }
  ]
}
The message field's value "hello-world" is converted into ["hello", "world"].
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "message": ["hello", "world"] },
        "_ingest": { "timestamp": "2017-11-27T10:13:35.982Z" }
      }
    }
  ]
}
Sort Processor
The sort processor works on array fields: the elements stored in an array-typed field of the source document are sorted in ascending or descending order.
For example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "sort processor",
    "processors": [
      {
        "sort": {
          "field": "category",
          "order": "asc"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "category": [2, 3, 4, 1] } }
  ]
}
Here we sort the stored values of the category field in ascending order.
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "category": [1, 2, 3, 4] },
        "_ingest": { "timestamp": "2017-11-27T10:16:16.409Z" }
      }
    }
  ]
}
Trim Processor
You have to hand it to the Elasticsearch developers: they have essentially ported every method of Java's String class.
The trim processor deals with whitespace at both ends of a string, as below:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "trim processor",
    "processors": [
      { "trim": { "field": "message" } }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": " hello, felayman." } }
  ]
}
Note that only the whitespace at both ends of the string is removed.
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "message": "hello, felayman." },
        "_ingest": { "timestamp": "2017-11-27T09:56:17.946Z" }
      }
    }
  ]
}
Uppercase Processor
Similar to the lowercase processor, this one converts a string to uppercase.
For example:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "uppercase processor",
    "processors": [
      { "uppercase": { "field": "message" } }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "hello,world,felayman." } }
  ]
}
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "message": "HELLO,WORLD,FELAYMAN." },
        "_ingest": { "timestamp": "2017-11-27T09:53:33.672Z" }
      }
    }
  ]
}
For more on the uppercase processor, see: Uppercase Processor
Dot Expander Processor
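The dot expander processor expands a field whose name contains dots into an object structure, turning a flat "foo.bar" key into a bar field nested inside a foo object, so that other processors in the pipeline can address it. A minimal simulate sketch (with a hypothetical document):
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "dot expander processor",
    "processors": [
      { "dot_expander": { "field": "foo.bar" } }
    ]
  },
  "docs": [
    { "_source": { "foo.bar": "value" } }
  ]
}
After processing, _source holds { "foo": { "bar": "value" } } in place of the flat dotted key. For more, see: Dot Expander Processor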
Custom Processors
What if the official processors do not cover our transformation logic? No need to worry: the plugin mechanism makes Elasticsearch easy to extend. Implementing a custom processor involves two steps:
- implement the IngestPlugin interface, including its getProcessors method
- implement the custom Processor itself
Here is a concrete example: a processor that capitalizes the first letter of a given field's value (a deliberately minimal model; we don't touch a real index and instead simulate the operation with the _ingest/pipeline/_simulate API).
Create a project named elasticsearch-help and add a FirstUpperPlugin class:
package org.elasticsearch.help;

import org.elasticsearch.help.processor.FirstUpperProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

import java.util.Collections;
import java.util.Map;

/**
 * @author lei.fang@shijue.me
 * @since 2017-11-26
 */
public class FirstUpperPlugin extends Plugin implements IngestPlugin {

    // Register the custom processor under its type name ("firstUpper").
    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        return Collections.singletonMap(FirstUpperProcessor.TYPE, new FirstUpperProcessor.Factory());
    }
}
Here getProcessors supplies our custom processor. The FirstUpperProcessor source is:
package org.elasticsearch.help.processor;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Map;

/**
 * @author lei.fang@shijue.me
 * @since 2017-11-26
 */
public class FirstUpperProcessor extends AbstractProcessor {

    public static final String TYPE = "firstUpper";

    private final String field;

    public FirstUpperProcessor(String tag, String field) {
        super(tag);
        this.field = field;
    }

    @Override
    public void execute(IngestDocument ingestDocument) throws Exception {
        // Read the configured field's value and capitalize its first letter.
        String value = ingestDocument.getFieldValue(field, String.class);
        if (value != null && !value.isEmpty()) {
            ingestDocument.setFieldValue(field, Character.toUpperCase(value.charAt(0)) + value.substring(1));
        }
    }

    @Override
    public String getType() {
        return TYPE;
    }

    public static final class Factory implements Processor.Factory {
        @Override
        public FirstUpperProcessor create(Map<String, Processor.Factory> registry, String processorTag,
                                          Map<String, Object> config) throws Exception {
            String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
            return new FirstUpperProcessor(processorTag, field);
        }
    }
}
The core method is execute, which reads the configured field's value and capitalizes its first letter.
We test it with the _ingest/pipeline/_simulate API:
curl -XGET 'http://localhost:9200/_ingest/pipeline/_simulate?pretty' -d '
{
  "pipeline": {
    "description": "capitalize the first letter of a string",
    "processors": [
      {
        "firstUpper": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "hello,world." } }
  ]
}'
The result:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": { "message": "Hello,world." },
        "_ingest": { "timestamp": "2017-11-27T10:16:16.409Z" }
      }
    }
  ]
}
Our custom ingest node plugin works.
The end.