RocketMQ:Consumer概述及启动流程与消息拉取源码分析
文章目錄
- Consumer
- 概述
- 消費者核心類
- 消費者啟動流程
- 消息拉取
- PullMessageService實現機制
- ProcessQueue實現機制
- 消息拉取基本流程
- 客戶端發起消息拉取請求
- 消息服務器Broker組裝消息
- 消息拉取客戶端處理消息
- 總結
Consumer
概述
-
消費者組與消費模式
消息消費以組的模式展開,一個消費組內可包含多個消費者,每個消費組可以訂閱多個主題。消費組之間有負載均衡和廣播兩種模式。負載均衡模式,主題下的同一條消息只允許被其中一個消費者消費。廣播模式,主題下的同一條消息,將被所有消費者消費一次。
-
消息傳遞模式
分為推送和拉取兩種模式。
-
從何處開始消費消息,可選參數:
CONSUME_FROM_LAST_OFFSET:上一次消費偏移量
CONSUME_FROM_FIRST_OFFSET:從頭開始
CONSUME_FROM_TIMESTAMP:某個時間開始
消費者核心類
基于消息推送模式
//發送消息-如果消息消費失敗,將會發送回Broker,過一段時間(delayLevel)再進行消費 void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName); //根據主題從消費者緩存中獲取消息隊列 Set<MessageQueue> fetchSubscribeMessageQueues(final String topic); //注冊并發消息事件監聽器 void registerMessageListener(MessageListenerConcurrently messageListener); //注冊順序消息事件監聽器 void registerMessageListener(final MessageListenerOrderly messageListener); //基于主題訂閱消息,消息過濾使用表達式 void subscribe(final String topic, final String subExpression); //基于主題訂閱消息,消息過濾使用類模式 void subscribe(final String topic, final String fullClassName,final String filterClassSource); //訂閱消息,并指定隊列選擇器 void subscribe(final String topic, final MessageSelector selector); void unsubscribe(final String topic):取消消息訂閱DefaultMQPushConsumer
//消費者組 private String consumerGroup; //消息消費模式 private MessageModel messageModel = MessageModel.CLUSTERING; //指定消費開始偏移量(最大偏移量、最小偏移量、啟動時間戳)開始消費 private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; //集群模式下的消息隊列負載策略 private AllocateMessageQueueStrategy allocateMessageQueueStrategy; //訂閱信息 private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>(); //消息業務監聽器 private MessageListener messageListener; //消息消費進度存儲器 private OffsetStore offsetStore; //消費者最小線程數量 private int consumeThreadMin = 20; //消費者最大線程數量 private int consumeThreadMax = 20; //并發消息消費時處理隊列最大跨度 private int consumeConcurrentlyMaxSpan = 2000; //每1000次流控后打印流控日志 private int pullThresholdForQueue = 1000; //推模式下任務間隔時間 private long pullInterval = 0; //推模式下任務拉取的條數,默認32條 private int pullBatchSize = 32; //每次傳入MessageListener#consumerMessage中消息的數量 private int consumeMessageBatchMaxSize = 1; //是否每次拉取消息都訂閱消息 private boolean postSubscriptionWhenPull = false; //消息重試次數,-1代表16次 private int maxReconsumeTimes = -1; //消息消費超時時間 private long consumeTimeout = 15;DefaultMQPushConsumerImpl:消息消費者默認實現類,應用程序中直接調用該類的實例完成消息的消費。
RebalanceImpl:重新負載均衡實現類,實現消息消費端與消息隊列之間的重新分布。
MQClientInstance:消息客戶端實例,負責與MQ服務器Broker和NameServer之間的的網絡交互。
PullAPIWrapper:RocketMQ中,實際上只有Message Pull模式,而Push模式只是將Pull模式進行了封裝。
OffsetStore:消息消費進度存儲器。
消費者啟動流程
DefaultMQPushConsumerImpl#start
switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;//校驗消息合法性this.checkConfig();//構建主題訂閱信息this.copySubscription();/*** 如果消息消費模式為集群模式,并且當前的實例名為 DEFAULT,替換為當前客戶端進程的PID* if (this.instanceName.equals("DEFAULT")) {* this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();* }*/if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}//構建MQClientInstance mQClientFactorythis.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);//構造重新負載均衡實現類this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);//拉取消息API封裝this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);//消息消費進度加載if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING://廣播模式下 將消息消費進度存儲到本地this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING://集群模式下 將消息消費的進度存儲到遠端Broker中this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();//創建順序消息消費服務if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {//順序this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());}//創建并發消息消費服務else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {//并發this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}//啟動消息消費服務this.consumeMessageService.start();//注冊到消費者實例到客戶端boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}//--------啟動客戶端---------mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());//進入消息消費狀態this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break; }//更新訂閱信息 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); //檢測broker狀態 this.mQClientFactory.checkClientInBroker(); //發送心跳包 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); //重新負載 this.mQClientFactory.rebalanceImmediately();消息拉取
核心類:PutMessageService,是一個消息拉取的服務線程。
PullMessageService實現機制
PullMessageService#run
//線程狀態為運行狀態 while (!this.isStopped()) {try {//在拉取消息請求隊列中拉取消息請求PullRequest pullRequest = this.pullRequestQueue.take();//處理拉取消息請求this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);} }PullRequest
public class PullRequest {//費者組private String consumerGroup;//待拉取消息隊列private MessageQueue messageQueue;//消息處理隊列private ProcessQueue processQueue;//待拉取的MessageQueue偏移量private long nextOffset;//是否被鎖定private boolean previouslyLocked = false; }PullMessageService#pullMessage
//找到消費者 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) {//強轉為推送模式下的消費者DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;//拉取消息impl.pullMessage(pullRequest); } else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); }ProcessQueue實現機制
ProcessQueue是MessageQueue在消費端的快照。PullMessageService從消息服務器默認每次拉取32條消息,按照消息的隊列偏移量順序放在ProcessQueue中,PullMessageService再將消息提交到消費者消費線程池,消息消費成功之后從ProcessQueue中移除——Queue consumption snapshot。
ProcessQueue
//消息容器 private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); //讀寫鎖 private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); //ProcessQueue總消息數量 private final AtomicLong msgCount = new AtomicLong(); //ProcessQueue隊列最大偏移量 private volatile long queueOffsetMax = 0L; //當前ProcessQueue是否被丟棄 private volatile boolean dropped = false; //上一次拉取時間戳 private volatile long lastPullTimestamp = System.currentTimeMillis(); //上一次消費時間戳 private volatile long lastConsumeTimestamp = System.currentTimeMillis(); //移除消費超時消息 public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) //添加消息 public boolean putMessage(final List<MessageExt> msgs) //獲取消息最大間隔 public long getMaxSpan() //移除消息 public long removeMessage(final List<MessageExt> msgs) //將consumingMsgOrderlyTreeMap中消息重新放在msgTreeMap,并清空consumingMsgOrderlyTreeMap public void rollback() //將consumingMsgOrderlyTreeMap消息清除,表示成功處理該批消息 public long commit() //重新處理該批消息 public void makeMessageToCosumeAgain(List<MessageExt> msgs) //從processQueue中取出batchSize條消息 public List<MessageExt> takeMessags(final int batchSize)消息拉取基本流程
客戶端發起消息拉取請求
DefaultMessagePushConsumerImpl#pullMessage
//獲取消息處理隊列 final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return; }//設置拉取時間戳 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try {//判斷服務是否狀態正常this.makeSureStateOK(); } catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);return; }//被掛起--->等待1s if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return; }//-----------------------------------流量控制----------------------------------- //獲得最大待處理消息數量 long cachedMessageCount = processQueue.getMsgCount().get(); //獲得最大待處理消息大小 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);//從數量進行流控->消息數量大于1000條 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {//延遲消息拉取50msthis.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return; }//從消息大小進行流控->消息大小大于100Mb if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {//延遲消息拉取50msthis.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return; }....//獲得主題訂閱信息 final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); //如果主題訂閱信息為空--->延遲3s后繼續拉取 if (null == subscriptionData) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.warn("find the consumer's subscription failed, {}", pullRequest);return; }final long beginTimestamp = System.currentTimeMillis();//拉取消息回調函數 PullCallback pullCallback = new PullCallback() {//這一部分實現在之后進行講解 };....//獲取消息系統拉取標記 int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffsettrue, // suspendsubExpression != null, // subscriptionclassFilter // class filter );try {//與服務端進行交互獲取消息this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,//-----------↓↓↓注意這個回調函數-------------pullCallback); } catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); }接下來重點關注pullAPIWrapper.pullKernelImpl()的核心邏輯:
public PullResult pullKernelImpl(final MessageQueue mq, //消息消費隊列final String subExpression, //消息訂閱子模式subscribe( topicName, "模式")final String expressionType, final long subVersion, //版本final long offset, //pullRequest.getNextOffset()final int maxNums, //defaultMQPushConsumer.getPullBatchSize()->32條final int sysFlag, //系統標記final long commitOffset, //當前消息隊列最新偏移量final long brokerSuspendMaxTimeMillis,//運行Broker掛起最長時間->15sfinal long timeoutMillis, //超時時間->30sfinal CommunicationMode communicationMode,//Sync/Async/Onewayfinal PullCallback pullCallback //消息拉取后的回調函數 ) throws MQClientException, RemotingException, MQBrokerException, InterruptedExceptionPullAPIWrapper#pullKernelImpl
//獲取Broker信息 FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false); //如果Broker信息為空 if (null == findBrokerResult) {//從Nameserver更新主題路由表信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());//重新獲取Broker信息findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false); }if (findBrokerResult != null) {{//檢查版本if (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}//封裝拉取消息請求PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);}//根據brokerAddr、requestHeader等信息利用遠程網絡調用實例在Broker中對消息進行拉取//最后返回一個消息拉取結果PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult; }根據拉取消息的模式是異步或同步來進行不同操作。
MQClientAPIImpl#pullMessage
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);switch (communicationMode) {case ONEWAY:assert false;return null;case ASYNC://異步拉取->發送拉取消息請求并立即返回結果this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);return null;case SYNC://同步拉取->發送消息拉取請求并等待結果返回return this.pullMessageSync(addr, request, timeoutMillis);default:assert false;break; } return null;消息服務器Broker組裝消息
消息服務器接收到消費者的消息拉取請求之后進行消息組裝的時序圖:
消息請求處理器:PullMessageProcessor
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);//Broker控制器private final BrokerController brokerController; //構造函數存儲容器private List<ConsumeMessageHook> consumeMessageHookList; }PullMessageProcessor#processRequest
//構建消息過濾器 MessageFilter messageFilter; if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager()); } else {messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager()); }//調用MessageStore.getMessage查找消息 final GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),//消費組名稱requestHeader.getTopic(),//主題名稱requestHeader.getQueueId(),//隊列IDrequestHeader.getQueueOffset(),//待拉取偏移量requestHeader.getMaxMsgNums(), //最大拉取消息條數messageFilter //消息過濾器);DefaultMessageStore#getMessage
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; //查找下一次隊列偏移量 long nextBeginOffset = offset; //當前消息隊列最小偏移量 long minOffset = 0; //當前消息隊列最大偏移量 long maxOffset = 0;//懶加載-當找到消息再進行賦值 GetMessageResult getResult = null;//當前commitLog最大偏移量 final long maxOffsetPy = this.commitLog.getMaxOffset();//根據主題名稱和隊列編號獲取消息消費隊列 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) {minOffset = consumeQueue.getMinOffsetInQueue();maxOffset = consumeQueue.getMaxOffsetInQueue();//消息偏移量異常情況校對下一次拉取偏移量if (maxOffset == 0) {//表示當前消息隊列中沒有消息status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);} else if (offset < minOffset) {//待拉取消息的偏移量小于隊列的實際偏移量status = GetMessageStatus.OFFSET_TOO_SMALL;nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else if (offset == maxOffset) {//待拉取偏移量為隊列最大偏移量status = GetMessageStatus.OFFSET_OVERFLOW_ONE;nextBeginOffset = nextOffsetCorrection(offset, offset);} else if (offset > maxOffset) {//偏移量越界status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;if (0 == minOffset) {nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else {nextBeginOffset = nextOffsetCorrection(offset, maxOffset);}} else {//偏移量處于正常情況//-------------根據偏移量從CommitLog中拉取32條消息------------SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);}PullMessageProcessor#processRequest
//根據拉取結果填充responseHeader response.setRemark(getMessageResult.getStatus().name()); responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); responseHeader.setMinOffset(getMessageResult.getMinOffset()); responseHeader.setMaxOffset(getMessageResult.getMaxOffset());if (getMessageResult.isSuggestPullingFromSlave()) {//如果當前拉取消息是從Slave節點拉取并且拉取速度較慢responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } else {//設置下一次拉取任務的ID為主節點responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); }switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:break;case SLAVE:if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}break; } //GetMessageResult與Response的Code轉換 switch (getMessageResult.getStatus()) {//成功case FOUND:response.setCode(ResponseCode.SUCCESS);break;//消息不存在case MESSAGE_WAS_REMOVING://消息重試response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;//沒有找到匹配的主題隊列case NO_MATCHED_LOGIC_QUEUE://消息隊列中未包含消息case NO_MESSAGE_IN_QUEUE:if (0 != requestHeader.getQueueOffset()) {response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",requestHeader.getQueueOffset(),getMessageResult.getNextBeginOffset(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getConsumerGroup());} else {response.setCode(ResponseCode.PULL_NOT_FOUND);}break;//未找到匹配的消息case NO_MATCHED_MESSAGE:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;//消息物理偏移量為空case OFFSET_FOUND_NULL:response.setCode(ResponseCode.PULL_NOT_FOUND);break;//offset越界case OFFSET_OVERFLOW_BADLY:response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());break;//offset過大case OFFSET_OVERFLOW_ONE:response.setCode(ResponseCode.PULL_NOT_FOUND);break;//offset過小case OFFSET_TOO_SMALL:response.setCode(ResponseCode.PULL_OFFSET_MOVED);log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),getMessageResult.getMinOffset(), channel.remoteAddress());break;default:assert false;break; } .... //如果CommitLog標記可用,并且當前Broker為主節點,則更新消息消費進度 boolean storeOffsetEnable = brokerAllowSuspend; storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; if (storeOffsetEnable) {this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(),requestHeader.getCommitOffset()); } return response;消息拉取客戶端處理消息
MQClientAPIImpl#processPullResponse
PullStatus pullStatus = PullStatus.NO_NEW_MSG; //判斷響應結果 switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark(), addr); }//解碼響應頭 PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);//封裝PullResultExt返回 return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());PullResult
/*** 拉取結果狀態:FOUND/NO_NEW_MSG/NO_MATCHED_MSG/OFFSET_ILLEGAL*/ private final PullStatus pullStatus; /*** 下次拉取偏移量*/ private final long nextBeginOffset; /*** 消息隊列最小偏移量*/ private final long minOffset; /*** 消息隊列最大偏移量*/ private final long maxOffset; /*** 拉取的消息隊列*/ private List<MessageExt> msgFoundList;DefaultMQPushConsumerImpl#pullMessage
PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {switch (pullResult.getPullStatus()) {case FOUND:long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());//消息拉取成功-將消息放入processQueueboolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());//提交消息消費請求-對消息進行處理DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);//如果pullInterval大于0,則等待pullInterval毫秒后將pullRequest對象放入到PullMessageService中的pullRequestQueue隊列中if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}....總結
本文僅作為個人學習使用,如有不足或錯誤請指正!
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的RocketMQ:Consumer概述及启动流程与消息拉取源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ:消息消费队列与索引文件
- 下一篇: RocketMQ:消费端的消息消息队列负