rabbitmq-发布订阅模式
【README】
本文po出 mq的發布訂閱模式,及代碼示例;
?
【1】intro
1) 角色: 有4個角色, 包括 生產者,消費者, 交換機 exchange(X), 隊列;
2)交換機: 一方面,接收生產者的消息,另一方面,處理消息,如發送給隊列,或丟棄;這取決于 exchange類型;
3)exchange類型有如下3種:
fanout 廣播, 把消費轉發給所有 綁定到該交換機的所有隊列;
direct 定向, 把消息轉發給符合 指定 routing key 路由鍵的隊列;
topic 通配符, 把消息交給 routing pattern(路由模式)的隊列;
4)exchange 交換機, 只負責轉發消息, 不具備存儲消息的能力; 因此如果沒有任何隊列與 exchange 綁定, 或者沒有符合規則的隊列, 那么消息會丟失;
5)發布訂閱模式:
5.1-每個消費者監聽自己的隊列;
5.2-生產者把消息發送給 broker, 由交換機把消息轉發到綁定此交換機的所有隊列;
6)交換機需要與隊列綁定, 綁定之后,一個消息可以被多個消費者收到;
【2】代碼(生產者1個,交換機exchange1個,但對應到2個隊列,即消息有2個replication)
生產者
/*** 發布訂閱模式生產者* 本文發布訂閱模式使用的交換機類型為廣播 fanout * @author tang rong */ public class PSProduer {/** 交換機類型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 隊列名稱1 */static String FANOUT_QUEUE_1 = "fanout_queue_1";/** 隊列名稱1 */static String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 創建連接Channel channel = conn.createChannel(); // 創建頻道/*** 聲明交換機* 參數1-交換機名稱 * 參數2-交換機類型(fanout, topic, direct, headers)*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);/*** 創建隊列* @param1 隊列名稱* @param2 是否持久化隊列* @param3 是否獨占本次連接 * @param4 是否在不使用的時候自動刪除隊列 * @param5 隊列其他參數 */ channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);/*** 隊列綁定交換機 */channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");/*** 發送消息 */long temp = 1; for (int i = 0; i < 1000; i++) { String msg = "發布訂閱模式消息,序號=" + (temp+i) + "時間=" + MyDateUtil.getNow();/*** 參數1 交換機名稱,沒有指定則使用默認交換機 Default change * 參數2 路由key,簡單模式可以傳遞隊列名稱 * 參數3 消息其他屬性 * 參數4 消息內容 */channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes("UTF-8")); System.out.println("生產者發送消息" + msg); } System.out.println("=== 生產者消息發送完成");/* 關閉資源 */channel.close();conn.close(); } }消費者1
/*** 發布訂閱模式消費者1* @author tang rong */ public class PSConsumer1 {/** 交換機類型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 隊列名稱1 */static String FANOUT_QUEUE_1 = "fanout_queue_1";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 創建連接 Channel channel = conn.createChannel(); // 創建隊列 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 創建交換機/*** 創建隊列 * 參數1 隊列名稱 * 參數2 是否持久化* 參數3 是否獨占本連接 * 參數4 是否在不使用的時候自動刪除隊列* 參數5 隊列其他參數 */channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);/*** 隊列綁定交換機*/channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");/* 創建消費者,設置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費者標簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內容,包括消息id,消息routingkey,交換機,消息和重轉標記(收到消息失敗后是否需要重新發送) * @param properties 基本屬性* @param body 消息字節數組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費者收到的消息【%s】", message)); System.out.println("=== 消費者1 end ===\n"); } };/*** 監聽消息* 參數1 隊列名稱 * 參數2 是否自動確認, 設置為true表示消息接收到自動向 mq回復ack;mq收到ack后會刪除消息; 設置為false則需要手動發送ack; * 參數3 消息接收后的回調 */channel.basicConsume(FANOUT_QUEUE_1, true, consumer); }}消費者2
/*** 發布訂閱模式消費者* @author tang rong */ public class PSConsumer2 {/** 交換機類型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 隊列名稱1 */static String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 創建連接 Channel channel = conn.createChannel(); // 創建隊列 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 創建交換機/*** 創建隊列 * 參數1 隊列名稱 * 參數2 是否持久化* 參數3 是否獨占本連接 * 參數4 是否在不使用的時候自動刪除隊列* 參數5 隊列其他參數 */channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);/*** 隊列綁定交換機*/channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");/* 創建消費者,設置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費者標簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內容,包括消息id,消息routingkey,交換機,消息和重轉標記(收到消息失敗后是否需要重新發送) * @param properties 基本屬性* @param body 消息字節數組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費者2 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費者收到的消息【%s】", message)); System.out.println("=== 消費者2 end ===\n"); } };/*** 監聽消息* 參數1 隊列名稱 * 參數2 是否自動確認, 設置為true表示消息接收到自動向 mq回復ack;mq收到ack后會刪除消息; 設置為false則需要手動發送ack; * 參數3 消息接收后的回調 */channel.basicConsume(FANOUT_QUEUE_2, true, consumer); }}?
【3】小結
1)發布訂閱模式與工作模式的區別;
區別1)工作隊列模式不需要定義交換機, 發布訂閱模式需要;
區別2)工作隊列模式的生產者向隊列發送消息(底層使用默認交換機),? 發布訂閱模式的生產者向交換機發送消息;
區別3)工作隊列模式的隊列不需要與交換機綁定(底層與默認交換機綁定), 發布訂閱模式中的隊列需要與交換機綁定;
2)默認交換機
AMQP default
?
?
?
?
?
?
?
?
總結
以上是生活随笔為你收集整理的rabbitmq-发布订阅模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 下载的网站模板怎么使用(下载的网站模板怎
- 下一篇: rabbitmq-路由模式-routin