秒杀多线程第十篇 生产者消费者问题 (续)
生活随笔
收集整理的這篇文章主要介紹了
秒杀多线程第十篇 生产者消费者问题 (续)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
使用java 和semaphore實現的 ,多個生產者和多個消費者的問題。
1.使用Semaphore,Semaphore的大小設定為BUFFER_LENGTH。也就是同時最多有這么多線程來操作緩沖區。2個semaphore, empty和exist。
默認開始緩沖區為空
1)StoreEmpty 在開始時,所有的都可用。
2)StoreHas 在開始時都是鎖定的,也就是沒有空余的可以acquire,直到生產者放入數據以后,就可以。
?
2.生產者邏輯:
1)等待緩沖區有空間
2)同步放入數據到緩存區
3)通知緩沖區存在數據
4)所有數據都已生產,通知其他生產線程停止。
3.消費者邏輯
1)等待緩沖區有數據
2)同步取出數據
3)通知緩沖區有空間
4)所有數據已消費,通知其他消費線程停止
?
?
main class:
package com.multithread.prosumer;import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;import com.multithread.main.ExampleInterface;public class ProsumerExample extends ExampleInterface {static final int BUFFER_LENGTH = 10;static final int CUSTOMER_SIZE = 4;static final int PRODUCTOR_SIZE = 3;public static final int TOTAL_PRODUCTORS = 200;public Queue<Integer> g_produtor = new LinkedList<Integer>();public volatile int mProductor = 0;public Object objlock = new Object();Semaphore StoreEmpty = new Semaphore(BUFFER_LENGTH);//等待緩沖區數據Semaphore StoreHas = new Semaphore(BUFFER_LENGTH); public CountDownLatch mLatchDown = new CountDownLatch(PRODUCTOR_SIZE+CUSTOMER_SIZE);public CountDownLatch mLatchStart = new CountDownLatch(PRODUCTOR_SIZE+CUSTOMER_SIZE);public boolean bStopCustomFlag = false;public boolean bStopProductorFlag = false;@Overridepublic void startDemo() {try {g_produtor.clear(); bStopCustomFlag = false;initEmptySingal();initExistSingal();Executor mEcecutor = Executors.newFixedThreadPool(PRODUCTOR_SIZE+CUSTOMER_SIZE);for(int i=1;i<=PRODUCTOR_SIZE;i++){mEcecutor.execute(new ProducerThread(this,"生產者"+i));}for(int j=1;j<=CUSTOMER_SIZE;j++){char c =(char)(j+'A'-1);mEcecutor.execute(new CustomerThread(this,"消費者"+c));}mLatchStart.await();System.out.println("所有操作線程已啟動...");mLatchDown.await();} catch (InterruptedException e) {// TODO Auto-generated catch block e.printStackTrace();}catch(Exception e){e.printStackTrace(); }System.out.println("所有線程操作結束");}/** if true ,go back, if false, wait here* */public void waitEmpty(String name){try { // System.out.println("[waitEmpty]"+name+"等待緩沖區,有空余地方:"+StoreEmpty.availablePermits()); StoreEmpty.acquire(); // System.out.println("[waitEmpty]"+name+"等待緩沖區,有空余地方結束 剩余空間:"+StoreEmpty.availablePermits());} catch (InterruptedException e) {// TODO Auto-generated catch block e.printStackTrace();}}public void singalEmpty(String name){StoreEmpty.release(); // System.out.println("[singalEmpty]"+name+"緩沖區釋放空余地方,剩余空間:"+StoreEmpty.availablePermits()); }public void waitExist(String name){try { // System.out.println("[waitExist]"+name+"等待緩沖區,數據存放空間:"+StoreHas.availablePermits()); StoreHas.acquire(); // System.out.println("[waitExist]"+name+"緩沖區有數據放入,緩沖區數據個數:"+StoreHas.availablePermits());} catch (InterruptedException e) {// TODO Auto-generated catch block e.printStackTrace();}}public void singalExist(String name){StoreHas.release(); // System.out.println("[singalExist]"+name+"將數據放入緩沖區:"+StoreHas.availablePermits()); }public void initEmptySingal(){//init,all is empty; // try { // StoreEmpty.acquire(BUFFER_LENGTH-1); // } catch (InterruptedException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } }public void initExistSingal(){//init,nothing is existtry { // System.out.println("釋放所有緩沖區數據,消費線程全部等待:"+StoreHas.availablePermits()); StoreHas.acquire(StoreHas.availablePermits());} catch (InterruptedException e) {// TODO Auto-generated catch block e.printStackTrace();}}public void releaseExistSingal(){ // System.out.println("[releaseExistSingle]等待緩沖區有數據放入:釋放所有"+StoreHas.availablePermits()); StoreHas.release(BUFFER_LENGTH);}public void releaseEmptySingal(){StoreEmpty.release(BUFFER_LENGTH);} } package com.multithread.prosumer;public class ProducerThread extends Thread {ProsumerExample mProsumer = null;String name = null;boolean flag = true;public ProducerThread(ProsumerExample pe,String name){mProsumer = pe;this.name = name;}@Overridepublic void run() {System.out.println(name+"操作開始");mProsumer.mLatchStart.countDown(); // for(int i=0;i<=END_PRODUCE_NUMBER;i++) // { // try { // //等待緩沖區為空 // mProsumer.waitEmpty(name); // //互斥的訪問緩沖區 // synchronized (mProsumer.objlock) { // int index = mProsumer.g_produtor.size(); // mProsumer.g_produtor.offer(i); // System.out.println(name+"將數據"+i+"放入緩沖區位置:"+(index+1)); // } // // //通知緩沖區有新數據了 // mProsumer.singalExist(name); // // } catch (Exception e) { // // TODO Auto-generated catch block // e.printStackTrace(); // break; // } // finally{ // } // }while(flag){//等待緩沖區為空 mProsumer.waitEmpty(name);//互斥的訪問緩沖區 synchronized (mProsumer.objlock){if(mProsumer.mProductor<ProsumerExample.TOTAL_PRODUCTORS){int index = mProsumer.g_produtor.size();mProsumer.g_produtor.offer(mProsumer.mProductor);System.out.println(name+"將數據"+mProsumer.mProductor+"放入緩沖區位置:"+(index+1));++mProsumer.mProductor;if(mProsumer.mProductor>=ProsumerExample.TOTAL_PRODUCTORS){flag = false;mProsumer.releaseEmptySingal();} }else{flag = false;//結束操作break;//不用通知,應為沒有產生新數據 }}//通知緩沖區有新數據了 mProsumer.singalExist(name);}System.out.println(name+"操作結束");mProsumer.mLatchDown.countDown();}} public class CustomerThread extends Thread {volatile boolean flag = true;ProsumerExample mProsumer = null;int mProductor = 0;String name = null;public CustomerThread(ProsumerExample pe, String name) {mProsumer = pe;this.name = name;}@Overridepublic void run() {System.out.println("---" + name + "操作開始");mProsumer.mLatchStart.countDown();while (flag) {try {// 等待緩沖池有數據System.out.println("---" + name + "等待緩沖區數據");mProsumer.waitExist(name);// 互斥的訪問緩沖區synchronized (mProsumer.objlock) {if (mProsumer.g_produtor.size() > 0) {mProductor = mProsumer.g_produtor.poll();System.out.println("---" + name + "將數據" + mProductor+ "取出緩沖區");if (mProductor == (ProsumerExample.TOTAL_PRODUCTORS-1)) {flag = false;mProsumer.bStopCustomFlag = true;// 釋放其他消費線程 mProsumer.releaseExistSingal();}} else {System.out.println("---" + name + "緩沖區已空");// 其他消費者線程已停止,緩沖區已為空,此線程也要停止。if (mProsumer.bStopCustomFlag) {flag = false;break;//沒有產生新的空間 }}}// 通知緩存區有空間 mProsumer.singalEmpty(name);// doing other thingsThread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {// TODO Auto-generated catch block e.printStackTrace();}}System.out.println("---" + name + "操作結束");mProsumer.mLatchDown.countDown();}}?
轉載于:https://www.cnblogs.com/deman/p/4091365.html
總結
以上是生活随笔為你收集整理的秒杀多线程第十篇 生产者消费者问题 (续)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: log4j 配置,tomcat 启动或有
- 下一篇: Robotium只有apk文件测试实例