路由动态更新 动态路由更新算法)
一、Producer路由信息
從NameServer章節分析得知,路由信息存儲在NameServer,生產端和消費端定時向NameServer獲取topic相關的路由信息;
從生產者啟動流程得知:
路由信息的動態更新源碼在MQClientInstance#startScheduledTask定時任務里面
具體方法:
updateTopicRouteInfoFromNameServer下圖為路由更新流程
添加圖片注釋,不超過 140 字(可選)
接下來我們著重解析此段源碼:
1 定時任務:頻率-30s
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//從nameServer更新路由信息 -定時任務:30s一次
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
2 updateTopicRouteInfoFromNameServer
public void updateTopicRouteInfoFromNameServer() {
Set topicList = new HashSet();
{ // Consumer 消費端,后續再分析
...省略....
}
{ // Producer 生產端
Iterator<entry> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
Set lst = impl.getPublishTopicList();//1>獲取所有 topic-list
topicList.addAll(lst);
}
}
}
//2>更新路由信息
for (String topic : topicList) {
this.updateTopicRouteInfoFromNameServer(topic);
}
}</entry
分析如下:
1.1 getPublishTopicListgetPublishTopicList 方法分析:
public Set getPublishTopicList() {
Set topicList = new HashSet();
for (String key : this.topicPublishInfoTable.keySet()) {
topicList.add(key);
}
return topicList;
}
備注:
細心的你可能發現從啟動流程中得知:
topicPublishInfoTable(ConcurrentHashMap)只會默認注冊topic=TBW102的信息,那正常業務發送的topic是如何注冊進去的呢,建議直接觀看理解以下代碼,在發送流程中會體現出如何注冊到topicPublishInfoTable中;
topicPublishInfoTable數據的初始化(value:第一次默認都是new TopicPublishInfo())
//查找主題的路由信息的方法
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); //從NameServer更新topic路由信息
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); //從NameServer更新topic路由信息
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
分析
- 如果生產者中緩存了 topic 的路由信息,如果該路由信息中包含了消息隊列,則直接返回該路由信息,
- 如果沒有緩存或沒有包含消息隊列, 則向 NameServer查詢該 topic 的路由信息。
- 如果最終未找到路由信息,則拋出異常 : 無法找到主題相關路由信息異常 。
1.2 updateTopicRouteInfoFromNameServer從NameServer更新topic路由信息
在分析之前,可先簡單分析MQClientInstance核心屬性:
public class MQClientInstance {
...省略...
//key:group, value: 生產者
private final ConcurrentMap producerTable = new ConcurrentHashMap();
...省略..
//topic-路由信息
private final ConcurrentMap topicRouteTable = new ConcurrentHashMap();
private final Lock lockNamesrv = new ReentrantLock(); //更新路由使用
private final Lock lockHeartbeat = new ReentrantLock(); //發送心跳使用
//broker信息,key:Broker Name, value:-key:brokerId,value:address
private final ConcurrentMap<string *="" broker="" name="" ,="" hashmap> brokerAddrTable =
new ConcurrentHashMap<string, hashmap>();
//broker版本信息,key:Broker Name, value:-key:address,value:version
private final ConcurrentMap<string *="" broker="" name="" ,="" hashmap> brokerVersionTable =
new ConcurrentHashMap<string, hashmap>();
...省略...
}
備注:
此處列出的屬性僅跟生產端相關,其他的屬性和方法大都我們會在消費端分析
接下來著重分析:updateTopicRouteInfoFromNameServer
/**
* 向-NameServer查詢該 topic 的路由信息
* @param topic 主題
* @param isDefault 是否默認主題
* @param defaultMQProducer 默認MQProducer
* @return
*/
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { //獲取鎖:3s
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) { //默認主題-'TBW102',從NameServer查詢-topicRouteData
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) { //讀寫隊列取最小值,getDefaultTopicQueueNums=4,getReadQueueNums=16
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {//非默認主題,從NameServer查詢-topicRouteData
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);//1> 判斷:TopicRouteData 是否改變
if (!changed) { //未改變,
changed = this.isNeedUpdateTopicRouteInfo(topic);//2>繼續判斷是否需要更新:topic-路由信息
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) { // 需要更新
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());//維護brokerAddrTable地址信息
}
// Update Pub info
{ //topicRouteData 轉換 TopicPublishInfo(isWriteable)-生產需要的數據
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); // 3>數據轉換
publishInfo.setHaveTopicRouterInfo(true);
Iterator<entry> it = this.producerTable.entrySet().iterator();//迭代-producerTable
while (it.hasNext()) {
Entry entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);//4-更新-路由發布信息
}
}
}
// Update sub info
{ //消費端--后續消費端講解( topicRouteData 轉換 TopicPublishInfo(isReadable) 隊列消息)
Set subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);//構建隊列信息
Iterator<entry> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo); //更新-消費端:隊列信息
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData); // 維護:topicRouteTable
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
} catch (Exception e) { //異常吃掉了
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);//topic 非 RETRY 和 TBW102 失
}
} finally {
this.lockNamesrv.unlock(); // 釋放鎖
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}</entry</entry
備注:
- 判斷:TopicRouteData 是否改變,topicRouteDataIsChange(old, topicRouteData); 代碼很簡單,直接分析如下:
- (1) 判斷 olddata 或 nowdata 是否為空
- (2) TopicRouteData的equals方法比較
- 繼續判斷是否需要更新topic路由信息isNeedUpdateTopicRouteInfo(topic);
- 最終調用的代碼為:
- DefaultMQProducerImpl#isPublishTopicNeedUpdate(主要邏輯判斷是 TopicPublishInfo是否存在,或者 TopicPublishInfo的messageQueueList是否為空)
- topicRouteData2TopicPublishInfo數據轉換,你一定感興趣,內容相當簡單,
分析如下:
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) { // 此處可忽略,針對順序消息
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}
info.setOrderTopic(true);
} else {
List qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
if (PermName.isWriteable(qd.getPerm())) { // 寫權限判斷
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { // 不包含masterId,過濾
continue;
}
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq); //根據寫隊列個數,根據 topic+序號創建 MessageQueue,填充topicPublishlnfo 的 List。完成消息發送的路由查找 。
}
}
}
info.setOrderTopic(false);
}
return info;
}
4.更新-路由發布信息:updateTopicPublishInfo(topic, publishInfo);調用的代碼為:DefaultMQProducerImpl#updateTopicPublishInfo,本質就是維護Map-topicPublishInfoTable
二、結論
路由更新雖然相對簡單,但對于生產者來說至關重要,生產端需要知道路由信息才能進行計算選擇將消息發送到哪臺broker;但從源碼分析中,可以看出更新路由信息以topic為維度,組裝更新數據,本質還是維護Map(topicRouteTable、brokerAddrTable、topicPublishInfoTable)等,但是要注意是:ConcurrentHashMap。
程序員的核心競爭力其實還是技術,因此對技術還是要不斷的學習,關注 “IT巔峰技術” 公眾號 ,該公眾號內容定位:中高級開發、架構師、中層管理人員等中高端崗位服務的,除了技術交流外還有很多架構思想和實戰案例.
程序員的核心競爭力其實還是技術,因此對技術還是要不斷的學習,作者是 《 消息中間件 RocketMQ 技術內幕》一書作者,同時也是 “RocketMQ 上海社區”聯合創始人,曾就職于拼多多、德邦等公司,現任上市快遞公司架構負責人,主要負責開發框架的搭建、中間件相關技術的二次開發和運維管理、混合云及基礎服務平臺的建設。
總結
以上是生活随笔為你收集整理的路由动态更新 动态路由更新算法)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 恢复 outlook express中的
- 下一篇: 遨游3.0 RC 版公布