Kafka跨集群迁移方案MirrorMaker原理、使用以及性能调优实践
?
序言
Kakfa MirrorMaker是Kafka 官方提供的跨數(shù)據(jù)中心的流數(shù)據(jù)同步方案。其實現(xiàn)原理,其實就是通過從Source Cluster消費消息然后將消息生產(chǎn)到Target Cluster,即普通的消息生產(chǎn)和消費。用戶只要通過簡單的consumer配置和producer配置,然后啟動Mirror,就可以實現(xiàn)準(zhǔn)實時的數(shù)據(jù)同步。
1. Kafka MirrorMaker基本特性
Kafka Mirror的基本特性有:
在Target Cluster沒有對應(yīng)的Topic的時候,Kafka MirrorMaker會自動為我們在Target Cluster上創(chuàng)建一個一模一樣(Topic Name、分區(qū)數(shù)量、副本數(shù)量)一模一樣的topic。如果Target Cluster存在相同的Topic則不進行創(chuàng)建,并且,MirrorMaker運行Source Cluster和Target Cluster的Topic的分區(qū)數(shù)量和副本數(shù)量不同。
同我們使用Kafka API創(chuàng)建KafkaConsumer一樣,Kafka MirrorMaker允許我們指定多個Topic。比如,TopicA|TopicB|TopicC。在這里,|其實是正則匹配符,MirrorMaker也兼容使用逗號進行分隔。
多線程支持。MirrorMaker會在每一個線程上創(chuàng)建一個Consumer對象,如果性能允許,建議多創(chuàng)建一些線程
多進程任意橫向擴展,前提是這些進程的consumerGroup相同。無論是多進程還是多線程,都是由Kafka ConsumerGroup的設(shè)計帶來的任意橫向擴展性,具體的分區(qū)分派,即具體的TopicPartition會分派給Group中的哪個Topic負(fù)責(zé),是Kafka自動完成的,Consumer無需關(guān)心。
我們使用Kafka MirrorMaker完成遠(yuǎn)程的AWS(Source Cluster)上的Kafka信息同步到公司的計算集群(Target Cluster)。由于我們的大數(shù)據(jù)集群只有一個統(tǒng)一的出口IP,外網(wǎng)訪問我們的內(nèi)網(wǎng)服務(wù)器必須通過nginx轉(zhuǎn)發(fā),因此為了降低復(fù)雜度,決定使用“拉”模式而不是“推”模式,即,Kafka MirrorMaker部署在我們內(nèi)網(wǎng)集群(Target Cluster),它負(fù)責(zé)從遠(yuǎn)程的Source Cluster(AWS)的Kafka 上拉取數(shù)據(jù),然后生產(chǎn)到本地的Kafka。
Kafka MirrorMaker的官方文檔一直沒有更新,因此新版Kafka為MirrorMaker增加的一些參數(shù)、特性等在文檔上往往找不到,需要看Kafka MirrorMaker的源碼。Kafka MirrorMaker的主類位于kafka.tools.MirrorMaker,尤其是一些參數(shù)的解析邏輯和主要的執(zhí)行流程,會比較有助于我們理解、調(diào)試和優(yōu)化Kafka MirrorMaker。
?
這是我啟動Kakfa MirrorMaker 的命令:
nohup ./bin/kafka-mirror-maker.sh --new.consumer --consumer.config config/mirror-consumer.properties --num.streams 40 --producer.config config/mirror-producer.properties --whitelist 'ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg' &
mirror-consumer.properties配置文件如下:
#新版consumer擯棄了對zookeeper的依賴,使用bootstrap.servers告訴consumer kafka server的位置
bootstrap.servers=ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092
#如果使用舊版Consumer,則使用zookeeper.connect
#zookeeper.connect=ip-188-33-33-31.eu-central-1.compute.internal:2181,ip-188-33-33-32.eu-central-1.compute.internal:2181,ip-188-33-33-33.eu-central-1.compute.internal:2181
1.compute.internal:2181
#change the default 40000 to 50000
request.timeout.ms=50000
#hange default heartbeat interval from 3000 to 15000
heartbeat.interval.ms=30000
#change default session timeout from 30000 to 40000
session.timeout.ms=40000
#consumer group id
group.id=africaBetMirrorGroupTest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
#restrict the max poll records from 2147483647 to 200000
max.poll.records=20000
#set receive buffer from default 64kB to 512kb
receive.buffer.bytes=524288
#set max amount of data per partition to override default 1048576
max.partition.fetch.bytes=5248576
#consumer timeout
#consumer.timeout.ms=5000
mirror-producer.properties的配置文件如下:
bootstrap.servers=10.120.241.146:9092,10.120.241.82:9092,10.120.241.110:9092
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.DefaultEncoder
同時,我使用kafka-consumer-groups.sh循環(huán)監(jiān)控消費延遲:
bin/kafka-consumer-groups.sh --bootstrap-server ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092 --describe --group africaBetMirrorGroupTest --new-consumer
當(dāng)我們使用new KafkaConsumer進行消息消費,要想通過kafka-consumer-groups.sh獲取整個group的offset、lag延遲信息,也必須加上–new-consumer,告知kafka-consumer-groups.sh,這個group的消費者使用的是new kafka consumer,即group中所有consumer的信息保存在了Kafka上的一個名字叫做__consumer_offsets的特殊topic上,而不是保存在zookeeper上。我在使用kafka-consumer-groups.sh的時候就不知道還需要添加--new-consumer,結(jié)果我啟動了MirrorMaker以后,感覺消息在消費,但是就是在zookeeper的/consumer/ids/上找不到group的任何信息。后來在stack overflow上問了別人才知道。
3. 負(fù)載不均衡原因診斷以及問題解決
在我的另外一篇博客《Kafka為Consumer分派分區(qū):RangeAssignor和RoundRobinAssignor》中,介紹了Kafka內(nèi)置的分區(qū)分派策略:RangeAssignor和RoundRobinAssignor。由于RangeAssignor是早期版本的Kafka的唯一的分區(qū)分派策略,因此,默認(rèn)不配置的情況下,Kafka使用RangeAssignor進行分區(qū)分派,但是,在MirrorMaker的使用場景下,RoundRobinAssignor更有利于均勻的分區(qū)分派。甚至在KAFKA-3831中有人建議直接將MirrorMaker的默認(rèn)分區(qū)分派策略改為RoundRobinAssignor。那么,它們到底有什么區(qū)別呢?我們先來看兩種策略下的分區(qū)分派結(jié)果。在我的實驗場景下,有6個topic:ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg,每個topic有兩個分區(qū)。由于MirrorMaker所在的服務(wù)器性能良好,我設(shè)置--num.streams 40,即單臺MirrorMaker會用40個線程,創(chuàng)建40個獨立的Consumer進行消息消費,兩個MirrorMaker加起來80個線程,80個并行Consumer。由于總共只有6 * 2=12個TopicPartition,因此最多也只有12個Consumer會被分派到分區(qū),其余Consumer空閑。
我們來看基于RangeAssignor分派策略,運行kafka-consumer-groups.sh觀察到的分區(qū)分派的結(jié)果:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ABTestMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0
ABTestMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1
AppColdStartMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0
AppColdStartMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1
BackPayMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0
BackPayMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1
WebMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0
WebMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1
GoldOpenMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0
GoldOpenMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1
BoCaiMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0
BoCaiMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1
- - - - - africaBetMirrorGroupTest-6-ae373364-2ae2-42b8-8a74-683557e315bf/114.113.198.126 africaBetMirrorGroupTest-6
- - - - - africaBetMirrorGroupTest-9-0e346b46-1a2c-46a2-a2da-d977402f5c5d/114.113.198.126 africaBetMirrorGroupTest-9
- - - - - africaBetMirrorGroupTest-7-f0ae9f31-33e6-4ddd-beac-236fb7cf20d5/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-7-e2a9e905-57c1-40a6-a7f3-4aefd4f1a30a/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-8-480a2ef5-907c-48ed-be1f-33450903ec72/114.113.198.126 africaBetMirrorGroupTest-8
- - - - - africaBetMirrorGroupTest-8-4206bc08-58a5-488a-b756-672fb4eee6e0/114.113.198.126 africaBetMirrorGroupTest-8
.....后續(xù)更多空閑consumer省略不顯示
當(dāng)沒有在mirror-consumer.properties 中配置分區(qū)分派策略,即使用默認(rèn)的RangeAssignor的時候,我們發(fā)現(xiàn),盡管我們每一個MirrorMaker有40個Consumer,整個Group中有80個Consumer,但是,一共6 * 2 = 12個TopicPartition竟然全部聚集在2-3個Consumer上,顯然,這完全浪費了并行特性,被分配到一個consumer上的多個TopicPartition只能串行消費。
因此,通過partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor顯式指定分區(qū)分派策略為RoundRobinAssignor,重啟MirrorMaker,重新通過kafka-consumer-groups.sh 命令觀察分區(qū)分派和消費延遲結(jié)果:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ABTestMsg 0 819079 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-1
ABTestMsg 1 818373 820038 1665 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-5
AppColdStartMsg 0 818700 818907 1338 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-20
AppColdStartMsg 1 818901 820045 1132 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-18
BackPayMsg 0 819032 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-5
BackPayMsg 1 818343 820038 1638 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-8
WebMsg 0 818710 818907 1328 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-7
WebMsg 1 818921 820045 1134 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-9
GoldOpenMsg 0 819032 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-12
GoldOpenMsg 1 818343 820038 1638 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-14
BoCaiMsg 0 818710 818907 1322 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-14
BoCaiMsg 1 818921 820045 1189 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-117
- - - - - africaBetMirrorGroupTest-6-ae373364-2ae2-42b8-8a74-683557e315bf/114.113.198.126 africaBetMirrorGroupTest-6
- - - - - africaBetMirrorGroupTest-9-0e346b46-1a2c-46a2-a2da-d977402f5c5d/114.113.198.126 africaBetMirrorGroupTest-9
- - - - - africaBetMirrorGroupTest-7-f0ae9f31-33e6-4ddd-beac-236fb7cf20d5/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-7-e2a9e905-57c1-40a6-a7f3-4aefd4f1a30a/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-8-480a2ef5-907c-48ed-be1f-33450903ec72/114.113.198.126 africaBetMirrorGroupTest-8
- - - - - africaBetMirrorGroupTest-8-4206bc08-58a5-488a-b756-672fb4eee6e0/114.113.198.126 africaBetMirrorGroupTest-8
.....后續(xù)更多空閑consumer省略不顯示
對比RangeAssingor,消息延遲明顯減輕,而且,12個TopicPartition被均勻分配到了不同的consumer上,即單個Consumer只負(fù)責(zé)一個TopicPartition的消息消費,不同的TopicPartition之間實現(xiàn)了完全并行。
之所以出現(xiàn)以上不同,原因在于兩個分區(qū)分派方式的策略不同:
RangeAssingor:先對所有Consumer進行排序,然后對Topic逐個進行分區(qū)分派。用以上Topic作為例子:
對所有的Consumer進行排序,排序后的結(jié)果為Consumer-0,Consumer-1,Consumer-2 ....Consumer-79
對ABTestMsg進行分區(qū)分派:
ABTestMsg-0分配給Consumer-0
ABTestMsg-1分配各Consumer-1
對AppColdStartMsg進行分區(qū)分派:
AppColdStartMsg-0分配各Consumer-0
AppColdStartMsg-1分配各Consumer-1
#后續(xù)TopicParition的分派以此類推
可見,RangeAssingor 會導(dǎo)致多個TopicPartition被分派在少量分區(qū)上面。
- RoundRobinAssignor:與RangeAssignor最大的區(qū)別,是不再逐個Topic進行分區(qū)分派,而是先將Group中的所有TopicPartition平鋪展開,再一次性對他們進行一輪分區(qū)分派。
將Group中的所有TopicPartition展開,展開結(jié)果為:
ABTestMsg-0,ABTestMsg-1,AppColdStartMsg-0,AppColdStartMsg-1,BackPayMsg-0,BackPayMsg-1,WebMsg-0,WebMsg-1,GoldOpenMsg-0,GoldOpenMsg-1,BoCaiMsg-0,BoCaiMsg-1
對所有的Consumer進行排序,排序后的結(jié)果為Consumer-0,Consumer-1,Consumer-2 ,Consumer-79。
開始講平鋪的TopicPartition進行分區(qū)分派
ABTestMsg-0分配給Consumer-0
ABTestMsg-1分配給Consumer-1
AppColdStartMsg-0分配給Consumer-2
AppColdStartMsg-1分配給Consumer-3
BackPayMsg-0分配給Consumer-4
BackPayMsg-1分配給Consumer-5
#后續(xù)TopicParition的分派以此類推
由此可見,RoundRobinAssignor平鋪式的分區(qū)分派算法是讓我們的Kafka MirrorMaker能夠無重疊地將TopicParition分派給Consumer的原因。
4. 本身網(wǎng)絡(luò)帶寬限制問題
網(wǎng)絡(luò)帶寬本身也會限制Kafka Mirror的吞吐量。進行壓測的時候,我分別在我們的在線環(huán)境和測試環(huán)境分別運行Kafka MirrorMaker,均選擇兩臺服務(wù)器運行MirrorMaker,但是在線環(huán)境是實體機環(huán)境,單臺機器通過SCP方式拷貝Source Cluster上的大文件,平均吞吐量是600KB-1.5MB之間,但是測試環(huán)境的機器是同一個host主機上的多臺虛擬機,SCP吞吐量是100KB以下。經(jīng)過測試,測試環(huán)境消息積壓會逐漸增多,在線環(huán)境持續(xù)積壓,但是積壓一直保持穩(wěn)定。這種穩(wěn)定積壓是由于每次poll()的間隙新產(chǎn)生的消息量,屬于正常現(xiàn)象。
5. 適當(dāng)配置單次poll的消息總量和單次poll()的消息大小
通過Kafka MirrorMaker運行時指定的consumer配置文件(在我的環(huán)境中為$MIRROR_HOME/config/mirror-consumer.properties)來配置consumer。其中,通過以下配置,可以控制單次poll()的消息體量(數(shù)量和總體大小)
max.poll.records:單次poll()操作最多消費的消息總量,這里說的poll是單個consumer而言的。無論過大過小,都會發(fā)生問題:
如果設(shè)置得過小,則消息傳輸率降低,大量的頭信息會占用較大的網(wǎng)絡(luò)帶寬;-
如果設(shè)置得過大,則會產(chǎn)生一個非常難以判斷原因同時又會影響整個group中所有消息的消費的重要問題:rebalance。看過kafka代碼的話可以看到,每次poll()請求都會順帶向遠(yuǎn)程server發(fā)送心跳信息,遠(yuǎn)程GroupCoordinator會根據(jù)這個心跳信息判斷consumer的活性。如果超過指定時間(heartbeat.interval.ms)沒有收到對應(yīng)Consumer的心跳,則GroupCoordinator會判定這個Server已經(jīng)掛掉,因此將這個Consumer負(fù)責(zé)的partition分派給其它Consumer,即觸發(fā)rebalance。rebalance操作的影響范圍是整個Group,即Group中所有的Consumer全部暫停消費直到Rebalance完成。而且,TopicPartition越長,這個過程會越長。其實,一個正常消費的環(huán)境,應(yīng)該是任何時候都不應(yīng)該發(fā)生rebalance的(一個新的Consumer的正常加入也會引起Rebalance,這種情況除外)。雖然Kafka本身是非常穩(wěn)定的,但是還是應(yīng)該盡量避免rebalance的發(fā)生。在某些極端情況下觸發(fā)一些bug,rebalance可能永遠(yuǎn)停不下來了。。。如果單次max.poll.records消費太多消息,這些消息produce到Target Cluster的時間可能會較長,從而可能觸發(fā)Rebalance。
6. 惡劣網(wǎng)絡(luò)環(huán)境下增加超時時間配置
在不穩(wěn)定的網(wǎng)絡(luò)環(huán)境下,應(yīng)該增加部分超時時間配置,如request.timeout.ms或者session.timeout.ms,一方面可以避免頻繁的超時導(dǎo)致大量不必要的重試操作,同時,通過增加如上文所講,通過增加heartbeat.interval.ms時間,可以避免不必要的rebalance操作。當(dāng)然,在網(wǎng)絡(luò)環(huán)境良好的情況下,上述配置可以適當(dāng)減小以增加Kafka Server對MirrorMaker出現(xiàn)異常情況下的更加及時的響應(yīng)。
總之,Kafka MirrorMaker作為跨數(shù)據(jù)中心的Kafka數(shù)據(jù)同步方案,絕對無法允許數(shù)據(jù)丟失以及數(shù)據(jù)的傳輸速度低于生產(chǎn)速度導(dǎo)致數(shù)據(jù)越積累越多。因此,唯有進行充分的壓測和精準(zhǔn)的性能調(diào)優(yōu),才能綜合網(wǎng)絡(luò)環(huán)境、服務(wù)器性能,將Kafka MirrorMaker的性能發(fā)揮到最大。
轉(zhuǎn)載于:https://www.cnblogs.com/felixzh/p/11508192.html
總結(jié)
以上是生活随笔為你收集整理的Kafka跨集群迁移方案MirrorMaker原理、使用以及性能调优实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kafka集群安全化之启用kerbero
- 下一篇: Kafka MirrorMaker 跨集