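02. Commonly used pipeline processors
Table of contents
- 1. Set Processor: if the specified field exists, updates its value; if it does not exist, adds the field and sets its value. It can also change the value of _index.
- 2. Append Processor: appends values to an existing field
- 3. Drop Processor: drops the document
- 4. Remove Processor: removes fields
- 5. Rename Processor: renames a field
- 6. Join Processor: joins the elements of an array field into a single string, much like Python's string join method
- 7. JSON Processor: converts a JSON-formatted string into a JSON object
- 8. KV Processor: splits a field into key-value pairs using separators
- 9. Split Processor: splits a string on a specified separator into an array field
- 10. Lowercase Processor: converts a field's content to lowercase
- 11. Uppercase Processor: similar to the Lowercase Processor; converts string text to uppercase
- 12. Convert Processor: converts a field's type
- 13. Date Index Name Processor: routes documents into daily or monthly indices based on a date field
- 14. Dot Expander Processor: usually combined with other processors; it expands a field whose name contains dots into an object field, so that processors defined after it can access the nested field with `.` notation
- 15. Fail Processor: a simple processor that raises an exception; when a document going through the pipeline triggers it, the error message specified in the pipeline is returned to the requester
- 16. Foreach Processor: processes array fields by applying the same processor to every element of the array
- 17. Pipeline Processor: executes another pipeline
- 18. Script Processor: processes documents with an Elasticsearch script; it uses the usual script programming model, so any field a script can access is accessible here
- 19. Sort Processor: sorts the elements of an array field in ascending or descending order
- 20. Trim Processor: trims whitespace from both ends of a string
Ingest seems to be a focus area for Elasticsearch, since the number of ingest processors keeps growing.
This article covers only the processors I personally consider commonly used.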
1. Set Processor: if the specified field exists, updates its value; if it does not exist, adds the field and sets its value. It can also change the value of _index.
Usage example: copy the value of one field into a new field.
PUT _ingest/pipeline/set_os
{
  "description": "sets the value of host.os.name from the field os",
  "processors": [
    {"set": {"field": "host.os.name", "value": "{{os}}"}}
  ]
}
POST _ingest/pipeline/set_os/_simulate
{
  "docs": [
    {"_source": {"os": "Ubuntu"}}
  ]
}
field: required. The field to insert, upsert, or update. Supports template snippets.
value: required. The value to be set for the field. Supports template snippets.
override: optional, default true. If true, the processor updates fields that already hold a non-null value. When set to false, such fields are not touched.
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
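The field, value and override options above can be pictured with a small Python sketch. It is only an illustration of the described behavior on a plain dictionary, not Elasticsearch's implementation; the function name set_field is hypothetical, and the sketch assumes every intermediate path segment is an object.

```python
def set_field(doc, path, value, override=True):
    """Set a (possibly dotted) field on a dict, mimicking the set processor's options.

    Assumption: intermediate path segments are dicts (or missing).
    """
    parts = path.split(".")
    node = doc
    for part in parts[:-1]:
        # Create intermediate objects on the way down, e.g. host -> os.
        node = node.setdefault(part, {})
    leaf = parts[-1]
    # With override=False an existing non-null value is left untouched.
    if not override and node.get(leaf) is not None:
        return doc
    node[leaf] = value
    return doc


doc = {"os": "Ubuntu"}
# Equivalent in spirit to "field": "host.os.name", "value": "{{os}}".
set_field(doc, "host.os.name", doc["os"])
print(doc)
```

With override=False, calling set_field on a field that already holds a non-null value returns the document unchanged.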
2. Append Processor: appends values to an existing field
field: required. The field to be appended to. Supports template snippets.
value: required. The value to be appended. Supports template snippets.
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
Usage example
PUT script_test/_mapping
{
  "properties": {
    "name": {"type": "keyword"},
    "age": {"type": "integer"},
    "age_arr": {"type": "integer"}
  }
}
PUT script_test/_doc/2
{"name": "tengfei", "age": [22,23], "age_arr": [12,15,13,98,102]}
PUT script_test/_doc/3
{"name": "tengfei", "age": 22, "age_arr": [12,15,13,98,102]}
PUT _ingest/pipeline/append_pipe
{
  "description": "append to friend",
  "processors": [
    {"append": {"field": "age", "value": [23,78]}}
  ]
}
PUT script_test/_doc/23?pipeline=append_pipe
{"name": "append test", "age": 88}
The stored document is:
{
  "_index": "script_test", "_type": "_doc", "_id": "23", "_score": 1.0,
  "_source": {"name": "append test", "age": [23,78,88]}
}
Compare this with a script operation in update_by_query:
POST script_test/_update_by_query
{
  "query": {"match_all": {}},
  "script": {
    "lang": "painless",
    "source": "ctx._source.age?.add(params.new_age)",
    "params": {"from": "china", "new_age": 55}
  }
}
This request fails, because in some documents the age field is not an array but a plain integer:
"script": "ctx._source.age?.add(params.new_age)",
"lang": "painless",
"caused_by": {
  "type": "illegal_argument_exception",
  "reason": "dynamic method [java.lang.Integer, add/1] not found"
}
The equivalent operation in an ingest pipeline, however, runs without error.
3. Drop Processor: drops the document
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
Usage example
PUT _ingest/pipeline/drop_pipeline
{
  "description": "drop doc when name is chen",
  "processors": [
    {"drop": {"if": "ctx.name == 'chen'"}}
  ]
}
PUT script_test/_doc/31?pipeline=drop_pipeline
{"name": "chen", "age": 88}
Response:
{
  "_index": "script_test", "_type": "_doc", "_id": "31", "_version": -3,
  "result": "noop",  # the document was skipped and not indexed
  "_shards": {"total": 0, "successful": 0, "failed": 0}
}
PUT script_test/_doc/32?pipeline=drop_pipeline
{"name": "chenchuang", "age": 88}
Response:
{
  "_index": "script_test", "_type": "_doc", "_id": "32", "_version": 1,
  "result": "created",  # created means the document was indexed
  "_shards": {"total": 2, "successful": 2, "failed": 0},
  "_seq_no": 21, "_primary_term": 1
}
4. Remove Processor: removes fields
field: required. The fields to be removed. Supports template snippets.
ignore_missing: default false. If true and field does not exist or is null, the processor quietly exits without modifying the document.
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
Usage example
PUT _ingest/pipeline/remove_pipeline
{
  "description": "remove some fields",
  "processors": [
    {"remove": {"field": ["age01","age"]}}
  ]
}
PUT script_test/_doc/33?pipeline=remove_pipeline
{"name": "remove test", "age": [123,45,67], "age01": 32, "age_arr": [34,21]}
GET script_test/_doc/33
Response:
{
  "_index": "script_test", "_type": "_doc", "_id": "33", "_version": 1,
  "_seq_no": 22, "_primary_term": 1, "found": true,
  "_source": {"name": "remove test", "age_arr": [34,21]}
}
5. Rename Processor: renames a field
field: required. The field to be renamed. Supports template snippets.
target_field: required. The new name of the field. Supports template snippets.
ignore_missing: default false. If true and field does not exist or is null, the processor quietly exits without modifying the document.
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
Usage example
PUT _ingest/pipeline/rename_pipeline
{
  "description": "rename fields",
  "processors": [
    {"rename": {"field": "age", "target_field": "life"}}
  ]
}
PUT script_test/_doc/35?pipeline=rename_pipeline
{"name": "rename test", "age": 108}
GET script_test/_doc/35
Response:
{
  "_index": "script_test", "_type": "_doc", "_id": "35", "_version": 1,
  "_seq_no": 23, "_primary_term": 1, "found": true,
  "_source": {"name": "rename test", "life": 108}
}
6. Join Processor: joins the elements of an array field into a single string, much like Python's string join method
field: required. The field containing the array values to join.
separator: required. The separator character.
target_field: The field to assign the joined value to; by default field is updated in place.
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
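Since the join processor is compared to Python's join, here is a minimal Python sketch of the same idea applied to a document represented as a dictionary. The helper name join_field is hypothetical and this is not how Elasticsearch implements the processor; it only mirrors the field, separator and target_field options.

```python
def join_field(doc, field, separator, target_field=None):
    """Join the elements of an array field into one string."""
    values = doc[field]
    if not isinstance(values, list):
        raise TypeError(f"field {field!r} is not an array")
    # str.join only accepts strings, so convert each element first.
    joined = separator.join(str(v) for v in values)
    # Without target_field the source field is overwritten in place.
    doc[target_field or field] = joined
    return doc


doc = {"name": "rename test", "age": 108, "age_arr": [12, 17, 123, 987, 9]}
join_field(doc, "age_arr", "*", target_field="join_result")
print(doc["join_result"])  # 12*17*123*987*9
```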
Usage example
PUT _ingest/pipeline/join_pipe
{
  "description": "join some fields",
  "processors": [
    {"join": {"field": "age_arr", "separator": "*", "target_field": "join_result"}}
  ]
}
PUT script_test/_doc/36?pipeline=join_pipe
{"name": "rename test", "age": 108, "age_arr": [12,17,123,987,9]}
GET script_test/_doc/36
Response:
"_source": {
  "name": "rename test",
  "join_result": "12*17*123*987*9",
  "age_arr": [12,17,123,987,9],
  "age": 108
}
7. JSON Processor: converts a JSON-formatted string into a JSON object
field: required. The field to be parsed.
target_field: The field to insert the converted structured object into.
add_to_root: default false. Flag that forces the serialized JSON to be injected into the top level of the document. target_field must not be set when this option is chosen.
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
Usage example
PUT _ingest/pipeline/json_pipe
{
  "description": "json pipeline",
  "processors": [
    {"json": {"field": "child", "target_field": "child_obj"}}
  ]
}
PUT script_test/_doc/37?pipeline=json_pipe
{"name": "rename test", "age": 108, "child": "{\"son\":\"datou\"}"}
GET script_test/_doc/37
Response:
{
  "_index": "script_test", "_type": "_doc", "_id": "37", "_version": 1,
  "_seq_no": 26, "_primary_term": 1, "found": true,
  "_source": {
    "name": "rename test",
    "child_obj": {"son": "datou"},
    "age": 108,
    "child": """{"son":"datou"}"""
  }
}
8. KV Processor: splits a field into key-value pairs using separators
This one looks fairly complex. Like Logstash, it mainly parses a log line into multiple fields; for example, ip=1.2.3.4 error=REFUSED is parsed into the two fields ip and error.
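To make the parsing step concrete, the following Python sketch splits a message the way the paragraph describes. It is an illustration only: parse_kv is a hypothetical helper, field_split is treated as a regular expression and value_split as a literal string, which are simplifying assumptions rather than a description of the processor's internals.

```python
import re


def parse_kv(message, field_split=" ", value_split="="):
    """Split a message into pairs, then split each pair into key and value."""
    result = {}
    for pair in re.split(field_split, message):
        if not pair:
            continue  # skip empty pieces produced by repeated separators
        key, sep, value = pair.partition(value_split)
        if not sep:
            # Assumption: a pair without the value separator is an error.
            raise ValueError(f"pair {pair!r} does not contain {value_split!r}")
        result[key] = value
    return result


print(parse_kv("ip=1.2.3.4 error=REFUSED"))
# {'ip': '1.2.3.4', 'error': 'REFUSED'}
```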
Usage example
PUT _ingest/pipeline/kv_pipe
{
  "description": "kv pipeline",
  "processors": [
    {"kv": {"field": "message", "field_split": " ", "value_split": "="}}
  ]
}
9. Split Processor: splits a string on a specified separator into an array field
field: required. The field to split.
separator: required. A regex which matches the separator, e.g. , or \s+
target_field: The field to assign the split value to; by default field is updated in place.
ignore_missing: default false. If true and field does not exist, the processor quietly exits without modifying the document.
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
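Because separator is a regex, the behavior is close to a regex split in most languages. The Python sketch below mirrors the field, separator, target_field and ignore_missing options on a dictionary; split_field is a hypothetical name, and details such as the handling of trailing empty strings may differ from Elasticsearch.

```python
import re


def split_field(doc, field, separator, target_field=None, ignore_missing=False):
    """Split a string field into a list using a regex separator."""
    if field not in doc:
        if ignore_missing:
            return doc  # quietly exit without modifying the document
        raise KeyError(f"field {field!r} is missing")
    # Without target_field the source field is overwritten in place.
    doc[target_field or field] = re.split(separator, doc[field])
    return doc


doc = split_field({"my_field": "one  two\tthree"}, "my_field", r"\s+")
print(doc)  # {'my_field': ['one', 'two', 'three']}
```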
Usage example
PUT _ingest/pipeline/split
{
  "description": "split pipeline",
  "processors": [
    {"split": {"field": "my_field", "separator": "\\s+"}}
  ]
}
10. Lowercase Processor: converts a field's content to lowercase
field: required. The field to make lowercase.
target_field: The field to assign the converted value to; by default field is updated in place.
ignore_missing: If true and field does not exist or is null, the processor quietly exits without modifying the document.
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
Usage example
PUT _ingest/pipeline/lowercase_pipe
{
  "description": "lowercase pipeline",
  "processors": [
    {"lowercase": {"field": "name"}}
  ]
}
11. Uppercase Processor: similar to the Lowercase Processor; converts string text to uppercase
field: required. The field to make uppercase.
target_field: The field to assign the converted value to; by default field is updated in place.
ignore_missing: If true and field does not exist or is null, the processor quietly exits without modifying the document.
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
Usage example
PUT _ingest/pipeline/uppercase_pipe
{
  "description": "uppercase pipeline",
  "processors": [
    {"uppercase": {"field": "name"}}
  ]
}
12. Convert Processor: converts a field's type
Usage example
PUT _ingest/pipeline/my-pipeline-id
{
  "description": "converts the content of the id field to an integer",
  "processors": [
    {"convert": {"field": "id", "type": "integer"}}
  ]
}
13. Date Index Name Processor: routes documents into daily or monthly indices based on a date field
field: required. The field to get the date or timestamp from.
index_name_prefix: optional. A prefix of the index name to be prepended before the printed date. Supports template snippets.
date_rounding: required. How to round the date when formatting it into the index name. Valid values are y (year), M (month), w (week), d (day), h (hour), m (minute) and s (second). Supports template snippets.
if: Conditionally execute this processor.
on_failure: Handle failures for this processor. See Handling Failures in Pipelines.
ignore_failure: default false. Ignore failures for this processor. See Handling Failures in Pipelines.
tag: An identifier for this processor. Useful for debugging and metrics.
Usage example
PUT _ingest/pipeline/monthlyindex
{
  "description": "monthly date-time index naming",
  "processors": [
    {"date_index_name": {"field": "date1", "index_name_prefix": "myindex-", "date_rounding": "M"}}
  ]
}
PUT /myindex/_doc/1?pipeline=monthlyindex
{"date1": "2016-04-25T12:02:01.789Z"}
Response:
{
  "_index": "myindex-2016-04-01", "_type": "_doc", "_id": "1", "_version": 1,
  "result": "created",
  "_shards": {"total": 2, "successful": 1, "failed": 0},
  "_seq_no": 55, "_primary_term": 1
}
Using the simulate API:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "monthly date-time index naming",
    "processors": [
      {"date_index_name": {"field": "date1", "index_name_prefix": "myindex-", "date_rounding": "M"}}
    ]
  },
  "docs": [
    {"_source": {"date1": "2016-04-25T12:02:01.789Z"}}
  ]
}
Response:
{
  "docs": [
    {
      "doc": {
        "_index": "<myindex-{2016-04-25||/M{yyyy-MM-dd|UTC}}>",
        "_type": "_doc",
        "_id": "_id",
        "_source": {"date1": "2016-04-25T12:02:01.789Z"},
        "_ingest": {"timestamp": "2020-10-27T06:30:58.273Z"}
      }
    }
  ]
}
Here the _index expression "<myindex-{2016-04-25||/M{yyyy-MM-dd|UTC}}>" resolves to the index name myindex-2016-04-01.
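The month rounding behind that index name can be reproduced with a few lines of Python. This is a sketch of the arithmetic only, assuming UTC timestamps in the exact format used in the example; monthly_index_name is a hypothetical helper, not an Elasticsearch API.

```python
from datetime import datetime, timezone


def monthly_index_name(date_str, prefix="myindex-"):
    """Round a UTC timestamp down to the first day of its month and format the index name."""
    # Assumption: input looks like 2016-04-25T12:02:01.789Z (UTC, fractional seconds).
    dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
    # Equivalent of date_rounding "M": truncate to the start of the month.
    rounded = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    # Equivalent of the yyyy-MM-dd format seen in the simulated _index expression.
    return prefix + rounded.strftime("%Y-%m-%d")


print(monthly_index_name("2016-04-25T12:02:01.789Z"))  # myindex-2016-04-01
```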
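14. Dot Expander Processor: usually combined with other processors; it expands a field whose name contains dots into an object field, so that processors defined after it can access the nested field with . notation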
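As a rough mental model, the expansion can be sketched in Python on a dictionary. The helper dot_expand is hypothetical and only imitates the behavior seen in this section's example, where a value that collides with an existing nested value is merged into an array; it is not Elasticsearch's implementation.

```python
def dot_expand(doc, dotted_field):
    """Turn a key like 'foo.bar' into nested objects, merging with any existing value."""
    if dotted_field not in doc:
        return doc
    value = doc.pop(dotted_field)
    parts = dotted_field.split(".")
    node = doc
    for part in parts[:-1]:
        # Assumption: intermediate segments are objects (or missing).
        node = node.setdefault(part, {})
    leaf = parts[-1]
    if leaf in node:
        # Collision: keep the existing value first, then append the expanded one.
        existing = node[leaf] if isinstance(node[leaf], list) else [node[leaf]]
        incoming = value if isinstance(value, list) else [value]
        node[leaf] = existing + incoming
    else:
        node[leaf] = value
    return doc


print(dot_expand({"foo.bar": "value2", "foo": {"bar": "value1"}}, "foo.bar"))
# {'foo': {'bar': ['value1', 'value2']}}
```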
Usage example
PUT _ingest/pipeline/dot_pipeline
{
  "description": "dot expand pipeline",
  "processors": [
    {"dot_expander": {"field": "foo.bar"}}
  ]
}
PUT script_test/_doc/38?pipeline=dot_pipeline
{"foo.bar": "value2", "foo": {"bar": "value1"}}
GET script_test/_doc/38
Response:
"_source": {"foo": {"bar": ["value1","value2"]}}
15. Fail Processor: a simple processor that raises an exception; when a document going through the pipeline triggers it, the error message specified in the pipeline is returned to the requester
Usage example
PUT _ingest/pipeline/fail_pipeline
{
  "description": "fail pipeline",
  "processors": [
    {
      "fail": {
        "if": "ctx.tags.contains('production') != true",
        "message": "The production tag is not present, found tags: {{tags}}"
      }
    }
  ]
}
16. Foreach Processor: processes array fields by applying the same processor to every element of the array
Usage example
PUT _ingest/pipeline/foreach_pipeline
{
  "description": "foreach pipeline",
  "processors": [
    {
      "foreach": {
        "field": "persons",
        "processor": {"remove": {"field": "_ingest._value.id"}}
      }
    }
  ]
}
PUT foreach_test/_doc/2?pipeline=foreach_pipeline
{
  "persons": [
    {"id": "1", "name": "John Doe"},
    {"id": "2", "name": "Jane Doe"}
  ]
}
GET foreach_test/_search
Response:
"_source": {
  "persons": [
    {"name": "John Doe"},
    {"name": "Jane Doe"}
  ]
}
17. Pipeline Processor: executes another pipeline
Usage example
PUT _ingest/pipeline/pipelineA
{
  "description": "inner pipeline",
  "processors": [
    {"set": {"field": "inner_pipeline_set", "value": "inner"}}
  ]
}
PUT _ingest/pipeline/pipelineB
{
  "description": "outer pipeline",
  "processors": [
    {"pipeline": {"name": "pipelineA"}},
    {"set": {"field": "outer_pipeline_set", "value": "outer"}}
  ]
}
PUT /myindex/_doc/1?pipeline=pipelineB
{"field": "value"}
The stored document is:
{
  "field": "value",
  "inner_pipeline_set": "inner",
  "outer_pipeline_set": "outer"
}
18. Script Processor: processes documents with an Elasticsearch script; it uses the usual script programming model, so any field a script can access is accessible here
This is covered in detail in the script section; scripts appear to be used throughout the processors.
Usage example
PUT _ingest/pipeline/my_index
{
  "description": "use index:my_index and type:_doc",
  "processors": [
    {
      "script": {
        "source": """
          ctx._index = 'my_index';
          ctx._type = '_doc';
        """
      }
    }
  ]
}
PUT any_index/_doc/1?pipeline=my_index
{"message": "text"}
19. Sort Processor: sorts the elements of an array field in ascending or descending order
Usage example
PUT _ingest/pipeline/sort_pipeline
{
  "description": "sort pipeline",
  "processors": [
    {"sort": {"field": "age_arr", "order": "desc"}}
  ]
}
PUT sort_test/_doc/1?pipeline=sort_pipeline
{"name": "age to be sort", "ages": [56,23,78,45,99], "age_arr": [56,23,78,45,99]}
GET sort_test/_doc/1
Response:
"_source": {
  "name": "age to be sort",
  "ages": [56,23,78,45,99],
  "age_arr": [99,78,56,45,23]
}
20. Trim Processor: trims whitespace from both ends of a string
Usage example
PUT _ingest/pipeline/trim_pipe
{
  "description": "trim field",
  "processors": [
    {"trim": {"field": "foo"}}
  ]
}