Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程)
1. 安裝 rabbitmq 的 golang 包
golang 可使用庫 github.com/streadway/amqp 操作 rabbitmq 。使用下面命令安裝 RabbitMQ 。
go get -v github.com/streadway/amqp
2. 生產者流程
在 Golang 中創建 rabbitmq 生產者基本步驟是:
- 連接 Connection
- 創建 Channel
- 創建或連接一個交換器
- 創建或連接一個隊列
- 交換器綁定隊列
- 投遞消息
- 關閉 Channel
- 關閉 Connection
2.1 創建連接
// connection
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
2.2 創建通道
// channel
channel, err := connection.Channel()
2.3 創建交換器
err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil)
參數依次說明:
name交換機名稱kind交換機類型durable持久化標識autoDelete是否自動刪除internal是否是內置交換機noWait是否等待服務器確認args其它配置
參數說明要點:
autoDelete:
自動刪除功能必須要在交換器曾經綁定過隊列或者交換器的情況下,處于不再使用的時候才會自動刪除,如果是剛剛創建的尚未綁定隊列或者交換器的交換器或者早已創建只是未進行隊列或者交換器綁定的交換器是不會自動刪除的。
internal:
內置交換器是一種特殊的交換器,這種交換器不能直接接收生產者發送的消息,只能作為類似于隊列的方式綁定到另一個交換器,來接收這個交換器中路由的消息,內置交換器同樣可以綁定隊列和路由消息,只是其接收消息的來源與普通交換器不同。
noWait
當 noWait 為 true 時,聲明時無需等待服務器的確認。
該通道可能由于錯誤而關閉。 添加一個 NotifyClose 偵聽器應對任何異常。創建交換器還有一個差不多的方法( ExchangeDeclarePassive ),他主要是假定交換已存在,并嘗試連接到不存在的交換將導致 RabbitMQ 引發異常,可用于檢測交換器的存在。
2.4 創建隊列
q, err := channel.QueueDeclare("q1", true, false, false, true, nil)
參數說明:
name隊列名稱durable持久化autoDelete自動刪除exclusive排他noWait是否等待服務器確認args Table
參數說明要點:
exclusive排他
排他隊列只對首次創建它的連接可見,排他隊列是基于連接( Connection )可見的,并且該連接內的所有信道( Channel)都可以訪問這個排他隊列,在這個連接斷開之后,該隊列自動刪除,由此可見這個隊列可以說是綁到連接上的,對同一服務器的其他連接不可見。
同一連接中不允許建立同名的排他隊列的這種排他優先于持久化,即使設置了隊列持久化,在連接斷開后,該隊列也會自動刪除。
非排他隊列不依附于連接而存在,同一服務器上的多個連接都可以訪問這個隊列。
autoDelete設置是否自動刪除。
為 true 則設置隊列為自動刪除。
自動刪除的前提是:至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。
不能把這個參數錯誤地理解為:“當連接到此隊列的所有客戶端斷開時,這個隊列自動刪除”,因為生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊列連接時,都不會自動刪除這個隊列。
創建隊列還有一個差不多的方法( QueueDeclarePassive ),他主要是假定隊列已存在,并嘗試連接到不存在的隊列將導致 RabbitMQ 引發異常,可用于檢測隊列的存在。
2.5 綁定交換器和隊列
err = channel.QueueBind("q1", "q1Key", "e1", true, nil)
參數解析:
name隊列名稱key BindingKey根據交換機類型來設定exchange交換機名稱noWait是否等待服務器確認args Table
2.6 綁定交換器(可選)
err = channel.ExchangeBind("dest", "q1Key", "src", false, nil)
參數解析:
destination目的交換器key RoutingKey路由鍵source源交換器noWait是否等待服務器確認args Table其它參數
生產者發送消息至交換器 source 中,交換器 source 根據路由鍵找到與其匹配的另一個交換器 destination ,井把消息轉發到 destination 中,進而存儲在 destination 綁定的隊列 queue 中,某種程度上來說 destination 交換器可以看作一個隊列。如圖:
2.7 投遞消息
err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{Timestamp: time.Now(),DeliveryMode: amqp.Persistent, //Msg set as persistentContentType: "text/plain",Body: []byte("Hello Golang and AMQP(Rabbitmq)!"),
})
參數解析:
exchange交換器名稱key RouterKeymandatory是否為無法路由的消息進行返回處理immediate是否對路由到無消費者隊列的消息進行返回處理 RabbitMQ 3.0 廢棄msg消息體
參數說明要點:
mandatory
消息發布的時候設置消息的 mandatory 屬性用于設置消息在發送到交換器之后無法路由到隊列的情況對消息的處理方式,設置為 true 表示將消息返回到生產者,否則直接丟棄消息。
immediate
參數告訴服務器至少將該消息路由到一個隊列中,否則將消息返回給生產者。 imrnediate 參數告訴服務器,如果該消息關聯的隊列上有消費者,則立刻投遞:如果所有匹配的隊列上都沒有消費者,則直接將消息返還給生產者,不用將消息存入隊列而等待消費者了。
RabbitMQ 3.0版本開始去掉了對 imrnediate 參數的支持。
其中 amqp.Publishing 的 DeliveryMode 如果設為 amqp.Persistent 則消息會持久化。需要注意的是如果需要消息持久化 Queue 也是需要設定為持久化才有效。
3. 消費者流程
消費者的步驟和生產者流程基本類似,只是將生產者流程中的投遞消息變為消費消息。
Rabbitmq 消費方式共有 2 種,分別是推模式和拉模式。
3.1 推模式
推模式是通過持續訂閱的方式來消費信息, Consume 將信道( Channel )設置為接收模式,直到取消隊列的訂閱為止。在接收模式期間, RabbitMQ 會不斷地推送消息給消費者。推送消息的個數還是會受到 channel.Qos 的限制。
deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
參數說明:
queue隊列名稱consumer消息者名稱autoAck是否確認消費exclusive排他noLocalnoWaitboolargs Table
參數說明要點:
noLocal
設置為 true 則表示不能將同一個 Connection 中生產者發送的消息傳送給這個 Connection 中的消費者
其中 autoAck 可以設置為 true 或者 false 。
- 如果設為
true則消費者一接收到就從queue中去除了,如果消費者處理消息中發生意外該消息就丟失了。 - 如果設為
false則消費者在處理完消息后,調用msg.Ack(false)后消息才從queue中去除。即便當前消費者處理該消息發生意外,只要沒有執行msg.Ack(false)那該消息就仍然在queue中,不會丟失。
如果autoAck設置為 false 則表示需要手動進行 ack 消費
v, ok := <-deliveries
if ok {// 手動ack確認// 注意: 這里只要調用了ack就是手動確認模式,// v.Ack的參數 multiple 表示的是在此channel中先前所有未確認的deliveries都將被確認// 并不是表示設置為false就不進行當前ack確認if err := v.Ack(true); err != nil {fmt.Println(err.Error())}
} else {fmt.Println("Channel close")
}
3.2 拉模式
相對來說比較簡單,是由消費者主動拉取信息來消費,每次只消費一條消息,同樣也需要進行 ack 確認消費。
channel.Get(queue string, autoAck bool)
參考:
https://studygolang.com/articles/25406
https://studygolang.com/articles/24699?fr=sidebar
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go
總結
以上是生活随笔為你收集整理的Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2022-2028年中国可降解聚乙烯农用
- 下一篇: 2022-2028年中国TPE手套行业市