javascript
使用RabbitMQ的SpringBoot消息传递
RabbitMQ是流行的消息代理解決方案之一,并提供可用于各種編程語言的客戶端庫,包括Java,Scala,.NET,Go,Python,Ruby,PHP等。在本教程中,我們將學(xué)習(xí)如何使用RabbitMQ消息代理從SpringBoot應(yīng)用程序發(fā)送和接收消息。 我們還將研究如何將消息作為JSON負(fù)載發(fā)送,以及如何使用Dead Letter Queue(DLQ)處理錯(cuò)誤。
首先,按照此處https://www.rabbitmq.com/download.html所述在本地計(jì)算機(jī)上安裝RabbitMQ服務(wù)器,或者使用以下docker-compose.yml作為Docker映像運(yùn)行。
version: '3' services:rabbitmq:container_name: rabbitmqimage: 'rabbitmq:management'ports:- "5672:5672"- "15672:15672"現(xiàn)在,您可以使用docker-compose啟動(dòng)RabbitMQ ,并在http:// localhost:15672 /啟動(dòng)管理UI。
如果您熟悉ActiveMQ等其他消息傳遞代理,則通常使用隊(duì)列和主題發(fā)送一對(duì)一和發(fā)布-訂閱通信模型。 在RabbitMQ中,我們將郵件發(fā)送到Exchange,并根據(jù)路由密鑰將郵件轉(zhuǎn)發(fā)到隊(duì)列。 您可以在https://www.rabbitmq.com/tutorials/amqp-concepts.html上閱讀有關(guān)RabbitMQ概念的更多信息。
您可以在https://github.com/sivaprasadreddy/sivalabs-blog-samples-code/tree/master/springboot-rabbitmq-demo中找到本文的源代碼
RabbitMQ的SpringBoot應(yīng)用程序
現(xiàn)在,讓我們從http://start.spring.io/選擇Web , Thymeleaf和RabbitMQ啟動(dòng)器創(chuàng)建一個(gè)SpringBoot應(yīng)用程序。
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.sivalabs</groupId><artifactId>springboot-rabbitmq-demo</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RC1</version><relativePath/></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency></dependencies></project>讓我們從RabbitMQ配置開始。 創(chuàng)建RabbitConfig配置類,并定義Queue , Exchange和Binding Bean,如下所示:
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitConfig {public static final String QUEUE_ORDERS = "orders-queue";public static final String EXCHANGE_ORDERS = "orders-exchange";@BeanQueue ordersQueue() {return QueueBuilder.durable(QUEUE_ORDERS).build();}@BeanQueue deadLetterQueue() {return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();}@BeanExchange ordersExchange() {return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build();}@BeanBinding binding(Queue ordersQueue, TopicExchange ordersExchange) {return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS);} } 在這里,我們聲明一個(gè)名稱為orders-queue的隊(duì)列和一個(gè)名稱為orders-exchange的Exchange。
我們還定義了orders-queue和orders-exchange之間的綁定,以便將任何以routing-key作為“ orders-queue”發(fā)送到orders-exchange的消息都發(fā)送到orders-queue。
我們可以在application.properties中配置RabbitMQ服務(wù)器的詳細(xì)信息,如下所示:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest讓我們創(chuàng)建一個(gè)Spring bean OrderMessageSender來發(fā)送消息到orders-exchange。
Spring Boot自動(dòng)配置向RabbitMQ代理發(fā)送消息或從RabbitMQ代理接收消息所需的基礎(chǔ)結(jié)構(gòu)bean。 我們可以簡(jiǎn)單地通過調(diào)用RabbitTemplate.convertAndSend(“ routingKey”,Object)方法自動(dòng)連接RabbitTemplate并發(fā)送消息。
public class Order implements Serializable {private String orderNumber;private String productId;private double amount;//setters & getters }import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;@Service public class OrderMessageSender {private final RabbitTemplate rabbitTemplate;@Autowiredpublic OrderMessageSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendOrder(Order order) {this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order);} }默認(rèn)情況下,Spring Boot使用org.springframework.amqp.support.converter.SimpleMessageConverter并將對(duì)象串行化為byte [] 。
現(xiàn)在有了此配置,我們可以通過調(diào)用OrderMessageSender.sendOrder(Order)方法將消息發(fā)送到RabbitMQ訂單隊(duì)列。
發(fā)送消息后,您可以通過使用來賓/來賓憑證登錄從Administration UI應(yīng)用程序中查看消息。 您可以單擊“ 交易所 / 隊(duì)列”選項(xiàng)卡以查看已創(chuàng)建的訂單交換和訂單隊(duì)列 。 您還可以檢查訂單交換的綁定,如下所示:
現(xiàn)在轉(zhuǎn)到“隊(duì)列”選項(xiàng)卡,然后單擊“訂單隊(duì)列”。 向下滾動(dòng)到“ 獲取消息”部分,然后單擊“ 獲取消息”按鈕,可以查看消息的內(nèi)容。
現(xiàn)在,使用@RabbitListener創(chuàng)建訂單隊(duì)列的偵聽器。
創(chuàng)建一個(gè)Spring bean OrderMessageListener ,如下所示:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component public class OrderMessageListener {static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);@RabbitListener(queues = RabbitConfig.QUEUE_ORDERS)public void processOrder(Order order) {logger.info("Order Received: "+order);} }而已!! 通過簡(jiǎn)單地添加@RabbitListener并定義要監(jiān)聽的隊(duì)列,我們??可以創(chuàng)建一個(gè)Listener。
現(xiàn)在,如果您向Order-queue發(fā)送一條消息,該消息應(yīng)該由OrderMessageListener.processOrder()方法使用,并且應(yīng)該看到日志語句“ Order Received:”。
以JSON有效載荷的形式發(fā)送和接收消息
如我們所見,默認(rèn)的序列化機(jī)制使用SimpleMessageConverter將消息對(duì)象轉(zhuǎn)換為byte [],并在接收端將使用GenericMessageConverter將byte []反序列化為Object類型(在我們的示例中為Order)。
為了更改此行為,我們需要定制Spring Boot RabbitMQ自動(dòng)配置的bean。
以JSON格式發(fā)送消息
一種將消息作為JSON有效負(fù)載發(fā)送的快速方法是使用ObjectMapper,我們可以將Order對(duì)象轉(zhuǎn)換為JSON并發(fā)送。
@Autowired private ObjectMapper objectMapper;public void sendOrder(Order order) {try {String orderJson = objectMapper.writeValueAsString(order);Message message = MessageBuilder.withBody(orderJson.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, message);} catch (JsonProcessingException e) {e.printStackTrace();} }但是像這樣將對(duì)象轉(zhuǎn)換為JSON是一種樣板。 相反,我們可以采用以下方法。
我們可以配置讓RabbitTemplate使用org.springframework.amqp.support.converter.Jackson2JsonMessageConverter bean,以便將消息序列化為JSON而不是byte []。
@Configuration public class RabbitConfig {......@Beanpublic RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());return rabbitTemplate;}@Beanpublic Jackson2JsonMessageConverter producerJackson2MessageConverter() {return new Jackson2JsonMessageConverter();} }現(xiàn)在,當(dāng)您發(fā)送一條消息時(shí),它將轉(zhuǎn)換為JSON并將其發(fā)送到Queue。
以JSON格式接收消息
為了將消息有效負(fù)載視為JSON,我們應(yīng)該通過實(shí)現(xiàn)RabbitListenerConfigurer來定制RabbitMQ配置。
@Configuration public class RabbitConfig implements RabbitListenerConfigurer {......@Overridepublic void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());}@BeanMessageHandlerMethodFactory messageHandlerMethodFactory() {DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());return messageHandlerMethodFactory;}@Beanpublic MappingJackson2MessageConverter consumerJackson2MessageConverter() {return new MappingJackson2MessageConverter();} }使用DeadLetterQueues(DLQ)處理錯(cuò)誤和無效消息
我們可能希望將無效消息發(fā)送到單獨(dú)的隊(duì)列,以便以后可以檢查和重新處理它們。 我們可以使用DLQ概念自動(dòng)執(zhí)行此操作,而無需手動(dòng)編寫代碼來處理這種情況。
我們可以在定義Queue Bean的同時(shí)聲明Queue的dead-letter-exchange , dead-letter-routing-key :
@Configuration public class RabbitConfig implements RabbitListenerConfigurer {public static final String QUEUE_ORDERS = "orders-queue";public static final String EXCHANGE_ORDERS = "orders-exchange";public static final String QUEUE_DEAD_ORDERS = "dead-orders-queue";@BeanQueue ordersQueue() {return QueueBuilder.durable(QUEUE_ORDERS).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", QUEUE_DEAD_ORDERS).withArgument("x-message-ttl", 15000) //if message is not consumed in 15 seconds send to DLQ.build();}@BeanQueue deadLetterQueue() {return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();}...... }現(xiàn)在嘗試將無效的JSON消息發(fā)送到orders-queue,它將被發(fā)送到dead-orders-queue。
您可以在https://github.com/sivaprasadreddy/sivalabs-blog-samples-code/tree/master/springboot-rabbitmq-demo中找到本文的源代碼。
翻譯自: https://www.javacodegeeks.com/2018/02/springboot-messaging-rabbitmq.html
總結(jié)
以上是生活随笔為你收集整理的使用RabbitMQ的SpringBoot消息传递的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 101英语怎么读 101英语是什么
- 下一篇: 网络用语nc粉是什么意思 来源是什么