消息中间件NetMQ结合Protobuf简介
概述
對于稍微熟悉這兩個優秀的項目來說,每個內容單獨介紹都不為過,本文只是簡介并探討如何將兩部分內容合并起來,使其在某些場景下更適合、更高效。
?
NetMQ:ZeroMQ的.Net版本,ZeroMQ簡單來說就是局域網內的消息中間件(與MSMQ類似),包括了進程間通訊、點對點通訊、訂閱模式通訊等等,底層用更“完美”的Socket實現,ZeroMQ實現了多語言、跨平臺、高效率等諸多優勢。詳細介紹請參考ZeroMQ和NetMQ官方文檔:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns,http://netmq.readthedocs.org/en/latest/introduction/。
?
Protocol Buffer:源自與Google內部的開源項目,作為高效的RPC消息協議,相比較Json、XML協議的消息格式,Protobuf在序列化以及數據大小上都具有十分明顯的優勢,跨平臺,協議可讀性也接近于Json等等。這里也推薦一篇文章:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/
?
定義Protobuf協議
Protocol Buffer(簡稱Protobuf)是以.proto的腳本形式實現的通用語義形式,類似于Json格式:
message WeatherMessage { enum CommandType {Debug=0;Weather=1;Other=2;}required CommandType Command=1 [default=Weather];optional string Content=2;message Loaction {required int32 East=1;required int32 North=2;}repeated Loaction UserLocation=3; }這里的Message、required(必選屬性)、optional(可有可無屬性)、repeated(內部嵌套的類型屬性)等都是proto的關鍵字,具體意義以及為關鍵字的功能大家可以查看官方文檔,這里只介紹如何應用,或者Stephen Liu的文章也不錯。
當然,光定義腳本是不能實現應用的,還需要根據特定的編碼語言進行描述,這里利用Protobuf-Net來實現.Net平臺的協議實現。
首先,下載軟件包:https://code.google.com/p/protobuf-net/(肯能需要FQ)
然后,解壓并將剛才的.proto文件復制到文件夾ProtoGen下。
最后,啟動CMD并cd到ProtoGen文件夾目錄下,運行命令:
protogen -i: PBWeatherMessage.proto -0: PBWeatherMessage.cs -ns:ProtobufNameSpace
(-i指定了輸入,-o指定了輸出,-ns指定了生成代碼的namespace)
如果,正確的話(當然了,我給出的腳本是不會錯的),就會生成一個PBWeatherMessage.cs文件,這樣的話就可以將.cs文件加入到項目中當做一個純粹的類來使用了。
代碼中使用,就是類似于二進制序列化一樣,只是這回序列化的是Protobuf專用的序列化方式而已。
序列化:
#region Protobufvar weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage(){Command = PBProtocol.WeatherMessage.CommandType.Weather,Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), };using (var sm = new MemoryStream()){ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);publisher.Send(sm.ToArray());}#endregion反序列化:
var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();var receivedBytes = subscriber.Receive();using (var sm = new MemoryStream(receivedBytes)){weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);}這里就簡單介紹完了protobuf協議的使用,下面介紹一下NetMQ+Protobuf的使用。
?
NetMQ+Protobuf
接下來我們來改造下NetMQ Sample中的Publisher-Subscriber模式:
首先下載從GitHub上下載NetMQ Sample: https://github.com/zeromq/netmq
或者下載我的示例代碼,其中包含了一個No Protobuf的工程,這個是直接摘自原作者的示例代碼。
服務端Publisher:
using (var context = NetMQContext.Create())// NetMQ全局維護的Content上下文,建議只有一個并且使用完畢后及時回收。using (var publisher = context.CreatePublisherSocket())// 從Content上下文中創建CreatePublisherSocket,這里如果用其他四種模式之一需要Create其他類型。 {publisher.Bind("tcp://127.0.0.1:5556");// Bind到指定的IP及端口。var rng = new Random();while (!stopRequested){int zipcode = rng.Next(0, 99999);// 這里模擬一個隨機命令編號(如果非10001,客戶端直接丟棄此Publisher發布的消息,實現消息過濾)int temperature = rng.Next(-80, 135);int relhumidity = rng.Next(0, 90);publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));// 直接Send,干凈整潔。 }}客戶端Subscriber:
using (var context = NetMQContext.Create())// 創建全局NetMQ句柄,建議唯一,使用完畢及時回收。 using (var subscriber = context.CreateSubscriberSocket())// 創建Publisher-Subscriber模式的客戶端監聽。 {subscriber.Connect("tcp://127.0.0.1:5556");// 連接到指定Socketsubscriber.Subscribe(zipToSubscribeTo.ToString(CultureInfo.InvariantCulture));// 這里創建消息內容的過濾,如果不包含“zipToSubscribeTo”值則不接收消息。for (int i = 0; i < iterations; i++){string results = subscriber.ReceiveString(); // 如果消息以“zipToSubscribeTo”開頭,則會返回整條信息。Console.Write(".");// "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);// 按照固定模式解碼。int zip = int.Parse(split[0]);if (zip != zipToSubscribeTo){throw new Exception(string.Format("Received message for unexpected zipcode: {0} (expected {1})", zip, zipToSubscribeTo));}totalTemp += int.Parse(split[1]);totalHumidity += int.Parse(split[2]);}}這就是四種模式之一的發布者模式,使用起來很方便,但是這僅僅傳遞的是基于String的字符串,還不是一個可以序列化的對象,下一步我們將把消息字符串用Protobuf進行序列化與反序列化,來優化我們的消息格式。
請參考,我的示例代碼中的Publisher Pattern工程:
服務端Publisher:
using (var context = NetMQContext.Create())using (var publisher = context.CreatePublisherSocket()) {publisher.Bind("tcp://127.0.0.1:5556");var rng = new Random();while (!stopRequested){ int zipcode = rng.Next(10000,10010); //Relpace: rng.Next(0, 99999);int temperature = rng.Next(-80, 135);int relhumidity = rng.Next(0, 90);#region Protobufvar weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage(){Command = PBProtocol.WeatherMessage.CommandType.Weather,Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), };using (var sm = new MemoryStream()){ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);publisher.Send(sm.ToArray());}#endregion// publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity)); WriteLine(string.Format("Publisher send message: {0} {1} {2}", zipcode, temperature, relhumidity));System.Threading.Thread.Sleep(100);}} View Code客戶端Subscriber:
using (var context = NetMQContext.Create())using (var subscriber = context.CreateSubscriberSocket()){subscriber.Connect("tcp://127.0.0.1:5556");subscriber.SubscribeToAnyTopic(); // No Command Filter, warn if not set thie method SubscribeToAnyTopic, it will receive nothing.while (true){if (curIndex > iterations) break;var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();var receivedBytes = subscriber.Receive();using (var sm = new MemoryStream(receivedBytes)){weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);}// "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]string[] split = weatherMsg.Content.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);int cmdId = int.Parse(split[0]);if (weatherMsg.Command == PBProtocol.WeatherMessage.CommandType.Weather){if (cmdId == zipToSubscribeTo){curIndex++;WriteLine(string.Format("Subscriber receive message: {0}", weatherMsg.Content));totalTemp += int.Parse(split[1]);totalHumidity += int.Parse(split[2]);}}}?
好了,其實單獨來看,這兩部分內容并為涉及的很深入,只是作為一個技術實踐、技術儲備,希望其中有問題或者有更好的應用場景,還請各位留言,不勝感謝!
?
我的示例代碼下載
冷靜下來
這里補充一些不足:
?
引用
ZMQ:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns
NetMQ:http://netmq.readthedocs.org/en/latest/introduction/
Protocol Buffer:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/
Stephen Liu:http://www.cnblogs.com/stephen-liu74/archive/2013/01/02/2841485.html
Protobuf-Net:https://code.google.com/p/protobuf-net/
?
轉載于:https://www.cnblogs.com/cuiyansong/p/4326047.html
總結
以上是生活随笔為你收集整理的消息中间件NetMQ结合Protobuf简介的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Oracle资源管理器(二)-- 创建和
- 下一篇: 程序员面试不完全指南