消息队列_消息队列:kafka
概念
kafka是一個分布式的基于發布/訂閱模式的消息隊列,主要用于大數據實時處理領域。
要理解kafka首先要有分布式的概念,要有消息隊列的概念。分布式系統最大的優勢就是解耦和削峰,這種情況下,A系統生成了一個消息,B系統異步獲取,那么就需要一個存放消息的消息隊列(MQ)。
相比較傳統的消息隊列,消息被消費確認后會刪除,而kafka有持久化功能,B系統消費完,C系統還可以再次消費。kafka默認消息保存168小時,即7天,可配置。
kafka的核心概念有:broker、topic、partition、leader、follower、ISR、acks、offset、lEO和HW,下面分別理解一下。
broker、topic、partition
- Kafka以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker。 
- 一個broker服務上可以有多個topic,topic是kafka消息隊列的一個邏輯概念,partition是具體的物理概念,比如topic=first,partition=0,消息實際存儲在broker的first-0目錄下。 
- partition分區的優勢:高并發的情況下,可以動態擴展,生產者往多個分區發生消息,能提高并發量。 
- 生產者發送給kafka的消息是個ProducerRecord對象,ProducerRecord提供了6種構造器,如下: 
不管使用哪一種構造器,最終實現如下:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable headers) {if (topic == null) {
throw new IllegalArgumentException("Topic cannot be null.");
} else if (timestamp != null && timestamp < 0L) {
throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
} else if (partition != null && partition < 0) {
throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
} else {
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = new RecordHeaders(headers);
}
}
其中,topic必須指定,partition的值有以下參數確定:
1)指定partition值,消息直接放入指定的分區;
2)既沒有partition也沒有key,采用round-robin算法,隨機生成一個整數,與topic的partition數取余得到partition值,后續在這個整數上自增;
3)沒有指定partition但有key,將key的hash值與topic的Partition數進行取余得到partition值;
4)既有partition又有key,以partition為準,key失效。
leader、follower
每一個broker都有一個leader和多個follower,leader和ollower不能在同一個節點上,否則服務故障,主從都會失效。
producer可靠性保障和ISR
思考:producer向kafka發送消息,如何保障消息的可靠性?
有兩種方案:
1)為了保障消息不丟失,至少要半數以follower上同步完成,broker發確認。
半數以上同步完成,有且只有一個follower才有可能獲得半數以上投票,這樣就不會存在選出來多個leader了,而且這半數以上的follower都完成了同步,能保證參與選舉的follower(自己本身也是候選人)都滿足成為leader的資格。
比如有7個follower,半數以上即4個follower同步完成,全部投選1號為leader,leader唯一可確定。如果選舉是2,2,的結果,則沒有任何一個follower有資格成為leader。
2)等待所有follower全部同步完成發確認,此時可以隨便選舉一個follower作為leader。
第一種方案,n個副本掛掉了,需要2n+1個副本才能恢復數據;
第二種方案,n個副本掛掉了,只需要n+1個副本就能恢復數據;
優選第二種方案:第一種方案當n+1個同步完成,n個掛掉,需要2n+1個副本,而每個partition的數據量都很大,會造成數據冗余。
但是第二種方案存在萬一某個follower遲遲無法同步完成,會造成嚴重延遲,為了解決這個問題,kafka維護了一個動態follower集合——ISR:
ISR初始集合是所有follower,如果某個follower故障,在指定時間內無法同步,會被踢出ISR,這樣只要保證ISR中的剩余follower同步完成就可以發送確認。
replica.lag.time.ms參數用來指定超時的時間閥值。
acks
acks用來指定broker什么時候發收到消息的確認:
- 0:producer不等待broker的ack,broker一收到消息還沒寫入就返回,一旦broker故障,消息會丟失,適用日志記錄,對消息丟失容忍度比較高的場景; 
- 1:producer等待broker的ack,broker的leader寫入成功返回ack,如果follower同步成功之前leader掛掉,消息會丟失。 
- -1:producer等待broker的ack,broker的leader和所有follower都寫入成功后發ack,但是此時leader掛掉了,沒有發送ack,producer超時沒收到ack,就會重發消息,造成消息重復。 
文件存儲和offset
kafka的存儲消息有兩個文件,.log和.index;offset指定消費者要消費消息的位置,如下圖,根據offset=3先定位.index文件,找到3對應的索引值,然后根據索引值756找到.log中對應的具體消息:
思考:offset是保存在哪里?
kafka0.9版本之前是保存在zookeeper上,0.9之后保存在kafka內置的一個topic上。
LEO和HW
LEO:log end offset;
HW:high watermark,HW是所有follower中最小的LEO。
這兩個概念主要用于follower同步中,每個follower都有自己的LEO,因為每個follower同步有快有慢,所有出現了HW,類似木桶的最短板,如下圖:
比如說leader掛掉了,下圖中的follower3成為leader,那follower1和follower2需要同步數據,各自從HW位截斷,從HW=10開始同步,最終一致,leader和follower都是13;
假設follower2成為leader,follower1和follower3從從HW=10截斷,發現和follower2已經一致,此時不需要額外同步,最終一致,leader和follower都是10。
LEO之前的消息是完全一致的,而HW之后的數據不一致,所以kafka設計上HW之后數據對于消費者是不可見的。
follower故障被踢出之后,重新連接是從HW開始同步數據。
HW只能保證數據一致性,不能保證重復或者丟失,重復或者丟失由acks來定。
總結
以上是生活随笔為你收集整理的消息队列_消息队列:kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 学C语言办公本和游戏本,为什么不建议买游
- 下一篇: iaanotif.exe是什么进程 作用
