RabbitMQ使用手册
1. RabbitMQ配置
1.1RabbitMQ管理命令行
# 1.服務器啟動相關命令行 systemctl start|restart|stop|status rabbitmq-server # 2.管理命令行 rabbitmqctl help #查看更多命令 # 3.插件管理命令行 rabbitmq-plugins enable|disable|list1.2 Web管理界面介紹
1.2.1 OverView概覽
2. 消息隊列模式
第一種模型(直連)
在上圖所示的模型中有以下概念
- P:生產者
- C:消費者
- queue:消息隊列
1.開發生產者
connection = RabbitmqUtils.getConnection(); channel = connection.createChannel(); /*** 參數一:隊列名字(不存在會自動創建)、參數二:是否要持久化、* 參數三:是否獨占隊列、參數四:是否自動刪除隊列* 參數五:額外參數*/ channel.queueDeclare("hello", false, false, false, null); /*** 參數一:交換機名稱、參數二:隊列名稱、* 參數三:額外消息設置、參數四:消息具體內容*/ channel.basicPublish("", "hello", false, false, null, "hello rabbitmq".getBytes());2.消費者
Connection connection = RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); //通道綁定隊列 channel.queueDeclare("hello",false,false,false,null); /*** 參數一:隊列名稱、參數二:消息自動確認*/ channel.basicConsume("hello",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));} });工具類RabbitMQUtils.java
package com.rabbitmq.study;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @FileName: RabbitMQUtils* @Author Steven* @Date: 2020/11/14*/ public class RabbitMQUtils {private static ConnectionFactory factory;static {factory = new ConnectionFactory();factory.setHost("xxx.xxx.xxx.xxx");factory.setPort(5672);factory.setVirtualHost("/cms");//這只訪問虛擬主機的用戶名、密碼factory.setUsername("ems");factory.setPassword("123");}public static Connection getConnection() {try {return factory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}return null;}public static void closeChannelAndConn(Channel channel, Connection conn) {try {if (channel != null) {channel.close();}if (conn != null) {conn.close();}} catch (Exception e) {e.printStackTrace();}} }3.參數說明
/** * 參數一:隊列名字(不存在會自動創建)、參數二:是否要持久化、 * 參數三:是否獨占隊列、參數四:是否自動刪除隊列 * 參數五:額外參數 */ channel.queueDeclare("aa", true, false, false, null);第二種模型(work queue)
work queue,也被稱為(task queue),任務模型,多個消費者綁定一個隊列共同消費,隊列中的消息一旦被消費就會消失,因此消息不回被重復消費
角色:
- P:生產者,任務發布
- queue:隊列存放消息
- C1:消費者1,消費消息較慢
- C2:消費者2,消費消息較快
1.生產者Send.java
Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); /*** 參數一:隊列名字(不存在會自動創建)、參數二:是否要持久化、* 參數三:是否獨占隊列、參數四:是否自動刪除隊列* 參數五:額外參數*/ channel.queueDeclare("work", true, false, false, null); /*** 參數一:交換機名稱、參數二:隊列名稱、* 參數三:額外消息設置、參數四:消息具體內容*/ for (int i = 1; i <=100 ; i++) {channel.basicPublish("", "work", false, false, MessageProperties.PERSISTENT_TEXT_PLAIN, (i+"hello rabbitmq").getBytes()); } RabbitMQUtils.closeChannelAndConn(channel,connection);2.消費者:Recv1.java
Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1:"+new String(body));try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}} });3.消費者:Recv2.java
Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2:"+new String(body));try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}} });消息確認機制
1.消費者:Recv1.java
Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //每次只能消費一個消息 channel.basicQos(1); channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2:" + new String(body));//參數1:確認具體為隊列中的那個消息,參數2:是否同時確認多個消息channel.basicAck(envelope.getDeliveryTag(),false);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}} });2.消費者:Recv2.java
Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //每次只能消費一個消息 channel.basicQos(1); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2:"+new String(body));//參數1:確認具體為隊列中的那個消息,參數2:是否同時確認多個消息channel.basicAck(envelope.getDeliveryTag(),false);} });第三種模型(fanout)
fanout:扇出,又稱為廣播
在廣播模式下,消息發送流程
- 多個消費者
- 每個消費者都綁定這自己的隊列(queue)
- 每個隊列都要綁定到交換機(exchange)上
- 生產者只將消息發送給交換機
- 交換機(exchange)決定將消息發送給哪個隊列
1.開發生產者:send.java
Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); /*** 通道聲明為交換機* 參數1:交換機名稱、參數2:交換機類型 fanout:代表廣播類型*/ channel.exchangeDeclare("logs","fanout"); channel.basicPublish("logs","",null,"RabbitMq fanout".getBytes());2.消費者:Recv.java
channel.exchangeDeclare("logs","fanout"); //創建臨時隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,"logs",""); channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1:"+new String(body));} });第四種模型(Routing)
1.Routing之訂閱模型-Direct(直連)
在fanout模式中,一條消息,會被所有訂閱的消費者消費,但在某些情景下,我們希望不同的消息被不同的消費者消費,這時就用到Direct類型的Exchange。
在Direct模型下:
- 隊列與交換機綁定,不能任意綁定,而是指定一個RoutingKey(路由key)
- 生產者向Exchange(交換機)發送消息時需要指定RoutingKey?(路由key)
- Exchange不在把消息交給綁定的隊列,而是根據消息的Routing Key進行判斷,只有當消息中的Routing Key隊列中的Routing Key完全一致才會收到消息
流程
圖解
- P:生產者,向exchange發送消息時會指定一個Routing Key
- X:Exchange(交換機),接受生產者消息,然后把消息發送給與Routing Key相同的隊列
- C1:消費者,指定需要接受Routing Key為error的消息
- C2:消費者,指定需要接受Routing key為error、info、warning的消息
1.生產者
Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("direct.logs", "direct"); String queueName = channel.queueDeclare().getQueue(); String routingKey = "error"; channel.queueBind(queueName, "direct.logs", routingKey); channel.basicPublish("direct.logs", routingKey, null, (routingKey + " Rabbit Direct").getBytes("utf-8")); RabbitMQUtils.closeChannelAndConn(channel, connection);2.消費者-1
channel.exchangeDeclare("direct.logs", "direct"); String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, "direct.logs", "info"); channel.queueBind(queueName, "direct.logs", "warning"); channel.queueBind(queueName, "direct.logs", "error"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Recv1日志:" + new String(body));} });3.消費者-2
Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); /*** 參數一:交換機名稱、參數二:交換機的類型:(direct)直連式*/ channel.exchangeDeclare("direct.logs", "direct"); /*** 聲明一個臨時隊列*/ String queueName = channel.queueDeclare().getQueue(); /*** 參數一:隊列名稱、參數二:交換機名稱、參數三:RoutingKey(路由key)*/ channel.queueBind(queueName, "direct.logs", "info"); channel.queueBind(queueName, "direct.logs", "warning"); channel.queueBind(queueName, "direct.logs", "error"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Recv1日志:" + new String(body));} });2.Routing之訂閱模型-Topic(直連)
topic類型的exchange與direct相比,都可以根據RoutingKey把消息路由到不同的隊列中,只不過topic在綁定隊列時RoutingKey可以使用通配符,這種模型RoutingKey一般有一個或多個單詞組成多個單詞間用“.”分割類如:item.insert
# 通配符*(start):匹配一個單詞#(start):匹配多個單詞 #如:hello.*: hello.java、hello.worldhello.#: hello.java.world1.生產者:send.java
Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 參數一:交換機名稱 channel.exchangeDeclare("topics","topic"); String routingKey="user.save.findAll"; channel.basicPublish("topics",routingKey,null,("RabbitMQ Topic RoutingKey:"+routingKey).getBytes()); RabbitMQUtils.closeChannelAndConn(channel,connection);2.消費者:Recv.java
Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topics","topic"); String queueName = channel.queueDeclare().getQueue(); String routingKey="user.#"; channel.queueBind(queueName,"topics",routingKey); channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("RabbitMQ tutorials Topic:"+new String(body));} });3.SpringBoot整合RabbitMQ
3.1 初始環境單間
1.引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.在application.yml中添加相關配置
server:address: 8080 spring:rabbitmq:addresses: 47.101.36.177username: emspassword: 123virtual-host: /cmsport: 5672application:name: rabbitmq-studyRabbitMQ簡化操作對象,使用時直接在項目中注入即可
3.2 第一種Hello World模型使用
1.生產者:Send.java
//注入RabbitMQ模板對象 @Autowired private RabbitTemplate rabbitTemplate;@Test public void testHelloWord() {rabbitTemplate.convertAndSend("hello","hello world"); }2.消費者:HelloConsumer.java
@Component // 持久化,非獨占、非自動刪除的隊列 @RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false",autoDelete = "false")) public class HelloConsumer {@RabbitHandlerpublic void recive(String message){System.out.println("消息隊列中的消息:"+message);} }3.3 第二種work queue模型使用
1.生產者:Send.java
@Test public void testWork(){for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("work","Work模型");}}2.消費者:WorkConsumer.java
@Component public class WorkConsumer {@RabbitListener(queuesToDeclare=@Queue("work"))public void receive1(String message){System.out.println("work model1:"+message);}@RabbitListener(queuesToDeclare=@Queue("work"))public void receive2(String message){System.out.println("work model2:"+message);} }3.4第三種廣播模型(fanout)使用
1.生產者:send.java
/*** 廣播模型fanout*/ @Test public void fanoutModel(){rabbitTemplate.convertAndSend("logs","","fanout廣播模型"); }2.消費者:FanoutConsumer
@Component public class FanCoutModel {@RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定時,創建臨時隊列exchange =@Exchange(name = "logs",type = "fanout"))})public void receive1(String message){System.out.println("message1:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定時,創建臨時隊列exchange =@Exchange(name = "logs",type = "fanout"))})public void receive2(String message){System.out.println("message1:"+message);}}3.5第三種路由(RoutingKey)模型使用
1.生產者:send.java
/*** route模式*/ @Test public void testRoute(){rabbitTemplate.convertAndSend("directs","error","通過路由發送error信息"); }2.消費者:RouteConsumer.java
@Component public class RouteConsumer {@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "directs",type = "direct"),key = {"info","error","warning"})})public void receive1(String message){System.out.println("receive1:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "directs",type = "direct"),key ={"error"})})public void receive2(String message){System.out.println("receive2:"+message);}}3.6第三種動態路由模型(Topic)使用
1.生產者:send.java
/*** topic模式(動態路由)*/ @Test public void testTopic(){rabbitTemplate.convertAndSend("topics","user.save","topic動態路由模式"); }2.消費者:TopicConsumer.java
@Component public class TopicConsumer {@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "topics",type = "topic"),key = {"user.*"})})public void receive1(String message){System.out.println("receive1:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topics",type = "topic"),key = {"user.#"})})public void receive2(String message){System.out.println("receive2:"+message);} }4.RabbitMQ集群
4.1集群架構
4.1.1普通集群(副本集群)
總結
以上是生活随笔為你收集整理的RabbitMQ使用手册的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用idea插件将一个spring boo
- 下一篇: SpringBoot-data-Mong