基于Consul的分布式锁实现
我們?cè)跇?gòu)建分布式系統(tǒng)的時(shí)候,經(jīng)常需要控制對(duì)共享資源的互斥訪問(wèn)。這個(gè)時(shí)候我們就涉及到分布式鎖(也稱為全局鎖)的實(shí)現(xiàn),基于目前的各種工具,我們已經(jīng)有了大量的實(shí)現(xiàn)方式,比如:基于Redis的實(shí)現(xiàn)、基于Zookeeper的實(shí)現(xiàn)。本文將介紹一種基于Consul 的Key/Value存儲(chǔ)來(lái)實(shí)現(xiàn)分布式鎖以及信號(hào)量的方法。
分布式鎖實(shí)現(xiàn)
基于Consul的分布式鎖主要利用Key/Value存儲(chǔ)API中的acquire和release操作來(lái)實(shí)現(xiàn)。acquire和release操作是類似Check-And-Set的操作:
- acquire操作只有當(dāng)鎖不存在持有者時(shí)才會(huì)返回true,并且set設(shè)置的Value值,同時(shí)執(zhí)行操作的session會(huì)持有對(duì)該Key的鎖,否則就返回false
- release操作則是使用指定的session來(lái)釋放某個(gè)Key的鎖,如果指定的session無(wú)效,那么會(huì)返回false,否則就會(huì)set設(shè)置Value值,并返回true
具體實(shí)現(xiàn)中主要使用了這幾個(gè)Key/Value的API:
- create session:https://www.consul.io/api/session.html#session_create
- delete session:https://www.consul.io/api/session.html#delete-session
- KV acquire/release:https://www.consul.io/api/kv.html#create-update-key
基本流程
具體實(shí)現(xiàn)
| public class Lock { private static final String prefix = "lock/"; // 同步鎖參數(shù)前綴 private ConsulClient consulClient; private String sessionName; private String sessionId = null; private String lockKey; /** * * @param consulClient * @param sessionName 同步鎖的session名稱 * @param lockKey 同步鎖在consul的KV存儲(chǔ)中的Key路徑,會(huì)自動(dòng)增加prefix前綴,方便歸類查詢 */ public Lock(ConsulClient consulClient, String sessionName, String lockKey) { this.consulClient = consulClient; this.sessionName = sessionName; this.lockKey = prefix + lockKey; } /** * 獲取同步鎖 * * @param block 是否阻塞,直到獲取到鎖為止 * @return */ public Boolean lock(boolean block) { if (sessionId != null) { throw new RuntimeException(sessionId + " - Already locked!"); } sessionId = createSession(sessionName); while(true) { PutParams putParams = new PutParams(); putParams.setAcquireSession(sessionId); if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) { return true; } else if(block) { continue; } else { return false; } } } /** * 釋放同步鎖 * * @return */ public Boolean unlock() { PutParams putParams = new PutParams(); putParams.setReleaseSession(sessionId); boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue(); consulClient.sessionDestroy(sessionId, null); return result; } /** * 創(chuàng)建session * @param sessionName * @return */ private String createSession(String sessionName) { NewSession newSession = new NewSession(); newSession.setName(sessionName); return consulClient.sessionCreate(newSession, null).getValue(); } } |
單元測(cè)試
下面單元測(cè)試的邏輯:通過(guò)線程的方式來(lái)模擬不同的分布式服務(wù)來(lái)競(jìng)爭(zhēng)鎖。多個(gè)處理線程同時(shí)以阻塞方式來(lái)申請(qǐng)分布式鎖,當(dāng)處理線程獲得鎖之后,Sleep一段隨機(jī)事件,以模擬處理業(yè)務(wù)邏輯,處理完畢之后釋放鎖。
| public class TestLock { private Logger logger = Logger.getLogger(getClass()); public void testLock() throws Exception { new Thread(new LockRunner(1)).start(); new Thread(new LockRunner(2)).start(); new Thread(new LockRunner(3)).start(); new Thread(new LockRunner(4)).start(); new Thread(new LockRunner(5)).start(); Thread.sleep(200000L); } class LockRunner implements Runnable { private Logger logger = Logger.getLogger(getClass()); private int flag; public LockRunner(int flag) { this.flag = flag; } public void run() { Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key"); try { if (lock.lock(true)) { logger.info("Thread " + flag + " start!"); Thread.sleep(new Random().nextInt(3000L)); logger.info("Thread " + flag + " end!"); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } } |
單元測(cè)試執(zhí)行結(jié)果如下:
| 2017-04-12 21:28:09,698 INFO [Thread-0] LockRunner - Thread 1 start! 2017-04-12 21:28:12,717 INFO [Thread-0] LockRunner - Thread 1 end! 2017-04-12 21:28:13,219 INFO [Thread-2] LockRunner - Thread 3 start! 2017-04-12 21:28:15,672 INFO [Thread-2] LockRunner - Thread 3 end! 2017-04-12 21:28:15,735 INFO [Thread-1] LockRunner - Thread 2 start! 2017-04-12 21:28:17,788 INFO [Thread-1] LockRunner - Thread 2 end! 2017-04-12 21:28:18,249 INFO [Thread-4] LockRunner - Thread 5 start! 2017-04-12 21:28:19,573 INFO [Thread-4] LockRunner - Thread 5 end! 2017-04-12 21:28:19,757 INFO [Thread-3] LockRunner - Thread 4 start! 2017-04-12 21:28:21,353 INFO [Thread-3] LockRunner - Thread 4 end! |
從測(cè)試結(jié)果我們可以看到,通過(guò)分布式鎖的形式來(lái)控制并發(fā)時(shí),多個(gè)同步操作只會(huì)有一個(gè)操作能夠被執(zhí)行,其他操作只有在等鎖釋放之后才有機(jī)會(huì)去執(zhí)行,所以通過(guò)這樣的分布式鎖,我們可以控制共享資源同時(shí)只能被一個(gè)操作進(jìn)行執(zhí)行,以保障數(shù)據(jù)處理時(shí)的分布式并發(fā)問(wèn)題。
優(yōu)化建議
本文我們實(shí)現(xiàn)了基于Consul的簡(jiǎn)單分布式鎖,但是在實(shí)際運(yùn)行時(shí),可能會(huì)因?yàn)楦鞣N各樣的意外情況導(dǎo)致unlock操作沒(méi)有得到正確地執(zhí)行,從而使得分布式鎖無(wú)法釋放。所以為了更完善的使用分布式鎖,我們還必須實(shí)現(xiàn)對(duì)鎖的超時(shí)清理等控制,保證即使出現(xiàn)了未正常解鎖的情況下也能自動(dòng)修復(fù),以提升系統(tǒng)的健壯性。那么如何實(shí)現(xiàn)呢?請(qǐng)持續(xù)關(guān)注我的后續(xù)分解!
參考文檔
Key/Value的API:https://www.consul.io/api/kv.html
選舉機(jī)制:https://www.consul.io/docs/guides/leader-election.html
實(shí)現(xiàn)代碼
- GitHub:https://github.com/dyc87112/consul-distributed-lock
- 開(kāi)源中國(guó):http://git.oschina.net/didispace/consul-distributed-lock
總結(jié)
以上是生活随笔為你收集整理的基于Consul的分布式锁实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Spring Cloud实战小贴士:Zu
- 下一篇: Node.js Stream - 实战篇