rabbitmq+topic+java_译:5.RabbitMQ Java Client 之 Topics (主题)
我們使用的是direct(直接交換),而不是使用只能進行虛擬廣播的 fanout(扇出交換),并且有可能選擇性地接收日志。
雖然使用direct(直接交換)改進了我們的系統,但它仍然有局限性 - 它不能基于多個標準進行路由。
在我們的日志系統中,我們可能不僅要根據嚴重性訂閱日志,還要根據發出日志的源來訂閱日志。您可能從syslogunix工具中了解這個概念,該工具根據嚴重性(info / warn / crit ...)和facility(auth / cron / kern ...)來路由日志。
這會給我們帶來很大的靈活性 - 我們可能想聽聽來自'cron'的關鍵錯誤以及來自'kern'的所有日志。
要在我們的日志系統中實現這一點,我們需要了解更復雜的topic (主題交換)。
Topic exchange 主題交換
發送到主題交換的消息不能具有任意routing_key- 它必須是由點分隔的單詞列表。單詞可以是任何內容,但通常它們指定與消息相關的一些功能。一些有效的路由密鑰示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由密鑰中可以包含任意數量的單詞,最多可達255個字節。
綁定密鑰也必須采用相同的形式。主題交換背后的邏輯類似于直接交換- 使用特定路由密鑰發送的消息將被傳遞到與匹配綁定密鑰綁定的所有隊列。但是,綁定鍵有兩個重要的特殊情況:
*(星號)可以替代一個單詞。
#(hash)可以替換零個或多個單詞。
在一個例子中解釋這個是最容易的:
在這個例子中,我們將發送所有描述動物的消息。消息將與包含三個單詞(兩個點)的路由鍵一起發送。
路由鍵中的第一個單詞將描述速度,第二個是顏色,第三個是物種:“。。”。
我們創建了三個綁定:Q1綁定了綁定鍵“* .orange。*”,Q2綁定了“*。*。rabbit”和“lazy。#”。
這些綁定可以概括為:
Q1對所有橙色動物感興趣。
Q2希望聽到關于兔子的一切,以及關于懶惰動物的一切。
路由密鑰設置為“quick.orange.rabbit”的消息將傳遞到兩個隊列。消息“lazy.orange.elephant”也將同時發送給他們。另一方面,“quick.orange.fox”只會進入第一個隊列,而“lazy.brown.fox”只會進入第二個隊列。“lazy.pink.rabbit”將僅傳遞到第二個隊列一次,即使它匹配兩個綁定。“quick.brown.fox”與任何綁定都不匹配,因此它將被丟棄。
如果我們違反合同并發送帶有一個或四個單詞的消息,例如“orange”或“quick.orange.male.rabbit”,會發生什么?好吧,這些消息將不匹配任何綁定,將丟失。
另一方面,“lazy.orange.male.rabbit”,即使它有四個單詞,也會匹配最后一個綁定,并將被傳遞到第二個隊列。
主題交流
主題交換功能強大,可以像其他交易所一樣。
當隊列與“#”(哈希)綁定密鑰綁定時 - 它將接收所有消息,而不管路由密鑰 - 如扇出交換。
當特殊字符“*”(星號)和“#”(哈希)未在綁定中使用時,主題交換的行為就像直接交換一樣
把它們放在一起
我們將在日志記錄系統中使用主題交換。我們將首先假設日志的路由鍵有兩個詞:“。”。
EmitLogTopic.java
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;public classEmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static voidmain(String[] argv) {
Connection connection= null;
Channel channel= null;try{
ConnectionFactory factory= newConnectionFactory();
factory.setHost("localhost");
connection=factory.newConnection();
channel=connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String routingKey=getRouting(argv);
String message=getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey,null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}catch(Exception e) {
e.printStackTrace();
}finally{if (connection != null) {try{
connection.close();
}catch(Exception ignore) {
}
}
}
}private staticString getRouting(String[] strings) {if (strings.length < 1)return "anonymous.info";return strings[0];
}private staticString getMessage(String[] strings) {if (strings.length < 2)return "Hello World!";return joinStrings(strings, " ", 1);
}private static String joinStrings(String[] strings, String delimiter, intstartIndex) {int length =strings.length;if (length == 0)return "";if (length
StringBuilder words= newStringBuilder(strings[startIndex]);for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}returnwords.toString();
}
}
ReceiveLogsTopic.java
importjava.io.IOException;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.Consumer;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;public classReceiveLogsTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throwsException {
ConnectionFactory factory= newConnectionFactory();
factory.setHost("localhost");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName=channel.queueDeclare().getQueue();if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}for(String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer= newDefaultConsumer(channel) {
@Overridepublic voidhandleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties,byte[] body) throwsIOException {
String message= new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName,true, consumer);
}
}
總結
以上是生活随笔為你收集整理的rabbitmq+topic+java_译:5.RabbitMQ Java Client 之 Topics (主题)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux查询tcp异常,linux服务
- 下一篇: mysql xa_Mysql对XA的支持