史上最全的延迟任务实现方式汇总!附代码(强烈推荐)
這篇文章的誕生要感謝一位讀者,是他讓這篇優秀的文章有了和大家見面的機會,重點是優秀文章,哈哈。
事情的經過是這樣的...
不用謝我,送人玫瑰,手有余香。相信接下來的內容一定不會讓你失望,因為它將是目前市面上最好的關于“延遲任務”的文章,這也一直是我寫作追求的目標,讓我的每一篇文章都比市面上的好那么一點點。
好了,話不多說,直接進入今天的主題,本文的主要內容如下圖所示:
什么是延遲任務?
顧明思議,我們把需要延遲執行的任務叫做延遲任務。
延遲任務的使用場景有以下這些:
等事件都需要使用延遲任務。
延遲任務實現思路分析
延遲任務實現的關鍵是在某個時間節點執行某個任務。基于這個信息我們可以想到實現延遲任務的手段有以下兩個:
而通過?JDK?實現延遲任務我們能想到的關鍵詞是:DelayQueue、ScheduledExecutorService,而第三方提供的延遲任務執行方法就有很多了,例如:Redis、Netty、MQ?等手段。
延遲任務實現
下面我們將結合代碼來講解每種延遲任務的具體實現。
1.無限循環實現延遲任務
此方式我們需要開啟一個無限循環一直掃描任務,然后使用一個?Map?集合用來存儲任務和延遲執行的時間,實現代碼如下:
import java.time.Instant; import java.time.LocalDateTime; import java.util.HashMap; import java.util.Iterator; import java.util.Map;/*** 延遲任務執行方法匯總*/ public class DelayTaskExample {// 存放定時任務private static Map<String, Long> _TaskMap = new HashMap<>();public static void main(String[] args) {System.out.println("程序啟動時間:" + LocalDateTime.now());// 添加定時任務_TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // 延遲 3s// 調用無限循環實現延遲任務loopTask();}/*** 無限循環實現延遲任務*/public static void loopTask() {Long itemLong = 0L;while (true) {Iterator it = _TaskMap.entrySet().iterator();while (it.hasNext()) {Map.Entry entry = (Map.Entry) it.next();itemLong = (Long) entry.getValue();// 有任務需要執行if (Instant.now().toEpochMilli() >= itemLong) {// 延遲任務,業務邏輯執行System.out.println("執行任務:" + entry.getKey() +" ,執行時間:" + LocalDateTime.now());// 刪除任務_TaskMap.remove(entry.getKey());}}}} }以上程序執行的結果為:
程序啟動時間:2020-04-12T18:51:28.188
執行任務:task-1 ,執行時間:2020-04-12T18:51:31.189
可以看出任務延遲了 3s?鐘執行了,符合我們的預期。
2.Java?API 實現延遲任務
Java API?提供了兩種實現延遲任務的方法:DelayQueue?和?ScheduledExecutorService。
①?ScheduledExecutorService?實現延遲任務
我們可以使用 ScheduledExecutorService?來以固定的頻率一直執行任務,實現代碼如下:
public class DelayTaskExample {public static void main(String[] args) {System.out.println("程序啟動時間:" + LocalDateTime.now());scheduledExecutorServiceTask();}/*** ScheduledExecutorService 實現固定頻率一直循環執行任務*/public static void scheduledExecutorServiceTask() {ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {// 執行任務的業務代碼System.out.println("執行任務" +" ,執行時間:" + LocalDateTime.now());}},2, // 初次執行間隔2, // 2s 執行一次TimeUnit.SECONDS);} }以上程序執行的結果為:
程序啟動時間:2020-04-12T21:28:10.416
執行任務 ,執行時間:2020-04-12T21:28:12.421
執行任務 ,執行時間:2020-04-12T21:28:14.422
......
可以看出使用?ScheduledExecutorService#scheduleWithFixedDelay(...)?方法之后,會以某個頻率一直循環執行延遲任務。
②?DelayQueue?實現延遲任務
DelayQueue 是一個支持延時獲取元素的無界阻塞隊列,隊列中的元素必須實現 Delayed 接口,并重寫?getDelay(TimeUnit) 和 compareTo(Delayed)?方法,DelayQueue?實現延遲隊列的完整代碼如下:
public class DelayTest {public static void main(String[] args) throws InterruptedException {DelayQueue delayQueue = new DelayQueue();// 添加延遲任務delayQueue.put(new DelayElement(1000));delayQueue.put(new DelayElement(3000));delayQueue.put(new DelayElement(5000));System.out.println("開始時間:" + DateFormat.getDateTimeInstance().format(new Date()));while (!delayQueue.isEmpty()){// 執行延遲任務System.out.println(delayQueue.take());}System.out.println("結束時間:" + DateFormat.getDateTimeInstance().format(new Date()));}static class DelayElement implements Delayed {// 延遲截止時間(單面:毫秒)long delayTime = System.currentTimeMillis();public DelayElement(long delayTime) {this.delayTime = (this.delayTime + delayTime);}@Override// 獲取剩余時間public long getDelay(TimeUnit unit) {return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Override// 隊列里元素的排序依據public int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return -1;} else {return 0;}}@Overridepublic String toString() {return DateFormat.getDateTimeInstance().format(new Date(delayTime));}} }以上程序執行的結果為:
開始時間:2020-4-12 20:40:38
2020-4-12 20:40:39
2020-4-12 20:40:41
2020-4-12 20:40:43
結束時間:2020-4-12 20:40:43
3.Redis?實現延遲任務
使用?Redis?實現延遲任務的方法大體可分為兩類:通過?zset 數據判斷的方式,和通過鍵空間通知的方式。
①?通過數據判斷的方式
我們借助?zset?數據類型,把延遲任務存儲在此數據集合中,然后在開啟一個無線循環查詢當前時間的所有任務進行消費,實現代碼如下(需要借助?Jedis?框架):
import redis.clients.jedis.Jedis; import utils.JedisUtils; import java.time.Instant; import java.util.Set;public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延遲 30s 執行(30s 后的時間)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 繼續添加測試數據jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 開啟延遲隊列doDelayQueue(jedis);}/*** 延遲隊列消費* @param jedis Redis 客戶端*/public static void doDelayQueue(Jedis jedis) throws InterruptedException {while (true) {// 當前時間Instant nowInstant = Instant.now();long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒時間long nowSecond = nowInstant.getEpochSecond();// 查詢當前時間的所有任務Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);for (String item : data) {// 消費任務System.out.println("消費:" + item);}// 刪除已經執行的任務jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);Thread.sleep(1000); // 每秒輪詢一次}} }②?通過鍵空間通知
默認情況下?Redis?服務器端是不開啟鍵空間通知的,需要我們通過 config set notify-keyspace-events Ex?的命令手動開啟,開啟鍵空間通知后,我們就可以拿到每個鍵值過期的事件,我們利用這個機制實現了給每個人開啟一個定時任務的功能,實現代碼如下:
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; import utils.JedisUtils;public class TaskExample {public static final String _TOPIC = "__keyevent@0__:expired"; // 訂閱頻道名稱public static void main(String[] args) {Jedis jedis = JedisUtils.getJedis();// 執行定時任務doTask(jedis);}/*** 訂閱過期消息,執行定時任務* @param jedis Redis 客戶端*/public static void doTask(Jedis jedis) {// 訂閱過期消息jedis.psubscribe(new JedisPubSub() {@Overridepublic void onPMessage(String pattern, String channel, String message) {// 接收到消息,執行定時任務System.out.println("收到消息:" + message);}}, _TOPIC);} }4.Netty 實現延遲任務
Netty 是由 JBOSS 提供的一個 Java 開源框架,它是一個基于 NIO 的客戶、服務器端的編程框架,使用 Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty 相當于簡化和流線化了網絡應用的編程開發過程,例如:基于 TCP 和 UDP 的 socket 服務開發。
可以使用?Netty?提供的工具類?HashedWheelTimer?來實現延遲任務,實現代碼如下。
首先在項目中添加?Netty?引用,配置如下:
<!-- https://mvnrepository.com/artifact/io.netty/netty-common --> <dependency><groupId>io.netty</groupId><artifactId>netty-common</artifactId><version>4.1.48.Final</version> </dependency>Netty?實現的完整代碼如下:
public class DelayTaskExample {public static void main(String[] args) {System.out.println("程序啟動時間:" + LocalDateTime.now());NettyTask();}/*** 基于 Netty 的延遲任務*/private static void NettyTask() {// 創建延遲任務實例HashedWheelTimer timer = new HashedWheelTimer(3, // 時間間隔TimeUnit.SECONDS,100); // 時間輪中的槽數// 創建一個任務TimerTask task = new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("執行任務" +" ,執行時間:" + LocalDateTime.now());}};// 將任務添加到延遲隊列中timer.newTimeout(task, 0, TimeUnit.SECONDS);} }以上程序執行的結果為:
程序啟動時間:2020-04-13T10:16:23.033
執行任務 ,執行時間:2020-04-13T10:16:26.118
HashedWheelTimer 是使用定時輪實現的,定時輪其實就是一種環型的數據結構,可以把它想象成一個時鐘,分成了許多格子,每個格子代表一定的時間,在這個格子上用一個鏈表來保存要執行的超時任務,同時有一個指針一格一格的走,走到那個格子時就執行格子對應的延遲任務,如下圖所示: (圖片來源于網絡)
以上的圖片可以理解為,時間輪大小為 8,某個時間轉一格(例如 1s),每格指向一個鏈表,保存著待執行的任務。
5.MQ 實現延遲任務
如果專門開啟一個?MQ?中間件來執行延遲任務,就有點殺雞用宰牛刀般的奢侈了,不過已經有了?MQ?環境的話,用它來實現延遲任務的話,還是可取的。
幾乎所有的?MQ?中間件都可以實現延遲任務,在這里更準確的叫法應該叫延隊列。本文就使用?RabbitMQ?為例,來看它是如何實現延遲任務的。
RabbitMQ?實現延遲隊列的方式有兩種:
- 通過消息過期后進入死信交換器,再由交換器轉發到延遲消費隊列,實現延遲功能;
- 使用 rabbitmq-delayed-message-exchange 插件實現延遲功能。
注意: 延遲插件 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依賴 Erlang/OPT 18.0 及以上運行環境。
由于使用死信交換器比較麻煩,所以推薦使用第二種實現方式 rabbitmq-delayed-message-exchange 插件的方式實現延遲隊列的功能。
首先,我們需要下載并安裝?rabbitmq-delayed-message-exchange 插件,下載地址:http://www.rabbitmq.com/community-plugins.html
選擇相應的對應的版本進行下載,然后拷貝到 RabbitMQ 服務器目錄,使用命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange?開啟插件,在使用命令 rabbitmq-plugins list?查詢安裝的所有插件,安裝成功如下圖所示:
最后重啟 RabbitMQ 服務,使插件生效。
首先,我們先要配置消息隊列,實現代碼如下:
import com.example.rabbitmq.mq.DirectConfig; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map;@Configuration public class DelayedConfig {final static String QUEUE_NAME = "delayed.goods.order";final static String EXCHANGE_NAME = "delayedec";@Beanpublic Queue queue() {return new Queue(DelayedConfig.QUEUE_NAME);}// 配置默認的交換機@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//參數二為類型:必須是x-delayed-messagereturn new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 綁定隊列到交換器@BeanBinding binding(Queue queue, CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();} }然后添加增加消息的代碼,具體實現如下:
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date;@Component public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("發送時間:" + sf.format(new Date()));rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", 3000);return message;}});} }再添加消費消息的代碼:
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date;@Component @RabbitListener(queues = "delayed.goods.order") public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收時間:" + sdf.format(new Date()));System.out.println("消息內容:" + msg);} }最后,我們使用代碼測試一下:
import com.example.rabbitmq.RabbitmqApplication; import com.example.rabbitmq.mq.delayed.DelayedSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat; import java.util.Date;@RunWith(SpringRunner.class) @SpringBootTest public class DelayedTest {@Autowiredprivate DelayedSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Hi Admin.");Thread.sleep(5 * 1000); //等待接收程序執行之后,再退出測試} }以上程序的執行結果如下:
發送時間:2020-04-13 20:47:51
接收時間:2020-04-13 20:47:54
消息內容:Hi Admin.
從結果可以看出,以上程序執行符合延遲任務的實現預期。
6.使用?Spring?定時任務
如果你使用的是?Spring?或?SpringBoot?的項目的話,可以使用借助 Scheduled?來實現,本文將使用?SpringBoot?項目來演示?Scheduled?的實現,實現我們需要聲明開啟?Scheduled,實現代碼如下:
@SpringBootApplication @EnableScheduling public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);} }然后添加延遲任務,實現代碼如下:
@Component public class ScheduleJobs {@Scheduled(fixedDelay = 2 * 1000)public void fixedDelayJob() throws InterruptedException {System.out.println("任務執行,時間:" + LocalDateTime.now());} }此時當我們啟動項目之后就可以看到任務以延遲了 2s?的形式一直循環執行,結果如下:
任務執行,時間:2020-04-13T14:07:53.349
任務執行,時間:2020-04-13T14:07:55.350
任務執行,時間:2020-04-13T14:07:57.351
...
我們也可以使用 Corn 表達式來定義任務執行的頻率,例如使用 @Scheduled(cron = "0/4 * * * * ?")?。
7.Quartz 實現延遲任務
Quartz 是一款功能強大的任務調度器,可以實現較為復雜的調度功能,它還支持分布式的任務調度。
我們使用?Quartz?來實現一個延遲任務,首先定義一個執行任務代碼如下:
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.scheduling.quartz.QuartzJobBean;import java.time.LocalDateTime;public class SampleJob extends QuartzJobBean {@Overrideprotected void executeInternal(JobExecutionContext jobExecutionContext)throws JobExecutionException {System.out.println("任務執行,時間:" + LocalDateTime.now());} }在定義一個?JobDetail 和 Trigger?實現代碼如下:
import org.quartz.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class SampleScheduler {@Beanpublic JobDetail sampleJobDetail() {return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob").storeDurably().build();}@Beanpublic Trigger sampleJobTrigger() {// 3s 后執行SimpleScheduleBuilder scheduleBuilder =SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger").withSchedule(scheduleBuilder).build();} }最后在?SpringBoot?項目啟動之后開啟延遲任務,實現代碼如下:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.scheduling.quartz.SchedulerFactoryBean;/*** SpringBoot 項目啟動后執行*/ public class MyStartupRunner implements CommandLineRunner {@Autowiredprivate SchedulerFactoryBean schedulerFactoryBean;@Autowiredprivate SampleScheduler sampleScheduler;@Overridepublic void run(String... args) throws Exception {// 啟動定時任務schedulerFactoryBean.getScheduler().scheduleJob(sampleScheduler.sampleJobTrigger());} }以上程序的執行結果如下:
2020-04-13 19:02:12.331 ?INFO 17768 --- [ ?restartedMain] com.example.demo.DemoApplication ? ? ? ? : Started DemoApplication in 1.815 seconds (JVM running for 3.088)
任務執行,時間:2020-04-13T19:02:15.019
從結果可以看出在項目啟動 3s?之后執行了延遲任務。
總結
本文講了延遲任務的使用場景,以及延遲任務的 10 種實現方式:
最后的話
俗話說:臺上一分鐘,臺下十年功。本文的所有內容皆為作者多年工作積累的結晶,以及熬夜嘔心瀝血的整理,如果覺得本文有幫助到你,請幫我分享出去,讓更多的人看到,謝謝你。
更多精彩內容,請關注微信公眾號「Java中文社群」
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的史上最全的延迟任务实现方式汇总!附代码(强烈推荐)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 32张图带你彻底搞懂事务和锁!
- 下一篇: 数组转List的3种方法和使用对比!