rabbitmq-通配符模式
【README】
本文介紹 通配符模式,及代碼示例
【1】intro to rabbitmq通配符模式
0)通配符模式-交換機(jī)類型為 Topic;
1)與路由模式相比,相同點(diǎn)是 兩者都可以通過(guò) routingkey 把消息轉(zhuǎn)發(fā)到不同的隊(duì)列;
不同點(diǎn)是通配符模式-topic類型的exchange可以讓隊(duì)列在綁定routing key的時(shí)候使用通配符;
2)通配符模式的routingkey 通常使用多個(gè)單詞并用點(diǎn)號(hào)連接,如 item.insert ;
3)通配符規(guī)則:
# 匹配一個(gè)或多個(gè)詞;
* 匹配不多不少一個(gè)詞; ?
荔枝:
item.# 能夠匹配 item.insert.abc 或 item.insert? ; (可以多層)
item.* 能夠匹配 item.insert ;? (只能一層)
refers2 https://www.rabbitmq.com/tutorials/tutorial-five-java.html?
4)新建隊(duì)列
5)把隊(duì)列綁定到交換機(jī)?
6)生產(chǎn)者發(fā)送消息到隊(duì)列,路由key 分別是 item.insert , item.update, item.delete ; 如下:
【2】代碼
生產(chǎn)者
/*** 通配符模式-交換機(jī)類型為TOPIC*/ public class WildProducer {/* 交換機(jī)名稱 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*隊(duì)列名稱1*/ static final String TOPIC_QUEUE_1 = "topic_queue_1";/*隊(duì)列名稱2*/static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {/*獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創(chuàng)建頻道 Channel channel = conn.createChannel();/*** 聲明交換機(jī)* 參數(shù)1-交換機(jī)名稱 * 參數(shù)2-交換機(jī)類型(fanout, topic, direct, headers)*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); /*** routingkey-路由鍵 */String itemInsertRoutingKey = "item.insert"; String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/* 發(fā)送消息-insert */ /*** 參數(shù)1 交換機(jī)名稱 如果沒(méi)有指定則使用默認(rèn) default exchange * 參數(shù)2 routingkey-路由key, 簡(jiǎn)單模式可以傳遞隊(duì)列名稱 * 參數(shù)3 消息其他屬性* 參數(shù)4 消息內(nèi)容 */String insertMsg = "我是消息,通配符模式,routingkey=" + itemInsertRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemInsertRoutingKey, null, insertMsg.getBytes());System.out.println("已發(fā)送消息=" + insertMsg); String updMsg = "我是消息,通配符模式,routingkey=" + itemUpdateRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemUpdateRoutingKey, null, updMsg.getBytes());System.out.println("已發(fā)送消息=" + updMsg);String deleteMsg = "我是消息,通配符模式,routingkey=" + itemDeleteRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemDeleteRoutingKey, null, deleteMsg.getBytes());System.out.println("已發(fā)送消息=" + deleteMsg);/* 關(guān)閉連接和信道 */ channel.close();conn.close(); } }消費(fèi)者1? topic_queue_1
/*** 通配符模式消費(fèi)者-routingkey */ public class RouteConsumerWild1 {/* 交換機(jī)名稱 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*隊(duì)列名稱1*/ static final String TOPIC_QUEUE_1 = "topic_queue_1";public static void main(String[] args) throws Exception {/*創(chuàng)建連接 */Connection conn = RBConnectionUtil.getConn();/*創(chuàng)建隊(duì)列*/Channel channel = conn.createChannel(); /*聲明交換機(jī)*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** routingkey-路由鍵 */String itemInsertRoutingKey = "item.insert"; String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/*** 聲明/創(chuàng)建隊(duì)列 * 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 是否持久化* 參數(shù)3 是否獨(dú)占本連接 * 參數(shù)4 是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5 隊(duì)列其他參數(shù) */ // channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null); // ui界面可以創(chuàng)建隊(duì)列 /*** 隊(duì)列綁定交換機(jī)* 參數(shù)1 隊(duì)列名稱* 參數(shù)2 交換機(jī)* 參數(shù)3 routingkey-路由鍵 */ // channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, "item.#"); // ui界面可以把隊(duì)列綁定到交換機(jī) /* 創(chuàng)建消費(fèi)者,設(shè)置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費(fèi)者標(biāo)簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內(nèi)容,包括消息id,消息routingkey,交換機(jī),消息和重轉(zhuǎn)標(biāo)記(收到消息失敗后是否需要重新發(fā)送) * @param properties 基本屬性* @param body 消息字節(jié)數(shù)組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費(fèi)者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機(jī)=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費(fèi)者收到的消息【%s】", message)); System.out.println("=== 消費(fèi)者1 end ===\n"); } };/*** 監(jiān)聽(tīng)消息 * 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 是否自動(dòng)確認(rèn), 設(shè)置為true表示消息接收到自動(dòng)向 mq回復(fù)ack;mq收到ack后會(huì)刪除消息; 設(shè)置為false則需要手動(dòng)發(fā)送ack; * 參數(shù)3 消息接收后的回調(diào) */channel.basicConsume(TOPIC_QUEUE_1, true, consumer); } }消費(fèi)者2 topic_queue_2
/*** 通配符模式消費(fèi)者-routingkey */ public class RouteConsumerWild2 {/* 交換機(jī)名稱 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*隊(duì)列名稱1*/ static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {/*創(chuàng)建連接 */Connection conn = RBConnectionUtil.getConn();/*創(chuàng)建隊(duì)列*/Channel channel = conn.createChannel(); /*聲明交換機(jī)*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** routingkey-路由鍵 */String itemInsertRoutingKey = "item.insert"; String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/*** 聲明/創(chuàng)建隊(duì)列 * 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 是否持久化* 參數(shù)3 是否獨(dú)占本連接 * 參數(shù)4 是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5 隊(duì)列其他參數(shù) */ // channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null); // ui界面可以創(chuàng)建隊(duì)列 /*** 隊(duì)列綁定交換機(jī)* 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 交換機(jī)* 參數(shù)3 routingkey-路由鍵 */ // channel.queueBind(TOPIC_QUEUE_2 TOPIC_EXCHANGE, "*.delete"); // ui界面可以把隊(duì)列綁定到交換機(jī) /* 創(chuàng)建消費(fèi)者,設(shè)置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/** * @param consumerTag 消費(fèi)者標(biāo)簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內(nèi)容,包括消息id,消息routingkey,交換機(jī),消息和重轉(zhuǎn)標(biāo)記(收到消息失敗后是否需要重新發(fā)送) * @param properties 基本屬性* @param body 消息字節(jié)數(shù)組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費(fèi)者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機(jī)=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費(fèi)者收到的消息【%s】", message)); System.out.println("=== 消費(fèi)者1 end ===\n"); } };/*** 監(jiān)聽(tīng)消息 * 參數(shù)1 隊(duì)列名稱 * 參數(shù)2 是否自動(dòng)確認(rèn), 設(shè)置為true表示消息接收到自動(dòng)向 mq回復(fù)ack;mq收到ack后會(huì)刪除消息; 設(shè)置為false則需要手動(dòng)發(fā)送ack; * 參數(shù)3 消息接收后的回調(diào) */channel.basicConsume(TOPIC_QUEUE_2, true, consumer); } }【3】 rabbitmq 模式總結(jié)??
8.1)模式1 簡(jiǎn)單模式 helloworld
一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者,不需要設(shè)置交換機(jī),使用默認(rèn)交換機(jī);
8.2)模式2 工作隊(duì)列模式 work queue
一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者(競(jìng)爭(zhēng)關(guān)系),不需要設(shè)置交換機(jī)(使用默認(rèn)交換機(jī));
8.3)發(fā)布訂閱模式? publish/subscribe
需要設(shè)置類型為 fanout-廣播的交換機(jī),并且交換機(jī)和隊(duì)列進(jìn)行綁定,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)將消息發(fā)送到綁定的隊(duì)列;
8.4)路由模式 routing
需要設(shè)置類型為 direct的交換機(jī), 交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定routing key,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)根據(jù)routing key 將消息發(fā)送到對(duì)應(yīng)隊(duì)列;
8.5)通配符模式 topic
需要設(shè)置類型為 topic的交換機(jī), 交換機(jī)和隊(duì)列進(jìn)行綁定, 并且指定通配符方式的routing key, 當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)根據(jù) routing key將消息發(fā)送到對(duì)應(yīng)的隊(duì)列;
?
?
總結(jié)
以上是生活随笔為你收集整理的rabbitmq-通配符模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 哪个安卓模拟器不卡不玩游戏(哪个安卓模拟
- 下一篇: Java秒杀系统实战系列~RabbitM