Redis分布式锁,看完不懂你打我
簡(jiǎn)易的redis分布式鎖
加鎖:
set key my_random_value NX PX 30000
這個(gè)命令比setnx好,因?yàn)榭梢酝瑫r(shí)設(shè)置過期時(shí)間。不設(shè)置過期時(shí)間,應(yīng)用掛了,解不了鎖,就一直鎖住了。
解鎖:
if redis.call("get",KEYS[1])==ARGV[1] thenreturn redis.call("del",KEYS[1]) elsereturn 0 end先比較一下值,相等才刪除。防止其他線程把鎖給解了。
以上方案在一般的場(chǎng)景就夠用了,但還存在一些小問題:
解決方案:參照redisson的看門狗,可以后臺(tái)起一個(gè)線程去看看業(yè)務(wù)線程執(zhí)行完了沒有,如果沒有就延長(zhǎng)過期時(shí)間。
解決方案:RedLock算法。簡(jiǎn)單說就是N個(gè)(通常是5)獨(dú)立的redis節(jié)點(diǎn)同時(shí)執(zhí)行SETNX,如果大多數(shù)成功了,就拿到了鎖。這樣就允許少數(shù)節(jié)點(diǎn)不可用。
那我們看看工業(yè)級(jí)別是怎么實(shí)現(xiàn)redis分布式鎖的呢?
Redission實(shí)現(xiàn)的redis分布式鎖
加鎖流程:
解鎖流程:
Redission加鎖使用的是redis的hash結(jié)構(gòu)。
- key :要鎖的資源名稱
- filed :uuid+":"+線程id
- value : 數(shù)值型,可以實(shí)現(xiàn)可重入鎖
源碼里面用到了netty里面Promise的一些api,我列出來幫助理解:
// 異步操作完成且正常終止boolean isSuccess();// 異步操作是否可以取消boolean isCancellable();// 異步操作失敗的原因Throwable cause();// 添加一個(gè)監(jiān)聽者,異步操作完成時(shí)回調(diào),類比javascript的回調(diào)函數(shù)Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);// 阻塞直到異步操作完成Future<V> await() throws InterruptedException;// 同上,但異步操作失敗時(shí)拋出異常Future<V> sync() throws InterruptedException;// 非阻塞地返回異步結(jié)果,如果尚未完成返回nullV getNow();源碼分析:
加鎖:
public RLock getLock(String name) {return new RedissonLock(connectionManager.getCommandExecutor(), name);}public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);//命令執(zhí)行器this.commandExecutor = commandExecutor;//uuidthis.id = commandExecutor.getConnectionManager().getId();//超時(shí)時(shí)間,默認(rèn)30sthis.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName = id + ":" + name;} public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {//獲取線程idlong threadId = Thread.currentThread().getId();//嘗試獲取鎖Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquired//ttl為空則代表加鎖成功if (ttl == null) {return;}//如果獲取鎖失敗,則訂閱到對(duì)應(yīng)這個(gè)鎖的channel,等其他線程釋放鎖時(shí),通知線程去獲取鎖RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);try {while (true) {//再次嘗試獲取鎖ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for message//ttl大于0,則等待ttl時(shí)間后繼續(xù)嘗試獲取鎖if (ttl >= 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}} finally {//取消對(duì)channel的訂閱unsubscribe(future, threadId);} // get(lockAsync(leaseTime, unit));}再來看看里面的嘗試獲取鎖的代碼:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId));}private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {//如果帶有過期時(shí)間,則按照普通方式獲取鎖return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}//先按照30秒的過期時(shí)間來執(zhí)行獲取鎖的方法RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);//異步執(zhí)行回調(diào)監(jiān)聽ttlRemainingFuture.addListener(new FutureListener<Long>() {@Override//如果還持有這個(gè)鎖,則開啟定時(shí)任務(wù)不斷刷新該鎖的過期時(shí)間public void operationComplete(Future<Long> future) throws Exception {//沒有成功執(zhí)行完成if (!future.isSuccess()) {return;}//非阻塞地返回異步結(jié)果,如果尚未完成返回nullLong ttlRemaining = future.getNow();// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}看門狗邏輯:
使用的是Netty的Timeout延遲任務(wù)做的。
- 比如鎖過期 30 秒, 每過 1/3 時(shí)間也就是 10 秒會(huì)檢查鎖是否存在, 存在則更新鎖的超時(shí)時(shí)間
加鎖腳本
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,//如果鎖不存在,則通過hset設(shè)置它的值,并設(shè)置過期時(shí)間"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果鎖已存在,并且鎖的是當(dāng)前線程,則通過hincrby給數(shù)值遞增1,并重新設(shè)置過期時(shí)間"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果鎖已存在,但并非本線程,則返回過期時(shí)間"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}解鎖:
public RFuture<Void> unlockAsync(final long threadId) {final RPromise<Void> result = new RedissonPromise<Void>();//底層解鎖方法RFuture<Boolean> future = unlockInnerAsync(threadId);future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {if (!future.isSuccess()) {cancelExpirationRenewal(threadId);result.tryFailure(future.cause());return;}Boolean opStatus = future.getNow();//如果返回空,則證明解鎖的線程和當(dāng)前鎖不是同一個(gè)線程,拋出異常if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}if (opStatus) {cancelExpirationRenewal(null);}result.trySuccess(null);}});return result;}解鎖腳本:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +//如果釋放鎖的線程和已存在鎖的線程不是同一個(gè)線程,返回null"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//通過hincrby遞減1的方式,釋放一次鎖"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +//若剩余次數(shù)大于0 ,則刷新過期時(shí)間"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +//否則證明鎖已經(jīng)釋放,刪除key并發(fā)布鎖釋放的消息"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}書山有路勤為徑,學(xué)海無涯苦作舟
?
?
總結(jié)
以上是生活随笔為你收集整理的Redis分布式锁,看完不懂你打我的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 面试中一个暴露能力等级的问题
- 下一篇: Redis三种集群模式介绍