Elastic-Job | 由浅入深一篇理解分布式定时任务的基本用法及简单原理解析
目錄
一、定時任務的基礎實現
1.?利用Thread及Sleep實現,通過while循環讓其不停運行
2.使用jdk的Timer和TimerTask
3.ScheduledExecutorService
4.?Quartz實現
附:Cron表達式
5.?Spring Task實現
6. 分布式定時任務Elastic-Job
1.概述
2.調度模型
3.功能
4.適用場景
5.分片策略
6. ElasticJob 原理
7.?失效轉移
其次,定時任務大體分兩種:指定間隔時間執行 和 指定某個時間執行
實現定時任務的途徑有很多,比如你甚至可以自己實現簡單的定時任務
一、定時任務的基礎實現
1.?利用Thread及Sleep實現,通過while循環讓其不停運行
public class ThreadTaskDemo {public static void main(String[] args) {Runnable runable=new Runnable() {@Overridepublic void run() {System.out.println("子線程執行任務,當前時間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};try {System.out.println("主線程啟動子線程時間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));scheduleThread(5L,3,runable);} catch (InterruptedException e) {e.printStackTrace();}}/*** @param duration 指定什么時間后運行 單位:秒* @param timeInterval 每次運行間隔時間 單位:秒* @param runnable 待運行的Runable對象* @throws InterruptedException*/static void scheduleThread(Long duration,Integer timeInterval,Runnable runnable) throws InterruptedException{/*阻塞等待*/TimeUnit.SECONDS.sleep(duration);final Runnable interiorRun=runnable;final Integer interiorTimeInterval=timeInterval;/*運行*/new Thread(new Runnable() {@Overridepublic void run() {while(true){/*執行方法*/interiorRun.run();try {/*任務執行間隔*/TimeUnit.SECONDS.sleep(interiorTimeInterval);} catch (InterruptedException e) {e.printStackTrace();}}}}).start();} }2.使用jdk的Timer和TimerTask
????使用jdk的Timer和TimerTask,可以實現簡單的間隔執行任務,無法實現按日歷去調度執行任務
public class TimerTaskDemo {/*** 其中Timer和TimerTask的區別和聯系:* * Timer是調度者,可以安排任務執行計劃。* * TimerTask是任務。Timer類可以調度TimerTask任務,TimerTask則通過在run()方法里實現具體任務。TimerTask也可停止自身任務。* * 一個Timer可以調度多個TimerTask。* * Timer是單線程的:Timer構造函數調用時會創建了一個新線程,所有TimerTask都是依靠這個新的線程執行。* @param args*/public static void main(String[] args) {TimerTask timerTask = new TimerTask() {@Overridepublic void run() {System.out.println("當前時間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};Timer timer = new Timer();timer.schedule(timerTask,10,1000);}}3.ScheduledExecutorService
????ScheduledExecutorService是并發工具包中的類,是對比前面最理想的定時任務實現方式。
public class ScheduledDemo {public static void main(String[] args) {Runnable runnable1 = new Runnable() {@Overridepublic void run() {System.out.println("runnable1當前時間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};Runnable runnable2 = new Runnable() {@Overridepublic void run() {System.out.println("runnable2當前時間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};//方式一 定義4個線程ScheduledExecutorService service = Executors.newScheduledThreadPool(4);ScheduledFuture<?> scheduledFuture= service.scheduleAtFixedRate(runnable1, 0,2, TimeUnit.SECONDS);//方式二ScheduledExecutorService service2 = Executors.newSingleThreadScheduledExecutor();service2.scheduleAtFixedRate(runnable2, 1, 2, TimeUnit.SECONDS);}}4.?Quartz實現
<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.2</version></dependency><dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz-jobs</artifactId><version>2.3.2</version></dependency> public class QuartzDemo {public static void main(String[] args) throws SchedulerException {//創建Scheduler的工廠SchedulerFactory sf = new StdSchedulerFactory();//從工廠中獲取調度器實例Scheduler scheduler = sf.getScheduler();/*** 創建JobDetail* withDescription:job的描述* withIdentity:job的name和group*/JobDetail jb = JobBuilder.newJob(QuartzSchedueJob.class).withDescription("this is a job").withIdentity("myJob", "myJobGroup").build();//任務運行的時間,5秒后啟動任務long time = System.currentTimeMillis() + 5 * 1000L;Date statTime = new Date(time);//創建Trigger,使用SimpleScheduleBuilder或者CronScheduleBuilderTrigger t = TriggerBuilder.newTrigger().withDescription("this is a trigger").withIdentity("myTrigger", "myTriggerGroup")//.withSchedule(SimpleScheduleBuilder.simpleSchedule())//設置啟動時間.startAt(statTime)//每隔3秒執行一次.withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ? *")).build();//注冊任務和定時器scheduler.scheduleJob(jb, t);//啟動 調度器scheduler.start();}}附:Cron表達式
????????在上面的demo中出現了"0/3 * * * * ? *",這是cron表達式,表示定時任務執行的時間規則
????????cron?表達式是一個字符串,該字符串由?6?個空格分為?7?個域,每一個域代表一個時間含義。 格式: [秒] [分] [時] [日] [月] [周] [年],其中通常定義 “年” 的部分可以省略,實際常用的由前六部分組成。
????????關于?cron?的各個域的定義如下表格所示:
| 秒 | 是 | 0-59 | , - * / |
| 分 | 是 | 0-59 | , - * / |
| 時 | 是 | 0-23 | , - * / |
| 日 | 是 | 1-31 | , - * ? / L W |
| 月 | 是 | 1-12 或 JAN-DEC | , - * / |
| 周 | 是 | 1-7 或 SUN-SAT | , - * ? / L # |
| 年 | 否 | 1970-2099 | , - * / |
這塊不作過多描述,有興趣的可以自行了解,也可以通過在線工具轉換:quartz/Cron/Crontab表達式在線生成工具-BeJSON.com
5.?Spring Task實現
1)SpringBoot:在Spring boot啟動類上添加注解:@EnableScheduling
2)Spring:添加命名空間:xmlns:task="http://www.springframework.org/schema/task"
???????????????????添加約束:http://www.springframework.org/schema/task
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? http://www.springframework.org/schema/task/spring‐task.xsd
??????????????????開啟任務調度:<task:annotation‐driven></task:annotation‐driven>
?定時任務串行執行:
@Component public class SpringTaskTest {private static final Logger LOGGER = LoggerFactory.getLogger(SpringTaskTest.class);/*** 每隔2秒執行一次*/@Scheduled(cron = "0/2 * * * * *")public void task1() {LOGGER.info("--------------------task1開始--------------------");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}LOGGER.info("--------------------task1結束--------------------");} }定時任務并行執行:
@Configuration //啟動類或者此處配置@EnableScheduling public class TaskConfig implements SchedulingConfigurer, AsyncConfigurer {/*** 線程池線程數量*/private int poolSize = 5;@Beanpublic ThreadPoolTaskScheduler taskScheduler() {//創建定時任務線程池ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();///初始化線程池scheduler.initialize();//線程池容量scheduler.setPoolSize(poolSize);return scheduler;}@Overridepublic Executor getAsyncExecutor() {Executor executor = taskScheduler();return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return null;}@Overridepublic void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {scheduledTaskRegistrar.setTaskScheduler(taskScheduler());} }6. 分布式定時任務Elastic-Job
1.概述
???????上文中提到的多種定時任務的實現,而本篇的重點在于站在“巨人”肩膀上的ElasticJob分布式調度框架,巨人是指“Quartz”和“Zookeeper”,Elastic-Job最開始只有一個 elastic-job-core 的項目,在 2.X 版本以后主要分為Elastic-Job-Lite 和 Elastic-Job-Cloud 兩個子項目。其中,Elastic-Job-Lite 定位為輕量級無中心化解決方案 , 使 用 jar包的形式提供分布式任務的協調服務。
????????應用在各自的節點執行任務,通過 ZK 注冊中心協調。節點注冊、節點選舉、任務分片、監聽都在 E-Job 的代碼中完成,以下是官網架構圖:
2.調度模型
????????ElasticJob Lite是線程級別調度的進程內調度。
????????1.方便與Spring、Dubbo等Java框架配合使用,自由使用Spring注入Bean;
????????2.與業務應用部署在一起,生命周期與業務應用保持一致,是典型的嵌入式輕量級架構;
????????3.適用于資源使用穩定、部署架構簡單的普通Java應用;
????????4.分布式下的每個任務節點均是以自調度的方式適時的調度作業,任務之間只需要一個注冊中心(注冊中心目前支持Zookeeper和ETCD兩種)對分布式場景下任務狀態進行協調即可;
????????5.分布式作業節點通過選舉的方式獲取主節點,主節點進行分片,完畢后主節點和其他節點并無不同,都以自我調度的反射光hi執行任務
????????ElasticJob Cloud調度方式是可以是進程內調度,作業類型屬于:常駐任務,也可以是進程級別調度,作業類型屬于:瞬時任務。
????????在ElasticJob Lite全部能力的基礎上,還擁有資源分配和任務分發的能力,將作業的開發、打包、分發、調度、治理、分片等一系列生命周期完全托管,是真正的作業云調度系統。
3.功能
? ? ? ? 1. 彈性調度:讓任務通過分片進行水平擴展的任務處理,每臺服務器只運行分配給該服務器的分片;
? ? ? ? 2. 資源分配:由Mesos實現,Mesos負責分配任務聲明所需要的資源(內存和CPU),并將分配出去的資源進行隔離
? ? ? ? 3. 作業治理:分布式場景下高可用、失效轉移、錯過作業重新執行等行為的治理協調
? ? ? ? 4. 可視化管理:包含作業增刪改查管控端、執行歷史記錄查詢、配置中心管理等
4.適用場景
????????1.復雜任務,如數據遷移,彈性分片能力大大減少海量數據遷移的時間
????????2.資源導向任務,占用大量計算資源的報表作業適合采用瞬時作業實現
????????3.訂單拉取之類的,就是我們系統中最常用的那些場景
5.分片策略
1.?分片項與分片參數
任務分片,是為了實現把一個任務拆分成多個子任務,在不同的 ejob 示例上執行。例如 100W 條數據,在配置文件中指定分成 10 個子任務(分片項),這 10 個子任務再按照一定的規則分配到 5 個實際運行的服務器上執行。除了直接用分片項 ShardingItem獲取分片任務之外,還可以用 item 對應的 parameter 獲取任務。
????????定義幾個分片項,一個任務就會有幾個線程去運行它。
注意:分片個數和分片參數要一一對應。通常把分片項設置得比 E-Job 服務器個數大一些,比如 3 臺服務器,分成 9 片,這樣如果有服務器宕機,分片還可以相對均勻。
2.?設置分片策略
// 作業分片策略 // 基于平均分配算法的分片策略 String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName(); // 定義Lite作業根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();? ? 3.分片方案
- 對業務主鍵進行取模,獲取余數等于分片項的數據,舉例:獲取到的 sharding item 是 0,1在 SQL 中加入過濾條件:where mod(id, 4) in (1, 2)。這種方式的缺點:會導致索引失效,查詢數據時會全表掃描。解決方案:在查詢條件中在增加一個索引條件進行過濾。
- 在表中增加一個字段,根據分片數生成一個 mod 值。取模的基數要大于機器數。否則在增加機器后,會導致機器空閑。例如取模基數是 2,而服務器有 5 臺,那么有三臺服務器永遠空閑。而取模基數是 10,生成 10 個 shardingItem,可以分配到 5 臺服務器。當然,取模基數也可以調整。
- 如果從業務層面,可以用 ShardingParamter 進行分片。例如 0=RDP, 1=CORE, 2=SIMS, 3=ECIF,List<users> = SELECT * FROM user WHERE status = 0 AND SYSTEM_ID ='RDP' limit 0, 100。
在 Spring Boot 中要 Elastic-Job 要配置的內容太多了,有沒有更簡單的添加任務的方法呢?比如在類上添加一個注解?這個時候我們就要用到 starter 了。
6. ElasticJob 原理
? ? ? ?1. new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init(); 進入啟動流程
/** * 初始化作業. */ public void init() {LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);// 設置分片數JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());// 構建任務,創建調度器JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());// 在 ZK 上注冊任務JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);// 添加任務信息并進行節點選舉schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());// 啟動調度器jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); }? ? ? ? 2.registerStartUpInfo 方法
/** * 注冊作業啟動信息. * * @param enabled 作業是否啟用 */ public void registerStartUpInfo(final boolean enabled) {// 啟動所有的監聽器、監聽器用于監聽 ZK 節點信息的變化。listenerManager.startAllListeners();// 節點選舉leaderService.electLeader();// 服務信息持久化(寫到 ZK)serverService.persistOnline(enabled);// 實例信息持久化(寫到 ZK)instanceService.persistOnline();// 重新分片shardingService.setReshardingFlag();// 監控信息監聽器monitorService.listen();// 自診斷修復,使本地節點與 ZK 數據一致if (!reconcileService.isRunning()) {reconcileService.startAsync();} }? ? ? ? ?3. 啟動的時候進行主節點選舉
/** * 選舉主節點. */ public void electLeader() {log.debug("Elect a new leader now.");jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());log.debug("Leader election completed."); }????????Latch 是一個分布式鎖,選舉成功后在 instance 寫入服務器信息
? ? ? ? 4. 啟動調度任務則是
/** * 調度作業. * * @param cron CRON表達式 */ public void scheduleJob(final String cron) {try {if (!scheduler.checkExists(jobDetail.getKey())) {scheduler.scheduleJob(jobDetail, createTrigger(cron));} //調用 Quartz 一樣的類進行啟動scheduler.start();} catch (final SchedulerException ex) {throw new JobSystemException(ex);} }7.?失效轉移
失效轉移,就是在執行任務的過程中發生異常時,這個分片任務可以在其他節點再次執行。
FailoverListenerManager 監聽的是 zk 的 instance 節點刪除事件。如果任務配置了 failover 等于 true,其中某個 instance 與 zk 失去聯系或被刪除,并且失效的節點又不是本身,就會觸發失效轉移邏輯。Job 的失效轉移監聽來源于 FailoverListenerManager 中內部類JobCrashedJobListener 的 dataChanged 方法。當節點任務失效時會調用 JobCrashedJobListener 監聽器,此監聽器會根據實例 id獲取所有的分片,然后調用 FailoverService 的 setCrashedFailoverFlag 方法,將每個分片 id 寫到/jobName/leader/failover/items 下,例如原來的實例負責 1、2 分片項,那么 items 節點就會寫入 1、2,代表這兩個分片項需要失效轉移。
class JobCrashedJobListener extends AbstractJobListener {@Overrideprotected void dataChanged(final String path, final Type eventType, final String data) {if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {return;} // 獲取到失效的分片集合List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);if (!failoverItems.isEmpty()) {for (int each : failoverItems) { // 設置失效的分片項標記failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}} else {for (int each : shardingService.getShardingItems(jobInstanceId)) {failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}}}}}總結
以上是生活随笔為你收集整理的Elastic-Job | 由浅入深一篇理解分布式定时任务的基本用法及简单原理解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C语言数组初始化的问题
- 下一篇: mysql中使用BETWEEN AND