spring整合rabbitMQ最新版
生活随笔
收集整理的這篇文章主要介紹了
spring整合rabbitMQ最新版
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 一、簡單對象
- 1. 依賴
- 2. 生產者
- 3. 消費者
- 4. 配置文件
- 5. spring版本
- 二、復雜對象
- 2.1. 生產者
- 2.2. 消費者
一、簡單對象
1. 依賴
<!--spring整合rabbitmq-->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.0.1.RELEASE</version>
</dependency>
注:maven方式,這一個依賴即可;如果是非maven項目,需要引入5個jar如下:
推薦使用maven方式,比較簡單;非maven項目,可以先用maven把依賴下載到本地倉庫,再複製到非maven的項目中即可。
2. 生產者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--生產者者配置如下:--><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.vhost}"connection-timeout="${rabbitmq.conTimeout}"publisher-confirms="${rabbitmq.publisher-confirms}"publisher-returns="${rabbitmq.publisher-returns}"/><!-- 管理消息隊列 --><rabbit:admin connection-factory="connectionFactory"/><!--此處為配置文件方式 管控臺配置模式需要注釋 默認模式管控臺 Start--><!-- 定義一個隊列或者多個隊列 自動聲明--><rabbit:queue name="Queue-1" auto-declare="true" durable="true"/><rabbit:topic-exchange name="exchange-1"><rabbit:bindings><!-- 可綁定多個隊列,發送的時候指定key進行發送 --><rabbit:binding queue="Queue-1" pattern="ws.tjqb"/></rabbit:bindings></rabbit:topic-exchange><!--此處為配置文件方式 管控臺配置模式需要注釋 默認模式管控臺 End--><!-- 定義交換機 自動聲明--><rabbit:topic-exchange name="exchange-1"auto-declare="true" durable="true"/><!-- 5. 配置消息對象json轉換類 --><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!-- 定義MQ消息模板1. 
id : 定義消息模板ID2.connection-factory : 把定義的連接工廠放到消息模板中3.confirm-callback : confirm確認機制4.return-callback : return確認機制5.mandatory :#有2種狀態設置為 true 后 消費者在消息沒有被路由到合適隊列情況下會被return監聽,而不會自動刪除;設置為 false 后 消費者在消息沒有被路由到合適隊列情況下會自動刪除--><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory"exchange="exchange-1"confirm-callback="confirmCallBackListener"return-callback="returnCallBackListener"mandatory="true"message-converter="jsonMessageConverter"/> </beans> package com.gblfy.order.controller;import com.gblfy.order.pojo.FisCallingTrace; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID;@RestController public class Send {public static final String EXCHANGE = "exchange-1";@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/test")public String test() {String uuidStr = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(uuidStr);// 發送消息Map<String, String> map = new HashMap<>();map.put("email", "550731230@qq.com");rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);return "success";}3. 消費者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--消費者配置如下:--><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"connection-timeout="${rabbitmq.conTimeout}"publisher-confirms="${rabbitmq.publisher-confirms}"publisher-returns="${rabbitmq.publisher-returns}"/><!-- 管理消息隊列 --><rabbit:admin connection-factory="connectionFactory"/><!-- 聲明多個消費者對象 --><bean id="emailMessageListener" class="com.gblfy.order.mqhandler.EmailMessageListener"/><!-- 監聽隊列1. connectionFactory 連接工廠2. manual 手動簽收3. ref="" 消費者監聽--><rabbit:listener-container connection-factory="connectionFactory"acknowledge="manual"concurrency="${rabbitmq.concurrency}"max-concurrency="${rabbitmq.max-concurrency}"><rabbit:listener ref="emailMessageListener" method="onMessage" queue-names="Queue-1"/></rabbit:listener-container> </beans> package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener;import java.io.IOException; @Component public class EmailMessageListener implements MessageListener {private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void onMessage(Message message) {try {JsonNode jsonNode = MAPPER.readTree(message.getBody());String email = jsonNode.get("email").asText();System.out.println("獲取隊列中消息:" + email);} catch (IOException e) {e.printStackTrace();}} }4. 
配置文件
# RabbitMQ connection settings
# Broker IP address
rabbitmq.host=192.168.0.114
# Port
rabbitmq.port=5672
# User name
rabbitmq.username=fis
# Password
rabbitmq.password=ncl@1234
# Virtual host
rabbitmq.vhost=/app/fisMQ
# Connection timeout
rabbitmq.conTimeout=15000
# Publisher confirms; corresponds to the RabbitTemplate.ConfirmCallback interface.
# A confirm carries two important parameters:
#   ack           - true when the message was sent successfully
#   correlationId - globally unique ID; the producer creates a new one for every message it sends
rabbitmq.publisher-confirms=true
# Return of messages that failed to be delivered; corresponds to the
# RabbitTemplate.ReturnCallback interface
rabbitmq.publisher-returns=true
# Default number of consumers
rabbitmq.concurrency=10
# Maximum number of consumers
rabbitmq.max-concurrency=20
5. spring版本
目前適配的spring版本4.2.3.RELEASE
二、復雜對象
聲明:配置文件不變
2.1. 生產者
package com.gblfy.order.controller;import com.gblfy.order.pojo.FisCallingTrace; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID;@RestController public class Send {public static final String EXCHANGE = "exchange-1";@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/test")public String test() {FisCallingTrace f = getFisCallingTrace();String uuidStr = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(uuidStr);// 發送消息Map<String, Object> map = new HashMap<>();map.put("mReqXml", "請求報文");map.put("mResXml", "響應報文");map.put("mUUID", uuidStr);map.put("serviceName", "NYHC");map.put("fisCallingTrace", f);rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);return "success";}// @RequestMapping("/test")// public String test() {// String uuidStr = UUID.randomUUID().toString();// CorrelationData correlationId = new CorrelationData(uuidStr);// // 發送消息// Map<String, String> map = new HashMap<>();// map.put("email", "550731230@qq.com");// rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);// return "success";// }private FisCallingTrace getFisCallingTrace() {FisCallingTrace f = new FisCallingTrace();f.setServicename("tjqb");f.setServicetype("1");f.setInterfacetype("2");f.setResstatus("1");f.setResremark("紐約數據回傳接口");f.setReqdate(new Date());f.setReqtime("10:00:00");f.setResdate(new Date());f.setRestime("10:00:00");f.setReqxml("請求報文");f.setResxml("響應報文");return f;}}2.2. 消費者
package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.gblfy.order.pojo.FisCallingTrace; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component;import java.io.IOException;@Slf4j @Component public class EmailMessageListener implements ChannelAwareMessageListener {private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try {JsonNode jsonNode = MAPPER.readTree(message.getBody());String mReqXml = jsonNode.get("mReqXml").asText();String mResXml = jsonNode.get("mResXml").asText();String mUUID = jsonNode.get("mUUID").asText();String serviceName = jsonNode.get("serviceName").asText();System.out.println("獲取隊列中消息:" + mReqXml);System.out.println("獲取隊列中消息:" + mResXml);System.out.println("獲取隊列中消息:" + mUUID);System.out.println("獲取隊列中消息:" + serviceName);JsonNode jsonNode1 = jsonNode.get("fisCallingTrace");String jsonStr = MAPPER.writeValueAsString(jsonNode1);FisCallingTrace f= MAPPER.readValue(jsonStr , FisCallingTrace.class);System.out.println("獲取隊列中消息:" + f.getReqxml());System.out.println("獲取隊列中消息:" + f.getResxml());// 消息的標識,false只確認當前一個消息收到,true確認所有consumer獲得的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {e.printStackTrace();}log.info("解析操作");log.info("落庫操作");} }總結
以上是生活随笔為你收集整理的spring整合rabbitMQ最新版的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: PLSQL 设置日期格式为年月日不显示时
- 下一篇: Stream filter过滤案例