java——自己实现基础的线程池及带有任务数过多拒绝策略、线程池销毁、自动扩充线程数量及闲时自动回收线程等操作的改进版线程池
生活随笔
收集整理的這篇文章主要介紹了
java——自己实现基础的线程池及带有任务数过多拒绝策略、线程池销毁、自动扩充线程数量及闲时自动回收线程等操作的改进版线程池
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1. 實現第一版基礎的線程池
1.1 首先我們定義一個線程池類ThreadPool,然后線程池有一個容器存放我們創建的線程,另一個容器則是存放當前線程池需要處理的任務隊列,線程容器用ArrayList實現,任務隊列容器用LinkedList實現。
1.2 線程池功能分析:創建線程池時,往線程容器中加入默認數量的線程,但是由于此時的任務隊列為空,所以線程處于等待狀態;當往線程池提交任務時,喚醒正在等待的線程來處理任務,處理完任務后如果任務隊列為空,線程又進入等待狀態,代碼及運行結果如下。
import java.util.ArrayList; import java.util.LinkedList;public class ThreadPoolTest {public static void main(String[] args) {// 創建線程池ThreadPool threadPool = new ThreadPool();// 提交任務給線程池處理for (int i = 0; i < 40; i++) {threadPool.submit(String.valueOf(i));}} }class ThreadPool{private static final int DEFAULT_SIZE = 10;private int size;private final static ThreadGroup GROUP = new ThreadGroup("MyThreadGroup");private final LinkedList<String> taskQueue = new LinkedList<>();private ArrayList<Thread> threadLst = new ArrayList<>();ThreadPool(){this(DEFAULT_SIZE);}ThreadPool(int size){this.size = size;for (int i = 0; i < size; i++) {Thread t = new Thread(GROUP, ()->{while(true){synchronized (taskQueue) {// 任務隊列為空,線程則進入等待狀態while (taskQueue.isEmpty()) {try {taskQueue.wait();} catch (InterruptedException e) {e.printStackTrace();}}// 模擬處理任務System.out.println(Thread.currentThread().getName() + " deal task!!");taskQueue.removeFirst();}// 如果這一段業務代碼處于synchronized內,那么會導致提交任務時搶不到鎖,// 而造成提交任務被打斷try {Thread.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " finish task!");}});threadLst.add(t);t.start();}}public void submit(String task){// 多線程下提交任務和處理任務,需要加synchronized保證線程安全。// 提交任務后需要喚醒線程處理任務synchronized (taskQueue){taskQueue.add(task);taskQueue.notify();System.out.println("提交了一個任務========");}} }? ??
?
2. 增加任務數過多和線程池銷毀功能的改進版線程池
2.1 正常運行的代碼及結果
import java.util.ArrayList; import java.util.LinkedList;public class ThreadPoolTest {public static void main(String[] args) {// 創建線程池ThreadPool threadPool = new ThreadPool();// 提交任務給線程池處理new Thread(){@Overridepublic void run() {for (int i = 0; i < 20; i++) {threadPool.submit(String.valueOf(i));}}}.start();// 銷毀線程池new Thread(){@Overridepublic void run() {threadPool.destry();}}.start();// 線程池被銷毀后,再提交任務會拋出異常threadPool.submit("w");} }class ThreadPool{private static final int DEFAULT_SIZE = 10;private int size;private boolean destry = false;private final static ThreadGroup GROUP = new ThreadGroup("MyThreadGroup");private final LinkedList<String> taskQueue = new LinkedList<>();private static final int MAX_TASK_SIZE = 2000;private final ArrayList<Worker> threadLst = new ArrayList<>();private class Worker extends Thread{private volatile boolean dead = false;public Worker(ThreadGroup group, String name){super(group, name);}@Overridepublic void run() {end:while(!dead){// 模擬多線程,等待搶taskQueue鎖 // try { // Thread.sleep(1); // } catch (InterruptedException e) { // }synchronized (taskQueue) {while (taskQueue.isEmpty()) {try {taskQueue.wait();} catch (InterruptedException e) {break end;}}System.out.println(Thread.currentThread().getName() + " deal task!!");taskQueue.removeFirst();}// 如果這一段業務代碼處于synchronized內,那么會導致提交任務時搶不到鎖,// 而造成提交任務被阻塞try {Thread.sleep(20);} catch (InterruptedException e) {}System.out.println(Thread.currentThread().getName() + " finish task!");}}public void terminate(){dead = true;}}ThreadPool(){this(DEFAULT_SIZE);}ThreadPool(int size){this.size = size;for (int i = 0; i < size; i++) {Worker worker = new Worker(GROUP, "t"+i);threadLst.add(worker);worker.start();}}public void submit(String task) {// 多線程下提交任務和處理任務,需要加synchronized保證線程安全。// 提交任務后需要喚醒線程處理任務synchronized (taskQueue){// 線程被銷毀,如果還有任務,那么拋出異常if(destry){throw new RuntimeException("ThreadPool has been destry!!");}if(taskQueue.size() >= MAX_TASK_SIZE){throw new RuntimeException("Task is too much!!!");}taskQueue.add(task);taskQueue.notify();System.out.println("提交了一個任務========");}}public void destry() {destry = true;/*// 當然可以加上這么一段,保證任務處理結束再銷毀線程池,但是需要等待的時間也變長了。synchronized(taskQueue){while(!taskQueue.isEmpty()){try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}}*/for (int i = 0; i < size; i++) {Worker t = threadLst.get(i);t.interrupt();t.terminate();}// 等待50ms,再去查看線程池活躍線程數,因為還是要等run方法執行結束才行try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("銷毀線程池后剩余的活躍線程數:" + GROUP.activeCount());} }?
2.2 模擬多線程出錯的情況
2.3 解決:既然一次銷毀不了,你還會進入等待狀態,那么就兩次唄,為了簡單實現而已。實際操作肯定不行,有待改進。
?
3. 簡單地擴充線程數量及閑時自動回收邏輯的改進版
package ThreadTest;import java.util.ArrayList; import java.util.LinkedList;public class ThreadPoolTest {public static void main(String[] args) throws InterruptedException {// 創建線程池ThreadPool threadPool = new ThreadPool();new Thread(threadPool).start();// 提交任務給線程池處理new Thread(){@Overridepublic void run() {for (int i = 0; i < 1000; i++) {threadPool.submit(String.valueOf(i));}}}.start();// 銷毀線程池new Thread(){@Overridepublic void run() {threadPool.destry();}}.start();// 線程池被銷毀后,再提交任務會拋出異常threadPool.submit("w");} }class ThreadPool implements Runnable{private static int cnt = 1;private boolean destry = false;private final static ThreadGroup GROUP = new ThreadGroup("MyThreadGroup");private final LinkedList<String> taskQueue = new LinkedList<>();private static final int MAX_TASK_SIZE = 2000;private final ArrayList<Worker> threadLst = new ArrayList<>();private int min;private int idle;private int max;private class Worker extends Thread{private volatile boolean dead = false;public Worker(ThreadGroup group, String name){super(group, name);}@Overridepublic void run() {end:while(!dead){// 模擬多線程,等待搶taskQueue鎖try {Thread.sleep(1);} catch (InterruptedException e) {}synchronized (taskQueue) {while (taskQueue.isEmpty()) {try {taskQueue.wait();} catch (InterruptedException e) {break end;}}System.out.println(Thread.currentThread().getName() + " deal task!!");taskQueue.removeFirst();}// 如果這一段業務代碼處于synchronized內,那么會導致提交任務時搶不到鎖,// 而造成提交任務被阻塞try {Thread.sleep(20);} catch (InterruptedException e) {}System.out.println(Thread.currentThread().getName() + " finish task!");}}public void terminate(){dead = true;}}ThreadPool(){this(5, 10, 50);}ThreadPool(int min, int idle, int max){this.min = min;this.idle = idle;this.max = max;for (int i = 0; i < min; i++) {createWorker();}}private void createWorker(){Worker worker = new Worker(GROUP, "t"+(cnt++));threadLst.add(worker);worker.start();}public void submit(String task) {// 多線程下提交任務和處理任務,需要加synchronized保證線程安全。// 提交任務后需要喚醒線程處理任務synchronized (taskQueue){// 線程被銷毀,如果還有任務,那么拋出異常if(destry){throw new RuntimeException("ThreadPool has been destry!!");}if(taskQueue.size() >= MAX_TASK_SIZE){throw new RuntimeException("Task is too much!!!");}taskQueue.add(task);taskQueue.notify();System.out.println("提交了一個任務========");}}public void destry() {destry = true;/*// 當然可以加上這么一段,保證任務處理結束再銷毀線程池,但是需要等待的時間也變長了。synchronized(taskQueue){while(!taskQueue.isEmpty()){try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}}*/synchronized (threadLst){for (int i = 0; i < threadLst.size(); i++) {Worker t = threadLst.get(i);t.interrupt();t.terminate();}// 等待50ms,再去查看線程池活躍線程數,因為還是要等run方法執行結束才行try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一次銷毀線程池后剩余的活躍線程數:" + GROUP.activeCount());for (int i = 0; i < threadLst.size(); i++) {Worker t = threadLst.get(i);t.interrupt();t.terminate();}try {Thread.sleep(30);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二次銷毀線程池后剩余的活躍線程數:" + GROUP.activeCount());}}@Overridepublic void run() {while(! destry){System.out.println("線程池當前的線程個數:" + threadLst.size());try {Thread.sleep(50);// taskQueue只是讀取size大小,不涉及更改;而且size的不安全不造成問題。// 但是threadLst涉及增加元素,存在同步安全問題;而且線程池大小有設定,所以這里對threadLst對象加鎖synchronized (threadLst){while (!destry){int size = threadLst.size();if(taskQueue.size() > 3 * min && size < idle){for (int i = size; i < idle; i++) {createWorker();}}else if(taskQueue.size() > 5 * idle && size < max){for (int i = size; i < max; i++) {createWorker();}}if(taskQueue.size() < idle && size > idle){for (int i = idle; i < size; i++) {Worker worker = threadLst.get(0);worker.interrupt();worker.terminate();threadLst.remove(0);}}}}} catch (InterruptedException e) {e.printStackTrace();}}} }?? ??
?
總結:寫得可能有點亂,甚至有bug,請多多指教。
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的java——自己实现基础的线程池及带有任务数过多拒绝策略、线程池销毁、自动扩充线程数量及闲时自动回收线程等操作的改进版线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 单线程下的生产者--消费者模式详解,wa
- 下一篇: 两个例子详解并发编程的可见性问题和有序性