海豚调度Dolphinscheduler源码分析(三)
今天繼續分析海豚調度的源碼
上回分析的是dolphinscheduler-service模塊zookeeper相關的代碼
這回分析是dolphinscheduler-server模塊zookeeper相關的代碼
ZkMasterClient master服務zk客戶端類
類繼承的關系如下:
這個類的方法如下:
方法介紹:
- start()??根據路徑dolphinscheduler/lock/failover/master 創建一個分布式鎖,并進行初始化,檢查是否有master節點競爭鎖,確保只有一個主master,如果只有一個master節點,那么無法進行master服務的故障轉移
- dataChange() 變更zk節點
- removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) 移除zookeeper 節點,并在/dead路徑添加節點,并會判斷是否需要容錯
- handleDeadServer()? 父類方法,就是處理宕機服務的zookeeper路徑,將獲取節點刪除,添加/dead路徑數據
- failoverServerWhenDown() 當服務宕機后,轉移服務,分為master服務和server服務
- checkTaskInstanceNeedFailover()
- failoverWorker()? 將worker上的task任務進行故障轉移
- 如果是yarn任務,干掉yarn任務
- 將任務狀態變更為需要故障轉移
- ? 當工作節點全部為null時,將所有任務進行故障轉移
zk分布式鎖獲取代碼如下:
public void start() {//Curator是zk的一個客戶端框架,其中分裝了分布式公平可重入互斥鎖,最為常見是InterProcessMutex
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
///根據這個路徑dolphinscheduler/lock/failover/master 創建一個分布式鎖
String znodeLock = getMasterStartUpLockPath();
//InterProcessMutex的構造方法,需要一個客戶端和路徑
mutex = new InterProcessMutex(getZkClient(), znodeLock);
//獲取鎖,鎖的獲取,最后必須釋放
mutex.acquire();
// init system znode
this.initSystemZNode();
//檢查是否有master節點
while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
// self tolerant
//如果活動的master節點只有1個,無法進行master服務的容錯,故failoverMaster(null)
if (getActiveMasterNum() == 1) {
failoverWorker(null, true);
failoverMaster(null);
}
}catch (Exception e){
logger.error("master start up exception",e);
}finally {
//釋放鎖,這個方法是父類AbstractZKClient的,在finally中釋放,保證鎖最后能夠釋放
releaseMutex(mutex);
}
}
對于InterProcessMutex,Curator是ZooKeeper的一個客戶端框架,其中封裝了分布式互斥鎖的實現,最為常用的是InterProcessMutex
InterProcessMutex基于Zookeeper實現了分布式的公平可重入互斥鎖,類似于單個JVM進程內的ReentrantLock(fair=true)
全局同步的可重入分布式鎖,任何時刻不會有兩個客戶端同時持有該鎖。Reentrant和JDK的ReentrantLock類似, 意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞
相關鏈接:https://blog.csdn.net/hosaos/article/details/89521537
相關鏈接:https://www.cnblogs.com/a-du/p/9876314.html
相關鏈接:https://blog.csdn.net/qq_34021712/article/details/82878396
主要方法:
//獲取鎖,若失敗則阻塞等待直到成功,支持重入 public void acquire() throws Exception //超時獲取鎖,超時失敗 public boolean acquire(long time, TimeUnit unit) throws Exception //釋放鎖,一般在finally中釋放 public void release() throws Exception注意點,調用acquire()方法后需相應調用release()來釋放鎖
private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) 移除zk節點/**? ? ?
* remove zookeeper node path
** @param path zookeeper node path* @param zkNodeType zookeeper node type* @param failover is failover*/private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {logger.info("{} node deleted : {}", zkNodeType.toString(), path);InterProcessMutex mutex = null;try {String failoverPath = getFailoverLockPath(zkNodeType);// create a distributed lockmutex = new InterProcessMutex(getZkClient(), failoverPath);mutex.acquire();String serverHost = getHostByEventDataPath(path);// handle dead server//處理 宕機服務,刪除原來節點,在dead路徑增加節點,handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);//failover server
//是否故障轉移服務if(failover){failoverServerWhenDown(serverHost, zkNodeType);}}catch (Exception e){logger.error("{} server failover failed.", zkNodeType.toString());logger.error("failover exception ",e);}finally {releaseMutex(mutex);}}
zookeeper分布式鎖詳解
在分布式環境中 ,為了保證數據的一致性,經常在程序的某個運行點(例如,減庫存操作或者流水號生成等)需要進行同步控制。以一個"流水號生成"的場景為例,普通的后臺應用通常都是使用時間戳來生成流水號,但是在用戶訪問量很大的情況下,可能會出現并發問題。下面通過示例程序就演示一個典型的并發問題:
public static void main(String[] args) throws Exception {CountDownLatch down = new CountDownLatch(1);for (int i=0;i<10;i++){new Thread(new Runnable() {@Overridepublic void run() {try {down.await();} catch (InterruptedException e) {e.printStackTrace();}SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");String orderNo = sdf.format(new Date());System.out.println("生成的訂單號是:"+orderNo);}}).start();}down.countDown(); }輸出結果如下:
Thread[Thread-8,5,main]生成的訂單號是:14:41:26|098 Thread[Thread-4,5,main]生成的訂單號是:14:41:26|107 Thread[Thread-9,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-3,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-0,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-6,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-7,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-2,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-5,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-1,5,main]生成的訂單號是:14:41:26|108不難發現,生成的10個訂單不少都是重復的,如果是實際的生產環境中,這顯然沒有滿足我們的也無需求。究其原因,就是因為在沒有進行同步的情況下,出現了并發問題。下面我們來看看如何使用Curator實現分布式鎖功能。
Shared Reentrant Lock(分布式可重入鎖)
全局同步的可重入分布式鎖,任何時刻不會有兩個客戶端同時持有該鎖。Reentrant和JDK的ReentrantLock類似, 意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。
相關的類
InterProcessMutex
使用
創建InterProcessMutex實例
InterProcessMutex提供了兩個構造方法,傳入一個CuratorFramework實例和一個要使用的節點路徑,InterProcessMutex還允許傳入一個自定義的驅動類,默認是使用StandardLockInternalsDriver。
獲取鎖
使用acquire方法獲取鎖,acquire方法有兩種:
public void acquire() throws Exception;獲取鎖,一直阻塞到獲取到鎖為止。獲取鎖的線程在獲取鎖后仍然可以調用acquire() 獲取鎖(可重入)。 鎖獲取使用完后,調用了幾次acquire(),就得調用幾次release()釋放。
public boolean acquire(long time, TimeUnit unit) throws Exception;與acquire()類似,等待time * unit時間獲取鎖,如果仍然沒有獲取鎖,則直接返回false。
釋放鎖
使用release()方法釋放鎖
線程通過acquire()獲取鎖時,可通過release()進行釋放,如果該線程多次調用 了acquire()獲取鎖,則如果只調用 一次release()該鎖仍然會被該線程持有。
注意:同一個線程中InterProcessMutex實例是可重用的,也就是不需要在每次獲取鎖的時候都new一個InterProcessMutex實例,用同一個實例就好。
鎖撤銷
InterProcessMutex 支持鎖撤銷機制,可通過調用makeRevocable()將鎖設為可撤銷的,當另一線程希望你釋放該鎖時,實例里的listener會被調用。 撤銷機制是協作的。
示例代碼(官網)
共享資源
public class FakeLimitedResource {//總共250張火車票private Integer ticket = 250;public void use() throws InterruptedException {try {System.out.println("火車票還剩"+(--ticket)+"張!");}catch (Exception e){e.printStackTrace();}} }使用鎖操作資源
public class ExampleClientThatLocks {/** 鎖 */private final InterProcessMutex lock;/** 共享資源 */private final FakeLimitedResource resource;/** 客戶端名稱 */private final String clientName;public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {this.resource = resource;this.clientName = clientName;lock = new InterProcessMutex(client, lockPath);}public void doWork(long time, TimeUnit unit) throws Exception {if ( !lock.acquire(time, unit) ) {throw new IllegalStateException(clientName + " could not acquire the lock");}try {System.out.println(clientName + " has the lock");//操作資源resource.use();} finally {System.out.println(clientName + " releasing the lock");lock.release(); //總是在Final塊中釋放鎖。}} }客戶端
public class LockingExample {private static final int QTY = 5;private static final int REPETITIONS = QTY * 10;private static final String CONNECTION_STRING = "172.20.10.9:2181";private static final String PATH = "/examples/locks";public static void main(String[] args) throws Exception {//FakeLimitedResource模擬某些外部資源,這些外部資源一次只能由一個進程訪問final FakeLimitedResource resource = new FakeLimitedResource();ExecutorService service = Executors.newFixedThreadPool(QTY);try {for ( int i = 0; i < QTY; ++i ){final int index = i;Callable<Void> task = new Callable<Void>() {@Overridepublic Void call() throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));try {client.start();ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index);for ( int j = 0; j < REPETITIONS; ++j ) {example.doWork(10, TimeUnit.SECONDS);}}catch ( InterruptedException e ){Thread.currentThread().interrupt();}catch ( Exception e ){e.printStackTrace();}finally{CloseableUtils.closeQuietly(client);}return null;}};service.submit(task);}service.shutdown();service.awaitTermination(10, TimeUnit.MINUTES);}catch (Exception e){e.printStackTrace();}} }起五個線程,即五個窗口賣票,五個客戶端分別有50張票可以賣,先是嘗試獲取鎖,操作資源后,釋放鎖。
轉自:https://blog.csdn.net/qq_34021712/article/details/82878396
總結
以上是生活随笔為你收集整理的海豚调度Dolphinscheduler源码分析(三)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一行代码实现蒲公英市场APP检查更新
- 下一篇: CDH/HDP迁移之路