RabbitMQ入门-Routing直连模式
Hello World模式,告訴我們?nèi)绾我粚?duì)一發(fā)送和接收消息;
Work模式,告訴我們?nèi)绾味喙荦R下高效的消費(fèi)消息;
Publish/Subscribe模式,告訴我們?nèi)绾螐V播消息
那么有沒(méi)有靈活強(qiáng)一點(diǎn)的既可以高效消費(fèi),又可以同時(shí)送達(dá)多個(gè)消費(fèi)者的模式?
有,這就是Routing模式,我又稱(chēng)之為Direct直連模式。
Routing模式
一個(gè)生產(chǎn)者P,一個(gè)交換機(jī)X,多個(gè)消息隊(duì)列Q以及多個(gè)消費(fèi)者C
在Exchange和Queue中,我們看到了不同的規(guī)則,也就是Routing Key
顯然從圖中的說(shuō)明,我們就知道這是一個(gè)log日志根據(jù)級(jí)別派發(fā)消息的例子。熟悉Log日志系統(tǒng)的應(yīng)該都知道,一般的log系統(tǒng)分為error、info、warn和debug等。從圖中我們可以看出,將日志級(jí)別為error的定向的派發(fā)到第一個(gè)消息隊(duì)列,將error、warn和info級(jí)別的日志派發(fā)到第一個(gè)消息隊(duì)列。
該模型首先實(shí)現(xiàn)了定向派發(fā),而不再是訂閱模式那種廣播式的派發(fā)。同一條消息既可以派發(fā)給一個(gè)Queue,也可以同時(shí)派發(fā)給兩個(gè)或者多個(gè)Queue,這就是該模式的靈活之處。下面來(lái)看看實(shí)例
發(fā)送端
/*** Created by jackie on 17/8/7.*/ public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.3.161");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String severity = getSeverity(argv);String message = getMessage(argv);channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + severity + "':'" + message + "'");channel.close();connection.close();}private static String getSeverity(String[] strings){if (strings.length < 1)return "info";return strings[0];}private static String getMessage(String[] strings){if (strings.length < 2)return "Hello World!";return joinStrings(strings, " ", 1);}private static String joinStrings(String[] strings, String delimiter, int startIndex) {int length = strings.length;if (length == 0 ) return "";if (length < startIndex ) return "";StringBuilder words = new StringBuilder(strings[startIndex]);for (int i = startIndex + 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString();} }String severity = getSeverity(argv);通過(guò)程序參數(shù)賦值給Routing Key,作為發(fā)送消息的規(guī)則
String message = getMessage(argv);通過(guò)程序參數(shù)賦值作為消息實(shí)體發(fā)送到Queue
在run configurations中配置argv
*第一個(gè)參數(shù)是要綁定key的名稱(chēng),第二個(gè)參數(shù)是要發(fā)送的消息內(nèi)容
- 運(yùn)行后,可以在RabbitMQ管理應(yīng)用中看到exchange,但是此時(shí)沒(méi)有綁定queue,所以即使發(fā)送消息也沒(méi)有queue會(huì)存儲(chǔ)或者消費(fèi)。
接收端
/*** Created by jackie on 17/8/7.*/ public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.3.161");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = channel.queueDeclare().getQueue();if (argv.length < 1){System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");System.exit(1);}for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);表示使用的exchange類(lèi)型為Direct類(lèi)型
綁定的queue的名稱(chēng)也是通過(guò)program arguments指定的
這里兩個(gè)參數(shù)info和error表示綁定了兩個(gè)routing key,即如果發(fā)送routing key為info的消息該隊(duì)列能接收到,如果發(fā)送routing key為error,該隊(duì)列也能收到
運(yùn)行情況
啟動(dòng)接收端代碼,我們可以看到生成了Queue名稱(chēng)為amq.gen-ugjKo6t4y0PXPwoh3CeubA的隊(duì)列,同時(shí)有routingKey=info和routingKey=error的綁定到了Exchange上。
這時(shí)候起送發(fā)送端給routingkey為info發(fā)送消息“hello world”,我們可以看到在接收端確實(shí)能夠收到消息“hello world”,同理,這時(shí)候發(fā)送routingkey為error的消息,該隊(duì)列同樣能夠接收到,因?yàn)殛?duì)列同時(shí)綁定了兩個(gè)routing key
這個(gè)就是Routing直連模式。
如果您覺(jué)得閱讀本文對(duì)您有幫助,請(qǐng)點(diǎn)一下“推薦”按鈕,您的“推薦”將是我最大的寫(xiě)作動(dòng)力!如果您想持續(xù)關(guān)注我的文章,請(qǐng)掃描二維碼,關(guān)注JackieZheng的微信公眾號(hào),我會(huì)將我的文章推送給您,并和您一起分享我日常閱讀過(guò)的優(yōu)質(zhì)文章。
轉(zhuǎn)載于:https://www.cnblogs.com/bigdataZJ/p/rabbitmq6.html
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ入门-Routing直连模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: UILabel 调整行间距
- 下一篇: 动态树与静态树显示——(一)