廢話少說,直接進入正題。
相信大家對XXL-JOB都很了解,故本文對源碼不進行過多介紹,側重的是看源碼過程中想到的幾個知識點 ,不一定都對,請大神們批評指正。
XXL-JOB簡介
XXL-JOB是一個輕量級分布式任務調度平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。現已開放源代碼并接入多家公司線上產品線,開箱即用。 XXL-JOB分為調度中心、執行器、數據中心,調度中心負責任務管理及調度、執行器管理、日志管理等,執行器負責任務執行及執行結果回調。
任務調度 - “類時間輪”的實現
時間輪
時間輪出自Netty中的HashedWheelTimer,是一個環形結構,可以用時鐘來類比,鐘面上有很多bucket,每一個bucket上可以存放多個任務,使用一個List保存該時刻到期的所有任務,同時一個指針隨著時間流逝一格一格轉動,并執行對應bucket上所有到期的任務。任務通過取模 決定應該放入哪個bucket。和HashMap的原理類似,newTask對應put,使用List來解決 Hash 沖突。
?
?
?
以上圖為例,假設一個bucket是1秒,則指針轉動一輪表示的時間段為8s,假設當前指針指向 0,此時需要調度一個3s后執行的任務,顯然應該加入到(0+3=3)的方格中,指針再走3s次就可以執行了;如果任務要在10s后執行,應該等指針走完一輪零2格再執行,因此應放入2,同時將round(1)保存到任務中。檢查到期任務時只執行round為0的,bucket上其他任務的round減1。
當然,還有優化的“分層時間輪”的實現,請參考https://cnkirito.moe/timer/。
XXL-JOB中的“時間輪”
?
?
?
XXL-JOB中負責任務調度的有兩個線程,分別為ringThread和scheduleThread,其作用如下。
1、scheduleThread:對任務信息進行讀取,預讀未來5s 即將觸發的任務,放入時間輪。 2、ringThread:對當前bucket和前一個bucket中的任務取出并執行。
下面結合源代碼看下,為什么說是“類時間輪”,關鍵代碼附上了注解,請大家留意觀看。
// 環狀結構
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();// 任務下次啟動時間(單位為秒) % 60
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 任務放進時間輪
private void pushTimeRing(int ringSecond, int jobId){// push async ringList<Integer> ringItemData = ringData.get(ringSecond);if (ringItemData == null) {ringItemData = new ArrayList<Integer>();ringData.put(ringSecond, ringItemData);}ringItemData.add(jobId);}
復制代碼
// 同時取兩個時間刻度的任務
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 避免處理耗時太長,跨過刻度,向前校驗一個刻度;
for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}
}
// 運行
for (int jobId: ringItemData) {JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
復制代碼
一致性Hash路由中的Hash算法
大家也知道,XXL-JOB在執行任務時,任務具體在哪個執行器上運行是根據路由策略來決定的,其中有一個策略是一致性Hash策略(源碼在ExecutorRouteConsistentHash.java),自然而然想到了一致性Hash算法 。 一致性Hash算法 是為了解決分布式系統中負載均衡的問題時候可以使用Hash算法讓固定的一部分請求落到同一臺服務器上,這樣每臺服務器固定處理一部分請求(并維護這些請求的信息),起到負載均衡的作用。普通的余數hash(hash(比如用戶id)%服務器機器數)算法伸縮性很差,當新增或者下線服務器機器時候,用戶id與服務器的映射關系會大量失效。一致性hash則利用hash環對其進行了改進。 一致性Hash算法 在實踐中,當服務器節點比較少的時候會出現上節所說的一致性hash傾斜的問題,一個解決方法是多加機器,但是加機器是有成本的,那么就加虛擬節點 。具體原理請參考https://www.jianshu.com/p/e968c081f563。 下圖為帶有虛擬節點的Hash環,其中ip1-1是ip1的虛擬節點,ip2-1是ip2的虛擬節點,ip3-1是ip3的虛擬節點。
?
?
?
可見 ,一致性Hash算法的關鍵在于Hash算法 ,保證虛擬節點 及Hash結果 的均勻性, 而均勻性可以理解為減少Hash沖突 ,Hash沖突的知識點請參考從HashMap,Redis 字典看【Hash】。。。。
XXL-JOB中的一致性Hash的Hash函數如下。
// jobId轉換為md5
// 不直接用hashCode() 是因為擴大hash取值范圍,減少沖突
byte[] digest = md5.digest();// 32位hashCode
long hashCode = ((long) (digest[3] & 0xFF) << 24)| ((long) (digest[2] & 0xFF) << 16)| ((long) (digest[1] & 0xFF) << 8)| (digest[0] & 0xFF);long truncateHashCode = hashCode & 0xffffffffL;
復制代碼
看到上圖的Hash函數,讓我想到了HashMap的Hash函數
f(key) = hash(key) & (table.length - 1)
// 使用>>> 16的原因,hashCode()的高位和低位都對f(key)有了一定影響力,使得分布更加均勻,散列沖突的幾率就小了。
hash(key) = (h = key.hashCode()) ^ (h >>> 16)
復制代碼
同理,將jobId的md5編碼的高低位都對Hash結果有影響,使得Hash沖突的概率減小。
分片任務的實現 - 維護線程上下文
XXL-JOB的分片任務實現了任務的分布式執行,其實是筆者調研的重點,日常開發中很多定時任務都是單機執行,對于后續數據量大的任務最好有一個分布式的解決方案。
分片任務的路由策略,源代碼作者提出了分片廣播 的概念,剛開始還有點摸不清頭腦,看了源碼逐漸清晰了起來。
想必看過源碼的也遇到過這么一個小插曲,路由策略咋沒實現?如下圖所示。
public enum ExecutorRouteStrategyEnum {FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),// 說好的實現呢???竟然是nullSHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
復制代碼
再繼續追查得到了結論,待我慢慢道來,首先分片任務執行參數傳遞的是什么?看XxlJobTrigger.trigger函數中的一段代碼。
...
// 如果是分片路由,走的是這段邏輯
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList() != null && !group.getRegistryList().isEmpty()&& shardingParam == null) {for (int i = 0; i < group.getRegistryList().size(); i++) {// 最后兩個參數,i是當前機器在執行器集群當中的index,group.getRegistryList().size()為執行器總數processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}}
...
復制代碼
參數經過自研RPC傳遞到執行器,在執行器中具體負責任務執行的JobThread.run中,看到了如下代碼。
// 分片廣播的參數比set進了ShardingUtil
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
...
// 將執行參數傳遞給jobHandler執行
handler.execute(triggerParamTmp.getExecutorParams())
復制代碼
接著看ShardingUtil,才發現了其中的奧秘,請看代碼。
public class ShardingUtil {// 線程上下文private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();// 分片參數對象public static class ShardingVO {private int index; // sharding indexprivate int total; // sharding total// 次數省略 get/set}// 參數對象注入上下文public static void setShardingVo(ShardingVO shardingVo){contextHolder.set(shardingVo);}// 從上下文中取出參數對象public static ShardingVO getShardingVo(){return contextHolder.get();}}
復制代碼
顯而易見,在負責分片任務的ShardingJobHandler里取出了線程上下文中的分片參數,這里也給個代碼把~
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {@Overridepublic ReturnT<String> execute(String param) throws Exception {// 分片參數ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();XxlJobLogger.log("分片參數:當前分片序號 = {}, 總分片數 = {}", shardingVO.getIndex(), shardingVO.getTotal());// 業務邏輯for (int i = 0; i < shardingVO.getTotal(); i++) {if (i == shardingVO.getIndex()) {XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);} else {XxlJobLogger.log("第 {} 片, 忽略", i);}}return SUCCESS;}}
復制代碼
由此得出,分布式實現是根據分片參數index及total來做的,簡單來講,就是給出了當前執行器的標識,根據這個標識將任務的數據或者邏輯進行區分,即可實現分布式運行。 題外話:至于為什么用外部注入分片參數的方式,不直接execute傳遞?
1、可能是因為只有分片任務才用到這兩個參數 2、IJobHandler只有String類型參數
看完源碼后的思考
1、經過此次看源代碼,XXL-JOB的設計目標確實符合開發迅速、學習簡單、輕量級、易擴展 。 2、至于自研RPC還沒有具體考量,具體接入應該會考慮公司的RPC框架。 3、作者給出的Quartz調度的不足,筆者得繼續深入了解。 4、框架中很多對宕機、故障、超時等異常狀況的兼容值得學習。 5、Rolling日志以及日志系統實現需要繼續了解。
參考文獻
www.xuxueli.com/xxl-job/#/?… cnkirito.moe/timer/ www.jianshu.com/p/e968c081f…
總結
以上是生活随笔 為你收集整理的揪出XXL-JOB中的细节 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。