當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
SpringBoot高级-消息-RabbitTemplate发送接受消息序列化机制
生活随笔
收集整理的這篇文章主要介紹了
SpringBoot高级-消息-RabbitTemplate发送接受消息序列化机制
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
引入了spring-boot-starter-amqp模塊,他引入了spring-messaging模塊,包括引入了spring-rabbit模塊,怎么配置使用呢,<dependency><groupId>org.springframework</groupId><artifactId>spring-messaging</artifactId>
</dependency>SpringBoot主要是引入了相關(guān)場景依賴,一切都是自動配置的,我們可以來分析一下原理,rabbitmq給我們自動配置了哪些
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>springboot-02-amqp</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.12.RELEASE</version><relativePath/> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><!-- 這個插件,可以將應用打包成一個可執(zhí)行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
我們就找一下和rabbitmq相關(guān)的類,有一個叫RabbitAutoConfiguration,這個自動配置類里面幫我們做了什么,我們在這記錄一下,自動配置類,我們配了哪些,進來看,這一塊我們配了連接工廠,這是人家?guī)臀覀兣浜玫倪B接工廠CachingConnectionFactory,有自動配置了連接工廠,從這里可以獲取,Rabbitmq的連接,比如我們的主機地址,用戶名,密碼,還有VisualHost等等@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {@Beanpublic CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)throws Exception {RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();if (config.determineHost() != null) {factory.setHost(config.determineHost());}factory.setPort(config.determinePort());if (config.determineUsername() != null) {factory.setUsername(config.determineUsername());}if (config.determinePassword() != null) {factory.setPassword(config.determinePassword());}if (config.determineVirtualHost() != null) {factory.setVirtualHost(config.determineVirtualHost());}if (config.getRequestedHeartbeat() != null) {factory.setRequestedHeartbeat(config.getRequestedHeartbeat());}RabbitProperties.Ssl ssl = config.getSsl();if (ssl.isEnabled()) {factory.setUseSSL(true);if (ssl.getAlgorithm() != null) {factory.setSslAlgorithm(ssl.getAlgorithm());}factory.setKeyStore(ssl.getKeyStore());factory.setKeyStorePassphrase(ssl.getKeyStorePassword());factory.setTrustStore(ssl.getTrustStore());factory.setTrustStorePassphrase(ssl.getTrustStorePassword());}if (config.getConnectionTimeout() != null) {factory.setConnectionTimeout(config.getConnectionTimeout());}factory.afterPropertiesSet();CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory.getObject());connectionFactory.setAddresses(config.determineAddresses());connectionFactory.setPublisherConfirms(config.isPublisherConfirms());connectionFactory.setPublisherReturns(config.isPublisherReturns());if (config.getCache().getChannel().getSize() != null) {connectionFactory.setChannelCacheSize(config.getCache().getChannel().getSize());}if (config.getCache().getConnection().getMode() != null) {connectionFactory.setCacheMode(config.getCache().getConnection().getMode());}if (config.getCache().getConnection().getSize() != null) {connectionFactory.setConnectionCacheSize(config.getCache().getConnection().getSize());}if (config.getCache().getChannel().getCheckoutTimeout() != null) {connectionFactory.setChannelCheckoutTimeout(config.getCache().getChannel().getCheckoutTimeout());}return connectionFactory;}}都是從config里面得到的,config就是我們的RabbitProperties,我們點進來@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {這里面封裝了Rabbitmq的所有配置,那我們就來配置他們,找到我們的配置文件,spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
#spring.rabbitmq.virtual-host=首先rabbitmq的主機地址,我們寫上我們的ip地址,http不用寫,http是web的訪問地址,還有rabbitmq的用戶名,我們也寫上,guest,guest用戶,還有我們的密碼,也叫g(shù)uest,還有rabbitmq的端口,這個端口呢,不寫默認就是5672,這個可以不寫,virtual-host屬性我們之前有說過,是一定要指定的,我們也可以不指定,virtual-host如果我們不寫,如果是空字符串也是訪問"/"/*** If addresses have been set and the first address has a virtual host it is returned.* Otherwise returns the result of calling {@code getVirtualHost()}.* @return the virtual host or {@code null}* @see #setAddresses(String)* @see #getVirtualHost()*/
public String determineVirtualHost() {if (CollectionUtils.isEmpty(this.parsedAddresses)) {return getVirtualHost();}Address address = this.parsedAddresses.get(0);return address.virtualHost == null ? getVirtualHost() : address.virtualHost;
}都可以通過配置文件來知道,這個配置我們基本寫完,但是我們?nèi)绾谓orabbitmq發(fā)送以及接收消息呢,我們可以看自動配置,這個自動配置除了看工廠,它還會給我們放一個RabbitTemplate,給容器中加一個他,給RabbitMQ接收和發(fā)送消息的,就像以前的JdbcTemplate,RedisTemplate,那么他還有什么呢,還可以來這里繼續(xù)來看,除了RabbitTemplate,他還放了AmqpAdmin,這AmqpAdmin呢,它是Rabbitmq的管理組件,Rabbitmq系統(tǒng)管理組件,他不來發(fā)消息和收消息,可以幫我們聲明一個隊列,創(chuàng)建一個交換器,這個是自動配置給我們放的組件,我們就用RabbitTemplate,我在這個測試類里面
我們嘗試給消息隊列里面發(fā)送一些內(nèi)容,我們來測試幾個消息,第一個單播下的點對點消息,我們將一個消息發(fā)送隊列里面,我們可以使用rabbitTemplate,兩個方法,send傳入exchange,就是交換器,消息先給交換器,根據(jù)路由件放到哪個隊列里面,后面這個就是我們要發(fā)的消息,這個是需要我們自己來編輯這個消息,第一個傳一個交換器,第二個是傳一個路由件routekey,第三個傳一個message,他的好處呢就是,特別是message需要自己定義,Message需要自己定義,需要自己構(gòu)造一個,自己構(gòu)造的時候呢,我們來看一下這個Message,我們要發(fā)的消息內(nèi)容,amqp這個核心包里面org.springframework.amqp.core.Message你要把你發(fā)的消息序列號成一個數(shù)組,還有你消息的頭信息,public Message(byte[] body, MessageProperties messageProperties) { //NOSONARthis.body = body; //NOSONARthis.messageProperties = messageProperties;
}可以定制消息體內(nèi)容和消息頭,這是rabbitTemplate.send,我們常見的是convertAndSend,轉(zhuǎn)化并發(fā)送,我們只需要傳入交換器,路由件,還有數(shù)據(jù)對象,這個對象默認就會被當成消息體,而且會自動轉(zhuǎn)換,我們只需要傳入要發(fā)送的對象,只需要傳入發(fā)送的對象,自動序列化給rabbitmq,這個object是被當成消息體的,默認當成消息體,我們就可以用這個方法,我們就直接用下面這個方法,我們要發(fā)點對點消息,我們就要按照交換器的規(guī)則,direct能夠直接派發(fā)給一個隊列,我們就連上這個交換器,我們就把消息給exchange.direct,我們連上這個交換器,這個交換器我們綁定了很多的路由件,比如china,路由件是他,交給這個隊列,我們就用china.news,我們用這個路由件,要發(fā)送的內(nèi)容是什么,就隨便寫一個map,我可以寫一個map,;里面存一些數(shù)據(jù)放進去,比如msg存一些提示消息,這是我們要發(fā)的數(shù)據(jù),我來發(fā)送測試一下,我們看這個數(shù)據(jù)能不能到達,這個運行完成了,我們來我們的消息隊列來看一下localhost:15672我們剛才發(fā)的是china.news
消息是這個樣子的
是因為序列號默認是JAVA的序列化方式,給我們序列化的對象,解碼以后的樣子,對象唄默認序列化以后,發(fā)送出去,發(fā)送點對點消息,我們要接收到這個消息,怎么辦呢,可以用這個API,我們同樣用rabbitTemplate,有這個receive方法可以接收,包括還有receiveAndConverter,接收并轉(zhuǎn)化,如果用這個接收,你收來自哪個消息隊列的消息,會給你把這個消息轉(zhuǎn)成Messager,里面就只有消息頭和消息體,如果用receiveAndConverter,可以把消息體轉(zhuǎn)成我們想要的對象,我們最終的消息是發(fā)到china.news,這個里面了,所以我們要收數(shù)據(jù),也是收這個消息隊列的,在這里寫上消息隊列的名稱,然后把收過來的數(shù)據(jù)拿過來,這個數(shù)據(jù)的類型是什么,再來打印的數(shù)據(jù)是什么
package com.learn.springboot;import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootAmqpApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;/*** 1.單播(點對點)*/@Testpublic void contextLoads() {// Message需要自己構(gòu)造一個,定義消息體內(nèi)容和消息頭
// rabbitTemplate.send(exchange, routingKey, message);// object默認當成消息體,只需要傳入要發(fā)送的對象,自動序列化發(fā)送給rabbitmq
// rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);Map<String,Object> map = new HashMap<String,Object>();map.put("msg", "這是第一個消息");map.put("data", Arrays.asList("helloworld",1231,true)); // 對象被默認序列化以后發(fā)送出去rabbitTemplate.convertAndSend("exchange.direct", "china.news",map);}@Testpublic void receive() {Object o = rabbitTemplate.receiveAndConvert("china.news");System.out.println(o.getClass());System.out.println(o);}
}
class java.util.HashMap
{msg=這是第一個消息, data=[helloworld, 1231, true]}
數(shù)據(jù)收到的類型是HashMap,拿到這些數(shù)據(jù)沒什么問題,所以我們也能準確的收到數(shù)據(jù),但是這個數(shù)據(jù)收到以后,來看這個消息隊列里面,這個消息隊列就沒有了
確實已經(jīng)沒有了,剛才已經(jīng)收到了,接收數(shù)據(jù),有時候我想把數(shù)據(jù)序列化成JSON的格式,那種序列化的結(jié)果有點不好看,如何將數(shù)據(jù)自動的轉(zhuǎn)為JSON發(fā)送出去,其實非常的簡單,我們到RabbitTemplate這里來看,為什么剛才是那種序列化結(jié)果,原因我們來往下翻,原因有一個消息轉(zhuǎn)換器private volatile MessageConverter messageConverter = new SimpleMessageConverter();這個轉(zhuǎn)換器呢,默認是SimpleMessageConverter,這個和MessageConverter呢,我們來序列化數(shù)據(jù)的時候,就是按照JDK的序列化規(guī)則,content = SerializationUtils.deserialize(createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));我們可以給他換一個MessageConverterorg.springframework.amqp.support.converter.MessageConverter我們來寫上配置,比如我們在config包下寫一個MyAMQPConfig,這個Config我們來寫一個@Configuration,我們就自己來放一個MessageConverter,這是我們的MessageConverter,來放哪個MessageConverter呢,就來看他有哪些,抽象AbstractJsonMessageConverter有跟JSON有關(guān)的,Jackson2JsonMessageConverter,我們換成他,我們自動配置也會生效,當我們來配置RabbitTemplate的時候,如果我們有自己定義的MessageConverter,也會給我們設(shè)置進來,@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitTemplate.class)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);MessageConverter messageConverter = this.messageConverter.getIfUnique();if (messageConverter != null) {rabbitTemplate.setMessageConverter(messageConverter);}我們再來測試這個消息,我們把消息發(fā)出去,來看還是不是原來的消息
package com.learn.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MyAMQPConfig {@Beanpublic MessageConverter messageConvert() {return new Jackson2JsonMessageConverter();}
}
消息就是我們的JSON數(shù)據(jù),而且還有定義的消息頭,消息頭有我們key的類型,我們整個消息的類型,application/json,序列化能過去,存起來,能不能取出消息,我們看到這塊也是沒有問題的,class java.util.HashMap
{msg=這是第一個消息, data=[helloworld, 1231, true]}比如我們發(fā)送Book對象,接下來我們來發(fā)一個圖書的對象,我們來new一個book
package com.learn.bean;public class Book {private String bookName;private String author;public Book() {super();}public Book(String bookName, String author) {super();this.bookName = bookName;this.author = author;}public String getBookName() {return bookName;}public void setBookName(String bookName) {this.bookName = bookName;}public String getAuthor() {return author;}public void setAuthor(String author) {this.author = author;}@Overridepublic String toString() {return "Book [bookName=" + bookName + ", author=" + author + "]";}}
package com.learn.springboot;import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import com.learn.bean.Book;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootAmqpApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;/*** 1.單播(點對點)*/@Testpublic void contextLoads() {// Message需要自己構(gòu)造一個,定義消息體內(nèi)容和消息頭
// rabbitTemplate.send(exchange, routingKey, message);// object默認當成消息體,只需要傳入要發(fā)送的對象,自動序列化發(fā)送給rabbitmq
// rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);Map<String,Object> map = new HashMap<String,Object>();map.put("msg", "這是第一個消息");map.put("data", Arrays.asList("helloworld",1231,true)); // 對象被默認序列化以后發(fā)送出去rabbitTemplate.convertAndSend("exchange.direct", "china.news",new Book("西游記","吳承恩"));}@Testpublic void receive() {Object o = rabbitTemplate.receiveAndConvert("china.news");System.out.println(o.getClass());System.out.println(o);}
}
能不能反序列化獲取出來呢,我們先用Object對象來接收,class com.learn.bean.Book
Book [bookName=西游記, author=吳承恩]這個Class拿到的是Book對象,強轉(zhuǎn)也可以在這里強轉(zhuǎn),消息序列化和反序列化,我們測試的是一個單播,那如果想廣播,不管是單播還是廣播,我們發(fā)給對應的exchange,我們再來測試一下,廣播類型的轉(zhuǎn)換器,然后路由件就不用寫了,因為廣播是不用管路由件的,我們來測試一下廣播
/*** 廣播*/@Testpublic void sendMsg() {rabbitTemplate.convertAndSend("exchange.fanout","",new Book("三國演義","羅貫中"));}
?
總結(jié)
以上是生活随笔為你收集整理的SpringBoot高级-消息-RabbitTemplate发送接受消息序列化机制的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringBoot高级-消息-Rabb
- 下一篇: SpringBoot高级-消息-@Rab