Redisson 分布式锁实现分析(一)
Why 分布式鎖
java.util.concurrent.locks 中包含了 JDK 提供的在多線程情況下對共享資源的訪問控制的一系列工具,它們可以幫助我們解決進程內(nèi)多線程并發(fā)時的數(shù)據(jù)一致性問題。
但是在分布式系統(tǒng)中,JDK 原生的并發(fā)鎖工具在一些場景就無法滿足我們的要求了,這就是為什么要使用分布式鎖。我總結了一句話,分布式鎖是用于解決分布式系統(tǒng)中操作共享資源時的數(shù)據(jù)一致性問題。
設計分布式鎖要注意的問題
互斥
分布式系統(tǒng)中運行著多個節(jié)點,必須確保在同一時刻只能有一個節(jié)點的一個線程獲得鎖,這是最基本的一點。
死鎖
分布式系統(tǒng)中,可能產(chǎn)生死鎖的情況要相對復雜一些。分布式系統(tǒng)是處在復雜網(wǎng)絡環(huán)境中的,當一個節(jié)點獲取到鎖,如果它在釋放鎖之前掛掉了,或者因網(wǎng)絡故障無法執(zhí)行釋放鎖的命令,都會導致其他節(jié)點無法申請到鎖。
因此分布式鎖有必要設置時效,確保在未來的一定時間內(nèi),無論獲得鎖的節(jié)點發(fā)生了什么問題,最終鎖都能被釋放掉。
性能
對于訪問量大的共享資源,如果針對其獲取鎖時造成長時間的等待,導致大量節(jié)點阻塞,是絕對不能接受的。
所以設計分布式鎖時要能夠掌握鎖持有者的動態(tài),若判斷鎖持有者處于不活動狀態(tài),要能夠強制釋放其持有的鎖。
此外,排隊等待鎖的節(jié)點如果不知道鎖何時會被釋放,則只能隔一段時間嘗試獲取一次鎖,這樣無法保證資源的高效利用,因此當鎖釋放時,要能夠通知等待隊列,使一個等待節(jié)點能夠立刻獲得鎖。
重入
考慮到一些應用場景和資源的高效利用,鎖要設計成可重入的,就像 JDK 中的 ReentrantLock 一樣,同一個線程可以重復拿到同一個資源的鎖。
RedissonLock 實現(xiàn)解讀
本文中 Redisson 的代碼版本為 2.2.17-SNAPSHOT。
這里以 lock() 方法為例,其他一系列方法與其核心實現(xiàn)基本一致。
先來看 lock() 的基本用法
RLock lock = redisson.getLock("foobar"); // 1.獲得鎖對象實例 lock.lock(); // 2.獲取分布式鎖 try {// do sth. } finally {lock.unlock(); // 3.釋放鎖 }下面來看看 RedissonLock 的具體實現(xiàn)
org.redisson.Redisson#getLock()
@Override public RLock getLock(String name) {return new RedissonLock(commandExecutor, name, id); }這里的 RLock 是繼承自 java.util.concurrent.locks.Lock 的一個 interface,getLock 返回的實際上是其實現(xiàn)類 RedissonLock 的實例。
來看看構造 RedissonLock 的參數(shù)
- commandExecutor: 與 Redis 節(jié)點通信并發(fā)送指令的真正實現(xiàn)。需要說明一下,Redisson 缺省的 CommandExecutor 實現(xiàn)是通過 eval 命令來執(zhí)行 Lua 腳本,所以要求 Redis 的版本必須為 2.6 或以上,否則你可能要自己來實現(xiàn) CommandExecutor。關于 Redisson 的 CommandExecutor 以后會專門解讀,所以本次就不多說了。
- name: 鎖的全局名稱,例如上面代碼中的 "foobar",具體業(yè)務中通常可能使用共享資源的唯一標識作為該名稱。
- id: Redisson 客戶端唯一標識,實際上就是一個 UUID.randomUUID()。
org.redisson.RedissonLock#lock()
此處略過前面幾個方法的層層調(diào)用,直接看最核心部分的方法 lockInterruptibly(),該方法在 RLock 中聲明,支持對獲取鎖的線程進行中斷操作。在直接使用 lock() 方法獲取鎖時,最后實際執(zhí)行的是 lockInterruptibly(-1, null)。
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {// 1.嘗試獲取鎖Long ttl = tryAcquire(leaseTime, unit);// 2.獲得鎖成功if (ttl == null) {return;}// 3.等待鎖釋放,并訂閱鎖long threadId = Thread.currentThread().getId();Future<RedissonLockEntry> future = subscribe(threadId);get(future);try {while (true) {// 4.重試獲取鎖ttl = tryAcquire(leaseTime, unit);// 5.成功獲得鎖if (ttl == null) {break;}// 6.等待鎖釋放if (ttl >= 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}} finally {// 7.取消訂閱unsubscribe(future, threadId);} }下面著重看看 tryAcquire() 方法的實現(xiàn),
private Long tryAcquire(long leaseTime, TimeUnit unit) {// 1.將異步執(zhí)行的結果以同步的形式返回return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId())); }private <T> Future<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 2.用默認的鎖超時時間去獲取鎖Future<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();// 成功獲得鎖if (ttlRemaining == null) {// 3.鎖過期時間刷新任務調(diào)度scheduleExpirationRenewal();}}});return ttlRemainingFuture; }<T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId,RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);// 3.使用 EVAL 命令執(zhí)行 Lua 腳本獲取鎖return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime,getLockName(threadId)); }以上就是對 lock() 的解讀,不過在實際業(yè)務中我們可能還會經(jīng)常使用 tryLock(),雖然兩者有一定差別,但核心部分的實現(xiàn)都是相同的,另外還有其他一些方法可以支持更多自定義參數(shù),本文中就不一一詳述了。
org.redisson.RedissonLock#unlock()
最后來看鎖的釋放,
@Override public void unlock() {// 1.通過 EVAL 和 Lua 腳本執(zhí)行 Redis 命令釋放鎖Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));// 2.非鎖的持有者釋放鎖時拋出異常if (opStatus == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + Thread.currentThread().getId());}// 3.釋放鎖后取消刷新鎖失效時間的調(diào)度任務if (opStatus) {cancelExpirationRenewal();} }總結
寫了這么多,其實最主要的就是上面的兩段 Lua 腳本,基于 Redis 的分布式鎖的設計完全體現(xiàn)在其中,看完這兩段腳本,再回顧一下前面的 設計分布式鎖要注意的問題 就豁然開朗了。
作者:Raymond_Z
鏈接:https://www.jianshu.com/p/de5a69622e49
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯(lián)系作者獲得授權并注明出處。
總結
以上是生活随笔為你收集整理的Redisson 分布式锁实现分析(一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 两个小工具,MySQL死锁分析,新技能又
- 下一篇: 一条简单的 SQL 执行超过1000ms