RocketMQ之Pull消费者客户端启动
Pull消費者客戶端(主動拉取消息的消費者)即構(gòu)造了DefaultMQPullConsumer對象,DefaultMQPullConsumer繼承了ClientConfig類。我們先看其構(gòu)造方法
[java] view plain copy public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) { this.consumerGroup = consumerGroup; defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook); }
這里只是簡單設(shè)置了consumerGroup消費者組名,表示消費者屬于哪個組。構(gòu)造了DefaultMQPullConsumerImpl的實例,DefaultMQPullConsumerImpl的構(gòu)造方法很簡單,只是綁定了DefaultMQPullConsumer、配置了傳入的rpcHook。
DefaultMQPullConsumer內(nèi)部封裝了DefaultMQPullConsumerImpl,其中還維護(hù)這一些配置信息。這里維護(hù)著消費者訂閱的topic集合。
[java] view plain copy private Set<String> registerTopics = new HashSet<String>();整個消費者客戶端的啟動,調(diào)用了DefaultMQPullConsumer的start()方法,內(nèi)部直接調(diào)用DefaultMQPullConsumerImpl的start()方法,這個start方法加了synchronized修飾。
[java] view plain copypublic synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); this.copySubscription(); if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPullConsumer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer , this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPullConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPullConsumer.getOffsetStore(); } else { switch (this.defaultMQPullConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer .getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer .getConsumerGroup()); break; default: break; } this.defaultMQPullConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl .GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } }一開始的serverState的狀態(tài)自然為CREAT_JUST,調(diào)用checkConfig(),其中先是對ConsumerGroup進(jìn)行驗證,非空,合法(符合正則規(guī)則,且長度不超過配置最大值),且不為默認(rèn)值(防止消費者集群名沖突),然后對消費者消息模式、消息隊列分配算法進(jìn)行非空、合法校驗。
關(guān)于消費者消息模式有BroadCasting(廣播)跟Clustering(集群)兩種、默認(rèn)是Clustering(集群)配置在DefaultMQPullConsumer中。關(guān)于消費者的消息分配算法,在DefaultMQPullConsumer中實現(xiàn)有默認(rèn)的消息分配算法,allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();(平均分配算法)。其實現(xiàn)了AllocateMessageQueueStrategy接口,重點看其實現(xiàn)的allocate()方法。
[java] view plain copy @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll); return result; } int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result; }
傳入的參數(shù)有當(dāng)前消費者id,所有消息隊列數(shù)組,以及當(dāng)前所有消費者數(shù)組。先簡單驗證非空,再通過消費者數(shù)組大小跟消息隊列大小根據(jù)平均算法算出當(dāng)前消費者該分配哪些消息隊列集合。邏輯不難。RocketMQ還提供了循環(huán)平均、一致性哈希、配置分配等算法,這里默認(rèn)采用平均分配。
我們再回到DefaultMQPullConsumerImpl的start()方法,checkConfig后,調(diào)用copySubscription()方法,將配置在DefaultMQPullConsumer中的topic信息構(gòu)造成并構(gòu)造成subscriptionData數(shù)據(jù)結(jié)構(gòu),以topic為key以subscriptionData為value以鍵值對形式存到rebalanceImpl的subscriptionInner中。
[java] view plain copy private void copySubscription() throws MQClientException { try { Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics(); if (registerTopics != null) { for (final String topic : registerTopics) { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
接下來從MQCLientManager中得到MQClient的實例,這個步驟跟生產(chǎn)者客戶端相同。
再往后是對rebalanceImpl的配置,我們重點看下rebalanceImpl,它是在DefaultMQPullConsumerImpl成員中直接構(gòu)造private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);即在DefaultMQPullConsumerImpl初始化的時候構(gòu)造。接下來對其消費者組名、消息模式(默認(rèn)集群)、隊列分配算法(默認(rèn)平均分配)、消費者客戶端實例進(jìn)行配置,配置信息都是從DefaultMQPullConsumer中取得。
[java] view plain copy public abstract class RebalanceImpl { protected static final Logger log = ClientLogger.getLog(); protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>(); protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>(); protected String consumerGroup; protected MessageModel messageModel; protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; protected MQClientInstance mQClientFactory;接下來構(gòu)造了PullAPIWrapper,僅僅調(diào)用其構(gòu)造方法,簡單的配置下
[java] view plain copy public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) { this.mQClientFactory = mQClientFactory; this.consumerGroup = consumerGroup; this.unitMode = unitMode; }
然后初始化消費者的offsetStore,offset即偏移量,可以理解為消費進(jìn)度,這里根據(jù)不同的消息模式來選擇不同的策略。如果是廣播模式,那么所有消費者都應(yīng)該收到訂閱的消息,那么每個消費者只應(yīng)該自己消費的消費隊列的進(jìn)度,那么需要把消費進(jìn)度即offsetStore存于本地采用LocalFileOffsetStroe,相反的如果是集群模式,那么集群中的消費者來平均消費消息隊列,那么應(yīng)該把消費進(jìn)度存于遠(yuǎn)程采用RemoteBrokerOffsetStore。然后調(diào)用相應(yīng)的load方法加載。
之后將當(dāng)前消費者注冊在MQ客戶端實例上之后,調(diào)用MQClientInstance的start()方法,啟動消費者客戶端。
[java] view plain copypublic void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed." , null); default: break; } } }
看到這里應(yīng)該很熟悉,跟生產(chǎn)者客戶端這里是同一段代碼,無非解析路由消息并完成路由消息的配置,啟動netty客戶端,啟動定時任務(wù)(定時更新從名稱服務(wù)器獲取路由信息更新本地路由信息,心跳,調(diào)整線程數(shù)量),后面啟動pull server、rebalance service、push service最后把serviceState狀態(tài)設(shè)為Running表示客戶端啟動。
我們在這里重點看下RebalanceService的啟動。下面貼出的是RebalanceService的run()方法。
[java] view plain copy @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); }可以看到,只要這個線程沒有被停止(客戶端沒關(guān)閉),會一直循環(huán)調(diào)用客戶端的doRebalance()方法。
[java] view plain copy public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }MQClientInstance遍歷consumerTable(之前注冊的時候以consumerGroup為key,以消費者客戶端DefaultMQPullConsumerImpl為value存入consumerTable中)中的每個元素,循環(huán)調(diào)用其元素的doRebalance()方法。那我們看DefaultMQPullConsumerImpl的doRebalance方法。
1 [java] view plain copy 2 @Override 3 public void doRebalance() { 4 if (this.rebalanceImpl != null) { 5 this.rebalanceImpl.doRebalance(false); 6 } 7 }直接調(diào)用了rebalanceImpl的doRebalance方法
[java] view plain copy public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }可以看到先得到subTable即subscriptionInner,之前根據(jù)配置的每個topic生成的SubscriptionData數(shù)據(jù)結(jié)構(gòu)的map。先遍歷該map,得到每個topic,針對每個topic調(diào)用rebalanceByTopic()
1 [java] view plain copy 2 private void rebalanceByTopic(final String topic, final boolean isOrder) { 3 switch (messageModel) { 4 case BROADCASTING: { 5 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); 6 if (mqSet != null) { 7 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); 8 if (changed) { 9 this.messageQueueChanged(topic, mqSet, mqSet); 10 log.info("messageQueueChanged {} {} {} {}", 11 consumerGroup, 12 topic, 13 mqSet, 14 mqSet); 15 } 16 } else { 17 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 18 } 19 break; 20 } 21 case CLUSTERING: { 22 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); 23 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); 24 if (null == mqSet) { 25 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 26 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 27 } 28 } 29 30 if (null == cidAll) { 31 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); 32 } 33 34 if (mqSet != null && cidAll != null) { 35 List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); 36 mqAll.addAll(mqSet); 37 38 Collections.sort(mqAll); 39 Collections.sort(cidAll); 40 41 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; 42 43 List<MessageQueue> allocateResult = null; 44 try { 45 allocateResult = strategy.allocate( 46 this.consumerGroup, 47 this.mQClientFactory.getClientId(), 48 mqAll, 49 cidAll); 50 } catch (Throwable e) { 51 log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", 52 strategy.getName(), e); 53 return; 54 } 55 56 Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); 57 if (allocateResult != null) { 58 allocateResultSet.addAll(allocateResult); 59 } 60 61 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); 62 if (changed) { 63 log.info( 64 "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={} 65 , cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", 66 strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), 67 allocateResultSet.size(), allocateResultSet); 68 this.messageQueueChanged(topic, mqSet, allocateResultSet); 69 } 70 } 71 break; 72 } 73 default: 74 break; 75 } 76 }我們先重點關(guān)注集群模式下,先得到topic的本地路由信息,再通過topic跟這個消費者的組名,調(diào)用netty客戶端的同步網(wǎng)絡(luò)訪問topic指定的broker,從broker端得到與其連接的且是指定消費組名下訂閱指定topic的消費者id的集合。然后采用默認(rèn)的分配算法的allocate()進(jìn)行隊列給消費者平均分配。然后調(diào)用updateProcessQueueTableInRebalance()方法判斷是否重新隊列分配。
1 [java] view plain copy 2 private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, 3 final boolean isOrder) { 4 boolean changed = false; 5 6 Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); 7 while (it.hasNext()) { 8 Entry<MessageQueue, ProcessQueue> next = it.next(); 9 MessageQueue mq = next.getKey(); 10 ProcessQueue pq = next.getValue(); 11 12 if (mq.getTopic().equals(topic)) { 13 if (!mqSet.contains(mq)) { 14 pq.setDropped(true); 15 if (this.removeUnnecessaryMessageQueue(mq, pq)) { 16 it.remove(); 17 changed = true; 18 log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); 19 } 20 } else if (pq.isPullExpired()) { 21 switch (this.consumeType()) { 22 case CONSUME_ACTIVELY: 23 break; 24 case CONSUME_PASSIVELY: 25 pq.setDropped(true); 26 if (this.removeUnnecessaryMessageQueue(mq, pq)) { 27 it.remove(); 28 changed = true; 29 log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", 30 consumerGroup, mq); 31 } 32 break; 33 default: 34 break; 35 } 36 } 37 } 38 } 39 40 List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); 41 for (MessageQueue mq : mqSet) { 42 if (!this.processQueueTable.containsKey(mq)) { 43 if (isOrder && !this.lock(mq)) { 44 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); 45 continue; 46 } 47 48 this.removeDirtyOffset(mq); 49 ProcessQueue pq = new ProcessQueue(); 50 long nextOffset = this.computePullFromWhere(mq); 51 if (nextOffset >= 0) { 52 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); 53 if (pre != null) { 54 log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); 55 } else { 56 log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); 57 PullRequest pullRequest = new PullRequest(); 58 pullRequest.setConsumerGroup(consumerGroup); 59 pullRequest.setNextOffset(nextOffset); 60 pullRequest.setMessageQueue(mq); 61 pullRequest.setProcessQueue(pq); 62 pullRequestList.add(pullRequest); 63 changed = true; 64 } 65 } else { 66 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); 67 } 68 } 69 } 70 71 this.dispatchPullRequest(pullRequestList); 72 73 return changed; 74 }先遍歷processQueueTable,看其topic下的該處理消息隊列是否還是應(yīng)該處理,由于新分配之后,消息隊列可能會改變,所以原該處理的消息隊列可能沒必要處理,因此沒必要處理的消息隊列移除。當(dāng)然也有可能多出需要處理的消息隊列,于是需要建立其與processQueue的對應(yīng)關(guān)系,先調(diào)用computerPullFromWhere得到該條消息下次拉取數(shù)據(jù)的位置,在RebalancePullImpl中實現(xiàn)了該方法直接返回0,把該處理的mq封裝成pq后,更新到processQueueTable中。若有更新,無論是增加還是刪除,則changed都設(shè)為true。(這個地方講的有點模糊,他是客戶端pull與push區(qū)別的關(guān)鍵,實際上push不過是在pull之上封裝了下操作,后面我們會重新回來分析。)
方法返回后,如果changed為true,會調(diào)用messageQueueChanged方法來通知配置在DefaultMQPullConsumer中的相關(guān)messageQueueListener,我們可以看到RebalancePullImpl中的實現(xiàn)。
1 [java] view plain copy 2 @Override 3 public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { 4 MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener(); 5 if (messageQueueListener != null) { 6 try { 7 messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); 8 } catch (Throwable e) { 9 log.error("messageQueueChanged exception", e); 10 } 11 } 12 }廣播模式則比較簡單,由于所有消費者都要處理,少了隊列分配這個步驟。
本文轉(zhuǎn)載自:https://blog.csdn.net/panxj856856/article/details/80725630
轉(zhuǎn)載于:https://www.cnblogs.com/xiyunjava/p/9202526.html
總結(jié)
以上是生活随笔為你收集整理的RocketMQ之Pull消费者客户端启动的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Django学习笔记之模板渲染、模板语言
- 下一篇: Django自带的加密算法及加密模块