一、看源碼前的一些說明
1. 這篇文章的源碼筆記是基于之前的筆記,所以,這里的當前時間假設,客戶端A的過期時間,都需要結合之前的筆記
2. 通過前幾篇的筆記,現在有客戶端A,B,C三個,假設此時時間已經到達了10:00:36,客戶端C來進行的重新嘗試進行加鎖,此時客戶端B他其實在這之前不知道可能因為網絡原因或者是別的什么原因,可能他就是沒有嘗試過重新加鎖
3. 參數說明,這些參數都是Lua腳本中的參數,在閱讀源碼的時候,方便隨時回頭過來看
KEYS = Arrays.asList(getName(), threadsQueueName, timeoutSetName)KEYS[1] = getName() = 鎖的名字,“anyLock”KEYS[2] = threadsQueueName = redisson_lock_queue:{anyLock},基于redis的數據結構實現的一個隊列KEYS[3] = timeoutSetName = redisson_lock_timeout:{anyLock},基于redis的數據結構實現的一個Set數據集合,有序集合,可以自動按照你給每個數據指定的一個分數(score)來進行排序ARGV = internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTimeARGV[1] = 30000毫秒ARGV[2] = UUID:threadIdARGV[3] = 當前時間(10:00:00) + 5000毫秒 = 10:00:05ARGV[4] = 當前時間(10:00:00)
二、代碼剖析
代碼片段一、
?RedissonFairLock
源碼解析中的1、2、3是程序進來的順序,比如1走完后,2才會走
@Override RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); long currentTime = System.currentTimeMillis(); if (command == RedisCommands.EVAL_NULL_BOOLEAN) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // remove stale threads "while true do " + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end; " + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + "if timeout <= tonumber(ARGV[3]) then " + "redis.call('zrem', KEYS[3], firstThreadId2); " + "redis.call('lpop', KEYS[2]); " + "else " + "break;" + "end; " + "end;" + "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + "redis.call('lpop', KEYS[2]); " + "redis.call('zrem', KEYS[3], ARGV[2]); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return 1;", Arrays.asList(getName(), threadsQueueName, timeoutSetName), internalLockLeaseTime, getLockName(threadId), currentTime); } if (command == RedisCommands.EVAL_LONG) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // remove stale threads //1. 此時客戶端C會進入到一個死循環 "while true do “ //1. 從redisson_lock_queue:{anyLock} 中獲取第一個元素,應該是客戶端B,UUID_02:threadId_02 // 2. 此時隊列中就只剩下C了,那么此時firstThreadId2 = UUID_02:threadId_03 + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end; “ //1. zscore redisson_lock_timeout:{anyLock} UUID_02:threadId_02 獲取客戶端B的超時時間 // 2. zscore redisson_lock_timeout:{anyLock} UUID_03:threadId_03,獲取客戶端C的超時時間 + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));” //1. 10:00:30(參考前幾篇筆記中的客戶端B的超時時間) <= 10:00:36(假設的當前時間)?條件是成立的 //2. 10:00:35(參考之前筆記客戶端B的超時時間) <= 10:00:36(假設的當前時間) ? ,條件是成立的 + "if timeout <= tonumber(ARGV[4]) then “ // 1. zrem redisson_lcok_timeout:{anyLock} UUID_02:threadId_02 其實就是將客戶端B在有序集合中的元素進行移除 // 2.zrem redisson_lock_timeout:{anyLock} UUID_03:threadId_03 ,將客戶端C在有序集合中移除 + "redis.call('zrem', KEYS[3], firstThreadId2); // 1. Lpop redisson_lock_queue:{anyLock} 將客戶端B在隊列中進行移除 // 1. 此時隊列中就只剩下客戶端C了,然后繼續進行死循環當中 // 2. lpop redisson_lock_queue:{anyLock} 將客戶端C從隊列中移除 // 2. 此時隊列中和有序集合中的的元素都為空,跳出死循環 + "redis.call('lpop', KEYS[2]); " + "else " + "break;" + "end; " + "end;" // 3. 這里的條件是不成立的,因為客戶端A還持有這把鎖的 + "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + "redis.call('lpop', KEYS[2]); " + "redis.call('zrem', KEYS[3], ARGV[2]); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; “ + //3.hexists anyLock UUID_03:threadId_03 = 1 條件是不成立的,因為此時隊列時空的 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 3.lindex redisson_lock_timeout:{anyLock} 0 取出隊列的第一個元素,肯定是空的 "local firstThreadId = redis.call('lindex', KEYS[2], 0); " + "local ttl; “ + // if 條件不滿足 "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + "else “ // ttl此時是anyLock的剩余生存時間,假設剩余23000毫秒 + "ttl = redis.call('pttl', KEYS[1]);" + "end; " + // timeout = 23000 + 10:00:36 + 5000 = 10:01:04 "local timeout = ttl + tonumber(ARGV[3]);” + // zadd redisson_lock_timeout:{anyLock} 10:01:04 UUID_03:threadId_03 將客戶端C保存到有序集合中 "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then “ + // rpush redisson_lock_queue UUID_03:threadid_03 將客戶端C放入到隊列中 "redis.call('rpush', KEYS[2], ARGV[2]);" + "end; " + "return ttl;", Arrays.asList(getName(), threadsQueueName, timeoutSetName), internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime); } throw new IllegalArgumentException();}
三、總結:
我們可以看到,在一個客戶端剛剛加鎖之后,其他的客戶端來爭搶這把鎖,剛開始在一定時間范圍之內,時間不要過長,各個客戶端是可以按照公平的節奏,在隊列和有序集合里面進行排序
在一定時間范圍內,時間不要過長,其實隊列里的元素順序是不會改變的,各個客戶端重新嘗試加鎖,只不過是刷新有序集合中的分數(timeout),各個客戶端的timeout不斷加長,但是整體順序大致還是保持一致的
但是如果客戶端A持有的鎖的時間過長,timeout,這個所謂的排隊是有timeout,可能會在while true死循環中將一些等待時間過長的客戶端從隊列和有序集合中刪除,一旦刪除過后,就會發生各個客戶端隨著自己重新嘗試加鎖的時間次序,重新進行一個隊列中的重排,也就是排隊的順序可能會發生變化
客戶端跟redis通信的網絡的一個問題,延遲,各種情況都可能會發生
客戶端釋放鎖,釋放鎖之后隊列中的排隊的客戶端是如何依次獲取這把鎖的,是按照隊列里的順序去獲取鎖的
四、加鎖流程圖
四、寄語:
================
如果手機觀看代碼比較難受,可以看網頁版本的
https://www.jianshu.com/p/ba52ce206986
================
總結
以上是生活随笔為你收集整理的公平锁非公平锁的实际使用_3. redisson源码公平锁之队列重排序的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。