从Curator实现分布式锁的源码再到羊群效应
一、前言
Curator是一款由Java編寫的,操作Zookeeper的客戶端工具,在其內(nèi)部封裝了分布式鎖、選舉等高級功能。
今天主要是分析其實(shí)現(xiàn)分布式鎖的主要原理,有關(guān)分布式鎖的一些介紹或其他實(shí)現(xiàn),有興趣的同學(xué)可以翻閱以下文章:
我用了上萬字,走了一遍Redis實(shí)現(xiàn)分布式鎖的坎坷之路,從單機(jī)到主從再到多實(shí)例,原來會發(fā)生這么多的問題
Redisson可重入與鎖續(xù)期源碼分析
在使用Curator獲取分布式鎖時,Curator會在指定的path下創(chuàng)建一個有序的臨時節(jié)點(diǎn),如果該節(jié)點(diǎn)是最小的,則代表獲取鎖成功。
接下來,在準(zhǔn)備工作中,我們可以觀察是否會創(chuàng)建出一個臨時節(jié)點(diǎn)出來。
二、準(zhǔn)備工作
首先我們需要搭建一個zookeeper集群,當(dāng)然你使用單機(jī)也行。
在這篇文章面試官:能給我畫個Zookeeper選舉的圖嗎?,介紹了一種使用docker-compose方式快速搭建zk集群的方式。
在pom中引入依賴:
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency>Curator客戶端的配置項(xiàng):
/*** @author qcy* @create 2022/01/01 22:59:34*/ @Configuration public class CuratorFrameworkConfig {//zk各節(jié)點(diǎn)地址private static final String CONNECT_STRING = "localhost:2181,localhost:2182,localhost:2183";//連接超時時間(單位:毫秒)private static final int CONNECTION_TIME_OUT_MS = 10 * 1000;//會話超時時間(單位:毫秒)private static final int SESSION_TIME_OUT_MS = 30 * 1000;//重試的初始等待時間(單位:毫秒)private static final int BASE_SLEEP_TIME_MS = 2 * 1000;//最大重試次數(shù)private static final int MAX_RETRIES = 3;@Beanpublic CuratorFramework getCuratorFramework() {CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_STRING).connectionTimeoutMs(CONNECTION_TIME_OUT_MS).sessionTimeoutMs(SESSION_TIME_OUT_MS).retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES)).build();curatorFramework.start();return curatorFramework;}}SESSION_TIME_OUT_MS參數(shù)則會保證,在某個客戶端獲取到鎖之后突然宕機(jī),zk能在該時間內(nèi)刪除當(dāng)前客戶端創(chuàng)建的臨時有序節(jié)點(diǎn)。
測試代碼如下:
//臨時節(jié)點(diǎn)路徑,qcy是博主名字縮寫哈private static final String LOCK_PATH = "/lockqcy";@ResourceCuratorFramework curatorFramework;public void testCurator() throws Exception {InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, LOCK_PATH);interProcessMutex.acquire();try {//模擬業(yè)務(wù)耗時Thread.sleep(30 * 1000);} catch (Exception e) {e.printStackTrace();} finally {interProcessMutex.release();}}當(dāng)使用接口調(diào)用該方法時,在Thread.sleep處打上斷點(diǎn),進(jìn)入到zk容器中觀察創(chuàng)建出來的節(jié)點(diǎn)。
使用? docker exec -it zk容器名 /bin/bash? 以交互模式進(jìn)入容器,接著使用? ?./bin/zkCli.sh? 連接到zk的server端。
然后使用? ls path? 查看節(jié)點(diǎn)
這三個節(jié)點(diǎn)都是持久節(jié)點(diǎn),可以使用? get path? 查看節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu)信息
若一個節(jié)點(diǎn)的ephemeralOwner值為0,即該節(jié)點(diǎn)的臨時擁有者的會話id為0,則代表該節(jié)點(diǎn)為持久節(jié)點(diǎn)。
當(dāng)走到斷點(diǎn)Thread.sleep時,確實(shí)發(fā)現(xiàn)在lockqcy下創(chuàng)建出來一個臨時節(jié)點(diǎn)
到這里嗎,準(zhǔn)備工作已經(jīng)做完了,接下來分析interProcessMutex.acquire與release的流程
三、源碼分析
Curator支持多種類型的鎖,例如
- InterProcessMutex,可重入鎖排它鎖
- InterProcessReadWriteLock,讀寫鎖
- InterProcessSemaphoreMutex,不可重入排它鎖
今天主要是分析InterProcessMutex的加解鎖過程,先看加鎖過程
加鎖
public void acquire() throws Exception {if (!internalLock(-1, null)) {throw new IOException("Lost connection while trying to acquire lock: " + basePath);}}這里是阻塞式獲取鎖,獲取不到鎖,就一直進(jìn)行阻塞。所以對于internalLock方法,超時時間設(shè)置為-1,時間單位設(shè)置成null。
private boolean internalLock(long time, TimeUnit unit) throws Exception {Thread currentThread = Thread.currentThread();//通過能否在map中取到該線程的LockData信息,來判斷該線程是否已經(jīng)持有鎖LockData lockData = threadData.get(currentThread);if (lockData != null) {//進(jìn)行可重入,直接返回加鎖成功lockData.lockCount.incrementAndGet();return true;}//進(jìn)行加鎖String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if (lockPath != null) {//加鎖成功,保存到map中LockData newLockData = new LockData(currentThread, lockPath);threadData.put(currentThread, newLockData);return true;}return false;}其中threadData是一個map,key線程對象,value為該線程綁定的鎖數(shù)據(jù)。
LockData中保存了加鎖線程owningThread,重入計(jì)數(shù)lockCount與加鎖路徑lockPath,例如/lockqcy/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();private static class LockData {final Thread owningThread;final String lockPath;final AtomicInteger lockCount = new AtomicInteger(1);private LockData(Thread owningThread, String lockPath) {this.owningThread = owningThread;this.lockPath = lockPath;}}進(jìn)入到internals.attemptLock方法中
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {//開始時間final long startMillis = System.currentTimeMillis();//將超時時間統(tǒng)一轉(zhuǎn)化為毫秒單位final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;//節(jié)點(diǎn)數(shù)據(jù),這里為nullfinal byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;//重試次數(shù)int retryCount = 0;//鎖路徑String ourPath = null;//是否獲取到鎖boolean hasTheLock = false;//是否完成boolean isDone = false;while (!isDone) {isDone = true;try {//創(chuàng)建一個臨時有序節(jié)點(diǎn),并返回節(jié)點(diǎn)路徑//內(nèi)部調(diào)用client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);ourPath = driver.createsTheLock(client, path, localLockNodeBytes);//依據(jù)返回的節(jié)點(diǎn)路徑,判斷是否搶到了鎖hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);} catch (KeeperException.NoNodeException e) {//在會話過期時,可能導(dǎo)致driver找不到臨時有序節(jié)點(diǎn),從而拋出NoNodeException//這里就進(jìn)行重試if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {isDone = false;} else {throw e;}}}//獲取到鎖,則返回節(jié)點(diǎn)路徑,供調(diào)用方記錄到map中if (hasTheLock) {return ourPath;}return null;}接下來,將會在internalLockLoop中利用剛才創(chuàng)建出來的臨時有序節(jié)點(diǎn),判斷是否獲取到了鎖。
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {//是否獲取到鎖boolean haveTheLock = false;boolean doDelete = false;try {if (revocable.get() != null) {//當(dāng)前不會進(jìn)入這里client.getData().usingWatcher(revocableWatcher).forPath(ourPath);}//一直嘗試獲取鎖while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {//返回basePath(這里是lockqcy)下所有的臨時有序節(jié)點(diǎn),并且按照后綴從小到大排列List<String> children = getSortedChildren();//取出當(dāng)前線程創(chuàng)建出來的臨時有序節(jié)點(diǎn)的名稱,這里就是/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005String sequenceNodeName = ourPath.substring(basePath.length() + 1);//判斷當(dāng)前節(jié)點(diǎn)是否處于排序后的首位,如果處于首位,則代表獲取到了鎖PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if (predicateResults.getsTheLock()) {//獲取到鎖之后,則終止循環(huán)haveTheLock = true;} else {//這里代表沒有獲取到鎖//獲取比當(dāng)前節(jié)點(diǎn)索引小的前一個節(jié)點(diǎn)String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized (this) {try {//如果前一個節(jié)點(diǎn)不存在,則直接拋出NoNodeException,catch中不進(jìn)行處理,在下一輪中繼續(xù)獲取鎖//如果前一個節(jié)點(diǎn)存在,則給它設(shè)置一個監(jiān)聽器,監(jiān)聽它的釋放事件client.getData().usingWatcher(watcher).forPath(previousSequencePath);if (millisToWait != null) {millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();//判斷是否超時if (millisToWait <= 0) {//獲取鎖超時,刪除剛才創(chuàng)建的臨時有序節(jié)點(diǎn)doDelete = true;break;}//沒超時的話,在millisToWait內(nèi)進(jìn)行等待wait(millisToWait);} else {//無限期阻塞等待,監(jiān)聽到前一個節(jié)點(diǎn)被刪除時,才會觸發(fā)喚醒操作wait();}} catch (KeeperException.NoNodeException e) {//如果前一個節(jié)點(diǎn)不存在,則直接拋出NoNodeException,catch中不進(jìn)行處理,在下一輪中繼續(xù)獲取鎖}}}}} catch (Exception e) {ThreadUtils.checkInterrupted(e);doDelete = true;throw e;} finally {if (doDelete) {//刪除剛才創(chuàng)建出來的臨時有序節(jié)點(diǎn)deleteOurPath(ourPath);}}return haveTheLock;}判斷是否獲取到鎖的核心邏輯位于getsTheLock中
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {//獲取當(dāng)前節(jié)點(diǎn)在所有子節(jié)點(diǎn)排序后的索引位置int ourIndex = children.indexOf(sequenceNodeName);//判斷當(dāng)前節(jié)點(diǎn)是否處于子節(jié)點(diǎn)中validateOurIndex(sequenceNodeName, ourIndex);//InterProcessMutex的構(gòu)造方法,會將maxLeases初始化為1//ourIndex必須為0,才能使得getsTheLock為true,也就是說,當(dāng)前節(jié)點(diǎn)必須是basePath下的最小節(jié)點(diǎn),才能代表獲取到了鎖boolean getsTheLock = ourIndex < maxLeases;//如果獲取不到鎖,則返回上一個節(jié)點(diǎn)的名稱,用作對其設(shè)置監(jiān)聽String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);return new PredicateResults(pathToWatch, getsTheLock);}static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {if (ourIndex < 0) {//可能會由于連接丟失導(dǎo)致臨時節(jié)點(diǎn)被刪除,因此這里屬于保險(xiǎn)措施throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);}}那什么時候,在internalLockLoop處于wait的線程能被喚醒呢?
在internalLockLoop方法中,已經(jīng)使用
client.getData().usingWatcher(watcher).forPath(previousSequencePath);給前一個節(jié)點(diǎn)設(shè)置了監(jiān)聽器,當(dāng)該節(jié)點(diǎn)被刪除時,將會觸發(fā)watcher中的回調(diào)
private final Watcher watcher = new Watcher() {//回調(diào)方法@Overridepublic void process(WatchedEvent event) {notifyFromWatcher();}};private synchronized void notifyFromWatcher() {//喚醒所以在LockInternals實(shí)例上等待的線程notifyAll();}到這里,基本上已經(jīng)分析完加鎖的過程了,在這里總結(jié)下:
首先創(chuàng)建一個臨時有序節(jié)點(diǎn)
如果該節(jié)點(diǎn)是basePath下最小節(jié)點(diǎn),則代表獲取到了鎖,存入map中,下次直接進(jìn)行重入。
如果該節(jié)點(diǎn)不是最小節(jié)點(diǎn),則對前一個節(jié)點(diǎn)設(shè)置監(jiān)聽,接著進(jìn)行wait等待。當(dāng)前一個節(jié)點(diǎn)被刪除時,將會通知notify該線程。
解鎖
解鎖的邏輯,就比較簡單了,直接進(jìn)入release方法中
public void release() throws Exception {Thread currentThread = Thread.currentThread();LockData lockData = threadData.get(currentThread);if (lockData == null) {throw new IllegalMonitorStateException("You do not own the lock: " + basePath);}int newLockCount = lockData.lockCount.decrementAndGet();//直接減少一次重入次數(shù)if (newLockCount > 0) {return;}if (newLockCount < 0) {throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);}//到這里代表重入次數(shù)為0try {//釋放鎖internals.releaseLock(lockData.lockPath);} finally {//從map中移除threadData.remove(currentThread);}}void releaseLock(String lockPath) throws Exception {revocable.set(null);//內(nèi)部使用guaranteed,會在后臺不斷嘗試刪除節(jié)點(diǎn)deleteOurPath(lockPath);}重入次數(shù)大于0,就減少重入次數(shù)。當(dāng)減為0時,調(diào)用zk去刪除節(jié)點(diǎn),這一點(diǎn)和Redisson可重入鎖釋放時一致。
四、羊群效應(yīng)
在這里談?wù)勈褂肸ookeeper實(shí)現(xiàn)分布式鎖場景中的羊群效應(yīng)
什么是羊群效應(yīng)
首先,羊群是一種很散亂的組織,漫無目的,缺少管理,一般需要牧羊犬來幫助主人控制羊群。
某個時候,當(dāng)其中一只羊發(fā)現(xiàn)前面有更加美味的草而動起來,就會導(dǎo)致其余的羊一哄而上,根本不管周圍的情況。
所以羊群效應(yīng),指的是一個人在進(jìn)行理性的行為后,導(dǎo)致其余人直接盲從,產(chǎn)生非理性的從眾行為。
而Zookeeper中的羊群效應(yīng),則是指一個znode被改變后,觸發(fā)了大量本可以被避免的watch通知,造成集群資源的浪費(fèi)。
獲取不到鎖時的等待演化
sleep一段時間
如果某個線程在獲取鎖失敗后,完全可以sleep一段時間,再嘗試獲取鎖。
但這樣的方式,效率極低。
sleep時間短的話,會頻繁地進(jìn)行輪詢,浪費(fèi)資源。
sleep時間長的話,會出現(xiàn)鎖被釋放但仍然獲取不到鎖的尷尬情況。
所以,這里的優(yōu)化點(diǎn),在于如何變主動輪詢?yōu)楫惒酵ㄖ?/p>
watch被鎖住的節(jié)點(diǎn)
所有的客戶端要獲取鎖時,只去創(chuàng)建一個同名的node。
當(dāng)znode存在時,這些客戶端對其設(shè)置監(jiān)聽。當(dāng)znode被刪除后,通知所有等待鎖的客戶端,接著這些客戶端再次嘗試獲取鎖。
雖然這里使用watch機(jī)制來異步通知,可是當(dāng)客戶端的數(shù)量特別多時,會存在性能低點(diǎn)。
當(dāng)znode被刪除后,在這一瞬間,需要給大量的客戶端發(fā)送通知。在此期間,其余提交給zk的正常請求可能會被延遲或者阻塞。
這就產(chǎn)生了羊群效應(yīng),一個點(diǎn)的變化(znode被刪除),造成了全面的影響(通知大量的客戶端)。
所以,這里的優(yōu)化點(diǎn),在于如何減少對一個znode的監(jiān)聽數(shù)量,最好的情況是只有一個。
watch前一個有序節(jié)點(diǎn)
如果先指定一個basePath,想要獲取鎖的客戶端,直接在該路徑下創(chuàng)建臨時有序節(jié)點(diǎn)。
當(dāng)創(chuàng)建的節(jié)點(diǎn)是最小節(jié)點(diǎn)時,代表獲取到了鎖。如果不是最小的節(jié)點(diǎn),則只對前一個節(jié)點(diǎn)設(shè)置監(jiān)聽器,只監(jiān)聽前一個節(jié)點(diǎn)的刪除行為。
這樣前一個節(jié)點(diǎn)被刪除時,只會給下一個節(jié)點(diǎn)代表的客戶端發(fā)送通知,不會給所有客戶端發(fā)送通知,從而避免了羊群效應(yīng)。
在避免羊群效應(yīng)的同時,使得當(dāng)前鎖成為公平鎖。即按照申請鎖的先后順序獲得鎖,避免存在饑餓過度的線程。
五、后語
本文從源碼角度講解了使用Curator獲取分布式鎖的流程,接著從等待鎖的演化過程角度出發(fā),分析了Zookeeper在分布式鎖場景下避免羊群效應(yīng)的解決方案。
這是Zookeeper系列的第二篇,關(guān)于其watch原理分析、zab協(xié)議等文章也在安排的路上了。
總結(jié)
以上是生活随笔為你收集整理的从Curator实现分布式锁的源码再到羊群效应的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 浏览器端技术体系概览 -- 前端开发的七
- 下一篇: 研报复现系列(六)【国泰君安】基于CCK