探寻用户自定义定时任务的实践方案
導(dǎo)讀
工作中會(huì)遇到一些由用戶自定義定時(shí)任務(wù)的業(yè)務(wù)場(chǎng)景,常用的開(kāi)源框架(如 XXL-Job、Quartz)設(shè)計(jì)的初衷是給開(kāi)發(fā)人員使用,并不適合開(kāi)放給用戶創(chuàng)建大量的自定義任務(wù)。本文借鑒開(kāi)源框架定時(shí)任務(wù)作業(yè)的思想,結(jié)合 j.u.c 的 ScheduledExecutor,提供一種定時(shí)任務(wù)的實(shí)現(xiàn)方法,以解決用戶自定義定時(shí)任務(wù)場(chǎng)景的問(wèn)題。希望對(duì)大家有所幫助。
作者:楊凱 | 網(wǎng)易智企資深開(kāi)發(fā)工程師
用戶自定義定時(shí)任務(wù)
談到定時(shí)任務(wù)的實(shí)現(xiàn),我們優(yōu)先想到的是引入優(yōu)秀的開(kāi)源框架方案去解決,常見(jiàn)的開(kāi)源產(chǎn)品上文也提到過(guò),如Quartz、XXL-Job、ElasticJob 等,但是開(kāi)源框架應(yīng)用到用戶自定義任務(wù)上,存在以下需要問(wèn)題或不足:
-
開(kāi)源框架從任務(wù)創(chuàng)建到執(zhí)行有一套標(biāo)準(zhǔn)方案,用戶自定義任務(wù)在何時(shí),何地插入符合開(kāi)源框架標(biāo)準(zhǔn)任務(wù)并能控制生效、停止是一個(gè)需要考慮的復(fù)雜問(wèn)題。
-
開(kāi)源框架(如 XXL-Job)對(duì)任務(wù)的管理和業(yè)務(wù)容器是解耦的,如果用戶要完成任務(wù)的創(chuàng)建、修改需要業(yè)務(wù)服務(wù)反向調(diào)用操作任務(wù)中心,這不符合任務(wù)中心設(shè)計(jì)原則。
-
開(kāi)源框架設(shè)計(jì)的初衷是給程序開(kāi)發(fā)者創(chuàng)建和控制任務(wù)。一般情況下,任務(wù)執(zhí)行的策略、目的都比較明確,不像用戶自定義任務(wù)存在頻繁修改和相同業(yè)務(wù)背景多個(gè)任務(wù)定義使用同一個(gè)處理邏輯。
-
開(kāi)源框架未提供用戶友好的任務(wù)配置界面。
設(shè)計(jì)用戶自定義任務(wù)組件除了要考慮上面的問(wèn)題,還需要站在用戶角度思考用戶自定義任務(wù)的特點(diǎn):
-
開(kāi)始和結(jié)束可控
用戶自定義定時(shí)任務(wù)業(yè)務(wù)依賴性強(qiáng),可以多次創(chuàng)建和更新任務(wù),但不會(huì)執(zhí)行,也會(huì)在任務(wù)執(zhí)行期間人為停止。所以任務(wù)組件要將業(yè)務(wù)任務(wù)創(chuàng)建和作業(yè)任務(wù)的創(chuàng)建區(qū)分,只創(chuàng)建、加載用戶確定執(zhí)行的任務(wù)。
-
執(zhí)行策略和執(zhí)行時(shí)間對(duì)用戶友好
程序開(kāi)發(fā)者創(chuàng)建定時(shí)任務(wù),執(zhí)行策略(單個(gè)任務(wù)循環(huán)、單次)和執(zhí)行時(shí)間是由配置的 Cron 表達(dá)式確定,但是 Cron 表達(dá)式對(duì)用戶不友好,容易配置出錯(cuò)。用戶自定義定時(shí)任務(wù)在設(shè)置定時(shí)策略和執(zhí)行時(shí)間時(shí),需要提供用戶友好的配置界面,任務(wù)組件內(nèi)部轉(zhuǎn)換成對(duì)應(yīng)的 Cron 表達(dá)式。
-
執(zhí)行時(shí)間范圍可控
完成一、二步的配置后,需要給用戶提供一個(gè)任務(wù)執(zhí)行的時(shí)間范圍,在這個(gè)時(shí)間范圍內(nèi)才會(huì)執(zhí)行任務(wù)。 簡(jiǎn)單的用戶自定義定時(shí)任務(wù)的界面如下:
清楚了用戶自定義定時(shí)任務(wù)的特點(diǎn),定義任務(wù)模型 TaskScheduleDefine 為:
| id | 任務(wù)的唯一標(biāo)識(shí) |
| busId | 業(yè)務(wù)維度的 ID:可以根據(jù)業(yè)務(wù)背景決定是唯一還是指定 |
| taskName | 任務(wù)名稱 |
| beanName | 任務(wù)處理類實(shí)例名稱 |
| cron | cron 表達(dá)式 |
| startTime | 用戶定義的開(kāi)始時(shí)間 |
| endTime | 用戶定義的結(jié)束時(shí)間 |
| isPermanent | 是否永久任務(wù) |
| multiple | 是否允許同一時(shí)間任務(wù)任務(wù)并行執(zhí)行 |
| once | 是否單次任務(wù) |
| valid | 任務(wù)是否有效 |
定時(shí)任務(wù)執(zhí)行周期
定時(shí)任務(wù)從創(chuàng)建到執(zhí)行可分為如下階段:
-
創(chuàng)建:界面化的配置(如 XXL-Job),代碼配置(如 Quartz,spring-schedule)。
-
加載:任務(wù)加載到應(yīng)用緩存,可以在創(chuàng)建時(shí)進(jìn)行,但實(shí)際上任務(wù)創(chuàng)建和加載任務(wù)是分開(kāi)的,比如當(dāng)任務(wù)被修改時(shí),實(shí)際上是有一個(gè)更新的過(guò)程的,可以把這種更新叫做任務(wù)的重載。
-
調(diào)度:判斷被加載的任務(wù)是否滿足執(zhí)行條件(如果支持分布式調(diào)度要決定那一臺(tái)服務(wù)器去執(zhí)行),如果滿足,開(kāi)始執(zhí)行。
-
執(zhí)行:開(kāi)源框架都會(huì)完成上面的三個(gè)步驟(調(diào)度中心或應(yīng)用本身),業(yè)務(wù)開(kāi)發(fā)者只用關(guān)注業(yè)務(wù)邏輯部分,做到任務(wù)調(diào)度和業(yè)務(wù)執(zhí)行解耦。
本文介紹的任務(wù)組件也是基于這個(gè)思想去實(shí)現(xiàn)用戶自定義任務(wù)。
用戶自定義任務(wù)設(shè)計(jì)
應(yīng)用啟動(dòng)時(shí),初始化任務(wù)加載線程和任務(wù)調(diào)度線程(類似于 XXL-Job 的 scheduleThread 和 ringThread)
//上傳+加載,支持本地和數(shù)據(jù)庫(kù)任務(wù) uploadAndLoadDefinition(); //初始化調(diào)度, 調(diào)度由維護(hù)任務(wù)來(lái)處理,由調(diào)度任務(wù)來(lái)喚起相應(yīng)的具體執(zhí)行 internalScheduledExecutor.scheduleAtFixedRate(new SpringTaskMonitor(), 10, 45, TimeUnit.SECONDS); //定義維護(hù) internalScheduledExecutor.scheduleAtFixedRate(new SpringTaskDefinitionMonitor(), 1, 2, TimeUnit.MINUTES);任務(wù)創(chuàng)建
將業(yè)務(wù)任務(wù)執(zhí)行和停止與作業(yè)任務(wù)創(chuàng)建和失效關(guān)聯(lián),達(dá)到用戶自定義定時(shí)任務(wù)的初衷,作業(yè)任務(wù)完全由用戶決定。
任務(wù)加載
任務(wù)加載使用 j.u.c 提供的定時(shí)任務(wù)線程池 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate 方法,周期性的觸發(fā)任務(wù)的加載,保證緩存中任務(wù)的及時(shí)更新。不同的是用戶自定義任務(wù)一般都是提前創(chuàng)建好的,不需要不間斷的去查詢,而且可以通過(guò)開(kāi)始和結(jié)束時(shí)間雙重保證任務(wù)正確觸發(fā)。
注冊(cè)任務(wù)部分邏輯:
//獲取全部任務(wù)列表defineList更新任務(wù)defineList.forEach(t -> {String key = t.getBeanName() + t.getBusId();val task = TaskDefinitions.registered(key);//沒(méi)有(并且有效),就添加if (task == null) {if (t.getValid()) {TaskDefinitions.registerTask(new ScheduleTask(t));changedList.add(t);}}//有,就替換定義else {boolean changed = task.updateDefine(t);if (changed) {changedList.add(t);}}});//打印變化的任務(wù)日志 } //ScheduleTask任務(wù)定義,updateDefine這個(gè)對(duì)象的屬性 public class ScheduleTask {private long id;private TaskScheduleDefine localScheduleDefine;private CronSequenceGenerator cronGenerator;public ScheduleTask(TaskScheduleDefine taskScheduleDefine) {this.id = taskScheduleDefine.getId();this.localScheduleDefine = taskScheduleDefine; } }任務(wù)調(diào)度
調(diào)度任務(wù)的部分邏輯:
public class SpringTaskMonitor implements Runnable {private static Date DATE_INIT = new Date();@Overridepublic void run() {ExceptionUtils.doActionLogE(this::doRun);}private void doRun() throws Throwable {val taskScheduleDefineMapper = ApplicationContextUtils.getReadyApplicationContext().getBean(TaskScheduleDefineMapper.class);val taskScheduleRecordMapper = ApplicationContextUtils.getReadyApplicationContext().getBean(TaskScheduleRecordMapper.class);TaskDefinitions.getTaskMap().values().forEach(t -> {//1.無(wú)效任務(wù)if (!t.getLocalScheduleDefine().getValid()) {return;}//2.設(shè)置了過(guò)期時(shí)間Date now = new Date();if (!t.getLocalScheduleDefine().getIsPermanent()) {Date endTime = t.getLocalScheduleDefine().getEndTime();if (null == endTime || endTime.before(now)) {TaskDefinitions.getTaskMap().remove(t.getLocalScheduleDefine().getBeanName() + t.getLocalScheduleDefine().getBusId());taskScheduleDefineMapper.updateTaskValid(t.getLocalScheduleDefine().getId(), false);return;}}val lastRecord = taskScheduleRecordMapper.getLast(t.getLocalScheduleDefine().getId());Date date = lastRecord == null ? DATE_INIT : lastRecord.getExecuteDate();boolean shouldRun = false;Date nextDate = t.cronGenerator().next(date);//首次執(zhí)行且執(zhí)行時(shí)間未到重置開(kāi)始時(shí)間if (null != t.getLocalScheduleDefine().getStartTime() && nextDate.before(t.getLocalScheduleDefine().getStartTime())) {DATE_INIT = new Date();log.warn("任務(wù)執(zhí)行時(shí)間未到設(shè)置的開(kāi)始時(shí)間,重新設(shè)置系統(tǒng)時(shí)間{},本次任務(wù)忽略:{}", DateUtil.formatDate(DATE_INIT, "yyyy-MM-dd HH:mm:ss"), GsonUtil.toJson(t));return;}if (DateUtils.addSeconds(nextDate, 30).before(now)) {shouldRun = true;}if (shouldRun) {TaskWork localWork = (TaskWork) ApplicationContextUtils.getReadyApplicationContext().getBean(t.getLocalScheduleDefine().getBeanName());SpringTaskExecutor.getExecutorService().submit(() -> localWork.runJob(t));}});} }上述流程較清晰的還原了任務(wù)調(diào)度的一些主要邏輯。從任務(wù)調(diào)度的部分代碼中可以看出,整個(gè)調(diào)度過(guò)程異常被捕獲,出現(xiàn)異常不會(huì)影響下一次的調(diào)度執(zhí)行,任務(wù)的 misfire 問(wèn)題處理策略是:
-
任務(wù)過(guò)了用戶的設(shè)定時(shí)間不執(zhí)行
-
任務(wù)未到用戶的設(shè)定時(shí)間不執(zhí)行
-
任務(wù)首次執(zhí)行出了異常(以數(shù)據(jù)庫(kù)執(zhí)行記錄為準(zhǔn)),以當(dāng)前時(shí)間為觸發(fā)頻率立刻觸發(fā)一次執(zhí)行,然后按照 Cron 頻率依次執(zhí)行(類似類似于 Quartz 的默認(rèn) withMisfireHandlingInstructionFireAndProceed 模式)
-
定時(shí)任務(wù)已有執(zhí)行記錄,以錯(cuò)過(guò)的第一個(gè)頻率時(shí)間立刻開(kāi)始執(zhí)行,重做錯(cuò)過(guò)的所有頻率周期后,重當(dāng)下一次觸發(fā)頻率發(fā)生時(shí)間大于當(dāng)前時(shí)間后,再按照正常的 Cron 頻率依次執(zhí)行(類似于 Quartz的withMisfireHandlingInstructionIgnoreMisfires 模式)
另外,需要考慮的是在同一個(gè)業(yè)務(wù)場(chǎng)景下,用戶會(huì)創(chuàng)建多個(gè)任務(wù)定義,但它們執(zhí)行的業(yè)務(wù)邏輯是一樣的(執(zhí)行策略,執(zhí)行時(shí)間等不一樣)。
任務(wù)執(zhí)行
任務(wù)調(diào)度提交的任務(wù)給線程池處理,執(zhí)行前后根據(jù)任務(wù)定義對(duì)任務(wù)做一些通用處理(黃色框部分),具體的執(zhí)行業(yè)務(wù)邏輯交給接口 LocalWork 實(shí)現(xiàn)類的 execute() 方法處理。
/*** description: 輔助來(lái)完成默認(rèn)的localWork方法*/ public class TaskWorkUtils {static void helpRun(TaskWork localWork, ScheduleTask scheduleTask) {//部分偽代碼如下}} //是否任務(wù)有執(zhí)行過(guò) boolean executed = false; TaskScheduleRecord record = null; Date executeDate = new Date(); try {//根據(jù)需要決定是否獲取鎖后執(zhí)行(redisLock,zkLock,dbLock都可以,保證任務(wù)唯一執(zhí)行)String lockName = localWork.getClass().getSimpleName() + scheduleTask.getLocalScheduleDefine().getBusId();//獲取不到鎖return//獲取到執(zhí)行下面邏輯record = ExceptionUtils.doFunLogE(() -> {TaskScheduleRecord newRecord = buildRecord(scheduleTask, executeDate);newRecord.setId(taskRecordService.save(newRecord));return newRecord;});//如果不能保存成功,表示出現(xiàn)了數(shù)據(jù)庫(kù)異常,相應(yīng)狀態(tài)不能存取,則直接返回,不再執(zhí)行if (record == null) {return;}executed = true;localWork.execute(record); } catch (Throwable throwable) {log.error("執(zhí)行任務(wù)時(shí)出現(xiàn)異常信息:{}", throwable.getMessage(), throwable);e = throwable; } finally {//釋放鎖:releaseLock()//記錄異常日志,更新任務(wù)狀態(tài)和失敗原因if (record != null) { }}if (!scheduleTask.getLocalScheduleDefine().getOnce()&&executed) {Date next = scheduleTask.cronGenerator().next(executeDate);long delay = next.getTime() - executeDate.getTime();SpringTaskExecutor.getExecutorService().schedule(() -> localWork.runJob(scheduleTask), delay, TimeUnit.MILLISECONDS);}}如果要保證任務(wù)在集群中保證唯一執(zhí)行可通過(guò)分布式鎖實(shí)現(xiàn),具體的key已給參考,因?yàn)闆](méi)有提供集群節(jié)點(diǎn)注冊(cè)的功能,負(fù)載均衡的調(diào)度只能依賴集群中節(jié)點(diǎn)獲取鎖的隨機(jī)性,即那個(gè)節(jié)點(diǎn)獲取到鎖,任務(wù)在哪個(gè)節(jié)點(diǎn)執(zhí)行。
當(dāng)任務(wù)執(zhí)行出錯(cuò)時(shí)(保存完執(zhí)行記錄后),不影響下一次任務(wù)的執(zhí)行,但會(huì)更新此次任務(wù)執(zhí)行的結(jié)果和失敗原因。
任務(wù)設(shè)計(jì)小結(jié)
應(yīng)用啟動(dòng)時(shí),初始化任務(wù),開(kāi)啟任務(wù)加載線程,開(kāi)啟任務(wù)調(diào)度線程。任務(wù)加載線程周期性的從 DB 中獲取全部任務(wù),并更新緩存中任務(wù)實(shí)例;調(diào)度線程負(fù)責(zé)對(duì)任務(wù)定義實(shí)例進(jìn)行一系列的判斷,決定是否交給執(zhí)行線程池去執(zhí)行,任務(wù)加載和調(diào)用可以使用一個(gè)定時(shí)線程池。
private ScheduledExecutorService internalScheduledExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactoryBuilder().setNameFormat("task-internal-%d").build());執(zhí)行任務(wù)的線程池接收到提交的任務(wù),執(zhí)行前后做統(tǒng)一處理,任務(wù)執(zhí)行的具體業(yè)務(wù)邏輯交給具體的實(shí)現(xiàn)類去做。整個(gè)處理流程中,需要兩張表(任務(wù)定義表+任務(wù)執(zhí)行記錄表),2 個(gè)定時(shí)線程池可完成。
總結(jié)
本文基于用戶自定義定時(shí)任務(wù)的特點(diǎn),從任務(wù)創(chuàng)建、任務(wù)加載、任務(wù)調(diào)度、任務(wù)執(zhí)行四個(gè)方面詳細(xì)的介紹了任務(wù)執(zhí)行的過(guò)程,對(duì)定時(shí)任務(wù)中常見(jiàn)的問(wèn)題和處理過(guò)程附帶了部分代碼供參考,在支持一般定時(shí)任務(wù)的同時(shí)給大家提供了一種用戶自定義定時(shí)任務(wù)的實(shí)踐方法。
總結(jié)
以上是生活随笔為你收集整理的探寻用户自定义定时任务的实践方案的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 技术干货 | 如何在 Electron
- 下一篇: 网易云信联手长沙银行,远程视频银行系统助