慕课网_《RabbitMQ消息中间件极速入门与实战》学习总结
生活随笔
收集整理的這篇文章主要介紹了
慕课网_《RabbitMQ消息中间件极速入门与实战》学习总结
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
慕課網《RabbitMQ消息中間件極速入門與實戰》學習總結
- 時間:2018年09月05日星期三
- 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com
- 教學源碼:無
- 學習源碼:https://github.com/zccodere/s...
第一章:RabbitMQ起步
1-1 課程導航
課程導航
- RabbitMQ簡介及AMQP協議
- RabbitMQ安裝與使用
- RabbitMQ核心概念
- 與SpringBoot整合
- 保障100%的消息可靠性投遞方案落地實現
1-2 RabbitMQ簡介
初識RabbitMQ
- RabbitMQ是一個開源的消息代理和隊列服務器
- 用來通過普通協議在完全不同的應用之間共享數據
- RabbitMQ是使用Erlang語言來編寫的
- 并且RabbitMQ是基于AMQP協議的
RabbitMQ簡介
- 目前很多互聯網大廠都在使用RabbitMQ
- RabbitMQ底層采用Erlang語言進行編寫
- 開源、性能優秀,穩定性保障
- 與SpringAMQP完美的整合、API豐富
- 集群模式豐富,表達式配置,HA模式,鏡像隊列模型
- 保證數據不丟失的前提做到高可靠性、可用性
- AMQP全稱:Advanced Message Queuing Protocol
- AMQP翻譯:高級消息隊列協議
AMQP協議模型
1-3 RabbitMQ安裝
學習筆記
0.安裝準備 官網地址:http://www.rabbitmq.com/ 安裝Linux必要依賴包<Linux7> 下載RabbitMQ安裝包 進行安裝,修改相關配置文件 vim /etc/hostname vim /etc/hosts1.安裝Erlang wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb sudo dpkg -i erlang-solutions_1.0_all.deb sudo apt-get install erlang erlang-nox2.安裝RabbitMQ echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - sudo apt-get install rabbitmq-server3.安裝RabbitMQ web管理插件 sudo rabbitmq-plugins enable rabbitmq_management sudo systemctl restart rabbitmq-server 訪問:http://localhost:15672 默認用戶名密碼:guest/guest4.修改RabbitMQ配置 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/ebin/rabbit.app 比如修改密碼、配置等等;例如:loopback_users中的<<"guest">>,只保留guest 服務啟動:rabbitmq-server start & 服務停止:rabbitmqctl app_stop1-4 RabbitMQ概念
RabbitMQ的整體架構
RabbitMQ核心概念
- Server:又稱Broker,接受客戶端的連接,實現AMQP實體服務
- Connection:連接,應用程序與Broker的網絡連接
-
Channel:網絡信道
幾乎所有的操作都在Channel中進行
Channel是進行消息讀寫的通道
客戶端可建立多個Channel
每個Channel代表一個會話任務 -
Message:消息
服務器和應用程序之間傳送的數據,由Properties和Body組成
Properties可以對消息進行修飾,比如消息的優先級、延遲等高級特性
Body則就是消息體內容 -
Virtual host:虛擬機
用于進行邏輯隔離,最上層的消息路由
一個Virtual host里面可以有若干個Exchange和Queue
同一個Virtual host里面不能有相同名稱的Exchange或Queue - Exchange:交換機,接收消息,根據路由鍵轉發消息到綁定的隊列
- Binding:Exchange和Queue之間的虛擬連接,binding中可以包含routing key
- Routing key:一個路由規則,虛擬機可用它來確定如何路由一個特定消息
- Queue:也稱為Message Queue,消息隊列,保存消息并將它們轉發給消費者
RabbitMQ消息的流轉過程
第二章:RabbitMQ使用
2-1 發送消息
SpringBoot與RabbitMQ集成
- 引入相關依賴
- 對application.properties進行配置
創建名為rabbitmq-producer的maven工程pom如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>47-rabbitmq</artifactId><groupId>com.myimooc</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rabbitmq-producer</artifactId><properties><spring.boot.version>2.0.4.RELEASE</spring.boot.version></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-parent</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--RabbitMQ依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--工具類依賴--><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.5</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.36</version></dependency><dependency><groupId>javax.servlet</groupId><artifactId>javax.servlet-api</artifactId><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>1.編寫Order類
package com.myimooc.rabbitmq.entity;import java.io.Serializable;/*** <br>* 標題: 訂單實體<br>* 描述: 訂單實體<br>* 時間: 2018/09/06<br>** @author zc*/ public class Order implements Serializable{private static final long serialVersionUID = 6771608755338249746L;private String id;private String name;/*** 存儲消息發送的唯一標識*/private String messageId;public Order() {}public Order(String id, String name, String messageId) {this.id = id;this.name = name;this.messageId = messageId;}@Overridepublic String toString() {return "Order{" +"id='" + id + ''' +", name='" + name + ''' +", messageId='" + messageId + ''' +'}';}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;} }2.編寫OrderSender類
package com.myimooc.rabbitmq.producer.producer;import com.myimooc.rabbitmq.entity.Order; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** <br>* 標題: 訂單消息發送者<br>* 描述: 訂單消息發送者<br>* 時間: 2018/09/06<br>** @author zc*/ @Component public class OrderSender {private RabbitTemplate rabbitTemplate;@Autowiredpublic OrderSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}/*** 發送訂單** @param order 訂單* @throws Exception 異常*/public void send(Order order) throws Exception {CorrelationData correlationData = new CorrelationData();correlationData.setId(order.getMessageId());// exchange:交換機// routingKey:路由鍵// message:消息體內容// correlationData:消息唯一IDthis.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData);}}3.編寫application.properties類
# RabbitMQ配置 spring.rabbitmq.addresses=192.168.0.105:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000# Server配置 server.servlet.context-path=/ server.port=8080spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL4.編寫Application類
package com.myimooc.rabbitmq.producer;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;/*** <br>* 標題: 啟動類<br>* 描述: 啟動類<br>* 時間: 2018/09/06<br>** @author zc*/ @SpringBootApplication public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}}5.編寫OrderSenderTest類
package com.myimooc.rabbitmq.producer.producer;import com.myimooc.rabbitmq.entity.Order; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.util.UUID;/*** <br>* 標題: 訂單消息發送者測試<br>* 描述: 訂單消息發送者測試<br>* 時間: 2018/09/06<br>** @author zc*/ @RunWith(SpringRunner.class) @SpringBootTest public class OrderSenderTest {@Autowiredprivate OrderSender orderSender;@Testpublic void testSend1() throws Exception {Order order = new Order();order.setId("201809062009010001");order.setName("測試訂單1");order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-",""));this.orderSender.send(order);} }2-2 處理消息
創建名為rabbitmq-consumer的maven工程pom如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>47-rabbitmq</artifactId><groupId>com.myimooc</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rabbitmq-consumer</artifactId><properties><spring.boot.version>2.0.4.RELEASE</spring.boot.version></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-parent</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--RabbitMQ依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--工具類依賴--><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.5</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.36</version></dependency><dependency><groupId>javax.servlet</groupId><artifactId>javax.servlet-api</artifactId><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>1.編寫Order類
package com.myimooc.rabbitmq.entity;import java.io.Serializable;/*** <br>* 標題: 訂單實體<br>* 描述: 訂單實體<br>* 時間: 2018/09/06<br>** @author zc*/ public class Order implements Serializable{private static final long serialVersionUID = 6771608755338249746L;private String id;private String name;/*** 存儲消息發送的唯一標識*/private String messageId;public Order() {}public Order(String id, String name, String messageId) {this.id = id;this.name = name;this.messageId = messageId;}@Overridepublic String toString() {return "Order{" +"id='" + id + ''' +", name='" + name + ''' +", messageId='" + messageId + ''' +'}';}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;} }2.編寫OrderReceiver類
package com.myimooc.rabbitmq.consumer.consumer;import com.rabbitmq.client.Channel; import com.myimooc.rabbitmq.entity.Order; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;import java.util.Map;/*** <br>* 標題: 訂單接收者<br>* 描述: 訂單接收者<br>* 時間: 2018/09/06<br>** @author zc*/ @Component public class OrderReceiver {/*** 接收消息** @param order 消息體內容* @param headers 消息頭內容* @param channel 網絡信道* @throws Exception 異常*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue",durable = "true"),exchange = @Exchange(name = "order-exchange",type = "topic"),key = "order.*"))@RabbitHandlerpublic void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {// 消費者操作System.out.println("收到消息:");System.out.println("訂單信息:" + order.toString());// 手動簽收消息Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false);} }3.編寫application.properties類
# RabbitMQ連接配置 spring.rabbitmq.addresses=192.168.0.105:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # RabbitMQ消費配置 # 基本并發:5 spring.rabbitmq.listener.simple.concurrency=5 # 最大并發:10 spring.rabbitmq.listener.simple.max-concurrency=10 # 簽收模式:手動簽收 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 限流策略:同一時間只有1條消息發送過來消費 spring.rabbitmq.listener.simple.prefetch=1# Server配置 server.servlet.context-path=/ server.port=8082spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL4.編寫Application類
package com.myimooc.rabbitmq.consumer;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;/*** <br>* 標題: 啟動類<br>* 描述: 啟動類<br>* 時間: 2018/09/06<br>** @author zc*/ @SpringBootApplication public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}}第三章:可靠性投遞
3-1 設計方案
保障100%消息投遞成功設計方案(一)
3-2 代碼詳解
因篇幅限制,源碼請到github地址查看,這里僅展示核心關鍵類
1.編寫OrderSender類
package com.myimooc.rabbitmq.ha.producer;import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** <br>* 標題: 訂單消息發送者<br>* 描述: 訂單消息發送者<br>* 時間: 2018/09/06<br>** @author zc*/ @Component public class OrderSender {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate BrokerMessageLogMapper brokerMessageLogMapper;/*** 回調方法:confirm確認*/private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("correlationData:" + correlationData);String messageId = correlationData.getId();if (ack) {// 如果confirm返回成功,則進行更新BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO();messageLogPO.setMessageId(messageId);messageLogPO.setStatus(Constants.OrderSendStatus.SEND_SUCCESS);brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO);} else {// 失敗則進行具體的后續操作:重試或者補償等System.out.println("異常處理...");}}};/*** 發送訂單** @param order 訂單*/public void send(Order order) {// 設置回調方法this.rabbitTemplate.setConfirmCallback(confirmCallback);// 消息IDCorrelationData correlationData = new CorrelationData(order.getMessageId());// 發送消息this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData);} }2.編寫OrderService類
package com.myimooc.rabbitmq.ha.service;import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.mapper.OrderMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import com.myimooc.rabbitmq.ha.producer.OrderSender; import com.myimooc.rabbitmq.ha.util.FastJsonConvertUtils; import org.apache.commons.lang3.time.DateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;import java.util.Date;/*** <br>* 標題: 訂單服務<br>* 描述: 訂單服務<br>* 時間: 2018/09/07<br>** @author zc*/ @Service public class OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate BrokerMessageLogMapper brokerMessageLogMapper;@Autowiredprivate OrderSender orderSender;/*** 創建訂單** @param order 訂單*/public void create(Order order) {// 當前時間Date orderTime = new Date();// 業務數據入庫this.orderMapper.insert(order);// 消息日志入庫BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO();messageLogPO.setMessageId(order.getMessageId());messageLogPO.setMessage(FastJsonConvertUtils.convertObjectToJson(order));messageLogPO.setTryCount(0);messageLogPO.setStatus(Constants.OrderSendStatus.SENDING);messageLogPO.setNextRetry(DateUtils.addMinutes(orderTime, Constants.ORDER_TIMEOUT));this.brokerMessageLogMapper.insert(messageLogPO);// 發送消息this.orderSender.send(order);} }3.編寫RetryMessageTask類
package com.myimooc.rabbitmq.ha.task;import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import com.myimooc.rabbitmq.ha.producer.OrderSender; import com.myimooc.rabbitmq.ha.util.FastJsonConvertUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;import java.util.List;/*** <br>* 標題: 重發消息定時任務<br>* 描述: 重發消息定時任務<br>* 時間: 2018/09/07<br>** @author zc*/ @Component public class RetryMessageTask {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate OrderSender orderSender;@Autowiredprivate BrokerMessageLogMapper brokerMessageLogMapper;/*** 啟動完成3秒后開始執行,每隔10秒執行一次*/@Scheduled(initialDelay = 3000, fixedDelay = 10000)public void retrySend() {logger.debug("重發消息定時任務開始");// 查詢 status = 0 和 timeout 的消息日志List<BrokerMessageLogPO> pos = this.brokerMessageLogMapper.listSendFailureAndTimeoutMessage();for (BrokerMessageLogPO po : pos) {logger.debug("處理消息日志:{}",po);if (po.getTryCount() >= Constants.MAX_RETRY_COUNT) {// 更新狀態為失敗BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO();messageLogPO.setMessageId(po.getMessageId());messageLogPO.setStatus(Constants.OrderSendStatus.SEND_FAILURE);this.brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO);} else {// 進行重試,重試次數+1this.brokerMessageLogMapper.updateRetryCount(po);Order reSendOrder = FastJsonConvertUtils.convertJsonToObject(po.getMessage(), Order.class);try {this.orderSender.send(reSendOrder);} catch (Exception ex) {// 異常處理logger.error("消息發送異常:{}", ex);}}}logger.debug("重發消息定時任務結束");} }4.編寫ApplicationTest類
package com.myimooc.rabbitmq.ha;import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.service.OrderService; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.util.UUID;/*** <br>* 標題: 訂單創建測試<br>* 描述: 訂單創建測試<br>* 時間: 2018/09/07<br>** @author zc*/ @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTest {@Autowiredprivate OrderService orderService;@Testpublic void testCreateOrder(){Order order = new Order();order.setId(String.valueOf(System.currentTimeMillis()));order.setName("測試創建訂單");order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-",""));this.orderService.create(order);}}總結
以上是生活随笔為你收集整理的慕课网_《RabbitMQ消息中间件极速入门与实战》学习总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python基础学习11----函数
- 下一篇: [PHP] 算法-合并两个有序链表为一个