Kafka(六)Kafka基本客户端命令操作
轉(zhuǎn)載自:https://blog.51cto.com/littledevil/2147950
主題管理
創(chuàng)建主題
如果配置了auto.create.topics.enable=true(這也是默認(rèn)值)這樣當(dāng)生產(chǎn)者向一個(gè)沒有創(chuàng)建的主題發(fā)送消息就會(huì)自動(dòng)創(chuàng)建,其分區(qū)數(shù)量和副本數(shù)量也是有默認(rèn)配置來控制的。
# 我們這里創(chuàng)建一個(gè)3個(gè)分區(qū)每個(gè)分區(qū)有2個(gè)副本的主題 kafka-topics.sh --create --zookeeper 172.16.48.171:2181/kafka --replication-factor 2 --partitions 3 --topic KafkaTest| --create | 表示建立 |
| --zookeeper | 表示ZK地址,可以傳遞多個(gè),用逗號(hào)分隔 --zookeeper IP:PORT,IP:PORT,IP:PORT/kafka |
| --replication-factor | 表示副本數(shù)量,這里的數(shù)量是包含Leader副本和Follower副本,副本數(shù)量不能超過代理數(shù)量 |
| --partitions | 表示主題的分區(qū)數(shù)量,必須傳遞該參數(shù)。Kafka的生產(chǎn)者和消費(fèi)者采用多線程并行對主題的消息進(jìn)行處理,每個(gè)線程處理一個(gè)分區(qū),分區(qū)越多吞吐量就會(huì)越大,但是分區(qū)越多也意味著需要打開更多的文件句柄數(shù)量,這樣也會(huì)帶來一些開銷。 |
| --topic | 表示主題名稱 |
在Zookeeper中可以看到如下信息
刪除主題
刪除有兩種方式手動(dòng)和自動(dòng)
-
手動(dòng)方式需要?jiǎng)h除各個(gè)節(jié)點(diǎn)日志路徑下的該主題所有分區(qū),并且刪除zookeeper上/brokers/topics和/config/topics下的對應(yīng)主題節(jié)點(diǎn)
-
自動(dòng)刪除就是通過腳本來完成,同時(shí)需要配置服務(wù)器配置文件中的delete.topic.enable=true,默認(rèn)為false也就是說通過命令刪除主題只會(huì)刪除ZK中的節(jié)點(diǎn),日志文件不會(huì)刪除需要手動(dòng)清理,如果配置為true,則會(huì)自動(dòng)刪除日志文件。
下面的兩句話就是說該主題標(biāo)記為刪除/admin/delete_topics節(jié)點(diǎn)下。實(shí)際數(shù)據(jù)沒有影響因?yàn)樵搮?shù)沒有設(shè)置為true。
查看主題
# 列出所有主題 kafka-topics.sh --list --zookeeper 172.16.48.171:2181/kafka下面是從ZK中看到的所有主題
# 查看所有主題信息 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka # 查看特定主題信息 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topic BBBReplicas:是AR列表,表示副本分布在哪些代理上,且該列表第一個(gè)元素就是Leader副本所在代理
ISR:該列表是顯示已經(jīng)同步的副本集合,這個(gè)列表的副本都是存活的
# 通過--describe 和 --under-replicated-partitions 可以查看正在同步的主題或者同步可能發(fā)生異常, # 也就是ISR列表長度小于AR列表,如果一切正常則不會(huì)返回任何東西,也可以通過 --tipic 指定具體主題 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --under-replicated-partitions # 查看哪些主題建立時(shí)使用了單獨(dú)的配置 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topics-with-overrides這里只有一個(gè)內(nèi)部主題__comsumer_offsets使用了非配置文件中的設(shè)置
?
配置管理
所謂配置就是參數(shù),比如修改主題的默認(rèn)參數(shù)。
主題級別的
# 查看配置 kafka-configs.sh --describe --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BB這里顯示 Configs for topic 'BBB' are 表示它的配置有哪些,這里沒有表示沒有為該主題單獨(dú)設(shè)置配置,都是使用的默認(rèn)配置。
# 增加一個(gè)配置 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --add-config flush.messages=2如果修改的話還是相同的命令,只是把值修改一下
# 刪除配置 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --delete-config flush.messages客戶端級別
這個(gè)主要是設(shè)置流控
# 設(shè)置指定消費(fèi)者的流控 --entity-name 是客戶端在創(chuàng)建生產(chǎn)者或者消費(fèi)者時(shí)是指定的client.id名稱 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name COMSUMER_NAME下圖為ZK中對應(yīng)的信息
?
分區(qū)管理
分區(qū)平衡
Leader副本在集群中應(yīng)該是均衡分布,因?yàn)長eader副本對外提供讀寫服務(wù),盡可能不讓同一個(gè)主題的多個(gè)Leader副本在同一個(gè)代理上,但是隨著時(shí)間推移比如故障轉(zhuǎn)移等情況發(fā)送,Leader副本可能不均衡。有兩種方式設(shè)置自動(dòng)平衡,自動(dòng)和手動(dòng)。
自動(dòng)就是在配置文件中增加?auto.leader.rebalance.enable?=?true?如果該項(xiàng)為false,當(dāng)某個(gè)節(jié)點(diǎn)故障恢復(fù)并重新上線后,它原來的Leader副本也不會(huì)轉(zhuǎn)移回來,只是一個(gè)Follower副本。
手動(dòng)就是通過命令來執(zhí)行
kafka-preferred-replica-election.sh --zookeeper 172.16.48.171:2181/kafka分區(qū)遷移
當(dāng)下線一個(gè)節(jié)點(diǎn)需要將該節(jié)點(diǎn)上的分區(qū)副本遷移到其他可用節(jié)點(diǎn)上,Kafka并不會(huì)自動(dòng)進(jìn)行分區(qū)遷移,如果不遷移就會(huì)導(dǎo)致某些主題數(shù)據(jù)丟失和不可用的情況。當(dāng)增加新節(jié)點(diǎn)時(shí),只有新創(chuàng)建的主題才會(huì)分配到新節(jié)點(diǎn)上,之前的主題分區(qū)不會(huì)自動(dòng)分配到新節(jié)點(diǎn)上,因?yàn)槔系姆謪^(qū)在創(chuàng)建時(shí)AR列表中沒有這個(gè)新節(jié)點(diǎn)。
上面2個(gè)主題,每個(gè)主題3個(gè)分區(qū),每個(gè)分區(qū)3個(gè)副本,我們假設(shè)現(xiàn)在代理2要下線,所以我們要把代理2上的這兩個(gè)主題的分區(qū)數(shù)據(jù)遷移出來。
# 1. 在KAFKA目錄的config目錄中建立topics-to-move.json文件 {"topics":[{"topic":"AAA"},{"topic":"BBB"}],"version":1 }?
# 2. 生成分區(qū)分配方案,只是生成一個(gè)方案信息然后輸出 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "1,2" --generate這個(gè)命令的原理是從zookeeper中讀取主題元數(shù)據(jù)信息及制定的有效代理,根據(jù)分區(qū)副本分配算法重新計(jì)算指定主題的分區(qū)副本分配方案。把【Proposed partition reassignment configuration】下面的分區(qū)方案保存到一個(gè)JSON文件中,partitions-reassignment.json 文件名無所謂。
# 3. 執(zhí)行方案 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute # 4. 查看進(jìn)度 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --verify查看結(jié)果,這里已經(jīng)沒有代理0了。
集群擴(kuò)容
上面演示了節(jié)點(diǎn)下線的數(shù)據(jù)遷移,這里演示一下集群擴(kuò)容的數(shù)據(jù)遷移。我們還是用上面兩個(gè)主題,假設(shè)代理0又重新上線了。其實(shí)擴(kuò)容就是上面的反向操作
# 1. 建立JSON文件 # 該文件和之前的相同?
# 2. 生成方案并保存到一個(gè)JSON文件中 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate # 3. 數(shù)據(jù)遷移,這里通過--throttle做一個(gè)限流操作,如果數(shù)據(jù)過大會(huì)把網(wǎng)絡(luò)堵塞。 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute --throttle 1024查看進(jìn)度和結(jié)果
增加分區(qū)
通常在需要提供吞吐量的時(shí)候我們會(huì)增加分區(qū),然后如果代理數(shù)量不擴(kuò)大,同時(shí)生產(chǎn)者和消費(fèi)者線程不增大,你擴(kuò)展了分區(qū)也沒有用。
kafka-topics.sh --alter --zookeeper 172.16.48.171:2181/kafka --partitions 3 --topic KafkaTest03增加副本
集群規(guī)模擴(kuò)大并且想對所有主題或者指定主題提高可用性,那么可以增加原有主題的副本數(shù)量
上面是3個(gè)分區(qū),每個(gè)分區(qū)1個(gè)副本,我們現(xiàn)在把每個(gè)分區(qū)擴(kuò)展為3個(gè)副本
# 1. 創(chuàng)建JSON文件 replica-extends.json {"version": 1,"partitions": [{"topic": "KafkaTest04","partition": 0,"replicas": [0,1,2]},{"topic": "KafkaTest04","partition": 1,"replicas": [0,1,2]},{"topic": "KafkaTest04","partition": 2,"replicas": [0,1,2]}] } # 2. 執(zhí)行分區(qū)副本重新分配命令 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./replica-extends.json --execute查看狀態(tài)
查看結(jié)果
?
鏡像操作
Kafka有一個(gè)鏡像工具kafka-mirror-maker.sh,用于將一個(gè)集群數(shù)據(jù)同步到另外一個(gè)集群中,這個(gè)非常有用,比如機(jī)房搬遷就需要進(jìn)行數(shù)據(jù)同步。該工具的本質(zhì)就是創(chuàng)建一個(gè)消費(fèi)者,在源集群中需要遷移的主題消費(fèi)數(shù)據(jù),然后創(chuàng)建一個(gè)生產(chǎn)者,將消費(fèi)的數(shù)據(jù)寫入到目標(biāo)集群中。
首先創(chuàng)建消費(fèi)者配置文件mirror-consumer.properties(文件路徑和名稱是自定義的)
# 源kafka集群代理地址列表 bootstrap.servers=IP1:9092,IP2:9092,IP3:9092 # 消費(fèi)者組名 group.id=mirror其次創(chuàng)建生產(chǎn)者配置文件mirror-producer.properties(文件路徑和名稱是自定義的)
# 目標(biāo)kafka集群地址列表 bootstrap.servers=IP1:9092,IP2:9092,IP3:9092運(yùn)行鏡像命令
# 通過 --whitelist 指定需要鏡像的主題,通過 --blacklist 指定不需要鏡像的主題 kafka-mirror-maker.sh --consumer.config PATH/mirror-consumer.properties --producer.config PATH/mirror-producer.properties --whitelist TOPIC由于鏡像操作是啟動(dòng)一個(gè)生產(chǎn)者和消費(fèi)者,所以數(shù)據(jù)同步完成后這個(gè)生產(chǎn)者和消費(fèi)者并不會(huì)關(guān)閉,它會(huì)依然等待新數(shù)據(jù),所以同步完成以后你需要自己查看,確認(rèn)完成了則關(guān)閉生產(chǎn)者和消費(fèi)者。
總結(jié)
以上是生活随笔為你收集整理的Kafka(六)Kafka基本客户端命令操作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink ProcessFuncti
- 下一篇: 提高C++程序运行效率的10个简单方法