本章節依賴于【Marble使用】,閱讀本章節前請保證已經充分了解Marble。 中斷特性從Marble-Agent 2.0.5開始支持。
線程中斷使用 引入marble-agent jar包 <dependency><groupId>com.github.jeff-dong</groupId><artifactId>marble-agent</artifactId><version>最新版</version>
</dependency>
JOB執行代碼適當位置添加中斷標志, 下面給出示例代碼 @Component("job1")
public class Job1 extends MarbleJob {private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(Job1.class);@Overridepublic void execute(String param) throws Exception {logger.info("JOB1開始執行 ...");int i = 0;while (true) {i++;//1、用中斷狀態碼進行判斷if (Thread.interrupted()) {logger.info("JOB1-[{}]-[{}]被打斷啦", param, Thread.currentThread().getName());return;}try {Thread.sleep(500);} catch (InterruptedException e) {//2、捕獲終端異常后return結束return;}logger.info("JOB1-[{}]-[{}]-{}-------", param, Thread.currentThread().getName(), i);}}
}
Marble OFFLINE進行線程中斷 3.1 手動調度線程中斷
3.2 選擇要中斷的服務器進行終端嘗試
3.3 查看中斷日志(同步JOB)
中斷實現及原理 Java的線程中斷 Java的線程中斷機制是一種協作機制,線程中斷并不能立即停掉線程執行,相反,可能線程永遠都不會響應。 java的線程中斷模型只是通過修改線程的中斷標志(interrupt)進行中斷通知,不會有其它額外操作,因此線程是否最終中斷取決于線程的執行邏輯。因此,如果想讓線程按照自己的想法中斷,要代碼中事先進行中斷的“埋點”處理。
有人可能會想到Thread的stop方法進行中斷,由于此方法可能造成不可預知的結果,已經被拋棄
Marble進行線程中斷實現 需求收集 以JOB為維度進行線程中斷; 盡量做到實時響應; 存在集群中多臺機器,要支持指定某臺機器中的線程中斷; 允許多次中斷嘗試; 中斷請求不能依賴于JOB當前狀態??赡芤呀浲V拐{度的JOB也要手動中斷執行中的線程; 透明和擴展不同JOB的中斷(提供用戶中斷的"后處理"擴展); 需求分析及實現 【以JOB為維度進行線程中斷】
Marble的JOB標志為 schedulerName-appId-jobName組成,目前Marble每個JOB調度時間和頻率都是個性化,目前調度完成就銷毀。但要做到任何時間進行執行中的線程中斷就要:
1.1 存儲JOB的運行線程,隨時準備中斷; 1.2 在緩存的JOB數量/時間和性能間做權衡,不能過多也不能過少; 1.3 制定緩存已滿時的拋棄策略,避免緩存被占滿新的線程永遠無法中斷; 1.4 要同步JOB和異步JOB透明處理(感覺不出差異);
實現: Marble的線程池中定義支持并發的MAP進行JOB維度的線程緩存,此外指定每個JOB下緩存的線程數量。如下:
public class ThreadPool {...private Multimap<String, Object> threadMultimap = Multimaps.synchronizedMultimap(HashMultimap.<String, Object>create());//multimap的單個key的最大容量private static final int THREADMULTIMAP_SIZE = 50;...
}
Marble-Agent在同步/異步JOB生成新的線程對象時進行放入MAP緩存,如果緩存(50個)已滿采用如下策略進行處理:
嘗試清理當前map中的非活躍線程; 嘗試清理當前map中已經完成的線程(同步線程有效); 如果還未清理出空間,移除最久的線程; public ThreadPool multimapPut(String key, Object value) {if (StringUtils.isNotBlank(key)) {Collection collection = threadMultimap.get(key);if (collection != null && collection.size() >= THREADMULTIMAP_SIZE) {//替換最久的Iterator<Object> it = collection.iterator();//首先進行 非活躍線程清理while (it.hasNext()) {Object tempObj = it.next();if(tempObj instanceof MarbleThread){MarbleThread mt = (MarbleThread)tempObj;//不活躍刪除if(!mt.isThreadAlive()){it.remove();}}else if(tempObj instanceof MarbleThreadFeature){MarbleThreadFeature mf = (MarbleThreadFeature) tempObj;//完成的線程刪除if(mf.isDone()){it.remove();}}}//仍然>最大值,刪除最久未使用if(collection.size() >= THREADMULTIMAP_SIZE){while (it.hasNext()) {it.next();it.remove();break;}}threadMultimap.put(key, value);return this;}}threadMultimap.put(key, value);return this;}
此外,為了能在JVM關閉時進行線程中斷,添加JVM hook進行中斷調用處理(包括線程池的銷毀)。 除此之外,還有個小問題,由于線程池使用的是有界的阻塞隊列,此種情況下,線程中斷時可能有的線程存在于阻塞隊列中,單純的中斷無效,對于此類情況,要首先判斷阻塞隊列中是否存在要中斷的線程,存在的話進行隊列的移除操作。
【盡量做到實時響應】 只能通過用戶在具體的線程邏輯中進行埋點處理,Marble在框架層面除了及時把用戶的中斷請求送達之外,沒有其它措施。
【存在集群中多臺機器,要支持指定某臺機器中的線程中斷】 Marble OFFLINE的中斷頁面支持機器的選擇,用戶進行選擇后,Marble會有針對性的進行機器的中斷RPC發送。
【允許多次中斷嘗試】 OFFLINE未對中斷次數進行限制,目前支持多次中斷請求發送。
【中斷請求不能依賴于JOB當前狀態】 考慮到用戶對歷史線程的中斷請求,Marble未把中斷操作綁定在JOB狀態上,任何JOB都可以進行終端嘗試。
【透明擴展不同JOB的中斷】 Marble目前支持同步和異步JOB,兩類JOB的中斷處理并不一致,比如同步job的中斷是通過FeatureTask的cancel實現,異步JOB是通過Thread的interrupt實現,此外線程被中斷后Marble希望能更進一步提供一個統一的“后處理”操作給用戶自己實現,比如用戶可能需要在線程被中斷后進行一些后續的log記錄等。
為了代碼層面一致透明,且友好的實現“后處理”的封裝,Marble使用了代理模式,在Thread和FeatureTask上添加了一層“代理類”,由代理進行具體的中斷操作。 同步JOB代理類:
/*** @author <a href="dongjianxing@aliyun.com">jeff</a>* @version 2017/4/19 16:31*/
public class MarbleThreadFeature<V> implements RunnableFuture<V> {private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThreadFeature.class);private MarbleJob marbleJob;private String param;private FutureTask<Result> futureTask;public MarbleThreadFeature(final MarbleJob marbleJob, final String param) {super();this.marbleJob = marbleJob;this.param = param;futureTask = new FutureTask<>(new Callable<Result>() {@Overridepublic Result call() throws Exception {return marbleJob.executeSync(param);}});}@Overridepublic void run() {futureTask.run();}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return futureTask.cancel(mayInterruptIfRunning);}@Overridepublic boolean isCancelled() {return futureTask.isCancelled();}@Overridepublic boolean isDone() {return futureTask.isDone();}@Overridepublic V get() throws InterruptedException, ExecutionException {return (V) futureTask.get();}@Overridepublic V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return (V) futureTask.get(timeout, unit);}public void stop(String operator) {if (futureTask != null && !futureTask.isCancelled()) {logger.info("Thread-feature[{}] is interrupted", futureTask.getClass().getName());futureTask.cancel(true);}else if(marbleJob != null){boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(marbleJob);logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", marbleJob.getClass().getSimpleName(),removeResult);}//中斷后處理if(marbleJob != null){marbleJob.afterInterruptTreatment();}}}
異步JOB代理類:
/*** @author <a href="dongjianxing@aliyun.com">jeff</a>* @version 2017/4/19 16:31*/
public class MarbleThread implements Runnable {private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThread.class);private MarbleJob marbleJob;private String param;private Thread runThread;public MarbleThread(MarbleJob marbleJob, String param) {super();this.marbleJob = marbleJob;this.param = param;}@Overridepublic void run() {runThread = Thread.currentThread();try {marbleJob.execute(param);} catch (Exception e) {e.printStackTrace();}}public boolean isThreadAlive() {return (runThread != null && runThread.isAlive());}public String getThreadName() {return runThread != null ? runThread.getName() : "";}public void stop() {//首先嘗試在阻塞隊列中刪除boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(this);logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", this.getClass().getSimpleName(), removeResult);if (runThread != null && !runThread.isInterrupted()) {logger.info("Thread[{}] is interrupted", runThread.getName());runThread.interrupt();}//中斷后處理if (marbleJob != null) {marbleJob.afterInterruptTreatment();}}
}
總結
以上是生活随笔 為你收集整理的Marble原理之线程中断 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。