用户自定义协议client/server代码示例
用戶自定義協議client/server代碼示例
代碼參考鏈接:https://github.com/sogou/workflow
message.h
message.cc
server.cc
client.cc
關于user_defined_protocol
本示例設計一個簡單的通信協議,并在協議上構建server和client。server將client發送的消息轉換成大寫并返回。
協議的格式
協議消息包含一個4字節的head和一個message
body。head是一個網絡序的整數,指明body的長度。
請求和響應消息的格式一致。
協議的實現
用戶自定義協議,需要提供協議的序列化和反序列化方法,這兩個方法都是ProtocolMeessage類的虛函數。
另外,為了使用方便,強烈建議用戶實現消息的移動構造和移動賦值(用于std::move())。 在ProtocolMessage.h里,序列化反序列化接口如下:
namespace protocol
{
- class ProtocolMessage
- public CommMessageOut,
public CommMessageIn
{
private:
virtual
int encode(struct iovec
vectors[], int max);
/*
You have to implement one of the ‘append’ functions, and the first one
* with
arguement ‘size_t *size’ is recommmended. */
virtual
int append(const void
*buf, size_t *size);
virtual int append(const void
*buf, size_t size);
...
};
}
序列化函數encode
encode函數在消息被發送之前調用,每條消息只調用一次。
encode函數里,用戶需要將消息序列化到一個vector數組,數組元素個數不超過max。目前max的值為8192。
結構體struct iovec定義在請參考系統調用readv和writev。
encode函數正確情況下的返回值在0到max之間,表示消息使用了多少個vector。
如果是UDP協議,請注意總長度不超過64k,并且使用不超過1024個vector(Linux一次writev只能1024個vector)。
UDP協議只能用于client,無法實現UDP
server。
encode返回-1表示錯誤。返回-1時,需要置errno。如果返回值>max,將得到一個EOVERFLOW錯誤。錯誤都在callback里得到。
為了性能考慮vector里的iov_base指針指向的內容不會被復制。所以一般指向消息類的成員。
反序列化函數append
append函數在每次收到一個數據塊時被調用。因此,每條消息可能會調用多次。
buf和size分別是收到的數據塊內容和長度。用戶需要把數據內容復制。
如果實現了append(const void buf, size_t size)接口,可以通過修改size來告訴框架本次消費了多少長度。收到的size
- 消耗的size = 剩余的size,剩余的那部分buf會由下一次append被調用時再次收到。此功能更方便協議解析,當然用戶也可以全部復制自行管理,則無需修改size。
append函數返回0表示消息還不完整,傳輸繼續。返回1表示消息結束。-1表示錯誤,需要置errno。
總之append的作用就是用于告訴框架消息是否已經傳輸結束。不要在append里做復雜的非必要的協議解析。
errno的設置
encode或append返回-1或其它負數都會被理解為失敗,需要通過errno來傳遞錯誤原因。用戶會在callback里得到這個錯誤。
如果是系統調用或libc等庫函數失敗(比如malloc),libc肯定會設置好errno,用戶無需再設置。
一些消息不合法的錯誤是比較常見的,比如可以用EBADMSG,EMSGSIZE分別表示消息內容錯誤,和消息太大。
用戶可以選擇超過系統定義errno范圍的值來表示一些自定義錯誤。一般大于256的值是可以用的。
請不要使用負數errno。因為框架內部用了負數來代表SSL錯誤。
示例里,消息的序列化反序列化都非常的簡單。
頭文件message.h里,聲明了request和response類:
namespace protocol
{
- class TutorialMessage
- public ProtocolMessage
{
private:
virtual
int encode(struct iovec
vectors[], int max);
virtual
int append(const void
*buf, size_t size);
...
};
using TutorialRequest = TutorialMessage;
using TutorialResponse = TutorialMessage;
}
request和response類,都是同一種類型的消息。直接using就可以。
注意request和response必須可以無參數的被構造,也就是說需要有無參數的構造函數,或完全沒有構造函數。
此外,通訊過程中,如果發生重試,response對象會被銷毀并重新構造。因此,它最好是一個RAII類。否則處理起來會比較復雜。
message.cc里包含了encode和append的實現:
namespace protocol
{
int TutorialMessage::encode(struct iovec
vectors[], int max/max==8192/)
{
uint32_t
n = htonl(this->body_size);
memcpy(this->head,
&n, 4);
vectors[0].iov_base = this->head;vectors[0].iov_len = 4;vectors[1].iov_base = this->body;vectors[1].iov_len = this->body_size;return
2; /* return the number of vectors used, no more then max.
*/
}
int TutorialMessage::append(const void
*buf, size_t size)
{
if
(this->head_received
< 4)
{size_t
head_left;
void
*p;
p = &this->head[this->head_received];head_left = 4 - this->head_received;if
(size < 4 - this->head_received)
{memcpy(p,
buf, size);
this->head_received += size;return
0;
}memcpy(p,
buf, head_left);
size -= head_left;buf = (const char
*)buf + head_left;
p = this->head;this->body_size = ntohl(*(uint32_t *)p);if
(this->body_size
this->size_limit)
{errno = EMSGSIZE;return
-1;
}this->body = (char *)malloc(this->body_size);if
(!this->body)
return
-1;
this->body_received = 0;}size_t
body_left = this->body_size - this->body_received;
if
(size > body_left)
{errno = EBADMSG;return
-1;
}memcpy(this->body,
buf, body_left);
if
(size < body_left)
return
0;
return
1;
}
}
encode的實現非常簡單,固定使用了兩個vector,分別指向head和body。需要注意iov_base指針必須指向消息類的成員。
append需要保證4字節的head接收完整,再讀取message body。而且我們并不能保證第一次append一定包含完整的head,所以過程略為繁瑣。
append實現了size_limit功能,超過size_limit的會返回EMSGSIZE錯誤。用戶如果不需要限制消息大小,可以忽略size_limit這個域。
由于要求通信協議是一來一回的,所謂的“TCP包”問題不需要考慮,直接當錯誤消息處理。
現在,有了消息的定義和實現,就可以建立server和client了。
server和client的定義
有了request和response類,我們就可以建立基于這個協議的server和client。前面的示例里介紹過Http協議相關的類型定義:
using WFHttpTask =
WFNetworkTask<protocol::HttpRequest,
protocol::HttpResponse>;
using http_callback_t
= std::function<void (WFHttpTask *)>;
using WFHttpServer =
WFServer<protocol::HttpRequest,
protocol::HttpResponse>;
using http_process_t
= std::function<void (WFHttpTask *)>;
同樣的,對這個Tutorial協議,數據類型的定義并沒有什么區別:
using WFTutorialTask =
WFNetworkTask<protocol::TutorialRequest,
protocol::TutorialResponse>;
using tutorial_callback_t
= std::function<void (WFTutorialTask
*)>;
using WFTutorialServer =
WFServer<protocol::TutorialRequest,
protocol::TutorialResponse>;
using tutorial_process_t
= std::function<void (WFTutorialTask
*)>;
server端
server與普通的http
server沒有什么區別。優先IPv6啟動,這不影響IPv4的client請求。另外限制請求最多不超過4KB。
代碼請自行參考server.cc
client端
client端的邏輯是從標準IO接收用戶輸入,構造出請求發往server并得到結果。
為了簡單,讀取標準輸入的過程都在callback里完成,因此我們會先發出一條空請求。同樣為了安全我們限制server回復包不超4KB。
client端唯一需要了解的就是怎么產生一個自定義協議的client任務,在WFTaskFactory.h有三個接口可以選擇:
template<class REQ, class RESP>
class WFNetworkTaskFactory
{
private:
using
T = WFNetworkTask<REQ, RESP>;
public:
static
T *create_client_task(TransportType type,
const std::string& host,unsigned short
port,
int retry_max,std::function<void (T *)> callback);static
T *create_client_task(TransportType type,
const std::string& url,int retry_max,
std::function<void (T *)> callback);
static
T *create_client_task(TransportType type,
const URI& uri,int retry_max,
std::function<void (T *)>
callback);
...
};
其中,TransportType指定傳輸層協議,目前可選的值包括TT_TCP,TT_UDP,TT_SCTP和TT_TCP_SSL。
三個接口的區別不大,在這個示例里暫時不需要URL,用域名和端口來創建任務。
實際的調用代碼如下。派生了WFTaskFactory類,但這個派生并非必須的。
using namespace
protocol;
- class MyFactory
- public WFTaskFactory
{
public:
static
WFTutorialTask *create_tutorial_task(const std::string& host,
unsigned short
port,
int retry_max,
tutorial_callback_t callback)
{using
NTF = WFNetworkTaskFactory<TutorialRequest, TutorialResponse>;
WFTutorialTask *task = NTF::create_client_task(TT_TCP, host, port,
retry_max,
std::move(callback));task->set_keep_alive(30
-
1000);
return
task;
}
};
可以看到用了WFNetworkTaskFactory<TutorialRequest,
TutorialResponse>類來創建client任務。
接下來通過任務的set_keep_alive()接口,讓連接在通信完成之后保持30秒,否則,將默認采用短連接。
client的其它代碼涉及的知識點在之前的示例里都包含了。請參考client.cc
內置協議的請求是怎么產生的
現在系統中內置了http,
redis,mysql,kafka四種協議。可以通過相同的方法產生一個http或redis任務嗎?比如:
WFHttpTask *task = WFNetworkTaskFactory<protocol::HttpRequest,
protocol::HttpResponse>::create_client_task(…);
需要說明的是,這樣產生的http任務,會損失很多的功能,比如,無法根據header來識別是否用持久連接,無法識別重定向等。
同樣,如果這樣產生一個MySQL任務,可能根本就無法運行起來。因為缺乏登錄認證過程。
一個kafka請求可能需要和多臺broker有復雜的交互過程,這樣創建的請求顯然也無法完成這一過程。
可見每一種內置協議消息的產生過程都遠遠比這個示例復雜。同樣,如果用戶需要實現一個更多功能的通信協議,還有許多代碼要寫。
總結
以上是生活随笔為你收集整理的用户自定义协议client/server代码示例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用TensorRT集成推理infere
- 下一篇: 将5g做到世界顶级