RocketMQ:Producer启动流程与消息发送源码分析
文章目錄
- Producer
- 1.方法和屬性
- 2.啟動流程
- 3.消息發送
- 3.1驗證消息
- 3.2查找路由
- 3.3選擇隊列
- 3.4發送消息
- 3.5發送批量消息
Producer
在RocketMQ中,消息生產者就是客戶端,即消息的提供者。
以下是消息生產者Producer項目預覽圖:
1.方法和屬性
Producer的相關核心類:
MQAdmin接口方法介紹:
//創建主題 void createTopic(final String key, final String newTopic, final int queueNum)throws MQClientException; //根據時間戳從隊列中查找消息偏移量 long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException; //查找消息隊列中最大/小偏移量 long maxOffset(final MessageQueue mq) throws MQClientException; long minOffset(final MessageQueue mq) throws MQClientException; //根據偏移量查找信息 MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,InterruptedException, MQClientException; //根據條件查詢消息 QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,final long end) throws MQClientException, InterruptedException; //根據主題和消息ID查詢消息 MessageExt viewMessage(String topic,String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;MQProducer接口方法介紹:
//啟動 void start() throws MQClientException; //關閉 void shutdown(); //查找該主題下的所有消息隊列 List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException; //同步發送消息 SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException; //同步超時發送消息 SendResult send(final Message msg, final long timeout) throws MQClientException,RemotingException, MQBrokerException, InterruptedException; //異步發送消息 void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException; //異步并附帶超時時間的消息發送 void send(final Message msg, final SendCallback sendCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException; //發送單向消息-無需關注返回結果-void void sendOneway(final Message msg) throws MQClientException, RemotingException,InterruptedException; //同步并指定消息隊列發送消息 SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,RemotingException, MQBrokerException, InterruptedException; //選擇指定隊列異步發送消息 void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)throws MQClientException, RemotingException, InterruptedException; //批量發送消息 SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;DefaultMQProducer:
屬性介紹:
producerGroup:生產者所屬組 createTopicKey:默認Topic defaultTopicQueueNums:默認主題在每一個Broker隊列數量 sendMsgTimeout:發送消息默認超時時間,默認3s compressMsgBodyOverHowmuch:消息體超過該值則啟用壓縮,默認4k retryTimesWhenSendFailed:同步方式發送消息重試次數,默認為2,總共執行3次 retryTimesWhenSendAsyncFailed:異步方法發送消息重試次數,默認為2 retryAnotherBrokerWhenNotStoreOK:消息重試時選擇另外一個Broker時,是否不等待存儲結果就返回,默認為false maxMessageSize:允許發送的最大消息長度,默認為4M2.啟動流程
啟動時序圖如下:
DefaultMQProducerImpl#start
switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;//檢查生產者組配置this.checkConfig();//生產組名=CLIENT_INNER_PRODUCERif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {//將生產者實例名稱改為PIDthis.defaultMQProducer.changeInstanceNameToPID();}//獲得MQ客戶端實例this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);//注冊生產者到MQClientInstance中并返回注冊結果boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}//存入Topic主題發布信息this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());//如果是初次啟動if (startFactory) {//啟動MQ客戶端mQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());//將服務狀態改為RUNNINGthis.serviceState = ServiceState.RUNNING;break;MQClientManager
//單例-一個JVM中只存在一個MQClientManager實例 private static MQClientManager instance = new MQClientManager(); //MQClientManager-維護一個MQClientInstance緩存表 //同一個clientId只會對應一個MQClientInstance //MQClientInstance封裝了RocketMQ網絡處理API,是消息生產者和消息消費者與NameServer、Broker打交道的網絡通道 private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();MQClientManager#getOrCreateMQClientInstance
//構建MQClientId String clientId = clientConfig.buildMQClientId(); //在緩存表中查詢是否存在instance MQClientInstance instance = this.factoryTable.get(clientId); //instance不存在 if (null == instance) {//構建instanceinstance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);//存入表中MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);} }return instance;3.消息發送
消息發送時序圖:
DefaultMQProducerImpl#send(Message msg)
/*** DEFAULT SYNC -------------------------------------------------------*/ public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }DefaultMQProducer#send(Message msg,long timeout)
/**** @param msg* @param timeout 默認超時時長為3s* @return 返回發送結果*/ public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }DefaultMQProducerImpl#sendDefaultImpl
3.1驗證消息
//驗證消息 Validators.checkMessage(msg, this.defaultMQProducer);Validators#checkMessage
//檢查是否為空 if (null == msg) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic-校驗主題 Validators.checkTopic(msg.getTopic()); //是否是禁止發送的消息主題 Validators.isNotAllowedSendTopic(msg.getTopic());// body-檢查是否為空 if (null == msg.getBody()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } //body-檢查消息體是否大于消息最大限制大小 if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); }3.2查找路由
DefaultMQProducerImpl#tryToFindTopicPublishInfo
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());DefaultMQProducerImpl#tryToFindTopicPublishInfo
//在本地緩存中獲取主題的路由信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //若路由信息為空 || !ok() -> !(null != this.messageQueueList && !this.messageQueueList.isEmpty()) if (null == topicPublishInfo || !topicPublishInfo.ok()) {//為主題創建路由信息-存入緩存表this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());//從nameServer中獲取路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic); } //如果查詢出的Info合法-返回Info if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo; } else {//否則將從nameServer獲取的路由信息更新到緩存表中this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo; }TopicPublishInfo
public class TopicPublishInfo {private boolean orderTopic = false; //是否是順序消息private boolean haveTopicRouterInfo = false; //是否有路由信息private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); //該主題的消息隊列private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); //每選擇一次消息隊列該值+1private TopicRouteData topicRouteData; //關聯Topic路由元信息 }TopicRouteData
public class TopicRouteData extends RemotingSerializable {private String orderTopicConf; //順序消息配置private List<QueueData> queueDatas; //Broker隊列信息private List<BrokerData> brokerDatas; //Broker信息private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; //消息過濾表 }MQClientInstance#updateTopicRouteInfoFromNameServer
//this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)-加鎖 TopicRouteData topicRouteData; //使用默認主題從NameServer獲取路由信息 if (isDefault && defaultMQProducer != null) {topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),clientConfig.getMqClientApiTimeout());if (topicRouteData != null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}} } else {//使用指定主題從NameServer獲取路由信息topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout()); } if (changed) {//克隆出一份主題路由信息TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{//將topicRouteData轉化為TopicPublishInfoTopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);//是否有主題路由信息-設置為truepublishInfo.setHaveTopicRouterInfo(true);Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();//遍歷生產者while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();//如果生產者不為空if (impl != null) {//更新publishInfo信息impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{//主題訂閱信息-消息消費隊列Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);//遍歷消費者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();//消費者不為空if (impl != null) {//更新subscribeInfo信息impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true; }MQClientInstance#topicRouteData2TopicPublishInfo
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {//創建TopicPublishInfo對象TopicPublishInfo info = new TopicPublishInfo();//關聯TopicRouteData信息info.setTopicRouteData(route);//順序消息,更新TopicPublishInfoif (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {//獲取Broke列表-用分號隔開String[] brokers = route.getOrderTopicConf().split(";");for (String broker : brokers) {String[] item = broker.split(":");int nums = Integer.parseInt(item[1]);for (int i = 0; i < nums; i++) {MessageQueue mq = new MessageQueue(topic, item[0], i);info.getMessageQueueList().add(mq);}}//設置為順序消息info.setOrderTopic(true);} else {//非順序消息更新TopicPublishInfo//獲取消息隊列信息List<QueueData> qds = route.getQueueDatas();Collections.sort(qds);//遍歷topic隊列信息for (QueueData qd : qds) {//權限為可寫if (PermName.isWriteable(qd.getPerm())) {BrokerData brokerData = null;//遍歷寫隊列for (BrokerData bd : route.getBrokerDatas()) {//根據名稱獲取寫隊列對應的brokerif (bd.getBrokerName().equals(qd.getBrokerName())) {brokerData = bd;break;}}if (null == brokerData) {continue;}if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {continue;}//填充TopicPublishInfo消息隊列列表for (int i = 0; i < qd.getWriteQueueNums(); i++) {MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);info.getMessageQueueList().add(mq);}}}//順序消息設置為falseinfo.setOrderTopic(false);}return info; }3.3選擇隊列
//DefaultMQProducerImpl#sendDefaultImpl MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); //DefaultMQProducerImpl#selectOneMessageQueue return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);MQFaultStrategy#selectOneMessageQueue
//是否啟用Broker故障延遲機制 if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0) {pos = 0;}MessageQueue mq = tpInfo.getMessageQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {return mq;}}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue(); } //不啟用Broker故障延遲機制 return tpInfo.selectOneMessageQueue(lastBrokerName);默認不啟用Broker故障延遲機制
TopicPublishInfo#selectOneMessageQueue(final String lastBrokerName)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {//第一次選擇消息隊列if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {//選擇一次-sendWhichQueue自增int index = this.sendWhichQueue.incrementAndGet();//取模int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0) {pos = 0;}//輪詢選擇消息隊列MessageQueue mq = this.messageQueueList.get(pos);//規避上次選擇的隊列if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();} }TopicPublishInfo#selectOneMessageQueue()
//第一次選擇消息隊列 public MessageQueue selectOneMessageQueue() {//sendWhichQueue自增int index = this.sendWhichQueue.incrementAndGet();//對隊列大小取模int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0) {pos = 0;}//返回對應的隊列return this.messageQueueList.get(pos); }啟用Broker故障延遲機制
DefaultMQProducerImpl#selectOneMessageQueue
//Broker故障延遲機制 if (this.sendLatencyFaultEnable) {try {//對sendWhichQueue自增int index = tpInfo.getSendWhichQueue().incrementAndGet();//對消息隊列輪詢獲取一個隊列for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0) {pos = 0;}MessageQueue mq = tpInfo.getMessageQueueList().get(pos);//驗證該隊列是否可用->可用即返回-不可用繼續輪詢if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {return mq;}}//沒有選出較為合適的消息隊列->讓延遲容錯機制至少選出一個Broker出來final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();//寫隊列個數int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);//寫隊列個數大于0if (writeQueueNums > 0) {//選出一個消息隊列->指定broker和隊列ID并返回final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {//該Broker也不可用->從容錯隊列中移除該BrokerlatencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue(); }- 延遲機制接口規范
- FaultItem:失敗條目
- 消息失敗策略
DefaultMQProducerImpl#sendDefaultImpl
//消息發送->發送成功調用回調函數sendCallback sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); //調用失敗記錄失敗時間戳 endTimestamp = System.currentTimeMillis(); //更新調用失敗條目 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);MQFaultStrategy#updateFaultItem
if (this.sendLatencyFaultEnable) {//計算broker規避的時長long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);//更新該FaultItem規避時長this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); }MQFaultStrategy#computeNotAvailableDuration
//遍歷latencyMax for (int i = latencyMax.length - 1; i >= 0; i--) {//找到第一個比currentLatency的latencyMax值if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i]; } //沒有找到則返回0 return 0;LatencyFaultToleranceImpl#updateFaultItem
//原來的失敗條目信息 FaultItem old = this.faultItemTable.get(name); if (null == old) {//失敗條目為空->新建faultItem對象設置規避時長和開始時間final FaultItem faultItem = new FaultItem(name);faultItem.setCurrentLatency(currentLatency);faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);} } else {//不為空->更新規避時長和開始時間old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); }3.4發送消息
DefaultMQProducerImpl#sendKernelImpl
private SendResult sendKernelImpl(final Message msg, //待發送消息final MessageQueue mq, //消息發送隊列 final CommunicationMode communicationMode, //消息發送模式->ASYNC/SYNC/ONEWAYfinal SendCallback sendCallback, //異步消息回調函數final TopicPublishInfo topicPublishInfo, //主題路由信息 final long timeout) //消息發送超時時間 //根據BrokerName獲取Broker地址 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); //地址為空 if (null == brokerAddr) {//更新broker網絡地址信息tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } //非批量消息發送->設置消息唯一ID //批量消息->在消息打包過程中已經生成唯一ID if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg); }boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true; }int sysFlag = 0; boolean msgBodyCompressed = false; //大于4k->進行壓縮 if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true; } //如果是事務消息,設置消息標記MessageSysFlag.TRANSACTION_PREPARED_TYPE final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } //如果注冊消息發送鉤子函數->消息發送之前進行邏輯增強 if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context); }SendMessageHook
public interface SendMessageHook {String hookName();void sendMessageBefore(final SendMessageContext context);void sendMessageAfter(final SendMessageContext context); } //構建消息發送請求頭 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); //生產者組 requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); //主題 requestHeader.setTopic(msg.getTopic()); //創建默認主題 requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); //該主題默認主題隊列個數4 requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); //隊列ID requestHeader.setQueueId(mq.getQueueId()); //消息系統標識 requestHeader.setSysFlag(sysFlag); //消息發送時間戳 requestHeader.setBornTimestamp(System.currentTimeMillis()); //消息標記 requestHeader.setFlag(msg.getFlag()); //消息拓展信息 requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); //消息重試次數->初始為0 requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); //是否是批量消息 requestHeader.setBatch(msg instanceof MessageBatch); //如果是發送重試消息 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);//對消息重試次數進行更新if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);} } SendResult sendResult = null; switch (communicationMode) {//異步發送 case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;//如果消息體被壓縮if (msgBodyCompressed) {//msgBody應該使用prevBodytmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//發送消息并返回結果sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY://同步發送 case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break; } //如果注冊了鉤子函數->發送完畢后執行鉤子函數 if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context); }3.5發送批量消息
批量消息發送時序圖:
批量消息發送是將一個主題的多條消息進行打包后發送到消息消費端,以此減少網絡調用,提高網絡傳輸以及消息發送效率。但是,同一批次的消息數量不是越多越好,如果消息內容過長,則打包消息過程中會導致占用線程資源時間過長,從而導致其他線程發送消息響應時間過長,并且單批次消息總長度不能超過DefaultMQProducer#maxMessageSize -> 4M。
DefaultMQProducer#send
public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//調用batch方法將消息進行打包后進行發送return this.defaultMQProducerImpl.send(batch(msgs)); } //繼承Message->其實就是用一個List將多個消息存封裝起來->上述代碼中的batch方法作用就是否則將消息封裝成MessageBatch public class MessageBatch extends Message implements Iterable<Message> {private static final long serialVersionUID = 621335151046335557L;private final List<Message> messages; }DefaultMQProducer#batch
MessageBatch msgBatch; try {//將消息集合封裝到MessageBatch.messagesmsgBatch = MessageBatch.generateFromList(msgs);//遍歷消息for (Message message : msgBatch) {//對消息一一進行檢查Validators.checkMessage(message, this);//對每個消息設置唯一ID和TopicMessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}//編碼后存入Message.bodymsgBatch.setBody(msgBatch.encode()); } catch (Exception e) {throw new MQClientException("Failed to initiate the MessageBatch", e); } //設置msgBatch的主題Topic msgBatch.setTopic(withNamespace(msgBatch.getTopic())); return msgBatch;將消息封裝成MessageBatch之后的消息發送步驟跟單條消息的發送步驟完全一致,至此消息發送已經完成。
以上。
本文僅作為個人學習使用,水平有限,如有錯誤請指正!
總結
以上是生活随笔為你收集整理的RocketMQ:Producer启动流程与消息发送源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ:NameServer路
- 下一篇: RocketMQ:消息消费队列与索引文件