11.reindex操作
文章目錄
- 1. Reindex API簡介
- 2. 從遠程集群重建索引
- 3. URL參數
- 4. 響應體
- 5. 配合Task API使用
- 1. 查找任務
- 2. 配合取消任務API使用
- 3. 重置節流閥
- 4. 修改字段名
- 6. 使用slice并行執行
- 1. 手動切片
- 2. 自動切片
- 3. 挑選slice的數量
- 8. index name 帶有日志的索引的重建
- 提取索引的隨機子集
1. Reindex API簡介
Reindex不會嘗試創建目標索引。它不會復制源索引的設置信息。您應該在運行_reindex操作之前創建目標索引,包括設置mapping,shard,replica_num等。
_reindex的最基本形式只是將文檔從一個索引復制到另一個索引。下面將文檔從twitter索引復制到new_twitter索引中:
POST _reindex {"source": {"index": "twitter"},"dest": {"index": "new_twitter"} }這將會返回類似以下的信息:
{"took" : 147,"timed_out": false,"created": 120,"updated": 0,"deleted": 0,"batches": 1,"version_conflicts": 0,"noops": 0,"retries": {"bulk": 0,"search": 0},"throttled_millis": 0,"requests_per_second": -1.0,"throttled_until_millis": 0,"total": 120,"failures" : [ ] }和_update_by_query一樣,_reindex獲取源索引的snapshot,但其目標索引必須是不同的索引,因此不會發生版本沖突。 dest元素可以像index API一樣進行配置基于樂觀鎖的并發控制。如果將version_type 空著(像上面一樣)或將version_type設置為internal,則Elasticsearch強制性的將文檔轉儲到目標中,覆蓋具有相同類型和ID的任何內容:
POST _reindex {"source": {"index": "twitter"},"dest": {"index": "new_twitter","version_type": "internal"} }將version_type設置為external將導致Elasticsearch從源doc中攜帶version,如果target index中沒有這個doc會創建該doc,反之target index中的對應的version小于當前verison才會更新。
POST _reindex {"source": {"index": "twitter"},"dest": {"index": "new_twitter","version_type": "external"} }設置op_type為create將導致_reindex僅在目標索引中創建缺少的文檔。所有存在的文檔將導致版本沖突:
POST _reindex {"source": {"index": "twitter"},"dest": {"index": "new_twitter","op_type": "create"} }默認情況下,version 沖突將中止_reindex進程,但您可以通過請求體設置"conflict":"proceed"來在沖突時進行計數:
POST _reindex {"conflicts": "proceed","source": {"index": "twitter"},"dest": {"index": "new_twitter","op_type": "create"} }您可以通過向source添加type或添加query來限制文檔。下面會將kimchy發布的tweet復制到new_twitter中:
POST _reindex {"source": {"index": "twitter","query": {"term": {"user": "kimchy"}}},"dest": {"index": "new_twitter"} }source中的index和type都可以是一個列表,允許您在一個請求中從大量的來源進行復制。下面將從twitter和blog索引中的tweet和post類型中復制文檔。它也包含twitter索引中post類型以及blog索引中的tweet類型。如果你想更具體,你將需要使用query。它也沒有努力處理ID沖突。目標索引將保持有效,但由于迭代順序定義不正確,預測哪個文檔可以保存下來是不容易的。
POST _reindex {"source": {"index": ["twitter", "blog"]},"dest": {"index": "all_together"} }還可以通過設置大小限制處理的文檔的數量。下面只會將單個文檔從twitter復制到new_twitter:
POST _reindex {"size": 1,"source": {"index": "twitter"},"dest": {"index": "new_twitter"} }如果你想要從twitter索引獲得一個特定的文檔集合你需要排序。排序使滾動效率更低,但在某些情況下它是值得的。如果可能,更喜歡更多的選擇性查詢size和sort。這將從twitter復制10000個文檔到new_twitter:
POST _reindex {"size": 10000,"source": {"index": "twitter","sort": { "date": "desc" }},"dest": {"index": "new_twitter"} }source部分支持搜索請求中支持的所有元素。例如,只使用原始文檔的一部分字段,使用源過濾如下所示:
POST _reindex {"source": {"index": "twitter","_source": ["user", "tweet"]},"dest": {"index": "new_twitter"} }像update_by_query一樣,_reindex支持修改文檔的腳本。與_update_by_query不同,腳本允許修改文檔的元數據。此示例修改了源文檔的版本:
POST _reindex {"source": {"index": "twitter"},"dest": {"index": "new_twitter","version_type": "external"},"script": {"inline": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}","lang": "painless"} }就像在_update_by_query中一樣,您可以設置ctx.op來更改在目標索引上執行的操作:
noop: 設置 ctx.op = “noop” 。如果你的腳本并沒有對原來的doc做任何更改。這將導致 reindex 忽略該doc。這將在響應的 noop 中被展示。
delete: 設置ctx.op = “delete”,如果你的腳本如此設定,target index中的該doc會被被刪除。這將在響應的 deleted 中被展示。
也就是說使用update 和 update_by_query 的api是有能力刪除doc的
設置 ctx.op成別的值會報錯。設置任何其它領域中 ctx 也會報錯。
下面的字段都是你可以在script中改變的,但是在你決定改變之前請仔細評估可能帶來的影響
_id _type _index _version _routing _parent將_version設置為null或從ctx中移除version相關操作就像在索引請求中不發送版本一樣。這將導致目標索引中的文檔被覆蓋,無論目標版本或_reindex請求中使用的版本類型如何。
默認情況下,如果_reindex看到具有routing的doc,則發送bulk request的時候routing將被保留,可以修改routing的值
在dest部分可以將routing字段設置為一下的值
例如,您可以使用以下請求將source索引的所有公司名稱為cat的文檔復制到路由設置為cat的dest索引。
默認情況下,_reindex批量滾動處理大小為1000.您可以在source元素中指定size字段來更改批量處理大小:
POST _reindex {"source": {"index": "source","size": 100},"dest": {"index": "dest","routing": "=cat"} }Reindex也可以使用[Ingest Node]功能來指定pipeline, 就像這樣:
POST _reindex {"source": {"index": "source"},"dest": {"index": "dest","pipeline": "some_ingest_pipeline"} }2. 從遠程集群重建索引
Reindex支持從遠程Elasticsearch群集重建索引:
這個非常適合es的跨版本升級數據遷移工作。
POST _reindex {"source": {"remote": {"host": "http://otherhost:9200","username": "user","password": "pass"},"index": "source","query": {"match": {"test": "data"}}},"dest": {"index": "dest"} }host參數必須包含scheme,host和port(例如 https:// otherhost:9200)。用戶名和密碼參數是可選的,當它們存在時,索引將使用基本認證連接到遠程Elasticsearch節點。使用基本認證時請務必使用https,密碼將以純文本格式發送。
必須在elasticsearch.yaml中使用reindex.remote.whitelist屬性將遠程主機明確列入白名單。它可以設置為允許的遠程host和port組合的逗號分隔列表(例如otherhost:9200,another:9200,127.0.10.:9200,localhost:)。白名單忽略了scheme ——僅使用主機和端口。
此功能應適用于您可能找到的任何版本的Elasticsearch的遠程群集。這應該允許您從任何版本的Elasticsearch升級到當前版本,通過從舊版本的集群重新建立索引。
要啟用發送到舊版本Elasticsearch的查詢,query參數將直接發送到遠程主機,無需驗證或修改。
從遠程服務器重新索引使用堆上緩沖區,該緩沖區默認最大大小為100mb。如果遠程索引包含非常大的文檔,則需要使用較小的批量。下面的示例將批次大小設置為10,這非常小。
POST _reindex {"source": {"remote": {"host": "http://otherhost:9200"},"index": "source","size": 10,"query": {"match": {"test": "data"}}},"dest": {"index": "dest"} }也可以使用socket_timeout字段在遠程連接上設置socket的讀取超時,并使用connect_timeout字段設置連接超時。兩者默認為三十秒。此示例將套接字讀取超時設置為一分鐘,并將連接超時設置為十秒:
POST _reindex {"source": {"remote": {"host": "http://otherhost:9200","socket_timeout": "1m","connect_timeout": "10s"},"index": "source","query": {"match": {"test": "data"}}},"dest": {"index": "dest"} }3. URL參數
除了標準參數像pretty之外,“Reindex API”還支持refresh、wait_for_completion、wait_for_active_shards、timeout以及requests_per_second。
當請求完成時,發送 refresh將更新索引中的所有分片。這與索引index API 的 refresh 參數不同,index api只會refresh收到請求的shard。
如果請求包含wait_for_completion=false,那么Elasticsearch將執行一些預檢檢查、啟動請求、然后返回一個任務,可以與Tasks API一起使用來取消或獲取任務的狀態。Elasticsearch還將以.tasks/task/${taskId}作為文檔創建此任務的記錄。這是你可以根據是否合適來保留或刪除它。當你完成它時,刪除它可以讓Elasticsearc
wait_for_active_shards控制在繼續請求之前必須有多少個分片必須處于活動狀態,詳見這里。timeout控制每個寫入請求等待不可用分片變成可用的時間。兩者都能正確地在Bulk API中工作。
requests_per_second可以設置為任何正數(1.4,6,1000等),來作為“delete-by-query”每秒請求數的節流閥數字,或者將其設置為-1以禁用限制。節流是在批量批次之間等待,以便它可以操縱滾動超時。等待時間是批次完成的時間與request_per_second * requests_in_the_batch的時間之間的差異。由于分批處理沒有被分解成多個批量請求,所以會導致Elasticsearch創建許多請求,然后等待一段時間再開始下一組。這是“突發”而不是“平滑”。默認值為-1。在集群本省的任務比較重的情況下建議開啟限流,以減少對集群資源的使用。
throttle的計算方式是,每秒處理requests_per_second 個request,假如我們設置requests_per_second=500,寫1000個需要0.5s,則等待時間是
target_time = 1000 / 500 per second = 2 seconds wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds默認沒有限流
4. 響應體
JSON響應類似如下:
{"took" : 639,"updated": 0,"created": 123,"batches": 1,"version_conflicts": 2,"retries": {"bulk": 0,"search": 0}"throttled_millis": 0,"failures" : [ ] }took: 從整個操作的開始到結束的毫秒數。
updated: 成功更新的文檔數。
upcreateddated: 成功創建的文檔數。
batches: 通過查詢更新的滾動響應數量。
version_conflicts: 根據查詢更新時,版本沖突的數量。
retries: 根據查詢更新的重試次數。bluk 是重試的批量操作的數量,search 是重試的搜索操作的數量。
throttled_millis: 請求休眠的毫秒數,與requests_per_second一致。
failures: 失敗的索引數組。如果這是非空的,那么請求因為這些失敗而中止。請參閱 conflicts 來如何防止版本沖突中止操作。
5. 配合Task API使用
您可以使用Task API獲取任何正在運行的重建索引請求的狀態:
1. 查找任務
GET _tasks?detailed=true&actions=*/update/byquery 響應會類似如下:{"nodes" : {"r1A2WoRbTwKZ516z6NEs5A" : {"name" : "r1A2WoR","transport_address" : "127.0.0.1:9300","host" : "127.0.0.1","ip" : "127.0.0.1:9300","attributes" : {"testattr" : "test","portsfile" : "true"},"tasks" : {"r1A2WoRbTwKZ516z6NEs5A:36619" : {"node" : "r1A2WoRbTwKZ516z6NEs5A","id" : 36619,"type" : "transport","action" : "indices:data/write/reindex","status" : { //①"total" : 6154,"updated" : 3500,"created" : 0,"deleted" : 0,"batches" : 4,"version_conflicts" : 0,"noops" : 0,"retries": {"bulk": 0,"search": 0},"throttled_millis": 0},"description" : ""}}}} }① 此對象包含實際狀態。它就像是響應json,重要的添加total字段。 total是重建索引希望執行的操作總數。您可以通過添加的updated、created和deleted的字段來估計進度。當它們的總和等于total字段時,請求將完成。
使用任務id可以直接查找任務:
GET /_tasks/taskId:1這個API的優點是它與wait_for_completion=false集成,以透明地返回已完成任務的狀態。如果任務完成并且wait_for_completion=false被設置,那么它將返回results或error字段。此功能的成本是wait_for_completion=false在.tasks/task/${taskId}創建的文檔,由你自己刪除該文件。
2. 配合取消任務API使用
所有重建索引都能使用Task Cancel API取消:
POST _tasks/task_id:1/_cancel可以使用上面的任務API找到task_id。
取消應盡快發生,但可能需要幾秒鐘。上面的任務狀態API將繼續列出任務,直到它被喚醒取消自身。
3. 重置節流閥
request_per_second的值可以在通過查詢刪除時使用_rethrottle API更改:
POST _update_by_query/task_id:1/_rethrottle?requests_per_second=-1可以使用上面的任務API找到task_id。
就像在_update_by_query API中設置它一樣,request_per_second可以是-1來禁用限制,或者任何十進制數字,如1.7或12,以節制到該級別。加速查詢的會立即生效,但是在完成當前批處理之后,減慢查詢的才會生效。這樣可以防止滾動超時。
4. 修改字段名
_reindex可用于使用重命名的字段構建索引的副本。假設您創建一個包含如下所示的文檔的索引:
POST test/_doc/1?refresh {"text": "words words","flag": "foo" }但是你不喜歡這個flag名稱,而是要用tag替換它。 _reindex可以為您創建其他索引:
POST _reindex {"source": {"index": "test"},"dest": {"index": "test2"},"script": {"inline": "ctx._source.tag = ctx._source.remove(\"flag\")"} }現在你可以get得到新的文件:
GET test2/_doc/1它看起來像:
{"found": true,"_id": "1","_index": "test2","_type": "_doc","_version": 1,"_seq_no": 44,"_primary_term": 1,"_source": {"text": "words words","tag": "foo"} }或者你可以通過tag進行任何你想要的搜索。
6. 使用slice并行執行
1. 手動切片
重建索引支持滾動切片,您可以相對輕松地手動并行化處理:
POST _reindex {"source": {"index": "twitter","slice": {"id": 0,"max": 2}},"dest": {"index": "new_twitter"} } POST _reindex {"source": {"index": "twitter","slice": {"id": 1,"max": 2}},"dest": {"index": "new_twitter"} }您可以通過以下方式驗證:
GET _refresh POST new_twitter/_search?size=0&filter_path=hits.total其結果一個合理的total像這樣:
{"hits": {"total" : {"value": 120,"relation": "eq"}} }2. 自動切片
你還可以讓重建索引使用切片的_uid來自動并行的滾動切片。
POST _reindex?slices=5&refresh {"source": {"index": "twitter"},"dest": {"index": "new_twitter"} }您可以通過以下方式驗證:
POST new_twitter/_search?size=0&filter_path=hits.total其結果一個合理的total像這樣:
{"hits": {"total" : {"value": 120,"relation": "eq"}} }將slices添加到_reindex中可以自動執行上述部分中使用的手動過程,創建子請求,這意味著它有一些怪癖:
3. 挑選slice的數量
如果使用auto模式,es默認會選擇合適的slice數量
如果是主觀的設置slice的數量或者是手動執行slice,下面有一些建議。
不要使用大的數字,500就能造成相當大的CPU抖動。
在源索引中使用完全相同的分片是從查詢性能的角度來看效率最高的。
索引性能應在可用資源之間以slices數量線性擴展。
索引或查詢性能是否支配該流程取決于許多因素,如正在重建索引的文檔和進行reindexing的集群。
8. index name 帶有日志的索引的重建
您可以使用_reindex與Painless組合來重新每日編制索引,以將新模板應用于現有文檔。 假設您有由以下文件組成的索引:
PUT metricbeat-2016.05.30/_doc/1?refresh {"system.cpu.idle.pct": 0.908} PUT metricbeat-2016.05.31/_doc/1?refresh {"system.cpu.idle.pct": 0.105}metricbeat-*索引的新模板已經加載到Elaticsearch中,但它僅適用于新創建的索引。Painless可用于重新索引現有文檔并應用新模板。
下面的腳本從索引名稱中提取日期,并創建一個附帶有-1的新索引。來自metricbeat-2016.05.31的所有數據將重新索引到metricbeat-2016.05.31-1。
POST _reindex {"source": {"index": "metricbeat-*"},"dest": {"index": "metricbeat"},"script": {"lang": "painless","inline": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"} }來自上一個度量索引的所有文檔現在可以在*-1索引中找到。
GET metricbeat-2016.05.30-1/beat/1
GET metricbeat-2016.05.31-1/beat/1
以前的方法也可以與更改字段的名稱一起使用,以便將現有數據加載到新索引中,但如果需要,還可以重命名字段。
提取索引的隨機子集
Reindex可用于提取用于測試的索引的隨機子集:
POST _reindex {"size": 10,"source": {"index": "twitter","query": {"function_score" : {"query" : { "match_all": {} },"random_score" : {}}},"sort": "_score" //①},"dest": {"index": "random_twitter"} }① Reindex默認按_doc排序,所以random_score不會有任何效果,除非您將排序重寫為_score。
總結
以上是生活随笔為你收集整理的11.reindex操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 10.bulk操作
- 下一篇: 12.term_vectors查看