Redis学习(二):redis集群之cluster模式下的跨节点的批量操作 I
說明
通過之前的博文《Redis學習(一):redis集群之哨兵模式下的負載均衡》,對redis哨兵模式下的讀負載進行學習研究。在本篇博文中,將對redis cluster模式下的跨節點集合操作進行研究學習。通過本篇博文,我們將了解redis cluster模式的基本原理及在Jedis客戶端中如何對redis cluster集群進行批量操作。
正文
Redis Cluster
redis cluster是redis官方推薦的高可用分布式解決方案。它的設計目標主要是:
- 高性能和線性擴展
- 一定程度的寫操作安全性
- 可用性
但是它同時也引入了一些缺陷:所有的操作都只能在同一個節點(key的slot相同)進行,只能使用0號數據庫而不能使用其他數據庫,無法跨節點使用事務等等。
在對redis cluster操作前需要了解下槽點的概念:
redis cluster模式將存儲空間在邏輯上分為了16384個槽點,每個節點負責一部分槽點。在對數據進行操作時,需要根據key計算出槽點,進而找到對應的節點進行操作。槽點的算法為:HASH_SLOT = CRC16(KEY) mod 16384
為了保證集群的可用性,官方建議最少使用三個節點,三個從節點,以一主一從的形式。
關于redis集群的搭建可以參考《深入剖析Redis系列(三) - Redis集群模式搭建與原理詳解》這篇文章。
關于redis集群更多的詳細內容,詳見 https://redis.io/topics/cluster-spec
JedisCluster
JedisCluster是Jedis客戶端中對cluster模式實現的操作類,通過探究學習其源碼來了解Jedis如何對集群進行操作。
通過上圖可以看到,JedisCluster繼承了BinaryJedisCluster類,實現了JedisClusterCommands, MultiKeyJedisClusterCommands, JedisClusterScriptingCommands接口。
接著再以JedisCluster的構造函數為入口,探究該對象如何創建初始化。
public JedisCluster(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig) {this((Set)nodes, 2000, 5, poolConfig); }public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig) {super(jedisClusterNode, timeout, maxAttempts, poolConfig); }通過源碼可以發現它最終調用了父類的構造方法:
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig) {this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, timeout);this.maxAttempts = maxAttempts; }在BinaryJedisCluster類的構造方法中,創建了JedisClusterConnectionHandler對象,可以看到這里具體創建的是JedisSlotBasedConnectionHandler對象。
通過上圖可以看到,JedisSlotBasedConnectionHandler繼承了JedisClusterConnectionHandler這個抽象類,該類的構造方法也是調用的父類的構造方法。
在抽象類JedisClusterConnectionHandler的構造函數中,創建了JedisClusterInfoCache對象,并進行了槽點初始化。在該類中有唯一一個屬性------JedisClusterInfoCache cache;
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName);this.initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName); }至此,我們可以看到與JedisCluster類密切相關的兩個類,JedisClusterConnectionHandler和JedisClusterInfoCache。在JedisClusterInfoCache類中有兩個關鍵屬性 Map<String, JedisPool> nodes 和 Map<Integer, JedisPool> slots, 通過閱讀源碼來了解這兩個屬性的作用。
在以上代碼中可以看到,先創建JedisClusterInfoCache對象,再初始化槽點信息。
public JedisClusterInfoCache(GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {this.nodes = new HashMap();this.slots = new HashMap();this.rwl = new ReentrantReadWriteLock();this.r = this.rwl.readLock();this.w = this.rwl.writeLock();this.poolConfig = poolConfig;this.connectionTimeout = connectionTimeout;this.soTimeout = soTimeout;this.password = password;this.clientName = clientName; }初始化槽點信息調用了initializeSlotsCache方法
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {Iterator var7 = startNodes.iterator();while(var7.hasNext()) {HostAndPort hostAndPort = (HostAndPort)var7.next();Jedis jedis = null;try {jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout);if (password != null) {jedis.auth(password);}if (clientName != null) {jedis.clientSetname(clientName);}this.cache.discoverClusterNodesAndSlots(jedis);break;} catch (JedisConnectionException var14) {} finally {if (jedis != null) {jedis.close();}}}}在該方法中,可以看到通過配置的節點信息,循環調用JedisClusterInfoCache對象的discoverClusterNodesAndSlots方法。注意,這里只要初始化成功就立即終止循環。
public void discoverClusterNodesAndSlots(Jedis jedis) {this.w.lock();try {this.reset();List<Object> slots = jedis.clusterSlots();Iterator var3 = slots.iterator();while(true) {List slotInfo;do {if (!var3.hasNext()) {return;}Object slotInfoObj = var3.next();slotInfo = (List)slotInfoObj;} while(slotInfo.size() <= 2);List<Integer> slotNums = this.getAssignedSlotArray(slotInfo);int size = slotInfo.size();for(int i = 2; i < size; ++i) {List<Object> hostInfos = (List)slotInfo.get(i);if (hostInfos.size() > 0) {HostAndPort targetNode = this.generateHostAndPort(hostInfos);this.setupNodeIfNotExist(targetNode);if (i == 2) {this.assignSlotsToNode(slotNums, targetNode);}}}}} finally {this.w.unlock();}}在該方法中,通過jedis.clusterSlots()獲取集群的槽點信息。可以看到該方法返回了一個List<Object>對象,在list中每個元素是單獨的slot信息,這也是一個list集合。該集合的基本信息為[long, long, List, List], 第一,二個元素是該節點負責槽點的起始位置,第三個元素是主節點信息,第四個元素為主節點對應的從節點信息。該list的基本信息為[string,int,string],第一個為host信息,第二個為port信息,第三個為唯一id。
在獲取有關節點的槽點信息后,調用getAssignedSlotArray(slotinfo)來獲取所有的槽點值。
private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {List<Integer> slotNums = new ArrayList();for(int slot = ((Long)slotInfo.get(0)).intValue(); slot <= ((Long)slotInfo.get(1)).intValue(); ++slot) {slotNums.add(slot);}return slotNums; }再獲取主節點的地址信息,調用generateHostAndPort(hostInfo)方法。
private HostAndPort generateHostAndPort(List<Object> hostInfos) {return new HostAndPort(SafeEncoder.encode((byte[])((byte[])hostInfos.get(0))), ((Long)hostInfos.get(1)).intValue()); }再根據節點地址信息來設置節點對應的JedisPool,即設置Map<String, JedisPool> nodes的值。
public JedisPool setupNodeIfNotExist(HostAndPort node) {this.w.lock();JedisPool nodePool;try {String nodeKey = getNodeKey(node);JedisPool existingPool = (JedisPool)this.nodes.get(nodeKey);if (existingPool == null) {nodePool = new JedisPool(this.poolConfig, node.getHost(), node.getPort(), this.connectionTimeout, this.soTimeout, this.password, 0, this.clientName, false, (SSLSocketFactory)null, (SSLParameters)null, (HostnameVerifier)null);this.nodes.put(nodeKey, nodePool);JedisPool var5 = nodePool;return var5;}nodePool = existingPool;} finally {this.w.unlock();}return nodePool; }接下來判斷若此時節點信息為主節點信息時,則調用assignSlotsToNodes方法,設置每個槽點值對應的連接池,即設置Map<Integer, JedisPool> slots的值。
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {this.w.lock();try {JedisPool targetPool = this.setupNodeIfNotExist(targetNode);Iterator var4 = targetSlots.iterator();while(var4.hasNext()) {Integer slot = (Integer)var4.next();this.slots.put(slot, targetPool);}} finally {this.w.unlock();}}至此,JedisCluster對象的初始化完成。這里主要是通過JedisClusterInfoCache對象來保存節點信息及對應槽點信息。
mget操作
在上部分內容中,簡單介紹了JedisCluster類的初始化過程。在之前提到redis cluster 只能實現在一個節點的集合操作,即要求所有的key都有相同的slot,這里我們通過源碼,了解JedisCluster的有限的批量操作。
public List<String> mget(final String... keys) {return (List)(new JedisClusterCommand<List<String>>(this.connectionHandler, this.maxAttempts) {public List<String> execute(Jedis connection) {return connection.mget(keys);}}).run(keys.length, keys); }在mget方法中,創建了JedisClusterCommand匿名對象,調用其run()方法來完成操作。這里從run()方法開始探究。
public T run(int keyCount, String... keys) {if (keys != null && keys.length != 0) {int slot = JedisClusterCRC16.getSlot(keys[0]);if (keys.length > 1) {for(int i = 1; i < keyCount; ++i) {int nextSlot = JedisClusterCRC16.getSlot(keys[i]);if (slot != nextSlot) {throw new JedisClusterOperationException("No way to dispatch this command to Redis Cluster because keys have different slots.");}}}return this.runWithRetries(slot, this.maxAttempts, false, (JedisRedirectionException)null);} else {throw new JedisClusterOperationException("No way to dispatch this command to Redis Cluster.");} }通過該方法可以看到,針對所有的key,都通過JedisClusterCRC16.getSlot(key)方法計算出其對應的槽點值,再循環判斷所有的槽點值是否相等,若存在不等則拋出異常: No way to dispatch this command to Redis Cluster because keys have different slots.
校驗完成后,調用了runWithRetries方法,具體執行命令,通過該方法名稱可以看出,該方法可以失敗重試。重試次數為配置時的參數,若沒有指定,則JedisCluster默認值為5。
private T runWithRetries(int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {if (attempts <= 0) {throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");} else {Jedis connection = null;Object var7;try {if (redirect != null) {connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());if (redirect instanceof JedisAskDataException) {connection.asking();}} else if (tryRandomNode) {connection = this.connectionHandler.getConnection();} else {connection = this.connectionHandler.getConnectionFromSlot(slot);}Object var6 = this.execute(connection);return var6;} catch (JedisNoReachableClusterNodeException var13) {throw var13;} catch (JedisConnectionException var14) {this.releaseConnection(connection);connection = null;if (attempts <= 1) {this.connectionHandler.renewSlotCache();}var7 = this.runWithRetries(slot, attempts - 1, tryRandomNode, redirect);return var7;} catch (JedisRedirectionException var15) {if (var15 instanceof JedisMovedDataException) {this.connectionHandler.renewSlotCache(connection);}this.releaseConnection(connection);connection = null;var7 = this.runWithRetries(slot, attempts - 1, false, var15);} finally {this.releaseConnection(connection);}return var7;}}在方法中,首先通過JedisClusterConnectionHandler的getConnectionFromSlot(slot)方法獲取對應槽點的連接jedis對象。
public Jedis getConnectionFromSlot(int slot) {JedisPool connectionPool = this.cache.getSlotPool(slot);if (connectionPool != null) {return connectionPool.getResource();} else {this.renewSlotCache();connectionPool = this.cache.getSlotPool(slot);return connectionPool != null ? connectionPool.getResource() : this.getConnection();} }在獲取槽點對應連接時,是通過JedisClusterInfoCache的getSlotPool(slot)方法。若獲取的JedisPool為null,則會進行重新初始化槽點的信息。在重新初始化后若值仍為null,則隨機獲取一個Jedis對象。
在獲取到jedis后,調用execute()方法執行命令,這個方法在創建匿名對象時,該方法被實現。
上面提到,該方法可以失敗重試。通過源碼得知,在異常為JedisConnectionException或JedisRedirectException時,才進行重試。在重試過程中,也會進行重新初始化槽點信息信息,直到成功執行或重試次數耗盡。
至此,JedisCluster的有限集合操作mget源碼分析結束。這里可以看出,JedisCluster只能進行有限的批量操作,必須要求所有key的slot值相等。這樣的方式,對我們的使用造成很多不便,雖然官方建議可以通過key的hash_tag來保證slot值一致,實現批量操作。
在下篇博文中,我將會根據閱讀源碼獲取的基本知識來打破這個約束,實現跨節點的批量操作。
總結
以上是生活随笔為你收集整理的Redis学习(二):redis集群之cluster模式下的跨节点的批量操作 I的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Citrix ADC中SNIP的三种配置
- 下一篇: 短视频搬运神器,二次剪辑神器,涨粉热门必