(二): 基于ZeroMQ的实时通讯平台
基于ZeroMQ的實時通訊平臺
上篇:C++分布式實時應用框架 (Cpp Distributed Real-time Application Framework)----(一):整體介紹
版權聲明:本文版權及所用技術歸屬smartguys團隊所有,對于抄襲,非經同意轉載等行為保留法律追究的權利!
通訊平臺作為C++分布式實時應用框架(Cpp Distributed Real-time ApplicationFramework)的最核心模塊,承擔了分布式實時框架的基礎通訊功能。通訊平臺框架具備了基于Reactor模式的網絡通訊能力,并且依賴于ZeroMQ庫,因此支持非持久化的message queue的功能。基于配置文件來自動建立鏈接關系的功能,可以和狀態中心一起配合,實現無需重啟節點的動態擴容縮容等功能。強大的實時監控能力,可以實時上報每個通訊子節點的TPS和時延等關鍵性能數據。管控業務進程的能力,業務進程的心跳檢測,故障時自動重啟、保證系統正常運行。完善的平臺工具,可以通過通訊平臺向業務進程發送各種命令,如:調整日志級別,刷新業務參數,啟停業務進程等等。下面將逐一介紹通訊平臺的功能細節。
一、根據配置文件自動建立通訊鏈接拓撲關系
常見的分布式系統通常將進程間、節點間的各種通訊關系寫死在業務代碼中,這是導致代碼復雜難以理解的原因。我們創新地將所有的通訊關系提取到AppInit.json配置文件中,業務代碼中不再包含任何與通訊連接相關的內容,使業務代碼可以更專注于業務處理,而不用分心于復雜的分布式節點通訊當中。下面我們將帶大家看下圖所示通訊關系的配置。
OLC作為數據分發節點,給多個業務處理節點分發消息。業務處理節點內部由OCDis接收外部消息,轉發給內部的OCPro業務處理進程,并負責處理完后的回包。
OLC配置部分:
   "OLC" : {
      "AUTO_START" : "YES",
      "ENDPOINTS" : [
         {  // 用于與SmartMonitor建立心跳
            "name" : "MonitorSUB",   
            "zmq_socket_action" : "CONNECT",  // ZMQ的連接模式
            "zmq_socket_type" : "ZMQ_SUB"     // ZMQ的通訊模式
         },
         { // 下發消息給OCDis,這邊存在轉發功能,支持業務實現按條件轉發
            "downstream" : [ "OCDis2OLC"],
            "name" : "NE2OLC",                // 根據這個名字在業務代碼中實現轉發
            "zmq_socket_action" : "BIND",
            "zmq_socket_type" : "ZMQ_STREAM" 
         },
         { // OLC到OCDis的鏈路
            "name" : "OCDis2OLC",
            "statistics_on" : true,
            "zmq_socket_action" : "CONNECT",
            "zmq_socket_type" : "ZMQ_DEALER"
         },
         { // OCDis回OLC的鏈路,之所以來去分開,主要用于實現優雅啟停功能(啟停節點保證不丟消息)
            "name" : "OCDis2OLC_Backway",
            "statistics_on" : true,
            "zmq_socket_action" : "CONNECT",
            "zmq_socket_type" : "ZMQ_DEALER",
            "backway_pair" : "OCDis2OLC"
         },
         {  // 用于與SmartMonitor的命令消息鏈路
            "name" : "OLC2Monitor",
            "zmq_socket_action" : "CONNECT",
            "zmq_socket_type" : "ZMQ_DEALER"
         },
      ],
      "ENDPOINT_TO_MONITOR" : "OLC2Monitor",
      "INSTANCE_GROUP" : [
         {
            "instance_endpoints_address" : [
               {
                  "endpoint_name" : "NE2OLC",
                  "zmq_socket_address" : "tcp://*:6701"
               },
               {
                  "endpoint_name" : "OCDis2OLC",
                  "zmq_socket_address" : [
                     "tcp://127.0.0.1:7201"   // 跨機的IP地址與端口,配合狀態中心可實現自動管理,無需人工參與配置
                  ]
               },
               {
                  "endpoint_name" : "OCDis2OLC_Backway",
                  "zmq_socket_address" : [
                     "tcp://127.0.0.1:7202"
                  ]
               },
               {
                  "endpoint_name" : "OLC2Monitor",
                  "zmq_socket_address" : "ipc://Monitor2Business_IPC"
               },
               {
                  "endpoint_name" : "MonitorSUB",
                  "zmq_socket_address" : "ipc://MonitorPUB"
               }
            ],
            "instance_group_name" : "1"
         }
      ]
   },
OLC程序:
static const char * ENDPOINT_NE2OLC = "NE2OLC";
static const char * ENDPOINT_OLC2OCDIS = "OCDis2OLC";
static const char * ENDPOINT_MONITORSUB = "MonitorSUB";
int main(int argc, char * argv[]) {
    SmartUtilities::Daemonize();
    OLCProxyServer server(argc, argv);
    if (!server.Initialize(logger))
        return -1;
  
    // OLC與OCDis的消息處理
    server.SetCallbackOnReceivingMessage(ENDPOINT_OLC2OCDIS, bind(&OLCProxyServer::ReceiveFromOCDis, &server, _1, _2, _3));
  // OLC與SmartMonitor的消息處理
    server.SetCallbackOnReceivingMessage(ENDPOINT_MONITORSUB, bind(&OLCProxyServer::ReceiveFromMonitorSUB, &server, _1, _2, _3));
  // 解析消息包實現業務功能
    server.SetPacketParserFunction(ENDPOINT_NE2OLC, bind(&OLCProxyServer::ParseStreamCCR, &server, _1, _2, _3));
  // 設置消息轉發具體規則
    server.SetDownstreamSelector(ENDPOINT_NE2OLC, bind(&OLCProxyServer::StreamSelector, &server, _1, _2));
    server.Run();
    return 0;
}
二、在線更新鏈接拓撲能力
通訊平臺支持在線重新讀取更新的配置文件,更新網絡拓撲,自動建立新鏈接、斷開舊鏈接的能力。配合狀態中心可以實現無需重啟節點的動態擴容縮容等功能。
三、SmartMonitor進程監控管理業務進程與SmartTool工具進程
業務進程可以跟SmartMonitor建立通訊聯系,SmartMonitor可以檢測業務進程的心跳,以保證業務進程的可用。SmartMonitor通過AppCount.json來管理節點業務進程,實現統一啟停等功能。
{
  "OCPro": {
    "IN":  2,      // 業務進程可以有不同的種類,后面代表進程數
    "PS":  3,
    "SMS": 4,
  },
  "OCDis": 3,
  "SERVER_TYPE":"OCS"  // 節點的類型
}
還可以通過SmartTool工具進程,來給業務進程發送各種命令,如:調整日志級別,刷新業務參數,啟停業務進程等等。
1. 啟動平臺
SmartMonitor
2. 停平臺
SmartTool stop all
停指定進程(停止后會被SmartMonitor重新拉起)
SmartTool stop OCPro 停止所有業務的OCPro進程
SmartTool stop OCPro.IN 停止IN業務的OCPro進程
SmartTool stop 4829 停止PID為4829的進程
3. 調整應用層、框架層日志級別
其中,日志級別為error,warn,info,debug,trace
SmartTool log 進程名 level=日志級別,flush=日志級別
比如: SmartTool log OCProlevel=debug,flush=debug
四、通訊平臺性能數據
進程Z負載控制消息流量,進程A負責發、收消息,統計時延數據。進程B收到消息后負責回消息。
性能瓶頸主要在A機,既要負責收發包,又要統計時延數據,還要控制流量。
未完待續...
技術交流合作QQ群:436466587 歡迎討論交流
總結
以上是生活随笔為你收集整理的(二): 基于ZeroMQ的实时通讯平台的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: Vue.js响应式原理
 - 下一篇: ini是什么