java线程池使用
在Java1.5中提供了一個(gè)非常高效實(shí)用的多線程包:java.util.concurrent,提供了大量高級(jí)工具,可以幫助開發(fā)者編寫高效易維護(hù)、結(jié)構(gòu)清晰的Java多線程程序。
線程池
之前我們?cè)谑褂枚嗑€程都是用Thread的start()來創(chuàng)建啟動(dòng)一個(gè)線程,但是在實(shí)際開發(fā)中,如果每個(gè)請(qǐng)求到達(dá)就創(chuàng)建一個(gè)新線程,開銷是相當(dāng)大的。服務(wù)器在創(chuàng)建和銷毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源都相當(dāng)大,甚至可能要比在處理實(shí)際的用請(qǐng)求的時(shí)間和資源要多的多。除了創(chuàng)建和銷毀線程的開銷之外,活動(dòng)的線程也需要消耗系統(tǒng)資源。如果在一個(gè)jvm里創(chuàng)建太多的線程,可能會(huì)使系統(tǒng)由于過度消耗內(nèi)存或“切換過度”而導(dǎo)致系統(tǒng)資源不足。這就引入了線程池概念。
線程池的原理其實(shí)就是對(duì)多線程的一個(gè)管理,為了實(shí)現(xiàn)異步機(jī)制的一種方法,其實(shí)就是多個(gè)線程執(zhí)行多個(gè)任務(wù),最終這些線程通過線程池進(jìn)行管理…不用手動(dòng)去維護(hù)…一次可以處理多個(gè)任務(wù),這樣就可以迅速的進(jìn)行相應(yīng)…比如說一個(gè)網(wǎng)站成為了熱點(diǎn)網(wǎng)站,那么對(duì)于大量的點(diǎn)擊量,就必須要對(duì)每一次的點(diǎn)擊做出迅速的處理,這樣才能達(dá)到更好的交互效果…這樣就需要多個(gè)線程去處理這些請(qǐng)求,以便能夠更好的提供服務(wù)…
在java.util.concurrent包下,提供了一系列與線程池相關(guān)的類。合理的使用線程池,可以帶來多個(gè)好處:
(1) 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗;
(2) 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行;
(3) 提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。
線程池可以應(yīng)對(duì)突然大爆發(fā)量的訪問,通過有限個(gè)固定線程為大量的操作服務(wù),減少創(chuàng)建和銷毀線程所需的時(shí)間。
使用線程池:
1、創(chuàng)建線程池
2、創(chuàng)建任務(wù)
3、執(zhí)行任務(wù)
4、關(guān)閉線程池
創(chuàng)建線程池
一般通過工具類Executors的靜態(tài)方法來獲取線程池或靜態(tài)方法。介紹四種常用創(chuàng)建方法
ExecutorService service1 = Executors.newSingleThreadExecutor();
說明: 單例線程,表示在任意的時(shí)間段內(nèi),線程池中只有一個(gè)線程在工作
ExecutorService service2 = Executors.newCacheThreadPool();
說明: 緩存線程池,先查看線程池中是否有當(dāng)前執(zhí)行線程的緩存,如果有就resue(復(fù)用),如果沒有,那么需要?jiǎng)?chuàng)建一個(gè)線程來完成當(dāng)前的調(diào)用.并且這類線程池只能完成一些生存期很短的一些任務(wù).并且這類線程池內(nèi)部規(guī)定能resue(復(fù)用)的線程,空閑的時(shí)間不能超過60s,一旦超過了60s,就會(huì)被移出線程池
ExecutorService service3 = Executors.newFixedThreadPool(10);
說明: 固定型線程池,和newCacheThreadPool()差不多,也能夠?qū)崿F(xiàn)resue(復(fù)用),但是這個(gè)池子規(guī)定了線程的最大數(shù)量,也就是說當(dāng)池子有空閑時(shí),那么新的任務(wù)將會(huì)在空閑線程中被執(zhí)行,一旦線程池內(nèi)的線程都在進(jìn)行工作,那么新的任務(wù)就必須等待線程池有空閑的時(shí)候才能夠進(jìn)入線程池,其他的任務(wù)繼續(xù)排隊(duì)等待.這類池子沒有規(guī)定其空閑的時(shí)間到底有多長.這一類的池子更適用于服務(wù)器.
ExecutorService service4 = Executors.newScheduledThreadPool(10);
說明: 調(diào)度型線程池,調(diào)度型線程池會(huì)根據(jù)Scheduled(任務(wù)列表)進(jìn)行延遲執(zhí)行,或者是進(jìn)行周期性的執(zhí)行.適用于一些周期性的工作.
package com.reapal.brave.main;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;/*** Created by jack-cooper on 2017/2/23.*/ public class Test {public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();service.submit(new Runnable() {public void run() {while(true){System.out.println("hello world !");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}});System.out.println(" ===> main Thread execute here ! " );} }創(chuàng)建任務(wù)
任務(wù)分為兩種:一種是有返回值的( callable ),一種是沒有返回值的( runnable ). Callable與 Future 兩功能是Java在后續(xù)版本中為了適應(yīng)多并法才加入的,Callable是類似于Runnable的接口,實(shí)現(xiàn)Callable接口的類和實(shí)現(xiàn)Runnable的類都是可被其他線程執(zhí)行的任務(wù)。
- 無返回值的任務(wù)就是一個(gè)實(shí)現(xiàn)了runnable接口的類.使用run方法.
- 有返回值的任務(wù)是一個(gè)實(shí)現(xiàn)了callable接口的類.使用call方法.
Callable和Runnable的區(qū)別如下:
- Callable定義的方法是call,而Runnable定義的方法是run。
- Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
- Callable的call方法可拋出異常,而Runnable的run方法不能拋出異常。
Future 介紹
Future表示異步計(jì)算的結(jié)果,它提供了檢查計(jì)算是否完成的方法,以等待計(jì)算的完成,并檢索計(jì)算的結(jié)果。Future的cancel方法可以取消任務(wù)的執(zhí)行,它有一布爾參數(shù),參數(shù)為 true 表示立即中斷任務(wù)的執(zhí)行,參數(shù)為 false 表示允許正在運(yùn)行的任務(wù)運(yùn)行完成。Future的 get 方法等待計(jì)算完成,獲取計(jì)算結(jié)果。
package com.reapal.brave.main;import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;public class CallableAndFuture {public static class MyCallable implements Callable{private int flag = 0;public MyCallable(int flag){this.flag = flag;}public String call() throws Exception{if (this.flag == 0){return "flag = 0";}if (this.flag == 1){try {while (true) {System.out.println("looping.");Thread.sleep(2000);}} catch (InterruptedException e) {System.out.println("Interrupted");}return "false";} else {throw new Exception("Bad flag value!");}}}public static void main(String[] args) {// 定義3個(gè)Callable類型的任務(wù)MyCallable task1 = new MyCallable(0);MyCallable task2 = new MyCallable(1);MyCallable task3 = new MyCallable(2);// 創(chuàng)建一個(gè)執(zhí)行任務(wù)的服務(wù)ExecutorService es = Executors.newFixedThreadPool(3);try {// 提交并執(zhí)行任務(wù),任務(wù)啟動(dòng)時(shí)返回了一個(gè)Future對(duì)象,// 如果想得到任務(wù)執(zhí)行的結(jié)果或者是異常可對(duì)這個(gè)Future對(duì)象進(jìn)行操作Future future1 = es.submit(task1);// 獲得第一個(gè)任務(wù)的結(jié)果,如果調(diào)用get方法,當(dāng)前線程會(huì)等待任務(wù)執(zhí)行完畢后才往下執(zhí)行System.out.println("task1: " + future1.get());Future future2 = es.submit(task2);// 等待5秒后,再停止第二個(gè)任務(wù)。因?yàn)榈诙€(gè)任務(wù)進(jìn)行的是無限循環(huán)Thread.sleep(5000);System.out.println("task2 cancel: " + future2.cancel(true));// 獲取第三個(gè)任務(wù)的輸出,因?yàn)閳?zhí)行第三個(gè)任務(wù)會(huì)引起異常// 所以下面的語句將引起異常的拋出Future future3 = es.submit(task3);System.out.println("task3: " + future3.get());} catch (Exception e){System.out.println(e.toString());}// 停止任務(wù)執(zhí)行服務(wù)es.shutdownNow();} }執(zhí)行任務(wù)
通過java.util.concurrent.ExecutorService接口對(duì)象來執(zhí)行任務(wù),該對(duì)象有兩個(gè)方法可以執(zhí)行任務(wù)execute和submit。execute這種方式提交沒有返回值,也就不能判斷是否執(zhí)行成功。submit這種方式它會(huì)返回一個(gè)Future對(duì)象,通過future的get方法來獲取返回值,get方法會(huì)阻塞住直到任務(wù)完成。
execute與submit區(qū)別:
- 接收的參數(shù)不一樣
- submit有返回值,而execute沒有
- submit方便Exception處理
- execute是Executor接口中唯一定義的方法;submit是ExecutorService(該接口繼承Executor)中定義的方法
關(guān)閉線程池
線程池使用完畢,需要對(duì)其進(jìn)行關(guān)閉,有兩種方法
shutdown()
說明:shutdown并不是直接關(guān)閉線程池,而是不再接受新的任務(wù)…如果線程池內(nèi)有任務(wù),那么把這些任務(wù)執(zhí)行完畢后,關(guān)閉線程池
shutdownNow()
說明:這個(gè)方法表示不再接受新的任務(wù),并把任務(wù)隊(duì)列中的任務(wù)直接移出掉,如果有正在執(zhí)行的,嘗試進(jìn)行停止
綜合使用案例(FutureTask)
import java.util.concurrent.*;/*** Author : Slogen* AddTime : 17/6/4* Email : huangjian13@meituan.com*/ public class CallDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {/*** 第一種方式:Future + ExecutorService* Task task = new Task();* ExecutorService service = Executors.newCachedThreadPool();* Future<Integer> future = service.submit(task1);* service.shutdown();*//*** 第二種方式: FutureTask + ExecutorService* ExecutorService executor = Executors.newCachedThreadPool();* Task task = new Task();* FutureTask<Integer> futureTask = new FutureTask<Integer>(task);* executor.submit(futureTask);* executor.shutdown();*//*** 第三種方式:FutureTask + Thread*/// 2. 新建FutureTask,需要一個(gè)實(shí)現(xiàn)了Callable接口的類的實(shí)例作為構(gòu)造函數(shù)參數(shù)FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());// 3. 新建Thread對(duì)象并啟動(dòng)Thread thread = new Thread(futureTask);thread.setName("Task thread");thread.start();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");// 4. 調(diào)用isDone()判斷任務(wù)是否結(jié)束if(!futureTask.isDone()) {System.out.println("Task is not done");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}int result = 0;try {// 5. 調(diào)用get()方法獲取任務(wù)結(jié)果,如果任務(wù)沒有執(zhí)行完成則阻塞等待result = futureTask.get();} catch (Exception e) {e.printStackTrace();}System.out.println("result is " + result);}// 1. 繼承Callable接口,實(shí)現(xiàn)call()方法,泛型參數(shù)為要返回的類型static class Task implements Callable<Integer> {public Integer call() throws Exception {System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");int result = 0;for(int i = 0; i < 100;++i) {result += i;}Thread.sleep(3000);return result;}} }綜合使用案例一
需求:從數(shù)據(jù)庫中獲取url,并利用httpclient循環(huán)訪問url地址,并對(duì)返回結(jié)果進(jìn)行操作
分析:由于是循環(huán)的對(duì)多個(gè)url進(jìn)行訪問并獲取數(shù)據(jù),為了執(zhí)行的效率,考慮使用多線程,url數(shù)量未知如果每個(gè)任務(wù)都創(chuàng)建一個(gè)線程將消耗大量的系統(tǒng)資源,最后決定使用線程池。
public class GetMonitorDataService {private Logger logger = LoggerFactory.getLogger(GetMonitorDataService.class);private MonitorProjectUrlMapper groupUrlMapper;private MonitorDetailBatchInsertMapper monitorDetailBatchInsertMapper;public void sendData(){//調(diào)用dao查詢所有urlMonitorProjectUrlExample example=new MonitorProjectUrlExample();List<MonitorProjectUrl> list=groupUrlMapper.selectByExample(example);logger.info("此次查詢數(shù)據(jù)庫中監(jiān)控url個(gè)數(shù)為"+list.size());//獲取系統(tǒng)處理器個(gè)數(shù),作為線程池?cái)?shù)量int nThreads=Runtime.getRuntime().availableProcessors();//定義一個(gè)裝載多線程返回值的集合List<MonitorDetail> result= Collections.synchronizedList(new ArrayList<MonitorDetail>());//創(chuàng)建線程池,這里定義了一個(gè)創(chuàng)建線程池的工具類,避免了創(chuàng)建多個(gè)線程池,ThreadPoolFactoryUtil可以使用單例模式設(shè)計(jì)ExecutorService executorService = ThreadPoolFactoryUtil.getExecutorService(nThreads);//遍歷數(shù)據(jù)庫取出的urlif(list!=null&&list.size()>0) {for (MonitorProjectUrl monitorProjectUrl : list) {String url = monitorProjectUrl.getMonitorUrl();//創(chuàng)建任務(wù)ThreadTask threadTask = new ThreadTask(url, result);//執(zhí)行任務(wù)executorService.execute(threadTask);//注意區(qū)分shutdownNowexecutorService.shutdown();try {//等待直到所有任務(wù)完成executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);} catch (InterruptedException e) {e.printStackTrace();}}//對(duì)數(shù)據(jù)進(jìn)行操作saveData(result);}}任務(wù)
public class ThreadTask implements Runnable{//這里實(shí)現(xiàn)runnable接口private String url;private List<MonitorDetail> list;public ThreadTask(String url,List<MonitorDetail> list){this.url=url;this.list=list;}//把獲取的數(shù)據(jù)進(jìn)行處理public void run() {MonitorDetail detail = HttpClientUtil.send(url, MonitorDetail.class);list.add(detail);}}綜合使用案例二(countDownLatch)
package com.br.lucky.utils;import java.util.ArrayList; import java.util.List; import java.util.concurrent.*;/*** @author 10400* @create 2018-04-19 20:38*/ public class FatureTest {//1、配置線程池private static ExecutorService es = Executors.newFixedThreadPool(20);//2、封裝響應(yīng)Featureclass BizResult{public String orderId;public String data;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getData() {return data;}public void setData(String data) {this.data = data;}}//3、實(shí)現(xiàn)Callable接口class BizTask implements Callable {private String orderId;private Object data;//可以用其他方式private CountDownLatch countDownLatch;public BizTask(String orderId, Object data, CountDownLatch countDownLatch) {this.orderId = orderId;this.data = data;this.countDownLatch = countDownLatch;}public Object call() {try {//todo businessSystem.out.println("當(dāng)前線程Id = " + this.orderId);BizResult br = new BizResult();br.setOrderId(this.orderId);br.setData("some key about your business" + this.getClass());return br;}catch (Exception e){e.printStackTrace();}finally {//線程結(jié)束時(shí),將計(jì)時(shí)器減一countDownLatch.countDown();}return null;}}/*** 業(yè)務(wù)邏輯入口*/public List<Future> beginBusiness() throws InterruptedException {//模擬批量業(yè)務(wù)數(shù)據(jù)List<String> list = new ArrayList<>();for (int i = 0 ; i < 1000 ; i++) {list.add(String.valueOf(i));}//設(shè)置計(jì)數(shù)器CountDownLatch countDownLatch = new CountDownLatch(list.size());//接收多線程響應(yīng)結(jié)果List<Future> resultList = new ArrayList<>();//begin threadfor( int i = 0 ,size = list.size() ; i<size; i++){//todo something befor threadresultList.add(es.submit(new BizTask(list.get(i), null, countDownLatch)));}//wait finishcountDownLatch.await();return resultList;}public static void main(String[] args) throws InterruptedException {FatureTest ft = new FatureTest();List<Future> futures = ft.beginBusiness();System.out.println("futures.size() = " + futures.size());//todo some operateSystem.out.println(" ==========================end========================= " );}}綜合使用案例三(future.get())
package com.br.lucky.utils;import java.util.ArrayList; import java.util.List; import java.util.concurrent.*;/*** @author 10400* @create 2018-04-19 20:38*/ public class FatureTest {//1、配置線程池private static ExecutorService es = Executors.newFixedThreadPool(20);//2、封裝響應(yīng)Featureclass BizResult{public String orderId;public String data;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getData() {return data;}public void setData(String data) {this.data = data;}}//3、實(shí)現(xiàn)Callable接口class BizTask implements Callable {private String orderId;private Object data;public BizTask(String orderId, Object data) {this.orderId = orderId;this.data = data;}public Object call() {try {//todo businessSystem.out.println("當(dāng)前線程Id = " + this.orderId);BizResult br = new BizResult();br.setOrderId(this.orderId);br.setData("some key about your business" + this.getClass());Thread.sleep(3000);return br;}catch (Exception e){e.printStackTrace();}return null;}}/*** 業(yè)務(wù)邏輯入口*/public List<Future> beginBusiness() throws InterruptedException, ExecutionException {//模擬批量業(yè)務(wù)數(shù)據(jù)List<String> list = new ArrayList<>();for (int i = 0 ; i < 100 ; i++) {list.add(String.valueOf(i));}//接收多線程響應(yīng)結(jié)果List<Future> resultList = new ArrayList<>();//begin threadfor( int i = 0 ,size = list.size() ; i<size; i++){//todo something befor threadFuture future = es.submit(new BizTask(list.get(i), null));resultList.add(future);}for (Future f : resultList) {f.get();}System.out.println(" =====多線程執(zhí)行結(jié)束====== ");//wait finishreturn resultList;}public static void main(String[] args) throws InterruptedException, ExecutionException {FatureTest ft = new FatureTest();List<Future> futures = ft.beginBusiness();System.out.println("futures.size() = " + futures.size());//todo some operateSystem.out.println(" ==========================end========================= " );}}https://yq.aliyun.com/articles/5952
http://www.importnew.com/25286.html
作者:jackcooper
鏈接:https://www.jianshu.com/p/edd7cb4eafa0
來源:簡書
總結(jié)
- 上一篇: 泛型:工作原理及其重要性
- 下一篇: 聊聊并发(三)——JAVA线程池的分析和