websphere mq 查看队列中是否有数据_全网最全的 “消息队列”
消息隊列的使用場景
以下介紹消息隊列在實際應用常用的使用場景。異步處理、應用解耦、流量削鋒和消息通訊四個場景。
1】異步處理:場景說明:用戶注冊后,需要發注冊郵件和注冊短信。
引入消息隊列后架構如下:用戶的響應時間=注冊信息寫入數據庫的時間,例如50毫秒。發注冊郵箱、發注冊短信寫入消息隊列后,直接返回客戶端,因寫入消息隊列的速度很快,基本可以忽略,因此用戶的響應時間可能是50毫秒。按照傳統的做法:
? ①、串行方式,將注冊信息寫入數據庫成功后,發注冊郵件,再發送注冊短信,以上三個成功后,返回客戶端。可能需要150毫秒,這樣使用消息隊列提高了3倍。
? ②、并行方式,將注冊信息寫入數據庫成功后,發送注冊郵件,同時發送注冊短信。也可能需要100毫秒,這樣使用消息隊列提高了2倍。
2】應用解耦:場景說明:用戶下單后,訂單系統需要通知庫存系統。如下圖:
傳統模式的缺點:①、庫存系統無法訪問時,則訂單減庫存業務將會失敗,從而導致訂單失敗;②、訂單系統與庫存系統耦合;
引入消息隊列:①、用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。②、庫存系統:訂閱下單的消息,采用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操作。
?? ?當庫存系統不能正常使用時,也不會影響正常下單,因為下單后,訂單系統寫入消息隊列就不再關心其他的后續操作了。實現訂單系統與庫存系統的解耦。
3】流量削鋒:場景說明:秒殺或團搶活動中使用廣泛。秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。一般需要在應用前端加入消息隊列。用戶請求:服務器接受后,首先寫入消息隊列。當消息隊列長度超出最大數量,則直接拋棄用戶請求或跳轉至錯誤頁面。秒殺業務處理:根據消息隊列中的請求信息,再做后續處理。
??▁▂▃ 這樣可以有效的控制活動人數和有效緩解短時間內的高流量沖擊,防止壓垮應用系統。
4】日志處理:指將消息隊列用在日志處理中,比如 Kafka 的應用,解決大量日志傳輸的問題。
? ?? 日志采集客戶端:負責日志數據采集,定時寫入 Kafka隊列。
? ?? kafka消息隊列:負責日志數據的接收,存儲和轉發。
? ?? 日志處理應用:訂閱并消費 kafka 隊列中的日志數據。
5】消息通信:消息隊列一般都內置了高效的通信機制,因此也可以用純消息通信。比如實現點對點消息隊列,或者聊天室。
? ①、點對點通訊:客戶端A和客戶端B使用同一隊列,進行消息通訊? ②、聊天室通訊(發布訂閱模式):客戶端A,客戶端B,客戶端N訂閱同一主題,進行消息發布和接收。實現類似聊天室效果。消息中間件的工作流程
?1、發送端 MQ-Product (消息生產者)將消息發送給 MQ-server;
?2、MQ-server 將消息落地,持久化到數據庫等;
?3、MQ-server 回 ACK 給 MQ-Producer;
?4、MQ-server 將消息發送給消息接收端 MQ-Consumer (消息消費者);
?5、MQ-Consumer 消費接收到消息后發送 ACK 給 MQ-server;
?6、MQ-server 將落地消息刪除;消息的重發,補發策略為了保證消息必達,MQ使用了消息超時、重傳、確認機制。使得消息可能被重復發送,當消息生產者收不到 MQ-server 的ACK,重復向 MQ-server發送消息。MQ-server 收不到消息消費者的 ACK,重復向消息消費者發消息。
【消息重發】:【1】如果消息接收者在處理消息過程中沒有對MOM(消息中間鍵)進行應答,則消息將由 MOM重發。
【2】如果隊列中設置了預讀參數(consumer.perfetchSize),如果消息接收者在處理第一條消息時(沒有向MOM進行確認)就宕機了,則預讀數量的所有消息將被重發。
【3】如果 Session 是事務的,則只要消息接收者有一條消息沒有確認,或消息發送期間 MOM 或客戶端某一方突然宕機了,則該事務范圍中的所有消息 MOM 都將重發。
?? ActiveMQ 消息服務器怎么知道客戶端到底是消息正在處理中還是已處理完成沒應答MOM或者宕機等等情況?其實是所有的客戶端機器,都運行著一套客戶端的 ActiveMQ 環境,該環境緩存發來的消息,維持著和 ActiveMQ服務器的消息通訊,負責失效轉移(fail-over)等,所有的判斷和處理都是由這套客戶端環境來完成的。
【補發策略】:前提,Broker 根據自己的規則,通過 BrokerInfo 命令包和客戶端建立連接,向客戶端傳送缺省發送策略(發送:同步和異步,策略:持久化消息和非持久化消息)。但是客戶端可以使用 ActiveMQConnect.getRedeliveryPolicy() 方法覆蓋該策略設置。
RedeliveryPolicy policy = connection.getRedeliveryPolicy(); policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); policy.setUseExponentialBackOff(true); policy.setMaximumRedeliveries(2);★? 一旦消息重發嘗試超過重發策略中配置的 maximumRedeliveries(默認=6)會給 Broker 發送一個“Poison ack”通知它,這個消息被認為是 a poison pill,接著 Broker會將這個消息發送給 DLQ(Dead Letter Queue),以便后續處理。
【策略】:【1】 缺省死信隊列(Dead Letter Queue)叫做Active.DLQ;所有的未送達消息將發送到這個隊列,導致非常難于管理。此時就可以通過設置 activemq.xml 文件中的 destination policy map 的 “individualDeadLetterStrategy” 屬性來修改。
<broker...> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" /> deadLetterStrategy> policyEntry> policyEntries> policyMap> destinationPolicy> ... broker>??【2】自動丟棄過期消息(Expired Messages):一些應用可能只是簡單的丟棄過期消息,而不是將它們放到 DLQ。在dead ?letter strategy死信策略上配置 processExpired 屬性為 false,可以實現這個功能。
<broker...> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">"> <deadLetterStrategy> <sharedDeadLetterStrategy processExpired="false" /> deadLetterStrategy> policyEntry> policyEntries> policyMap> destinationPolicy> ... broker> 【3】將非持久信息(non-persistent messages)放入死信隊列 ActiveMQ 缺省不會將未發送到的非持久信息放入死信隊列。如果一個應用程序并不想將消息 message 設置為持久的,那么記錄下來的那些未發送到的消息對它來說往往也就沒有價值。不過如果想實現這個功能,可以在 dead-letter-strategy 死信策略上設置 processNonPersistent="true"。
<broker...> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">"> <deadLetterStrategy> <sharedDeadLetterStrategy processNonPersistent="true" /> deadLetterStrategy> policyEntry> policyEntries> policyMap> destinationPolicy> ... broker> 消息重復發送產生的后果
對于非冪等性的服務而言,如果重復發送消息就會產生嚴重的問題。譬如:銀行取錢,上游支付系統負責給用戶扣款,下游系統負責給用戶發錢,通過MQ異步通知。不管是上游的ACK丟失,導致 MQ收到重復的消息,還是下半場 ACK丟失,導致系統收到重復的出錢通知,都可能出現,上游扣了一次錢,下游發了多次錢。消息隊列的異步操作,通常用于冪等性的服務,非冪等性的服務時不適用中間件進行通信的。更多的是建立長連接 Socket 進行通信的。或者通過如下方式改造。
MQ內部如何做到冪等性的對于每條消息,MQ內部生成一個全局唯一、與業務無關的消息ID:inner-msg-id。當 MQ-server 接收到消息時,先根據 inner-msg-id 判斷消息是否重復發送,再決定是否將消息落地到 DB中。這樣,有了這個 inner-msg-id 作為去重的依據就能保證一條消息只能一次落地到 DB。
消息消費者應當如何做到冪等性
【1】對于非冪等性業務且要求實現冪等性業務:生成一個唯一ID標記每一條消息,將消息處理成功和去重日志通過事物的形式寫入去重表。
【2】對于非冪等性業務可不實現冪等性的業務:權衡去重所花的代價決定是否需要實現冪等性,如:購物會員卡成功,向用戶發送通知短信,發送一次或者多次影響不大。不做冪等性可以省掉寫去重日志的操作。
如何保證消息的有序性
【Active 中有兩種方式保證消息消費的順序性】:【1】通過高級特性 consumer 獨有的消費者(exclusive consumer)。如果一個 queue 設置為 exclusive,broker 會挑選一個 consumer,并且將所有的消息都發給這個 consumer。如果這個 consumer掛了,broker 會自動挑選另外一個 consumer。
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue);【2】利用 Activemq 的高級特性:MessageGroups。Message Groups 特性是一種負載均衡的機制。在一個消息被分發到consumer 之前,broker 首先檢查消息 JMSXGroupID 屬性。如果存在,那么 broker 會檢查是否有某個 consumer 擁有這個message group。如果沒有,那么 broker 會選擇一個 consumer,并將它關聯到這個 message group。此后,這個 consumer 會接收這個 message group 的所有消息,直到:
? ①、Consumer 被關閉。
? ②、Message group 被關閉,通過發送一個消息,并設置這個消息的 JMSXGroupSeq 為 -1。
消費者實際上根據兩個維度排序了,一個是消費者的 Priority,即消費者的優先級。還有一個是消費者的指定的消息組的個數 AssignedGroupCount。這個順序直接影響到下一條消息是誰來接收。
protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception { boolean result = true; // 保持消息組在一起。 String groupId = node.getGroupID(); int sequence = node.getGroupSequence(); if (groupId != null) { // 先查找該queue存儲的一個groupId,和consumerId的一個map MessageGroupMap messageGroupOwners = getMessageGroupOwners(); // 如果是該組的第一條消息。則指定該consumer消費該消息組 if (sequence == 1) { assignGroup(subscription, messageGroupOwners, node, groupId); } else { // 確保前一個所有者仍然有效,否則就生成新的主人。 ConsumerId groupOwner; groupOwner = messageGroupOwners.get(groupId); if (groupOwner == null) { assignGroup(subscription, messageGroupOwners, node, groupId); } else { if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) { // 一個組中的 sequence < 1 表示改組消息已經消費完了 if (sequence < 0) { messageGroupOwners.removeGroup(groupId); subscription.getConsumerInfo().decrementAssignedGroupCount(destination); } } else { // 說明該消費者不能消費該消息組 result = false; } } } } return result;}【RabbitMQ 保證消息隊列的順序性】:造成順序錯亂的場景:RabbitMQ 中有一個 Queue,多個 Consumer。生產者向 RabbitMQ 里發送了三條數據,順序依次是 data1、data2、data3,放入RabbitMQ 的一個內存隊列。有三個消費者分別從 MQ 中消費這三條數據中的一條,可能消費者2先執行完操作,把 data2 存入數據庫,然后是 data1、data3。導致順序錯亂。
【解決方案】:RabbitMQ 將上面的一個 Queue 拆分為三個 Queue,每個 Queue 對應一個 Consumer,就是多一些 Queue 而已,確實是麻煩點;然后這個 Consumer 內部用內存隊列做排隊,然后分發給底層不同的 worker 來處理。如下,將消息放入一個隊列,由一個消費者消費即可保證順序。
【Kafka 保證消息隊列的順序性】: 建了一個 Topic,有三個 Partition。生產者在寫的時候,其實可以指定一個 key,比如說指定了某個訂單 id 作為 key,那么這個訂單相關的數據,一定會被分發到同一個 Partition 中去,而且這個 Partition 中的數據一定是有順序的。消費者從 Partition 中取出來數據的時候,也一定是有順序的。接著,消費者里可能會搞多個線程來并發處理消息。因為如果消費者用單線程時,處理比較耗時。而多線程并發處理時,順序可能就亂序。
【解決方案】:①、一個 topic,一個 partition,一個 consumer,內部單線程消費,單線程吞吐量太低,一般不會用這個。
②、寫 N 個內存 queue,具有相同 key 的數據都到同一個內存 queue;然后對于 N 個線程,每個線程分別消費一個內存 queue 即可,這樣就能保證順序性。
【1】Kafka 是 LinkedIn 開發的一個高性能、分布式的消息系統,廣泛用于日志收集、流式數據處理、在線和離線消息分發等場景。雖然不是作為傳統的 MQ來設計,但在大部分情況下,Kafka 也可以代替原有 ActiveMQ 等傳統的消息系統。
【2】Kafka 將消息流按 Topic 組織,保存消息的服務器稱為 Broker,消費者可以訂閱一個或者多個 Topic。為了均衡負載,一個Topic 的消息又可以劃分到多個分區(Partition),分區越多,Kafka 并行能力和吞吐量越高。
【3】Kafka 集群需要 Zookeeper 支持來實現集群,Kafka 發行包中已經包含了 Zookeeper,部署的時候可以在一臺服務器上同時啟動一個 Zookeeper Server 和 一個 Kafka Server,也可以使用已有的其他 Zookeeper 集群。
【4】和傳統的 MQ 不同,消費者需要自己保留一個 offset,從 Kafka 獲取消息時,只拉取當前 offset 以后的消息。Kafka 的scala/java 版的 Client 已經實現了這部分的邏輯,將 offset 保存到 zookeeper 上。每個消費者可以選擇一個 id,同樣 id 的消費者對于同一條消息只會收到一次。一個 Topic 的消費者如果都使用相同的id,就是傳統的 Queue;如果每個消費者都使用不同的id,就是傳統的 pub-sub。
【如果在 MQ 的場景下,將 Kafka 和 ActiveMQ 相比,Kafka 的優點】:
【1】分布式、高可擴展:Kafka 集群可以透明的擴展,增加新的服務器進集群。
【2】高性能:Kafka 的性能大大超過傳統的 ActiveMQ、RabbitMQ 等 MQ 實現,尤其是 Kafka 還支持 batch 操作。
【3】容錯:Kafka 每個 Partition 的數據都會復制到幾臺服務器上。當某個 Broker 故障失效時,ZooKeeper 服務將通知生產者和消費者,生產者和消費者轉而使用其它 Broker。
【4】高吞吐:在一臺普通的服務器上既可以達到 10W/s 的吞吐速率。
【5】完全的分布式系統:Broker、Producer、Consumer都原生自動支持分布式,自動實現負載均衡。
【6】快速持久化:可以在 O(1) 的系統開銷下進行消息持久化。
【7】游標位置:ActiveMQ 游標由 AMQ來管理,無法讀取歷史數據。Kafka 客戶端自己管理游標,可以重讀數據。
【Kafka 的缺點】:
【1】重復消息:Kafka 只保證每個消息至少會送達一次,雖然幾率很小,但一條消息有可能會被送達多次。
【2】消息亂序:雖然一個 Partition 內部的消息是保證有序的,但是如果一個Topic 有多個Partition,Partition 之間的消息送達不保證有序。
【3】復雜性:Kafka 需要 zookeeper 集群的支持,Topic 通常需要人工來創建,部署和維護較一般消息隊列成本更高。
? MQ 是非線程安全的【Kafka 架構】:【1】Producers(生產者):生產者是發送一個或多個主題 Topic 的發布者。生產者向 Kafka 代理發送數據。每當生產者將消息發布給代理時,代理只需要將消息附加到最后一個段文件。實際上,該消息將被附加到分區。生產者也可以向指定的分區發送消息。
【2】Brokers:代理(經紀人)負責維護發布數據的簡單系統。
【3】Topic:主題屬于特定類別的信息流稱為主題。數組存儲在主題中。Topic 相當于 Queue。主題被拆分成分區。分區被實現為具有大小相等的一組分段文件。
【4】Partition(分區):每個 Partition 內部消息有序,其中每個消息都有一個 offset 序號。一個 Partition 值對應一個 Broker,一個 Broker 可以管理多個 Partition。
【5】Segment:Partition 物理上由多個 Segment組成。每個 Partion 目錄相當于一個巨型文件被平均分配到多個大小相等segment 段數據文件中。但每個段 segment file消息數量不一定相等
【6】Partition offset(分區偏移):每個 Partition 都由一系列有序的、不可變的消息組成,這些消息被連續的追加到 Partition中。Partition 中的每個消息都有一個連續的序列號叫做 offset,用于 Partition唯一標識一條消息。
【7】Replicas of partition(分區備份):副本只是一個分區備份:不讀取和寫入數據,主要用于防止數據丟失。
【8】Kafka Cluster(Kafka 集群):Kafka 有多個代理被稱為 Kafka集群。可以擴展 Kafka集群,無需停機。這些集群用于管理消息數據的持久性和復制。
【9】Consumers(消費者):Consumers 從 MQ讀取數據。消費者訂閱一個或多個主題,并通過從代理中提取數據來使用已發布的消息。Consumer 自己維護消費到哪個 offset。
【每個Consumer 都有對應的 group】:【1】group 內是 queue 消費模型:各個 Consumer 消費不同的 Partition,因此一個消息在 group 內只消費一次。
【2】group 間是 publish-subscribe 消費模型:各個 group 各自獨立消費,互不影響,因此一個消息被每個 group 消費一次。
MQ 系統的數據如何保證不丟失
【Producer 數據丟失的原因】:【1】使用同步模式的時候,有 3種狀態保證消息被安全生產,當配置 ack=1時(只保證寫入Leader成功)的話,如果剛好 Leader partition 掛了,數據就會丟失。
ack 機制:broker 表示發來的數據已確認接收無誤,表示數據已經保存到磁盤。
?0:不等待 broker 返回確認消息
?1:等待 topic 中某個 partition leader 保存成功的狀態反饋
-1/all:等待 topic 中某個 partition 所有副本都保存成功的狀態反饋
【2】使用異步模式時,當緩沖區滿了,如果配置=0(還沒有收到確認的數據,數據就立即被丟棄掉)。
【解決辦法】:只要能避免以上兩種情況就可以保證消息不會被丟失。如下:
【1】當同步模式時,確認機制設置為-1,就是讓消息寫入 Leader 和所有副本。
【2】當異步模式時,消息發出,還沒收到確認的時候,緩沖區也滿了。在配置文件中設置成不限制阻塞超時的時間,也就是說讓生產者一直阻塞,這樣就能保證數據不會丟失。
producer.type = async
request.required.acks=1
queue.buffering.max.ms=5000 #異步發送的時候 發送時間間隔 單位是毫秒
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200 #異步發送 每次批量發送的條目【Kafka弄丟了數據】:Kafka 的某個 Broker宕機了,然后重新選舉Broker 上的 Partition 的 Leader時。如果此時 Follower還沒來得及同步數據,Leader就掛了,然后某個 Follower成為了 Leader,他就少了一部分數據。
【解決辦法】:一般要求設置 4個參數來保證消息不丟失:
【1】給 Topic設置 replication.factor 參數:這個值必須大于1,表示要求每個 Partition必須至少有2個副本。
【2】在 Kafka服務端設置 min.isync.replicas參數:這個值必須大于1,表示要求一個 Leader至少感知到有至少一個 Follower在跟自己保持聯系正常同步數據,這樣才能保證 Leader掛了之后還有一個 Follower。
【3】在生產者端設置 acks= -1:要求每條數據,必須是寫入所有 Replica 副本之后,才能認為是寫入成功了。
【4】在生產者端設置? retries=MAX(很大的一個值,表示無限重試):表示消息一旦寫入事變,就無限重試。
【Consumer 數據丟失的原因】:當你消費到了這個消息,然后消費者那邊自動提交了offset,讓 kafka 以為你已經消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟了。
【解決辦法】:【1】Kafka 會自動提交 offset,使用 Kafka高級API,如果將自動提交 offset 改為手動提交(當數據入庫之后進行偏移量的更新),就可以保證數據不會丟。但是可能導致重復消費,比如你剛處理完,還沒有提交 offset,結果自己掛了,此時肯定會重復消費一次,自己保證冪等性就好了。
RabbitMQ 如何實現集群高可用
【鏡像模式】:隊列的數據都鏡像了一份到所有的節點上。這樣任何一個節點失效,不會影響到整個集群的使用。在實現上 mirror queue 內部有一套選舉算法,會選出一個 master 和若干的 slaver。master 和 slaver 通過相互之間不斷的發送心跳來檢查是否連接斷開。可以通過指定 net_ticktime 來控制心跳檢查頻率。注意一個單位時間 net_ticktime 實際上做了4次交互,故當超過net_ticktime (± 25%) 秒沒有響應的話則認為節點掛掉。另外注意修改 net_ticktime 時需要所有節點都一致。配置舉例:
{rabbit, [{tcp_listeners, [5672]}]},
{kernel, [{net_ticktime, 120}]}【Consumer】:任意連接一個節點,若連上的不是 Master,請求會轉發給 Master,為了保證消息的可靠性,Consumer 回復 Ack 給 Master 后,Master 刪除消息并廣播所有的 Slaver 去刪除;
【Publisher】:任意連接一個節點,若連上的不是 Master,則轉發給 Master,由 Master存儲并轉發給其他的 Slaver存儲;
【如果 Slaver 掛掉】:則集群的節點狀態沒有任何變化。只要 Client 沒有連到這個節點上,也不會給 Client 發送失敗的通知。在檢測到 Slaver 掛掉的期間 Publish 消息會有延遲。如果配置了高可用策略是自動同步,當 Slaver 起來后,隊列中有大量的消息需要同步,將會整個集群阻塞長時間的不能讀寫直到同步結束;
【RabbitMQ 實現了一種鏡像隊列(mirrored queue)的算法提供HA】:創建隊列時可以通過傳入“x-ha-policy”參數設置隊列為鏡像隊列,鏡像隊列會存儲在多個 Rabbit MQ 節點上,并配置成一主多從的結構,可以通過“x-ha-policy-params”參數來具體指定master 節點和 slave節點的列表。所有發送到鏡像隊列上的操作,比如消息的發送和刪除,都會先在 master節點上執行,再通過一種叫 GM(Guaranteed Multicast)的原子廣播(atomic broadcast)算法同步到各 slave節點。GM算法通過兩階段的提交,可以保證 master節點發送到所有 slave節點上的消息要么全部執行成功,要么全部失敗;通過環形的消息發送順序,即 master節點發送消息給一個 slave節點,這個 slave節點依次發送給下一個 slave節點,最終消息回到 master節點,保證了主從節點上的負載差別不大。通過傳入“x-ha-policy”參數設置隊列為鏡像隊列(mirrored queue):定義一個policy:以“ha.”開頭的隊列都被鏡像到集群中的所有節點上:rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'。定義一個policy:以“cinder”開頭的隊列被鏡像到集群中的任意兩個節點上,并且自動同步:rabbitmqctl set_policy ha-cinder-two "^cinder"或者設置'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}';
? all:隊列將 mirrored 到所有集群中的節點中,當新節點添加進來時也會 mirrored 到新的節點;
? exactly(需指定count):如果節點數小于 count 數,則隊列將 mirrored 到所有的節點。如果節點數大于 count,新的節點將不再創建隊列的 mirror(即使原來已創建 mirror 的節點掛掉也不會創建);
? nodes:對指定的節點進行 mirror。如果沒有一個指定的節點在運行中,那么只有 client 連接的那個節點才會聲明 queue(這里有個遷移策略:假如 queue是在[A,B]上且A為 master,若給定的新的策略為nodes[C,D],那么為了防止數據丟失,在遷移中會同時存在[A,C,D]直到C,D已經同步好以后,A才會關閉);
Kafka 吞吐量高的原因
【1】順序讀寫磁盤,充分利用了操作系統的預讀機制。
【2】Linux 中使用 sendfile 命令,減少一次數據拷貝:
?? ①、把數據從硬盤讀取到內核中的頁緩存。
?? ②、把數據從內核中讀取到用戶空間(sendfile 命令跳過此步驟)。
?? ③、把用戶空間的數據寫到 socket 緩存區中。
?? ④、操作系統將數據從 socket 緩沖區中復制到網卡緩沖區,以便將數據經網絡發出。
【3】生產者緩存消息批量發送,消費者批量從 broker 獲取消息,減少 IO 次數,充分利用磁盤順序讀寫的性能。
【4】通常情況下 Kafka 的瓶頸不是 CPU或者磁盤,而是網絡寬帶,所以生產者可以對數據進行壓縮。
Kafka 和其他消息隊列的區別
【與 RabbitMQ 的區別】:? RabbitMQ:用在實時的對可靠性要求比較高的消息傳遞上。kafka:用于處理活躍的流式數據,大數據量的數據處理上。
【1】在架構模型方面:RabbitMQ 遵循 AMQP 協議,RabbitMQ 的 Broker由 Exchange、Binding、Queue 組成,其中 Exchange 和 Binding 組成了消息的路由鍵;Producer 通過連接 Channel 和 Server 進行通信,Consumer 從 Queue 獲取消息進行消費(長連接,queue 有消息會推送到 consumer端,consumer 循環從輸入流讀取數據)。rabbitMQ 以 Broker為中心;有消息的確認機制。
? ? kafka 遵從一般的MQ結構,Producer,Broker,Consumer,以 Consumer為中心,消費信息保存的客戶端 Consumer上,Consumer根據消費的點,從 Broker上批量 pull數據,無消息確認機制。
【2】在吞吐量方面:RabbitMQ在吞吐量方面稍遜于Kafka,他們的出發點不一樣,RabbitMQ支持對消息的可靠的傳遞,支持事務,不支持批量的操作;基于存儲的可靠性的要求存儲可以采用內存或者硬盤。
? ? kafka具有高的吞吐量,內部采用消息的批量處理,zero-copy(sendfile 函數) 機制,數據的存儲和獲取是本地磁盤順序批量操作,具有O(1)的復雜度,消息處理的效率很高。
?【3】在可用性方面:RabbitMQ 支持 mirror 的 queue,主 queue失效,mirror queue接管。
? ? Kafka 的 Broker支持主備模式。
?【4】在集群負載均衡方面:RabbitMQ 的負載均衡需要單獨的 loadbalancer 進行支持。
? ? Kafka 采用 Zookeeper對集群中的 Broker、Consumer進行管理,可以注冊 Topic 到Zookeeper上;通過 Zookeeper的協調機制,Producer 保存對應 Topic的 Broker信息,可以隨機或者輪詢發送到 Broker上;并且 Producer可以基于語義指定分片,消息發送到 Broker的某分片上。
【與 ActiveMQ 的區別】:ActiveMQ 和 Kafka,前者完全實現了 JMS 的規范,后者并沒有糾結于JMS規范,設計了另一套吞吐非常高的分布式發布-訂閱消息系統,非常流行。目前歸屬于 Apache 定級項目。它只用文件系統來管理消息的生命周期。接下來我們結合三個點(消息安全性,服務器的穩定容錯性以及吞吐量)來分別談談這兩個消息中間件。
【1】消息的安全性:Kafka 集群中的 Leader 負責某一 Topic 的某一 Partition 的消息的讀寫,理論上 Consumer 和 Producer 只與該 Leader 節點打交道,一個集群里的某一 Broker 即是 Leader 的同時也可以擔當某一 Partition 的 Follower,即 Replica。Kafka 分配 Replica 的算法如下:
(1)將所有 Broker(假設共n個Broker)和待分配的 Partition排序。
(2)將第i個 Partition分配到第(i mod n)個 Broker上。
(3)將第i個 Partition的第j個 Replica分配到第((i + j) mod n)個 Broker上。
同時,Kafka 與 Replica 既非同步也不是嚴格意義上的異步。一個典型的 Kafka 發送-消費消息的過程如下:首先 Producer消息發送給某 Topic 的某 Partition 的 Leader,Leader 先是將消息寫入本地 Log,同時 follower(如果落后過多將會被踢出 Replica列表)從Leader上 pull 消息,并且在未寫入 log 的同時即向 Leader 發送 ACK 的反饋,所以對于某一條已經算作 commit 的消息來講,在某一時刻,其存在于 Leader的 log中,以及 Replica的內存中。這可以算作一個危險的情況(聽起來嚇人),因為如果此時集群掛了這條消息就算丟失了,但結合 producer的屬性(request.required.acks=-1,當所有follower都收到消息后返回ack)可以保證在絕大多數情況下消息的安全性。當消息算作 commit的時候才會暴露給 consumer,并保證 at-least-once的投遞原則。
【2】服務的穩定容錯性:前面提到過,Kafka天然支持HA,整個 leader/follower 機制通過 zookeeper調度,它在所有 Broker中選出一個 controller,所有 Partition的 Leader選舉都由 controller決定,同時 controller也負責增刪 Topic以及 Replica的重新分配。如果Leader掛了,集群將在ISR(in-sync replicas)中選出新的Leader,選舉基本原則是:新的 Leader必須擁有原來的 Leader commit 過的所有消息。假如所有的 follower都掛了,Kafka會選擇第一個“活”過來的 Replica(不一定是ISR中的)作為 Leader,因為如果此時等待 ISR中的 Replica是有風險的,假如所有的ISR都無法“活”,那此 Partition將會變成不可用。
【3】吞吐量:Leader 節點負責某一 Topic(可以分成多個 Partition)的某一 Partition的消息的讀寫,任何發布到此 Partition的消息都會被直接追加到 log文件的尾部,因為每條消息都被 append 到該 Partition中,是順序寫磁盤,因此效率非常高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是 Kafka高吞吐率的一個很重要的保證),同時通過合理的 Partition,消息可以均勻的分布在不同的 Partition里面。Kafka基于時間或者 Partition的大小來刪除消息,同時 Broker是無狀態的,Consumer的消費狀態(offset)是由Consumer 自己控制的(每一個 Consumer實例只會消費某一個或多個特定 Partition的數據,而某個 Partition的數據只會被某一個特定的 Consumer實例所消費),也不需要 Broker通過鎖機制去控制消息的消費,所以吞吐量驚人,這也是 Kafka吸引人的地方。最后說下由于 zookeeper 引起的腦裂(Split Brain)問題:腦裂問題就是產生了兩個 Leader,導致集群行為不一致了。1個集群如果發生了網絡故障,很可能出現1個集群分成了兩部分,而這兩個部分都不知道對方是否存活,不知道到底是網絡問題還是直接機器down了,所以這兩部分都要選舉1個Leader,而一旦兩部分都選出了Leader, 并且網絡又恢復了,那么就會出現兩個 Brain的情況,整個集群的行為不一致了。解決:只有集群中超過半數節點投票才能選舉出 Leader。ZooKeeper默認采用了這種方式。
【Kafka 的設計目標】:kafka在 設計之初就需要考慮以下5個方面的問題
【1】以時間復雜度為O(1)的方式提供消息持久化能力,即使對 TB級以上數據也能保證常數時間復雜度的訪問性能。
【2】高吞吐率,即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸。
【3】支持Kafka Server間的消息分區,及分布式消費,同時保證每個Partition內的消息順序傳輸。
【4】同時支持離線數據處理和實時數據處理。
【5】Scale out:支持在線水平擴展。
所以,不像 AMQ,Kafka 從設計開始極為高可用為目的,天然 HA。Broker 支持集群,消息亦支持負載均衡,還有副本機制。同樣,Kafka 也是使用 Zookeeper 管理集群節點信息,包括 Consumer 的消費信息也是保存在 zk 中,下面我們分話題來談:
【和傳統的MQ不同】:消費者需要自己保留一個offset,從kafka 獲取消息時,只拉取當前offset 以后的消息。將 offset 保存到 zookeeper 上。每個消費者可以選擇一個id,同樣id 的消費者對于同一條消息只會收到一次。一個Topic 的消費者如果都使用相同的id,就是傳統的 Queue;如果每個消費者都使用不同的id, 就是傳統的pub-sub。
【kafka 主從同步怎么實現】:Kafka 的主從同步,主要是針對它的 Broker來說。在 Kafka 的 Broker 中,同一個 Topic 可以被分配成多個 Partition,每個 Partition的可以有一個或者多個 replicas(備份),即會有一個 Leader 以及 0到多個 Follower,在Consumer 讀取數據的時候,只會從 Leader上讀取數據,Follower只是在 Leader宕機的時候來替代 Leader,主從同步有兩種方式:同步復制和異步復制,Kafka采用的是中間策略 ISR(In Sync Replicas)。
【Kafka 的 ISR策略】:有數據寫 Leader的時候,Leader會查看 Follower組成的 ISR列表,并且符合以下兩點才算是屬于 ISR列表:【1】Broker 可以維護和 zookeeper的連接,zookeeper通過心跳機制檢查每個節點的連接。【2】如果節點是個 Follow它必須能及時同步 Leader的寫操作,不能延時太久。當有寫消息的時候,我們可以根據配置做如下配置:request.required.acks 參數的設置來進行調整:
?? 0 ,相當于異步發送,消息發送完畢即 offset增加,繼續生產;相當于At most once;
?? 1,Leader 收到Leader Replica 對一個消息的接收 ack才增加 offset,然后繼續生產;
?? -1,Leader 收到所有 Replica 對一個消息的接收 ack才增加 offset,然后繼續生產;
MQ 的消息延遲了怎么處理
【1】延遲處理:可以通過設置延遲級別,控制消息延遲的時間。
【2】設置過期時間:
<broker> ... <plugins> <timeStampingBrokerPluginttlCeiling="30000" zeroExpirationOverride="30000" /> plugins> ... broker>?? 1)Message 過期則客戶端不能接收;
?? 2)ttlCeiling:表示過期時間上限(程序寫的過期時間不能超過此時間);
?? 3)zeroExpirationOverride:表示過期時間(給未分配過期時間的消息分配過期時間);
【3】過期消息處理辦法:消息過期后會進入死信隊列,如不想拋棄死信隊列,默認進入 ACTIVEMQ.DLQ隊列,且不會自動清除;對于過期的消息進入死信隊列還有一些可選的策略:放入各自的死信通道、保存在一個共享的隊列(默認),且可以設置是否將過期消息放入隊列的開關以及死信隊列消息過期時間。
?? 1)直接拋棄死信隊列:AcitveMQ提供了一個便捷的插件:DiscardingDLQBrokerPlugin,來拋棄DeadLetter。如果開發者不需要關心DeadLetter,可以使用此策略。
<broker>... <plugins> <discardingDLQBrokerPlugindropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" /> plugins> ... broker>? 2)定時拋棄死信隊列:默認情況下,ActiveMQ永遠不會過期發送到 DLQ的消息。但是,從 ActiveMQ5.12開始,deadLetterStrategy 支持 expiration屬性,其值以毫秒為單位。
<policyEntryqueue=">"…> ... <deadLetterStrategy> <sharedDeadLetterStrategy processExpired="true" expiration="30000"/> deadLetterStrategy> ...policyEntry>?? 3)慢消費者策略設置:Broker將會啟動一個后臺線程用來檢測所有的慢速消費者,并定期關閉它們;中斷慢速消費者,慢速消費將會被關閉。abortConnection是否關閉連接;如果慢速消費者最后一個ACK距離現在的時間間隔超過閥 maxTimeSinceLastAck,則中斷慢速消費者。
<policyEntryqueue=">"…> … <slowConsumerStrategy> <abortSlowConsumerStrategyabortConnection="false"/> slowConsumerStrategy> …policyEntry>利用 MQ 怎么實現最終一致性
RabbitMQ 遵循了 AMQP 規范,用消息確認機制來保證:只要消息發送,就能確保被消費者消費,來做到了消息最終一致性。Rabbitmq 的整個發送過程如下:【1】生產者發送消息到消息服務。
【2】如果消息落地持久化完成,則返回一個標志給生產者。生產者拿到這個確認后,才能放心的說消息終于成功發到消息服務了。否則進入異常處理流程。
【3】消息服務將消息發送給消費者。
【4】消費者接受并處理消息,如果處理成功則手動確認。當消息服務拿到這個確認后,才放心的說終于消費完成了。否則重發,或者進入異常處理。
使用 kafka 有沒有遇到什么問題,怎么解決的
【問題】:兩臺設備上只有一個上存在 logs;
【基本情況】:一個 Topic 配置了四個 Partition,一個 Consumer Group 消費此Topic,但使用兩臺服務器,分別創建 Consumer 實例。都運行日志收集程序。
【問題】:Consumer Group 是將消費到的日志寫入服務器磁盤文件中。有兩臺服務器都在運行此日志收集程序,每個服務器上的程序都創建了一個 Group 的 Consumer實例,此 Consumer實例會分配到兩個 Partition進行處理,因此每個服務器都只存儲了一部分日志文件。但是在測試時發現,所有日志都寫入了 ServerA,ServerB上沒有日志,即便使用測試工具發送了大量數據,ServerB仍然沒有日志。
【原因】:查看 log發現,ServerA 上的 Consumer實例分配的 Partition 為 Partition_0 / Partition_1,serverB 上的 Consumer實例分配的 Partition 為partition_3 / Partition_4,兩個 Server上的 Consumer實例都被分配了Partition,Partition分配正常,消費應該沒有問題。ServerB 上沒有日志數據,說明沒有數據供其消費,也就是說,所有數據都被 Producer發送到了 Partition_1 或Partition_2 上,這是生產的問題,應該是與生產者的分區路由有關,因此有必要了解下生產者的分區路由策略。Kafka 中的每個Topic 分配了4個 Partition,生產者(Producer)在將消息記錄(ProducerRecord)發送到某個 Topic時是要選擇對應的 Partition的,選擇 Partition的策略如下:
【1】消息中指定Partition:判斷 Partition字段是否有值,有值就直接將該消息發送到指定的 Partition就行;
【2】如果沒有指定分區(Partition),則使用分區器進行分區路由,首先判斷消息中是否指定了key;
【3】如果指定了key,則使用該 key進行 hash操作,并轉為正數,然后將其對 Topic相應的分區數進行取余操作,得到一個分區;
【4】如果沒有指定key,則在一個隨機數上以自增的方式產生一個數(第一次時生成隨機數,之后在其基礎上進行自增),轉為正數之后對分區數量進行取余操作,得到一個分區。
由于在程序中 Producer發送記錄的時候指定了固定的 key,根據這個 key進行分區路由總是會選擇同一個分區,所有日志都被發送給了同一個分區,因此只有關聯這個分區的 Consumer實例才能消費,只有此 Consumer實例所在的 Server上才有日志。
Kafka 中的 ISR、AR又代表什么
【1】ISR:In-Sync Replicas 副本同步隊列;
【2】AR:Assigned Replicas 所有副本;
ISR是由 Leader維護,Follower 從 Leader同步數據有一些延遲(包括延遲時間 replica.lag.time.max.ms 和延遲條數replica.lag.max.messages 兩個維度, 當前最新的版本0.10.x中只支持 replica.lag.time.max.ms這個維度),任意一個超過閾值都會把 Follower剔除出 ISR,存入OSR(Outof-Sync Replicas)列表,新加入的 Follower 也會先存放在OSR中。AR=ISR+OSR。
十七、Kafka 為什么不支持讀寫分離
在 Kafka 中這種功能完全可以支持,同時主寫從讀可以讓從節點去分擔主節點的負載壓力,預防主節點負載過重而從節點卻空閑的情況發生。但是主寫從讀也有 2 個很明顯的缺點:
【1】數據一致性問題。數據從主節點轉到從節點必然會有一個延時的時間窗口,這個時間窗口會導致主從節點之間的數據不一致。某一時刻,在主節點和從節點中 A 數據的值都為 X, 之后將主節點中 A 的值修改為 Y,那么在這個變更通知到從節點之前,應用讀取從節點中的 A 數據的值并不為最新的 Y,由此便產生了數據不一致的問題。
【2】延時問題。類似 Redis 這種組件,數據從寫入主節點到同步至從節點中的過程需要經歷網絡→主節點內存→網絡→從節點內存這幾個階段,整個過程會耗費一定的時間。而在 Kafka 中,主從同步會比 Redis 更加耗時,它需要經歷網絡→主節點內存→主節點磁盤→網絡→從節 點內存→從節點磁盤這幾個階段。對延時敏感的應用而言,主寫從讀的功能并不太適用。
【Kafka 架構導致我們沒有必要使用主從分離】在 Kafka 中 這種負載均衡是在主寫主讀的架構上實現的。我們來看 一下 Kafka 的生產消費模型,如下圖所示。
在 Kafka 集群中有 3 個分區,每個分區有 3 個副本,正好均勻地分布在 3個 broker 上,灰色陰影的代表 Leader 副本,非灰色陰影的代表 Follower 副本,虛線表示 Follower 副本從 Leader 副本上拉取消息。當生產者寫入消息的時候都寫入 Leader 副本,對于圖中的情形,每個 Broker 都有消息從生產者流入。當消費者讀取消息的時候也是從 Leader 副本中讀取 的,對于圖中的情形,每個 Broker 都有消息流出到消費者。從而將壓力分配到每個服務器上,從而實現了負載均衡功能。
ZK 在 kafka 中的作用
【1】Broker 注冊:Broker 是分布式部署并且相互之間相互獨立,但是需要有一個注冊系統能夠將整個集群中的 Broker管理起來,此時就使用到了 Zookeeper。在 Zookeeper上會有一個專門用來進行 Broker服務器列表記錄的節點:/brokers/ids 每個Broker在啟動時,都會到 Zookeeper上進行注冊,即到 /brokers/ids下創建屬于自己的節點,如/brokers/ids/[0...N]。Kafka 使用了全局唯一的數字來指代每個 Broker服務器,不同的 Broker必須使用不同的 Broker ID進行注冊,創建完節點后,每個 Broker就會將自己的 IP地址和端口信息記錄到該節點中去。其中,Broker創建的節點類型是臨時節點,一旦 Broker宕機,則對應的臨時節點也會被自動刪除。
【2】Topic 注冊:在 Kafka中,同一個Topic的消息會被分成多個分區并將其分布在多個 Broker上,這些分區信息及與Broker的對應關系也都是由 Zookeeper在維護,由專門的節點來記錄,如:/borkers/topics Kafka 中每個 Topic都會以 /brokers/topics/[topic] 的形式被記錄,如 /brokers/topics/login 和 /brokers/topics/search 等。Broker服務器啟動后,會到對應 Topic節點(/brokers/topics)上注冊自己的 Broker ID并寫入針對該 Topic的分區總數,如 /brokers/topics/login/3->2,這個節點表示Broker ID為3的一個 Broker服務器,對于"login" 這個 Topic的消息,提供了2個分區進行消息存儲,同樣,這個分區節點也是臨時節點。
【3】生產者負載均衡:由于同一個 Topic消息會被分區并將其分布在多個 Broker上,因此,生產者需要將消息合理地發送到這些分布式的Broker上,那么如何實現生產者的負載均衡,Kafka 支持傳統的四層負載均衡,也支持 Zookeeper方式實現負載均衡。
?? ■? 四層負載均衡,根據生產者的 IP地址和端口來為其確定一個相關聯的 Broker。通常,一個生產者只會對應單個 Broker,然后該生產者產生的消息都發往該Broker。這種方式邏輯簡單,每個生產者不需要同其他系統建立額外的 TCP連接,只需要和 Broker維護單個 TCP連接即可。但是,其無法做到真正的負載均衡,因為實際系統中的每個生產者產生的消息量及每個 Broker的消息存儲量都是不一樣的,如果有些生產者產生的消息遠多于其他生產者的話,那么會導致不同的 Broker接收到的消息總數差異巨大,同時,生產者也無法實時感知到 Broker的新增和刪除。
?? ■? 使用 Zookeeper進行負載均衡,由于每個Broker啟動時,都會完成 Broker注冊過程,生產者會通過該節點的變化來動態地感知到 Broker服務器列表的變更,這樣就可以實現動態的負載均衡機制。
【4】消費者負載均衡:與生產者類似,Kafka 中的消費者同樣需要進行負載均衡來實現多個消費者合理地從對應的 Broker服務器上接收消息,每個消費者分組包含若干消費者,每條消息都只會發送給分組中的一個消費者,不同的消費者分組消費自己特定的Topic下面的消息,互不干擾。
【5】分區與消費者的關系:消費組 (Consumer Group)下有多個 Consumer(消費者)。對于每個消費者組 (Consumer Group),Kafka 都會為其分配一個全局唯一的Group ID,Group 內部的所有消費者共享該 ID。訂閱的 Topic下的每個分區只能分配給某個 group 下的一個 Consumer(當然該分區還可以被分配給其他 group)。同時,Kafka為每個消費者分配一個Consumer ID,通常采用"Hostname:UUID"形式表示。在 Kafka中,規定了每個消息分區只能被同組的一個消費者進行消費,因此,需要在 Zk 上記錄消息分區與 Consumer 之間的關系,每個消費者一旦確定了對一個消息分區的消費權力,需要將其Consumer ID 寫入到 Zookeeper 對應消息分區的臨時節點上,例如:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] 其中,[broker_id-partition_id]就是一個消息分區的標識,節點內容就是該消息分區上消費者的 Consumer ID。
【6】消息消費進度Offset 記錄:在消費者對指定消息分區進行消息消費的過程中,需要定時地將分區消息的消費進度 Offset記錄到Zookeeper上,以便在該消費者進行重啟或者其他消費者重新接管該消息分區的消息消費后,能夠從之前的進度開始繼續進行消息消費。Offset 在 Zookeeper中由一個專門節點進行記錄,其節點路徑為:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] 節點內容就是 Offset的值。
【7】消費者注冊:消費者服務器在初始化啟動時加入消費者分組的步驟如下:注冊到消費者分組。每個消費者服務器啟動時,都會到 Zookeeper的指定節點下創建一個屬于自己的消費者節點,例如/consumers/[group_id]/ids/[consumer_id],完成節點創建后,消費者就會將自己訂閱的 Topic信息寫入該臨時節點。對消費者分組中的消費者的變化注冊監聽。每個消費者都需要關注所屬消費者分組中其他消費者服務器的變化情況,即對/consumers/[group_id]/ids節點注冊子節點變化的 Watcher監聽,一旦發現消費者新增或減少,就觸發消費者的負載均衡。對Broker服務器變化注冊監聽。消費者需要對/broker/ids/[0-N]中的節點進行監聽,如果發現 Broker服務器列表發生變化,那么就根據具體情況來決定是否需要進行消費者負載均衡。進行消費者負載均衡。為了讓同一個Topic下不同分區的消息盡量均衡地被多個消費者消費而進行消費者與消息分區分配的過程,通常,對于一個消費者分組,如果組內的消費者服務器發生變更或 Broker服務器發生變更,會發出消費者負載均衡。
【ZK 的詳細存儲結構圖】:
早期版本的 kafka 用 zk 做 meta 信息存儲,consumer 的消費狀態,group 的管理以及 offset 的值。考慮到 zk本身的一些因素以及整個架構較大概率存在單點問題,新版本中確實逐漸弱化了 zookeeper的作用。新的 consumer使用了 kafka內部的 group coordination 協議,也減少了對 zookeeper的依賴。
Kafka Follower 如何與 Leader同步數據
Kafka 使用 ISR的方式很好的均衡了確保數據不丟失以及吞吐率。Follower 可以批量的從 Leader復制數據,而且Leader充分利用磁盤順序讀以及send file(zero copy)機制,這樣極大的提高復制性能,內部批量寫磁盤,大幅減少了 Follower與 Leader的消息量差。所有的 Follower 都復制 Leader 的日志,日志中的消息和順序都和 Leader 中的一致。Follower 像普通的 Consumer 那樣從 Leader 那里拉取消息并保存在自己的日志文件中。ISR 中有f+1個節點,就可以允許在f個節點 Down掉的情況下不會丟失消息并正常提供服。ISR 的成員是動態的,如果一個節點被淘汰了,當它重新達到“同步中”的狀態時,他可以重新加入ISR。因此如果 Leader宕了,直接從 ISR中選擇一個 Follower就行。只有當消息被所有的副本加入到日志中時,才算是“committed”,只有 committed的消息才會發送給 consumer,這樣就不用擔心 Leader Down掉了消息會丟失。Kafka 選擇一個節點作為“controller”,當發現有Leader 節點 Down掉的時候它負責在LSR 分區的所有節點中選擇新的 Leader,這使得 Kafka可以批量的高效的管理所有分區節點的主從關系。如果 controller down掉了,活著的節點中的一個會被切換為新的 controller。
什么情況下 Follower 會從 ISR 中踢除
Leader 維護一個與其基本保持同步的 Replica列表,該列表稱為 ISR(in-sync Replica),每個 Partition都會有一個 ISR,而且是由Leader動態維護 ,如果 Follower 比 Leader落后太多消息數量【replica.lag.max.messages】,或者超過一定時間未發起數據復制請求【replica.lag.time.max.ms】,則 Leader將其從 ISR中移除 。
Kafka 為什么那么快
Kafka 的消息是保存或緩存在磁盤上的,一般認為在磁盤上讀寫數據是會降低性能的,因為尋址會比較消耗時間,但是實際上,Kafka 的特性之一就是高吞吐率。Kafka 之所以能這么快,是因為順序寫磁盤、大量使用內存頁 、零拷貝技術的使用
【數據寫入】:Kafka 會把收到的消息都寫入到硬盤中,不會丟失數據。為了優化寫入速度 Kafka 采用了兩個技術, 順序寫入和 MMFile(Memory Mapped?File)
【原因一:順序寫入】:磁盤讀寫的快慢取決于你怎么使用它,也就是順序讀寫或者隨機讀寫。在順序讀寫的情況下,磁盤的順序讀寫速度和內存持平。因為硬盤是機械結構,每次讀寫都會尋址->寫入,其中尋址是一個“機械動作”,它是最耗時的。所以硬盤最討厭隨機 I/O,最喜歡順序 I/O。為了提高讀寫硬盤的速度,Kafka 就是使用順序 I/O。如果在內存做這些操作的時候,一個是 Java 對象的內存開銷很大,另一個是隨著堆內存數據的增多,Java 的 GC 時間會變得很長。
【使用磁盤操作有以下幾個好處】:①、磁盤順序讀寫速度超過內存隨機讀寫。②、JVM 的 GC 效率低,內存占用大。使用磁盤可以避免這一問題。③、系統冷啟動后,磁盤緩存依然可用。下圖就展示了 Kafka 是如何寫入數據的, 每一個 Partition 其實都是一個文件 ,收到消息后 Kafka 會把數據插入到文件末尾(虛框部分):
該方法的缺陷:沒有辦法刪除數據 ,所以 Kafka 是不會刪除數據的,它會把所有的數據都保留下來,每個消費者(Consumer)對每個 Topic 都有一個 Offset 用來表示讀取到了第幾條數據 。
如果不刪除硬盤肯定會被撐滿,所以 Kakfa 提供了兩種策略來刪除數據:
【1】基于時間;
【2】基于 Partition 文件大小;
【原因二:Memory Mapped Files】:即便是順序寫入硬盤,硬盤的訪問速度還是不可能追上內存。所以 Kafka 的數據并不是實時的寫入硬盤 ,它充分利用了現代操作系統分頁存儲來利用內存提高 I/O 效率。Memory Mapped Files(后面簡稱 mmap)也被翻譯成內存映射文件 ,在 64 位操作系統中一般可以表示 20G 的數據文件,它的工作原理是直接利用操作系統的 Page 來實現文件到物理內存的直接映射。完成映射之后你對物理內存的操作會被同步到硬盤上(操作系統在適當的時候)。通過 mmap,進程像讀寫硬盤一樣讀寫內存(當然是虛擬機內存),也不必關心內存的大小,有虛擬內存為我們兜底。使用這種方式可以獲取很大的 I/O 提升,省去了用戶空間到內核空間復制的開銷。(調用文件的 Read 會把數據先放到內核空間的內存中,然后再復制到用戶空間的內存中)但也有一個很明顯的缺陷:不可靠,寫到 mmap 中的數據并沒有被真正的寫到硬盤,操作系統會在程序主動調用 Flush 的時候才把數據真正的寫到硬盤。Kafka 提供了一個參數 producer.type 來控制是不是主動 Flush:如果 Kafka 寫入到 mmap 之后就立即 Flush,然后再返回 Producer 叫同步 (Sync)。如果 Kafka 寫入 mmap 之后立即返回 Producer 不調用 Flush 叫異步 (Async)。
【原因三:Zero Copy】:傳統模式下,當需要對一個文件進行傳輸的時候,其具體流程細節如下:調用 Read 函數,文件數據被 Copy 到內核緩沖區。Read 函數返回,文件數據從內核緩沖區 Copy 到用戶緩沖區。Write 函數調用,將文件數據從用戶緩沖區 Copy 到內核與 Socket 相關的緩沖區。數據從 Socket 緩沖區 Copy 到相關協議引擎。以上細節是傳統 Read/Write 方式進行網絡文件傳輸的方式,我們可以看到,在這個過程當中,文件數據實際上是經過了四次 Copy 操作:硬盤—>內核 buf—>用戶 buf—>Socket 相關緩沖區—>協議引擎。而 Sendfile 系統調用則提供了一種減少以上多次 Copy,提升文件傳輸性能的方法。在內核版本 2.1 中,引入了 Sendfile 系統調用,以簡化網絡上和兩個本地文件之間的數據傳輸。Sendfile 的引入不僅減少了數據復制,還減少了上下文切換。sendfile(socket, file, len);
【運行流程如下】:【1】Sendfile 系統調用,文件數據被 Copy 至內核緩沖區。【2】再從內核緩沖區 Copy 至內核中 Socket 相關的緩沖區。【3】最后再 Socket 相關的緩沖區 Copy 到協議引擎。
相較傳統 Read/Write 方式,2.1 版本內核引進的 Sendfile 已經減少了內核緩沖區到 User 緩沖區,再由 User 緩沖區到 Socket 相關緩沖區的文件 Copy。而在內核版本 2.4 之后,文件描述符結果被改變,Sendfile 實現了更簡單的方式,再次減少了一次 Copy 操作。在 Apache、Nginx、Lighttpd 等 Web 服務器當中,都有一項 Sendfile 相關的配置,使用 Sendfile 可以大幅提升文件傳輸性能。Kafka 把所有的消息都存放在一個一個的文件中,當消費者需要數據的時候 Kafka 直接把文件發送給消費者,配合 mmap 作為文件讀寫方式,直接把它傳給 Sendfile。
【原因四:批量壓縮】:在很多情況下,系統的瓶頸不是 CPU 或磁盤,而是網絡 IO,對于需要在廣域網上的數據中心之間發送消息的數據流水線尤其如此。進行數據壓縮會消耗少量的 CPU 資源,不過對于 Kafka 而言,網絡 IO 更應該考慮:
? ■ ?因為每個消息都壓縮,但是壓縮率相對很低,所以 Kafka 使用了批量壓縮,即將多個消息一起壓縮而不是單個消息壓縮。
? ■ ?Kafka 允許使用遞歸的消息集合,批量的消息可以通過壓縮的形式傳輸并且在日志中也可以保持壓縮格式,直到被消費者解壓縮。
? ■ ?Kafka 支持多種壓縮協議,包括 Gzip 和 Snappy 壓縮協議。
【總結】:Kafka 速度的秘訣在于,它把所有的消息都變成一個批量的文件,并且進行合理的批量壓縮,減少網絡 IO 損耗,通過 mmap 提高 I/O 速度。寫入數據的時候由于單個 Partion 是末尾添加,所以速度最優;讀取數據的時候配合 Sendfile 直接暴力輸出。
kafka 使用過程中遇到的問題
【基本情況】:兩臺設備上只有一個上存在 logs;
【詳細情況】:一個topic,此topic配置了四個partition。兩個consumer group,這兩個consumer group用于消費同一個topic,但做不同的處理任務。每個consumer group中都只有一個 consumer實例進行消費。兩臺服務器,都運行此日志收集程序。
【問題】: 兩個consumer group用于消費同一個 topic并做不同的處理,其中一個 consumer group(稱作 group2)是將消費到的日志寫入服務器磁盤文件中。有兩臺服務器都在運行此日志收集程序,每個服務器上的程序都創建了一個group2的consumer實例,此consumer實例會分配到兩個 partition進行處理,因此每個服務器都只存儲了一部分日志文件。但是在測試時發現,所有日志都寫入了server1,server2上沒有日志,即便使用測試工具發送了大量數據,server2仍然沒有日志。
【原因】:查看 log發現,server1上的 consumer實例分配的 partition為 partition_0 partition_1,server2上的 consumer實例分配的partition為partition_3、partition_4,兩個server上的consumer實例都被分配了partition,partition分配正常,消費應該沒有問題。server2上沒有日志數據,說明沒有數據供其消費,也就是說,所有數據都被 producer發送到了 partition_1或 partition_2上,這是生產的問題,應該是與生產者的分區路由有關,因此有必要了解下生產者的分區路由策略。Kafka中的每個 Topic分配了4個Partition,生產者(Producer)在將消息記錄(ProducerRecord)發送到某個 Topic時是要選擇對應的 Partition的,選擇 Partition的策略如下:
【1】判斷消息中的 partition字段是否有值,有值的話就是指定了partition,直接將該消息發送到指定的 partition就行。
【2】如果沒有指定分區(partition),則使用分區器進行分區路由,首先判斷消息中是否指定了key。
【3】如果指定了key,則使用該key進行hash操作,并轉為正數,然后將其對topic相應的分區數進行取余操作,得到一個分區。
【4】如果沒有指定key,則在一個隨機數上以自增的方式產生一個數(第一次時生成隨機數,之后在其基礎上進行自增),轉為正數之后對分區數量進行取余操作,得到一個分區。
由于在程序中Producer發送記錄的時候指定了固定的key,根據這個key進行分區路由總是會選擇同一個分區,所有日志都被發送給了同一個分區,因此只有關聯這個分區的 consumer實例才能消費,只有此 consumer實例所在的 server上才有日志。
end
總結
以上是生活随笔為你收集整理的websphere mq 查看队列中是否有数据_全网最全的 “消息队列”的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python 爬取svg数据_pytho
- 下一篇: oauth_client_details