java.util.concurrent.Exchanger应用范例与原理浅析--转载
一、簡介
?? Exchanger是自jdk1.5起開始提供的工具套件,一般用于兩個工作線程之間交換數據。在本文中我將采取由淺入深的方式來介紹分析這個工具類。首先我們來看看官方的api文檔中的敘述:
??? 在以上的描述中,有幾個要點:
- 此類提供對外的操作是同步的;
- 用于成對出現的線程之間交換數據;
- 可以視作雙向的同步隊列;
- 可應用于基因算法、流水線設計等場景。
?? 接著看api文檔,這個類提供對外的接口非常簡潔,一個無參構造函數,兩個重載的范型exchange方法:
public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
?? 從官方的javadoc可以知道,當一個線程到達exchange調用點時,如果它的伙伴線程此前已經調用了此方法,那么它的伙伴會被調度喚醒并與之進行對象交換,然后各自返回。如果它的伙伴還沒到達交換點,那么當前線程將會被掛起,直至伙伴線程到達——完成交換正常返回;或者當前線程被中斷——拋出中斷異常;又或者是等候超時——拋出超時異常。
二、一個簡單的例子
按照某大師的觀點,行為知之先,在知道了Exchanger的大致用途并參閱了使用說明后,我們馬上動手寫個例子來跑一跑:
?? 這大致可以看作是一個簡易的生產者消費者模型,有兩個任務類,一個遞增地產生整數,一個產生整數0,然后雙方進行交易。每次交易前的生產者和每次交易后的消費者都會sleep 1秒來模擬數據處理的消耗,并在交易前后把整數值打印到控制臺以便檢測結果。在這個例子里交易循環只執行三次,采用一個volatile boolean來控制交易雙方線程的退出。
?? 我們來看看程序的輸出:
?
consumer before : 0producer before: 1
consumer after : 1
producer after: 0
consumer before : 0
producer before: 2
producer after: 0
consumer after : 2
consumer before : 0
producer before: 3
producer after: 0
consumer after : 3
??? 輸出結果驗證了以下兩件事情:
- exchange方法真的幫一對線程交換了數據;
- exchange方法真的會阻塞調用方線程直至另一方線程參與交易。
?? 那么在中斷和超時兩種情況下程序的運行表現會是怎樣呢?作為一個小練習,有興趣的觀眾可以設想并編寫測試用例覆蓋驗證之。接下來談談最近我在生產場景中對Exchanger的應用。
?
三、實戰場景
1.問題描述
?? 最近接到外部項目組向我組提出的接口需求,需要查詢我們業務辦理量的統計情況。我們系統目前的情況是,有一個日增長十多萬、總數據量為千萬級別的業務辦理明細表(xxx_info),每人次的業務辦理結果會實時寫入其中。以往對外提供的業務統計接口是在每次被調用時候在明細表中執行SQL查詢(select、count、where、group by等),響應時間很長,對原生產業務的使用也有很大的影響。于是我決定趁著這次新增接口的上線機會對系統進行優化。
2.優化思路
?? 首先是在明細表之外再建立一個數據統計(xxx_statistics)表,考慮到目前數據庫的壓力以及公司內部質管流控等因素,暫沒有分庫存放,仍舊與原明細表放在同一個庫。再設置一個定時任務于每日凌晨對明細表進行查詢、過濾、統計、排序等操作,把統計結果插入到統計表中。然后對外暴露統計接口查詢統計報表。現在的設計與原來的實現相比,雖然犧牲了統計表所占用的少量額外的存儲空間(每日新增的十來萬條業務辦理明細記錄經過處理最終會變成幾百條統計表的記錄),但是卻能把select、count這樣耗時的數據統計操作放到凌晨時段執行以避開白天的業務辦理高峰,分表處理能夠大幅降低對生產業務明細表的性能影響,而對外提供的統計接口的查詢速度也將得到幾個數量級的提升。當然,還有一個缺點是,不能實時提供當天的統計數據,不過這也是雙方可以接受的。
3.設計實現
?? 設計一個定時任務,每日凌晨執行。在定時任務中啟動兩個線程,一個線程負責對業務明細表(xxx_info)進行查詢統計,把統計的結果放置在內存緩沖區,另一個線程負責讀取緩沖區中的統計結果并插入到業務統計表(xxx_statistics)中。
?? 親,這樣的場景是不是聽起來很有感覺?沒錯!兩個線程在內存中批量交換數據,這個事情我們可以使用Exchanger去做!我們馬上來看看代碼如何實現。
?? 生產者線程:
?
class ExchangerProducer implements Runnable {private Exchanger<Set<XXXStatistics>> exchanger;private Set<XXXStatistics> holder;private Date fltDate;private int threshold;ExchangerProducer(Exchanger<Set<XXXStatistics>> exchanger,Set<XXXStatistics> holder, Date fltDate, int threshold) {this.exchanger = exchanger;this.holder = holder;this.fltDate = fltDate;this.threshold = threshold;}@Overridepublic void run() {try {while (!Thread.interrupted() && !isDone) {List<XXXStatistics> temp1 = null;List<XXXStatistics> temp11 = null;for (int i = 0; i < allCities.size(); i++) {try {temp1 = xxxDao.findStatistics1(fltDate, allCities.get(i));temp11 = xxxDao.findStatistics2(fltDate, allCities.get(i),internationalList);if (temp1 != null && !temp1.isEmpty()) {calculationCounter.addAndGet(temp1.size());if (temp11 != null && !temp11.isEmpty()) {// merge two lists into temp1 mergeLists(temp1, temp11);temp11.clear();temp11 = null;}// merge temp1 into holder set mergeListToSet(holder, temp1);temp1.clear();temp1 = null;}} catch (Exception e) {log.error(e, e);}// Insert every ${threshold} or the last into database.if (holder.size() >= threshold|| i == (allCities.size() - 1)) {log.info("data collected: \n" + holder);holder = exchanger.exchange(holder);log.info("data submitted");}}// all cities are calculatedisDone = true;}log.info("calculation job done, calculated: "+ calculationCounter.get());} catch (InterruptedException e) {log.error(e, e);}exchanger = null;holder.clear();holder = null;fltDate = null;} }?? 代碼說明:
?
- threshold:緩沖區的容量閥值;
- allCities:城市列表,迭代這個列表作為入參來執行查詢統計;
- XXXStatistics:統計數據封裝實體類,實現了Serializable和Comparable接口,覆寫equals和compareTo方法,以利用TreeSet提供的去重和排序處理;
- isDone:volatile boolean,標識統計任務是否完成;
- holder:TreeSet<XXXStatistics>,存放統計結果的內存緩沖區,容量達到閥值后提交給Exchanger執行exchange操作;
- dao.findStatistics1,dao.findStatistics2:簡化的數據庫查詢統計操作,此處僅供示意;
- calculationCounter:AtomicInteger,標記生產端所提交的記錄總數;
- mergeLists,mergeListToSet:內部私有工具方法,把dao查詢返回的列表合并到holder中;
?
?? 消費者線程:
class ExchangerConsumer implements Runnable {private Exchanger<Set<XXXStatistics>> exchanger;private Set<XXXStatistics> holder;ExchangerConsumer(Exchanger<Set<XXXStatistics>> exchanger,Set<XXXStatistics> holder) {this.exchanger = exchanger;this.holder = holder;}@Overridepublic void run() {try {List<XXXStatistics> tempList;while (!Thread.interrupted() && !isDone) {holder = exchanger.exchange(holder);log.info("got data: \n" + holder);if (holder != null && !holder.isEmpty()) {try {// insert data into databasetempList = convertSetToList(holder);insertionCounter.addAndGet(xxxDao.batchInsertXXXStatistics(tempList));tempList.clear();tempList = null;} catch (Exception e) {log.error(e, e);}// clear the set holder.clear();} else {log.info("wtf, got an empty list");}log.info("data processed");}log.info("insert job done, inserted: " + insertionCounter.get());} catch (InterruptedException e) {log.error(e, e);}exchanger = null;holder.clear();holder = null;} }?
?? 代碼說明:
- convertSetToList:由于dao接口的限制,需把交換得到的Set轉換為List;
- batchInsertXXXStatistics:使用jdbc4的batch update而實現的批量插入dao接口;
- insertionCounter:AtomicInteger,標記消費端插入成功的記錄總數;
?
?? 調度器代碼:
public boolean calculateStatistics(Date fltDate) {// initializationcalculationCounter.set(0);insertionCounter.set(0);isDone = false;exec = Executors.newCachedThreadPool();Set<XXXStatistics> producerSet = new TreeSet<XXXStatistics>();Set<XXXStatistics> consumerSet = new TreeSet<XXXStatistics>();Exchanger<Set<XXXStatistics>> xc = new Exchanger<Set<XXXStatistics>>();ExchangerProducer producer = new ExchangerProducer(xc, producerSet,fltDate, threshold);ExchangerConsumer consumer = new ExchangerConsumer(xc, consumerSet);// execution exec.execute(producer);exec.execute(consumer);exec.shutdown();boolean isJobDone = false;try {// wait for terminationisJobDone = exec.awaitTermination(calculationTimeoutMinutes,TimeUnit.MINUTES);} catch (InterruptedException e) {log.error(e, e);}if (!isJobDone) {// force shutdown exec.shutdownNow();log.error("time elapsed for "+ calculationTimeoutMinutes+ " minutes, but still not finished yet, shut it down anyway.");}// clean upexec = null;producerSet.clear();producerSet = null;consumerSet.clear();consumerSet = null;xc = null;producer = null;consumer = null;System.gc();// return the resultif (isJobDone && calculationCounter.get() > 0&& calculationCounter.get() == insertionCounter.get()) {return true;}return false; }?? 代碼說明:
?? 調度器的代碼就四個步驟:初始化、提交任務并等候處理結果、清理、返回。初始化階段使用了jdk提供的線程池提交生產者和消費者任務,設置了最長等候時間calculationTimeoutMinutes,如果調度器線程被中斷或者任務執行超時,awaitTermination會返回false,此時就強行關閉線程池并記錄到日志。統計操作每日凌晨執行一次,所以在任務退出前的清理階段建議jvm執行gc以盡早釋放計算時所產生的垃圾對象。在結果返回階段,如果查詢統計出來的記錄條數和插入成功的條數相等則返回true,否則返回false。
?
?
4.小結
?? 在這個案例中,使用Exchanger進行批量的雙向數據交換可謂恰如其分:生產者在執行新的查詢統計任務填入數據到緩沖區的同時,消費者正在批量插入生產者換入的上一次產生的數據,系統的吞吐量得到平滑的提升;計算復雜度、內存消耗、系統性能也能通過相關的參數設置而得到有效的控制(在消費端也可以對holder進行再次分割以控制每次批插入的大小,建議參閱數據庫廠商以及數據庫驅動包的說明文檔以確定jdbc的最優batch update size);代碼的實現也很簡潔易懂。這些優點,是采用有界阻塞隊列所難以達到的。
?? 程序的輸出結果與業務緊密相關,就不打印出來了。可以肯定的是,經過了一段時間的摸索調優,內存消耗、執行速度和處理結果還是比較滿意的。
?
原文地址:http://lixuanbin.iteye.com/blog/2166772
轉載于:https://www.cnblogs.com/davidwang456/p/4179488.html
總結
以上是生活随笔為你收集整理的java.util.concurrent.Exchanger应用范例与原理浅析--转载的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: windows 下rabbitmq 安装
- 下一篇: Java获取真实的IP地址--转载