ThreadPoolExecutor线程池 + Queue队列
1:BlockingQueue繼承關系
??java.util.concurrent 包里的?BlockingQueue是一個接口,?繼承Queue接口,Queue接口繼承?Collection
?
??BlockingQueue----->Queue-->Collection
?圖:
?
隊列的特點是:先進先出(FIFO)
?
2:BlockingQueue的方法
BlockingQueue 具有 4 組不同的方法用于插入、移除以及對隊列中的元素進行檢查。如果請求的操作不能得到立即執(zhí)行的話,每個方法的表現(xiàn)也不同。這些方法如下:
?
?
| ? | 拋出異常 | 特殊值 | 阻塞 | 超時 | 
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) | 
| 移除 | remove() | poll() | take() | poll(time, unit) | 
| 檢查 | element() | peek() | 不可用 | 不可用 ?  | 
?
?
四組不同的行為方式解釋:
1(異常)
如果試圖的操作無法立即執(zhí)行,拋一個異常。
2(特定值)?
如果試圖的操作無法立即執(zhí)行,返回一個特定的值(常常是 true / false)。
3(阻塞)?
如果試圖的操作無法立即執(zhí)行,該方法調用將會發(fā)生阻塞,直到能夠執(zhí)行。
4(超時)?
如果試圖的操作無法立即執(zhí)行,該方法調用將會發(fā)生阻塞,直到能夠執(zhí)行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。
??
1.首先是springBoot的項目框架如下:
2.業(yè)務測試流程涉及的類,如下
BusinessThread 類
 package com.springboot.demo.Threads;
import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 /**
 ?* Created by Administrator on 2018/5/9.
 ?*/
 @Component
 @Scope("prototype")//spring 多例
 public class BusinessThread implements Runnable{
? ? private String acceptStr;
? ? public BusinessThread(String acceptStr) {
 ? ? ? ? this.acceptStr = acceptStr;
 ? ? }
? ? public String getAcceptStr() {
 ? ? ? ? return acceptStr;
 ? ? }
? ? public void setAcceptStr(String acceptStr) {
 ? ? ? ? this.acceptStr = acceptStr;
 ? ? }
? ? @Override
 ? ? public void run() {
 ? ? ? ? //業(yè)務操作
 ? ? ? ? System.out.println("多線程已經(jīng)處理訂單插入系統(tǒng),訂單號:"+acceptStr);
? ? ? ? //線程阻塞
 ? ? ? ? /*try {
 ? ? ? ? ? ? Thread.sleep(1000);
 ? ? ? ? ? ? System.out.println("多線程已經(jīng)處理訂單插入系統(tǒng),訂單號:"+acceptStr);
 ? ? ? ? } catch (InterruptedException e) {
 ? ? ? ? ? ? e.printStackTrace();
 ? ? ? ? }*/
 ? ? }
 }
TestThreadPoolManager 類
 package com.springboot.demo.Threads;
import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.BeanFactory;
 import org.springframework.beans.factory.BeanFactoryAware;
 import org.springframework.stereotype.Component;
import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.*;
/**
 ?* Created by Administrator on 2018/5/10.
 ?*/
 @Component
 public class TestThreadPoolManager implements BeanFactoryAware {
? ? //用于從IOC里取對象
 ? ? private BeanFactory factory; //如果實現(xiàn)Runnable的類是通過spring的application.xml文件進行注入,可通過 factory.getBean()獲取,這里只是提一下
? ? // 線程池維護線程的最少數(shù)量
 ? ? private final static int CORE_POOL_SIZE = 2;
 ? ? // 線程池維護線程的最大數(shù)量
 ? ? private final static int MAX_POOL_SIZE = 10;
 ? ? // 線程池維護線程所允許的空閑時間
 ? ? private final static int KEEP_ALIVE_TIME = 0;
 ? ? // 線程池所使用的緩沖隊列大小
 ? ? private final static int WORK_QUEUE_SIZE = 50;
? ? @Override
 ? ? public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
 ? ? ? ? factory = beanFactory;
 ? ? }
? ? /**
 ? ? ?* 用于儲存在隊列中的訂單,防止重復提交,在真實場景中,可用redis代替 驗證重復
 ? ? ?*/
 ? ? Map<String, Object> cacheMap = new ConcurrentHashMap<>();
 ? ? /**
 ? ? ?* 訂單的緩沖隊列,當線程池滿了,則將訂單存入到此緩沖隊列
 ? ? ?*/
 ? ? Queue<Object> msgQueue = new LinkedBlockingQueue<Object>();
 ? ? /**
 ? ? ?* 當線程池的容量滿了,執(zhí)行下面代碼,將訂單存入到緩沖隊列
 ? ? ?*/
 ? ? final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
 ? ? ? ? @Override
 ? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 ? ? ? ? ? ? //訂單加入到緩沖隊列
 ? ? ? ? ? ? msgQueue.offer(((BusinessThread) r).getAcceptStr());
 ? ? ? ? ? ? System.out.println("系統(tǒng)任務太忙了,把此訂單交給(調度線程池)逐一處理,訂單號:" + ((BusinessThread) r).getAcceptStr());
 ? ? ? ? }
 ? ? };
 ? ? /**創(chuàng)建線程池*/
 ? ?final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);
 ? ? /**將任務加入訂單線程池*/
 ? ? public void addOrders(String orderId){
 ? ? ? ? System.out.println("此訂單準備添加到線程池,訂單號:" + orderId);
 ? ? ? ? //驗證當前進入的訂單是否已經(jīng)存在
 ? ? ? ? if (cacheMap.get(orderId) == null) {
 ? ? ? ? ? ? cacheMap.put(orderId, new Object());
 ? ? ? ? ? ? BusinessThread businessThread = new BusinessThread(orderId);
 ? ? ? ? ? ? threadPool.execute(businessThread);
 ? ? ? ? }
 ? ? }
? ? /**
 ? ? ?* 線程池的定時任務----> 稱為(調度線程池)。此線程池支持 定時以及周期性執(zhí)行任務的需求。
 ? ? ?*/
 ? ? final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
 ? ? /**
 ? ? ?* 檢查(調度線程池),每秒執(zhí)行一次,查看訂單的緩沖隊列是否有 訂單記錄,則重新加入到線程池
 ? ? ?*/
 ? ? final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
 ? ? ? ? @Override
 ? ? ? ? public void run() {
 ? ? ? ? ? ? //判斷緩沖隊列是否存在記錄
 ? ? ? ? ? ? if(!msgQueue.isEmpty()){
 ? ? ? ? ? ? ? ? //當線程池的隊列容量少于WORK_QUEUE_SIZE,則開始把緩沖隊列的訂單 加入到 線程池
 ? ? ? ? ? ? ? ? if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
 ? ? ? ? ? ? ? ? ? ? String orderId = (String) msgQueue.poll();
 ? ? ? ? ? ? ? ? ? ? BusinessThread businessThread = new BusinessThread(orderId);
 ? ? ? ? ? ? ? ? ? ? threadPool.execute(businessThread);
 ? ? ? ? ? ? ? ? ? ? System.out.println("(調度線程池)緩沖隊列出現(xiàn)訂單業(yè)務,重新添加到線程池,訂單號:"+orderId);
 ? ? ? ? ? ? ? ? }
 ? ? ? ? ? ? }
 ? ? ? ? }
 ? ? }, 0, 1, TimeUnit.SECONDS);
 ? ? /**獲取消息緩沖隊列*/
 ? ? public Queue<Object> getMsgQueue() {
 ? ? ? ? return msgQueue;
 ? ? }
? ? /**終止訂單線程池+調度線程池*/
 ? ? public void shutdown() {
 ? ? ? ? //true表示如果定時任務在執(zhí)行,立即中止,false則等待任務結束后再停止
 ? ? ? ? System.out.println("終止訂單線程池+調度線程池:"+scheduledFuture.cancel(false));
 ? ? ? ? scheduler.shutdown();
 ? ? ? ? threadPool.shutdown();
? ? }
 }
 TestController 類
 package com.springboot.demo;
import com.springboot.demo.Threads.TestThreadPoolManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RestController;
import java.util.Queue;
 import java.util.UUID;
/**
 ?* Created by Administrator on 2018/5/9.
 ?*/
 @RestController
 public class TestController {
? ? @Autowired
 ? ? TestThreadPoolManager testThreadPoolManager;
? ? /**
 ? ? ?* 測試模擬下單請求 入口
 ? ? ?* @param id
 ? ? ?* @return
 ? ? ?*/
 ? ? @GetMapping("/start/{id}")
 ? ? public String start(@PathVariable Long id) {
 ? ? ? ? //模擬的隨機數(shù)
 ? ? ? ? String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString();
? ? ? ? testThreadPoolManager.addOrders(orderNo);
? ? ? ? return "Test ThreadPoolExecutor start";
 ? ? }
? ? /**
 ? ? ?* 停止服務
 ? ? ?* @param id
 ? ? ?* @return
 ? ? ?*/
 ? ? @GetMapping("/end/{id}")
 ? ? public String end(@PathVariable Long id) {
? ? ? ? testThreadPoolManager.shutdown();
? ? ? ? Queue q = testThreadPoolManager.getMsgQueue();
 ? ? ? ? System.out.println("關閉了線程服務,還有未處理的信息條數(shù):" + q.size());
 ? ? ? ? return "Test ThreadPoolExecutor start";
 ? ? }
 }
??
總結
以上是生活随笔為你收集整理的ThreadPoolExecutor线程池 + Queue队列的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: Java高并发之BlockingQueu
 - 下一篇: mybatis动态更新xml文件后热部署