zookeeper应用之分布式锁
前一段時間有討論過用redis來實現分布式鎖,講到setNx不是原子性、redis新的set方法及其誤刪和守護線程,還為了原子性不得不使用redis的腳本。雖然最終分布式鎖的這個效果是實現了,但是,不夠優雅。這里討論一下zookeeper對分布式鎖的實現。
??首先說一下zk中節點的類型,共分為四個類型:持久節點、臨時節點和持久有序節點、臨時有序節點。?
??什么是持久節點呢?字面意思,就是持久化的節點,當創建持久節點后,客戶端斷開了和服務器的鏈接,持久節點仍舊存在,與之相反的就是臨時節點了,臨時節點會在客戶端斷開鏈接后消失(客戶端主動自刎或者zk服務器清理門戶)。?
??什么是順序節點呢?先說下非順序節點,比如節點“lock”,你只可以創建一次,創建第二次就會提示你“NodeExistsException”
??假如使用的是順序節點呢,上面這個異常就不會出現了,順序節點會在節點名字后面追加一個序號,如果你使用“/lock”作為順序節點,實際節點路徑是“/lock0000000001”,十位數量的坑,絕對夠用了。
??再來說一下zk的數據結構.zk是樹形的數據結構,?
?
我們可以利用最左節點這個特性來實現分布式鎖:如果有多個請求過來,會依次掛在指定節點下,我們每次取最左邊的那個節點,執行完以后刪除節點,讓第二左節點接棒成為最左節點。
??下面就是具體的代碼了:?
先看下Main.java,這個是入口
還有具體的執行類 DistributedLock.java
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;import java.util.List; import java.io.IOException; import java.util.Collections; import java.util.concurrent.CountDownLatch;public class DistributedLock implements Watcher {private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);/** 父lock節點,持久有序節點,本demo所有的鎖都是在這個節點下面 */private String groupPath;/** 具體的鎖節點,隸屬于groupPath下 */private String subPath;/** 當前線程信息,做日志輸出時候用到 */private String LOG_PREFIX_OF_THREAD;/** zk的客戶端連接 */private ZooKeeper zk = null;/** 本節點的path */private String selfPath;/** pre節點的path */private String waitPath;/** 模擬的請求個數 */static final int THREAD_NUM = 3;/** 確保連接zk成功才開始工作,避免拋出連接丟失異常 * <span>org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /lockParent</span>*/private CountDownLatch connectedSemaphore = new CountDownLatch(1);/** 確保所有線程運行結束; */static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);public DistributedLock(int threadId,String groupPath,String subPath) {this.groupPath = groupPath;this.subPath = subPath;LOG_PREFIX_OF_THREAD = "【第" + threadId + "個線程】";}/*** 獲取鎖成功*/public void getLockSuccess() throws KeeperException, InterruptedException {if (zk.exists(this.selfPath, false) == null) {LOG.error(LOG_PREFIX_OF_THREAD + "本節點已不在了...");return;}LOG.info(LOG_PREFIX_OF_THREAD + "獲取鎖成功,趕緊干活!");Thread.sleep(5000);LOG.info(LOG_PREFIX_OF_THREAD + "刪除本節點:" + selfPath);zk.delete(this.selfPath, -1);releaseConnection();threadSemaphore.countDown();}/*** 檢查自己是不是最小的節點* @return*/public boolean checkMinPath() throws KeeperException, InterruptedException {List<String> subNodes = zk.getChildren(groupPath, false);Collections.sort(subNodes);int index = subNodes.indexOf(selfPath.substring(groupPath.length() + 1));switch (index) {case -1: {LOG.error(LOG_PREFIX_OF_THREAD + "父lock節點下找不到本節點:" + selfPath);return false;}case 0: {LOG.info(LOG_PREFIX_OF_THREAD + "本節點是最左節點:" + selfPath);return true;}default: {this.waitPath = groupPath + "/" + subNodes.get(index - 1);LOG.info(LOG_PREFIX_OF_THREAD + "獲取子節點中,排在我前面的節點是:" + waitPath);try {zk.getData(waitPath, true, new Stat());} catch (KeeperException e) {if (zk.exists(waitPath, false) == null) {LOG.info(LOG_PREFIX_OF_THREAD + "子節點中,排在我前面的" + waitPath + "已失蹤,幸福來得太突然?");return checkMinPath();} else {throw e;}}return false;}}}/*** 創建ZK連接* @param connectString ZK服務器地址列表* @param sessionTimeout Session超時時間*/public void createConnection(String connectString, int sessionTimeout) throws IOException, InterruptedException {zk = new ZooKeeper(connectString, sessionTimeout, this);connectedSemaphore.await();}/*** 創建子節點* @return*/public void createChilden() throws KeeperException, InterruptedException {selfPath = zk.create(subPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOG.info(LOG_PREFIX_OF_THREAD + "創建鎖路徑:" + selfPath);}/*** 創建節點* @param path 節點path* @param data 初始數據內容* @return*/public void createParent(String path, String data, boolean needWatch)throws KeeperException, InterruptedException {if (null == zk.exists(path, needWatch)) {String createResult = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);LOG.info(LOG_PREFIX_OF_THREAD + "節點創建成功, Path: " + createResult + ", content: " + data);}}/*** 關閉ZK連接*/public void releaseConnection() {if (this.zk != null) {try {this.zk.close();} catch (InterruptedException e) {}}LOG.info(LOG_PREFIX_OF_THREAD + "釋放連接");}public void process(WatchedEvent event) {if (event == null) {return;}Event.KeeperState keeperState = event.getState();Event.EventType eventType = event.getType();if (Event.KeeperState.SyncConnected == keeperState) {if (Event.EventType.None == eventType) {LOG.info(LOG_PREFIX_OF_THREAD + "成功連接上ZK服務器");connectedSemaphore.countDown();} else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {LOG.info(LOG_PREFIX_OF_THREAD + "收到情報,排我前面的家伙已掛,我是不是可以出山了?");try {if (checkMinPath()) {getLockSuccess();}} catch (Exception e) {e.printStackTrace();}}} else if (Event.KeeperState.Disconnected == keeperState) {LOG.info(LOG_PREFIX_OF_THREAD + "與ZK服務器斷開連接");} else if (Event.KeeperState.AuthFailed == keeperState) {LOG.info(LOG_PREFIX_OF_THREAD + "權限檢查失敗");} else if (Event.KeeperState.Expired == keeperState) {LOG.info(LOG_PREFIX_OF_THREAD + "會話失效");}} }運行Main類,可以清楚的看到三個線程先后獲取鎖、等待獲取鎖的信息
2018-06-23 23:54:56,656 - 【第3個線程】成功連接上ZK服務器 2018-06-23 23:54:56,867 - 【第3個線程】創建鎖路徑:/lockParent/lock0000000007 2018-06-23 23:54:56,923 - 【第3個線程】本節點是最左節點:/lockParent/lock0000000007 2018-06-23 23:54:56,936 - 【第3個線程】獲取鎖成功,趕緊干活! 2018-06-23 23:54:59,604 - 【第2個線程】成功連接上ZK服務器 2018-06-23 23:54:59,612 - 【第1個線程】成功連接上ZK服務器 2018-06-23 23:54:59,686 - 【第2個線程】創建鎖路徑:/lockParent/lock0000000008 2018-06-23 23:54:59,698 - 【第1個線程】創建鎖路徑:/lockParent/lock0000000009 2018-06-23 23:54:59,706 - 【第1個線程】獲取子節點中,排在我前面的節點是:/lockParent/lock0000000008 2018-06-23 23:54:59,709 - 【第2個線程】獲取子節點中,排在我前面的節點是:/lockParent/lock0000000007 2018-06-23 23:55:01,937 - 【第3個線程】刪除本節點:/lockParent/lock0000000007 2018-06-23 23:55:01,963 - 【第2個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了? 2018-06-23 23:55:01,979 - 【第2個線程】本節點是最左節點:/lockParent/lock0000000008 2018-06-23 23:55:01,993 - 【第2個線程】獲取鎖成功,趕緊干活! 2018-06-23 23:55:02,029 - 【第3個線程】釋放連接 2018-06-23 23:55:06,993 - 【第2個線程】刪除本節點:/lockParent/lock0000000008 2018-06-23 23:55:07,008 - 【第1個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了? 2018-06-23 23:55:07,032 - 【第1個線程】本節點是最左節點:/lockParent/lock0000000009 2018-06-23 23:55:07,039 - 【第2個線程】釋放連接 2018-06-23 23:55:07,062 - 【第1個線程】獲取鎖成功,趕緊干活! 2018-06-23 23:55:12,063 - 【第1個線程】刪除本節點:/lockParent/lock0000000009 2018-06-23 23:55:12,107 - 【第1個線程】釋放連接 2018-06-23 23:55:12,107 - 所有線程運行結束!假如覺得上面的實現不夠簡潔,curator開源項目提供zookeeper分布式鎖實現?
這是maven依賴
具體代碼:
public static void main(String[] args) throws Exception {//創建zookeeper的客戶端RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);client.start();//創建分布式鎖, 鎖空間的根節點路徑為/curator/lockInterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");mutex.acquire();//獲得了鎖, 進行業務流程System.out.println("Enter mutex");//完成業務流程, 釋放鎖mutex.release();//關閉客戶端client.close(); }短短的十行代碼就能實現鎖的效果,只需要在acquire和release代碼中間進行我們的業務操作即可,十分簡潔明了
總結
以上是生活随笔為你收集整理的zookeeper应用之分布式锁的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: AlphaGo技术剖析:揭开围棋大脑的神
- 下一篇: ElasticSearch 使用Java