RabbitMQ初识
生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ初识
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
官方介紹 - 中文 本文環境:ubuntu:20.04
RabbitMQ安裝、配置與基本使用
安裝RabbitMQ
# 簡易腳本安裝 curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash sudo apt-get install rabbitmq-server -y --fix-missing # 啟動、關閉、重啟、查看 rabbitmq 服務 sudo service rabbitmq-server start、stop、restart、status配置RabbitMQ
# 注意,啟動rabbitmq之后要啟動管理服務插件,否則15672管理頁面無法登錄 sudo rabbitmq-plugins enable rabbitmq_management # 因為guest用戶默認只能在localhost登錄,所以我們需要創建一個新的用戶: [fxm@fxm:~$ ] sudo rabbitmqctl add_user fxm(username) fxm(password) Adding user "fxm" ... [fxm@fxm:~$ ] sudo rabbitmqctl set_permissions -p / fxm(username) ".*" ".*" ".*" Setting permissions for user "fxm" in vhost "/" ... [fxm@fxm:~$ ] sudo rabbitmqctl set_user_tags fxm(username) administrator Setting tags for user "fxm" to [administrator] ...使用RabbitMQ
- 通過web訪問: ip:15672
六種工作模式
“Hello World!”(普通模式)
只有一個生產者,一個消費者,一個隊列
工作隊列
只有一個生產者,多個消費者,一個隊列
發布/訂閱
通過交換機將消息發送到多個隊列,多個消費者訂閱相應隊列
路由
待續。。。
主題交換機
待續。。。
遠程過程調用
待續。。。
Go實現(Go 1.15)
本文實現了兩種模式-普通模式、發布/訂閱 普通模式:1. 連接失敗重新連接2. 發送成功確認3. 消費成功確認 發布/訂閱:1. 連接失敗重新連接2. 消費成功確認普通模式(需要先創建Queue)
創建Queue
rabbit_p.go
package rabbit_pimport ("errors""fmt""github.com/streadway/amqp" )const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定義全局變量,指針類型 var mqConn *amqp.Connection var mqChan *amqp.Channel// 定義生產者接口 type Producer interface {MsgContent() []byte }// 定義RabbitMQ對象 type RabbitMQ struct {Connection *amqp.ConnectionChannel *amqp.ChannelQueueName string // 隊列名稱RoutingKey string // key名稱ExchangeName string // 交換機名稱ExchangeType string // 交換機類型 }// 鏈接rabbitMQ func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 賦值給RabbitMQ對象if err != nil {fmt.Printf("MQ打開鏈接失敗:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 賦值給RabbitMQ對象if err != nil {fmt.Printf("MQ打開管道失敗:%s \n", err)return err}return nil }// 關閉RabbitMQ連接 func (r *RabbitMQ) mqClose() {// 先關閉管道,再關閉鏈接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道關閉失敗:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ鏈接關閉失敗:%s \n", err)} }// 啟動RabbitMQ生產者 func (r *RabbitMQ) StartP(producer Producer) error {// 開啟監聽生產者發送任務if err := r.listenProducer(producer); err != nil {return err}return nil }// 發送任務 func (r *RabbitMQ) listenProducer(producer Producer) error {// 處理結束關閉鏈接defer r.mqClose()// 驗證鏈接是否正常,否則重新鏈接if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}err := r.Channel.Confirm(false)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}confirms := r.Channel.NotifyPublish(make(chan amqp.Confirmation, 1))err = r.Channel.Publish(r.ExchangeName, // exchanger.QueueName, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: producer.MsgContent(),})if err != nil {fmt.Printf("MQ任務發送失敗:%s \n", err)return errors.New("MQ任務發送失敗")}err = confirmOne(confirms)return err }//檢測是否發送成功 func confirmOne(confirms <-chan amqp.Confirmation) error {if confirmed := <-confirms; confirmed.Ack {return nil} else {return errors.New("任務發送失敗")} }rabbit_c.go
package rabbit_cimport ("fmt""github.com/streadway/amqp" )const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定義全局變量,指針類型 var mqConn *amqp.Connection var mqChan *amqp.Channel// 定義接收者接口 type Receiver interface {Consumer([]byte) error }// 定義RabbitMQ對象 type RabbitMQ struct {Connection *amqp.ConnectionChannel *amqp.ChannelQueueName string // 隊列名稱RoutingKey string // key名稱ExchangeName string // 交換機名稱ExchangeType string // 交換機類型 }// 鏈接rabbitMQ func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 賦值給RabbitMQ對象if err != nil {fmt.Printf("MQ打開鏈接失敗:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 賦值給RabbitMQ對象if err != nil {fmt.Printf("MQ打開管道失敗:%s \n", err)return err}return nil }// 關閉RabbitMQ連接 func (r *RabbitMQ) mqClose() {// 先關閉管道,再關閉鏈接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道關閉失敗:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ鏈接關閉失敗:%s \n", err)} }// 啟動RabbitMQ消費者 func (r *RabbitMQ) StartR(receiver Receiver) error {// 處理結束關閉鏈接defer r.mqClose()// 驗證鏈接是否正常if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}// 獲取消費通道,確保rabbitMQ一個一個發送消息_ = r.Channel.Qos(1, 0, true)msgList, err := r.Channel.Consume(r.QueueName, // queue"", // consumerfalse, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {fmt.Printf("獲取消費通道異常:%s \n", err)return err}for msg := range msgList {// 處理數據err = receiver.Consumer(msg.Body)if err != nil {fmt.Printf("確認消息未完成異常:%s \n", err)return err} else {// 確認消息,必須為falseerr = msg.Ack(false)if err != nil {fmt.Printf("確認消息完成異常:%s \n", err)}return nil}}return nil }test_p.go
package mainimport ("fmt""golong/rabbit_p" )// 實現發送者 type TestP struct {msgContent []byte }func (t *TestP) MsgContent() []byte {fmt.Println(string(t.msgContent))return t.msgContent }func main() {//生產者a := "fxm"p := &TestP{[]byte(a),}mqp := &rabbit_p.RabbitMQ{QueueName: "fxm",}err := mqp.StartP(p)if err != nil {fmt.Println("添加異常 !!!")} }test_c.go
package mainimport ("fmt""golong/rabbit_c" )// 實現接收者 type TestC struct {msgContent string }func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil }func main() {//消費者r := &TestC{}mqr := &rabbit_c.RabbitMQ{QueueName: "fxm",}_ = mqr.StartR(r) }發布/訂閱
rabbit_p.go
package rabbit_pimport ("errors""fmt""github.com/streadway/amqp" )const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定義全局變量,指針類型 var mqConn *amqp.Connection var mqChan *amqp.Channel// 定義生產者接口 type Producer interface {MsgContent() []byte }// 定義RabbitMQ對象 type RabbitMQ struct {Connection *amqp.ConnectionChannel *amqp.ChannelQueueName string // 隊列名稱RoutingKey string // key名稱ExchangeName string // 交換機名稱ExchangeType string // 交換機類型 }// 鏈接rabbitMQ func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 賦值給RabbitMQ對象if err != nil {fmt.Printf("MQ打開鏈接失敗:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 賦值給RabbitMQ對象if err != nil {fmt.Printf("MQ打開管道失敗:%s \n", err)return err}return nil }// 關閉RabbitMQ連接 func (r *RabbitMQ) mqClose() {// 先關閉管道,再關閉鏈接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道關閉失敗:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ鏈接關閉失敗:%s \n", err)} }// 啟動RabbitMQ生產者 func (r *RabbitMQ) StartP(producer Producer) error {// 開啟監聽生產者發送任務if err := r.listenProducer(producer); err != nil {return err}return nil }// 發送任務 func (r *RabbitMQ) listenProducer(producer Producer) error {// 處理結束關閉鏈接defer r.mqClose()// 驗證鏈接是否正常,否則重新鏈接if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}//發送消息到一個具名交換機err := r.Channel.ExchangeDeclare(r.ExchangeName, // namer.ExchangeType, // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}//confirms := r.Channel.NotifyPublish(make(chan amqp.Confirmation, 1))err = r.Channel.Publish(r.ExchangeName, // exchange"", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: producer.MsgContent(),})if err != nil {fmt.Printf("MQ任務發送失敗:%s \n", err)return errors.New("MQ任務發送失敗")}return err }rabbit_c.go
package rabbit_cimport ("fmt""github.com/streadway/amqp" )const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定義全局變量,指針類型 var mqConn *amqp.Connection var mqChan *amqp.Channel// 定義接收者接口 type Receiver interface {Consumer([]byte) error }// 定義RabbitMQ對象 type RabbitMQ struct {Connection *amqp.ConnectionChannel *amqp.ChannelQueueName string // 隊列名稱RoutingKey string // key名稱ExchangeName string // 交換機名稱ExchangeType string // 交換機類型 }// 鏈接rabbitMQ func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 賦值給RabbitMQ對象if err != nil {fmt.Printf("MQ打開鏈接失敗:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 賦值給RabbitMQ對象if err != nil {fmt.Printf("MQ打開管道失敗:%s \n", err)return err}return nil }// 關閉RabbitMQ連接 func (r *RabbitMQ) mqClose() {// 先關閉管道,再關閉鏈接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道關閉失敗:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ鏈接關閉失敗:%s \n", err)} }// 啟動RabbitMQ消費者 func (r *RabbitMQ) StartR(receiver Receiver) error {// 處理結束關閉鏈接defer r.mqClose()// 驗證鏈接是否正常if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}//發送消息到一個具名交換機err := r.Channel.ExchangeDeclare(r.ExchangeName, // namer.ExchangeType, // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}//臨時隊列 讓服務器選擇一個隨機的隊列名,當與消費者斷開連接時,這個隊列被立即刪除q, err := r.Channel.QueueDeclare("", // namefalse, // durable:持久false, // delete when usused:至少有一個使用方的隊列在最后一個使用方退訂時被刪除true, // exclusive:僅由一個連接使用,并且該連接關閉時隊列將被刪除false, // no-waitnil, // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}r.QueueName = q.Nameerr = r.Channel.QueueBind(r.QueueName, // queue name"", // routing keyr.ExchangeName, // exchangefalse,nil,)// 獲取消費通道,確保rabbitMQ一個一個發送消息_ = r.Channel.Qos(1, 0, true)msgList, err := r.Channel.Consume(r.QueueName, // queue"", // consumerfalse, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {fmt.Printf("獲取消費通道異常:%s \n", err)return err}for msg := range msgList {// 處理數據err = receiver.Consumer(msg.Body)if err != nil {fmt.Printf("確認消息未完成異常:%s \n", err)return err} else {// 確認消息,必須為falseerr = msg.Ack(false)if err != nil {fmt.Printf("確認消息完成異常:%s \n", err)}return nil}}return nil }test_p.go
package mainimport ("fmt""golong/rabbit_p" )// 實現發送者 type TestP struct {msgContent []byte }func (t *TestP) MsgContent() []byte {fmt.Println(string(t.msgContent))return t.msgContent }func main() {//生產者a := "fxm"p := &TestP{[]byte(a),}mqp := &rabbit_p.RabbitMQ{ExchangeName: "log",ExchangeType: "fanout",}err := mqp.StartP(p)if err != nil {fmt.Println("添加異常 !!!")} }test_c.go
package mainimport ("fmt""golong/rabbit_c" )// 實現接收者 type TestC struct {msgContent string }func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil }func main() {//消費者r := &TestC{}mqr := &rabbit_c.RabbitMQ{ExchangeName: "log",ExchangeType: "fanout",}_ = mqr.StartR(r) }總結
以上是生活随笔為你收集整理的RabbitMQ初识的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apollo进阶课程 ⑧ | 高精地图的
- 下一篇: NPFMSG.exe - NPFMSG是