生活随笔
收集整理的這篇文章主要介紹了
分布式锁-zk临时节点
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
多線程訪問同一個共享資源時,會出現并發問題,synchronized或者lock 類的鎖只能控制單一進程的資源訪問,多進程下就需要用到分布式鎖
利用zk 可以實現獨占鎖,(同級節點唯一性)多個進程往zk指定節點下創建一個相同名稱的節點,只有一個能成功,創建失敗的通過zk的watcher機制監聽子節點變化,一個監聽到子節點刪除事件,會再次觸發所有進程的寫鎖,但這里會有驚群效應,會影響到性能
利用有序節點實現分布式鎖:每個客戶端都往一個指定節點(locks)注冊一個臨時有序節點,越早創建的節點編號越小,最小編號的節點獲得鎖,通過監聽比自己小的節點,當比自己小的節點刪除后,客戶端會收到watcher,再次判斷自己的節點是不是所有節點最小的,是則獲得鎖;這種方式也會解決驚群問題
接下來我們來看實現:curator 分布式鎖的使用
curator對鎖封裝了一層,提供了InterProcessMutex;還提供了leader 選舉、分布式隊列 InterProcessMutex 分布式可重入排他鎖
InterProcessSemaphoreMutex 分布式排他鎖
InterProcessReadWriteLock 分布式讀寫鎖
public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {private String name
; private LeaderSelector leaderSelector
; private CountDownLatch countDownLatch
=new CountDownLatch(1);public LeaderSelectorClient(){}public LeaderSelectorClient(String name) {this.name
= name
;}public LeaderSelector
getLeaderSelector() {return leaderSelector
;}public void setLeaderSelector(LeaderSelector leaderSelector) {this.leaderSelector
= leaderSelector
;}public void start(){leaderSelector
.start(); }@Override
public void takeLeadership(CuratorFramework client
) throws Exception
{System
.out
.println(name
+"->現在是leader了");
}@Override
public void close() throws IOException
{leaderSelector
.close();}private static String
CONNECTION_STR="zk集群地址,至少三臺機器";public static void main(String
[] args
) throws IOException
{CuratorFramework curatorFramework
= CuratorFrameworkFactory
.builder().connectString(CONNECTION_STR).sessionTimeoutMs(50000000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();curatorFramework
.start();LeaderSelectorClient leaderSelectorClient
=new LeaderSelectorClient("ClientA");LeaderSelector leaderSelector
=new LeaderSelector(curatorFramework
,"/leader",leaderSelectorClient
);leaderSelectorClient
.setLeaderSelector(leaderSelector
);leaderSelectorClient
.start(); System
.in
.read();}
}
我們來看下curator 實現分布式鎖的原理,這里我把注釋寫在了代碼中,所以把代碼貼到一塊
public InterProcessMutex(CuratorFramework client, String path) {this(client
, path
, new StandardLockInternalsDriver());}public StandardLockInternalsDriver() {}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {this(client
, path
, "lock-", 1, driver
);}InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {this.threadData
= Maps
.newConcurrentMap();this.basePath
= PathUtils
.validatePath(path
);this.internals
= new LockInternals(client
, driver
, path
, lockName
, maxLeases
);}public void acquire() throws Exception
{if (!this.internalLock(-1L
, (TimeUnit
)null)) {throw new IOException("Lost connection while trying to acquire lock: " + this.basePath
);}}public boolean
acquire(long time
, TimeUnit unit
) throws Exception
{return this.internalLock(time
, unit
);}private boolean
internalLock(long time
, TimeUnit unit
) throws Exception
{Thread currentThread
= Thread
.currentThread();InterProcessMutex
.LockData lockData
= (InterProcessMutex
.LockData
)this.threadData
.get(currentThread
);if (lockData
!= null) {lockData
.lockCount
.incrementAndGet();return true;} else {String lockPath
= this.internals
.attemptLock(time
, unit
, this.getLockNodeBytes());if (lockPath
!= null) {InterProcessMutex
.LockData newLockData
= new InterProcessMutex.LockData(currentThread
, lockPath
);this.threadData
.put(currentThread
, newLockData
);return true;} else {return false;}}}private final ConcurrentMap
<Thread
, InterProcessMutex
.LockData
> threadData
;private static class LockData {final Thread owningThread
;final String lockPath
;final AtomicInteger lockCount
;private LockData(Thread owningThread, String lockPath) {this.lockCount
= new AtomicInteger(1); this.owningThread
= owningThread
;this.lockPath
= lockPath
;}}String
attemptLock(long time
, TimeUnit unit
, byte
[] lockNodeBytes
) throws Exception
{long startMillis
= System
.currentTimeMillis();Long millisToWait
= unit
!= null ? unit
.toMillis(time
) : null;byte
[] localLockNodeBytes
= this.revocable
.get() != null ? new byte[0] : lockNodeBytes
;int retryCount
= 0;String ourPath
= null;boolean hasTheLock
= false;boolean isDone
= false;while(!isDone
) {isDone
= true;try {ourPath
= this.driver
.createsTheLock(this.client
, this.path
, localLockNodeBytes
);hasTheLock
= this.internalLockLoop(startMillis
, millisToWait
, ourPath
);} catch (NoNodeException var14
) {if (!this.client
.getZookeeperClient().getRetryPolicy().allowRetry(retryCount
++, System
.currentTimeMillis() - startMillis
, RetryLoop
.getDefaultRetrySleeper())) {throw var14
;}isDone
= false;}}return hasTheLock
? ourPath
: null;}public String
createsTheLock(CuratorFramework client
, String path
, byte
[] lockNodeBytes
) throws Exception
{String ourPath
;if (lockNodeBytes
!= null) {ourPath
= (String
)((ACLBackgroundPathAndBytesable
)client
.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode
.EPHEMERAL_SEQUENTIAL)).forPath(path
, lockNodeBytes
);} else {ourPath
= (String
)((ACLBackgroundPathAndBytesable
)client
.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode
.EPHEMERAL_SEQUENTIAL)).forPath(path
);}return ourPath
;}private boolean
internalLockLoop(long startMillis
, Long millisToWait
, String ourPath
) throws Exception
{boolean haveTheLock
= false;boolean doDelete
= false;try {if (this.revocable
.get() != null) {((BackgroundPathable
)this.client
.getData().usingWatcher(this.revocableWatcher
)).forPath(ourPath
);}while(this.client
.getState() == CuratorFrameworkState
.STARTED && !haveTheLock
) {List
<String
> children
= this.getSortedChildren();String sequenceNodeName
= ourPath
.substring(this.basePath
.length() + 1);PredicateResults predicateResults
= this.driver
.getsTheLock(this.client
, children
, sequenceNodeName
, this.maxLeases
);if (predicateResults
.getsTheLock()) {haveTheLock
= true;} else {String previousSequencePath
= this.basePath
+ "/" + predicateResults
.getPathToWatch();synchronized(this) {try {((BackgroundPathable
)this.client
.getData().usingWatcher(this.watcher
)).forPath(previousSequencePath
);if (millisToWait
== null) {this.wait();} else {millisToWait
= millisToWait
- (System
.currentTimeMillis() - startMillis
);startMillis
= System
.currentTimeMillis();if (millisToWait
> 0L
) {this.wait(millisToWait
);} else {doDelete
= true;break;}}} catch (NoNodeException var19
) {}}}}} catch (Exception var21
) {ThreadUtils
.checkInterrupted(var21
);doDelete
= true;throw var21
;} finally {if (doDelete
) {this.deleteOurPath(ourPath
);}}return haveTheLock
;}
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀
總結
以上是生活随笔為你收集整理的分布式锁-zk临时节点的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。