Java技巧:创建监视友好的ExecutorService
在本文中,我們將擴展具有監視功能的ExecutorService實現。 這種監視功能將幫助我們在實時生產環境中測量多個池參數,即活動線程,工作隊列大小等。 它還將使我們能夠衡量任務執行時間,成功任務計數和失敗任務計數。
監控庫
至于監控庫,我們將使用Metrics 。 為了簡單起見,我們將使用ConsoleReporter ,它將向控制臺報告指標。 對于生產級應用程序,我們應該使用高級報告器(即Graphite報告器)。 如果您不熟悉指標,那么建議您閱讀入門指南 。
讓我們開始吧。
擴展ThreadPoolExecutor
我們將使用ThreadPoolExecutor作為新類型的基類。 我們將其稱為MonitoredThreadPoolExecutor 。 此類將接受MetricRegistry作為其構造函數參數之一–
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {private final MetricRegistry metricRegistry;public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.metricRegistry = metricRegistry;}public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);this.metricRegistry = metricRegistry;}public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);this.metricRegistry = metricRegistry;}public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);this.metricRegistry = metricRegistry;} }注冊儀表以測量特定于池的參數
量表是一個值的瞬時量度。 我們將使用它來測量不同的池參數,例如活動線程數,任務隊列大小等。
在注冊儀表之前,我們需要確定如何為線程池計算指標名稱。 每個度量標準,無論是儀表,計時器還是儀表,都有一個唯一的名稱。 此名稱用于標識度量標準來源。 此處的約定是使用點分字符串,該點分字符串通常由要監視的類的完全限定名稱構成。
對于我們的線程池,我們將使用其完全限定名稱作為指標名稱的前綴。 另外,我們將添加另一個名為
poolName,客戶端將使用它來指定特定于實例的標識符。
實施這些更改后,該類如下所示–
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {private final MetricRegistry metricRegistry;private final String metricsPrefix;public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,MetricRegistry metricRegistry,String poolName) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.metricRegistry = metricRegistry;this.metricsPrefix = MetricRegistry.name(getClass(), poolName);}// Rest of the constructors }現在我們準備注冊我們的儀表。 為此,我們將定義一個私有方法–
private void registerGauges() {metricRegistry.register(MetricRegistry.name(metricsPrefix, "corePoolSize"), (Gauge<Integer>) this::getCorePoolSize);metricRegistry.register(MetricRegistry.name(metricsPrefix, "activeThreads"), (Gauge<Integer>) this::getActiveCount);metricRegistry.register(MetricRegistry.name(metricsPrefix, "maxPoolSize"), (Gauge<Integer>) this::getMaximumPoolSize);metricRegistry.register(MetricRegistry.name(metricsPrefix, "queueSize"), (Gauge<Integer>) () -> getQueue().size()); }對于我們的示例,我們正在測量核心池大小,活動線程數,最大池大小和任務隊列大小。 根據監視要求,我們可以注冊更多/更少的量規來測量不同的屬性。
現在,所有構造函數都將調用此私有方法–
public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,MetricRegistry metricRegistry,String poolName ) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.metricRegistry = metricRegistry;this.metricsPrefix = MetricRegistry.name(getClass(), poolName);registerGauges(); }測量任務執行時間
為了衡量任務執行時間,我們將覆蓋ThreadPoolExecutor提供的兩個生命周期方法– beforeExecute和afterExecute 。
顧名思義,執行任務之前,將由執行任務的線程調用beforeExecute回調。 此回調的默認實現不執行任何操作。
同樣,在執行每個任務之后,執行任務的線程將調用afterExecute回調。 此回調的默認實現也不執行任何操作。 即使任務拋出未捕獲的RuntimeException或Error ,也會調用此回調。
我們將在beforeExecute覆蓋中啟動一個Timer ,然后將其用于afterExecute覆蓋中以獲取總的任務執行時間。 為了存儲對Timer的引用,我們將在類中引入一個新的ThreadLocal字段。
回調的實現如下:
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {private final MetricRegistry metricRegistry;private final String metricsPrefix;private ThreadLocal<Timer.Context> taskExecutionTimer = new ThreadLocal<>();// Constructors@Overrideprotected void beforeExecute(Thread thread, Runnable task) {super.beforeExecute(thread, task);Timer timer = metricRegistry.timer(MetricRegistry.name(metricsPrefix, "task-execution"));taskExecutionTimer.set(timer.time());}@Overrideprotected void afterExecute(Runnable task, Throwable throwable) {Timer.Context context = taskExecutionTimer.get();context.stop();super.afterExecute(task, throwable);} }記錄由于未捕獲的異常而導致的失敗任務數
afterExecute回調的第二個參數是Throwable 。 如果非null,則此Throwable引用導致執行終止的未捕獲RuntimeException或Error 。 我們可以使用此信息來部分計算由于未捕獲的異常而突然終止的任務總數。
要獲得失敗任務的總數,我們必須考慮另一種情況。 使用execute方法提交的任務將拋出任何未捕獲的異常,并且它將用作afterExecute回調的第二個參數。 但是,執行者服務會吞下使用Submit方法提交的任務。 JavaDoc (重點是我的話)中對此做了清楚的解釋-
注意:如果將動作顯式地或通過諸如Submit之類的方法包含在任務(例如FutureTask)中,則這些任務對象會捕獲并維護計算異常,因此它們不會導致突然終止,并且內部異常不會傳遞給此方法。 。 如果您想使用此方法捕獲兩種類型的失敗,則可以進一步探查此類情況,例如,在此示例子類中,如果任務被中止,則打印直接原因或潛在異常。 幸運的是,同一文檔還為此提供了一種解決方案,即檢查可運行對象以查看其是否為Future ,然后獲取基礎異常。
結合這些方法,我們可以如下修改afterExecute方法–
@Override protected void afterExecute(Runnable runnable, Throwable throwable) {Timer.Context context = taskExecutionTimer.get();context.stop();super.afterExecute(runnable, throwable);if (throwable == null && runnable instanceof Future && ((Future) runnable).isDone()) {try {((Future) runnable).get();} catch (CancellationException ce) {throwable = ce;} catch (ExecutionException ee) {throwable = ee.getCause();} catch (InterruptedException ie) {Thread.currentThread().interrupt();}}if (throwable != null) {Counter failedTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "failed-tasks"));failedTasksCounter.inc();} }計算成功任務的總數
先前的方法也可以用于計算成功任務的總數:完成的任務不會拋出任何異常或錯誤–
@Override protected void afterExecute(Runnable runnable, Throwable throwable) {// Rest of the method body .....if (throwable != null) {Counter failedTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "failed-tasks"));failedTasksCounter.inc();} else {Counter successfulTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "successful-tasks"));successfulTasksCounter.inc();} }結論
在本文中,我們研究了對ExecutorService實現的一些監視友好的自定義。 像往常一樣,任何建議/改進/錯誤修復將不勝感激。 至于示例源代碼,它已上傳到
Github 。
翻譯自: https://www.javacodegeeks.com/2018/05/java-tips-creating-a-monitoring-friendly-executorservice.html
總結
以上是生活随笔為你收集整理的Java技巧:创建监视友好的ExecutorService的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 联想s700一体机拆机(联想s700一体
- 下一篇: ai切换字体快捷键(ai快速切换字体快捷