kafka 重新分配节点_Kafka扩容节点和分区迁移
背景
最近工作中碰到Kafka 節點的網卡成為了性能瓶頸,為了提高整個消息隊列的輸出吞吐量,需要將數據量大的Topic 遷移到新的Kafka節點上。
操作步驟
1. 新建Kafka 節點
通過CDH 管理界面在新機器上安裝Kafka 服務,并得到相應的Kafka broker id。(假設為140, 141)
2. 創建要遷移的Topic 列表
查看所有的Topic
$ cd /opt/cloudera/parcels/KAFKA/bin
$ ./kafka-topics --describe --zookeeper 10.1.1.50:2181/kafka
如果要刪除某些不用的Topic,可運行命令
$ ./kafka-run-class kafka.admin.TopicCommand --delete --topic test_p1_r1 --zookeeper 10.1.1.50:2181/kafka
新建文件topics-to-move.json,包含要遷移到Topic 列表。這里只遷移了一個Topic,也可以是多個Topic。
{
"topics": [{"topic": "sdk_counters"}],
"version": 1
}
3. 生成Topic 分區分配表
使用kafka-reassign-partitions 工具生成分區分配表,其中需要指定topics-to-move.json 文件和遷移目標節點的broker id
$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --topics-to-move-json-file ~/kafka/topics-to-move.json --broker-list "140,141" --generate
將生成以下結果
Current partition replica assignment
{"version":1,"partitions":[{"topic":"sdk_counters","partition":1,"replicas":[61,62]},{"topic":"sdk_counters","partition":0,"replicas":[62,61]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"sdk_counters","partition":1,"replicas":[140,141]},{"topic":"sdk_counters","partition":0,"replicas":[141,140]}]}
將Current partition replica assignment 的內容保存到rollback-cluster-reassignment.json,用于回滾操作。將Proposed partition reassignment configuration 的內容保存到expand-cluster-reassignment.json,用于執行遷移操作。
在這里也可以手工編輯expand-cluster-reassignment.json 文件更改replica 和partition 配置。也可以在遷移之前更改Topic 的分區數 (6)。
$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --partitions 6
4. 執行遷移操作
$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --reassignment-json-file ~/kafka/expand-cluster-reassignment.json --execute
遷移操作會將指定Topic 的數據文件移動到新的節點目錄下,這個過程可能需要等待很長時間,視Topic 的數據量而定。可以運行以下命令查看執行狀態。
$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --reassignment-json-file ~/kafka/expand-cluster-reassignment.json --verify
狀態有兩種,in progress 表示正在遷移,completed successlly 表示已經成功完成遷移。在此過程中,可以在各個Kafka 的節點上使用iftop 工具實時監控網絡帶寬??梢杂^察到遷移的source 節點使用了大量的輸出帶寬,遷移的target 節點使用了大量的輸入帶寬。由于在遷移過程中,會占用大量的網卡帶寬進行數據傳輸,可能會影響到其他Topic 和應用程序的帶寬使用。
遷移完成后,原先的節點下將不存在該Topic 的數據文件。
優化
減少遷移的數據量
如果要遷移的Topic 有大量數據(Topic 默認保留1天的數據),可以在遷移之前臨時動態地調整retention.ms 來減少數據量,Kafka 會主動purge 掉1個小時之前的數據。
$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --config retention.ms=3600000
在遷移完成后,恢復原先設置
$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --config retention.ms=86400000
在遷移過程中,不會影響應用程序寫Kafka,在遷移完成后需要查看應用程序是否運行正常。
在已有的Topic 上增加分區
如果使用kafka-topics 動態地增加partition 數目,則新增的partition 可能會出現在遷移之前的機器上。這時可以使用kafka-reassign-partitions 工具并手動更改分區分配表以保證所有的分區都在遷移后的機器上。注意要保持舊的分區中的節點分配和replica 和之前相同,否則會導致Kafka 對舊分區的重新遷移,增加了遷移時間,并且可能導致正在運行的程序因為分區失效而出錯。
重新指定partition leader
有時候由于節點down 了,partition 的leader 可能不是我們prefer 的,這時,可以通過kafka-preferred-replica-election 工具將replica 中的第一個節點作為該分區的leader。
手動編輯topicPartitionList.json 文件,指定要重新分配leader 的分區。
{"partitions":[{"topic":"sdk_counters","partition":5}]}
執行命令
$ ./kafka-preferred-replica-election --zookeeper 10.1.1.50:2181/kafka -path-to-json-file ~/kafka/topicPartitionList.json
中斷遷移任務
一旦啟動reassign 腳本,則無法停止遷移任務。如果需要強制停止,可以通過zookeeper 進行修改。
$ zookeeper-client -server 10.1.1.50:2181/kafka
[zk] delete /admin/reassign_partitions
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的kafka 重新分配节点_Kafka扩容节点和分区迁移的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 打开python环境_windows下切
- 下一篇: warshall算法求传递闭包c++_【