RabbitMQ 一二事(2) - 工作队列使用
生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ 一二事(2) - 工作队列使用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
上篇文章講了簡單隊列的使用,這其實就是RMQ給的demo,實際并沒有什么用
本篇講講工作模式隊列,也稱之為任務隊列
一個生產者發布了多條消息,消費者A可以接受消息,接受消息后該消息就消除,消費者B可以接受其他消息
使用場景,一些數據庫操作比較緩慢的話可以分別給多個接口調用,降低壓力,或者搶單場景也能考慮,
比如就10個商品,100個消費者來搶單,前10個搶到了后,消息隊列就為空了,那么第11個以后的所有消費者都不會搶到
代碼示例:
生產者
1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 // 獲取到連接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 聲明隊列 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 13 for (int i = 0; i < 50; i++) { 14 // 消息內容 15 String message = "" + i; 16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 17 System.out.println(" [x] Sent '" + message + "'"); 18 19 Thread.sleep(i * 10); 20 } 21 22 channel.close(); 23 connection.close(); 24 } 25 }?
消費者1
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 獲取到連接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 聲明隊列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一時刻服務器只會發一條消息給消費者, 如果注釋了就是指生產者平均分配任務給消費者 15 channel.basicQos(1); 16 17 // 定義隊列的消費者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 監聽隊列,手動返回完成 設置fasle代表需要手動返回消息的確認狀態 20 channel.basicConsume(QUEUE_NAME, false, consumer); 21 22 // 獲取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received '" + message + "'"); 27 // 休眠 28 Thread.sleep(10); 29 // 手動確認 返回確認狀態 30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }?
消費者2
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 獲取到連接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 聲明隊列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一時刻服務器只會發一條消息給消費者, 如果注釋了就是指生產者平均分配任務給消費者 15 channel.basicQos(1); 16 17 // 定義隊列的消費者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 監聽隊列,手動返回完成狀態 設置fasle代表需要手動返回消息的確認狀態 20 channel.basicConsume(QUEUE_NAME, false, consumer); 21 22 // 獲取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received '" + message + "'"); 27 // 休眠1秒 28 Thread.sleep(1000); 29 // 手動確認 返回確認狀態 30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }?
轉載于:https://www.cnblogs.com/leechenxiang/p/5516726.html
總結
以上是生活随笔為你收集整理的RabbitMQ 一二事(2) - 工作队列使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ 一二事 - 简单队列使
- 下一篇: Interface继承至System.O