java ee api_Java EE并发API教程
java ee api
這是一個(gè)示例章節(jié),摘自Francesco Marchioni編輯的WildFly上的實(shí)用Java EE 7開(kāi)發(fā) 。
本章討論了新的Java EE并發(fā)API(JSR 236) ,它概述了使用一組托管資源在Java EE容器上并行執(zhí)行任務(wù)的標(biāo)準(zhǔn)方法。 為了描述如何在您的應(yīng)用程序中使用此API,我們將遵循以下路線圖:
- 并發(fā)實(shí)用工具簡(jiǎn)介
- 如何使用ManagedExecutorService利用異步任務(wù)
- 如何使用ManagedScheduledExecutorService在特定時(shí)間安排任務(wù)
- 如何創(chuàng)建動(dòng)態(tài)代理對(duì)象,以添加Java EE環(huán)境中可用的上下文信息
- 如何使用ManagedThreadFactory創(chuàng)建托管線程以供您的應(yīng)用程序使用
并發(fā)實(shí)用工具概述
在Java EE 7之前,在Java EE容器中執(zhí)行并發(fā)任務(wù)是眾所周知的危險(xiǎn)做法,有時(shí)甚至被容器禁止:
“企業(yè)bean不得嘗試管理線程。 企業(yè)bean不得嘗試啟動(dòng),停止,掛起或恢復(fù)線程,也不能?chē)L試更改線程的優(yōu)先級(jí)或名稱(chēng)。 企業(yè)bean不得嘗試管理線程組”
實(shí)際上,通過(guò)使用J2SE API在Java EE容器中創(chuàng)建自己的非托管線程,將無(wú)法保證將容器的上下文傳播到執(zhí)行任務(wù)的線程。
唯一可用的模式是使用異步EJB或消息驅(qū)動(dòng)Bean ,以便以異步方式執(zhí)行任務(wù)。 通常,這足以用于簡(jiǎn)單的觸發(fā)和遺忘模式,但對(duì)Threads的控制仍位于Container的手中。
通過(guò)Java EE并發(fā)API(JSR 236),您可以將java.util.concurrent API的擴(kuò)展用作托管資源 ,即由Container進(jìn)行管理。 與標(biāo)準(zhǔn)J2SE編程的唯一區(qū)別是,您將從容器的JNDI樹(shù)中檢索托管資源。 但是,您仍將使用屬于java.util.concurrent包的一部分的Runnable接口或類(lèi),例如Future或ScheduledFuture 。
在下一節(jié)中,我們將從最簡(jiǎn)單的示例開(kāi)始,該示例使用ManagedExecutorService執(zhí)行異步任務(wù)。
使用ManagedExecutorService提交任務(wù)
為了創(chuàng)建我們的第一個(gè)異步執(zhí)行,我們將展示如何使用ManagedExecutorService ,它擴(kuò)展了Java SE ExecutorService以提供用于提交要在Java EE環(huán)境中執(zhí)行的任務(wù)的方法。 通過(guò)使用此托管服務(wù),容器的上下文將傳播到執(zhí)行任務(wù)的線程:ManagedExecutorService包含在應(yīng)用程序服務(wù)器的EE配置中:
<subsystem xmlns="urn:jboss:domain:ee:2.0">. . .<concurrent>. . . .<managed-executor-services><managed-executor-service name="default"jndi-name="java:jboss/ee/concurrency/executor/default"context-service="default" hung-task-threshold="60000"core-threads="5" max-threads="25" keepalive-time="5000"/></managed-executor-services>. . . .</concurrent></subsystem>為了創(chuàng)建我們的第一個(gè)示例,我們從容器的JNDI上下文中檢索ManagedExecutorService,如下所示:
@Resource(name = "DefaultManagedExecutorService")ManagedExecutorService executor;通過(guò)使用ManagedExecutorService實(shí)例,您可以提交可以實(shí)現(xiàn)java.lang.Runnable接口或java.util.concurrent.Callable接口的任務(wù)。
Callable接口提供了一種call()方法,該方法可以返回任何泛型類(lèi)型,而不是使用run()方法。
編寫(xiě)一個(gè)簡(jiǎn)單的異步任務(wù)
因此,讓我們看一個(gè)簡(jiǎn)單的Servlet示例,該示例使用ManagedExecutorService觸發(fā)異步任務(wù):
@WebServlet("/ExecutorServlet")public class ExecutorServlet extends HttpServlet {@Resource(name = "DefaultManagedExecutorService")ManagedExecutorService executor;protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PrintWriter writer = response.getWriter(); executor.execute(new SimpleTask()); writer.write("Task SimpleTask executed! check logs"); }}在我們的示例中,類(lèi)SimpleTask通過(guò)提供并發(fā)執(zhí)行來(lái)實(shí)現(xiàn)Runnable接口。
public class SimpleTask implements Runnable {@Overridepublic void run() {System.out.println("Thread started.");}}從異步任務(wù)中檢索結(jié)果
上述任務(wù)是腳踏實(shí)地的好選擇; 正如您可能已經(jīng)注意到的那樣,無(wú)法攔截Task的返回值。 另外,在使用Runnable時(shí),您必須使用不受限制的異常(如果run( )拋出了一個(gè)檢查的異常,誰(shuí)會(huì)捕獲它呢?)您無(wú)法將run()調(diào)用封裝在處理程序中,因?yàn)槟鷽](méi)有編寫(xiě)調(diào)用它的代碼)。
如果你想克服這個(gè)限制,那么你可以實(shí)現(xiàn)一個(gè)java.util.concurrent.Callable接口相反,它提交給ExecutorService的,并與等待結(jié)果FutureTask.isDone()的返回ExecutorService.submit()
讓我們看一下Servlet的新版本,它捕獲了一個(gè)名為CallableTask的Task的結(jié)果:
@WebServlet("/CallableExecutorServlet")public class CallableExecutorServlet extends HttpServlet {@Resource(name = "DefaultManagedExecutorService")ManagedExecutorService executor;protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {PrintWriter writer = response.getWriter();Future<Long> futureResult = executor.submit(new CallableTask(5)); while (!futureResult.isDone()) {// Waittry {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}try {writer.write("Callable Task returned " +futureResult.get());} catch ( Exception e) {e.printStackTrace();} }}從代碼中可以看到,我們正在使用isDone( )方法輪詢?nèi)蝿?wù)完成情況。 任務(wù)完成后,我們可以調(diào)用FutureTask的get( )方法并獲取返回值。
現(xiàn)在,讓我們看一下我們的CallableTask實(shí)現(xiàn),在我們的示例中,該實(shí)現(xiàn)返回一個(gè)數(shù)字總和的值:
public class CallableTask implements Callable<Long> {private int id;public CallableTask(int id) {this.id = id;}public Long call() {long summation = 0;for (int i = 1; i <= id; i++) {summation += i;}return new Long(summation);}}在我們的示例中,我們要做的就是實(shí)現(xiàn)call方法,該方法返回Integer,該Integer最終將通過(guò)Future接口的get方法來(lái)收集。
如果您的Callable任務(wù)引發(fā)了Exception,則FutureTask.get()也將引發(fā)Exception,并且可以使用Exception.getCause()來(lái)訪問(wèn)原始Exception。
監(jiān)視未來(lái)任務(wù)的狀態(tài)
在上面的示例中,我們正在使用FutureTask.isDone()方法檢查Future Task的狀態(tài)。 如果需要對(duì)Future Task生命周期進(jìn)行更準(zhǔn)確的控制,則可以實(shí)現(xiàn)javax.enterprise.concurrent.ManagedTaskListener實(shí)例,以便接收生命周期事件通知。
這是我們?cè)鰪?qiáng)的Task,它實(shí)現(xiàn)了taskSubmitting , taskStarting , taskDone和taskAborted方法:
public class CallableListenerTask implements Callable<Long>,ManagedTaskListener {private int id;public CallableListenerTask(int id) {this.id = id;}public Long call() {long summation = 0;for (int i = 1; i <= id; i++) {summation += i;}return new Long(summation);}public void taskSubmitted(Future<?> f, ManagedExecutorService es,Object obj) {System.out.println("Task Submitted! "+f);}public void taskDone(Future<?> f, ManagedExecutorService es, Object obj,Throwable exc) {System.out.println("Task DONE! "+f);}public void taskStarting(Future<?> f, ManagedExecutorService es,Object obj) {System.out.println("Task Starting! "+f);}public void taskAborted(Future<?> f, ManagedExecutorService es,Object obj, Throwable exc) {System.out.println("Task Aborted! "+f);}}生命周期通知按以下順序調(diào)用:
- taskSubmitting :關(guān)于將任務(wù)提交給執(zhí)行者
- taskStarting :在實(shí)際啟動(dòng)任務(wù)之前
- taskDone :任務(wù)完成時(shí)觸發(fā)
- taskAborted :當(dāng)用戶調(diào)用futureResult.cancel()時(shí)觸發(fā)
在異步任務(wù)中使用事務(wù)
在分布式Java EE環(huán)境中,要確保并發(fā)任務(wù)執(zhí)行也能正確執(zhí)行事務(wù),這是一項(xiàng)艱巨的任務(wù)。 Java EE并發(fā)API依靠Java事務(wù)API(JTA)通過(guò)javax.transaction.UserTransaction來(lái)支持其組件頂部的事務(wù),該操作用于顯式劃分事務(wù)邊界。
以下代碼顯示可調(diào)用任務(wù)如何從JNDI樹(shù)中檢索UserTransaction,然后啟動(dòng)并提交與外部組件(EJB)的事務(wù):
public class TxCallableTask implements Callable<Long> {long id;public TxCallableTask(long i) {this.id = i;}public Long call() {long value = 0;UserTransaction tx = lookupUserTransaction();SimpleEJB ejb = lookupEJB();try {tx.begin();value = ejb.calculate(id); // Do Transactions heretx.commit();} catch (Exception e) {e.printStackTrace();try { tx.rollback(); } catch (Exception e1) { e1.printStackTrace(); }}return value;}// Lookup EJB and UserTransaction here ..}這種方法的主要局限性在于,盡管上下文對(duì)象可以開(kāi)始,提交或回滾事務(wù),但是這些對(duì)象無(wú)法加入父組件事務(wù)。
使用ManagedScheduledExecutorService安排任務(wù)
ManagedScheduledExecutorService擴(kuò)展了Java SE ScheduledExecutorService以提供用于提交延遲或定期任務(wù)以在Java EE環(huán)境中執(zhí)行的方法。 至于其他托管對(duì)象,您可以通過(guò)JNDI查找獲得ExecutorService的實(shí)例:
@Resource(name ="DefaultManagedScheduledExecutorService") ManagedScheduledExecutorService scheduledExecutor;一旦有了對(duì)ExecutorService的引用,便可以在其上調(diào)用schedule方法以提交延遲或定期的任務(wù)。 就像ManagedExecutors一樣,ScheduledExecutors也可以綁定到Runnable接口或Callable接口。 下一節(jié)將介紹這兩種方法。
提交一個(gè)簡(jiǎn)單的ScheduledTask
以最簡(jiǎn)單的形式提交計(jì)劃任務(wù)需要設(shè)置計(jì)劃表達(dá)式并將其傳遞給ManagedSchedulerExecutor服務(wù)。 在此示例中,由于調(diào)用了schedule( )方法,我們將創(chuàng)建一個(gè)延遲的任務(wù),該任務(wù)僅在10秒內(nèi)運(yùn)行一次:
@WebServlet("/ScheduledExecutor") public class ScheduledExecutor extends HttpServlet {@Resource(name ="DefaultManagedScheduledExecutorService")ManagedScheduledExecutorService scheduledExecutor;protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {PrintWriter writer = response.getWriter(); ScheduledFuture<?> futureResult = scheduledExecutor.schedule(new SimpleTask(), 10,TimeUnit.SECONDS);writer.write("Waiting 10 seconds before firing the task");}}如果需要重復(fù)計(jì)劃任務(wù),則可以使用scheduleAtFixedRate方法,該方法將觸發(fā)任務(wù)之前的時(shí)間,每次重復(fù)執(zhí)行之前的時(shí)間和TimeUnit作為輸入。 請(qǐng)參閱以下示例,該示例在初始延遲1秒后每10秒秒調(diào)度一次任務(wù):
ScheduledFuture<?> futureResult = scheduledExecutor. scheduleAtFixedRate (new SimpleTask(),1, 10,TimeUnit.SECONDS);捕獲計(jì)劃執(zhí)行的結(jié)果
如果需要從計(jì)劃執(zhí)行的任務(wù)中獲取返回值,則可以使用schedule方法返回的ScheduledFuture接口。 這是一個(gè)示例,它捕獲了我們先前編碼的階乘示例Task的結(jié)果:
ScheduledFuture<Long> futureResult =scheduledExecutor.schedule(new CallableTask(5), 5, TimeUnit.SECONDS); while (!futureResult.isDone()) { try {Thread.sleep(100); // Wait} catch (InterruptedException e) { e.printStackTrace();}} try {writer.write("Callable Task returned " +futureResult.get());} catch ( Exception e) {e.printStackTrace();}使用ManagedThreadFactory創(chuàng)建托管線程
javax.enterprise.concurrent.ManagedThreadFactory等效于J2SE ThreadFactory,可用于創(chuàng)建自己的線程。 為了使用ManagedThreadFactory,您需要照常從JNDI注入它:
@Resource(name ="DefaultManagedThreadFactory")ManagedThreadFactory factory;從工廠創(chuàng)建自己的托管線程(與ManagedExecutorService創(chuàng)建的托管線程相比)的主要優(yōu)點(diǎn)是,您可以設(shè)置一些典型的線程屬性(例如名稱(chēng)或優(yōu)先級(jí)),并且可以創(chuàng)建J2SE Executor服務(wù)的托管版本。 。 以下示例將向您展示如何。
從工廠創(chuàng)建托管線程
在此示例中,我們將使用DefaultManagedThreadFactory創(chuàng)建并啟動(dòng)新線程。 從代碼中可以看到,一旦我們創(chuàng)建了Thread類(lèi)的實(shí)例,就可以為其設(shè)置有意義的名稱(chēng)并將其與優(yōu)先級(jí)相關(guān)聯(lián)。 然后,我們將線程與我們的SimpleTask關(guān)聯(lián),該SimpleTask在控制臺(tái)上記錄一些數(shù)據(jù):
@WebServlet("/FactoryExecutorServlet")public class FactoryExecutorServlet extends HttpServlet {@Resource(name ="DefaultManagedThreadFactory")ManagedThreadFactory factory;protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {PrintWriter writer = response.getWriter();Thread thread = factory.newThread(new SimpleTask());thread.setName("My Managed Thread");thread.setPriority(Thread.MAX_PRIORITY);thread.start();writer.write("Thread started. Check logs");}}現(xiàn)在檢查您的服務(wù)器日志:毫無(wú)疑問(wèn),檢測(cè)自己創(chuàng)建的線程的輸出會(huì)更容易:
14:44:31,838 INFO [stdout] (My Managed Thread) Simple Task started在分析線程轉(zhuǎn)儲(chǔ)時(shí),收集有關(guān)線程名稱(chēng)的信息特別有用,并且線程名稱(chēng)是跟蹤線程執(zhí)行路徑的唯一線索。
使用托管執(zhí)行器服務(wù)
java.util.concurrent.ExecutorService接口是一種標(biāo)準(zhǔn)的J2SE機(jī)制,已大大取代了使用直接線程執(zhí)行異步執(zhí)行的方法。 與標(biāo)準(zhǔn)Thread機(jī)制相比,ExecutorService的主要優(yōu)點(diǎn)之一是您可以定義一個(gè)實(shí)例池來(lái)執(zhí)行您的作業(yè),并且您可以采用一種更安全的方式來(lái)中斷您的作業(yè)。
在企業(yè)應(yīng)用程序中使用ExecutorService很簡(jiǎn)單:只需將Managed ThreadFactory的實(shí)例傳遞給ExecutorService的構(gòu)造函數(shù)即可。 在以下示例中,我們使用SingletonEJB在其方法getThreadPoolExecutor中將ExecutorService作為服務(wù)提供:
@Singletonpublic class PoolExecutorEJB {private ExecutorService threadPoolExecutor = null;int corePoolSize = 5;int maxPoolSize = 10;long keepAliveTime = 5000;@Resource(name = "DefaultManagedThreadFactory")ManagedThreadFactory factory;public ExecutorService getThreadPoolExecutor() {return threadPoolExecutor;}@PostConstructpublic void init() { threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10), factory); }@PreDestroypublic void releaseResources() {threadPoolExecutor.shutdown(); }}ThreadPoolExecutor在其構(gòu)造函數(shù)中包含兩個(gè)核心參數(shù): corePoolSize和maximumPoolSize 。 當(dāng)在方法中提交新任務(wù)且運(yùn)行的線程數(shù)少于corePoolSize時(shí),即使其他工作線程處于空閑狀態(tài),也會(huì)創(chuàng)建一個(gè)新線程來(lái)處理請(qǐng)求。 如果運(yùn)行的線程數(shù)大于corePoolSize但小于maximumPoolSize,則僅在隊(duì)列已滿時(shí)才創(chuàng)建新線程。
然后,如以下示例所示, ExecutorService用于啟動(dòng)新的異步任務(wù),其中在Servlet中提供了Runnable的匿名實(shí)現(xiàn):
@WebServlet("/FactoryExecutorServiceServlet") public class FactoryExecutorServiceServlet extends HttpServlet {@EJB PoolExecutorEJB ejb;protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {final PrintWriter writer = response.getWriter();writer.write("Invoking ExecutorService. Check Logs.");ExecutorService executorService = ejb.getThreadPoolExecutor();executorService.execute(new Runnable() {public void run() {System.out.println("Message from your Executor!");}});}}PoolExecutorEJB終止后,ExecutorService也將在Singleton Bean的@PreDestroy方法中完成,該方法將調(diào)用ThreadPoolExecutor的shutdown()方法。 ExecutorService不會(huì)立即關(guān)閉,但將不再接受新任務(wù),并且一旦所有線程都完成了當(dāng)前任務(wù),ExecutorService就會(huì)關(guān)閉。
使用動(dòng)態(tài)上下文對(duì)象
動(dòng)態(tài)代理是有用的Java調(diào)整,可用于使用java.lang.reflect.Proxy API創(chuàng)建接口的動(dòng)態(tài)實(shí)現(xiàn)。 您可以將動(dòng)態(tài)代理用于多種不同目的,例如數(shù)據(jù)庫(kù)連接和事務(wù)管理,用于單元測(cè)試的動(dòng)態(tài)模擬對(duì)象以及其他類(lèi)似于AOP的方法攔截目的。
在Java EE環(huán)境中,可以使用一種稱(chēng)為動(dòng)態(tài)上下文代理的特殊類(lèi)型的動(dòng)態(tài)代理 。
動(dòng)態(tài)上下文對(duì)象最有趣的功能是將JNDI命名上下文,類(lèi)加載器和安全性上下文傳播 到代理對(duì)象 。 在將J2SE實(shí)現(xiàn)引入企業(yè)應(yīng)用程序并希望在容器的上下文中運(yùn)行它們的情況下,這很有用。
以下代碼片段顯示了如何將上下文對(duì)象注入到容器中。 由于上下文對(duì)象還需要您可以向其提交任務(wù)的ExecutorService,因此也會(huì)注入ThreadFactory:
@Resource(name ="DefaultContextService")ContextService cs;@Resource(name ="DefaultManagedThreadFactory")ManagedThreadFactory factory;在下一節(jié)中,我們將展示如何使用修訂版的Singleton EJB創(chuàng)建動(dòng)態(tài)上下文對(duì)象。
執(zhí)行上下文任務(wù)
下面的示例演示如何為Callable任務(wù)觸發(fā)上下文代理。 為此,我們將同時(shí)需要ManagedThreadfactory和ContextService。 我們的ContextExecutor EJB首先將在其init方法中創(chuàng)建ThreadPoolExecutor。 然后,在Submit方法內(nèi),創(chuàng)建可調(diào)用任務(wù)的新上下文代理,并將其提交給ThreadPool執(zhí)行器。
這是我們的ContextExecutorEJB的代碼:
@Singletonpublic class ContextExecutorEJB {private ExecutorService threadPoolExecutor = null;@Resource(name = "DefaultManagedThreadFactory")ManagedThreadFactory factory;@Resource(name = "DefaultContextService")ContextService cs;public ExecutorService getThreadPoolExecutor() {return threadPoolExecutor;}@PostConstructpublic void init() {threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,new ArrayBlockingQueue>Runnable>(10), factory);}public Future>Long> submitJob(Callable>Long> task) {Callable>Long> proxy = cs.createContextualProxy(task, Callable.class);return getThreadPoolExecutor().submit(proxy);}}CallableTask類(lèi)比我們的第一個(gè)示例復(fù)雜一點(diǎn),因?yàn)樗鼘⒂涗浻嘘P(guān)javax.security.auth.Subject信息,該信息包含在調(diào)用方線程中:
public class CallableTask implements Callable<Long> {private int id;public CallableTask(int id) {this.id = id;}public Long call() {long summation = 0;// Do calculationSubject subject = Subject.getSubject(AccessController.getContext());logInfo(subject, summation); // Log Traces Subject identityreturn new Long(summation);}private void logInfo(Subject subject, long summation) { . . }}以下是向我們的SingletonEJB提交新的上下文任務(wù)的簡(jiǎn)單方法:
@EJB ContextExecutorEJB ejb; protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { CallableTask task = new CallableTask(5);ejb.submitJob(task);}建立你的例子
為了對(duì)Java EE API使用并發(fā)實(shí)用程序,您的應(yīng)用程序需要以下Maven依賴(lài)項(xiàng):
<dependency><groupId>org.jboss.spec.javax.enterprise.concurrent</groupId><artifactId>jboss-concurrency-api_1.0_spec</artifactId><version>1.0.0.Final</version></dependency>
此摘錄摘自《 WildFly上的實(shí)用Java EE 7開(kāi)發(fā) 》一書(shū),該手冊(cè)是動(dòng)手實(shí)踐指南,其中介紹了最新WildFly應(yīng)用程序服務(wù)器上Java EE 7開(kāi)發(fā)的所有領(lǐng)域。 涵蓋了從基礎(chǔ)組件(EJB,Servlet,CDI,JPA)到Java Enterprise Edition 7中定義的新技術(shù)堆棧的所有內(nèi)容,因此包括新的Batch API,JSON-P Api,并發(fā)API,Web套接字,JMS 2.0 API,核心Web服務(wù)堆棧(JAX-WS,JAX-RS)。 帶有Arquillian框架和Security API的測(cè)試區(qū)域完成了本書(shū)中討論的主題列表。
翻譯自: https://www.javacodegeeks.com/2014/07/java-ee-concurrency-api-tutorial.html
java ee api
總結(jié)
以上是生活随笔為你收集整理的java ee api_Java EE并发API教程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 信号肽是什么 信号肽是什么意思
- 下一篇: 四红补血粥的营养价值 四红补血粥的营养价