????????最近有用戶問如何使用BeetleX封裝一個基于Protobuf格式的websocket服務并支持控制器調用;其實BeetleX.FastHttpApi是支持Websocket服務和自定義數據格式的,但需要對組件有一定了解的情況才能進行擴展;接下來通過封裝一個支持Protobuf和MessagePack的websocket服務來介紹BeetleX.FastHttpApi的相關功能擴展和應用。
格式封裝
? ? ? ? 首先需要一個特性來標記消息類型在數據中用怎樣的數據來標記
[AttributeUsage(AttributeTargets.Class)]public class BinaryTypeAttribute : Attribute{public BinaryTypeAttribute(int id){this.ID = id;}public int ID { get; set; }}
可以通過一個Int類型來定義數值與消息類的關系。然后再定義一個特性用于描述消息對應的方法控制器類
[AttributeUsage(AttributeTargets.Class)]public class MessageControllerAttribute : Attribute{}
接下來就可以封裝對應Protobuf的數據轉換類,為了轉換方便就直接使用Protobuf.Net這個組件了
public class BinaryDataFactory{//數值與類型的關系private Dictionary<int, Type> mMessageTypes = new Dictionary<int, Type>();//類型與數值的關系private Dictionary<Type, int> mMessageIDs = new Dictionary<Type, int>();//定義消息與方法的關系private Dictionary<Type, ActionHandler> mActions = new Dictionary<Type, ActionHandler>();public void RegisterComponent<T>(){RegisterComponent(typeof(T));}public void RegisterComponent(Type type){foreach (var item in type.Assembly.GetTypes()){BinaryTypeAttribute[] bta = (BinaryTypeAttribute[])item.GetCustomAttributes(typeof(BinaryTypeAttribute), false);if (bta != null && bta.Length > 0){mMessageTypes[bta[0].ID] = item;mMessageIDs[item] = bta[0].ID;}
#if PROTOBUF_SERVERvar mca = item.GetCustomAttribute<MessageControllerAttribute>(false);if (mca != null){var controller = Activator.CreateInstance(item);foreach (MethodInfo method in item.GetMethods(BindingFlags.Public | BindingFlags.Instance)){var parameters = method.GetParameters();if (parameters.Length == 2 && parameters[0].ParameterType == typeof(WebSocketReceiveArgs)) {ActionHandler handler = new ActionHandler(controller, method);mActions[parameters[1].ParameterType] = handler;}}}
#endif}}//序列化對象public ArraySegment<byte> Serializable(object data){MemoryStream memory = new MemoryStream();var type = GetTypeID(data.GetType(), true);memory.Write(type);Serializer.Serialize(memory, data);return new ArraySegment<byte>(memory.GetBuffer(), 0, (int)memory.Length);}//反序列化對象public object Deserialize(byte[] data){return Deserialize(new ArraySegment<byte>(data, 0, data.Length));}//反序列化對象public object Deserialize(ArraySegment<byte> data){MemoryStream memory = new MemoryStream(data.Array, data.Offset, data.Count);byte[] id = new byte[4];memory.Read(id, 0, 4);Type type = GetMessageType(id, true);return Serializer.Deserialize(type, memory);}//獲消息對應數值的存儲數據public byte[] GetTypeID(Type type, bool littleEndian){if (mMessageIDs.TryGetValue(type, out int value)){if (!littleEndian){value = BeetleX.Buffers.BitHelper.SwapInt32(value);}return BitConverter.GetBytes(value);}throw new Exception($"binary websocket {type} id not found!");}//根據數值獲取類型public Type GetMessageType(int id){mMessageTypes.TryGetValue(id, out Type result);return result;}//根據存儲數獲取類型public Type GetMessageType(byte[] data, bool littleEndian){int value = BitConverter.ToInt32(data, 0);if (!littleEndian)value = BeetleX.Buffers.BitHelper.SwapInt32(value);return GetMessageType(value);}//根據消息獲處理方法public ActionHandler GetHandler(object message){mActions.TryGetValue(message.GetType(), out ActionHandler result);return result;}}public class ActionHandler{public ActionHandler(object controller, MethodInfo method){Method = method;Controller = controller;IsVoid = method.ReturnType == typeof(void);IsTaskResult = method.ReturnType.BaseType == typeof(Task);if (IsTaskResult && method.ReturnType.IsGenericType){HasTaskResultData = true;mGetPropertyInfo = method.ReturnType.GetProperty("Result", BindingFlags.Public | BindingFlags.Instance);}}private PropertyInfo mGetPropertyInfo;public?MethodInfo?Method?{?get;?private?set;?}public?bool?IsTaskResult?{?get;?set;?}public bool HasTaskResultData { get; set; }public?bool?IsVoid?{?get;?private?set;?}public?object?Controller?{?get;?private?set;?}public object GetTaskResult(Task task){return mGetPropertyInfo.GetValue(task);}public object Execute(params object[] data){return Method.Invoke(Controller, data);}}
協議轉換
??????? Protobuf的數據格式處理完成后就要針對BeetleX.FastHttpApi構建一個相應數據轉換器,這個轉換器實現也是非常簡單的。
public class ProtobufFrameSerializer : IDataFrameSerializer{public static BinaryDataFactory BinaryDataFactory { get; set; } = new BinaryDataFactory();public object FrameDeserialize(DataFrame data, PipeStream stream){var buffers = new byte[data.Length];stream.Read(buffers, 0, buffers.Length);return BinaryDataFactory.Deserialize(buffers);}public void FrameRecovery(byte[] buffer){}public ArraySegment<byte> FrameSerialize(DataFrame packet, object body){return BinaryDataFactory.Serializable(body);}}
實現IDataFrameSerializer接口的相關方法即可,接下來就針對HttpApiServer封裝一些擴展方法用于注冊對象和綁定Websocket數據處理事件。
public static class ProtobufExtensions{public static HttpApiServer RegisterProtobuf<T>(this HttpApiServer server){ProtobufFrameSerializer.BinaryDataFactory.RegisterComponent<T>();return server;}public static HttpApiServer UseProtobufController(this HttpApiServer server, Action<WebSocketReceiveArgs> handler = null){server.WebSocketReceive = async (o, e) =>{try{var msg = e.Frame.Body;var action = ProtobufFrameSerializer.BinaryDataFactory.GetHandler(msg);if (action != null){if (!action.IsVoid){if (action.IsTaskResult){Task task = (Task)action.Execute(e, msg);await task;if (action.HasTaskResultData){var result = action.GetTaskResult(task);e.ResponseBinary(result);}}else{var result = action.Execute(e, msg);e.ResponseBinary(result);}}}else{handler?.Invoke(e);}}catch (Exception e_){e.Request.Server.GetLog(BeetleX.EventArgs.LogType.Warring)?.Log(BeetleX.EventArgs.LogType.Error, e.Request.Session, $"Websocket packet process error {e_.Message}");}};return server;}}
服務使用
????????所有相關功能都封裝后就可以啟動HttpApiServer并把相關功能設置使用,在使用之前需要引用BeetleX.FastHttpApi.Hosting和Protobuf.Net組件
[BinaryType(1)][ProtoContract]public class User{[ProtoMember(1)]public string Name { get; set; }[ProtoMember(2)]public string Email { get; set; }[ProtoMember(3)]public DateTime ResultTime { get; set; }}[MessageController]public class Controller{public User Login(WebSocketReceiveArgs e, User user){user.ResultTime = DateTime.Now;return user;}}class?Program{static void Main(string[] args){BeetleX.FastHttpApi.Hosting.HttpServer server = new BeetleX.FastHttpApi.Hosting.HttpServer(80);server.Setting((service, options) =>{options.LogLevel = BeetleX.EventArgs.LogType.Trace;options.LogToConsole = true;options.ManageApiEnabled = false;options.WebSocketFrameSerializer = new ProtobufFrameSerializer();});server.Completed(http =>{http.RegisterProtobuf<User>();http.UseProtobufController();});server.Run();}}
客戶端
????????如果你希望在.Net中使用Websocket客戶端,BeetleX同樣也提供一個擴展組件BeetleX.Http.Clients,通過這組件可以輕易封裝一個Protobuf的Websocket客戶端。
public class ProtobufClient : BinaryClient{public ProtobufClient(string host) : base(host) { }public static BinaryDataFactory BinaryDataFactory { get; private set; } = new BinaryDataFactory();protected override object OnDeserializeObject(ArraySegment<byte> data){return BinaryDataFactory.Deserialize(data);}protected override ArraySegment<byte> OnSerializeObject(object data){return BinaryDataFactory.Serializable(data);}}
封裝完成后就可以使用了
class Program{static async Task Main(string[] args){ProtobufClient.BinaryDataFactory.RegisterComponent<User>();var client = new ProtobufClient("ws://localhost");User user = new User { Email="henryfan@msn.com", Name="henryfan" };var result = await client.ReceiveFrom<User>(user);Console.WriteLine(result.Name);}}
完整示例代碼可以訪問
https://github.com/beetlex-io/BeetleX-Samples/tree/master/Websocket.ProtobufPacket
https://github.com/beetlex-io/BeetleX-Samples/tree/master/Websocket.MessagePackPacket
BeetleX
開源跨平臺通訊框架(支持TLS) 提供高性能服務和大數據處理解決方案
https://beetlex-io.com
總結
以上是生活随笔 為你收集整理的BeetleX实现MessagePack和Protobuf消息控制器调用websocket服务详解 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。