javascript
SpringCloud stream连接RabbitMQ收发信息
百度上查的大部分都是一些很簡單的單消費(fèi)者或者單生產(chǎn)者的例子,并且多是同一個(gè)服務(wù)器的配置,本文的例子為多服務(wù)器配置下的消費(fèi)生產(chǎn)和消費(fèi)者配置。
參考資料:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binder_implementations
?
1、POM引入spring-cloud-starter-stream-rabbit
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>2、application.properties
通用配置:
#rabbit的配置信息 spring.rabbitmq.addresses=amqp://10.18.75.231:5672 spring.rabbitmq.username=user_admin spring.rabbitmq.password=12345678#下面這個(gè)配置優(yōu)先級太高,在配置中心分模塊(分文件)的場景下后面的binder屬性無法被覆蓋,如果有存在多個(gè)vhost的情況下建議將該屬性注釋掉
spring.rabbitmq.virtual-host=boss
當(dāng)存在多個(gè)binder時(shí)必須指定一個(gè)默認(rèn)的binder:
# 設(shè)置一個(gè)默認(rèn)的binder,如果不配置將報(bào)錯(cuò) spring.cloud.stream.defaultBinder=boss消費(fèi)者配置:?1 # 配置ecm消費(fèi)者的服務(wù)器配置信息
2 spring.cloud.stream.binders.ecm.type=rabbit 3 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.addresses=${spring.rabbitmq.addresses} 4 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.username=${spring.rabbitmq.username} 5 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.password=${spring.rabbitmq.password} 6 spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.virtual-host=ecm 7 8 #交易系統(tǒng)ECM的貨柜模板變更消費(fèi)者 9 spring.cloud.stream.bindings.ecm_shop_template.binder=ecm 10 spring.cloud.stream.bindings.ecm_shop_template.destination=這里填exchange的名字 11 #默認(rèn)情況下同一個(gè)隊(duì)列的只能被同一個(gè)group的消費(fèi)者消費(fèi) 12 spring.cloud.stream.bindings.ecm_shop_template.group=這里是消費(fèi)者的名稱 13 spring.cloud.stream.bindings.ecm_shop_template.contentType=text/plain 14 #指定該主題的類型為廣播模式 15 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.exchangeType=fanout 16 #消費(fèi)失敗的消息放入dlq隊(duì)列 17 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.autoBindDlq=true 18 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.republishToDlq=true配置死信隊(duì)列會在消費(fèi)者出現(xiàn)異常的時(shí)候重試3(默認(rèn)為3,可以配置)次后將消息放入死信隊(duì)列中,效果如下:
?
生產(chǎn)者配置:
1 # BOSS消息生產(chǎn)者服務(wù)器配置 2 spring.cloud.stream.binders.boss.type=rabbit 3 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.addresses=${spring.rabbitmq.addresses} 4 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.username=${spring.rabbitmq.username}5 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.password=${spring.rabbitmq.password} 6 spring.cloud.stream.binders.boss.environment.spring.rabbitmq.virtual-host=boss 7 8 #BOSS基礎(chǔ)信息生產(chǎn)者 9 spring.cloud.stream.bindings.message_output.destination=exchange的名稱 10 #exchange的類型為廣播模式 11 spring.cloud.stream.rabbit.bindings.message_output.producer.exchangeType=fanout下面是java代碼
1、定義消息的Input和Output配置信息
1 import org.springframework.cloud.stream.annotation.Input; 2 import org.springframework.cloud.stream.annotation.Output; 3 import org.springframework.messaging.MessageChannel; 4 5 /** 6 * mq連接源定義 7 * 8 * 其中類中的2個(gè)屬性的值和properties里的配置需要一致 9 **/ 10 public interface MqMessageSource { 11 // BOSS生產(chǎn)者 12 String MESSAGE_OUTPUT = "message_output"; 13 // ECM消費(fèi)者 14 String ECM_SHOP_TEMPLATE_INPUT = "ecm_shop_template"; 15 16 @Output(MESSAGE_OUTPUT) 17 MessageChannel messageOutput(); 18 19 @Input(ECM_SHOP_TEMPLATE_INPUT) 20 MessageChannel messageInput(); 21 22 }2、消息消費(fèi)
1 import org.springframework.beans.factory.annotation.Autowired; 2 import org.springframework.cloud.stream.annotation.EnableBinding; 3 import org.springframework.cloud.stream.annotation.StreamListener; 4 import org.springframework.messaging.Message; 5 6 import com.alibaba.fastjson.JSONObject; 7 8 import lombok.extern.slf4j.Slf4j; 9 10 /** 11 * MQ消費(fèi)者 12 * @author yangzhilong 13 * 14 */ 15 @Slf4j 16 @EnableBinding(MqMessageSource.class) 17 public class MqMessageConsumer { 18 19 @Autowired 20 private XXService xxService; 21 22 /** 23 * 消費(fèi)ECM的貨柜模板變更 24 * @param message 25 */ 26 @StreamListener(MqMessageSource.ECM_SHOP_TEMPLATE_INPUT) 27 public void receive(Message<String> message) { 28 log.info("接收貨柜模板開始,參數(shù)={}", JSONObject.toJSONString(message)); 29 if (null == message) { 30 return; 31 } 32 try { 33 String payload = message.getPayload(); 34 log.info("具體消息內(nèi)容= {}", JSONObject.toJSONString(payload)); 35 JSONObject jsonObject = JSONObject.parseObject(payload); 36 ShopReqDto shopReqDto = new ShopReqDto(); 37 shopReqDto.setCode(jsonObject.getString("shopNo")); 38 shopReqDto.setGoodsMarketTemplateId(jsonObject.getLong("goodsMarketTemplateId")); 39 shopReqDto.setGoodsMarketTemplateName(jsonObject.getString("goodsMarketTemplateName")); 40 ResponseResult<String> responseResult = xxService.updateTemplateIdAndName(shopReqDto); 41 if(responseResult.isSuccess()){ 42 log.info("【MQ消費(fèi)貨柜模板更新信息成功】"); 43 }else{ 44 log.error("【MQ消費(fèi)貨柜模板更新信息失敗】,返回結(jié)果信息:" + JSONObject.toJSONString(responseResult)); 45 } 46 } catch (Exception e) { 47 log.error("接收處理貨柜模板MQ時(shí)出現(xiàn)異常:{}", e); 48 throw new RuntimeException(e); 49 } 50 } 51 }3、消息生產(chǎn)者代碼
1 import org.springframework.beans.factory.annotation.Autowired; 2 import org.springframework.cloud.stream.annotation.EnableBinding; 3 import org.springframework.cloud.stream.annotation.Output; 4 import org.springframework.messaging.MessageChannel; 5 import org.springframework.messaging.support.MessageBuilder; 6 import com.alibaba.fastjson.JSON; 7 import lombok.extern.slf4j.Slf4j; 8 9 /** 10 * 消息生產(chǎn)者 11 * 12 **/ 13 @EnableBinding(MqMessageSource.class) 14 @Slf4j 15 public class MqMessageProducer { 16 @Autowired 17 @Output(MqMessageSource.MESSAGE_OUTPUT) 18 private MessageChannel channel; 19 20 21 //品牌 22 public void sendBrandAdd(Brand brand) { 23 BossMessage<Brand> message = new BossMessage<>(); 24 message.setData(brand); 25 message.setOpType(MqMessageProducer.ADD); 26 message.setDataType(MqMessageProducer.BRAND); 27 channel.send(MessageBuilder.withPayload(JSON.toJSONString(message)).build()); 28 log.info("【MQ發(fā)送內(nèi)容】" + JSON.toJSONString(message)); 29 } 30 }?
轉(zhuǎn)載于:https://www.cnblogs.com/yangzhilong/p/7904461.html
總結(jié)
以上是生活随笔為你收集整理的SpringCloud stream连接RabbitMQ收发信息的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 线程池处理流程
- 下一篇: 面试题 锁消除是什么