springboot + rabbitmq 整合示例
幾個(gè)概念說明:
Broker:簡(jiǎn)單來說就是消息隊(duì)列服務(wù)器實(shí)體。
Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
Queue:消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費(fèi)者,就是接受消息的程序。
channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。
交換機(jī)路由的幾種類型:
Direct Exchange:直接匹配,通過Exchange名稱+RountingKey來發(fā)送與接收消息.
Fanout Exchange:廣播訂閱,向所有的消費(fèi)者發(fā)布消息,但是只有消費(fèi)者將隊(duì)列綁定到該路由器才能收到消息,忽略Routing Key.
Topic Exchange:主題匹配訂閱,這里的主題指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.來分隔多個(gè)詞,只有消息這將隊(duì)列綁定到該路由器且指定RoutingKey符合匹配規(guī)則時(shí)才能收到消息;
Headers Exchange:消息頭訂閱,消息發(fā)布前,為消息定義一個(gè)或多個(gè)鍵值對(duì)的消息頭,然后消費(fèi)者接收消息同時(shí)需要定義類似的鍵值對(duì)請(qǐng)求頭:(如:x-mactch=all或者x_match=any),只有請(qǐng)求頭與消息頭匹配,才能接收消息,忽略RoutingKey.
默認(rèn)的exchange:如果用空字符串去聲明一個(gè)exchange,那么系統(tǒng)就會(huì)使用”amq.direct”這個(gè)exchange,我們創(chuàng)建一個(gè)queue時(shí),默認(rèn)的都會(huì)有一個(gè)和新建queue同名的routingKey綁定到這個(gè)默認(rèn)的exchange上去
安裝Erland
http://www.erlang.org/downloads
安裝RabbitMQ
https://www.rabbitmq.com/download.html
開啟RabbitMQ服務(wù)
執(zhí)行rabbitmq-plugins enable rabbitmq_management命令,開啟Web管理插件
重啟RabbitMQ服務(wù)
Web地址
http://localhost:15672/
默認(rèn)用戶名和密碼:guest
?
一、引入springboot和rabbitmq的依賴
<!-- 添加springboot對(duì)amqp的支持 --> <dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.0</version> </dependency> View Code二、新增application.properties對(duì)rabbimq的配置信息
spring.application.name=springboot-rabbitmq spring.rabbitmq.host=116.255.193.36 spring.rabbitmq.port=5672 spring.rabbitmq.username=scrm spring.rabbitmq.password=scrm spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.virtual-host=scrm spring.rabbitmq.listener.simple.acknowledge-mode=manual #最小消息監(jiān)聽線程數(shù) spring.rabbitmq.listener.concurrency=2 #最大消息監(jiān)聽線程數(shù) spring.rabbitmq.listener.max-concurrency=2 View Code三、公共設(shè)置類
1、隊(duì)列、消息交換機(jī),路由關(guān)鍵字公共枚舉類
package cloud.app.prod.home.rabbitmq;/*** Author : YongBo Xie </br>* File Name: RabbitMqEnum.java </br>* Created Date: 2018年3月28日 上午10:32:02 </br>* Modified Date: 2018年3月28日 上午10:32:02 </br>* Version: 1.0 </br> */public class RabbitMqEnum {/*** describe: 定義隊(duì)列名稱**/public enum QueueName {MARKETING_ACTIVITIE_QUEUE("marketingActivitieQueue", "營銷活動(dòng)隊(duì)列");private String code;private String name;QueueName(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}/*** describe: 定義交換機(jī)**/public enum Exchange {DIRECT_EXCHANGE("directExchange", "直連交換機(jī)"),FANOUT_EXCHANGE("fanoutExchange", "扇形交換機(jī)"),TOPIC_EXCHANGE("topicExchange", "主題交換機(jī)"),HEADERS_EXCHANGE("headersExchange", "首部交換機(jī)");private String code;private String name;Exchange(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}/*** describe: 定義routing_key**/public enum QueueKey {MARKETING_ACTIVITIE_DIRECT("marketingActivitie", "營銷活動(dòng)key"),MARKETING_ACTIVITIE_TOPIC_01("*.marketingActivitie.*", "營銷活動(dòng)key"),MARKETING_ACTIVITIE_TOPIC_02("marketingActivitie.#", "營銷活動(dòng)key");private String code;private String name;QueueKey(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}} View Code2、數(shù)據(jù)連接配置類
package cloud.app.prod.home.rabbitmq;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** Author : YongBo Xie </br>* File Name: RabbitConfig.java </br>* Created Date: 2018年3月28日 下午6:41:17 </br>* Modified Date: 2018年3月28日 下午6:41:17 </br>* Version: 1.0 </br>*/ @Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitConfig {@Value("${spring.rabbitmq.host}")private String addresses;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.publisher-confirms}")private Boolean publisherConfirms;@Value("${spring.rabbitmq.publisher-returns}")private Boolean publisherReturns;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;// 構(gòu)建mq實(shí)例工廠 @Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(addresses);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherReturns(publisherReturns);return connectionFactory;}} View Code3、生產(chǎn)者類
package cloud.app.prod.home.rabbitmq;import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;/*** Author : YongBo Xie </br>* File Name: RabbitMqSender.java </br>* Created Date: 2018年3月30日 上午10:48:36 </br>* Modified Date: 2018年3月30日 上午10:48:36 </br>* Version: 1.0 </br> */ @Component public class RabbitMqSender {private static Logger logger = Logger.getLogger(RabbitMqSender.class);@Beanpublic RabbitTemplate messageRabbitTemplate(ConnectionFactory connectionFactory) {final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new ConfirmCallback() {/*** 回調(diào)* @param correlationData 消息唯一標(biāo)識(shí)* @param ack 確認(rèn)結(jié)果* @param cause 失敗原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {logger.info("消息唯一標(biāo)識(shí):"+correlationData);logger.info("確認(rèn)結(jié)果:"+ack);logger.info("失敗原因:"+cause);}});rabbitTemplate.setReturnCallback(new ReturnCallback() {/*** 用于實(shí)現(xiàn)消息發(fā)送到RabbitMQ交換器,但無相應(yīng)隊(duì)列與交換器綁定時(shí)的回調(diào)*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.info(message.getMessageProperties().getCorrelationIdString() + " 發(fā)送失敗");}});return rabbitTemplate;}} View Code四、個(gè)例
1、初始化隊(duì)列、消息交換機(jī),并把隊(duì)列綁定到消息交換機(jī)
package cloud.app.prod.home.rabbitmq.mem;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import cloud.app.prod.home.rabbitmq.RabbitMqEnum;/*** Author : YongBo Xie </br>* File Name: RabbitConfig.java </br>* Created Date: 2018年3月27日 下午3:13:57 </br>* Modified Date: 2018年3月27日 下午3:13:57 </br>* Version: 1.0 </br> */ @Configuration public class MarketingActivitieRabbitConfig {// private static Logger logger = Logger.getLogger(MarketingActivitieRabbitConfig.class);/*** 構(gòu)建隊(duì)列,名稱,是否持久化之類* @return*/@Beanpublic Queue marketingActivitieQueue() {return new Queue(RabbitMqEnum.QueueName.MARKETING_ACTIVITIE_QUEUE.getCode(), true);}/*** 直連交換機(jī)(模式)* 用于實(shí)例間的任務(wù)分發(fā)* 是一種帶路由功能的交換機(jī),一個(gè)隊(duì)列會(huì)和一個(gè)交換機(jī)綁定,除此之外再綁定一個(gè)routing_key*/@Beanpublic DirectExchange createDirectExchange() {return new DirectExchange(RabbitMqEnum.Exchange.DIRECT_EXCHANGE.getCode());}/*** 扇形交換機(jī)(模式)* 分發(fā)給所有綁定到該exchange上的隊(duì)列,忽略routing key* 速度是所有的交換機(jī)類型里面最快的*/@Beanpublic FanoutExchange createFanoutExchange() {return new FanoutExchange(RabbitMqEnum.Exchange.FANOUT_EXCHANGE.getCode());}/*** 主題交換機(jī)(模式)* 通過可配置的規(guī)則分發(fā)給綁定在該exchange上的隊(duì)列* 發(fā)送到主題交換機(jī)上的消息需要攜帶指定規(guī)則的routing_key* 交換機(jī)和隊(duì)列的binding_key需要采用*.#.*.....的格式,每個(gè)部分用.分開* *表示一個(gè)單詞* #表示任意數(shù)量(零個(gè)或多個(gè))單詞*/@Beanpublic TopicExchange createTopicExchange() {return new TopicExchange(RabbitMqEnum.Exchange.TOPIC_EXCHANGE.getCode());}/*** 首部交換機(jī)(模式)* 適用規(guī)則復(fù)雜的分發(fā),用headers里的參數(shù)表達(dá)規(guī)則,有點(diǎn)像HTTP的Headers* 綁定交換機(jī)和隊(duì)列的時(shí)候,Hash結(jié)構(gòu)中要求攜帶一個(gè)鍵“x-match”,這個(gè)鍵的Value可以是any或者all,* 這代表消息攜帶的Hash是需要全部匹配(all),還是僅匹配一個(gè)鍵(any)就可以了*/@Beanpublic HeadersExchange createHeadersExchange() {return new HeadersExchange(RabbitMqEnum.Exchange.HEADERS_EXCHANGE.getCode());}/*** 隊(duì)列和直連交換機(jī)綁定* @param queue* @param routingKey* @return*/@Beanpublic Binding bindingQueueWithDirectExchange() {return BindingBuilder.bind(marketingActivitieQueue()).to(createDirectExchange()).with(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_DIRECT.getCode());}/*** 隊(duì)列和扇形交換機(jī)綁定* @param queue* @return*/@Beanpublic Binding bindingQueueWithFanoutExchange() {return BindingBuilder.bind(marketingActivitieQueue()).to(createFanoutExchange());}/*** 隊(duì)列和主題交換機(jī)綁定* @param queue* @param routingKey* @return*/@Beanpublic Binding bindingQueueWithTopicExchange() {return BindingBuilder.bind(marketingActivitieQueue()).to(createTopicExchange()).with(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_TOPIC_01.getCode());}/*** 隊(duì)列和首部交換機(jī)綁定* key和value匹配* @param queue* @param key* @param value* @return*/ // @Bean // public Binding bindingQueueWithHeadersExchange() { // return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()) // .where(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_HEADERS.getCode()) // .matches(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_HEADERS.getName()); // }/*** 隊(duì)列和首部交換機(jī)綁定(x-match : all)* 完全匹配* @param queue* @param headerValues* @return*/ // @Bean // public Binding bindingQueueWithHeadersExchangeAll(Map<String, Object> headerValues) { // return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()).whereAll(headerValues).match(); // }/*** 隊(duì)列和首部交換機(jī)綁定(x-match : all)* 任一匹配* @param queue* @param headerValues* @return*/ // @Bean // public Binding bindingQueueWithHeadersExchangeAny(Map<String, Object> headerValues) { // return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()).whereAny(headerValues).match(); // } } View Code2、生產(chǎn)者
package cloud.app.prod.home.rabbitmq.mem;import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import cloud.app.prod.home.common.FailException; import cloud.app.prod.home.mem.vo.MarketingActivitiesVO; import cloud.app.prod.home.rabbitmq.RabbitMqEnum; import cloud.app.prod.home.utils.DSHUtils;/*** Author : YongBo Xie </br>* File Name: MarketingActivitieRabbitMqSender.java </br>* Created Date: 2018年3月28日 下午2:16:32 </br>* Modified Date: 2018年3月28日 下午2:16:32 </br>* Version: 1.0 </br> */ @Component public class MarketingActivitieRabbitMqSender {private static Logger logger = Logger.getLogger(MarketingActivitieRabbitMqSender.class);@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發(fā)送消息* rabbitTemplate.send(message); //發(fā)消息,參數(shù)類型為org.springframework.amqp.core.Message * rabbitTemplate.convertAndSend(object); //轉(zhuǎn)換并發(fā)送消息。 將參數(shù)對(duì)象轉(zhuǎn)換為org.springframework.amqp.core.Message后發(fā)送 * rabbitTemplate.convertSendAndReceive(message) //轉(zhuǎn)換并發(fā)送消息,且等待消息者返回響應(yīng)消息* 針對(duì)業(yè)務(wù)場(chǎng)景選擇合適的消息發(fā)送方式即可* @param obj* @throws FailException*/public void sendRabbitmqDirect(MarketingActivitiesVO marketingActivitiesVO) throws FailException {CorrelationData correlationData = new CorrelationData(DSHUtils.generateUUID());logger.info("send: " + correlationData.getId());rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.DIRECT_EXCHANGE.getCode(), RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_DIRECT.getCode() , marketingActivitiesVO, correlationData);}public void sendRabbitmqDirect(String exchange, String routingKey, Object obj) throws FailException {CorrelationData correlationData = new CorrelationData(DSHUtils.generateUUID());logger.info("send: " + correlationData.getId());rabbitTemplate.convertAndSend(exchange, routingKey, obj);}} View Code3、消費(fèi)者
package cloud.app.prod.home.rabbitmq.mem;import org.apache.log4j.Logger; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;import cloud.app.prod.home.rabbitmq.RabbitMqEnum;/*** Author : YongBo Xie </br>* File Name: MarketingActivitieRabbitMqReceiver.java </br>* Created Date: 2018年3月28日 下午3:14:58 </br>* Modified Date: 2018年3月28日 下午3:14:58 </br>* Version: 1.0 </br> */ @Component public class MarketingActivitieRabbitMqReceiver {private static Logger logger = Logger.getLogger(MarketingActivitieRabbitMqReceiver.class);@Beanpublic MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(RabbitMqEnum.QueueName.MARKETING_ACTIVITIE_QUEUE.getCode());container.setMessageListener(messageListener());container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置為手動(dòng)return container;}// @RabbitListener(queues = "marketingActivitieQueue") // @RabbitHandler // public void process(String msg) { // logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊(duì)列的消息:" + msg); // } @Beanpublic ChannelAwareMessageListener messageListener() {return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {channel.confirmSelect();//在設(shè)置消息被消費(fèi)的回調(diào)前需顯示調(diào)用,否則回調(diào)函數(shù)無法調(diào)用if (message.toString().indexOf("1") > 0){logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊(duì)列的消息1:" + message.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}if (message.toString().indexOf("2") > 0){logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊(duì)列的消息2:" + message.toString());//被拒絕的是否重新入隊(duì)列//channel.basicNack 與 channel.basicReject 的區(qū)別在于basicNack可以拒絕多條消息,而basicReject一次只能拒絕一條消息 // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊(duì)列的消息3:" + message.toString());}};}} View Code?
轉(zhuǎn)載于:https://www.cnblogs.com/BobXie85/p/8696374.html
總結(jié)
以上是生活随笔為你收集整理的springboot + rabbitmq 整合示例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux - 系统资源
- 下一篇: 解决 Let’s Encrypt SSL