RocketMQ:消息ACK机制源码解析
消息消費進度
概述
消費者消費消息過程中,為了避免消息的重復消費,應將消息消費進度保存起來,當其他消費者再對消息進行消費時,讀取已消費的消息偏移量,對之后的消息進行消費即可。
消息模式分為兩種:
- 集群模式:一條消息只能被一個消費者消費
- 廣播模式:一條消息被所有消費者都消費一次
廣播模式下,消息被所有消費者消費,因此消息消費的進度可以跟消費端保存在一起,即本地保存。
集群模式下,消息只能被集群內的一個消費者消費,進度不能保存在消費端,否則會導致消息重復消費,因此集群模式下消息進度集中保存在Broker中。
消息進度存儲接口
OffsetStore
public interface OffsetStore {//加載消息消費進度void load() throws MQClientException;//更新消費進度并保存在內存中void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);//從本地存儲中獲取消息消費進度long readOffset(final MessageQueue mq, final ReadOffsetType type);//保存所有消息消費進度-本地/遠程void persistAll(final Set<MessageQueue> mqs);void persist(final MessageQueue mq);//移除偏移量void removeOffset(MessageQueue mq);//根據Topic克隆一份消息隊列消費進度緩存表Map<MessageQueue, Long> cloneOffsetTable(String topic);//更新消費進度到Brokervoid updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,MQBrokerException, InterruptedException, MQClientException; }DefaultMQPushConsumerImpl#start
switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING://廣播模式下 將消息消費進度存儲到本地this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING://集群模式下 將消息消費的進度存儲到遠端Broker中this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);如上所示,根據消息消費模式的不同,會創建不同的OffsetStore對象。
廣播模式消費進度存儲(LocalFileOffsetStore)
public class LocalFileOffsetStore implements OffsetStore {//存儲目錄//消費者啟動時-可以通過"-D rocketmq.client.localOffsetStoreDir=路徑"來指定public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty("rocketmq.client.localOffsetStoreDir",System.getProperty("user.home") + File.separator + ".rocketmq_offsets");private final static InternalLogger log = ClientLogger.getLog();//MQ客戶端private final MQClientInstance mQClientFactory;//消費組名private final String groupName;//存儲路徑private final String storePath;//以MessageQueue為鍵-消費偏移量為值的緩存表private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>(); }構造函數
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {this.mQClientFactory = mQClientFactory;this.groupName = groupName;//json格式存儲this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +this.mQClientFactory.getClientId() + File.separator +this.groupName + File.separator +"offsets.json"; }LocalFileOffsetStore#load
public void load() throws MQClientException {//從本地磁盤中進行讀取json文件-并進行序列化封裝轉化為mapOffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {//存入緩存表offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);log.info("load consumer's offset, {} {} {}",this.groupName,mq,offset.get());}} }public class OffsetSerializeWrapper extends RemotingSerializable {private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>(); }LocalFileOffsetStore#persistAll
public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty()) {return;}OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {if (mqs.contains(entry.getKey())) {AtomicLong offset = entry.getValue();//填充<消息隊列-消費偏移量>緩存表offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);}}//轉為json格式String jsonString = offsetSerializeWrapper.toJson(true);if (jsonString != null) {try {//jsonString->file->保存到storePathMixAll.string2File(jsonString, this.storePath);} catch (IOException e) {log.error("persistAll consumer offset Exception, " + this.storePath, e);}} }persistAll()的入口是MQClientInstance#startScheduledTask
MQClientInstance#startScheduledTask
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}} }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);消費端啟動后延遲10s開啟該定時任務,每隔5s進行一次持久化。
MQClientInstance#persistAllConsumerOffset
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();impl.persistConsumerOffset(); }DefaultMQPushConsumerImpl#persistConsumerOffset
try {this.makeSureStateOK();Set<MessageQueue> mqs = new HashSet<MessageQueue>();//獲取重負載分配好的消息隊列Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();mqs.addAll(allocateMq);//當前是LocalFileOffsetStore.persistAllthis.offsetStore.persistAll(mqs); } catch (Exception e) {log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); }集群模式消費進度存儲(RemoteBrokerOffsetStore)
RemoteBrokerOffsetStore
private final static InternalLogger log = ClientLogger.getLog(); //MQ客戶端實例-該實例被同一個JVM下的消費者和生產者共用 private final MQClientInstance mQClientFactory; //消費組名 private final String groupName; //以消息隊列為鍵-消費偏移量為值的緩存表 private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>();RemoteBrokerOffsetStore#persistAll
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {MessageQueue mq = entry.getKey();AtomicLong offset = entry.getValue();if (offset != null) {if (mqs.contains(mq)) {try {//更新消費偏移量到Brokerthis.updateConsumeOffsetToBroker(mq, offset.get());log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",this.groupName,this.mQClientFactory.getClientId(),mq,offset.get());} catch (Exception e) {log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);}} } }RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
同步更新消息消費偏移量,如Master關閉,則更新到Slave。
//從MQ客戶端中根據BrokerName獲取消息隊列對應的Broker FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) {//根據Topic從NameServer更新路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());//重新查找findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); }if (findBrokerResult != null) {//封裝消息消費隊列更新請求頭UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();requestHeader.setTopic(mq.getTopic()); //主題信息requestHeader.setConsumerGroup(this.groupName); //消費者組requestHeader.setQueueId(mq.getQueueId()); //隊列IDrequestHeader.setCommitOffset(offset); //消費偏移量if (isOneway) {//Oneway->根據Broker地址->發送請求將消費偏移量保存到Broker-超時時間默認為5sthis.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} else {this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} } else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }RemoteBrokerOffsetStore#updateOffset
if (mq != null) {//從緩存中獲取消息隊列對應的偏移量AtomicLong offsetOld = this.offsetTable.get(mq);//為空if (null == offsetOld) {//將存入的offset存入內存中offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));}//不為空->根據increaseOnly更新原先的offsetOldif (null != offsetOld) {if (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);} else {offsetOld.set(offset);}} }RemoteBrokerOffsetStore#readOffset
public long readOffset(final MessageQueue mq, //消息隊列final ReadOffsetType type) { //讀取偏移量類型if (mq != null) {switch (type) {case MEMORY_FIRST_THEN_STORE://從內存中讀取 case READ_FROM_MEMORY: {AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {return offset.get();} else if (ReadOffsetType.READ_FROM_MEMORY == type) {return -1;}}//從Broker中讀取 case READ_FROM_STORE: {try {//從Broker中獲取消費偏移量long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);AtomicLong offset = new AtomicLong(brokerOffset);//更新至內存中(map)this.updateOffset(mq, offset.get(), false);return brokerOffset;}// No offset in brokercatch (MQBrokerException e) {return -1;}//Other exceptionscatch (Exception e) {log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);return -2;}}default:break;}}RemoteBrokerOffsetStore#fetchConsumeOffsetFromBroker
//從MQ客戶端實例中獲取Boker信息 FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) {//從NameServer中更新Topic的路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());//重新獲取BrokerfindBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); }if (findBrokerResult != null) {//封裝查詢消費進度請求頭QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();requestHeader.setTopic(mq.getTopic());requestHeader.setConsumerGroup(this.groupName);requestHeader.setQueueId(mq.getQueueId());//帶上請求頭調用MQClientAPI到Broker中獲取return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }本文僅作為個人學習使用,如有不足或錯誤請指正!
總結
以上是生活随笔為你收集整理的RocketMQ:消息ACK机制源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ:消费端的消息消息队列负
- 下一篇: RocketMQ:消息存储机制详解与源码