mysql异步查询 java_java 手写并发框架(一)异步查询转同步的 7 种实现方式
序言
本節(jié)將學(xué)習(xí)一下如何實現(xiàn)異步查詢轉(zhuǎn)同步的方式,共計介紹了 7 種常見的實現(xiàn)方式。
思維導(dǎo)圖如下:
異步轉(zhuǎn)同步
業(yè)務(wù)需求
有些接口查詢反饋結(jié)果是異步返回的,無法立刻獲取查詢結(jié)果。
比如業(yè)務(wù)開發(fā)中我們調(diào)用其他系統(tǒng),但是結(jié)果的返回確實通知的。
或者 rpc 實現(xiàn)中,client 調(diào)用 server 端,結(jié)果也是異步返回的,那么如何同步獲取調(diào)用結(jié)果呢?
正常處理邏輯
觸發(fā)異步操作,然后傳遞一個唯一標(biāo)識。
等到異步結(jié)果返回,根據(jù)傳入的唯一標(biāo)識,匹配此次結(jié)果。
如何轉(zhuǎn)換為同步
正常的應(yīng)用場景很多,但是有時候不想做數(shù)據(jù)存儲,只是想簡單獲取調(diào)用結(jié)果。
即想達(dá)到同步操作的結(jié)果,怎么辦呢?
思路
發(fā)起異步操作
在異步結(jié)果返回之前,一直等待(可以設(shè)置超時)
結(jié)果返回之后,異步操作結(jié)果統(tǒng)一返回
常見的實現(xiàn)方式
循環(huán)等待
wait & notify
使用條件鎖
使用 CountDownLatch
使用 CyclicBarrier
Future
Spring EventListener
下面我們一起來學(xué)習(xí)下這幾種實現(xiàn)方式。
循環(huán)等待
說明
循環(huán)等待是最簡單的一種實現(xiàn)思路。
我們調(diào)用對方一個請求,在沒有結(jié)果之前一直循環(huán)查詢即可。
這個結(jié)果可以在內(nèi)存中,也可以放在 redis 緩存或者 mysql 等數(shù)據(jù)庫中。
代碼實現(xiàn)
定義抽象父類
為了便于后面的其他幾種實現(xiàn)方式統(tǒng)一,我們首先定義一個抽象父類。
/**
* 抽象查詢父類
* @author binbin.hou
* @since 1.0.0
*/
public abstract class AbstractQuery {
private static final Log log = LogFactory.getLog(AbstractQuery.class);
protected String result;
public void asyncToSync() {
startQuery();
new Thread(new Runnable() {
public void run() {
remoteCall();
}
}).start();
endQuery();
}
protected void startQuery() {
log.info("開始查詢...");
}
/**
* 遠(yuǎn)程調(diào)用
*/
protected void remoteCall() {
try {
log.info("遠(yuǎn)程調(diào)用開始");
TimeUnit.SECONDS.sleep(5);
result = "success";
log.info("遠(yuǎn)程調(diào)用結(jié)束");
} catch (InterruptedException e) {
log.error("遠(yuǎn)程調(diào)用失敗", e);
}
}
/**
* 查詢結(jié)束
*/
protected void endQuery() {
log.info("完成查詢,結(jié)果為:" + result);
}
}
代碼實現(xiàn)
實現(xiàn)還是非常簡單的,在沒有結(jié)果之前一直循環(huán)。
TimeUnit.MILLISECONDS.sleep(10); 這里循環(huán)等待的小睡一會兒是比較重要的,避免 cpu 飆升,也可以降低為 1ms,根據(jù)自己的業(yè)務(wù)調(diào)整即可。
/**
* 循環(huán)等待
* @author binbin.hou
* @since 1.0.0
*/
public class LoopQuery extends AbstractQuery {
private static final Log log = LogFactory.getLog(LoopQuery.class);
@Override
protected void endQuery() {
try {
while (StringUtil.isEmpty(result)) {
//循環(huán)等待一下
TimeUnit.MILLISECONDS.sleep(10);
}
//獲取結(jié)果
log.info("完成查詢,結(jié)果為:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試
LoopQuery loopQuery = new LoopQuery();
loopQuery.asyncToSync();
日志
[INFO] [2020-10-08 09:50:43.330] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...
[INFO] [2020-10-08 09:50:43.331] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始
[INFO] [2020-10-08 09:50:48.334] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束
[INFO] [2020-10-08 09:50:48.343] [main] [c.g.h.s.t.d.LoopQuery.endQuery] - 完成查詢,結(jié)果為:success
這里可以看到遠(yuǎn)程調(diào)用是 Thread-0 線程執(zhí)行的,遠(yuǎn)程調(diào)用的耗時為 5S。
超時特性
為什么需要超時時間
上面的實現(xiàn)存在一個問題,那就是循環(huán)等待沒有超時時間。
我們的一個網(wǎng)絡(luò)請求,可能存在失敗,也可能對方收到請求之后沒有正確處理。
所以如果我們一直等待,可能永遠(yuǎn)也沒有結(jié)果,或者很久之后才有結(jié)果。這在業(yè)務(wù)上是不可忍受的,所以需要添加一個超時時間。
代碼實現(xiàn)
/**
* 循環(huán)等待-包含超時時間
* @author binbin.hou
* @since 1.0.0
*/
public class LoopTimeoutQuery extends AbstractQuery {
private static final Log log = LogFactory.getLog(LoopTimeoutQuery.class);
/**
* 超時時間
*/
private long timeoutMills = 3000;
public LoopTimeoutQuery() {
}
public LoopTimeoutQuery(long timeoutMills) {
this.timeoutMills = timeoutMills;
}
@Override
protected void endQuery() {
try {
final long endTimeMills = System.currentTimeMillis() + timeoutMills;
while (StringUtil.isEmpty(result)) {
// 超時判斷
if(System.currentTimeMillis() >= endTimeMills) {
throw new RuntimeException("請求超時");
}
//循環(huán)等待一下
TimeUnit.MILLISECONDS.sleep(10);
}
//獲取結(jié)果
log.info("完成查詢,結(jié)果為:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試
LoopTimeoutQuery loopQuery = new LoopTimeoutQuery();
loopQuery.asyncToSync();
日志如下:
[INFO] [2020-10-08 10:04:58.091] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...
[INFO] [2020-10-08 10:04:58.092] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始
Exception in thread "main" java.lang.RuntimeException: 請求超時
at com.github.houbb.sync.test.demo.LoopTimeoutQuery.endQuery(LoopTimeoutQuery.java:38)
at com.github.houbb.sync.test.demo.AbstractQuery.asyncToSync(AbstractQuery.java:26)
at com.github.houbb.sync.test.demo.LoopTimeoutQuery.main(LoopTimeoutQuery.java:55)
[INFO] [2020-10-08 10:05:03.097] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束
超時時間是可以設(shè)定的,平時開發(fā)中可以根據(jù)自己的響應(yīng)時間設(shè)置。
如果請求超時,考慮對應(yīng)的兜底方案。
基于 wait() & notifyAll()
簡介
實際上 loop 循環(huán)還是比較消耗性能的,對于這種等待特性, jdk 實際上為我們封裝了多種特性。
比如最常見的 wait() 進(jìn)入等待,notifyAll() 喚醒等待的組合方式。
這個同時也是阻塞隊列的實現(xiàn)思想,阻塞隊列我們就不介紹了,我們來看一下 wait+notify 的實現(xiàn)方式。
java 實現(xiàn)
package com.github.houbb.sync.test.demo;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
/**
* wait+notify 實現(xiàn)
* @author binbin.hou
* @since 1.0.0
*/
public class WaitNotifyQuery extends AbstractQuery {
private static final Log log = LogFactory.getLog(WaitNotifyQuery.class);
/**
* 聲明對象
*/
private final Object lock = new Object();
@Override
protected void remoteCall() {
super.remoteCall();
synchronized (lock) {
log.info("遠(yuǎn)程線程執(zhí)行完成,喚醒所有等待。");
lock.notifyAll();
}
}
@Override
protected void endQuery() {
try {
// 等待 10s
synchronized (lock) {
log.info("主線程進(jìn)入等待");
lock.wait(10 * 1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
super.endQuery();
}
public static void main(String[] args) {
WaitNotifyQuery query = new WaitNotifyQuery();
query.asyncToSync();
}
}
注意:編程時需要使用 synchronized 保證鎖的持有者線程安全,不然會報錯。
測試
日志如下:
[INFO] [2020-10-08 11:05:50.769] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...
[INFO] [2020-10-08 11:05:50.770] [main] [c.g.h.s.t.d.WaitNotifyQuery.endQuery] - 主線程進(jìn)入等待
[INFO] [2020-10-08 11:05:50.770] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始
[INFO] [2020-10-08 11:05:55.772] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束
[INFO] [2020-10-08 11:05:55.773] [Thread-0] [c.g.h.s.t.d.WaitNotifyQuery.remoteCall] - 遠(yuǎn)程線程執(zhí)行完成,喚醒所有等待。
[INFO] [2020-10-08 11:05:55.773] [main] [c.g.h.s.t.d.AbstractQuery.endQuery] - 完成查詢,結(jié)果為:success
基于條件鎖的實現(xiàn)
條件鎖簡介
如果你想編寫一個含有多個條件謂詞的并發(fā)對象,或者你想獲得比條件隊列的可見性之外更多的控制權(quán),那么顯式的Lock和Condition的實現(xiàn)類提供了一個比內(nèi)部鎖和條件隊列更加靈活的選擇。
如同Lock提供了比內(nèi)部加鎖要豐富得多的特征集一樣,Condition也提供了比內(nèi)部條件隊列要豐富得多的特征集:
每個鎖可以有多個等待集(因await掛起的線程的集合)、可中斷/不可中斷的條件等待、基于時限的等待以及公平/非公平隊列之間的選擇.
注意事項:
wait、notify和notifyAll在Condition對象中的對等體是await、signal和signalAll.
但是,Condition繼承與Object,這意味著它也有wait和notify方法.
一定要確保使用了正確的版本–await和signal!
java 實現(xiàn)
為了演示簡單,我們直接選擇可重入鎖即可。
一個Condition和一個單獨的Lock相關(guān)聯(lián),就像條件隊列和單獨的內(nèi)部鎖相關(guān)聯(lián)一樣;
調(diào)用與Condition相關(guān)聯(lián)的Lock的Lock.newCondition方法,可以創(chuàng)建一個Condition.
package com.github.houbb.sync.test.demo;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 條件鎖實現(xiàn)
* @author binbin.hou
* @since 1.0.0
*/
public class LockConditionQuery extends AbstractQuery {
private static final Log log = LogFactory.getLog(LockConditionQuery.class);
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
@Override
protected void remoteCall() {
lock.lock();
try{
super.remoteCall();
log.info("遠(yuǎn)程線程執(zhí)行完成,喚醒所有等待線程。");
condition.signalAll();
} finally {
lock.unlock();
}
}
@Override
protected void endQuery() {
lock.lock();
try{
// 等待
log.info("主線程進(jìn)入等待");
condition.await();
super.endQuery();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
LockConditionQuery query = new LockConditionQuery();
query.asyncToSync();
}
}
實現(xiàn)也比較簡單,我們在方法進(jìn)入,調(diào)用 lock.lock() 加鎖,finally 中調(diào)用 lock.unlock() 釋放鎖。
condition.await(); 進(jìn)入等待;condition.signalAll(); 喚醒所有等待線程。
測試日志
[INFO] [2020-10-08 12:33:40.985] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...
[INFO] [2020-10-08 12:33:40.986] [main] [c.g.h.s.t.d.LockConditionQuery.endQuery] - 主線程進(jìn)入等待
[INFO] [2020-10-08 12:33:40.987] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始
[INFO] [2020-10-08 12:33:45.990] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束
[INFO] [2020-10-08 12:33:45.991] [Thread-0] [c.g.h.s.t.d.LockConditionQuery.remoteCall] - 遠(yuǎn)程線程執(zhí)行完成,喚醒所有等待線程。
[INFO] [2020-10-08 12:33:45.993] [main] [c.g.h.s.t.d.AbstractQuery.endQuery] - 完成查詢,結(jié)果為:success
CountDownLatch 閉鎖實現(xiàn)
CountDownLatch/Future/CyclicBarrier 這三個都是 jdk 為我們提供的同步工具類,我們此處只做簡單介紹。
詳情參見:
CountDownLatch 簡介
閉鎖是一種同步工具類,可以延遲線程的進(jìn)度直到其達(dá)到終止?fàn)顟B(tài)。
閉鎖的作用相當(dāng)于一扇門:在閉鎖到達(dá)結(jié)束狀態(tài)之前,這扇門一直是關(guān)閉的,并且沒有任何線程能通過,當(dāng)?shù)竭_(dá)結(jié)束狀態(tài)時,這扇門會打開并允許所有的線程通過。
當(dāng)閉鎖到達(dá)結(jié)束狀態(tài)后,將不會再改變狀態(tài),因此這扇門將永遠(yuǎn)保持打開狀態(tài)。
閉鎖可以用來確保某些活動直到其它活動都完成后才繼續(xù)執(zhí)行。
java 代碼實現(xiàn)
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* CountDownLatch 實現(xiàn)
* @author binbin.hou
* @since 1.0.0
*/
public class CountDownLatchQuery extends AbstractQuery {
private static final Log log = LogFactory.getLog(CountDownLatchQuery.class);
/**
* 閉鎖
* 調(diào)用1次,后續(xù)方法即可通行。
*/
private final CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
protected void remoteCall() {
super.remoteCall();
// 調(diào)用一次閉鎖
countDownLatch.countDown();
}
@Override
protected void endQuery() {
try {
// countDownLatch.await();
countDownLatch.await(10, TimeUnit.SECONDS);
log.info("完成查詢,結(jié)果為:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
CountDownLatchQuery loopQuery = new CountDownLatchQuery();
loopQuery.asyncToSync();
}
}
我們在返回結(jié)果之前調(diào)用 countDownLatch.await(10, TimeUnit.SECONDS); 進(jìn)行等待,這里可以指定超時時間。
remoteCall() 遠(yuǎn)程完成后,執(zhí)行一下 countDownLatch.countDown();,進(jìn)而可以讓程序繼續(xù)執(zhí)行下去。
測試
代碼
CountDownLatchQuery loopQuery = new CountDownLatchQuery();
loopQuery.asyncToSync();
日志
[INFO] [2020-10-08 10:24:03.348] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...
[INFO] [2020-10-08 10:24:03.350] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始
[INFO] [2020-10-08 10:24:08.353] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束
[INFO] [2020-10-08 10:24:08.354] [main] [c.g.h.s.t.d.CountDownLatchQuery.endQuery] - 完成查詢,結(jié)果為:success
jdk 提供的閉鎖功能還是非常的方便的。
CyclicBarrier 柵欄
簡介
柵欄(Barrier)類似于閉鎖,它能阻塞一組線程直到某個事件發(fā)生[CPJ 4.4.3]。閉鎖是一次性對象,一旦進(jìn)入最終狀態(tài),就不能被重置了。
柵欄與閉鎖的關(guān)鍵區(qū)別在于,所有線程必須同時達(dá)到柵欄位置,才能繼續(xù)執(zhí)行。閉鎖用于等待事件,而柵欄用于等待其他線程。
java 實現(xiàn)
package com.github.houbb.sync.test.demo;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier 實現(xiàn)
* @author binbin.hou
* @since 1.0.0
*/
public class CyclicBarrierQuery extends AbstractQuery {
private static final Log log = LogFactory.getLog(CyclicBarrierQuery.class);
private CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
@Override
protected void remoteCall() {
super.remoteCall();
try {
cyclicBarrier.await();
log.info("遠(yuǎn)程調(diào)用進(jìn)入等待");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
@Override
protected void endQuery() {
try {
cyclicBarrier.await();
log.info("主線程進(jìn)入等待");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
super.endQuery();
}
}
測試
代碼
public static void main(String[] args) {
CyclicBarrierQuery cyclicBarrierQuery = new CyclicBarrierQuery();
cyclicBarrierQuery.asyncToSync();
}
日志
[INFO] [2020-10-08 10:39:00.890] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...
[INFO] [2020-10-08 10:39:00.892] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始
[INFO] [2020-10-08 10:39:05.894] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束
[INFO] [2020-10-08 10:39:05.895] [Thread-0] [c.g.h.s.t.d.CyclicBarrierQuery.remoteCall] - 遠(yuǎn)程調(diào)用進(jìn)入等待
[INFO] [2020-10-08 10:39:05.895] [main] [c.g.h.s.t.d.CyclicBarrierQuery.endQuery] - 主線程進(jìn)入等待
[INFO] [2020-10-08 10:39:05.896] [main] [c.g.h.s.t.d.AbstractQuery.endQuery] - 完成查詢,結(jié)果為:success
可以看出遠(yuǎn)程線程 Thread-0 執(zhí)行完之后就進(jìn)入等待,此時主線程調(diào)用,然后也進(jìn)入等待。
等主線程 endQuery 等待時,就滿足了兩個線程同時等待,然后執(zhí)行就結(jié)束了。
基于 Future 實現(xiàn)
Future 簡介
Future模式可以這樣來描述:我有一個任務(wù),提交給了Future,Future替我完成這個任務(wù)。期間我自己可以去做任何想做的事情。一段時間之后,我就便可以從Future那兒取出結(jié)果。就相當(dāng)于下了一張訂貨單,一段時間后可以拿著提訂單來提貨,這期間可以干別的任何事情。其中Future 接口就是訂貨單,真正處理訂單的是Executor類,它根據(jù)Future接口的要求來生產(chǎn)產(chǎn)品。
Future接口提供方法來檢測任務(wù)是否被執(zhí)行完,等待任務(wù)執(zhí)行完獲得結(jié)果,也可以設(shè)置任務(wù)執(zhí)行的超時時間。這個設(shè)置超時的方法就是實現(xiàn)Java程序執(zhí)行超時的關(guān)鍵。
詳細(xì)介紹:
java 代碼實現(xiàn)
采用 Future 返回和以前的實現(xiàn)差異較大,我們直接覆寫以前的方法即可。
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.*;
/**
* Future 實現(xiàn)
* @author binbin.hou
* @since 1.0.0
*/
public class FutureQuery extends AbstractQuery {
private static final Log log = LogFactory.getLog(FutureQuery.class);
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
@Override
public void asyncToSync() {
//1. 開始調(diào)用
super.startQuery();
//2. 遠(yuǎn)程調(diào)用
Future stringFuture = remoteCallFuture();
//3. 完成結(jié)果
try {
String result = stringFuture.get(10, TimeUnit.SECONDS);
log.info("調(diào)用結(jié)果:{}", result);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* 遠(yuǎn)程調(diào)用
* @return Future 信息
*/
private Future remoteCallFuture() {
FutureTask futureTask = new FutureTask<>(new Callable() {
@Override
public String call() throws Exception {
log.info("開始異步調(diào)用");
TimeUnit.SECONDS.sleep(5);
log.info("完成異步調(diào)用");
return "success";
}
});
executorService.submit(futureTask);
// 關(guān)閉線程池
executorService.shutdown();
return futureTask;
}
public static void main(String[] args) {
FutureQuery query = new FutureQuery();
query.asyncToSync();
}
}
遠(yuǎn)程調(diào)用執(zhí)行時,是一個 FutureTask,然后提交到線程池去執(zhí)行。
獲取結(jié)果的時候,stringFuture.get(10, TimeUnit.SECONDS) 可以指定獲取的超時時間。
日志
測試日志如下:
[INFO] [2020-10-08 12:52:05.175] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...
[INFO] [2020-10-08 12:52:05.177] [pool-1-thread-1] [c.g.h.s.t.d.FutureQuery.call] - 開始異步調(diào)用
[INFO] [2020-10-08 12:52:10.181] [pool-1-thread-1] [c.g.h.s.t.d.FutureQuery.call] - 完成異步調(diào)用
[INFO] [2020-10-08 12:52:10.185] [main] [c.g.h.s.t.d.FutureQuery.asyncToSync] - 調(diào)用結(jié)果:success
Spring EventListener
spring 事件監(jiān)聽器模式
對于一件事情完成的結(jié)果調(diào)用,使用觀察者模式是非常適合的。
spring 為我們提供了比較強(qiáng)大的監(jiān)聽機(jī)制,此處演示下結(jié)合 spring 使用的例子。
ps: 這個例子是2年前的自己寫的例子了,此處為了整個系列的完整性,直接搬過來作為補(bǔ)充。
代碼實現(xiàn)
BookingCreatedEvent.java
定義一個傳輸屬性的對象。
public class BookingCreatedEvent extends ApplicationEvent {
private static final long serialVersionUID = -1387078212317348344L;
private String info;
public BookingCreatedEvent(Object source) {
super(source);
}
public BookingCreatedEvent(Object source, String info) {
super(source);
this.info = info;
}
public String getInfo() {
return info;
}
}
BookingService.java
說明:當(dāng) this.context.publishEvent(bookingCreatedEvent); 觸發(fā)時,
會被 @EventListener 指定的方法監(jiān)聽到。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class BookingService {
@Autowired
private ApplicationContext context;
private volatile BookingCreatedEvent bookingCreatedEvent;
/**
* 異步轉(zhuǎn)同步查詢
* @param info
* @return
*/
public String asyncQuery(final String info) {
query(info);
new Thread(new Runnable() {
@Override
public void run() {
remoteCallback(info);
}
}).start();
while(bookingCreatedEvent == null) {
//.. 空循環(huán)
// 短暫等待。
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
//...
}
//2. 使用兩個單獨的 event...
}
final String result = bookingCreatedEvent.getInfo();
bookingCreatedEvent = null;
return result;
}
@EventListener
public void onApplicationEvent(BookingCreatedEvent bookingCreatedEvent) {
System.out.println("監(jiān)聽到遠(yuǎn)程的信息: " + bookingCreatedEvent.getInfo());
this.bookingCreatedEvent = bookingCreatedEvent;
System.out.println("監(jiān)聽到遠(yuǎn)程消息后: " + this.bookingCreatedEvent.getInfo());
}
/**
* 執(zhí)行查詢
* @param info
*/
public void query(final String info) {
System.out.println("開始查詢: " + info);
}
/**
* 遠(yuǎn)程回調(diào)
* @param info
*/
public void remoteCallback(final String info) {
System.out.println("遠(yuǎn)程回調(diào)開始: " + info);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 重發(fā)結(jié)果事件
String result = info + "-result";
BookingCreatedEvent bookingCreatedEvent = new BookingCreatedEvent(this, result);
//觸發(fā)event
this.context.publishEvent(bookingCreatedEvent);
}
}
測試方法
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringConfig.class)
public class BookServiceTest {
@Autowired
private BookingService bookingService;
@Test
public void asyncQueryTest() {
bookingService.asyncQuery("1234");
}
}
日志
2018-08-10 18:27:05.958 INFO [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:84 - 開始查詢:1234
2018-08-10 18:27:05.959 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:93 - 遠(yuǎn)程回調(diào)開始:1234
接收到信息: 1234-result
2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:73 - 監(jiān)聽到遠(yuǎn)程的信息: 1234-result
2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:75 - 監(jiān)聽到遠(yuǎn)程消息后: 1234-result
2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:106 - 已經(jīng)觸發(fā)event
2018-08-10 18:27:07.964 INFO [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:67 - 查詢結(jié)果: 1234-result
2018-08-10 18:27:07.968 INFO [Thread-1] org.springframework.context.support.GenericApplicationContext:993 - Closing org.springframework.context.support.GenericApplicationContext@5cee5251: startup date [Fri Aug 10 18:27:05 CST 2018]; root of context hierarchy
小結(jié)
本文共計介紹了 7 種異步轉(zhuǎn)同步的方式,實際上思想都是一樣的。
在異步執(zhí)行完成前等待,執(zhí)行完成后喚醒等待即可。
當(dāng)然我寫本文除了總結(jié)以上幾種方式以外,還想為后續(xù)寫一個異步轉(zhuǎn)同步的工具提供基礎(chǔ)。
下一節(jié)我們將一起學(xué)習(xí)下如何將這個功能封裝為一個同步轉(zhuǎn)換框架,感興趣的可以關(guān)注一下,便于實時接收最新內(nèi)容。
覺得本文對你有幫助的話,歡迎點贊評論收藏轉(zhuǎn)發(fā)一波。你的鼓勵,是我最大的動力~
不知道你有哪些收獲呢?或者有其他更多的想法,歡迎留言區(qū)和我一起討論,期待與你的思考相遇。
代碼地址
為了便于學(xué)習(xí),文中的所有例子都已經(jīng)開源:
實現(xiàn) 1-6:sync
總結(jié)
以上是生活随笔為你收集整理的mysql异步查询 java_java 手写并发框架(一)异步查询转同步的 7 种实现方式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MySQL之日期时间处理函数_MySQL
- 下一篇: MYSQL查询语句待优化_mysql语句