javascript
SpringBoot连接多RabbitMQ源
轉自:
SpringBoot連接多RabbitMQ源 - 掘金在實際開發中,很多場景需要異步處理,這時就需要用到RabbitMQ,而且隨著場景的增多程序可能需要連接多個RabbitMQ。SpringBoot本身提供了默認的配置可以快速配置連接RabbitMQ,但是只能連接一個RabbitMQ,當需要連接多個RabbitMQ時,默認的配置就…https://juejin.cn/post/6844904039797243917
在實際開發中,很多場景需要異步處理,這時就需要用到RabbitMQ,而且隨著場景的增多程序可能需要連接多個RabbitMQ。SpringBoot本身提供了默認的配置可以快速配置連接RabbitMQ,但是只能連接一個RabbitMQ,當需要連接多個RabbitMQ時,默認的配置就不太適用了,需要單獨編寫每個連接。
在SpringBoot框架中,我們常用的兩個類一般是:
- RabbitTemplate:作為生產、消費消息使用;
- RabbitAdmin:作為申明、刪除交換機和隊列,綁定和解綁隊列和交換機的綁定關系使用。
所以我們連接多個RabbitMQ就需要重新建立連接、重新實現這兩個類。 代碼如下:
配置
application.properties配置文件需要配置兩個連接:
?
重寫連接工廠
需要注意的是,在多源的情況下,需要在某個連接加上@Primary注解,表示主連接,默認使用這個連接;
package com.example.config.rabbitmq;import com.alibaba.fastjson.JSON; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary;/*** Created by shuai on 2019/4/23.*/ @Configuration public class MultipleRabbitMQConfig {@Bean(name = "v2ConnectionFactory")public CachingConnectionFactory hospSyncConnectionFactory(@Value("${v2.spring.rabbitmq.host}") String host,@Value("${v2.spring.rabbitmq.port}") int port,@Value("${v2.spring.rabbitmq.username}") String username,@Value("${v2.spring.rabbitmq.password}") String password,@Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,@Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,@Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setPublisherReturns(publisherReturns);return connectionFactory;}@Bean(name = "v2RabbitTemplate")public RabbitTemplate firstRabbitTemplate(@Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);v2RabbitTemplate.setMandatory(mandatory);v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {if (!ack) { // LOGGER.info("{} 發送RabbitMQ消息 ack確認 失敗: [{}]", this.name, JSON.toJSONString(object));} else { // LOGGER.info("{} 發送RabbitMQ消息 ack確認 成功: [{}]", this.name, JSON.toJSONString(object));}});v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> { // LOGGER.error("{} 發送RabbitMQ消息returnedMessage,出現異常,Exchange不存在或發送至Exchange卻沒有發送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});});return v2RabbitTemplate;}@Bean(name = "v2ContainerFactory")public SimpleRabbitListenerContainerFactory hospSyncFactory(@Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,@Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));factory.setPrefetchCount(prefetch);return factory;}@Bean(name = "v2RabbitAdmin")public RabbitAdmin iqianzhanRabbitAdmin(@Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}// mq主連接@Bean(name = "v1ConnectionFactory")@Primarypublic CachingConnectionFactory publicConnectionFactory(@Value("${v1.spring.rabbitmq.host}") String host,@Value("${v1.spring.rabbitmq.port}") int port,@Value("${v1.spring.rabbitmq.username}") String username,@Value("${v1.spring.rabbitmq.password}") String password,@Value("${v1.spring.rabbitmq.virtual-host}") String virtualHost,@Value("${v1.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,@Value("${v1.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setPublisherReturns(publisherReturns);return connectionFactory;}@Bean(name = "v1RabbitTemplate")@Primarypublic RabbitTemplate publicRabbitTemplate(@Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v1.spring.rabbitmq.template.mandatory}") Boolean mandatory) {RabbitTemplate v1RabbitTemplate = new RabbitTemplate(connectionFactory);v1RabbitTemplate.setMandatory(mandatory);v1RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {if (!ack) { // LOGGER.info("{} 發送RabbitMQ消息 ack確認 失敗: [{}]", this.name, JSON.toJSONString(object));} else { // LOGGER.info("{} 發送RabbitMQ消息 ack確認 成功: [{}]", this.name, JSON.toJSONString(object));}});v1RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> { // LOGGER.error("{} 發送RabbitMQ消息returnedMessage,出現異常,Exchange不存在或發送至Exchange卻沒有發送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});});return v1RabbitTemplate;}@Bean(name = "v1ContainerFactory")@Primarypublic SimpleRabbitListenerContainerFactory insMessageListenerContainer(@Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v1.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,@Value("${v1.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));factory.setPrefetchCount(prefetch);return factory;}@Bean(name = "v1RabbitAdmin")@Primarypublic RabbitAdmin publicRabbitAdmin(@Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;} }?
創建Exchange、Queue并綁定
再實現RabbitAdmin后,我們就需要根據RabbitAdmin創建對應的交換機和隊列,并建立綁定關系
package com.example.config.rabbitmq;import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct; import javax.annotation.Resource;/*** 創建Queue、Exchange并建立綁定關系* Created by shuai on 2019/5/16.*/ @Configuration public class MyRabbitMQCreateConfig {@Resource(name = "v2RabbitAdmin")private RabbitAdmin v2RabbitAdmin;@Resource(name = "v1RabbitAdmin")private RabbitAdmin v1RabbitAdmin;@PostConstructpublic void RabbitInit() {v2RabbitAdmin.declareExchange(new TopicExchange("exchange.topic.example.new", true, false));v2RabbitAdmin.declareQueue(new Queue("queue.example.topic.new", true));v2RabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.example.topic.new", true)) //直接創建隊列.to(new TopicExchange("exchange.topic.example.new", true, false)) //直接創建交換機 建立關聯關系.with("routing.key.example.new")); //指定路由Key} }生產者
為了后續驗證每個連接都建立成功,并且都能生產消息,生產者這里分別使用新生成的RabbitTemplate發送一條消息。
package com.example.topic;import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component public class TopicProducer {@Resource(name = "v1RabbitTemplate")private RabbitTemplate v1RabbitTemplate;@Resource(name = "v2RabbitTemplate")private RabbitTemplate v2RabbitTemplate;public void sendMessageByTopic() {String content1 = "This is a topic type of the RabbitMQ message example from v1RabbitTemplate";v1RabbitTemplate.convertAndSend("exchange.topic.example.new","routing.key.example.new",content1);String content2 = "This is a topic type of the RabbitMQ message example from v2RabbitTemplate";v2RabbitTemplate.convertAndSend("exchange.topic.example.new","routing.key.example.new",content2);} }消費者
這里需要注意在配置消費隊列時,需要標識ContainerFactory
package com.example.topic;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "queue.example.topic.new", containerFactory = "v2ContainerFactory") public class TopicConsumer {@RabbitHandlerpublic void consumer(String message) {System.out.println(message);} }?這樣就完成了SpringBoot連接多個RabbitMQ源的示例了,再寫一段測試代碼驗證下。
測試驗證
package com.example.test;import com.example.topic.TopicProducer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class) @SpringBootTest public class RabbitMQMultipleTest {@Autowiredprivate TopicProducer topicProducer;@Testpublic void topicProducerTest() {topicProducer.sendMessageByTopic();} }?
執行測試代碼,驗證結果為:
?
驗證SpringBoot連接多RabbitMQ源成功!
github地址:Spring Boot 教程、技術棧、示例代碼?
?
總結
以上是生活随笔為你收集整理的SpringBoot连接多RabbitMQ源的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 指令寻址方式与数据寻址方式
- 下一篇: 最难防御的ddos攻击方式(最难防御的D