C#网络编程(六)----Socket编程模型
簡(jiǎn)介
Socket(套接字)是計(jì)算機(jī)網(wǎng)絡(luò)中的一套編程接口,是網(wǎng)絡(luò)編程的核心,它將復(fù)雜的網(wǎng)絡(luò)協(xié)議封裝為簡(jiǎn)單的API,是應(yīng)用層(HTTP)與傳輸層(TCP)之間的橋梁。
應(yīng)用程序通過調(diào)用Socket API,比如connect、send、recv,無需處理IP包封裝,路由選擇等復(fù)雜網(wǎng)絡(luò)操作,屏蔽底層細(xì)節(jié)將網(wǎng)絡(luò)通信簡(jiǎn)化為建立連接-數(shù)據(jù)接收-數(shù)據(jù)發(fā)送-連接斷開,降低了開發(fā)復(fù)雜度。
FD&Handle
- FD
文件描述符,在linux系統(tǒng)中,一切皆文件,它是內(nèi)核為了管理已打開的文件,而給每個(gè)進(jìn)程維護(hù)的一個(gè)文件描述符表,而FD就是一個(gè)文件的索引。 - Handle
而在windows平臺(tái)下,這個(gè)概念被稱為Handle(句柄),都為應(yīng)用程序提供了一種統(tǒng)一的方式來訪問和操作資源,隱藏了底層資源管理的復(fù)雜性。
FD主要用于標(biāo)識(shí)文件、套接字、管道等輸入輸出資源;而Handle的應(yīng)用范圍更廣,除了文件和網(wǎng)絡(luò)資源外,還可以用于標(biāo)識(shí)窗口、進(jìn)程、線程、設(shè)備對(duì)象等各種系統(tǒng)資源。
Socket 網(wǎng)絡(luò)模型
BIO,Blocking I/O
BIO 是最傳統(tǒng)的 I/O 模型,其核心特征是一個(gè)連接一個(gè)線程,線程在讀取/寫入時(shí)會(huì)阻塞,直到I/O操作完成。
private static Socket _server;
private static byte[] _buffer = new byte[1024 * 4];
static void Main(string[] args)
{
_server=new Socket(AddressFamily.InterNetwork,SocketType.Stream, ProtocolType.Tcp);
_server.Bind(new IPEndPoint(IPAddress.Any, 6666));
_server.Listen();
while (true)
{
//BIO核心,線程阻塞,等待客戶端連接
var client = _server.Accept();
Console.WriteLine($"Client {client.RemoteEndPoint} connect. ");
//BIO核心,線程阻塞,等待客戶端發(fā)送消息
var messageCount = client.Receive(_buffer);
var message = Encoding.UTF8.GetString(_buffer, 0, messageCount);
Console.WriteLine($"Client {client.RemoteEndPoint} Say:{message}");
}
}
從代碼中可以看出,有兩個(gè)地方阻塞,一是Accept(),二是Receive(),如果客戶端一直不發(fā)送數(shù)據(jù),那么線程會(huì)一直阻塞在Receive()上,也不會(huì)接受其它客戶端的連接。
C10K問題
有聰明的小伙伴會(huì)想到,我可以利用多線程來處理Receive(),這樣就服務(wù)端就可以接受其它客戶端的連接了。
internal class Program
{
private static Socket _server;
private static byte[] _buffer = new byte[1024 * 4];
static void Main(string[] args)
{
_server=new Socket(AddressFamily.InterNetwork,SocketType.Stream, ProtocolType.Tcp);
_server.Bind(new IPEndPoint(IPAddress.Any, 6666));
_server.Listen();
while (true)
{
//BIO核心,線程阻塞,等待客戶端連接
var client = _server.Accept();
Console.WriteLine($"Client {client.RemoteEndPoint} connect. ");
//多線程讀取客戶端數(shù)據(jù),避免主線程阻塞
Task.Run(() => HandleClient(client));
}
}
static void HandleClient(Socket client)
{
while (true)
{
//BIO核心,線程阻塞,等待客戶端發(fā)送消息
var messageCount = client.Receive(_buffer);
var message = Encoding.UTF8.GetString(_buffer, 0, messageCount);
Console.WriteLine($"Client {client.RemoteEndPoint} Say:{message}");
}
}
}
當(dāng)給客戶端建立好連接后,會(huì)啟用一個(gè)新的線程來單獨(dú)處理Receive(),避免了主線程阻塞。
但有一個(gè)嚴(yán)重的缺陷,就是當(dāng)一萬個(gè)客戶端同時(shí)連接,服務(wù)端要?jiǎng)?chuàng)建一萬個(gè)線程來接。一萬個(gè)線程帶來的CPU上下文切換與內(nèi)存成本,非常容易會(huì)拖垮服務(wù)器。這就是C10K問題來由來。
因此,BIO的痛點(diǎn)在于:
- 高并發(fā)下資源耗盡
當(dāng)連接數(shù)激增時(shí),線程數(shù)量呈線性增長(zhǎng)(如 10000 個(gè)連接對(duì)應(yīng) 10000 個(gè)線程),導(dǎo)致內(nèi)存占用過高、上下文切換頻繁,系統(tǒng)性能急劇下降。 - 阻塞導(dǎo)致效率低下
線程在等待 IO 時(shí)無法做其他事情,CPU 利用率低。
NIO,Non-Blocking I/O
為了解決此問題,需要跪舔操作系統(tǒng),為用戶態(tài)程序提供一個(gè)真正非阻塞的Accept/Receive的函數(shù)。
該函數(shù)的效果應(yīng)該是,當(dāng)沒有新連接/新數(shù)據(jù)到達(dá)時(shí),不阻塞線程。而是返回一個(gè)特殊標(biāo)識(shí),來告訴線程沒有活干。
Java 1.4 引入 NIO,C# 通過Begin/End異步方法或SocketAsyncEventArgs實(shí)現(xiàn)類似邏輯。
internal class Program
{
private static Socket _server;
private static byte[] _buffer = new byte[1024 * 4];
//所有客戶端的連接
private static readonly List<Socket> _clients = new List<Socket>();
static void Main(string[] args)
{
_server=new Socket(AddressFamily.InterNetwork,SocketType.Stream, ProtocolType.Tcp);
_server.Bind(new IPEndPoint(IPAddress.Any, 6666));
_server.Listen();
//NIO核心,設(shè)為非阻塞模式
_server.Blocking = false;
while (true)
{
try
{
var client = _server.Accept();
_clients.Add(client);
Console.WriteLine($"Client {client.RemoteEndPoint} connect. ");
}
catch (SocketException ex) when(ex.SocketErrorCode==SocketError.WouldBlock)
{
//沒有新連接時(shí),調(diào)用Accept觸發(fā)WouldBlock異常,無視即可。
}
//一個(gè)線程同時(shí)管理Accept與Receive,已經(jīng)有了多路復(fù)用的意思。
HandleClient();
}
}
static void HandleClient()
{
//一個(gè)一個(gè)遍歷,尋找可用的客戶端,
foreach (var client in _clients.ToList())
{
try
{
//NIO核心,非阻塞讀取數(shù)據(jù),無數(shù)據(jù)時(shí)立刻返回
var messageCount = client.Receive(_buffer, SocketFlags.None);
var message = Encoding.UTF8.GetString(_buffer, 0, messageCount);
Console.WriteLine($"Client {client.RemoteEndPoint} Say:{message}");
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.WouldBlock)
{
//沒有新數(shù)據(jù)讀取時(shí),調(diào)用Receive觸發(fā)WouldBlock異常,無視即可。
}
}
}
}
通過NIO,我們可以非常驚喜的發(fā)現(xiàn)。我們
僅用了一個(gè)線程就完成對(duì)客戶端的連接與監(jiān)聽,相對(duì)BIO有了質(zhì)的變化。
但有一個(gè)細(xì)節(jié),在數(shù)據(jù)沒拷貝到內(nèi)核緩沖區(qū)之前,這個(gè)階段是非阻塞的。當(dāng)已經(jīng)到達(dá)內(nèi)核緩沖區(qū)時(shí),此時(shí)調(diào)用Accept/Receive是會(huì)阻塞的,因?yàn)閒lag已經(jīng)填充了,需要等待一個(gè)從內(nèi)核緩沖區(qū)拷貝到用戶緩存區(qū)的時(shí)間。
盡管NIO已經(jīng)是JAVA世界的絕對(duì)主流,但依舊存在幾個(gè)痛點(diǎn):
- 輪詢開銷
如果事件比較少,輪詢會(huì)產(chǎn)生大量空轉(zhuǎn),CPU資源被浪費(fèi)。 - 需要手動(dòng)處理細(xì)節(jié)
比如手動(dòng)編寫捕獲when (ex.SocketErrorCode == SocketError.WouldBlock)來識(shí)別狀態(tài),
需要手動(dòng)處理TPC粘包,以及各種異常處理。
AIO,Asynchronous I/O
AIO作為大魔王與終極優(yōu)化,實(shí)現(xiàn)了真正的異步操作,當(dāng)發(fā)起IO請(qǐng)求后,內(nèi)核完全接管IO處理,完成后通過回調(diào)或者事件來通知程序,開發(fā)者無需關(guān)心緩沖區(qū)管理、事件狀態(tài)跟蹤或輪詢開銷。
Java 7 引入 NIO.2(AIO),C# 通過IOCP+Async來實(shí)現(xiàn)
internal class Program
{
private static Socket _server;
private static Memory<byte> _buffer = new byte[1024 * 4];
//所有客戶端的連接
private static readonly List<Socket> _clients = new List<Socket>();
static async Task Main(string[] args)
{
_server=new Socket(AddressFamily.InterNetwork,SocketType.Stream, ProtocolType.Tcp);
_server.Bind(new IPEndPoint(IPAddress.Any, 6666));
_server.Listen();
while (true)
{
//異步等待連接,線程不阻塞
var client = await _server.AcceptAsync();
//不阻塞主線程,由線程池調(diào)度
HandleClientAsync(client);
}
}
private static async Task HandleClientAsync(Socket client)
{
//異步讀取數(shù)據(jù),由操作系統(tǒng)完成IO后喚醒
var messageCount = await client.ReceiveAsync(_buffer);
var message = Encoding.UTF8.GetString(_buffer.ToArray(), 0, messageCount);
Console.WriteLine($"Client {client.RemoteEndPoint} Say:{message}");
}
}
Linux/Windows對(duì)模型的支持
IOCP:nput/Output Completion Port,I/O完成端口
.NET Core在Windows下基于IOCP,在Linux下基于epoll,在macOS中基于kqueue
NIO的改良,IO multiplexing
I/O Multiplexing 是一種高效處理多個(gè)I/O操作的技術(shù),核心思想是通過少量線程管理多個(gè)I/O流,避免因?yàn)閱蝹€(gè)I/O阻塞導(dǎo)致整體服務(wù)性能下降。
它通過事件機(jī)制(可讀,可寫,異常)監(jiān)聽多個(gè)I/O源,當(dāng)某個(gè)I/O流可操作時(shí),才對(duì)其執(zhí)行讀寫操作,從而實(shí)現(xiàn)單線程處理多連接的高效模型。
IO 多路復(fù)用本質(zhì)是NIO的改良
select/poll
參考上面的代碼,HandleClient方法中,我們遍歷了整個(gè)_Clients,用以尋找客戶端的Receive。
同樣是C10K問題,如果我們1萬,甚至100萬個(gè)客戶端連接。那么遍歷的效率太過低下。尤其是每調(diào)用一次Receive都是一次用戶態(tài)到內(nèi)核態(tài)的切換。
那么,如果讓操作系統(tǒng)告訴我們,哪些連接是可用的,我們就避免了在用戶態(tài)遍歷,從而提高性能。
/// <summary>
/// 偽代碼
/// </summary>
static void HandleClientSelect()
{
var clients = _clients.ToList();
//自己不遍歷,交給內(nèi)核態(tài)去遍歷.
//這里會(huì)有一次list copy到內(nèi)核態(tài)的過程,如果list量很大,開銷也不小.
var readyClients= Socket.Select(clients);
//內(nèi)核會(huì)幫你標(biāo)記好哪些client已經(jīng)就緒
foreach (var client in readyClients)
{
//用戶態(tài)依舊需要遍歷一遍,但避免無意義的系統(tǒng)調(diào)用,用戶態(tài)到內(nèi)核態(tài)的切換.只有真正就緒的client才處理
if (client.IsReady)
{
var messageCount = client.Receive(_buffer, SocketFlags.None);
var message = Encoding.UTF8.GetString(_buffer, 0, messageCount);
Console.WriteLine($"Client {client.RemoteEndPoint} Say:{message}");
}
else
{
break;
}
}
}
通過監(jiān)聽一組文件描述符(File Descriptor, FD)的可讀、可寫或異常狀態(tài),當(dāng)其中任意狀態(tài)滿足時(shí),內(nèi)核返回就緒的 FD 集合。用戶需遍歷所有 FD 判斷具體就緒的 I/O 操作。
select模型受限于系統(tǒng)默認(rèn)值,最大只能處理1024個(gè)連接。poll模型通過結(jié)構(gòu)體數(shù)組替代select位圖的方式,避免了數(shù)量限制,其它無區(qū)別。
epoll
作為NIO的終極解決方案,它解決了什么問題?
- 調(diào)用select需要傳遞整個(gè)List
var readyClients= Socket.Select(clients);
如果list中有10W+,那么這個(gè)copy的成本會(huì)非常高 - select依舊是線性遍歷
在內(nèi)核層面依舊是遍歷整個(gè)list,尋找可用的client,所以時(shí)間復(fù)雜度不變O(N),只是減少了從用戶態(tài)切換到內(nèi)核態(tài)的次數(shù)而已 - 僅僅對(duì)ready做標(biāo)記,并不減少返回量
select僅僅返回就緒的數(shù)量,具體是哪個(gè)就緒,還要自己遍歷一遍。
所以epoll模型主要主要針對(duì)這三點(diǎn),做出了如下優(yōu)化:
- 通過mmap,zero copy,減少數(shù)據(jù)拷貝
- 不再通過輪詢方式,而是通過異步事件通知喚醒,內(nèi)部使用紅黑樹來管理fd/handle
- 喚醒后,僅僅返回有變化的fd/handle,用戶無需遍歷整個(gè)list
基于事件驅(qū)動(dòng)(Event-Driven)機(jī)制,內(nèi)核維護(hù)一個(gè) FD 列表,通過epoll_ctl添加 / 刪除 FD 監(jiān)控,epoll_wait阻塞等待就緒事件。就緒的 FD 通過事件列表返回,用戶僅需處理就緒事件對(duì)應(yīng)的 FD。
點(diǎn)擊查看代碼
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#define SEVER_PORT 6666
#define BUFFER_SIZE 1024
#define MAX_EVENTS 10
#define handle_error(cmd,result)\
if(result<0){ \
perror(cmd); \
exit(EXIT_FAILURE); \
} \
char *read_buf=NULL;
char *write_buf=NULL;
void init_buf()
{
read_buf=malloc(sizeof(char)* BUFFER_SIZE);
//讀內(nèi)存分配判斷
if(!read_buf)
{
printf("讀緩存創(chuàng)建異常,斷開連接\n");
exit(EXIT_FAILURE);
}
//寫內(nèi)存分配判斷
write_buf=malloc(sizeof(char)* BUFFER_SIZE);
if(!write_buf)
{
printf("寫緩存創(chuàng)建異常,斷開連接\n");
exit(EXIT_FAILURE);
}
memset(read_buf,0,BUFFER_SIZE);
memset(write_buf,0,BUFFER_SIZE);
}
void clear_buf(char *buf)
{
memset(buf,0,BUFFER_SIZE);
}
void set_nonblocking(int sockfd)
{
int opts=fcntl(sockfd,F_GETFL);
if(opts<0)
{
perror("fcntl(F_GETFL)");
exit(EXIT_FAILURE);
}
opts|=O_NONBLOCK;
int res=fcntl(sockfd,F_SETFL,opts);
if(res<0)
{
perror("fcntl(F_GETFL)");
exit(EXIT_FAILURE);
}
}
int main(int argc, char const *argv[])
{
//初始化讀寫緩沖區(qū)
init_buf();
//聲明sockfd,clientfd
int sockfd,client_fd,temp_result;
//聲明服務(wù)端與客戶端地址
struct sockaddr_in server_addr,client_addr;
memset(&server_addr,0,sizeof(server_addr));
memset(&client_addr,0,sizeof(client_addr));
//聲明IP協(xié)議
server_addr.sin_family=AF_INET;
//綁定主機(jī)地址
server_addr.sin_addr.s_addr=htonl(INADDR_ANY);
//綁定端口
server_addr.sin_port=htons(SEVER_PORT);
//創(chuàng)建socket
sockfd=socket(AF_INET,SOCK_STREAM,0);
handle_error("socket",sockfd);
//綁定地址
temp_result=bind(sockfd,(struct sockaddr *)&server_addr,sizeof(server_addr));
handle_error("bind",temp_result);
//進(jìn)入監(jiān)聽
temp_result=listen(sockfd,128);
handle_error("listen",temp_result);
//將sockfd設(shè)為非阻塞模式
set_nonblocking(sockfd);
int epollfd,nfds;
struct epoll_event ev,events[MAX_EVENTS];
//創(chuàng)建epoll
epollfd=epoll_create1(0);
handle_error("epoll_create1",epollfd);
//將sockfd加入到監(jiān)控列表
ev.data.fd=sockfd;
//將關(guān)聯(lián)的文件描述符設(shè)為可讀,可讀說明有連接進(jìn)入,就會(huì)被epoll觸發(fā)
ev.events=EPOLLIN;
temp_result=epoll_ctl(epollfd,EPOLL_CTL_ADD,sockfd,&ev);
handle_error("epoll_ctl",temp_result);
socklen_t client_addr_len=sizeof(client_addr);
//接受client連接
while (1)
{
//掛起等待,有可讀信息
//nfds表示有多少個(gè)客戶端連接與多少條消息
nfds=epoll_wait(epollfd,events,MAX_EVENTS,-1);
handle_error("epoll_wait",nfds);
for (int i = 0; i < nfds; i++)
{
//第一個(gè)是sockfd,要預(yù)處理一下。
if(events[i].data.fd==sockfd)
{
client_fd=accept(sockfd,(struct sockaddr *)&client_addr,&client_addr_len);
handle_error("accept",client_fd);
set_nonblocking(client_fd);
printf("與客戶端from %s at PORT %d 文件描述符 %d 建立連接\n",inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port),client_fd);
//將獲取到的client連接也添加到監(jiān)控列表
ev.data.fd=client_fd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epollfd,EPOLL_CTL_ADD,client_fd,&ev);
}
//既有新的客戶端連接,又有舊客戶端發(fā)送消息
else if(events[i].events&EPOLLIN)
{
//老連接有數(shù)據(jù)
int count=0,send_count=0;
client_fd=events[i].data.fd;
while ((count=recv(client_fd,read_buf,BUFFER_SIZE,0)>0))
{
printf("receive message from client_fd: %d: %s \n",client_fd,read_buf);
clear_buf(read_buf);
strcpy(write_buf,"receive~\n");
send_count=send(client_fd,write_buf,strlen(write_buf),0);
handle_error("send",send_count);
clear_buf(write_buf);
}
if(count==-1&&errno==EAGAIN)
{
printf("當(dāng)前批次已經(jīng)讀取完畢。\n");
}
else if(count==0)
{
printf("客戶端client_fd:%d請(qǐng)求關(guān)閉連接......\n",client_fd);
strcpy(write_buf,"recevie your shutdown signal 收到你的關(guān)閉信號(hào)\n");
send_count=send(client_fd,write_buf,strlen(write_buf),0);
handle_error("send",send_count);
clear_buf(write_buf);
//從epoll文件描述法符中移除該client_fd
epoll_ctl(epollfd,EPOLL_CTL_DEL,client_fd,NULL);
printf("釋放client_fd:%d資源\n",client_fd);
shutdown(client_fd,SHUT_WR);
close(client_fd);
}
}
}
}
printf("服務(wù)端關(guān)閉后資源釋放\n");
close(epollfd);
close(sockfd);
free(read_buf);
free(write_buf);
return 0;
}
理論與現(xiàn)實(shí)的割裂
從上面的理論可以看出,AIO似乎是版本答案,在C#中,AIO已經(jīng)充斥著每一個(gè)角落,但在JAVA的世界中,更加主流的是NIO,這是為什么呢?
1. Linux的支持不足
Linux 內(nèi)核直到 3.11 版本(2013 年)才支持真正的異步 IO(io_uring),從而間接影響了JAVA的發(fā)展,Java的 AIO直到 2011 年Java 7才正式發(fā)布,而其前一代 NIO已發(fā)展近 10 年。
而Windows的IOCP在Windows NT 3.5就登上了歷史舞臺(tái),加上C#起步較晚,沒有歷史包袱,所以對(duì)AIO支持力度更大,尤其是2012年發(fā)布了async/await異步模型后,解決了回調(diào)地獄,實(shí)現(xiàn)了1+1>3的效果。
2. JAVA的路徑依賴
NIO生態(tài)過于強(qiáng)大,尤其是以Netty/Redis為首的經(jīng)典實(shí)現(xiàn),實(shí)在是太香了!
3. 理論優(yōu)勢(shì)并未轉(zhuǎn)換為實(shí)際收益
AIO的性能在特定場(chǎng)景(如超大規(guī)模文件讀寫、長(zhǎng)連接低活躍)下可能優(yōu)于NIO,但在互聯(lián)網(wǎng)場(chǎng)景中,NIO的足夠高效,比如HTTP請(qǐng)求,AIO的異步回調(diào)優(yōu)勢(shì)相對(duì)輪詢并不明顯。
| 維度 | Java AIO未普及的原因 | C# AIO普及的原因 |
|---|---|---|
| 歷史發(fā)展 | NIO早于AIO 9年推出,生態(tài)成熟;AIO定位模糊,未解決NIO的核心痛點(diǎn)(如編程復(fù)雜度) | AIO與async/await同步推出,解決了異步編程的“回調(diào)地獄”,成為高并發(fā)編程的默認(rèn)選擇 |
| 跨平臺(tái) | 需適配多系統(tǒng)異步機(jī)制(如Linux的epoll、macOS的kqueue),實(shí)際性能提升有限 |
早期綁定Windows IOCP,性能穩(wěn)定;跨平臺(tái)后對(duì)AIO需求不迫切 |
| 生態(tài) | Netty等NIO框架統(tǒng)治市場(chǎng),切換AIO成本高 | 缺乏NIO統(tǒng)治級(jí)框架,AIO通過async/await成為原生選擇 |
| 開發(fā)者習(xí)慣 | NIO代碼雖復(fù)雜,但通過框架封裝已足夠易用;AIO回調(diào)模式學(xué)習(xí)成本更高 | async/await語法糖讓異步代碼接近同步,開發(fā)者更易接受 |
| 性能場(chǎng)景 | 大多數(shù)場(chǎng)景下NIO已足夠高效,AIO的優(yōu)勢(shì)未顯著體現(xiàn) | Windows IOCP場(chǎng)景下AIO性能優(yōu)勢(shì)明顯,且覆蓋主流企業(yè)級(jí)需求 |
說人話就是,Netty太香了,完全沒動(dòng)力切換成AIO,順帶吐槽C#中沒有類似的框架。dotnetty不算,已經(jīng)停止更新了。
總結(jié)
以上是生活随笔為你收集整理的C#网络编程(六)----Socket编程模型的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 45分钟从零搭建私有MaaS平台和生产级
- 下一篇: 【经验】Python3|输入多个整数(m