rabbitmq 持久化_RabbitMQ原理与相关操作(三)消息持久化
現(xiàn)在聊一下RabbitMQ消息持久化:
問題及方案描述
1.當有多個消費者同時收取消息,且每個消費者在接收消息的同時,還要處理其它的事情,且會消耗很長的時間。在此過程中可能會出現(xiàn)一些意外,比如消息接收到一半的時候,一個消費者死掉了。
這種情況要使用消息接收確認機制,可以執(zhí)行上次宕機的消費者沒有完成的事情。
2.在默認情況下,我們程序創(chuàng)建的消息隊列以及存放在隊列里面的消息,都是非持久化的。當RabbitMQ死掉了或者重啟了,上次創(chuàng)建的隊列、消息都不會保存。
這種情況可以使用RabbitMQ提供的消息隊列的持久化機制。
相關(guān)理論描述
RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我個人覺得大多數(shù)開發(fā)人員都會選擇持久化。
隊列和交換機有一個創(chuàng)建時候指定的標志durable。durable的唯一含義就是具有這個標志的隊列和交換機會在重啟之后重新建立,它不表示說在隊列當中的消息會在重啟后恢復(fù)。
消息隊列持久化包括3個部分:
1、exchange持久化,在聲明時指定durable => true2、queue持久化,在聲明時指定durable => true3、消息持久化,在投遞時指定delivery_mode=> 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。
注意:一旦創(chuàng)建了隊列和交換機,就不能修改其標志了。例如,如果創(chuàng)建了一個non-durable的隊列,然后想把它改變成durable的,唯一的辦法就是刪除這個隊列然后重現(xiàn)創(chuàng)建。
程序示例
生產(chǎn)者
class Producter { const string ExchangeName = "eric.exchange"; const string QueueName = "eric.queue"; static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); string message = "Eric is very handsome"; var body = Encoding.UTF8.GetBytes(message); //將隊列設(shè)置為持久化之后,還需要將消息也設(shè)為可持久化的 var props = channel.CreateBasicProperties(); props.SetPersistent(true); channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body); Console.WriteLine("Producter Sent: {0}", message); Console.ReadKey(); } } }注:ack是 acknowledgments 的縮寫,noAck 是("no manual acks")
因為我前段時間換了筆記本,所以用戶的“eric”的操作出踩了個坑,下面進行介紹下:
如果調(diào)試運行時報錯:None of the specified endpoints were reachable
innerException是:
{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text="Unexpected Exception", classId=0, methodId=0, cause=System.IO.IOException: 無法從傳輸連接中讀取數(shù)據(jù): 遠程主機強迫關(guān)閉了一個現(xiàn)有的連接。。 ---> System.Net.Sockets.SocketException: 遠程主機強迫關(guān)閉了一個現(xiàn)有的連接。 在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags) 在 System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size) --- 內(nèi)部異常堆棧跟蹤的結(jié)尾 --- 在 RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader) 在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame() 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration() 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}這說明我們使用的用戶 不是 系統(tǒng)默認的 guest 而是我們自己創(chuàng)建的用戶,但是沒有足夠的權(quán)限進行操作。
解決辦法:
rabbitmqctl set_user_tags username administratorrabbitmqctl set_permissions -p / username ".*" ".*" ".*"執(zhí)行結(jié)果:
相關(guān)其他操作見:windows下 安裝 rabbitMQ 及操作常用命令
程序運行結(jié)果:
消費者
class Recevice { const string ExchangeName = "eric.exchange"; const string QueueName = "eric.queue"; public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true); //NoAck:true 告訴RabbitMQ立即從隊列中刪除消息,另一個非常受歡迎的方式是從隊列中刪除已經(jīng)確認接收的消息,可以通過單獨調(diào)用BasicAck 進行確認: //BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false); var msgContent = Encoding.UTF8.GetString(msgResponse.Body); Console.WriteLine("The received content:"+msgContent); channel.BasicAck(msgResponse.DeliveryTag, multiple: false); //使用BasicAck方式來告之是否從隊列中移除該條消息 //需要額外注意,比如從隊列中獲取消息并用它來操作數(shù)據(jù)庫或日志文件時,如果出現(xiàn)操作失敗時,則該條消息應(yīng)該保留在隊列中,只到操作成功時才從隊列中移除。 Console.ReadKey(); } } }接受消息還有一種方法,就是通過基于推送的事件訂閱。可以使用內(nèi)置的 QueueingBasicConsumer 提供簡化的編程模型,允許在共享隊列上阻塞,直到收到一條消息。
var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(QueueName, noAck: true, consumer: consumer); var msgResponse = consumer.Queue.Dequeue(); var msgContent = Encoding.UTF8.GetString(msgResponse.Body);程序運行結(jié)果:
原文鏈接:https://www.cnblogs.com/ericli-ericli/p/5938106.html
總結(jié)
以上是生活随笔為你收集整理的rabbitmq 持久化_RabbitMQ原理与相关操作(三)消息持久化的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 与时间相关的java源码_Java 基于
- 下一篇: pyqt5 自定义控件_PyQt5学习笔