08.update_by_query操作
文章目錄
- 1. 通過查詢 API 更新
- 2. URL 參數
- 3. 響應
- 4. 配合Task API使用
- 1. 配合取消任務API使用
- 5. 切片并行
- 2. 自動切片
- 3. 挑選slice的數量
- 6. mapping中新增一個property
1. 通過查詢 API 更新
_update_by_query 最簡單的用法只是基于索引來執行每個文檔的更新,而不更改源。這是非常有用快捷的,在完成一些 mapping 的變化的時候使用。這里是 API:
POST twitter/_update_by_query?conflicts=proceed返回值類似于這樣:
{"took" : 147,"timed_out": false,"updated": 120,"deleted": 0,"batches": 1,"version_conflicts": 0,"noops": 0,"retries": {"bulk": 0,"search": 0},"throttled_millis": 0,"requests_per_second": -1.0,"throttled_until_millis": 0,"total": 120,"failures" : [ ] }_update_by_query在開始處理時時獲取索引的快照,并使用內部版本控制update它所查找到的內容。這意味著如果文檔在query和處理update之間發生變化,會報沖突錯誤。當版本匹配時文檔被update。
同樣的只支持internal version,所以0是非法的,internal version 從1開始遞增。
所有的更新和查詢失敗導致 _update_by_query中止,并在返回錯誤的響應。已經執行的更新將會保留。換句話說,該方法不支持回滾,僅中止。雖然第一次失敗將導致中止,由失敗的請求返回所有故障將在 failures元素體現。因此,有可能存在一些失敗的實體。
如果你想簡單地統計版本沖突不想導致 _update_by_query 中止你可以在 URL 中設置 conflicts=proceed或在請求中設置 “conflicts”: “proceed” 。第一個例子這樣做因為它只是試圖完成一個當前的 mapping 變化,這里發生的版本沖突僅僅意味著沖突的文檔在 _update_by_query 開始的時間和文件嘗試更新時間之間被更新了。這并不影響。
再來看看剛才的請求,下面的請求會更新index的所有doc
POST twitter/tweet/_update_by_query?conflicts=proceed您也可以使用DSL限制 _update_by_query更新的文檔。
POST twitter/_update_by_query?conflicts=proceed {"query": { (1)"term": {"user": "kimchy"}} }(1)查詢必須作為一個值傳遞 query鍵,就像在 Search API 中同樣的方式。還可以使用的 q 參數。、
到目前為止,我們只是一直在更新文檔,但是沒有改變source的字段值。_update_by_query支持腳本對象更新文檔。這將在所有 kimchy tweet 中增加 likes :
POST twitter/_update_by_query {"script": {"inline": "ctx._source.likes++","lang": "painless"},"query": {"term": {"user": "kimchy"}} }如在 Update API 中可以設置 ctx.op來改變所執行的操作:
noop: 設置 ctx.op = “noop” 。如果你的腳本并沒有對原來的doc做任何更改。這將導致 _update_by_query 忽略該doc。這將在響應的 noop 中被展示。
delete: 設置ctx.op = “delete”,如果你的腳本如此設定,該會被被刪除。這將在響應的 deleted 中被展示。
也就是說使用update 和 update_by_query 的api是有能力刪除doc的
設置 ctx.op成別的值會報錯。設置任何其它領域中 ctx 也會報錯。
POST /website/_doc/1/_update {"script": {"source" : "ctx.op = ctx._source.views == params.count ? 'delete' : 'none'","params" : {"count": 1}} }注意,當我們沒有指定 conflicts=proceed 時,我們希望版本沖突中的一個中止進程,以至于我們可以處理失敗。
此 API 不允許移動文件本身,只需修改其soure。這是有意而為之,我們并沒有獲得從其原始位置刪除該文件的權限。
通過多索引也可以完成所有的事情,就像搜索API:
POST twitter,blog/_update_by_query如果提供 routing ,則將被復制到滾動查詢,限制到分片的進程來匹配 routing 值:
POST twitter/_update_by_query?routing=1在默認情況下 _update_by_query使用 1000 批次的回滾。可以更改與批量大小的 scroll_size URL參數:
POST twitter/_update_by_query?scroll_size=100_update_by_query也可以使用 Ingest Node 的特點,通過指定一個 pipeline:
PUT _ingest/pipeline/set-foo {"description" : "sets foo","processors" : [ {"set" : {"field": "foo","value": "bar"}} ] } POST twitter/_update_by_query?pipeline=set-foo2. URL 參數
除了標準的參數,如 pretty,通過查詢 API 還支持 refresh,wait_for_completion,wait_for_active_shards和 timeout。
當請求完成時,發送 refresh將更新索引中的所有分片。這與索引update API 的 refresh 參數不同,update api只會refresh收到請求的shard。
如果請求包含wait_for_completion=false,那么Elasticsearch將執行一些預檢檢查、啟動請求、然后返回一個任務,可以與Tasks API一起使用來取消或獲取任務的狀態。Elasticsearch還將以.tasks/task/${taskId}作為文檔創建此任務的記錄。這是你可以根據是否合適來保留或刪除它。當你完成它時,刪除它可以讓Elasticsearc
wait_for_active_shards控制在繼續請求之前必須有多少個分片必須處于活動狀態,詳見這里。timeout控制每個寫入請求等待不可用分片變成可用的時間。兩者都能正確地在Bulk API中工作。
requests_per_second可以設置為任何正數(1.4,6,1000等),來作為“delete-by-query”每秒請求數的節流閥數字,或者將其設置為-1以禁用限制。節流是在批量批次之間等待,以便它可以操縱滾動超時。等待時間是批次完成的時間與request_per_second * requests_in_the_batch的時間之間的差異。由于分批處理沒有被分解成多個批量請求,所以會導致Elasticsearch創建許多請求,然后等待一段時間再開始下一組。這是“突發”而不是“平滑”。默認值為-1。在集群本省的任務比較重的情況下建議開啟限流,以減少對集群資源的使用。
throttle的計算方式是,每秒處理requests_per_second 個request,假如我們設置requests_per_second=500,寫1000個需要0.5s,則等待時間是
target_time = 1000 / 500 per second = 2 seconds wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds默認沒有限流
3. 響應
Json 響應如下:
{"took" : 639,"updated": 0,"batches": 1,"version_conflicts": 2,"retries": {"bulk": 0,"search": 0}"throttled_millis": 0,"failures" : [ ] }took: 從整個操作的開始到結束的毫秒數。
timed_out: delete by query的請求處理過程中是否超時
total: 成功處理的doc數量
deleted: 成功刪除的文檔數。
batches: 通過查詢刪除的scroll響應數量。
version_conflicts: 根據查詢刪除時,版本沖突的數量。
noops: 這個在這里沒有用,只是為了讓delete by query, update by query, and reindex APIs 返回相同結構的 responses
retries: search 操作和bulk操作的重試次數
throttled_millis: 因為requests_per_second而產生的睡眠時間
requests_per_second: 每秒處理的請求數
throttled_until_millis: 這個也是總是0,沒有用,為了保持結構一致性
failures: 失敗的情況,會終止操作的失敗
失敗的索引數組。如果這是非空的,那么請求因為這些失敗而中止。請參閱 conflicts 來如何防止版本沖突中止操作。
4. 配合Task API使用
Works With the Task API
你可以通過 Task API 獲取所有正在運行的更新請求狀態:
(1)這個對象的實際狀態。它就像響應中 json 用做 total 的重要補充。total是操作任務的總數重新索引預計執行。您可以添加 updated,created以及 deleted 等多個域。當它們之和等于 total字段值時該請求將完成。
使用任務id可以直接查找任務:
GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619這個API的優點是它與wait_for_completion=false集成,以透明地返回已完成任務的狀態。如果任務完成并且wait_for_completion=false被設置,那么它將返回results或error字段。此功能的成本是wait_for_completion=false在.tasks索引創建taskId的文檔,需要你自己刪除該doc。
1. 配合取消任務API使用
所有根據查詢刪除都能使用Task Cancel API取消:
POST _tasks/task_id:1/_cancel可以使用上面的任務API找到task_id。 取消應盡快發生,但可能需要幾秒鐘。上面的任務狀態API將繼續列出任務,直到它被喚醒取消自身。
重置節流閥
request_per_second的值可以在通過查詢刪除時使用_rethrottle API更改:
可以使用上面的任務API找到task_id。
就像在_delete_by_query API中設置它一樣,request_per_second可以是-1來禁用限制,或者任何十進制數字,如1.7或12,以節制到該級別。加速查詢的會立即生效,但是在完成當前批處理之后,減慢查詢的才會生效。這樣可以防止滾動超時。
5. 切片并行
Manually slicing
通過請求的更新支持分片回滾,允許更加簡單的手動多線程操作:
你可以通過如下來驗證任務:
GET _refresh POST twitter/_search?size=0&q=extra:test&filter_path=hits.total 結果 total 如下:{"hits": {"total": {"value": 120,"relation": "eq"}} }2. 自動切片
POST twitter/_update_by_query?refresh&slices=5 {"script": {"source": "ctx._source['extra'] = 'test'"} }上面的會自動分成5個任務來執行
您可以通過以下方式驗證:
POST twitter/_search?size=0&q=extra:test&filter_path=hits.total其結果一個合理的total像這樣:
{"hits": {"total": {"value": 120,"relation": "eq"}} }設置slices=auto將使Elasticsearch選擇要使用的切片數。此設置將每個分片使用一個slice,上限為一定限制。如果有多個源index,它將根據分片數量最少的索引選擇slice。
在_delete_by_query使用slice只會自動執行上一節中使用的手動過程,從而創建子請求,這意味著它具有一些怪癖:
3. 挑選slice的數量
如果使用auto模式,es默認會選擇合適的slice數量
如果是主觀的設置slice的數量或者是手動執行slice,下面有一些建議。
不要使用大的數字,500就能造成相當大的CPU抖動。
在源索引中使用完全相同的分片是從查詢性能的角度來看效率最高的。
索引性能應在可用資源之間以slices數量線性擴展。
索引或查詢性能是否支配該流程取決于許多因素,如正在重建索引的文檔和進行reindexing的集群。
6. mapping中新增一個property
如果你禁用了動態mapping,那么新的字段只會存儲在source當中,這個時候你可以添加一個mapping字段,然后再更新索引
PUT test {"mappings": {"dynamic": false, "properties": {"text": {"type": "text"}}} }POST test/_doc?refresh {"text": "words words","flag": "bar" } POST test/_doc?refresh {"text": "words words","flag": "foo" } PUT test/_mapping {"properties": {"text": {"type": "text"},"flag": {"type": "text", "analyzer": "keyword"}} }下面的查詢啥都沒有
POST test/_search?filter_path=hits.total {"query": {"match": {"flag": "foo"}} }{"hits" : {"total": {"value": 0,"relation": "eq"}} } POST test/_update_by_query?refresh&conflicts=proceed POST test/_search?filter_path=hits.total {"query": {"match": {"flag": "foo"}} }返回{"hits" : {"total": {"value": 1,"relation": "eq"}} }總結
以上是生活随笔為你收集整理的08.update_by_query操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 06.delete_by_query操作
- 下一篇: 09.multi-get api操作