RocketMQ Consumer 负载均衡算法源码学习 -- AllocateMessageQueueConsistentHash
RocketMQ 提供了一致性hash 算法來做Consumer 和 MessageQueue的負載均衡。 源碼中一致性hash 環(huán)的實現(xiàn)是很優(yōu)秀的,我們一步一步分析。
一個Hash環(huán)包含多個節(jié)點, 我們用 MyNode 去封裝節(jié)點, 方法 getKey() 封裝獲取節(jié)點的key。我們可以實現(xiàn)MyNode 去描述一個物理節(jié)點或虛擬節(jié)點。MyVirtualNode 實現(xiàn) MyNode, 表示一個虛擬節(jié)點。這里注意:一個虛擬節(jié)點是依賴于一個物理節(jié)點,所以MyVirtualNode 中封裝了 一個 泛型 T physicalNode。物理節(jié)點MyClientNode也是實現(xiàn)了這個MyNode接口,很好的設(shè)計。代碼加注釋如下:
?/*** 表示hash環(huán)的一個節(jié)點*/public interface MyNode {/*** @return 節(jié)點的key*/String getKey();}/*** 虛擬節(jié)點*/ public class MyVirtualNode<T extends MyNode> implements MyNode {final T physicalNode; ?// 主節(jié)點final int replicaIndex; ?// 虛節(jié)點下標public MyVirtualNode(T physicalNode, int replicaIndex) {this.physicalNode = physicalNode;this.replicaIndex = replicaIndex;}@Overridepublic String getKey() {return physicalNode.getKey() + "-" + replicaIndex;}/*** thisMyVirtualNode 是否是pNode 的 虛節(jié)點*/public boolean isVirtualNodeOf(T pNode) {return physicalNode.getKey().equals(pNode.getKey());}public T getPhysicalNode() {return physicalNode;}}private static class MyClientNode implements MyNode {private final String clientID;public MyClientNode(String clientID) {this.clientID = clientID;}@Overridepublic String getKey() {return clientID;} }上面實現(xiàn)了節(jié)點, 一致性hash 下一個問題是怎么封裝hash算法呢?RocketMQ 使用 MyHashFunction 接口定義hash算法。使用MD5 + bit 位hash的方式實現(xiàn)hash算法。我們完全可以自己實現(xiàn)hash算法,具體見我的“常見的一些hash函數(shù)”文章。MyMD5Hash 算法代碼的如下:
?// MD5 hash 算法, 這里hash算法可以用常用的 hash 算法替換。private static class MyMD5Hash implements MyHashFunction {MessageDigest instance;public MyMD5Hash() {try {instance = MessageDigest.getInstance("MD5");} catch (NoSuchAlgorithmException e) {}}@Overridepublic long hash(String key) {instance.reset();instance.update(key.getBytes());byte[] digest = instance.digest();long h = 0;for (int i = 0; i < 4; i++) {h <<= 8;h |= ((int)digest[i]) & 0xFF;}return h;}}現(xiàn)在,hash環(huán)的節(jié)點有了, hash算法也有了,最重要的是描述一個一致性hash 環(huán)。 想一想,這個環(huán)可以由N 個物理節(jié)點, 每個物理節(jié)點對應(yīng)m個虛擬節(jié)點,節(jié)點位置用hash算法值描述。每個物理節(jié)點就是每個Consumer, 每個Consumer 的 id 就是 物理節(jié)點的key。 每個MessageQueue 的toString() 值 hash 后,用來找環(huán)上對應(yīng)的最近的下一個物理節(jié)點。源碼如下,這里展示主要的代碼,其中最巧妙地是routeNode 方法, addNode 方法 注意我的注釋:
public class MyConsistentHashRouter<T extends MyNode> {private final SortedMap<Long, MyVirtualNode<T>> ring = new TreeMap<>(); // key是虛節(jié)點key的哈希值, value 是虛節(jié)點 private final MyHashFunction myHashFunction; /*** @param pNodes 物理節(jié)點集合* @param vNodeCount 每個物理節(jié)點對應(yīng)的虛節(jié)點數(shù)量* @param hashFunction hash 函數(shù) 用于 hash 各個節(jié)點*/ public MyConsistentHashRouter(Collection<T> pNodes, int vNodeCount, MyHashFunction hashFunction) {if (hashFunction == null) {throw new NullPointerException("Hash Function is null");}this.myHashFunction = hashFunction;if (pNodes != null) {for (T pNode : pNodes) {this.addNode(pNode, vNodeCount);}} } /*** 添加物理節(jié)點和它的虛節(jié)點到hash環(huán)。* @param pNode 物理節(jié)點* @param vNodeCount 虛節(jié)點數(shù)量。*/ public void addNode(T pNode, int vNodeCount) {if (vNodeCount < 0) {throw new IllegalArgumentException("ill virtual node counts :" + vNodeCount);}int existingReplicas = this.getExistingReplicas(pNode);for (int i = 0; i < vNodeCount; i++) {MyVirtualNode<T> vNode = new MyVirtualNode<T>(pNode, i + existingReplicas); // 創(chuàng)建一個新的虛節(jié)點,位置是 i+existingReplicasring.put(this.myHashFunction.hash(vNode.getKey()), vNode); // 將新的虛節(jié)點放到hash環(huán)中} } /*** 根據(jù)一個給定的key 在 hash環(huán)中 找到離這個key最近的下一個物理節(jié)點* @param key 一個key, 用于找這個key 在環(huán)上最近的節(jié)點*/ public T routeNode(String key) {if (ring.isEmpty()) {return null;}Long hashVal = this.myHashFunction.hash(key);SortedMap<Long, MyVirtualNode<T>> tailMap = ring.tailMap(hashVal);Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();return ring.get(nodeHashVal).getPhysicalNode(); }/*** @param pNode 物理節(jié)點* @return 當前這個物理節(jié)點對應(yīng)的虛節(jié)點的個數(shù)*/ public int getExistingReplicas(T pNode) {int replicas = 0;for (MyVirtualNode<T> vNode : ring.values()) {if (vNode.isVirtualNodeOf(pNode)) {replicas++;}}return replicas; }現(xiàn)在一致性hash 環(huán)有了, 剩下的就是 和rocketmq 的 consumer, mq 構(gòu)成負載均衡策略了。比較簡單, 代碼如下:
??? ??? ??? ?/*** 基于一致性性hash環(huán)的Consumer負載均衡.*/?? ??public class MyAllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {// 每個物理節(jié)點對應(yīng)的虛節(jié)點的個數(shù)private final int virtualNodeCnt;private final MyHashFunction customHashFunction;public MyAllocateMessageQueueConsistentHash() {this(10); ? // 默認10個虛擬節(jié)點}public MyAllocateMessageQueueConsistentHash(int virtualNodeCnt) {this(virtualNodeCnt, null);}public MyAllocateMessageQueueConsistentHash(int virtualNodeCnt, MyHashFunction customHashFunction) {if (virtualNodeCnt < 0) {throw new IllegalArgumentException("illegal virtualNodeCnt : " + virtualNodeCnt);}this.virtualNodeCnt = virtualNodeCnt;this.customHashFunction = customHashFunction;}@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {// 省去一系列非空校驗Collection<MyClientNode> cidNodes = new ArrayList<>();for (String cid : cidAll) {cidNodes.add(new MyClientNode(cid));}final MyConsistentHashRouter<MyClientNode> router;if (this.customHashFunction != null) {router = new MyConsistentHashRouter<MyClientNode>(cidNodes, virtualNodeCnt, customHashFunction);}else {router = new MyConsistentHashRouter<MyClientNode>(cidNodes, virtualNodeCnt);}List<MessageQueue> results = new ArrayList<MessageQueue>(); ?// 當前 currentCID 對應(yīng)的 mq// 將每個mq 根據(jù)一致性hash 算法找到對應(yīng)的物理節(jié)點(Consumer)for (MessageQueue mq : mqAll) {MyClientNode clientNode = router.routeNode(mq.toString()); ? // 根據(jù) mq toString() 方法做hash 和環(huán)上節(jié)點比較if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);}}return results;}@Overridepublic String getName() {return "CONSISTENT_HASH";}private static class MyClientNode implements MyNode {private final String clientID;public MyClientNode(String clientID) {this.clientID = clientID;}@Overridepublic String getKey() {return clientID;}}}
————————————————
版權(quán)聲明:本文為CSDN博主「昊haohao」的原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/ZHANGYONGHAO604/article/details/82426373
總結(jié)
以上是生活随笔為你收集整理的RocketMQ Consumer 负载均衡算法源码学习 -- AllocateMessageQueueConsistentHash的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 常见的一些 Hash 函数
- 下一篇: redis探秘:选择合适的数据结构,减少