rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ
RocketMQ 是阿里巴巴在2012年開源的分布式消息中間件,目前已經捐贈給 Apache 軟件基金會,并于2017年9月25日成為 Apache 的頂級項目。作為經歷過多次阿里巴巴雙十一這種“超級工程”的洗禮并有穩定出色表現的國產中間件,以其高性能、低延時和高可靠等特性近年來已經也被越來越多的國內企業使用。其主要特點有:
1. 靈活可擴展性
RocketMQ 天然支持集群,其核心四組件(Name Server、Broker、Producer、Consumer)每一個都可以在沒有單點故障的情況下進行水平擴展。
2. 海量消息堆積能力
RocketMQ 采用零拷貝原理實現超大的消息的堆積能力,據說單機已可以支持億級消息堆積,而且在堆積了這么多消息后依然保持寫入低延遲。
3. 支持順序消息
可以保證消息消費者按照消息發送的順序對消息進行消費。順序消息分為全局有序和局部有序,一般推薦使用局部有序,即生產者通過將某一類消息按順序發送至同一個隊列來實現。
4. 多種消息過濾方式
消息過濾分為在服務器端過濾和在消費端過濾。服務器端過濾時可以按照消息消費者的要求做過濾,優點是減少不必要消息傳輸,缺點是增加了消息服務器的負擔,實現相對復雜。消費端過濾則完全由具體應用自定義實現,這種方式更加靈活,缺點是很多無用的消息會傳輸給消息消費者。
5. 支持事務消息
RocketMQ 除了支持普通消息,順序消息之外還支持事務消息,這個特性對于分布式事務來說提供了又一種解決思路。
6. 回溯消費
回溯消費是指消費者已經消費成功的消息,由于業務上需求需要重新消費,RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向后回溯。
基本概念
下面是一張 RocketMQ 的部署結構圖,里面涉及了 RocketMQ 核心的四大組件:Name Server、Broker、Producer、Consumer ,每個組件都可以部署成集群模式進行水平擴展。
生產者
生產者(Producer)負責產生消息,生產者向消息服務器發送由業務應用程序系統生成的消息。RocketMQ 提供了三種方式發送消息:同步、異步和單向。
同步發送
同步發送指消息發送方發出數據后會在收到接收方發回響應之后才發下一個數據包。一般用于重要通知消息,例如重要通知郵件、營銷短信。
異步發送
異步發送指發送方發出數據后,不等接收方發回響應,接著發送下個數據包,一般用于可能鏈路耗時較長而對響應時間敏感的業務場景,例如用戶視頻上傳后通知啟動轉碼服務。
單向發送
單向發送是指只負責發送消息而不等待服務器回應且沒有回調函數觸發,適用于某些耗時非常短但對可靠性要求并不高的場景,例如日志收集。
生產者組
生產者組(Producer Group)是一類 Producer 的集合,這類 Producer 通常發送一類消息并且發送邏輯一致,所以將這些 Producer 分組在一起。從部署結構上看生產者通過 Producer Group 的名字來標記自己是一個集群。
消費者
消費者(Consumer)負責消費消息,消費者從消息服務器拉取信息并將其輸入用戶應用程序。站在用戶應用的角度消費者有兩種類型:拉取型消費者、推送型消費者。
拉取型消費者
拉取型消費者(Pull Consumer)主動從消息服務器拉取信息,只要批量拉取到消息,用戶應用就會啟動消費過程,所以 Pull 稱為主動消費型。
推送型消費者
推送型消費者(Push Consumer)封裝了消息的拉取、消費進度和其他的內部維護工作,將消息到達時執行的回調接口留給用戶應用程序來實現。所以 Push 稱為被動消費類型,但從實現上看還是從消息服務器中拉取消息,不同于 Pull 的是 Push 首先要注冊消費監聽器,當監聽器處觸發后才開始消費消息。
消費者組
消費者組(Consumer Group)一類 Consumer 的集合名稱,這類 Consumer 通常消費同一類消息并且消費邏輯一致,所以將這些 Consumer 分組在一起。消費者組與生產者組類似,都是將相同角色的分組在一起并命名,分組是個很精妙的概念設計,RocketMQ 正是通過這種分組機制,實現了天然的消息負載均衡。消費消息時通過 Consumer Group 實現了將消息分發到多個消費者服務器實例,比如某個 Topic 有9條消息,其中一個 Consumer Group 有3個實例(3個進程或3臺機器),那么每個實例將均攤3條消息,這也意味著我們可以很方便的通過加機器來實現水平擴展。
消息服務器
消息服務器(Broker)是消息存儲中心,主要作用是接收來自 Producer 的消息并存儲, Consumer 從這里取得消息。它還存儲與消息相關的元數據,包括用戶組、消費進度偏移量、隊列信息等。從部署結構圖中可以看出 Broker 有 Master 和 Slave 兩種類型,Master 既可以寫又可以讀,Slave 不可以寫只可以讀。從物理結構上看 Broker 的集群部署方式有四種:單 Master 、多 Master 、多 Master 多 Slave(同步刷盤)、多 Master多 Slave(異步刷盤)。
單 Master
這種方式一旦 Broker 重啟或宕機會導致整個服務不可用,這種方式風險較大,所以顯然不建議線上環境使用。
多 Master
所有消息服務器都是 Master ,沒有 Slave 。這種方式優點是配置簡單,單個 Master 宕機或重啟維護對應用無影響。缺點是單臺機器宕機期間,該機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受影響。
多 Master 多 Slave(異步復制)
每個 Master 配置一個 Slave,所以有多對 Master-Slave,消息采用異步復制方式,主備之間有毫秒級消息延遲。這種方式優點是消息丟失的非常少,且消息實時性不會受影響,Master 宕機后消費者可以繼續從 Slave 消費,中間的過程對用戶應用程序透明,不需要人工干預,性能同多 Master 方式幾乎一樣。缺點是 Master 宕機時在磁盤損壞情況下會丟失極少量消息。
多 Master 多 Slave(同步雙寫)
每個 Master 配置一個 Slave,所以有多對 Master-Slave ,消息采用同步雙寫方式,主備都寫成功才返回成功。這種方式優點是數據與服務都沒有單點問題,Master 宕機時消息無延遲,服務與數據的可用性非常高。缺點是性能相對異步復制方式略低,發送消息的延遲會略高。
名稱服務器
名稱服務器(NameServer)用來保存 Broker 相關元信息并給 Producer 和 Consumer 查找 Broker 信息。NameServer 被設計成幾乎無狀態的,可以橫向擴展,節點之間相互之間無通信,通過部署多臺機器來標記自己是一個偽集群。每個 Broker 在啟動的時候會到 NameServer 注冊,Producer 在發送消息前會根據 Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。所以從功能上看應該是和 ZooKeeper 差不多,據說 RocketMQ 的早期版本確實是使用的 ZooKeeper ,后來改為了自己實現的 NameServer 。
消息
消息(Message)就是要傳輸的信息。一條消息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。一條消息也可以擁有一個可選的標簽(Tag)和額處的鍵值對,它們可以用于設置一個業務 key 并在 Broker 上查找此消息以便在開發期間查找問題。
主題
主題(Topic)可以看做消息的規類,它是消息的第一級類型。比如一個電商系統可以分為:交易消息、物流消息等,一條消息必須有一個 Topic 。Topic 與生產者和消費者的關系非常松散,一個 Topic 可以有0個、1個、多個生產者向其發送消息,一個生產者也可以同時向不同的 Topic 發送消息。一個 Topic 也可以被 0個、1個、多個消費者訂閱。
標簽
標簽(Tag)可以看作子主題,它是消息的第二級類型,用于為用戶提供額外的靈活性。使用標簽,同一業務模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標識。比如交易消息又可以分為:交易創建消息、交易完成消息等,一條消息可以沒有 Tag 。標簽有助于保持您的代碼干凈和連貫,并且還可以為 RocketMQ 提供的查詢系統提供幫助。
消息隊列
消息隊列(Message Queue),主題被劃分為一個或多個子主題,即消息隊列。一個 Topic 下可以設置多個消息隊列,發送消息時執行該消息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有隊列將消息發出去。下圖 Broker 內部消息情況:
消息消費模式
消息消費模式有兩種:集群消費(Clustering)和廣播消費(Broadcasting)。默認情況下就是集群消費,該模式下一個消費者集群共同消費一個主題的多個隊列,一個隊列只會被一個消費者消費,如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費。而廣播消費消息會發給消費者組中的每一個消費者進行消費。
消息順序
消息順序(Message Order)有兩種:順序消費(Orderly)和并行消費(Concurrently)。順序消費表示消息消費的順序同生產者為每個消息隊列發送的順序一致,所以如果正在處理全局順序是強制性的場景,需要確保使用的主題只有一個消息隊列。并行消費不再保證消息順序,消費的最大并行數量受每個消費者客戶端指定的線程池限制。
Java 訪問 RocketMQ 實例
RocketMQ 目前支持 Java、C++、Go 三種語言訪問,按慣例以 Java 語言為例看下如何用 RocketMQ 來收發消息的。
引入依賴
添加 RocketMQ 客戶端訪問支持,具體版本和安裝的 RocketMQ 版本一致即可。
消息生產者
示例中用 DefaultMQProducer 類來創建一個消息生產者,通常一個應用創建一個 DefaultMQProducer 對象,所以一般由應用來維護生產者對象,可以其設置為全局對象或者單例。該類構造函數入參 producerGroup 是消息生產者組的名字,無論生產者還是消費者都必須給出 GroupName ,并保證該名字的唯一性,ProducerGroup 發送普通的消息時作用不大,后面介紹分布式事務消息時會用到。
接下來指定 NameServer 地址和調用 start 方法初始化,在整個應用生命周期內只需要調用一次 start 方法。
初始化完成后,調用 send 方法發送消息,示例中只是簡單的構造了100條同樣的消息發送,其實一個 Producer 對象可以發送多個主題多個標簽的消息,消息對象的標簽可以為空。send 方法是同步調用,只要不拋異常就標識成功。
最后應用退出時調用 shutdown 方法清理資源、關閉網絡連接,從服務器上注銷自己,通常建議應用在 JBOSS、Tomcat 等容器的退出鉤子里調用 shutdown 方法。
消息消費者
示例中用 DefaultMQPushConsumer 類來創建一個消息消費者,通生產者一樣一個應用一般創建一個 DefaultMQPushConsumer 對象,該對象一般由應用來維護,可以其設置為全局對象或者單例。該類構造函數入參 consumerGroup 是消息消費者組的名字,需要保證該名字的唯一性。
接下來指定 NameServer 地址和設置消費者應用程序第一次啟動時從隊列頭部開始消費還是隊列尾部開始消費。
接著調用 subscribe 方法給消費者對象訂閱指定主題下的消息,該方法第一個參數是主題名,第二個擦書是標簽名,示例表示訂閱了主題名 topic_example_java 下所有標簽的消息。
最主要的是注冊消息監聽器才能消費消息,示例中用的是 Consumer Push 的方式,即設置監聽器回調的方式消費消息,默認監聽回調方法中 List 里只有一條消息,可以通過設置參數來批量接收消息。
最后調用 start 方法初始化,在整個應用生命周期內只需要調用一次 start 方法。
啟動 Name Server
RocketMQ 核心的四大組件中 Name Server 和 Broker 都是由 RocketMQ 安裝包提供的,所以要啟動這兩個應用才能提供消息服務。首先啟動 Name Server,先確保你的機器中已經安裝了與 RocketMQ 相匹配的 JDK ,并設置了環境變量 JAVA_HOME ,然后在 RocketMQ 的安裝目錄下執行 bin 目錄下的 mqnamesrv ,默認會將該命令的執行情況輸出到當前目錄的 nohup.out 文件,最后跟蹤日志文件查看 Name Server 的實際運行情況。
啟動 Broker
同樣也要確保你的機器中已經安裝了與 RocketMQ 相匹配的 JDK ,并設置了環境變量 JAVA_HOME ,然后在 RocketMQ 的安裝目錄下執行 bin 目錄下的 mqbroker ,默認會將該命令的執行情況輸出到當前目錄的 nohup.out 文件,最后跟蹤日志文件查看 Broker 的實際運行情況。
運行 Consumer
先運行 Consumer 類,這樣當生產者發送消息的時候能在消費者后端看到消息記錄。配置沒問題的話會看到在控制臺打印出消息消費者已啟動。
運行 Producer
最后運行 Producer 類,在 Consumer 的控制臺能看到接收的消息
Spring 整合 RocketMQ
不同于 RabbitMQ、ActiveMQ、Kafka 等消息中間件,Spring 社區已經通過多種方式提供了對這些中間件產品集成,例如通過 spring-jms 整合 ActiveMQ、通過 Spring AMQP 項目下的 spring-rabbit 整合 RabbitMQ、通過 spring-kafka 整合 kafka ,通過他們可以在 Spring 項目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三種方式,一是將消息生產者和消費者定義成 bean 對象交由 Spring 容器管理,二是使用 RocketMQ 社區的外部項目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)然后通過 spring-jms 方式集成使用,三是如果你的應用是基于 spring-boot 的,可以使用 RocketMQ 的外部項目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比較方便的收發消息。
總的來講 rocketmq-jms 項目實現了 JMS 1.1 規范的部分內容,目前支持 JMS 中的發布/訂閱模型收發消息。rocketmq-spring-boot-starter 項目目前已經支持同步發送、異步發送、單向發送、順序消費、并行消費、集群消費、廣播消費等特性,如果比較喜歡 Spring Boot 這種全家桶的快速開發框架并且現有特性已滿足業務要求可以使用該項目。當然從 API 使用上最靈活的還是第一種方式,下面以第一種方式為例簡單看下Spring 如何集成 RocketMQ 的。
消息生產者
消息生產者就是把生產者 DefaultMQProducer 對象的生命周期分成構造函數、init、destroy 三個方法,構造函數中將生產者組名、NameServer 地址作為變量由 Spring 容器在配置時提供,init 方法中實例化 DefaultMQProducer 對象、設置 NameServer 地址、初始化生產者對象,destroy 方法用于生產者對象銷毀時清理資源。
消息消費者
同消息生產者類似,消息消費者是把生產者 DefaultMQPushConsumer 對象的生命周期分成構造函數、init、destroy 三個方法,具體含義在介紹 Java 訪問 RocketMQ 實例時已經介紹過了,不再贅述。當然,有了消費者對象還需要消息監聽器在接收到消息后執行具體的處理邏輯。
消息監聽器類就是把前面 Java 示例中注冊消息監聽器時聲明的匿名內部類代碼抽取出來定義成單獨一個類而已。
Spring 配置文件
因為只使用 Spring 框架集成,所以除了 Sping 框架核心 jar 包外不需要額外添加依賴包了。本例中將消息生產者和消息消費者分成兩個配置文件,這樣能更好的演示收發消息的效果。
消息生產者配置很簡單,定義了一個消息生產者對象,該對象初始化時調用 init 方法,對象銷毀前執行 destroy 方法,將 Name Server 地址和生產者組配置好。
消息消費者同消息生產者配置類似,多了一個消息監聽器對象的定義和綁定。
運行實例程序
按前述步驟 啟動 Name Server 和 Broker,接著運行消息生產者和消息消費者程序,簡化起見我們用兩個單元測試類模擬這兩個程序:
SpringProducerTest 類模擬消息生產者發送消息。
SpringConsumerTest 類模擬消息消費者者接收消息,在 consume 方法返回之前需要讓當前線程睡眠一段時間,使消費者程序繼續存活才能監聽到生產者發送的消息。
分別運行 SpringProducerTest 類 和 SpringConsumerTest 類,在 SpringConsumerTest 的控制臺能看到接收的消息:
假如啟動兩個 SpringConsumerTest 類進程,因為它們屬于同一消費者組,在 SpringConsumerTest 的控制臺能看到它們均攤到了消息:
最后,我這里有一套 RocketMQ 的視頻教程,有需要的網友,請加我微信號“xttblog”為好友,我免費發給大家!
總結
以上是生活随笔為你收集整理的rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑重新分区扩大c盘_两种方法,给电脑C
- 下一篇: python join_python j