分布与并行计算—生产者消费者模型RabbitMQ(Java)
生活随笔
收集整理的這篇文章主要介紹了
分布与并行计算—生产者消费者模型RabbitMQ(Java)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
連接工具
public class ConnectionUtil {public static final String QUEUE_NAME="firstQueue";private static final String RABBIT_HOST = "11";private static final String RABBIT_USERNAME = "";private static final String RABBIT_PASSWORD = "";private static Connection connection = null;public static int byteArrayToInt(byte[] b) {return b[3] & 0xFF |(b[2] & 0xFF) << 8 |(b[1] & 0xFF) << 16 |(b[0] & 0xFF) << 24;}public static byte[] intToByteArray(int a) {return new byte[] {(byte) ((a >> 24) & 0xFF),(byte) ((a >> 16) & 0xFF),(byte) ((a >> 8) & 0xFF),(byte) (a & 0xFF)};}public static Connection getConnection() {if(connection == null) {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(RABBIT_HOST);connectionFactory.setUsername(RABBIT_USERNAME);connectionFactory.setPassword(RABBIT_PASSWORD);try {connection = connectionFactory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}return connection;}}消費者線程
public class Consumer implements Runnable {int j=0;int n;CountDownLatch countDownLatch;public Consumer( int n,CountDownLatch countDownLatch) {this.countDownLatch=countDownLatch;this.n = n;}@Overridepublic void run() {try {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.basicQos(1);//能者多勞模式channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);int cur=ConnectionUtil.byteArrayToInt(body);try {isPrime(cur);countDownLatch.countDown();/* System.out.println(==1?"質數":"非質數");*/}finally {channel.basicAck(envelope.getDeliveryTag(),false);}}};channel.basicConsume(ConnectionUtil.QUEUE_NAME,false, consumer);} catch (IOException ioException) {ioException.printStackTrace();}}int isPrime(int n){ //返回1表示判斷為質數,0為非質數,在此沒有進行輸入異常檢測double n_sqrt;if(n==2 || n==3) return 1;if(n%6!=1 && n%6!=5) return 0;n_sqrt=Math.floor(Math.sqrt((float)n));for(int i=5;i<=n_sqrt;i+=6){if(n%(i)==0 | n%(i+2)==0) return 0;}return 1;}}啟動入口
public class Model {public static void excute(int consumerNum,int num){long s=System.currentTimeMillis();CountDownLatch countDownLatch=new CountDownLatch(num);ExecutorService executorService= Executors.newFixedThreadPool(consumerNum);for(int i=0;i<consumerNum;i++){executorService.execute(new Consumer(num/consumerNum,countDownLatch));}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println((System.currentTimeMillis()-s)/1000+"s");executorService.shutdown();}public static void main(String[] args) {excute(4,100);} }生產者線程
public class Producer implements Runnable{int n;CountDownLatch countDownLatch;public Producer(int n, CountDownLatch countDownLatch) {this.n = n;this.countDownLatch=countDownLatch;}@Overridepublic void run() {Connection connection= ConnectionUtil.getConnection();try {Channel channel=connection.createChannel();channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);Random ra =new Random();for(int i=0;i<n;i++){try {/* System.out.println(this.toString()+i+"生產");*/channel.basicPublish("",ConnectionUtil.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,ConnectionUtil.intToByteArray(ra.nextInt(2000000000)+1));countDownLatch.countDown();/* System.out.println("生產"+blockingQueue.size());*/} catch (Exception e) {e.printStackTrace();}}} catch (IOException ioException) {ioException.printStackTrace();}System.out.println("生產者完成");} }啟動入口
public class test {public static void excute(int producerNum,int num){CountDownLatch countDownLatch=new CountDownLatch(producerNum);ExecutorService executorService= Executors.newFixedThreadPool(num);for(int i=0;i<producerNum;i++){executorService.execute(new Producer(num/producerNum,countDownLatch));}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}executorService.shutdown(); /*for(int i=0;i<consumerNum;i++){new Thread(new Consumer(blockingQueue,num/consumerNum,countDownLatch)).start();} */}public static void main(String[] args) {long s=System.currentTimeMillis();excute(2,100);System.out.println((double) (System.currentTimeMillis()-s)/1000);}}總結
以上是生活随笔為你收集整理的分布与并行计算—生产者消费者模型RabbitMQ(Java)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 梦到陌生男人上我的床什么意思
- 下一篇: 如何告诉对方我梦到他了