kafka创建topic_Kafka实战宝典:一文带解决Kafka常见故障处理
?Kafka自帶常用工具
Kafka的bin目錄下shell腳本是kafka自帶的管理工具,提供topic的創建/刪除/配置修改、消費者的監控、分區重載、集群健康監控、收發端TPS壓測、跨機房同步等能力,Kafka運維者可以使用這些工具進行集群的管理。
Kafka節點的啟/停
###kafka 運行
bin/kafka-server-start.sh -daemon ../config/server.properties &
###kafka 停止
bin/kafka-server-stop.sh
如果上面命令并未停止掉相應的進程,建議執行kill –s TERM $pid,完成進程關閉。
Topic的創建/刪除/配置修改
Kafka的bin目錄下有若干shell腳本,提供很多工具,完成kafka的元數據的監控和管理。
##創建topic
./kafka-topics.sh --zookeeper xxxx --replication-factor 3 --partitions 10 --create --topic xxxx
##查看topic
./kafka-topics.sh --zookeeper xxxx --describe –topic xxxx
##-刪除topic
./kafka-topics.sh --delete --zookeeper xxxx--topic xxxx
## 查看集群中的topic
./kafka-topics.sh --zookeeper xxxx –list
## 查看指定topic配置
./kafka-topics.sh --zookeeper xxxx --describe --topic xxx
## 修改超時時長
./kafka-topics.sh --zookeeper xxxx --alter --topic xxxx --config retention.ms=864000
## 增加topic分區數
./kafka-topics.sh --zookeeper xxxx --alter --partitions 5 --topic xxxx
Topic的生產/消費
## kafka生產消息
./kafka-console-producer.sh --broker-list xxxx --topic xxxx
## 消費kafka 若不需要重頭消費,去掉from-beginning
./kafka-console-consumer.sh --zookeeper xxxx --topic xxxx -from-beginning
查看/修改消費偏移量
## kafka_0.10前查看kafka的消費積壓
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper xxxx --topic xx -group xx
## kafka_1.0新版本后查看kafka的消費積壓
./kafka-consumer-groups.sh --bootstrap-server xxxx --describe --group xxx
## 修改zk中保存的偏移量
./zkCli.sh –server xxxx:xxset /consumer/xxx/xx
## 修改kafka中保存的偏移量 kafka_0.10前版本不支持修改偏移量操作,0.11后版本支持
./kafka-consumer-groups.sh --bootstrap-server xxxx –group xxx –topic xxx:xx –shift-by xxxx --execute
Topic的分區重載
一般分區重載在集群新加節點(kafka集群增加節點后,舊topic不會進行數據的重載)和分區備份列表擴增的時候需要用到,分區重載需要預先設定重載的json配置文件;
##指定需要分配的broker列表
./bin/kafka-reassign-partitions.sh --zookeeper xxxx --topics-to-move-json-file xx.json --broker-list "1,2,3,4,5" --generate b
##執行重載計劃
./kafka-reassign-partitions.sh --zookeeper xxxx --reassignment-json-file xxx.json –execute
##驗證重載計劃
./kafka-reassign-partitions.sh --zookeeper xxxx --reassignment-json-file xxx.json --verify
Kafka常見問題
處理Kafka常見問題的思路是首先檢查集群健康,在實時監控集群節點運行日志的基礎上找出影響集群狀態的問題,broker狀態不正常會導致發端問題和消費積壓,確認集群節點正常后,發送端和消費端的問題可以通過調優解決.
Broker常見問題
1、分區ISR列表出現頻繁Expanding, Shinking,導致broker不可用:
Kafka的集群中有節點日志出現大量的ISR列表頻繁Expanding, Shinking問題造成當前節點不可用問題,該問題出現的原因為:Kafka的每個topic有若干個分區partition,每個partiton可能有多個備份,這樣就單個分區而言,多個備份中有leader和follower兩種角色,follower會定時從leader同步數據,每個分區都有一個ISR列表,該列表表征了分區的多個備份是否在同步中,若follower掛掉或者網絡抖動,則被移除ISR列表,若恢復正常,則加入到ISR列表。
若出現ISR頻繁的Expanding和 Shinking表明可能是單個分區的數據量過大導致部分分區的follower無法及時備份,或者follower無法及時同步足夠的消息已滿足ISR判定條件,從而被Shinking清除出ISR列表,瞬間又追上復制速度,從而Expanding加入到ISR列表。
解決方法:修改kafka的配置文件,增加單個broker的復制數據的線程數,降低ISR列表判定條件(時長+條數)。
2、節點OOM問題
Kafka的默認啟動內存256M,Kafka的生產端首先將數據發送到broker的內存存儲,隨機通過主機的OS層的數據刷盤機制將數據持久化,因此Kafka需要一定大小的內存空間,在生產環境一般建議將啟動內存調整,官方建議內存在4-8G左右大小;
若節點出現OOM,進程運行日志會出現OOM關鍵詞(目前已加入關鍵字告警),隨即kafka進程宕掉;
解決方法:修改${KAFKA_HOME}/bin/kafka-server-start.sh腳步。
3、java.io.IOException Connection to xx was disconnected before the response was read xxxxxxxxxxxxxxxxxx
針對此問題,網上的意見不一。導致該報錯的問題有很多,Kafka集群中的各個節點,均會自主發起同步其他節點數據的線程,用以已達到數據備份目的,若集群中有broker節點不正常或負載過高,其他broker節點同步該節點數據的線程即會出現這種報錯,因此該類問題通常伴隨著ReplicaFetcherThread線程shutdown日志.
解決方法:
1、觀察集群的其他節點是否有同樣報錯,多個報錯日志中是否都指向固定的kafka節點(連接問題),若指向同一broker,則表明數據同步線程無法讀取該節點的消息,該節點存在問題,觀察該節點的iostat,是否存在讀寫瓶頸(硬件+OS層均需要巡檢)。
2、若集群的多個節點均存在同樣的報錯,且報錯信息指向不同的節點(該問題較少出現),則排除單個broker問題造成的問題,觀察不影響數據收發,可忽略該報錯。
4、broker上kafka進程正確啟/停
生產中遇到過單個物理機部署多個Kafka實例的場景,在執行./kafka-server-stop.sh腳本,該腳本會匹配機器上所有運行的kafka實例,并全部關閉,如下,因此若一個機器上有多個kafka實例,需要關閉特定的Kafka實例,建議使用kill –s TERM $pids 方式停止進程。
Kafka的啟動方式使用:
./kafka-server-start –daemon ../conf/server.properties &
關于啟/停的驗證:kafka進程的啟動/關閉狀態,可通過log/server.log跟蹤,但在啟動時一般需要大量的時間恢復文件和index,關閉時需要shutdow一些同步數據的線程,因此根據zookeeper中的節點信息判定是否正確完成啟動/關閉:
1.使用./zkCli.sh –server host:port進入到zk的元數據樹;
2.查看get /brokers/ids/ 得到加入zk的節點數;
5、broker運行日志大量topic不存在報錯,導致節點不可用
若broker的運行日志大量刷topic不存在的WARN,并導致節點不可用;表明該集群存在topic被刪除,但有發端仍使用該topic發送數據,此時需要檢查broker上的2個配置項:
delete.topic.enableauto.create.topics.enable
生產環境下需要進行規范化的topic管理,難免進行topic的增刪,建議將自動創建topic開關關閉,將可刪除topic的開關打開,設置:
delete.topic.enable=trueauto.create.topics.enable=false
Producer常見問題
當前公司的commonlog封裝的是0.8版本的發端(scala版),發送效率低且默認的發送機制存在問題,官方建議盡早升級,后續將不再支持0.8版本的發送端發送消息;
1、kafka.common.MessageSizeTooLargeException
Kafka的broker和發送端、消費端都會定義單條數據大小的屬性,一般默認大小是0.95G,若在broker端調整了該屬性,但發端未同步設置單條數據大小,則會出現報錯kafka.common.MessageSizeTooLargeException,造成整個batch數據的丟失,若消費端設置的消費單條數據大小
解決方法:修改kafka的broker配置文件、發送者、消費者的單條數據大小,綜合考慮單條數據大小范圍;
2、fetching topic metadata for topics [Set(test)] from broker x failed
Kafka的發端發送數據的同時會給broker發送心跳,并得到一些topic的metadata元數據信息(包括分區數、分區的leader),fetching topic metadata for topics [Set(test)] from broker x failed 報錯一般表征了kafka的集群節點不健康。
解決方法:修改kafka的broker配置文件、發送者、消費者的單條數據大小,綜合考慮單條數據大小范圍;
3、LEADERNOTAVAILABLE
WARN Error while fetching metadata with correlation id 0{test=LEADER_NOT_AVAILABLE}
解決方法:若出現該報錯,表名Topic可能正在進行leader選舉 使用kafka-topics腳本檢查leader信息。
4、NotLeaderForPartitionException
Kafka的生產者在得到topic某個分區的leader信息后,生產者會向topic的leader發送消息,NotLeaderForPartitionException 的報錯一般發生在元數據中的leader和真實的leader不一致時候,即 leader從一個broker切換到另一個broker時,要分析什么原因引起了leader的切換。
5、TimeoutException
檢查網絡是否能通,如果可以通,可以考慮增加request.timeout.ms的值。
快速有效的解決方法
劃重點
我們常說“重啟是萬能的”。但是若出現kafka集群不可用,且無法快速恢復集群狀態,你該怎么辦。
答案是:換個姿勢重啟!
由于kafka一般都是至少3節點,若有節點長時間啟動不了,影響生產數據的發送,集群并沒有完全宕(zk存儲的kafka元數據信息沒有丟失)。
通過在同一個主機上新建數據目錄和kafka應用目錄,并重啟問題節點可以完成集群的快速恢復(默認丟棄問題節點的歷史數據)。
總結
以上是生活随笔為你收集整理的kafka创建topic_Kafka实战宝典:一文带解决Kafka常见故障处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信能远程控制电脑吗_神器分享:用微信就
- 下一篇: 分类系列之感知器学习算法PLA 和 口袋