RocketMQ简介
RocketMQ作為一款純java、分布式、隊(duì)列模型的開(kāi)源消息中間件,支持事務(wù)消息、順序消息、批量消息、定時(shí)消息、消息回溯等。
RocketMQ優(yōu)點(diǎn)
1 RocketMQ去除對(duì)zk的依賴
2 RocketMQ支持異步和同步兩種方式刷磁盤
3 RocketMQ單機(jī)支持的隊(duì)列或者topic數(shù)量是5w
4 RocketMQ支持消息重試
5 RocketMQ支持嚴(yán)格按照一定的順序發(fā)送消息
6 RocketMQ支持定時(shí)發(fā)送消息
7 RocketMQ支持根據(jù)消息ID來(lái)進(jìn)行查詢
8 RocketMQ支持根據(jù)某個(gè)時(shí)間點(diǎn)進(jìn)行消息的回溯
9 RocketMQ支持對(duì)消息服務(wù)端的過(guò)濾
10 RocketMQ消費(fèi)并行度:順序消費(fèi) 取決于queue數(shù)量,亂序消費(fèi) 取決于consumer數(shù)量
RocketMQ架構(gòu)原理
RocketMQ專業(yè)名詞
Producer 生產(chǎn)者角色—投遞消息給mq。
Producer Group 生產(chǎn)者組 —
Consumer 消費(fèi)者 采用拉取/mq推送方式 獲取消息offset
Consumer Group 消費(fèi)者組----在同一個(gè)組中,是不允許多個(gè)不同的消費(fèi)者消費(fèi)同一條消息。
多個(gè)消費(fèi)者消費(fèi)同一條消息呢? 兩個(gè)分組 多個(gè)不同的分組中可以允許有不同分組中消費(fèi)者消費(fèi)同一條消息的。----
以組的名義關(guān)聯(lián)該組消費(fèi)的offset位置—
Topic
業(yè)務(wù)隊(duì)列存放消息
異步發(fā)送短信
異步發(fā)送郵件
郵件Topic
短信Topic
業(yè)務(wù)區(qū)分不同的隊(duì)列消息
Queue
會(huì)將一個(gè)topic主題中的消息存放在多個(gè)不同的Queue 與kafka分區(qū)模型是一樣
中。
Message
生產(chǎn)者投遞消息會(huì)自動(dòng)對(duì)給消息生成一個(gè)全局消息id,后期的可以根據(jù)該消息全局id實(shí)現(xiàn)業(yè)務(wù)的防止重復(fù)執(zhí)行------冪等性概念。
Tag
–區(qū)分 過(guò)濾
Broker
Mq服務(wù)器端
Name Server
與zk相同思想,作為rocketmq注冊(cè)中心 存放生產(chǎn)者消費(fèi)者 topic主題信息。
Producer
消息生產(chǎn)者,位于用戶的進(jìn)程內(nèi),Producer通過(guò)NameServer獲取所有Broker的路由信息,根據(jù)負(fù)載均衡策略選擇將消息發(fā)到哪個(gè)Broker,然后調(diào)用Broker接口提交消息。
Producer Group
生產(chǎn)者組,簡(jiǎn)單來(lái)說(shuō)就是多個(gè)發(fā)送同一類消息的生產(chǎn)者稱之為一個(gè)生產(chǎn)者組。
Consumer
消息消費(fèi)者,位于用戶進(jìn)程內(nèi)。Consumer通過(guò)NameServer獲取所有broker的路由信息后,向Broker發(fā)送Pull請(qǐng)求來(lái)獲取消息數(shù)據(jù)。Consumer可以以兩種模式啟動(dòng),廣播(Broadcast)和集群(Cluster),廣播模式下,一條消息會(huì)發(fā)送給所有Consumer,集群模式下消息只會(huì)發(fā)送給一個(gè)Consumer。
Consumer Group
消費(fèi)者組,和生產(chǎn)者類似,消費(fèi)同一類消息的多個(gè) Consumer 實(shí)例組成一個(gè)消費(fèi)者組。
Topic
Topic用于將消息按主題做劃分,Producer將消息發(fā)往指定的Topic,Consumer訂閱該Topic就可以收到這條消息。Topic跟發(fā)送方和消費(fèi)方都沒(méi)有強(qiáng)關(guān)聯(lián)關(guān)系,發(fā)送方可以同時(shí)往多個(gè)Topic投放消息,消費(fèi)方也可以訂閱多個(gè)Topic的消息。在RocketMQ中,Topic是一個(gè)上邏輯概念。消息存儲(chǔ)不會(huì)按Topic分開(kāi)。
Message
代表一條消息,使用MessageId唯一識(shí)別,用戶在發(fā)送時(shí)可以設(shè)置messageKey,便于之后查詢和跟蹤。一個(gè) Message 必須指定 Topic,相當(dāng)于寄信的地址。Message 還有一個(gè)可選的 Tag 設(shè)置,以便消費(fèi)端可以基于 Tag 進(jìn)行過(guò)濾消息。也可以添加額外的鍵值對(duì),例如你需要一個(gè)業(yè)務(wù) key 來(lái)查找 Broker 上的消息,方便在開(kāi)發(fā)過(guò)程中診斷問(wèn)題。
Tag
標(biāo)簽可以被認(rèn)為是對(duì) Topic 進(jìn)一步細(xì)化。一般在相同業(yè)務(wù)模塊中通過(guò)引入標(biāo)簽來(lái)標(biāo)記不同用途的消息。
Broker
Broker是RocketMQ的核心模塊,負(fù)責(zé)接收并存儲(chǔ)消息,同時(shí)提供Push/Pull接口來(lái)將消息發(fā)送給Consumer。Consumer可選擇從Master或者Slave讀取數(shù)據(jù)。多個(gè)主/從組成Broker集群,集群內(nèi)的Master節(jié)點(diǎn)之間不做數(shù)據(jù)交互。Broker同時(shí)提供消息查詢的功能,可以通過(guò)MessageID和MessageKey來(lái)查詢消息。Borker會(huì)將自己的Topic配置信息實(shí)時(shí)同步到NameServer。
Queue
Topic和Queue是1對(duì)多的關(guān)系,一個(gè)Topic下可以包含多個(gè)Queue,主要用于負(fù)載均衡。發(fā)送消息時(shí),用戶只指定Topic,Producer會(huì)根據(jù)Topic的路由信息選擇具體發(fā)到哪個(gè)Queue上。Consumer訂閱消息時(shí),會(huì)根據(jù)負(fù)載均衡策略決定訂閱哪些Queue的消息。
Offset
RocketMQ在存儲(chǔ)消息時(shí)會(huì)為每個(gè)Topic下的每個(gè)Queue生成一個(gè)消息的索引文件,每個(gè)Queue都對(duì)應(yīng)一個(gè)Offset記錄當(dāng)前Queue中消息條數(shù)。
NameServer
NameServer可以看作是RocketMQ的注冊(cè)中心,它管理兩部分?jǐn)?shù)據(jù):集群的Topic-Queue的路由配置;Broker的實(shí)時(shí)配置信息。其它模塊通過(guò)Nameserv提供的接口獲取最新的Topic配置和路由信息。
Producer/Consumer :通過(guò)查詢接口獲取Topic對(duì)應(yīng)的Broker的地址信息
Broker : 注冊(cè)配置信息到NameServer, 實(shí)時(shí)更新Topic信息到NameServe
RocketMQ環(huán)境搭建
注意:一定要配置rocketmq 環(huán)境變量 不然啟動(dòng) mqnamesrv.cmd
報(bào)錯(cuò): Please set the ROCKETMQ_HOME variable in your environment!
啟動(dòng)mqnamesrv
系統(tǒng)環(huán)境變量配置
變量名:ROCKETMQ_HOME
變量值:MQ解壓路徑\MQ文件夾名
eg、ROCKETMQ_HOME=D:\rocketmq-all-4.3.0-bin-release
4.啟動(dòng) mqnamesrv.cmd
啟動(dòng)mqBroker
Cmd命令框執(zhí)行進(jìn)入至‘MQ文件夾\bin’下,然后執(zhí)行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,啟動(dòng)BROKER。成功后會(huì)彈出提示框,此框勿關(guān)閉。
啟動(dòng)Rocketmq-console
1.下載rocketmq-externals-master
,進(jìn)入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夾,打開(kāi)‘a(chǎn)pplication.properties’進(jìn)行配置。
2. 新增:rocketmq.config.namesrvAddr=127.0.0.1:9876
3. 執(zhí)行
用CMD進(jìn)入‘\rocketmq-externals\rocketmq-console’文件夾,執(zhí)行‘mvn clean package -Dmaven.test.skip=true’,編譯生成。
編譯成功之后,Cmd進(jìn)入‘target’文件夾,執(zhí)行‘java -jar rocketmq-console-ng-2.0.0.jar’,啟動(dòng)‘rocketmq-console-ng-1.0.0.jar’。
4.瀏覽器中輸入‘127.0.0.1:配置端口’,成功后即可查看。
eg:http://127.0.0.1:8088
RocketMQ集群環(huán)境
1.集群支持:
RocketMQ天生對(duì)集群的支持非常友好
2.單Master:
優(yōu)點(diǎn):除了配置簡(jiǎn)單沒(méi)什么優(yōu)點(diǎn)
缺點(diǎn):不可靠,該機(jī)器重啟或宕機(jī),將導(dǎo)致整個(gè)服務(wù)不可用
3.多Master:
優(yōu)點(diǎn):配置簡(jiǎn)單,性能最高
缺點(diǎn):可能會(huì)有少量消息丟失(配置相關(guān)),單臺(tái)機(jī)器重啟或宕機(jī)期間,該機(jī)器下未被消費(fèi)的消息在機(jī)器恢復(fù)前不可訂閱,影響消息實(shí)時(shí)性
4.多Master多Slave異步模式:
每個(gè)Master配一個(gè)Slave,有多對(duì)Master-Slave,集群采用異步復(fù)制方式,主備有短消息延遲,毫秒級(jí)
優(yōu)點(diǎn):性能同多Master幾乎一樣,實(shí)時(shí)性高,主備間切換對(duì)應(yīng)用透明,不需人工干預(yù)
缺點(diǎn):Master宕機(jī)或磁盤損壞時(shí)會(huì)有少量消息丟失
Rocketmq偽集群搭建
集群:Master節(jié)點(diǎn)、Slave節(jié)點(diǎn)
Master節(jié)點(diǎn):讀寫 生產(chǎn)者可以投遞消息到Master節(jié)點(diǎn)、消費(fèi)者讀取該Master節(jié)點(diǎn)消費(fèi)消息。
Slave節(jié)點(diǎn):只能讀 不能夠?qū)?消費(fèi)者只能夠讀,生產(chǎn)者不能夠?qū)⑾⑼哆f到Slave節(jié)點(diǎn)中。
broker-a.conf 配置
brokerClusterName=DefaultCluster
#從節(jié)點(diǎn)的brokerName必須和主節(jié)點(diǎn)一樣
brokerName=broker-a
#0表示是一個(gè)主節(jié)點(diǎn), >0表示Slave
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
#nameServer地址,分號(hào)分割
namesrvAddr=127.0.0.1:9876
#允許自動(dòng)創(chuàng)建主題
autoCreateTopicEnable=true
#注意需改端口,并且要和默認(rèn)的10911相差5以上
listenPort=10911
#存儲(chǔ)路徑
storePathRootDir=D:\rocketmq\store_master
#commitLog存儲(chǔ)路徑
storePathCommitLog=D:\rocketmq\store_master\commitLog
#消費(fèi)隊(duì)列存儲(chǔ)路徑
storePathConsumerQueue=D:\rocketmq\store_master\consumerqueue
#消息索引存儲(chǔ)路徑
storePathIndex=D:\rocketmq\store_master\index
#checkpoint 文件存儲(chǔ)路徑
storeCheckpoint=D:\rocketmq\store_master\checkpoint
#abort 文件存儲(chǔ)路徑
abortFile=D:\rocketmq\store_master\abort
broker-b.conf配置
brokerClusterName=DefaultCluster
#從節(jié)點(diǎn)的brokerName必須和主節(jié)點(diǎn)一樣
brokerName=broker-a
#0表示是一個(gè)主節(jié)點(diǎn), >0表示Slave
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
#nameServer地址,分號(hào)分割
namesrvAddr=127.0.0.1:9876
#允許自動(dòng)創(chuàng)建主題
autoCreateTopicEnable=true
#注意需改端口,并且要和默認(rèn)的10911相差5以上
listenPort=10921
#存儲(chǔ)路徑
storePathRootDir=D:\rocketmq\store_slave
#commitLog存儲(chǔ)路徑
storePathCommitLog=D:\rocketmq\store_slave\commitLog
#消費(fèi)隊(duì)列存儲(chǔ)路徑
storePathConsumerQueue=D:\rocketmq\store_slave\consumerqueue
#消息索引存儲(chǔ)路徑
storePathIndex=D:\rocketmq\store_slave\index
#checkpoint 文件存儲(chǔ)路徑
storeCheckpoint=D:\rocketmq\store_slave\checkpoint
#abort 文件存儲(chǔ)路徑
abortFile=D:\rocketmq\store_slave\abort
啟動(dòng)broker集群
啟動(dòng)broker-a—
F:\path\alibabamq\rocketmq-all-4.3.2-bin-release\bin>mqbroker.cmd -c F:\path\ali
babamq\rocketmq-all-4.3.2-bin-release\conf\cluster\broker-a.conf
The broker[broker-a, 192.168.31.1:10911] boot success. serializeType=JSON and na
me server is 127.0.0.1:9876
啟動(dòng)broker-b—
F:\path\alibabamq\rocketmq-all-4.3.2-bin-release\bin>mqbroker.cmd -c F:\path\ali
babamq\rocketmq-all-4.3.2-bin-release\conf\cluster\broker-b.conf
The broker[broker-a, 192.168.31.1:10921] boot success. serializeType=JSON and na
me server is 127.0.0.1:9876
展示界面
Rocketmq集群同步方案
在RocketMQ中 Broker需要實(shí)現(xiàn)集群保證高可用(HA)
生產(chǎn)者投遞消息都是存放在主的Broker中
從Broker每次定時(shí)同步主Brokercommitlog日志文件。
A.如果實(shí)現(xiàn)一主一從讀寫分離消費(fèi)模型----
消費(fèi)者訂閱從節(jié)點(diǎn)消費(fèi)消息,可能會(huì)存在延遲問(wèn)題。(網(wǎng)絡(luò)數(shù)據(jù)傳輸過(guò)程中,延遲必然)
B. 消費(fèi)者直接訂閱我們主Broker消費(fèi)消息 延遲概率比較低
默認(rèn)的情況下,消費(fèi)者:訂閱我們主的Broker消費(fèi)消息,如果主的Broker節(jié)點(diǎn)物理內(nèi)存占用達(dá)到40%,開(kāi)始采用訂閱從節(jié)點(diǎn)實(shí)現(xiàn)消費(fèi),可以提高讀寫性能。----讀寫分離消費(fèi)模型架構(gòu)物理內(nèi)存占用達(dá)到40%----消息堆積—
如果主Broker宕機(jī)的情況下,生產(chǎn)者是無(wú)法投遞消息,而我們消費(fèi)者可以
訂閱我們從節(jié)點(diǎn)實(shí)現(xiàn)數(shù)據(jù)的消費(fèi)。
消費(fèi)者 消費(fèi)進(jìn)度同步問(wèn)題
在每個(gè)Broker服務(wù)器中, 在C:\Users\Administrator\store\config
consumerOffset.json 記錄每個(gè)分組消費(fèi)主題消息隊(duì)列 消費(fèi)進(jìn)度。
“topic_2020_mayikt@mayikt-group23”:{0:2,1:1,2:3,3:0
topic_2020_mayikt 主題名稱
mayikt-group23消費(fèi)者分組名稱
0:2,1:1,
0:隊(duì)列id 消費(fèi)進(jìn)度 offset為2的位置
1:隊(duì)列id 消費(fèi)者進(jìn)度 offset為1的位置
1.從節(jié)點(diǎn)定時(shí)的形式同步主節(jié)點(diǎn)消費(fèi)記錄信息,不管消費(fèi)者訂閱主節(jié)點(diǎn)還是從節(jié)點(diǎn)
最終都會(huì)優(yōu)先的將消費(fèi)記錄結(jié)果給主節(jié)點(diǎn),如果節(jié)點(diǎn)真的宕機(jī)的情況下,先記錄在
從節(jié)點(diǎn)。
2.如果主節(jié)點(diǎn)現(xiàn)在突然存活的情況下,從哪個(gè)位置開(kāi)始消費(fèi)呢?
如果我們消費(fèi)者內(nèi)存中有緩存消費(fèi)進(jìn)度的情況下,連接到主節(jié)點(diǎn)修改最新消費(fèi)者進(jìn)度記錄。
如果消費(fèi)者內(nèi)存沒(méi)有緩存消費(fèi)進(jìn)度的情況下,可能會(huì)發(fā)生重復(fù)消費(fèi)的問(wèn)題。
在RocketMQ中 Broker需要實(shí)現(xiàn)集群保證高可用(HA)
----Slave會(huì)同步Master節(jié)點(diǎn)中所有commitlog文件數(shù)據(jù)。
如果當(dāng)Master節(jié)點(diǎn)宕機(jī)了,這時(shí)候Master節(jié)點(diǎn)就無(wú)法寫入數(shù)據(jù)(生產(chǎn)者無(wú)法投遞數(shù)據(jù)),
但是消費(fèi)者可以根據(jù)Slave(備份)節(jié)點(diǎn)消費(fèi)數(shù)據(jù),注意的是 從Slave(備份)節(jié)點(diǎn)無(wú)法
寫入數(shù)據(jù)。
設(shè)置該參數(shù):slaveReadEnable 從服務(wù)器不允許讀;
RocketMQ讀寫分離機(jī)制:
讀寫分離消費(fèi)模型,提高IO讀寫的性能。
RocketMQ消費(fèi)者進(jìn)度如何同步:
Springboot整合方式
注意springboot整合rocketmq server端 版本一定要與rocketmq 不然可能啟動(dòng)報(bào)錯(cuò)
Maven依賴
org.springframework.boot
spring-boot-starter-parent
2.2.4.RELEASE
org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.1
生產(chǎn)者
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
- 普通消息投遞 單向發(fā)送
*/
@GetMapping("/sendMsg")
public String sendMsg() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
return “投遞消息 => " + msg.toString() + " => 成功”;
}
消費(fèi)者
/**
-
@ClassName RocketMQConsumer
-
@Author 螞蟻課堂余勝軍 QQ644064779 www.mayikt.com
-
@Version V1.0
log.info("消費(fèi)者監(jiān)聽(tīng)到消息:<msg:{}>", msgEntity);
**/
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = “mayikt-group5”, topic = “topic_meite”)
public class RocketMQConsumer implements RocketMQListener {
@Override
public void onMessage(MsgEntity msgEntity) {}
}
配置文件
spring:
application:
name: mayikt-rocketmq
server:
port: 8000
rocketmq:
rocketmq地址
name-server: 127.0.0.1:9876
producer:
# 必須填寫 group
group: mayikt-group
Rocketmq配置文件詳解
所屬集群名字
brokerClusterName=rocketmq-cluster
此處需手動(dòng)更改
broker名字,注意此處不同的配置文件填寫的不一樣
附加:按配置文件文件名來(lái)匹配
brokerName=broker-a
0 表示Master, > 0 表示slave
brokerId=0
此處許手動(dòng)更改
(此處nameserver跟host配置相匹配,9876為默認(rèn)rk服務(wù)默認(rèn)端口)nameServer 地址,分號(hào)分割
附加:broker啟動(dòng)時(shí)會(huì)跟nameserver建一個(gè)長(zhǎng)連接,broker通過(guò)長(zhǎng)連接才會(huì)向nameserver發(fā)新建的topic主題,然后java的客戶端才能跟nameserver端發(fā)起長(zhǎng)連接,向nameserver索取topic,找到topic主題之后,判斷其所屬的broker,建立長(zhǎng)連接進(jìn)行通訊,這是一個(gè)至關(guān)重要的路由的概念,重點(diǎn),也是區(qū)別于其它版本的一個(gè)重要特性
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
在發(fā)送消息時(shí),自動(dòng)創(chuàng)建服務(wù)器不存在的Topic,默認(rèn)創(chuàng)建的隊(duì)列數(shù)
defaultTopicQueueNums=4
是否允許Broker 自動(dòng)創(chuàng)建Topic,建議線下開(kāi)啟,線上關(guān)閉
autoCreateTopicEnable=true
是否允許Broker自動(dòng)創(chuàng)建訂閱組,建議線下開(kāi)啟,線上關(guān)閉
autoCreateSubscriptionGroup=true
Broker 對(duì)外服務(wù)的監(jiān)聽(tīng)端口
listenPort=10911
刪除文件時(shí)間點(diǎn),默認(rèn)是凌晨4點(diǎn)
deleteWhen=04
文件保留時(shí)間,默認(rèn)48小時(shí)
fileReservedTime=120
commitLog每個(gè)文件的大小默認(rèn)1G
附加:消息實(shí)際存儲(chǔ)位置,和ConsumeQueue是mq的核心存儲(chǔ)概念,之前搭建2m環(huán)境的時(shí)候創(chuàng)建在store下面,用于數(shù)據(jù)存儲(chǔ),consumequeue是一個(gè)邏輯的概念,消息過(guò)來(lái)之后,consumequeue并不是把消息所有保存起來(lái),而是記錄一個(gè)數(shù)據(jù)的位置,記錄好之后再把消息存到commitlog文件里
mapedFileSizeCommitLog=1073741824
ConsumeQueue每個(gè)文件默認(rèn)存30W條,根據(jù)業(yè)務(wù)情況調(diào)整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
檢測(cè)物理文件磁盤空間
diskMaxUsedSpaceRatio=88
存儲(chǔ)路徑
storePathRootDir=/usr/local/rocketmq/store
commitLog存儲(chǔ)路徑
storePathCommitLog=/usr/local/rocketmq/store/commitlog
消費(fèi)隊(duì)列存儲(chǔ)路徑
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
消息索引存儲(chǔ)路徑
storePathIndex=/usr/local/rocketmq/store/index
checkpoint 文件存儲(chǔ)路徑
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abort 文件存儲(chǔ)路徑
abortFile=/usr/local/rocketmq/store/abort
限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
Broker 的角色
? ASYNC_MASTER 異步復(fù)制Master
? SYNC_MASTER 同步雙寫Master
? SLAVE
brokerRote=ASYNC_MASTER
刷盤方式
? ASYNC_FLUSH 異步刷盤
? SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
checkTransactionMessageEnable=false
發(fā)消息線程池?cái)?shù)量
sendMessageTreadPoolNums=128
拉消息線程池?cái)?shù)量
pullMessageTreadPoolNums=128
Rocketmq隊(duì)列分區(qū)模型
Rocketmq 底層存儲(chǔ)結(jié)構(gòu)中 將一個(gè)主題分成n多個(gè)不同的
隊(duì)列來(lái)實(shí)現(xiàn)存放消息。
創(chuàng)建了一個(gè)主題:需要指定隊(duì)列個(gè)數(shù) 默認(rèn)是4和16
寫隊(duì)列數(shù)量: 對(duì)我們生產(chǎn)者寫投遞隊(duì)列數(shù)量 16
讀隊(duì)列數(shù)量: 對(duì)我們消費(fèi)者獲取消息隊(duì)列數(shù)量 16
在rocketmq中,如果一個(gè)topic只有一個(gè)隊(duì)列的情況下支持的并行能力比較弱,所以會(huì)將一個(gè)topic分成分成n多個(gè)不同的隊(duì)列queue來(lái)實(shí)現(xiàn)存放, 類似于Kafka的分區(qū)模型概念。
writeQueueNums:寫隊(duì)列數(shù),表示producer發(fā)送到的MessageQueue的隊(duì)列個(gè)數(shù)
readQueueNums:讀隊(duì)列數(shù),表示Consumer讀取消息的MessageQueue隊(duì)列個(gè)數(shù)
注:這兩個(gè)值需要相等,在集群模式下如果不相等,writeQueueNums=6,readQueueNums=3, 那么每個(gè)broker上會(huì)有3個(gè)queue的消息是無(wú)法消費(fèi)的。
perm 隊(duì)列的權(quán)限:2表示w、4表示r 6表示rw
架構(gòu)模擬圖
注意我們r(jià)ocketmq與kafka設(shè)計(jì)思想原理相同,消息存儲(chǔ)在mq服務(wù)器端中,消費(fèi)者不管消費(fèi)失敗還是成功,最終消息還是會(huì)存在mq服務(wù)器端,可以通過(guò)日志策略清理消息。
RocketMQ解決方案
如果現(xiàn)在消費(fèi)者獲取消息、處理器都是同一個(gè)線程的情況下,
有可能會(huì)影響到我們消費(fèi)者速率—
消費(fèi)者獲取消息10毫秒—處理消息5s 5.001s
Rocketmq中 消費(fèi)者消費(fèi)的消息,采用多線程的形式,需要注意消息順序一致性的問(wèn)題。
與kafka思想是一樣的,消費(fèi)者不管消費(fèi)成功還是失敗,最終消息不會(huì)立即被刪除。
后期都是通過(guò)日志刪除策略,定時(shí)刪除消息。
NameServer原理
NameServer注冊(cè)中心 類似于zookeeper、nacos 存儲(chǔ) 服務(wù)節(jié)點(diǎn)信息。
Rocketmq 不使用Zookeeper而使用Nameserver實(shí)現(xiàn)注冊(cè)中心呢?
Cp(每個(gè)節(jié)點(diǎn)副本必須要保證數(shù)據(jù)一致性) ap(每個(gè)節(jié)點(diǎn)副本數(shù)據(jù)不一致
,保證可用性。)
Kafka:副本機(jī)制 副本選舉依賴 控制器 選舉依賴Zookeeper—選舉過(guò)程
在Rocketmq中 人為在配置文件中指定那個(gè)Broker 為主節(jié)點(diǎn) ,而在kafka中依賴
與zookeeper實(shí)現(xiàn)選舉的。
NameServer 每個(gè)節(jié)點(diǎn)相互之間沒(méi)有數(shù)據(jù)過(guò)程、都是獨(dú)立的 (ap模式)
保證可用性、不保證每個(gè)NameServer 數(shù)據(jù)的一致性問(wèn)題。
NameServer 與Broker 、生產(chǎn)者、消費(fèi)者關(guān)系。
NameServer 中存放那些信息呢?
NameServer 與Broker 關(guān)系
nameServer如何知道Broker 宕機(jī)呢?
發(fā)送一個(gè)心跳續(xù)約包給Broker 告訴我還在存活狀態(tài)
與nacos、eureka 實(shí)現(xiàn)服務(wù)注冊(cè)中心原理是一樣的。
生產(chǎn)者:
發(fā)送請(qǐng)求
Broker 與 nameserver心跳續(xù)約間隔30s
如果生產(chǎn)者獲取到一個(gè)故障的Broker地址,實(shí)現(xiàn)發(fā)送消息如果失敗的情況下
如何處理
答案:
實(shí)現(xiàn)調(diào)用 存儲(chǔ)消息。
NameServer類似于zookeeper實(shí)現(xiàn)服務(wù)注冊(cè)中心
為什么RocketMQ不使用zookeeper?而使用NameServer作為注冊(cè)中心呢?
Zookeeper實(shí)現(xiàn)注冊(cè)中心 模式CP模式 保證數(shù)據(jù)的一致性
NameServer 保證AP模式 可用性
特點(diǎn):
NameServer集群之間不需要數(shù)據(jù)同步
注冊(cè)分析:
Broker啟動(dòng)的時(shí)候,會(huì)讀取Broker.config 配置文件,獲取到多個(gè)不同的nameServer集群的地址列表,會(huì)將Broker信息注冊(cè)給所有的NameServer存儲(chǔ)與所有的NameServer保持長(zhǎng)連接。
注冊(cè)信息:
Broker 的IP和端口號(hào) 主題信息、集群信息 過(guò)濾器等。
NameSever如何知道我們Broker宕機(jī)呢?
續(xù)約機(jī)制:
Broker:
Rocketmq底層基于netty實(shí)現(xiàn)的,每個(gè)Broker在默認(rèn)每隔30
S時(shí)間給nameServer發(fā)送一個(gè)心跳續(xù)約包,告訴我Broker還在存活。
Name 每隔10s時(shí)間檢測(cè),故障Broker節(jié)點(diǎn),則剔除。
Broker關(guān)系
每個(gè)NameServer相互之間都是獨(dú)立,不會(huì)做任何數(shù)據(jù)同步,采用ap模式思想
Broker信息注冊(cè):Broker啟動(dòng)的時(shí)候,會(huì)根據(jù)配置文件讀取到多個(gè)NameServer地址,輪詢的將Broker信息注冊(cè)到每個(gè)NameServer上,這樣每個(gè)NameServer都有該Broker信息。
剔除:Broker需要每隔30s時(shí)間給每個(gè)NameServer發(fā)送一個(gè)心跳續(xù)約,如果沒(méi)有發(fā)送心跳續(xù)約的話,NameServer 會(huì)有一個(gè)定時(shí)器每隔每10s中掃描一次,檢測(cè)故障的Broker,則會(huì)剔除。
最終每個(gè)Broker與NameSserver保持長(zhǎng)連接。
生產(chǎn)者關(guān)系
如果是每隔30s發(fā)送續(xù)約,也就是意味著30s后 才能夠剔除該Broker
真好在這時(shí)候 生產(chǎn)者發(fā)送到該故障節(jié)點(diǎn)如何處理?
使用ack模式 確認(rèn)刷盤成功 才屬于生產(chǎn)者消息投遞成功。
相關(guān)疑問(wèn)
生產(chǎn)者發(fā)送消息三種模式
Producer發(fā)送消息有三種方式:同步、異步和單向
三種發(fā)送方式
單向 生產(chǎn)者投遞消息到mq中,不需要返回結(jié)果。
優(yōu)點(diǎn):延遲概率比較低
缺點(diǎn):丟失消息數(shù)據(jù)
投遞消息過(guò)程比較耗時(shí)時(shí)間5毫秒
異步 生產(chǎn)者投遞消息到mq中,使用回調(diào)形式返回。
投遞消息過(guò)程比較耗時(shí)時(shí)間5毫秒
補(bǔ)償----
同步
生產(chǎn)者投遞消息到mq中,采用同步的形式獲取到返回消息是否有
投遞成功的結(jié)果,導(dǎo)致接口延遲概率比較大。
投遞消息過(guò)程比較耗時(shí)時(shí)間10毫秒
發(fā)送請(qǐng)求 基于請(qǐng)求與響應(yīng)
1.同步發(fā)送:發(fā)送請(qǐng)求模式屬于同步的,發(fā)送該條消息不需等待該條消息發(fā)送成功之后,才可以繼續(xù)發(fā)送下一條。
2.異步發(fā)送:采用異步的發(fā)送模式,不需要同步阻塞等待,通過(guò)回調(diào)的形式監(jiān)聽(tīng)生產(chǎn)者消息投遞結(jié)果
3單向發(fā)送:只負(fù)責(zé)發(fā)送消息給mq,不管是否有發(fā)送成功。
同步發(fā)送
/**
- 同步發(fā)送
- @throws Exception
*/
@GetMapping("/sync")
public void sync() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
SendResult sendResult = rocketMQTemplate.syncSend(RocketMQConfig.TOPIC_NAME, msg);
log.info(“同步發(fā)送字符串{}, 發(fā)送結(jié)果{}”, msg.toString(), sendResult);
}
異步發(fā)送
/**
-
異步發(fā)送
-
@throws Exception
@Overridepublic void onException(Throwable var1) {log.info("異步發(fā)送失敗{}", var1);}
*/
@GetMapping(“async”)
public void async() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
log.info(">msg:<<" + msg);
rocketMQTemplate.asyncSend(RocketMQConfig.TOPIC_NAME, msg.toString(), new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
log.info(“異步發(fā)送成功{}”, var1);
}});
}
單向發(fā)送
/**
- 普通消息投遞 單向發(fā)送
*/
@GetMapping("/sendMsg")
public String sendMsg() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
return “投遞消息 => " + msg.toString() + " => 成功”;
}
順序消息
Rocketmq中,消費(fèi)者處理消息業(yè)務(wù)邏輯的時(shí)候 是采用多線程。
如何解決消息順序一致性的問(wèn)題?
實(shí)際上做業(yè)務(wù)邏輯開(kāi)發(fā)中,很少有需要保證消息順序一致性問(wèn)題。
1.在rocketmq 消費(fèi)者默認(rèn)是多線程異步消費(fèi)的,開(kāi)發(fā)者需要設(shè)定指定保證消息順序一致性問(wèn),也就是同一個(gè)隊(duì)列消息最終被同一個(gè)線程實(shí)現(xiàn)消費(fèi)。
相關(guān)代碼
生產(chǎn)者
String uuid = UUID.randomUUID().toString();
SendResult result1 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “insert”, uuid);
log.info(“insert:” + result1.toString());
SendResult result2 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “update”, uuid);
log.info(“update:” + result2.toString());
SendResult result3 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “delete”, uuid);
log.info(“delete:” + result3.toString());
消費(fèi)者
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = “mayikt-group20”, topic = “topic_seq”, consumeMode = ConsumeMode.ORDERLY
)
public class RocketMQConsumer01 implements RocketMQListener {
@Override
public void onMessage(String msg) {
try {
Random r = new Random(100);
int i = r.nextInt(500);
Thread.sleep(i);
} catch (Exception e) {
}
結(jié)果
消息存儲(chǔ)結(jié)構(gòu)
在win的安裝rocketmq,消息物理存放在
C:\Users\Administrator\store
commitlog:消息的存儲(chǔ)目錄
config:運(yùn)行期間一些配置信息
consumequeue:消息消費(fèi)隊(duì)列存儲(chǔ)目錄
index:消息索引文件存儲(chǔ)目錄
abort:如果存在abort文件說(shuō)明Broker非正常關(guān)閉,該文件默認(rèn)啟動(dòng)時(shí)創(chuàng)建,正常退出時(shí)刪除
checkpoint:文件檢測(cè)點(diǎn)。存儲(chǔ)commitlog文件最后一次刷盤時(shí)間戳、consumequeue最后一次刷盤時(shí)間、index索引文件最后一次刷盤時(shí)間戳。
學(xué)習(xí)kafka 的時(shí)候,topic主題消息分成n多個(gè)不同的partition 分區(qū)存放,
而在我們的rocketmq中,將一個(gè)topic主題的消息分成多個(gè)不同的Queue存放。
前提條件:commitlog日志文件沒(méi)有滿的情況下:
在rocketmq中所有topic主題對(duì)應(yīng)的隊(duì)列的消息都會(huì)存放在同一個(gè)commitlog日志文件中,
消費(fèi)者在消費(fèi)消息的時(shí)候 不會(huì)直接與commitlog日志文件打交道。
Rocketmq 提供消費(fèi)隊(duì)列 (邏輯概念)
Commitlog日志文件 存放消息內(nèi)容主體----Commitlogoffset
ConsumeQueue 不存放消息主體,只存放消息的Commitlogoffset、msgsize、msgtag
Queue offset與Commitlogoffset 之間區(qū)別?
Queue offset—消息存放在ConsumeQueue 消費(fèi)偏移量的位置
Commitlogoffset ----消息物理存放位置
[queueId=2, storeSize=242, queueOffset=25, sysFlag=0, bornTimestamp=1611665447721, bornHost=/192.168.31.1:55706, storeTimestamp=1611665447722, storeHost=/192.168.31.1:10911, msgId=C0A81F0100002A9F0000000000039330, commitLogOffset=234288, bodyCRC=336380854, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_ide_mayikt’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26,
生產(chǎn)者如何投遞消息:
消費(fèi)者如何消費(fèi):
消息的commitlogoffset 如何存放在不同的consumequeue中。
Consumequeue==16
投遞消息 消息key
消息key%16=1
Consumequeue 中 Consumeoffset 對(duì)應(yīng) 一條消息(沒(méi)有對(duì)應(yīng)消息主體)—commitlogoffset
消費(fèi)者消費(fèi)我們消息 在(kafka、rocketmq)中 消費(fèi)成功或者失敗都不會(huì)立即將該消息刪除,日志清理策略刪除。
Rocketmq消息存儲(chǔ)結(jié)構(gòu)
1.(commitlog日志文件沒(méi)有滿的情況下)在rocketmq中所有的消息日志,都存放在同一個(gè)commitlog日志文件中,(默認(rèn)是1GB來(lái)存儲(chǔ) 好處1零拷貝映射 好處2 分段 非常好方式管理清理日志文件 commitlog命名就是上一個(gè)commitlog日志文件中最后一個(gè)commitlog-offset值)
,ConsumeQueue類似于在kafka中的 partition 分區(qū)模型, 而ConsumeQueue存放的是commitlog開(kāi)始的commitLogoffset、msgsize、tag。
2.每次消費(fèi)者讀取消息的時(shí)候,先讀取ConsumeQueue中獲取到commitLogoffset,在根據(jù)該
commitLogoffset查找commitLog日志文件獲取到消息體返回給消費(fèi)者客戶端。
3.與kafka的設(shè)計(jì)不同
根據(jù)阿里巴巴消息中間件團(tuán)隊(duì)的測(cè)試,如果每個(gè)topic中的partition 分區(qū)存儲(chǔ)的消息過(guò)多
,可能會(huì)影響到磁盤io的讀寫性能,所以采用ConsumeQueue存放少量的數(shù)據(jù),消息讀取還是通過(guò)commitlog文件中查找。
Rocketmq讀寫使用:MappedByteBuffer 零拷貝
因?yàn)閞ocketmq 中日志文件存儲(chǔ)采用文件映射機(jī)制+mmap 減少用戶態(tài)與內(nèi)核態(tài)來(lái)回拷貝的次數(shù),從而可以提高性能
MappedByteBuffer 文件存儲(chǔ)映射,只能映射用戶態(tài)1.5GB-2GB 所以RocketMQ日志文件存儲(chǔ)commitlog默認(rèn)為1G大小。
commitLog文件
commitlog文件的存儲(chǔ)地址:KaTeX parse error: Undefined control sequence: \store at position 5: HOME\?s?t?o?r?e?\commitlog{fileName},每個(gè)文件的大小默認(rèn)1G =102410241024,commitlog的文件名fileName,名字長(zhǎng)度為20位,左邊補(bǔ)零,剩余為起始偏移量;比如00000000000000000000代表了第一個(gè)文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)這個(gè)文件滿了,第二個(gè)文件名字為00000000001073741824,起始偏移量為1073741824,以此類推,第三個(gè)文件名字為00000000002147483648,起始偏移量為2147483648 ,消息存儲(chǔ)的時(shí)候會(huì)順序?qū)懭胛募?#xff0c;當(dāng)文件滿了,寫入下一個(gè)文件。
冪等問(wèn)題
整合方式
消費(fèi)者冪等問(wèn)題
當(dāng)消費(fèi)者消費(fèi)消息失敗的時(shí)候,可以返回繼續(xù)重復(fù)消費(fèi),這時(shí)候rocketmq會(huì)給
該消費(fèi)者不斷的重試,重試的過(guò)程中需要注意冪等性問(wèn)題。
在rocketmq中,根據(jù)返回ack 狀態(tài):
ConsumeConcurrentlyStatus
CONSUME_SUCCESS --消費(fèi)成功
RECONSUME_LATER— 消費(fèi)失敗繼續(xù)重試消費(fèi)。
Messageid
對(duì)我們每條消息生成一個(gè)messageid------根據(jù)該id 實(shí)現(xiàn)消費(fèi)者冪等性問(wèn)題
Messageid 生成規(guī)則:關(guān)聯(lián)存放在那個(gè)mq服務(wù)器端、commitlog-offset值。
Messageid --C0A812E0495C18B4AAC276054FC20005 轉(zhuǎn)碼
獲取到commitlog-offset —查找物理對(duì)應(yīng)消息位置。
Rocketmq中的 messageid 長(zhǎng)度一共占用16個(gè)字節(jié),其中包含消息存儲(chǔ)的ip和端口號(hào),
消息commitLogOffset,客戶端發(fā)送請(qǐng)求傳遞messageid 給mq服務(wù)器端,mq服務(wù)器端
根據(jù)該commitlogoffset值查找對(duì)應(yīng)的消息返回給客戶端。
消息過(guò)濾
1.Tag就是相當(dāng)于給消息打一個(gè)標(biāo)簽,比如我的文章 屬于那些標(biāo)簽下,用戶如果訂閱了該標(biāo)簽就可能刷到該文章。
2. 一個(gè)消息可能打上了多個(gè)tag標(biāo)簽,消費(fèi)者訂閱該主題和tag標(biāo)簽,標(biāo)簽如果一致的情況下就可以獲取到該消息。
原理:在我們ConsumeQueue中,存儲(chǔ)結(jié)構(gòu)為 commitoffset、size、messagetag 會(huì)根據(jù)消費(fèi)者傳遞的tag hash值與隊(duì)列中每條消息的tag hash 做比較,如果相等的情況則獲取該消息,如果不相等的情況下,則不會(huì)獲取該消息。
消費(fèi)者組
與kafka基本原理相同,同一個(gè)消費(fèi)者中,只能有一個(gè)消費(fèi)者消費(fèi)消息,
多個(gè)不同的消費(fèi)者組可以消費(fèi)同一條消息。
消費(fèi)成功之后,提交offset。
分布式事務(wù)
消息的可用性
刷盤:將數(shù)據(jù)從pagecache中刷盤到硬盤中存儲(chǔ)
,避免數(shù)據(jù)的丟失。
刷盤模式:
1.同步刷盤:
生產(chǎn)者必須等待該消息 從pagecache刷盤到硬盤中,在返回
Ack給生產(chǎn)者。
優(yōu)點(diǎn):可以保證消息不丟失
缺點(diǎn):影響到整體接口吞吐量
應(yīng)用場(chǎng)景:金融支付類 msg
2.異步刷盤
生產(chǎn)者不需要等待pagecache刷盤到硬盤中結(jié)果,完全采用異步的形式
優(yōu)點(diǎn):
提高整體接口吞吐量
缺點(diǎn):
有可能消息會(huì)丟失 (概率不是很大)
應(yīng)用場(chǎng)景:行為分析---- 100 -12 98%
刷盤策略
RocketMQ,默認(rèn)會(huì)將消息持久化存放在硬盤,首先會(huì)
寫入到系統(tǒng)PageCahe中,讓后再刷盤到硬盤,這樣就可以保證
硬盤與PageCache中數(shù)據(jù)完全一致性。
同步刷盤:
消息真正刷盤到磁盤中,才會(huì)返回給生產(chǎn)者,只要磁盤沒(méi)有壞,這樣做
可以保證消息不丟失,但是可能會(huì)影響整體的吞吐量。
異步刷盤:
讀寫充分利用pagecache,消息寫入到pagecache成功之后,采用異步的形式
刷盤到硬盤中,可以提高系統(tǒng)的吞吐量,但是可能消息會(huì)丟失。
RocketMQ解決分布式事務(wù)問(wèn)題
RocketMQ解決分布式事務(wù)思想與原理
RocketMQ在其消息定義的基礎(chǔ)上,對(duì)事務(wù)消息擴(kuò)展了兩個(gè)相關(guān)的概念:
1.Half(Prepare) Message——半消息(預(yù)處理消息)
半消息是一種特殊的消息類型,該狀態(tài)的消息暫時(shí)不能被Consumer消費(fèi)。當(dāng)一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒(méi)有接收到Producer發(fā)出的二次確認(rèn)時(shí),該事務(wù)消息就處于"暫時(shí)不可被消費(fèi)"狀態(tài),該狀態(tài)的事務(wù)消息被稱為半消息。
2.Message Status Check——消息狀態(tài)回查
由于網(wǎng)絡(luò)抖動(dòng)、Producer重啟等原因,可能導(dǎo)致Producer向Broker發(fā)送的二次確認(rèn)消息沒(méi)有成功送達(dá)。如果Broker檢測(cè)到某條事務(wù)消息長(zhǎng)時(shí)間處于半消息狀態(tài),則會(huì)主動(dòng)向Producer端發(fā)起回查操作,查詢?cè)撌聞?wù)消息在Producer端的事務(wù)狀態(tài)(Commit 或 Rollback)??梢钥闯?#xff0c;Message Status Check主要用來(lái)解決分布式事務(wù)中的超時(shí)問(wèn)題。
RocketMQ的事務(wù)消息是基于兩階段提交實(shí)現(xiàn)的,也就是說(shuō)消息有兩個(gè)狀態(tài),prepared和commited。當(dāng)消息執(zhí)行完send方法后,進(jìn)入的prepared狀態(tài),進(jìn)入prepared狀態(tài)以后,就要執(zhí)行executeLocalTransaction方法,這個(gè)方法的返回值有3個(gè),也決定著這個(gè)消息的命運(yùn),
COMMIT_MESSAGE:提交消息,這個(gè)消息由prepared狀態(tài)進(jìn)入到commited狀態(tài),消費(fèi)者可以消費(fèi)這個(gè)消息;
ROLLBACK_MESSAGE:回滾,這個(gè)消息將被刪除,消費(fèi)者不能消費(fèi)這個(gè)消息;
UNKNOW:未知,這個(gè)狀態(tài)有點(diǎn)意思,如果返回這個(gè)狀態(tài),這個(gè)消息既不提交,也不回滾,還是保持prepared狀態(tài),而最終決定這個(gè)消息命運(yùn)的,是checkLocalTransaction這個(gè)方法。
RocketMQ實(shí)現(xiàn)分布式事務(wù)的原理:
消費(fèi)。
服務(wù)器端,RocketMQ服務(wù)器端將該消息,推送給消費(fèi)者消費(fèi)。
核心思想:確保生產(chǎn)者一定將消息投遞到mq服務(wù)器端,生產(chǎn)者必須先一定執(zhí)行完成,在執(zhí)行消費(fèi)者。
RocketMQLocalTransactionState.ROLLBACK;—回滾
RocketMQLocalTransactionState.COMMIT—提交
RocketMQLocalTransactionState.UNKNOWN— 不是提交也不是回滾。
存在的一些問(wèn)題:
發(fā)送ack給rocketmq 回滾該消息即可,不會(huì)被消費(fèi)者消費(fèi)。
Rocketmq服務(wù)器端 在默認(rèn)的情況下 每隔60s 檢查本地事務(wù)是否已經(jīng)執(zhí)行過(guò),如果執(zhí)行過(guò)的情況下,則提交該提交,如果沒(méi)有執(zhí)行該事務(wù)的情況下,則回滾。
常見(jiàn)錯(cuò)誤
Lock failed,MQ already started
將 broker 的 master 和 slave 節(jié)點(diǎn)放在同一臺(tái)機(jī)器上,配置的storePath相同導(dǎo)致的,修改配置文件,改為不同的路徑即可解決。
總結(jié)
以上是生活随笔為你收集整理的RocketMQ简介的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 分布式服务追踪与调用链系统
- 下一篇: SpringBoot如何返回页面