RabbitMQ实例教程:主题交换机
前面的例子中,盡管我們使用了direct路由代替fanout路由解決了盲目廣播的問題,但direct路由也有它的缺陷,他不能基于多個(gè)標(biāo)準(zhǔn)做路由轉(zhuǎn)發(fā)。
在上面的日志系統(tǒng)中,如果不僅想基于日志等級(jí)做訂閱,也想根據(jù)日志的發(fā)生源做訂閱該怎么處理呢?這時(shí)候你可能想到了unix系統(tǒng)工具中的syslog服務(wù),它不僅基于日志等級(jí)(info/warn/crit...)進(jìn)行路由轉(zhuǎn)發(fā),也會(huì)根據(jù)操作(auth/cron/kern...)做路由轉(zhuǎn)發(fā)。
如果是那樣的話,日志系統(tǒng)就靈活多了,它不僅能夠監(jiān)聽來自‘cron’的關(guān)鍵錯(cuò)誤,也能監(jiān)聽來自'kern'的所有日志。其實(shí)主題交換機(jī)(topic exchange)就能解決這種問題。
主題交換機(jī)(Topic exchange)
主題交換機(jī)的路由代碼不能是任意寫的,必須是小樹點(diǎn)分隔開的一組單詞列表。這些單詞可以隨便寫,但通常是與連接消息特征有關(guān)的單詞。有效地路由代碼應(yīng)該是這樣的“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由代碼可以隨便寫,但是長度限制在255字節(jié)。
注意,綁定代碼也必須在同一個(gè)表單中。topic交換機(jī)與direct交換機(jī)類似-具有特定路由代碼的消息會(huì)傳送給所有匹配綁定代碼的隊(duì)列,但有兩個(gè)特殊的綁定代碼:
* :它能替代一個(gè)單詞
#:它能替代0或多個(gè)單詞
該例子中,我們給所有的動(dòng)物發(fā)送消息,符合由三個(gè)單詞(第一個(gè)單詞描述速度;第二個(gè)單詞描述顏色;第三個(gè)單詞描述物種)組成的路由代碼將會(huì)發(fā)送消息:“<speed>.<colour>.<species>”。
? ?
我們創(chuàng)建了三個(gè)綁定:Q1使用“*.orange.*”綁定,Q2使用“*.*.rabbit”和“l(fā)azy.#”綁定。這些綁定的意義如下:
??
Q1描述了所有顏色為橙色的動(dòng)物。
??
Q2描述了是兔子的動(dòng)物和懶惰的動(dòng)物。
??
這樣,“quick.orange.rabbit”消息通過路由轉(zhuǎn)發(fā)給Q1、Q2兩個(gè)隊(duì)列。"lazy.orange.elephant"消息也會(huì)轉(zhuǎn)發(fā)給Q1、Q2兩個(gè)隊(duì)列。“quick.orange.fox”消息只會(huì)轉(zhuǎn)發(fā)給Q1隊(duì)列,"lazy.brown.fox"也只會(huì)轉(zhuǎn)發(fā)給Q2隊(duì)列。"lazy.pink.rabbit"會(huì)轉(zhuǎn)發(fā)給Q2隊(duì)列一次,盡管它匹配兩個(gè)綁定。"quick.brown.fox"并不匹配任何一個(gè)隊(duì)列就會(huì)被廢棄。
??
如果我們打破規(guī)則,每次只發(fā)一個(gè)或四個(gè)單詞的話,如“orange”或”quick.orange.male.rabbit“,這些消息不匹配任何綁定,就會(huì)被廢棄。但如果發(fā)送”lazy.orange.male.rabbit“這樣的消息的話,由于它匹配最后的綁定仍會(huì)被轉(zhuǎn)發(fā)到Q2隊(duì)列中。
? ??
主題交換機(jī)是一種非常強(qiáng)大的交換機(jī),當(dāng)它只綁定”#“時(shí),它會(huì)接收所有的消息,與fanout交換機(jī)類似。當(dāng)沒有使用”*“和”#“符號(hào)時(shí),主題交換機(jī)的作用等同與direct交換機(jī)。
源代碼
EmitLogTopic.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | package?com.favccxx.favrabbit; import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; public?class?EmitLogTopic?{ ????private?static?final?String?EXCHANGE_NAME?=?"topic_logs"; ????public?static?void?main(String[]?argv)?{ ????????Connection?connection?=?null; ????????Channel?channel?=?null; ????????try?{ ????????????ConnectionFactory?factory?=?new?ConnectionFactory(); ????????????factory.setHost("localhost"); ????????????connection?=?factory.newConnection(); ????????????channel?=?connection.createChannel(); ????????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic"); ????????????String[]?routingKeys?=?{?"fast.orange.duck",?"slow.orange.fish",?"grey.rabbit",?"fast.black.rabbit", ????????????????????"quick.white.rabbit",?"lazy.dog",?"lazy.black.pig"?}; ????????????String[]?messages?=?{?"Hello",?"Guys",?"Girls",?"Babies"?}; ????????????for?(int?i?=?0;?i?<?routingKeys.length;?i++)?{ ????????????????for?(int?j?=?0;?j?<?messages.length;?j++)?{ ????????????????????channel.basicPublish(EXCHANGE_NAME,?routingKeys[i],?null,?messages[j].getBytes("UTF-8")); ????????????????????System.out.println("?[x]?Sent?'"?+?routingKeys[i]?+?"':'"?+?messages[j]?+?"'"); ????????????????} ????????????} ????????}?catch?(Exception?e)?{ ????????????e.printStackTrace(); ????????}?finally?{ ????????????if?(connection?!=?null)?{ ????????????????try?{ ????????????????????connection.close(); ????????????????}?catch?(Exception?ignore)?{ ????????????????} ????????????} ????????} ????} } |
ReceiveLogsTopic.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | package?com.favccxx.favrabbit; import?java.io.IOException; import?com.rabbitmq.client.AMQP; import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.Consumer; import?com.rabbitmq.client.DefaultConsumer; import?com.rabbitmq.client.Envelope; public?class?ReceiveLogsTopic?{ ????private?static?final?String?EXCHANGE_NAME?=?"topic_logs"; ????public?static?void?main(String[]?argv)?throws?Exception?{ ????????ConnectionFactory?factory?=?new?ConnectionFactory(); ????????factory.setHost("localhost"); ????????Connection?connection?=?factory.newConnection(); ????????Channel?channel?=?connection.createChannel(); ????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic"); ????????String?queueName?=?channel.queueDeclare().getQueue(); ????????String[]?bindingKeys?=?{?"*.orange.*",?"*.*.rabbit",?"lazy.#"?}; ????????for?(final?String?bindingKey?:?bindingKeys)?{ ????????????channel.queueBind(queueName,?EXCHANGE_NAME,?bindingKey); ????????????Consumer?consumer?=?new?DefaultConsumer(channel)?{ ????????????????@Override ????????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties, ????????????????????????byte[]?body)?throws?IOException?{ ????????????????????String?message?=?new?String(body,?"UTF-8"); ????????????????????System.out.println("["?+?bindingKey?+?"]?Received?message?:'"?+?message?+?"'?from?routingKey?:?"?+?envelope.getRoutingKey()); ????????????????} ????????????}; ????????????channel.basicConsume(queueName,?true,?consumer); ????????} ????} } |
運(yùn)行消息發(fā)送器,在消息接收平臺(tái)輸出內(nèi)容如下
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | [*.orange.*]?Received?message?:'Hello'?from?routingKey?:?fast.orange.duck [*.*.rabbit]?Received?message?:'Guys'?from?routingKey?:?fast.orange.duck [lazy.#]?Received?message?:'Girls'?from?routingKey?:?fast.orange.duck [*.orange.*]?Received?message?:'Babies'?from?routingKey?:?fast.orange.duck [*.*.rabbit]?Received?message?:'Hello'?from?routingKey?:?slow.orange.fish [lazy.#]?Received?message?:'Guys'?from?routingKey?:?slow.orange.fish [*.orange.*]?Received?message?:'Girls'?from?routingKey?:?slow.orange.fish [*.*.rabbit]?Received?message?:'Babies'?from?routingKey?:?slow.orange.fish [lazy.#]?Received?message?:'Hello'?from?routingKey?:?fast.black.rabbit [*.orange.*]?Received?message?:'Guys'?from?routingKey?:?fast.black.rabbit [*.*.rabbit]?Received?message?:'Girls'?from?routingKey?:?fast.black.rabbit [lazy.#]?Received?message?:'Babies'?from?routingKey?:?fast.black.rabbit [*.orange.*]?Received?message?:'Hello'?from?routingKey?:?quick.white.rabbit [*.*.rabbit]?Received?message?:'Guys'?from?routingKey?:?quick.white.rabbit [lazy.#]?Received?message?:'Girls'?from?routingKey?:?quick.white.rabbit [*.orange.*]?Received?message?:'Babies'?from?routingKey?:?quick.white.rabbit [*.*.rabbit]?Received?message?:'Hello'?from?routingKey?:?lazy.dog [lazy.#]?Received?message?:'Guys'?from?routingKey?:?lazy.dog [*.orange.*]?Received?message?:'Girls'?from?routingKey?:?lazy.dog [*.*.rabbit]?Received?message?:'Babies'?from?routingKey?:?lazy.dog [lazy.#]?Received?message?:'Hello'?from?routingKey?:?lazy.black.pig [*.orange.*]?Received?message?:'Guys'?from?routingKey?:?lazy.black.pig [*.*.rabbit]?Received?message?:'Girls'?from?routingKey?:?lazy.black.pig [lazy.#]?Received?message?:'Babies'?from?routingKey?:?lazy.black.pig |
本文轉(zhuǎn)自 genuinecx 51CTO博客,原文鏈接:http://blog.51cto.com/favccxx/1703031,如需轉(zhuǎn)載請自行聯(lián)系原作者
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ实例教程:主题交换机的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用Laravel Eloquent O
- 下一篇: vbkey常数