【消息中间件】浅谈中间件优缺RabbitMQ基本使用
消息中間件概述
MQ全稱為Message Queue,消息隊列是應用程序和應用程序之間的通信方法。是在消息的傳輸過程中保存消息的容器。多用于分布式系統之間進行通信。
-
為什么使用MQ
在項目中,可將一些無需即時返回且耗時的操作提取出來,進行異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。 -
開發中消息隊列通常有如下應用場景:
- 任務異步處理
將不需要同步處理的并且耗時長的操作由消息隊列通知消息接收方進行異步處理。提高了應用程序的響應時間。 - 應用程序解耦合
MQ相當于一個中介,生產方通過MQ與消費方交互,它將應用程序進行解耦合。 - 削峰填谷
如訂單系統,在下單的時候就會往數據庫寫數據。但是數據庫只能支撐每秒1000左右的并發寫入,并發量再高就容易宕機。低峰期的時候并發也就100多個,但是在高峰期時候,并發量會突然激增到5000以上,這個時候數據庫肯定卡死了。
消息被MQ保存起來了,然后系統就可以按照自己的消費能力來消費,比如每秒1000個數據,這樣慢慢寫入數據庫,這樣就不會卡死數據庫了。
但是使用了MQ之后,限制消費消息的速度為1000,但是這樣一來,高峰期產生的數據勢必會被積壓在MQ中,高峰就被“削”掉了。但是因為消息積壓,在高峰期過后的一段時間內,消費消息的速度還是會維持在1000QPS,直到消費完積壓的消息,這就叫做“填谷”
- 任務異步處理
MQ 的劣勢
- 系統可用性降低
系統引入的外部依賴越多,系統穩定性越差。一旦 MQ 宕機,就會對業務造成影響。如何保證MQ的高可用? - 系統復雜度提高
MQ 的加入大大增加了系統的復雜度,以前系統間是同步的遠程調用,現在是通過 MQ 進行異步調用。如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性? - 一致性問題
A 系統處理完業務,通過 MQ 給B、C、D三個系統發消息數據,如果 B 系統、C 系統處理成功,D 系統處理失敗。如何保證消息數據處理的一致性?
既然 MQ 有優勢也有劣勢,那么使用 MQ 需要滿足什么條件呢?
- 生產者不需要從消費者處獲得反饋。引入消息隊列之前的直接調用,其接口的返回值應該為空,這才讓明明下層的動作還沒做,上層卻當成動作做完了繼續往后走,即所謂異步成為了可能。
- 容許短暫的不一致性。
- 確實是用了有效果。即解耦、提速、削峰這些方面的收益,超過加入MQ,管理MQ這些成本。
AMQP 和 JMS
MQ是消息通信的模型;實現MQ的大致有兩種主流方式:AMQP、JMS。
AMQP
AMQP是一種協議,更準確的說是一種binary wire-level protocol(鏈接協議)。這是其和JMS的本質差別,AMQP不從API層進行限定,而是直接定義網絡交換的數據格式。
AMQP,即 Advanced Message Queuing Protocol(高級消息隊列協議),是一個網絡協議,是應用層協議的一個開放標準,為面向消息的中間件設計?;诖藚f議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產品,不同的開發語言等條件的限制。2006年,AMQP 規范發布。類比HTTP。
2007年,Rabbit 技術公司基于 AMQP 標準開發的 RabbitMQ 1.0 發布。RabbitMQ 采用 Erlang 語言開發。
Erlang 語言由 Ericson 設計,專門為開發高并發和分布式系統的一種語言,在電信領域使用廣泛。
JMS
JMS即Java消息服務(JavaMessage Service)應用程序接口,是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。
JMS 是 JavaEE 規范中的一種,類比JDBC
很多消息中間件都實現了JMS規范,例如:ActiveMQ。RabbitMQ 官方沒有提供 JMS 的實現包,但是開源社區有
AMQP 與 JMS 區別
- JMS是定義了統一的接口,來對消息操作進行統一;AMQP是通過規定協議來統一數據交互的格式
- JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。
- JMS規定了兩種消息模式;而AMQP的消息模式更加豐富
消息隊列產品
市場上常見的消息隊列有如下:
- ActiveMQ:基于JMS
- ZeroMQ:基于C語言開發
- RabbitMQ:基于AMQP協議,erlang語言開發,穩定性好
- RocketMQ:基于JMS,阿里巴巴產品
- Kafka:類似MQ的產品;分布式消息系統,高吞吐量
RabbitMQ
RabbitMQ 中的相關概念:
- Broker:接收和分發消息的應用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網絡中的 namespace 概念。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之間的 TCP 連接
- Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和 message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection 極大減少了操作系統建立 TCP connection 的開銷
- Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發消息到queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)
- Queue:消息最終被送到這里等待 consumer 取走
- Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發依據
RabbitMQ是由erlang語言開發,基于AMQP(Advanced Message Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統開發中應用非常廣泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6種模式:簡單模式,work模式,Publish/Subscribe發布與訂閱模式,Routing路由模式,Topics主題模式,RPC遠程調用模式(遠程調用,不太算MQ;暫不作介紹);
官網對應模式介紹:https://www.rabbitmq.com/getstarted.html
RabbitMQ入門
pom.xml文件中添加如下依賴:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>編寫生產者
public class Producer {static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {//創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//主機地址;默認為 localhostconnectionFactory.setHost("localhost");//連接端口;默認為 5672connectionFactory.setPort(5672);//虛擬主機名稱;默認為 /connectionFactory.setVirtualHost("/wanyuan");//連接用戶名;默認為guestconnectionFactory.setUsername("wanyuan");//連接密碼;默認為guestconnectionFactory.setPassword("wanyuan");//創建連接Connection connection = connectionFactory.newConnection();// 創建頻道Channel channel = connection.createChannel();// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 要發送的信息String message = "你好;趙麗穎!";/*** 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage* 參數2:路由key,簡單模式可以傳遞隊列名稱* 參數3:消息其它屬性* 參數4:消息內容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已發送消息:" + message);// 關閉資源channel.close();connection.close();}}在執行上述的消息發送之后;可以登錄rabbitMQ的管理控制臺,可以發現隊列和其消息
編寫消費者
抽取創建connection的工具類
public class ConnectionUtil {public static Connection getConnection() throws Exception {//創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//主機地址;默認為 localhostconnectionFactory.setHost("localhost");//連接端口;默認為 5672connectionFactory.setPort(5672);//虛擬主機名稱;默認為 /connectionFactory.setVirtualHost("/itcast");//連接用戶名;默認為guestconnectionFactory.setUsername("heima");//連接密碼;默認為guestconnectionFactory.setPassword("heima");//創建連接return connectionFactory.newConnection();} }編寫消息的消費者
public class Consumer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//創建消費者;并設置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標簽,在channel.basicConsume時候可以指定* envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機System.out.println("交換機為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息為:" + new String(body, "utf-8"));}};//監聽消息/*** 參數1:隊列名稱* 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認* 參數3:消息接收到后回調*/channel.basicConsume(Producer.QUEUE_NAME, true, consumer);//不關閉資源,應該一直監聽消息//channel.close();//connection.close();}}總結
上述的入門案例中中其實使用的是如下的簡單模式:
在上圖的模型中,有以下概念:
- P:生產者,也就是要發送消息的程序
- C:消費者:消息的接受者,會一直等待消息到來。
- queue:消息隊列,圖中紅色部分。類似一個郵箱,可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。
總結
以上是生活随笔為你收集整理的【消息中间件】浅谈中间件优缺RabbitMQ基本使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【全文搜索引擎】Elasticsearc
- 下一篇: 【消息中间件】AMQPRabbitMQ工