javascript
Spring Boot 中使用 RabbitMQ
2019獨角獸企業(yè)重金招聘Python工程師標準>>>
RabbitMQ是一個開源的AMQP實現(xiàn),服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統(tǒng)中存儲轉發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。
AMQP,即Advanced message Queuing Protocol,高級消息隊列協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現(xiàn),服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統(tǒng)中存儲轉發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。
常用概念
通常我們談到隊列服務, 會有三個概念: 發(fā)消息者、隊列、收消息者,RabbitMQ 在這個基本概念之上, 多做了一層抽象, 在發(fā)消息者和 隊列之間, 加入了交換器 (Exchange). 這樣發(fā)消息者和隊列就沒有直接聯(lián)系, 轉而變成發(fā)消息者把消息給交換器, 交換器根據調度策略再把消息再給隊列。
準備
環(huán)境安裝
任選其一
CentOs7.3 搭建 RabbitMQ 3.6 單機服務與使用
http://www.ymq.io/2017/08/16/rabbit-install
CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服務與使用
http://www.ymq.io/2017/08/17/rabbit-install-cluster
Github 代碼
代碼我已放到 Github ,導入spring-boot-rabbitmq 項目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq
添加依賴
在項目中添加 spring-boot-starter-amqp 依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>參數配置
spring.application.name=ymq-rabbitmq-spring-bootspring.rabbitmq.host=10.4.98.15 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin交換機(Exchange)
1.Direct Exchange 根據route key 直接找到隊列
2.Topic Exchange 根據route key 匹配隊列
3.Topic Exchange 不處理route key 全網發(fā)送,所有綁定的隊列都發(fā)送
Direct Exchange
Direct Exchange 是RabbitMQ默認的交換機模式,也是最簡單的模式,根據key全文匹配去尋找隊列。
任何發(fā)送到Direct Exchange的消息都會被轉發(fā)到RouteKey中指定的Queue。
1.一般情況可以使用rabbitMQ自帶的Exchange:""(該Exchange的名字為空字符串,下文稱其為default Exchange)。
2.這種模式下不需要將Exchange進行任何綁定(binding)操作
3.消息傳遞時需要一個RouteKey,可以簡單的理解為要發(fā)送到的隊列名字。
4.如果vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。
配置隊列
@Configuration public class RabbitDirectConfig {@Beanpublic Queue helloQueue() {return new Queue("hello");}@Beanpublic Queue directQueue() {return new Queue("direct");}//-------------------配置默認的交換機模式,可以不需要配置以下-----------------------------------@BeanDirectExchange directExchange() {return new DirectExchange("directExchange");}//綁定一個key "direct",當消息匹配到就會放到這個隊列中@BeanBinding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {return BindingBuilder.bind(directQueue).to(directExchange).with("direct");}// 推薦使用 helloQueue() 方法寫法,這種方式在 Direct Exchange 模式 多此一舉,沒必要這樣寫//--------------------------------------------------------------------------------------------- }監(jiān)聽隊列
@Component @RabbitListener(queues = "hello") public class helloReceiver {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 helloReceiver," + message);} } @Component @RabbitListener(queues = "direct") public class DirectReceiver {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 DirectReceiver," + message);} }發(fā)送消息
package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;/*** 描述: 默認的交換機模式** @author: yanpenglei* @create: 2017/10/25 1:03*/ @RunWith(SpringRunner.class) @SpringBootTest(classes = Startup.class) public class RabbitDirectTest {@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void sendHelloTest() {String context = "此消息在,默認的交換機模式隊列下,有 helloReceiver 可以收到";String routeKey = "hello";context = "routeKey:" + routeKey + ",context:" + context;System.out.println("sendHelloTest : " + context);this.rabbitTemplate.convertAndSend(routeKey, context);}@Testpublic void sendDirectTest() {String context = "此消息在,默認的交換機模式隊列下,有 DirectReceiver 可以收到";String routeKey = "direct";String exchange = "directExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendDirectTest : " + context);// 推薦使用 sendHello() 方法寫法,這種方式在 Direct Exchange 多此一舉,沒必要這樣寫this.rabbitTemplate.convertAndSend(exchange, routeKey, context);} }按順序執(zhí)行:響應
接收者 helloReceiver,routeKey:hello,context:此消息在,默認的交換機模式隊列下,有 helloReceiver 可以收到接收者 DirectReceiver,context:directExchange,routeKey:direct,context:此消息在,默認的交換機模式隊列下,有 DirectReceiver 可以收到Fanout Exchange
任何發(fā)送到Fanout Exchange 的消息都會被轉發(fā)到與該Exchange綁定(Binding)的所有Queue上。
1.可以理解為路由表的模式
2.這種模式不需要 RouteKey
3.這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。
4.如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
配置隊列
@Configuration public class RabbitFanoutConfig {final static String PENGLEI = "fanout.penglei.net";final static String SOUYUNKU = "fanout.souyunku.com";@Beanpublic Queue queuePenglei() {return new Queue(RabbitFanoutConfig.PENGLEI);}@Beanpublic Queue queueSouyunku() {return new Queue(RabbitFanoutConfig.SOUYUNKU);}/*** 任何發(fā)送到Fanout Exchange的消息都會被轉發(fā)到與該Exchange綁定(Binding)的所有隊列上。*/@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeQueuePenglei(Queue queuePenglei, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queuePenglei).to(fanoutExchange);}@BeanBinding bindingExchangeQueueSouyunku(Queue queueSouyunku, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueSouyunku).to(fanoutExchange);}}監(jiān)聽隊列
@Component @RabbitListener(queues = "fanout.penglei.net") public class FanoutReceiver1 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 FanoutReceiver1," + message);} } @Component @RabbitListener(queues = "fanout.souyunku.com") public class FanoutReceiver2 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 FanoutReceiver2," + message);} }發(fā)送消息
package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;/*** 描述: 廣播模式或者訂閱模式隊列** @author: yanpenglei* @create: 2017/10/25 1:08*/ @RunWith(SpringRunner.class) @SpringBootTest(classes = Startup.class) public class RabbitFanoutTest {@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void sendPengleiTest() {String context = "此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 可以收到";String routeKey = "topic.penglei.net";String exchange = "fanoutExchange";System.out.println("sendPengleiTest : " + context);context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}@Testpublic void sendSouyunkuTest() {String context = "此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 可以收到";String routeKey = "topic.souyunku.com";String exchange = "fanoutExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendSouyunkuTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);} }按順序執(zhí)行:響應
接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 可以收到 接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 可以收到接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 可以收到 接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 可以收到Topic Exchange
任何發(fā)送到Topic Exchange的消息都會被轉發(fā)到所有關心RouteKey中指定話題的Queue上
1.這種模式較為復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個標題``(RouteKey),Exchange會將消息轉發(fā)到所有關注主題能與RouteKey模糊匹配的隊列。
2.這種模式需要RouteKey,也許要提前綁定Exchange與Queue。
3.在進行綁定時,要提供一個該隊列關心的主題,如#.log.#表示該隊列關心所有涉及l(fā)og的消息(一個RouteKey為MQ.log.error的消息會被轉發(fā)到該隊列)。
4.#表示0個或若干個關鍵字,*表示一個關鍵字。如topic.*能與topic.warn匹配,無法與topic.warn.timeout匹配;但是topic.#能與上述兩者匹配。
5.同樣,如果Exchange沒有發(fā)現(xiàn)能夠與RouteKey匹配的Queue,則會拋棄此消息。
配置隊列
@Configuration public class RabbitTopicConfig {final static String MESSAGE = "topic.message";final static String MESSAGES = "topic.message.s";final static String YMQ = "topic.ymq";@Beanpublic Queue queueMessage() {return new Queue(RabbitTopicConfig.MESSAGE);}@Beanpublic Queue queueMessages() {return new Queue(RabbitTopicConfig.MESSAGES);}@Beanpublic Queue queueYmq() {return new Queue(RabbitTopicConfig.YMQ);}/*** 交換機(Exchange) 描述:接收消息并且轉發(fā)到綁定的隊列,交換機不存儲消息*/@BeanTopicExchange topicExchange() {return new TopicExchange("topicExchange");}//綁定隊列 queueMessages() 到 topicExchange 交換機,路由鍵只接受完全匹配 topic.message 的隊列接受者可以收到消息@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) {return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message");}//綁定隊列 queueMessages() 到 topicExchange 交換機,路由鍵只要是以 topic.message 開頭的隊列接受者可以收到消息@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) {return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.message.#");}//綁定隊列 queueYmq() 到 topicExchange 交換機,路由鍵只要是以 topic 開頭的隊列接受者可以收到消息@BeanBinding bindingExchangeYmq(Queue queueYmq, TopicExchange topicExchange) {return BindingBuilder.bind(queueYmq).to(topicExchange).with("topic.#");}}監(jiān)聽隊列
@Component @RabbitListener(queues = "topic.message") public class TopicReceiver1 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 TopicReceiver1," + message);}} @Component @RabbitListener(queues = "topic.message.s") public class TopicReceiver2 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 TopicReceiver2," + message);}} @Component @RabbitListener(queues = "topic.ymq") public class TopicReceiver3 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 TopicReceiver3," + message);}}發(fā)送消息
package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;/*** 描述: 配置轉發(fā)消息模式隊列** @author: yanpenglei* @create: 2017/10/25 1:20*/ @RunWith(SpringRunner.class) @SpringBootTest(classes = Startup.class) public class RabbitTopicTest {@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void sendMessageTest() {String context = "此消息在,配置轉發(fā)消息模式隊列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到";String routeKey = "topic.message";String exchange = "topicExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendMessageTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}@Testpublic void sendMessagesTest() {String context = "此消息在,配置轉發(fā)消息模式隊列下,有 TopicReceiver2 TopicReceiver3 可以收到";String routeKey = "topic.message.s";String exchange = "topicExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendMessagesTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}@Testpublic void sendYmqTest() {String context = "此消息在,配置轉發(fā)消息模式隊列下,有 TopicReceiver3 可以收到";String routeKey = "topic.ymq";String exchange = "topicExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendYmqTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);} }按順序執(zhí)行:響應
接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message,context:此消息在,配置轉發(fā)消息模式隊列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到 接收者 TopicReceiver1,context:topicExchange,routeKey:topic.message,context:此消息在,配置轉發(fā)消息模式隊列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到 接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message,context:此消息在,配置轉發(fā)消息模式隊列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置轉發(fā)消息模式隊列下,有 TopicReceiver2 TopicReceiver3 可以收到 接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置轉發(fā)消息模式隊列下,有 TopicReceiver2 TopicReceiver3 可以收到接收者 TopicReceiver3,context:topicExchange,routeKey:topic.ymq,context:此消息在,配置轉發(fā)消息模式隊列下,有 TopicReceiver3 可以收到代碼我已放到 Github ,導入spring-boot-rabbitmq 項目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq
Contact
- 作者:鵬磊
- 出處:http://www.ymq.io/2017/10/26/rabbitmq-spring-boot-example
- Email:admin@souyunku.com
- 版權歸作者所有,轉載請注明出處
- Wechat:關注公眾號,搜云庫,專注于開發(fā)技術的研究與知識分享
轉載于:https://my.oschina.net/yanpenglei/blog/1608327
總結
以上是生活随笔為你收集整理的Spring Boot 中使用 RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: rest-assured之Schema
- 下一篇: 有外键约束的子表插入数据时出现的错误