ThreadPoolExecutor里面4种拒绝策略--CallerRunsPolicy
首先介紹一下ThreadPoolExecutor構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}構(gòu)造方法中的字段含義如下
- corePoolSize:核心線程數(shù)量,當(dāng)有新任務(wù)在execute()方法提交時(shí),會(huì)執(zhí)行以下判斷:
- 如果運(yùn)行的線程少于 corePoolSize,則創(chuàng)建新線程來處理任務(wù),即使線程池中的其他線程是空閑的;
- 如果線程池中的線程數(shù)量大于等于 corePoolSize 且小于 maximumPoolSize,則只有當(dāng)workQueue滿時(shí)才創(chuàng)建新的線程去處理任務(wù);
- 如果設(shè)置的corePoolSize 和 maximumPoolSize相同,則創(chuàng)建的線程池的大小是固定的,這時(shí)如果有新任務(wù)提交,若workQueue未滿,則將請求放入workQueue中,等待有空閑的線程去從workQueue中取任務(wù)并處理;
- 如果運(yùn)行的線程數(shù)量大于等于maximumPoolSize,這時(shí)如果workQueue已經(jīng)滿了,則通過handler所指定的策略來處理任務(wù)
- 所以,任務(wù)提交時(shí),判斷的順序?yàn)?corePoolSize –> workQueue –> maximumPoolSize。
- maximumPoolSize:最大線程數(shù)量
- workQueue:等待隊(duì)列,當(dāng)任務(wù)提交時(shí),如果線程池中的線程數(shù)量大于等于corePoolSize的時(shí)候,把該任務(wù)封裝成一個(gè)Worker對象放入等待隊(duì)列;
- keepAliveTime:線程池維護(hù)線程所允許的空閑時(shí)間。當(dāng)線程池中的線程數(shù)量大于corePoolSize的時(shí)候,如果這時(shí)沒有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷毀,而是會(huì)等待,直到等待的時(shí)間超過了keepAliveTime;
- threadFactory:它是ThreadFactory類型的變量,用來創(chuàng)建新線程。默認(rèn)使用Executors.defaultThreadFactory() 來創(chuàng)建線程。使用默認(rèn)的ThreadFactory來創(chuàng)建線程時(shí),會(huì)使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級并且是非守護(hù)線程,同時(shí)也設(shè)置了線程的名稱。
- handler:它是RejectedExecutionHandler類型的變量,表示線程池的飽和策略。如果阻塞隊(duì)列滿了并且沒有空閑的線程,這時(shí)如果繼續(xù)提交任務(wù),就需要采取一種策略處理該任務(wù)。線程池提供了4種策略:
- AbortPolicy:直接拋出異常,這是默認(rèn)策略;
- CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
- DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
- DiscardPolicy:直接丟棄任務(wù);
簡單來說,在執(zhí)行execute()方法時(shí)如果狀態(tài)一直是RUNNING時(shí),的執(zhí)行過程如下:
實(shí)驗(yàn):拒絕策略CallerRunsPolicy
? ? 測試當(dāng)拒絕策略是CallerRunsPolicy時(shí),用調(diào)用者所在的線程來執(zhí)行任務(wù),是什么現(xiàn)象。
實(shí)驗(yàn)環(huán)境
- ?jdk 1.8
- postman模擬并發(fā)
- mysql5.7
- intellj?
- springboot?2.1.4.RELEASE
實(shí)驗(yàn)代碼
本實(shí)驗(yàn)代碼,在https://github.com/vincentduan/mavenProject?下的threadManagement目錄下。
實(shí)驗(yàn)步驟
1.??首先配置maven pom.xml文件內(nèi)容,關(guān)鍵內(nèi)容如下:
<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.1.4.RELEASE</version><scope>import</scope><type>pom</type></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.6</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.6</version></dependency>2. 配置數(shù)據(jù)庫連接池:
@SpringBootConfiguration public class DBConfiguration {@Autowiredprivate Environment environment;@Beanpublic DataSource createDataSource() {DruidDataSource druidDataSource = new DruidDataSource();druidDataSource.setUrl(environment.getProperty("spring.datasource.url"));druidDataSource.setUsername(environment.getProperty("spring.datasource.username"));druidDataSource.setPassword(environment.getProperty("spring.datasource.password"));druidDataSource.setDriverClassName(environment.getProperty("spring.datasource.driver-class-name"));return druidDataSource;}}3. 配置線程池:
@Configuration @EnableAsync @Slf4j public class ExecutorConfig {@AutowiredVisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor;@Beanpublic Executor asyncServiceExecutor() {log.info("start asyncServiceExecutor");ThreadPoolTaskExecutor executor = visiableThreadPoolTaskExecutor;//配置核心線程數(shù)executor.setCorePoolSize(5);//配置最大線程數(shù)executor.setMaxPoolSize(5);//配置隊(duì)列大小executor.setQueueCapacity(5);//配置線程池中的線程的名稱前綴executor.setThreadNamePrefix("async-service-");// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//執(zhí)行初始化executor.initialize();return executor;} }VisiableThreadPoolTaskExecutor類作為一個(gè)bean,并且繼承了ThreadPoolTaskExecutor;
4. 接下來創(chuàng)建一個(gè)service,為了模擬并發(fā),我們將方法執(zhí)行sleep了30秒。如下:
@Service @Slf4j public class UserThreadServiceImpl implements UserThreadService {@AutowiredUserThreadDao userThreadDao;@Override@Async("asyncServiceExecutor")public void serviceTest(String username) {log.info("開啟執(zhí)行一個(gè)Service, 這個(gè)Service執(zhí)行時(shí)間為30s, threadId:{}",Thread.currentThread().getId());userThreadDao.add(username, Integer.parseInt(Thread.currentThread().getId() +""), "started");try {Thread.sleep(30000);} catch (InterruptedException e) {e.printStackTrace();}log.info("執(zhí)行完成一個(gè)Service, threadId:{}",Thread.currentThread().getId());userThreadDao.update(username, Integer.parseInt(Thread.currentThread().getId() +""), "ended");}@Overridepublic void update(String username, int threadId, String status) {userThreadDao.update(username, threadId, status);} }5. 其中的dao很簡單,就是往數(shù)據(jù)庫插入數(shù)據(jù)和更新數(shù)據(jù)。
6. 創(chuàng)建web Controller:
@RestController @RequestMapping("/serviceTest") public class TestController {@AutowiredUserThreadService userThreadService;@Autowiredprivate VisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor;@GetMapping("test/{username}")public Object test(@PathVariable("username") String username) {userThreadService.serviceTest(username);JSONObject jsonObject = new JSONObject();ThreadPoolExecutor threadPoolExecutor = visiableThreadPoolTaskExecutor.getThreadPoolExecutor();jsonObject.put("ThreadNamePrefix", visiableThreadPoolTaskExecutor.getThreadNamePrefix());jsonObject.put("TaskCount", threadPoolExecutor.getTaskCount());jsonObject.put("completedTaskCount", threadPoolExecutor.getCompletedTaskCount());jsonObject.put("activeCount", threadPoolExecutor.getActiveCount());jsonObject.put("queueSize", threadPoolExecutor.getQueue().size());return jsonObject;}}執(zhí)行測試
因?yàn)镃orePoolSize數(shù)量是5,MaxPoolSize也是5,因此線程池的大小是固定的。而QueueCapacity數(shù)量也是5。因此當(dāng)請求前5次時(shí),返回響應(yīng):
當(dāng)前線程數(shù)達(dá)到CorePoolSize時(shí),新來的請求就會(huì)進(jìn)入到workQueue中,如下所示:
而QueueCapacity的數(shù)量也是5,因此達(dá)到QueueCapacity的最大限制后,同時(shí)也達(dá)到了MaxPoolSize的最大限制,將會(huì)根據(jù)拒絕策略來處理該任務(wù),這里的策略是CallerRunsPolicy時(shí),用調(diào)用者所在的線程來執(zhí)行任務(wù),表現(xiàn)為當(dāng)前頁面被阻塞住了,直到當(dāng)前調(diào)用者所在的線程執(zhí)行完畢。如下所示:
從控制臺(tái)的輸出也能看出CallerRunsPolicy策略執(zhí)行線程是調(diào)用者線程:
2019-08-19 21:06:50.255 INFO 1302 --- [async-service-4] c.a.i.s.impl.UserThreadServiceImpl : 開啟執(zhí)行一個(gè)Service, 這個(gè)Service執(zhí)行時(shí)間為30s, threadId:51 2019-08-19 21:06:50.271 INFO 1302 --- [async-service-4] cn.ac.iie.dao.UserThreadDao : threadId:51, jdbcTemplate:org.springframework.jdbc.core.JdbcTemplate@5d83694c 2019-08-19 21:06:50.751 INFO 1302 --- [async-service-5] c.a.i.s.impl.UserThreadServiceImpl : 開啟執(zhí)行一個(gè)Service, 這個(gè)Service執(zhí)行時(shí)間為30s, threadId:52 2019-08-19 21:06:50.771 INFO 1302 --- [async-service-5] cn.ac.iie.dao.UserThreadDao : threadId:52, jdbcTemplate:org.springframework.jdbc.core.JdbcTemplate@5d83694c 2019-08-19 21:06:55.028 INFO 1302 --- [nio-8080-exec-1] c.a.i.s.impl.UserThreadServiceImpl : 開啟執(zhí)行一個(gè)Service, 這個(gè)Service執(zhí)行時(shí)間為30s, threadId:24 2019-08-19 21:06:55.036 INFO 1302 --- [nio-8080-exec-1] cn.ac.iie.dao.UserThreadDao : threadId:24, jdbcTemplate:org.springframework.jdbc.core.JdbcTemplate@5d83694c其中[nio-8080-exec-1]表示就是調(diào)用者的線程。而其他線程都是[async-service-]
總結(jié)
以上是生活随笔為你收集整理的ThreadPoolExecutor里面4种拒绝策略--CallerRunsPolicy的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Mesos介绍
- 下一篇: springboot实现多线程servi