RocketMQ 是什么 Github 上關(guān)于 RocketMQ 的介紹:RcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。具有以下特性:
支持發(fā)布/訂閱(Pub/Sub)和點(diǎn)對點(diǎn)(P2P)消息模型 在一個隊(duì)列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞 支持拉(pull)和推(push)兩種消息模式 單一隊(duì)列百萬消息的堆積能力 支持多種消息協(xié)議,如 JMS、MQTT 等 分布式高可用的部署架構(gòu),滿足至少一次消息傳遞語義 提供 docker 鏡像用于隔離測試和云集群部署 提供配置、指標(biāo)和監(jiān)控等功能豐富的 Dashboard 對于這些特性描述,大家簡單過一眼就即可,深入學(xué)習(xí)之后自然就明白了。
專業(yè)術(shù)語 Producer 消息生產(chǎn)者,生產(chǎn)者的作用就是將消息發(fā)送到 MQ,生產(chǎn)者本身既可以產(chǎn)生消息,如讀取文本信息等。也可以對外提供接口,由外部應(yīng)用來調(diào)用接口,再由生產(chǎn)者將收到的消息發(fā)送到 MQ。
Producer Group 生產(chǎn)者組,簡單來說就是多個發(fā)送同一類消息的生產(chǎn)者稱之為一個生產(chǎn)者組。在這里可以不用關(guān)心,只要知道有這么一個概念即可。
Consumer 消息消費(fèi)者,簡單來說,消費(fèi) MQ 上的消息的應(yīng)用程序就是消費(fèi)者,至于消息是否進(jìn)行邏輯處理,還是直接存儲到數(shù)據(jù)庫等取決于業(yè)務(wù)需要。
Consumer Group 消費(fèi)者組,和生產(chǎn)者類似,消費(fèi)同一類消息的多個 consumer 實(shí)例組成一個消費(fèi)者組。
Topic Topic 是一種消息的邏輯分類,比如說你有訂單類的消息,也有庫存類的消息,那么就需要進(jìn)行分類,一個是訂單 Topic 存放訂單相關(guān)的消息,一個是庫存 Topic 存儲庫存相關(guān)的消息。
Message Message 是消息的載體。一個 Message 必須指定 topic,相當(dāng)于寄信的地址。Message 還有一個可選的 tag 設(shè)置,以便消費(fèi)端可以基于 tag 進(jìn)行過濾消息。也可以添加額外的鍵值對,例如你需要一個業(yè)務(wù) key 來查找 broker 上的消息,方便在開發(fā)過程中診斷問題。
Tag 標(biāo)簽可以被認(rèn)為是對 Topic 進(jìn)一步細(xì)化。一般在相同業(yè)務(wù)模塊中通過引入標(biāo)簽來標(biāo)記不同用途的消息。
Broker Broker 是 RocketMQ 系統(tǒng)的主要角色,其實(shí)就是前面一直說的 MQ。Broker 接收來自生產(chǎn)者的消息,儲存以及為消費(fèi)者拉取消息的請求做好準(zhǔn)備。
Name Server Name Server 為 producer 和 consumer 提供路由信息。
RocketMQ 架構(gòu) RocketMQ 架構(gòu)
由這張圖可以看到有四個集群,分別是 NameServer 集群、Broker 集群、Producer 集群和 Consumer 集群:
NameServer: 提供輕量級的服務(wù)發(fā)現(xiàn)和路由。 每個 NameServer 記錄完整的路由信息,提供等效的讀寫服務(wù),并支持快速存儲擴(kuò)展。 Broker: 通過提供輕量級的 Topic 和 Queue 機(jī)制來處理消息存儲,同時支持推(push)和拉(pull)模式以及主從結(jié)構(gòu)的容錯機(jī)制。 Producer:生產(chǎn)者,產(chǎn)生消息的實(shí)例,擁有相同 Producer Group 的 Producer 組成一個集群。 Consumer:消費(fèi)者,接收消息進(jìn)行消費(fèi)的實(shí)例,擁有相同 Consumer Group 的Consumer 組成一個集群。 簡單說明一下圖中箭頭含義,從 Broker 開始,Broker Master1 和 Broker Slave1 是主從結(jié)構(gòu),它們之間會進(jìn)行數(shù)據(jù)同步,即 Date Sync。同時每個 Broker 與NameServer 集群中的所有節(jié)點(diǎn)建立長連接,定時注冊 Topic 信息到所有 NameServer 中。
Producer 與 NameServer 集群中的其中一個節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從 NameServer 獲取 Topic 路由信息,并向提供 Topic 服務(wù)的 Broker Master 建立長連接,且定時向 Broker 發(fā)送心跳。Producer 只能將消息發(fā)送到 Broker master,但是 Consumer 則不一樣,它同時和提供 Topic 服務(wù)的 Master 和 Slave建立長連接,既可以從 Broker Master 訂閱消息,也可以從 Broker Slave 訂閱消息。
RocketMQ 集群部署模式 單 master 模式也就是只有一個 master 節(jié)點(diǎn),稱不上是集群,一旦這個 master 節(jié)點(diǎn)宕機(jī),那么整個服務(wù)就不可用,適合個人學(xué)習(xí)使用。 多 master 模式多個 master 節(jié)點(diǎn)組成集群,單個 master 節(jié)點(diǎn)宕機(jī)或者重啟對應(yīng)用沒有影響。優(yōu)點(diǎn):所有模式中性能最高缺點(diǎn):單個 master 節(jié)點(diǎn)宕機(jī)期間,未被消費(fèi)的消息在節(jié)點(diǎn)恢復(fù)之前不可用,消息的實(shí)時性就受到影響。注意 :使用同步刷盤可以保證消息不丟失,同時 Topic 相對應(yīng)的 queue 應(yīng)該分布在集群中各個節(jié)點(diǎn),而不是只在某各節(jié)點(diǎn)上,否則,該節(jié)點(diǎn)宕機(jī)會對訂閱該 topic 的應(yīng)用造成影響。 多 master 多 slave 異步復(fù)制模式在多 master 模式的基礎(chǔ)上,每個 master 節(jié)點(diǎn)都有至少一個對應(yīng)的 slave。master節(jié)點(diǎn)可讀可寫,但是 slave 只能讀不能寫,類似于 mysql 的主備模式。優(yōu)點(diǎn): 在 master 宕機(jī)時,消費(fèi)者可以從 slave 讀取消息,消息的實(shí)時性不會受影響,性能幾乎和多 master 一樣。缺點(diǎn):使用異步復(fù)制的同步方式有可能會有消息丟失的問題。 多 master 多 slave 同步雙寫模式同多 master 多 slave 異步復(fù)制模式類似,區(qū)別在于 master 和 slave 之間的數(shù)據(jù)同步方式。優(yōu)點(diǎn):同步雙寫的同步模式能保證數(shù)據(jù)不丟失。缺點(diǎn):發(fā)送單個消息 RT 會略長,性能相比異步復(fù)制低10%左右。刷盤策略:同步刷盤和異步刷盤(指的是節(jié)點(diǎn)自身數(shù)據(jù)是同步還是異步存儲)同步方式:同步雙寫和異步復(fù)制(指的一組 master 和 slave 之間數(shù)據(jù)的同步)注意 :要保證數(shù)據(jù)可靠,需采用同步刷盤和同步雙寫的方式,但性能會較其他方式低。 RocketMQ 單主部署 鑒于是快速入門,我選擇的是第一種單 master 的部署模式。先說明一下我的安裝環(huán)境:
Centos 7.2 jdk 1.8 Maven 3.2.x Git 這里 git 可用可不用,主要是用來直接下載 github 上的源碼。也可以選擇自己到github 上下載,然后上傳到服務(wù)器上。以git操作為示例。
clone 源碼并用 maven 編譯 > git clone https://github.com/alibaba/RocketMQ.git /opt/RocketMQ
> cd /opt/RocketMQ && mvn -Dmaven.test.skip=true clean package install assembly:assembly -U
> cd target/alibaba-rocketmq-broker/alibaba-rocketmq
此處可能遇到的問題 一、執(zhí)行"git clone https://github.com/alibaba/RocketMQ.git /home/inspkgs/RocketMQ"時出現(xiàn)以下提示:
fatal: unable to access 'https://github.com/alibaba/RocketMQ.git/': Could not resolve host: github.com; Unknown error
解決辦法:一般是由于網(wǎng)絡(luò)原因造成的,執(zhí)行以下命令
> ping github.com
確定可以 ping 通之后,再重新執(zhí)行 git clone 命令。 二、執(zhí)行"mvn -Dmaven.test.skip=true clean package install assembly:assembly -U"編譯時,可能出現(xiàn)下載相關(guān)jar很慢的情況。 這也是由于默認(rèn) maven 中央倉庫在國外的原因,可以根據(jù)需要在 /home/maven/conf/setting.xml 中的 <mirrors></mirrors> 添加以下內(nèi)容后重新編譯:
<mirror><id>aliyun</id><mirrorOf>central</mirrorOf><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
啟動 Name Server > nohup sh /opt/RocketMQ/bin/mqnamesrv &
//執(zhí)行 jps 查看進(jìn)程
> jps
25913 NamesrvStartup
//查看日志確保服務(wù)已正常啟動
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
啟動 broker > nohup sh /opt/RocketMQ/bin/mqbroker -n localhost:9876 &
//執(zhí)行 jps 查看進(jìn)程
> jps
25954 BrokerStartup
//查看日志確保服務(wù)已正常啟動
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a, 10.1.54.121:10911] boot success...
發(fā)送和接收消息 發(fā)送/接收消息之前,我們需要告訴客戶端 NameServer 地址。RocketMQ 提供了多種方式來實(shí)現(xiàn)這一目標(biāo)。為簡單起見,我們使用環(huán)境變量 NAMESRV_ADDR。 > export NAMESRV_ADDR=localhost:9876
> sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
關(guān)閉服務(wù) > sh /opt/RocketMQ/bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh /opt/RocketMQ/bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
生產(chǎn)者、消費(fèi)者 Demo 生產(chǎn)者 public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//聲明并初始化一個producer//需要一個producer group名字作為構(gòu)造方法的參數(shù),這里為producer1DefaultMQProducer producer = new DefaultMQProducer("producer1");//設(shè)置NameServer地址,此處應(yīng)改為實(shí)際NameServer地址,多個地址之間用;分隔//NameServer的地址必須有,但是也可以通過環(huán)境變量的方式設(shè)置,不一定非得寫死在代碼里producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");//調(diào)用start()方法啟動一個producer實(shí)例producer.start();//發(fā)送10條消息到Topic為TopicTest,tag為TagA,消息內(nèi)容為“Hello RocketMQ”拼接上i的值for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTest",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body);//調(diào)用producer的send()方法發(fā)送消息//這里調(diào)用的是同步的方式,所以會有返回結(jié)果SendResult sendResult = producer.send(msg);//打印返回結(jié)果,可以看到消息發(fā)送的狀態(tài)以及一些相關(guān)信息System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//發(fā)送完消息之后,調(diào)用shutdown()方法關(guān)閉producerproducer.shutdown();}
}
消費(fèi)者 public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//聲明并初始化一個consumer//需要一個consumer group名字作為構(gòu)造方法的參數(shù),這里為consumer1DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");//同樣也要設(shè)置NameServer地址consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");//這里設(shè)置的是一個consumer的消費(fèi)策略//CONSUME_FROM_LAST_OFFSET 默認(rèn)策略,從該隊(duì)列最尾開始消費(fèi),即跳過歷史消息//CONSUME_FROM_FIRST_OFFSET 從隊(duì)列最開始開始消費(fèi),即歷史消息(還儲存在broker的)全部消費(fèi)一遍//CONSUME_FROM_TIMESTAMP 從某個時間點(diǎn)開始消費(fèi),和setConsumeTimestamp()配合使用,默認(rèn)是半個小時以前consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//設(shè)置consumer所訂閱的Topic和Tag,*代表全部的Tagconsumer.subscribe("TopicTest", "*");//設(shè)置一個Listener,主要進(jìn)行消息的邏輯處理consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);//返回消費(fèi)狀態(tài)//CONSUME_SUCCESS 消費(fèi)成功//RECONSUME_LATER 消費(fèi)失敗,需要稍后重新消費(fèi)return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//調(diào)用start()方法啟動consumerconsumer.start();System.out.println("Consumer Started.");}
}
作者:馮先生的筆記
鏈接:http://www.jianshu.com/p/824066d70da8
來源:簡書
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。作者:馮先生的筆記鏈接:http://www.jianshu.com/p/824066d70da8來源:簡書著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。
總結(jié)
以上是生活随笔 為你收集整理的RocketMQ特性、专业术语(Producer,Producer Group,Consumer Group,Topic,Message,Tag,Broker,Name Server)等 的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔 推薦給好友。