大数据技术之 Kafka (第 3 章 Kafka 架构深入 ) Kafka 消费者
3.3.1 消費方式?
consumer 采用 pull(拉)模式從 broker 中讀取數據。?
push(推)模式很難適應消費速率不同的消費者,因為消息發送速率是由 broker 決定的。
它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費消息。?
pull 模式不足之處是,如果 kafka 沒有數據,消費者可能會陷入循環中,一直返回空數據。針對這一點,Kafka 的消費者在消費數據時會傳入一個時長參數 timeout,如果當前沒有數據可供消費,consumer 會等待一段時間之后再返回,這段時長即為 timeout
3.3.2 分區分配策略?
一個 consumer group 中有多個 consumer,一個 topic 有多個 partition,所以必然會涉及到 partition 的分配問題,即確定那個 partition 由哪個 consumer 來消費。?
Kafka 有兩種分配策略,一是 RoundRobin,一是 Range。?
1)RoundRobin (輪詢)按照組來消費
分區分配策略之RoundRobin
使用輪詢的策略優點:就是一個消費者組多個消費者直接消費消息最多相差1個
缺點:使用輪詢的策略有一個問題,當一個消費者組訂閱的是多個topic主題,假設有一個消費者組consumergroup(consumerA訂閱了主題topic1和consumerB主題topic2)consumerA消費topic1,consumerB消費topic2 ,這看起來似乎沒有問題,使用輪詢的策略會將消費者組訂閱的主題當成一個整體。但是topic1和topic2各有三個partition分區,在kafka內部有一個TopicAndPartition這個類會將topic1和topic2的partition進行排序,假設兩個經過排序之后順序{topic1partition0,topic2partition0,topic2partition1,topic1partition2,topic1partition1,topic2partition2}? ?然后consumerA和consumerB輪詢的拉去消息,這樣consumerA就會將topic2的消息給拉取消費了這樣是不是有問題?
所以使用輪詢策略條件的前提:就是一個消費者組里消費者訂閱的主題是一樣的,只有consumerA和consumerB都訂閱了topic1和topic2,這樣使用輪詢的方式才不會有問題
2)Range? (范圍)默認的消費方式? 按主題的方式給消費者(誰訂閱了我就給誰消費)
? ? ? 分區分配策略之Range
范圍range是按照范主題劃分的,一個主題7個分區 3個消費者? 7除以3除不盡就會分布不均,消費者1消費前topic1的前三個分區,后面兩個消費者消費topic1的4和5分區? ?6和7分區就給消費者3消費,這種情況看起來也沒有什么問題?
缺點:假設消費者他們訂閱了2個主題topic1和topic2? 都是7個分區 ,由于是按主題劃分的所以,消費者1就分到了topic1和topic2的1、2、3分區這樣消費者1就被分到了6個分區,消費者2和消費者3只分到了4個分區,隨著訂閱的主題越來越多,這樣消費者1和其他消費者相差越來越大,就不均衡了?
思考一個問題:消費者消費消息什么時候重新分配?
當消費者個數發生變的時候,
1,假設topic1有6個分區? 三個消費者A、B、C,不管用什么策略分配,假設C負責消費partition4和partition5,突然C掛掉了,這個時候partition4和partition5需不需要消費,答案當然是要,那怎么消費?當然是重新分配
2,假設topic1有6個分區? 三個消費者A、B、C、D,當消費者A服務起來的時候6個分區都分配給了A,當B起來的時候重新分配,當C起來時候也會重新分配,消費者A、B、C都分配到了2個partition,當第四個消費者D加進來的時候,會怎么辦?還是上面那句話,消費者個數發生變化的時候,就會觸發分區分配策略重新分配
總結:當消費者個數發生變的時候,消費者個數可以增多或者減少,甚至可以增多至比分區數還多的時候,照樣會重新分配,只是有些消費者可能被分配不到
3.3.3 offset 的維護?
由于 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復后,需要從故障前的位置的繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復后繼續消費。?
Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中,從 0.9 版本開始,consumer 默認將 offset 保存在 Kafka 一個內置的 topic 中,該 topic 為__consumer_offsets。?
1)修改配置文件 consumer.properties
exclude.internal.topics=false
2)讀取 offset
0.11.0.0 之前: bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties from-beginning 0.11.0.0 之后版本(含): bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning但是在新版本中
[root@backup01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning Missing required argument "[bootstrap-server]"那我們不能用zookeeper了
[root@backup01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server backup01:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning總結
以上是生活随笔為你收集整理的大数据技术之 Kafka (第 3 章 Kafka 架构深入 ) Kafka 消费者的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java未被捕获的异常
- 下一篇: Java try和catch的使用