javascript
Spring Boot(十三)RabbitMQ安装与集成
一、前言
RabbitMQ是一個開源的消息代理軟件(面向消息的中間件),它的核心作用就是創(chuàng)建消息隊列,異步接收和發(fā)送消息,MQ的全程是:Message Queue中文的意思是消息隊列。
1.1 使用場景
-
削峰填谷:用于應對間歇性流量提升對于系統(tǒng)的“破壞”,比如秒殺活動,可以把請求先發(fā)送到消息隊列在平滑的交由系統(tǒng)去處理,當訪問量大于一定數(shù)量的時候,還可以直接屏蔽后續(xù)操作,給前臺的用戶友好的顯示;
-
延遲處理:可以進行事件后置,比如訂單超時業(yè)務,用戶下單30分鐘未支付取消訂單;
-
系統(tǒng)解耦:消息隊列也可以幫開發(fā)人員完成業(yè)務的解耦,比如用戶上傳頭像的功能,最初的設計是用戶上傳完之后才能發(fā)帖,后面有增加了經驗系統(tǒng),需要在上傳頭像之后增加經驗值,到后來又上線了金幣系統(tǒng),上傳頭像之后可以增加金幣,像這種需求的不斷升級,如果在業(yè)務代碼里面寫死每次該業(yè)務代碼是很不優(yōu)雅的,這個時候如果使用消息隊列,那么只需要增加一個訂閱器用于介紹用戶上傳頭像的消息,再執(zhí)行經驗的增加和金幣的增加是非常簡單的,并且在不改動業(yè)務模塊業(yè)務代碼的基礎上可以輕松實現(xiàn),如果后期需要撤銷某個模塊了,只需要刪除訂閱器即可,就這樣就降低了系統(tǒng)開發(fā)的耦合性;
1.2 為什么使用RabbitMQ?
現(xiàn)在市面上比較主流的消息隊列還有Kafka、RocketMQ、RabbitMQ,它們的介紹和區(qū)別如下:
-
Kafka是LinkedIn開源的分布式發(fā)布-訂閱消息系統(tǒng),目前歸屬于Apache定級項目。Kafka主要特點是基于Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用于日志收集和傳輸。0.8版本開始支持復制,對消息的重復、丟失、錯誤沒有嚴格要求,適合產生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務的數(shù)據(jù)收集業(yè)務。
-
RabbitMQ是使用Erlang語言開發(fā)的開源消息隊列系統(tǒng),基于AMQP協(xié)議來實現(xiàn)。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內,對數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
-
RocketMQ是阿里開源的消息中間件,它是純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應用的特點。RocketMQ思路起源于Kafka,但并不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優(yōu)化,目前在阿里集團被廣泛應用于交易、充值、流計算、消息推送、日志流式處理、binglog分發(fā)等場景。
簡單總結: Kafka的性能最好,適用于對消息吞吐量達,對消息丟失不敏感的系統(tǒng);RocketMQ借鑒了Kafka并提高了消息的可靠性,修復了Kafka的不足;RabbitMQ性能略低于Kafka,并實現(xiàn)了AMQP(Advanced Message Queuing Protocol)高級消息隊列協(xié)議的標準,有非常好的穩(wěn)定性。
支持語言對比
- RocketMQ 支持語言:Java、C++、Golang
- Kafka 支持語言:Java、Scala
- RabbitMQ 支持語言:C#、Java、Js/NodeJs、Python、Ruby、Erlang、Perl、Clojure、Golang
1.3 RabbitMQ特點
RabbitMQ的特點是易用、擴展性好(集群訪問)、高可用,具體如下:
- 可靠性:持久化、消息確認、事務等保證了消息的可靠性;
- 伸縮性:集群服務,可以很方便的添加服務器來提高系統(tǒng)的負載;
- 高可用:集群狀態(tài)下部分節(jié)點出現(xiàn)問題依然可以運行;
- 多語言支持:RabbitMQ幾乎支持了所有的語言,比如Java、.Net、Nodejs、Golang等;
- 易用的管理頁面:RabbitMQ提供了易用了網(wǎng)頁版的管理監(jiān)控系統(tǒng),可以很方便的完成RabbitMQ的控制和查看;
- 插件機制:RabbitMQ提供了許多插件,可以豐富和擴展Rabbit的功能,用戶也可編寫自己的插件;
1.4 RabbitMQ基礎知識
在了解消息通訊之前首先要了解3個概念:生產者、消費者和代理。
生產者:消息的創(chuàng)建者,負責創(chuàng)建和推送數(shù)據(jù)到消息服務器;
消費者:消息的接收方,用于處理數(shù)據(jù)和確認消息;
代理:就是RabbitMQ本身,用于扮演“快遞”的角色,本身不生產消息,只是扮演“快遞”的角色。
(一)消息發(fā)送原理
首先你必須連接到Rabbit才能發(fā)布和消費消息,那怎么連接和發(fā)送消息的呢?
你的應用程序和Rabbit Server之間會創(chuàng)建一個TCP連接,一旦TCP打開,并通過了認證,認證就是你試圖連接Rabbit之前發(fā)送的Rabbit服務器連接信息和用戶名和密碼,有點像程序連接數(shù)據(jù)庫,使用Java有兩種連接認證的方式,后面代碼會詳細介紹,一旦認證通過你的應用程序和Rabbit就創(chuàng)建了一條AMQP信道(Channel)。
信道是創(chuàng)建在“真實”TCP上的虛擬連接,AMQP命令都是通過信道發(fā)送出去的,每個信道都會有一個唯一的ID,不論是發(fā)布消息,訂閱隊列或者接收消息都是通過信道完成的。
(二)為什么不通過TCP直接發(fā)送命令?
對于操作系統(tǒng)來說創(chuàng)建和銷毀TCP會話是非常昂貴的開銷,假設高峰期每秒有成千上萬條連接,每個連接都要創(chuàng)建一條TCP會話,這就造成了TCP連接的巨大浪費,而且操作系統(tǒng)每秒能創(chuàng)建的TCP也是有限的,因此很快就會遇到系統(tǒng)瓶頸。
如果我們每個請求都使用一條TCP連接,既滿足了性能的需要,又能確保每個連接的私密性,這就是引入信道概念的原因。
(三)RabbitMQ名稱解釋
ConnectionFactory(連接管理器): 應用程序與Rabbit之間建立連接的管理器,程序代碼中使用;
Channel(信道): 消息推送使用的通道;
Exchange(交換器): 用于接受、分配消息;
Queue(隊列): 用于存儲生產者的消息;
RoutingKey(路由鍵): 用于把生成者的數(shù)據(jù)分配到交換器上;
BindingKey(綁定鍵): 用于把交換器的消息綁定到隊列上;
看到上面的解釋,最難理解的路由鍵和綁定鍵了,那么他們具體怎么發(fā)揮作用的,請看下圖:
1.5 交換器分類
RabbitMQ的Exchange(交換器)分為四類:
- direct(默認)
- headers
- fanout
- topic
其中headers交換器允許你匹配AMQP消息的header而非路由鍵,除此之外headers交換器和direct交換器完全一致,但性能卻很差,幾乎用不到,所以我們這里不做解釋。
1.5.1 direct交換器
direct為默認的交換器類型,也非常的簡單,如果路由鍵匹配的話,消息就投遞到相應的隊列,如下圖:
1.5.2 fanout交換器
fanout有別于direct交換器,fanout是一種發(fā)布/訂閱模式的交換器,當你發(fā)送一條消息的時候,交換器會把消息廣播到所有附加到這個交換器的隊列上。
注意: 對于fanout交換器來說routingKey(路由鍵)是無效的,這個參數(shù)是被忽略的。
1.5.3 topic交換器
topic交換器運行和fanout類似,但是可以更靈活的匹配自己想要訂閱的信息,這個時候routingKey路由鍵就排上用場了,使用路由鍵進行消息(規(guī)則)匹配。
topic路由器的關鍵在于定義路由鍵,定義routingKey名稱不能超過255字節(jié),使用“.”作為分隔符,例如:com.mq.rabbit.error。
匹配規(guī)則
匹配表達式可以用“*”和“#”匹配任何字符,具體規(guī)則如下:
- “*”匹配一個分段(用“.”分割)的內容;
- “#”匹配所有字符;
例如發(fā)布了一個“cn.mq.rabbit.error”的消息:
能匹配上的路由鍵:
- cn.mq.rabbit.*
- cn.mq.rabbit.#
- #.error
- cn.mq.#
- #
不能匹配上的路由鍵:
- cn.mq.*
- *.error
- *
1.6 消息持久化
RabbitMQ隊列和交換器有一個不可告人的秘密,就是默認情況下重啟服務器會導致消息丟失,那么怎么保證Rabbit在重啟的時候不丟失呢?答案就是消息持久化。
當你把消息發(fā)送到Rabbit服務器的時候,你需要選擇你是否要進行持久化,但這并不能保證Rabbit能從崩潰中恢復,想要Rabbit消息能恢復必須滿足3個條件:
持久化工作原理
Rabbit會將你的持久化消息寫入磁盤上的持久化日志文件,等消息被消費之后,Rabbit會把這條消息標識為等待垃圾回收。
持久化的缺點
消息持久化的優(yōu)點顯而易見,但缺點也很明顯,那就是性能,因為要寫入硬盤要比寫入內存性能較低很多,從而降低了服務器的吞吐量,盡管使用SSD硬盤可以使事情得到緩解,但他仍然吸干了Rabbit的性能,當消息成千上萬條要寫入磁盤的時候,性能是很低的。
所以使用者要根據(jù)自己的情況,選擇適合自己的方式。
學習更多RabbitMQ知識,訪問:https://gitbook.cn/gitchat/activity/5b558d54c28306099b47ae9c
二、在Docker中安裝RabbitMQ
(1)下載鏡像
https://hub.docker.com/r/library/rabbitmq/tags/
- alpine 輕量版
- management 帶插件的版本
從鏡像的大小也可以很直觀的看出來alpine是輕量版。
使用命令:
docker pull rabbitmq:3.7.7-management
下載帶management插件的版本。
(2)運行RabbitMQ
使用命令:
docker run -d --hostname myrabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.7-management
- -d 后臺運行
- –hostname 主機名稱
- –name 容器名稱
- -p 15672:15672 http訪問端口,映射本地端口到容器端口
- -p 5672:5672 amqp端口,映射本地端口到容器端口
正常啟動之后,訪問:http://localhost:15672/
登錄網(wǎng)頁管理頁面,用戶名密碼:guest/guest,登錄成功如下圖:
三、RabbitMQ集成
3.1 添加依賴
如果用Idea創(chuàng)建新項目,可以直接在創(chuàng)建Spring Boot的時候,點擊“Integration”面板,選擇RabbitMQ集成,如下圖:
如果是老Maven項目,直接在pom.xml添加如下代碼:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>3.2 配置RabbitMQ信息
在application.properties設置如下信息:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=test spring.rabbitmq.password=test3.3 代碼
3.3 代碼實現(xiàn)
本節(jié)分別來看三種交換器:direct、fanout、topic的實現(xiàn)代碼。
3.3.1 Direct Exchange
3.3.1.1 配置隊列
創(chuàng)建DirectConfig.java代碼如下:
package com.example.rabbitmq.mq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class DirectConfig {final static String QUEUE_NAME = "direct"; //隊列名稱final static String EXCHANGE_NAME = "mydirect"; //交換器名稱@Beanpublic Queue queue() {// 聲明隊列 參數(shù)一:隊列名稱;參數(shù)二:是否持久化return new Queue(DirectConfig.QUEUE_NAME, false);}// 配置默認的交換機,以下部分都可以不配置,不設置使用默認交換器(AMQP default)@BeanDirectExchange directExchange() {// 參數(shù)一:交換器名稱;參數(shù)二:是否持久化;參數(shù)三:是否自動刪除消息return new DirectExchange(DirectConfig.EXCHANGE_NAME, false, false);}// 綁定“direct”隊列到上面配置的“mydirect”路由器@BeanBinding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {return BindingBuilder.bind(directQueue).to(directExchange).with(DirectConfig.QUEUE_NAME);} }3.3.1.2 發(fā)送消息
創(chuàng)建Sender.java代碼如下:
package com.example.rabbitmq.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /*** 消息發(fā)送者-生產消息*/ @Component public class Sender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void driectSend(String message) {System.out.println("Direct 發(fā)送消息:" + message);//參數(shù)一:交換器名稱,可以省略(省略存儲到AMQP default交換器);參數(shù)二:路由鍵名稱(direct模式下路由鍵=隊列名稱);參數(shù)三:存儲消息this.rabbitTemplate.convertAndSend("direct", message);} }注意:
- 在direct交換器中,路由鍵名稱就是隊列的名稱;
- 發(fā)送消息“convertAndSend”的時候,第一個參數(shù)為交換器的名稱,非必填可以忽略,如果忽略則會把消息發(fā)送到默認交換器“AMQP default”;
3.3.1.3 消費消息
創(chuàng)建Receiver.java代碼如下:
package com.example.rabbitmq.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** 消息接收者-消費消息*/ @Component @RabbitListener(queues = "direct") public class Receiver {@Autowiredprivate AmqpTemplate rabbitTemplate;@RabbitHandler/*** 監(jiān)聽消費消息*/public void process(String message) {System.out.println("Direct 消費消息:" + message);} }3.3.1.4 測試代碼
使用Spring Boot中的默認測試框架JUnit進行單元測試,不了解JUnit的可以參考我的上一篇文章,創(chuàng)建MQTest.java代碼如下:
package com.example.rabbitmq.mq; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.text.SimpleDateFormat; import java.util.Date; import static org.junit.Assert.*;@RunWith(SpringRunner.class) @SpringBootTest public class MQTest {@Autowiredprivate Sender sender;@Testpublic void driectTest() {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.driectSend("Driect Data:" + sf.format(new Date()));} }執(zhí)行之后,效果如下圖:
表示消息已經被發(fā)送并被消費了。
3.3.2 Fanout Exchange
3.3.2.1 配置隊列
創(chuàng)建FanoutConfig.java代碼如下:
package com.example.rabbitmq.mq; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class FanoutConfig {final static String QUEUE_NAME = "fanout"; //隊列名稱final static String QUEUE_NAME2 = "fanout2"; //隊列名稱final static String EXCHANGE_NAME = "myfanout"; //交換器名稱@Beanpublic Queue queueFanout() {return new Queue(FanoutConfig.QUEUE_NAME);}@Beanpublic Queue queueFanout2() {return new Queue(FanoutConfig.QUEUE_NAME2);}//配置交換器@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(FanoutConfig.EXCHANGE_NAME);}// 綁定隊列到交換器@BeanBinding bindingFanoutExchangeQueue(Queue queueFanout, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueFanout).to(fanoutExchange);}// 綁定隊列到交換器@BeanBinding bindingFanoutExchangeQueue2(Queue queueFanout2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueFanout2).to(fanoutExchange);} }3.3.2.2 發(fā)送消息
創(chuàng)建FanoutSender.java代碼如下:
package com.example.rabbitmq.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class FanoutSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String message) {System.out.println("發(fā)送消息:" + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME, message);}public void send2(String message) {System.out.println("發(fā)送消息2:" + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME2, message);} }3.3.2.3 消費消息
創(chuàng)建兩個監(jiān)聽類,第一個FanoutReceiver.java代碼如下:
package com.example.rabbitmq.mq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException;@Component @RabbitListener(queues = "fanout") public class FanoutReceiver {@RabbitHandlerpublic void process(String msg) {System.out.println("Fanout(FanoutReceiver)消費消息:" + msg);} }第二個FanoutReceiver2.java代碼如下:
package com.example.rabbitmq.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "fanout2") public class FanoutReceiver2 {@RabbitHandlerpublic void process(String message) {System.out.println("Fanout(FanoutReceiver2)消費消息:" + message);} }3.3.2.4 測試代碼
創(chuàng)建FanoutTest.java代碼如下:
package com.example.rabbitmq.mq; import com.example.rabbitmq.RabbitmqApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.text.SimpleDateFormat; import java.util.Date;@RunWith(SpringRunner.class) @SpringBootTest(classes = RabbitmqApplication.class) public class FanoutTest {@Autowiredprivate FanoutSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Time1 => " + sf.format(new Date()));sender.send2("Date2 => " + sf.format(new Date()));} }運行測試代碼,輸出結果如下:
發(fā)送消息:Time1 => 2018-09-11 發(fā)送消息2:Date2 => 2018-09-11 Fanout(FanoutReceiver2)消費消息:Time1 => 2018-09-11 Fanout(FanoutReceiver2)消費消息:Date2 => 2018-09-11 Fanout(FanoutReceiver)消費消息:Time1 => 2018-09-11 Fanout(FanoutReceiver)消費消息:Date2 => 2018-09-11總結: 可以看出fanout會把消息分發(fā)到所有訂閱到該交換器的隊列,fanout模式是忽略路由鍵的。
3.3.3 Topic Exchange
3.3.3.1 配置隊列
@Configuration public class TopicConfig {final static String QUEUE_NAME = "log";final static String QUEUE_NAME2 = "log.all";final static String QUEUE_NAME3 = "log.all.error";final static String EXCHANGE_NAME = "topicExchange"; //交換器名稱@Beanpublic Queue queuetopic() {return new Queue(TopicConfig.QUEUE_NAME);}@Beanpublic Queue queuetopic2() {return new Queue(TopicConfig.QUEUE_NAME2);}@Beanpublic Queue queuetopic3() {return new Queue(TopicConfig.QUEUE_NAME3);}// 配置交換器@BeanTopicExchange topicExchange() {return new TopicExchange(TopicConfig.EXCHANGE_NAME);}// 綁定隊列到交換器,并設置路由鍵(log.#)@BeanBinding bindingtopicExchangeQueue(Queue queuetopic, TopicExchange topicExchange) {return BindingBuilder.bind(queuetopic).to(topicExchange).with("log.#");}// 綁定隊列到交換器,并設置路由鍵(log.*)@BeanBinding bindingtopicExchangeQueue2(Queue queuetopic2, TopicExchange topicExchange) {return BindingBuilder.bind(queuetopic2).to(topicExchange).with("log.*");}// 綁定隊列到交換器,并設置路由鍵(log.*.error)@BeanBinding bindingtopicExchangeQueue3(Queue queuetopic3, TopicExchange topicExchange) {return BindingBuilder.bind(queuetopic3).to(topicExchange).with("log.*.error");} }3.3.3.2 發(fā)布消息
@Component public class TopicSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void topicSender(String message) {String routingKey = "log.all.error";System.out.println(routingKey + " 發(fā)送消息:" + message);this.rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE_NAME, routingKey, message);} }3.3.3.3 消費消息
@Component @RabbitListener(queues = "log") public class TopicReceiver {@RabbitHandlerpublic void process(String msg) {System.out.println("log.# 消費消息:" + msg);} } @Component @RabbitListener(queues = "log.all") public class TopicReceiver2 {@RabbitHandlerpublic void process(String msg) {System.out.println("log.* 消費消息:" + msg);} } @Component @RabbitListener(queues = "log.all.error") public class TopicReceiver3 {@RabbitHandlerpublic void process(String msg) {System.out.println("log.*.error 消費消息:" + msg);} }3.3.3.4 測試代碼
@RunWith(SpringRunner.class) @SpringBootTest(classes = RabbitmqApplication.class) public class FanoutTest {@Autowiredprivate FanoutSender fanoutSender;@Testpublic void Test() {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");fanoutSender.send("Time1 => " + sf.format(new Date()));fanoutSender.send2("Date2 => " + sf.format(new Date()));} }輸出結果:
log.all.error 發(fā)送消息:time => 2018-09-11 log.# 消費消息:time => 2018-09-11 log.*.error 消費消息:time => 2018-09-11總結: 在Topic Exchange中“#”可以匹配所有內容,而“*”則是匹配一個字符段的內容。
以上示例代碼Github地址:https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq
參考文檔
阿里 RocketMQ 優(yōu)勢對比:https://juejin.im/entry/5a0abfb5f265da43062a4a91
總結
以上是生活随笔為你收集整理的Spring Boot(十三)RabbitMQ安装与集成的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mssql 字增自段怎样重置(重新自增)
- 下一篇: 借力 Docker ,三分钟搞定 MyS