rabbitmq-5-案例2-简单的案例+exchange
Exchange交換機:
sendMessage端,發送消息到Exchage1、2, 然后交換機通過路由鍵,將消息轉發給隊列queue中,最后客戶端從隊列中獲取消息
?
交換屬性:
name:名稱
type:類型direct、topic、fanout、headers
durability:是否持久化
autoDelete:當最后一個綁定到exchange上的隊列刪除了,自動刪除exchage(即當交換機沒有和任何隊列關聯時,將自動刪除交換機)
internal:默認false(當前交換機是否為rabbitmq內部使用)
arguments:擴展參數
?
direct exchage:直連交換機
所有發送到Direct Exchage的消息被轉發到RouterKey中指定的queue中
?
代碼例子:
該例子和之前的quickStart類似,只是指令了交換機和路由鍵,其他代碼相同
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel,這三個都是RabbitMQ對外提供的API中最基本的對象。不管是服務器端還是客戶端都會首先創建這三類對象。
? ? ?ConnectionFactory為Connection的制造工廠。
Connection是與RabbitMQ服務器的socket鏈接,它封裝了socket協議及身份驗證相關部分邏輯。
Channel是我們與RabbitMQ打交道的最重要的一個接口,大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。
Queue(隊列)是RabbitMQ的內部對象,用于存儲消息
?
生產端;
package com.xsxy.rabbitmq.demo.direct;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;/*** ProduceDirect*/ public class ProduceDirect {public static void main(String[] args) throws Exception {// 1、創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("59.110.232.8");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("woaini");connectionFactory.setPassword("woaini");// 2、通過工廠獲取連接Connection connection = connectionFactory.newConnection();// 3、獲取通道Channel channel = connection.createChannel();String exchangeName = "directExchange";String exchagenType = "direct";String queueName = "test.direct.queue";String routingKey = "test.direct";// 4、聲明交換機channel.exchangeDeclare(exchangeName, exchagenType, true, false, null);// 5、聲明隊列channel.queueDeclare(queueName, true, false, false, null);// 6、簡歷綁定關系channel.queueBind(queueName, exchangeName, routingKey);// 7發行消息for (int i = 0; i < 5; i++) {channel.basicPublish(exchangeName, routingKey, null, "testdirectexchage message".getBytes());}System.err.println("消息已發送===========");channel.close();connection.close();} }消費端:
package com.xsxy.rabbitmq.demo.direct;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer;import java.io.IOException; import java.util.concurrent.TimeoutException;public class ConsumerDirect {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 1、創建連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("59.110.232.8");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("woaini");connectionFactory.setPassword("woaini");// 2、通過Factory創建連接Connection connection = connectionFactory.newConnection();// 3、創建通道Channel channel = connection.createChannel();String queueName = "test.direct.queue";// 聲明隊列channel.queueDeclare(queueName, true, false, false, null);// 4、創建消費去列QueueingConsumer queueingConsumer = new QueueingConsumer(channel);// 5、消費消息(非自動確認)channel.basicConsume(queueName, false, queueingConsumer);// 6、監聽消息System.out.println("正在監聽消息===");while (true) {QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();byte[] body = delivery.getBody();System.out.println(new String(body));// 手動確認channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}} }運行結果:
?
?
?
?
隊列消費者,用于監聽隊列中的消息。調用nextDelivery方法時,內部實現就是調用隊列的take方法。該方法的作用:獲取并移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。說白了就是如果沒有消息,就處于阻塞狀態。
運行結果如下:(生產者、消費者誰先運行都可以)
?
相關博客:
https://www.kancloud.cn/longxuan/rabbitmq-arron/117514
?
總結
以上是生活随笔為你收集整理的rabbitmq-5-案例2-简单的案例+exchange的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: rabbitmq-5-案例1-简单的案例
- 下一篇: cocos2d-lua-win