rabbitmq知识汇总
rabbitmq知識匯總
一、什么是消息中間件
1. 基于消息中間件的分布式系統的架構
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-v81ZRat2-1624005504828)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210610153533157.png)]
2. 消息中間件應用的場景
- 跨系統數據傳遞
- 高并發的流量削峰
- 數據的分發和異步處理
- 大數據的分析與傳遞
- 分布式事務
3. RabbitMQ 為什么需要信道?為什么不是TCP直接通信?
- TCP的創建和銷毀,開銷大,創建需要三次握手,銷毀需要四次分手
- 如果不使用信道,那么引用程序就會使用TCP的方式連接到rabbitmq,高峰時每秒成千上萬條連接會造成資源的巨大浪費(一條tcp消耗資源,成千上萬的tcp會非常消耗資源),而且操作系統每秒處理TCP連接數量也是有限的,必定會造成性能瓶頸
- 信道的原理是一條線程一條信道,多條線程多條信道共同使用一條TCP連接。一條TCP連接可以容納無限的信道,及時每秒造成成千上萬的請求也不會造成性能瓶頸
4. 消息持久化,常見的持久化方式
| 文件存儲 | 支持 | 支持 | 支持 | 支持 | 
| 數據庫 | 支持 | / | / | / | 
5.消息分發策略的機制和對比
| 發布訂閱 | 支持 | 支持 | 支持 | 支持 | 
| 輪詢分發 | 支持 | 支持 | 支持 | / | 
| 公平分發 | / | 支持 | 支持 | / | 
| 重發 | 支持 | 支持 | / | 支持 | 
| 消息拉取 | / | 支持 | 支持 | 支持 | 
6.消息中間件的結構
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-90wKXvzZ-1624005504831)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210617090341769.png)]
核心概念:
- Server: 又稱Broke,接受客戶端的連接,實現AMQP實體服務,安裝rabbitmq-server
- Connection: 連接,應用程序與Broke的網絡連接TCP/IP 三次握手和四次揮手
- Channel:網絡信道,幾乎所有的操作都在Channel中進行,Channel是進行消息讀寫的通道,客戶端可以建立對各Channel,每個Channel代表一個會話任務
- Message: 消息,服務與應用程序之間傳送的數據,由Properties和body組成,Properties可是對消息進行修飾,比如消息的優先級、延遲等高等特性,Body則就是消息體的內容
- Virtual Host:虛擬地址,用于進行邏輯隔離,最上層的消息路由,一個虛擬機可以有若干個Exchange和Queue,同一個虛擬機主機里面不能有相同名字和Exchange
- Exchange:交換機,接收消息,根據路由鍵發送消息到綁定的隊列(不具備消息存儲的能力)
- Binding:Exchange和Queue之間的虛擬連接,binding中可以保護多個routing key
- routing key:是一個路由規則,虛擬機可以用它來確定如何路由一個特定消息
- Queue:隊列,也成為Message Queue。消息隊列,保存消息并將它們轉發給消費者。
rabbitmq的運行流程:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-lLfQsC2m-1624005504832)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210617142927721.png)]
二、安裝過程
1.安裝erlang環境
查看erlang與rabbitmq的版本對應關系:
https://www.rabbitmq.com/which-erlang.html
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-um99vqEL-1624005504834)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210611091932922.png)]
2.去erlang官網下載對應版本rpm文件,并上傳至linux服務器在安裝(推薦),在線安裝比較慢,一般需要fanqiang下載:
https://www.erlang-solutions.com/downloads/#
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-HQ8IDsCo-1624005504835)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210611092130951.png)]
使用 rpm -Uvh esl-erlang_24.0-1_centos_7_amd64.rpm 會報如下錯誤:[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-GEs1ziim-1624005504836)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210611092300062.png)]
需要先執行下列命令即可:
sudo yum install epel-release sudo yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl 或者執行: rpm -Uvh esl-erlang_23.2-1_centos_7_amd64.rpm --force --nodeps使用 erl -v 檢查是否安裝成功,如下
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-LcztHNQ2-1624005504837)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210611092749327.png)]
安裝socat,如下:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-q6zpmKIT-1624005504837)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210611092932720.png)]
至此,erlang環境安裝完畢!
2.安裝rabbitmq
執行rpm -Uvh rabbitmq-server-3.8.16-1.el7.noarch.rpm
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-h2JOSa9y-1624005504838)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210611093602214.png)]
執行yum install rabbitmq-server -y
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-hLwubiNA-1624005504839)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210611093622407.png)]
執行systemctl start rabbitmq-server啟動
執行systemctl status rabbitmq-server查看,如下
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-gRBIS748-1624005504839)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210611094013631.png)]
執行systemctl enable rabbitmq-server開機自啟動
執行systemctl stop rabbitmq-server開機自啟動關閉
3. 執行命令安裝rabbitmqweb
rabbitmq-plugins enable rabbitmq_management相關配置命令如下:
1:查看防火狀態 systemctl status firewalld service iptables status 2:暫時關閉防火墻 systemctl stop firewalld service iptables stop 3:永久關閉防火墻 systemctl disable firewalld chkconfig iptables off 4:重啟防火墻 systemctl enable firewalld service iptables restart 5:永久關閉后重啟 chkconfig iptables on4.授權賬號和密碼
新增用戶:
rabbitmqctl add_user admin admin設置用戶分配操作權限:
rabbitmqctl set_user_tags admin administrator用戶級別如下:
- administrator 可以登錄控制臺、查看所有信息、可以對rabbitmq進行管理
- monitoring 監控者登錄控制臺、查看所有信息
- policymaker 策略制定者 登錄控制臺指定策略
- managment 普通管理員 登錄控制臺
為用戶賦權:
sudo rabbitmqctl set_permissions -p / admintest '.*' '.*' '.*'三、spring boot集成
1.普通maven下的簡單模式(simple),舉例
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.12.0</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency> package com.study.dyq.rabbitmq_01.simple;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** maven 簡單模式simple的demo* 注釋:所有的中間件技術都是基于tcp/ip協議基礎之上的,rabbitmq遵循的是amqp協議,便于在消息頭傳遞一些附加的的信息*/ public class Productor {public static void main(String[] args) {// 1.創建連接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("10.200.4.126");cf.setPort(5672);cf.setUsername("admintest");cf.setPassword("admintest");cf.setVirtualHost("/");Connection connection = null;// 聲明連接Channel channel = null;// 聲明通道try {// 2.創建連接connectionconnection = cf.newConnection("生產者");// 3.通過連接獲取通道Channelchannel = connection.createChannel();// 4.通過通過創建交換機,聲明隊列,綁定關系、路由key,發送消息和接收消息String queueName = "queue1";/*** @Param1 隊列的名稱* @Param2 是否持久化,true:持久化,false:不持久化* @Param3 排他性,是否是獨占隊列* @Param4 是否自動刪除,隨著最后一個消費者消費完消息后隊列是否要自動刪除* @Param5 攜帶附屬參數*/channel.queueDeclare(queueName, true, false, false, null);// 5.準備消息內容,可以為json格式JSONObject ob = new JSONObject();ob.put("消息內容", "我是第一個測試的消息隊列呀!");String msg = JSON.toJSONString(ob);// 6.發送消息給隊列queuechannel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {// 7.關閉連接if (channel != null && channel.isOpen()) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}// 8.關閉通道if (connection != null && connection.isOpen()) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}} } package com.study.dyq.rabbitmq_01.simple;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Cunsumer {public static void main(String[] args) {// 1.創建連接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("10.200.4.126");cf.setPort(5672);cf.setUsername("admintest");cf.setPassword("admintest");cf.setVirtualHost("/");Connection connection = null;// 聲明連接Channel channel = null;// 聲明通道try {// 2.創建連接connectionconnection = cf.newConnection("生產者");// 3.通過連接獲取通道Channelchannel = connection.createChannel();// 4.通過通過創建交換機,聲明隊列,綁定關系、路由key,發送消息和接收消息String queueName = "queue1";channel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println("收到的消息是:"+new String(delivery.getBody(), "utf-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("收到消息失敗了!");}});} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {// 7.關閉連接if (channel != null && channel.isOpen()) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}// 8.關閉通道if (connection != null && connection.isOpen()) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}} }1.簡單模式(simple)
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ome12qut-1624005504840)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210618095504495.png)]
1.生產者服務:
package com.study.dyq.rabbitmq_02.configration;import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @author dyq* @description 一對一簡單模式* @data 2021/6/18*/ @Configuration public class SimpleRabbitConfigration {@Beanpublic Queue simpleQueue(){return new Queue("queue_simple");} } /*** 簡單模式*/public void addSimpleOrder(String userId, String productId, int num, String queueSimple){// 保存訂單String orderId = UUID.randomUUID().toString();System.out.println("創建生產者成功,訂單號為:"+orderId);// 通知其他服務,通過mq來完成rabbitTemplate.convertAndSend(queueSimple, userId+"買了"+num+"套"+productId+",訂單號:"+orderId);}2.消費者服務:
package com.study.dyq.rabbitmq_03.service.simple;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;/*** @author dyq* @description simple模擬測試* @data 2021/6/17*/ @Service @RabbitListener(queues = {"queue_simple"}) public class smsSimpleService {@RabbitHandlerpublic void receiveMessage(String message){System.out.println("sms simple-----接收到了訂單的信息是:"+message);} }2.工作模式(work)
主要有兩種模式:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-nM9QsMjI-1624005504841)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210618094917971.png)]
3.發布/訂閱模式(fanout)
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-okhSYFmF-1624005504841)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210617151719526.png)]
sprintboot下的整合:
1.生產者服務:
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency> package com.study.dyq.rabbitmq_02.configration;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @author dyq* @description rebbitmq消息中間件配置類* @data 2021/6/17*/ @Configuration public class RabbitConfigration {/*** 1.聲明注冊lanout模式的交換機* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanout_order_exchange", true, false);}/*** 2.聲明隊列*/@Beanpublic Queue smsQueue(){// @param1 隊列名稱 @param2 是否持久化return new Queue("smsQueue", true);}@Beanpublic Queue duanxinQueue(){// @param1 隊列名稱 @param2 是否持久化return new Queue("duanxinQueue", true);}@Beanpublic Queue emailQueue(){// @param1 隊列名稱 @param2 是否持久化return new Queue("emailQueue", true);}// 3.設置綁定關系@Beanpublic Binding smsBinding(){return BindingBuilder.bind(smsQueue()).to(fanoutExchange());}@Beanpublic Binding duanxinBinding(){return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());}@Beanpublic Binding emailBinding(){return BindingBuilder.bind(emailQueue()).to(fanoutExchange());} } package com.study.dyq.rabbitmq_02.service;import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;import java.util.UUID;/*** @author dyq* @description 模擬訂單類* @data 2021/6/17*/ @Service public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;/****/public void addOrder(String userId, String productId, int num){// 保存訂單String orderId = UUID.randomUUID().toString();System.out.println("創建生產者成功,訂單號為:"+orderId);// 通知其他服務,通過mq來完成String exchangeName = "fanout_order_exchange";rabbitTemplate.convertAndSend(exchangeName, "", userId+"買了"+num+"套"+productId+",訂單號:"+orderId);} } # 服務端口 server:port: 8091# 配置rabbitmq服務 spring:rabbitmq:username: admintestpassword: admintestvirtual-host: /host: 10.200.4.126port: 56722.消費者服務
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency> package com.study.dyq.rabbitmq_03.service;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;/*** @author dyq* @description fanout模擬測試* @data 2021/6/17*/ @Service @RabbitListener(queues = {"duanxinQueue"}) public class duanxinFanoutService {@RabbitHandlerpublic void receiveMessage(String message){System.out.println("duanxin fanout-----接收到了訂單的信息是:"+message);} } package com.study.dyq.rabbitmq_03.service;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;/*** @author dyq* @description fanout模擬測試* @data 2021/6/17*/ @Service @RabbitListener(queues = {"emailQueue"}) public class emailFanoutService {@RabbitHandlerpublic void receiveMessage(String message){System.out.println("email fanout-----接收到了訂單的信息是:"+message);} } package com.study.dyq.rabbitmq_03.service;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;/*** @author dyq* @description fanout模擬測試* @data 2021/6/17*/ @Service @RabbitListener(queues = {"smsQueue"}) public class smsFanoutService {@RabbitHandlerpublic void receiveMessage(String message){System.out.println("sms fanout-----接收到了訂單的信息是:"+message);} }4.路由模式(routing)
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-HUhVFilj-1624005504842)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210617172836192.png)]
1.生產者服務
package com.study.dyq.rabbitmq_02.configration;import com.rabbitmq.client.AMQP; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @author dyq* @description rabbit測試direct模式* @data 2021/6/17*/ @Configuration public class DirectRabbitConfigration {/*** 1.聲明注冊direct模式的交換機* @return*/@Beanpublic DirectExchange directExchange(){return new DirectExchange("direct_order_exchange", true, false);}/*** 2.聲明隊列*/@Beanpublic Queue smsDirectQueue(){// @param1 隊列名稱 @param2 是否持久化return new Queue("smsDirectQueue", true);}@Beanpublic Queue duanxinDirectQueue(){// @param1 隊列名稱 @param2 是否持久化return new Queue("duanxinDirectQueue", true);}@Beanpublic Queue emailDirectQueue(){// @param1 隊列名稱 @param2 是否持久化return new Queue("emailDirectQueue", true);}// 3.設置綁定關系@Beanpublic Binding smsDirectBinding(){return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");}@Beanpublic Binding duanxinDirectBinding(){return BindingBuilder.bind(duanxinDirectQueue()).to(directExchange()).with("duanxin");}@Beanpublic Binding emailDirectBinding(){return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");} } public void addDirectOrder(String userId, String productId, int num, String exchangeName){// 保存訂單String orderId = UUID.randomUUID().toString();System.out.println("創建生產者成功,訂單號為:"+orderId);// 通知其他服務,通過mq來完成rabbitTemplate.convertAndSend(exchangeName, "sms", userId+"買了"+num+"套"+productId+",訂單號:"+orderId);rabbitTemplate.convertAndSend(exchangeName, "email", userId+"買了"+num+"套"+productId+",訂單號:"+orderId);}2.消費者服務
package com.study.dyq.rabbitmq_03.service.direct;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;/*** @author dyq* @description fanout模擬測試* @data 2021/6/17*/ @Service @RabbitListener(queues = {"smsDirectQueue"}) public class SmsDirectService {@RabbitHandlerpublic void receiveMessage(String message){System.out.println("sms direct-----接收到了訂單的信息是:"+message);} } package com.study.dyq.rabbitmq_03.service.direct;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;/*** @author dyq* @description fanout模擬測試* @data 2021/6/17*/ @Service @RabbitListener(queues = {"emailDirectQueue"}) public class EmailDirectService {@RabbitHandlerpublic void receiveMessage(String message){System.out.println("email Direct-----接收到了訂單的信息是:"+message);} } package com.study.dyq.rabbitmq_03.service.direct;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;/*** @author dyq* @description fanout模擬測試* @data 2021/6/17*/ @Service @RabbitListener(queues = {"duanxinDirectQueue"}) public class DuanxinDirectService {@RabbitHandlerpublic void receiveMessage(String message){System.out.println("duanxin Direct-----接收到了訂單的信息是:"+message);} }5.主題模式(topic)
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-elytWTEG-1624005504843)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210617172903060.png)]
" * " : 表示任何一個詞
" # ": 表示0或1個詞
四、集群搭建
五、分布式事務
六、常見問題匯總
總結
以上是生活随笔為你收集整理的rabbitmq知识汇总的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 获取当前日期上周的周一和周日日期
- 下一篇: break
