udp 使用connect优点_nodejs源码分析第十九章 -- udp模块
udp不是面向連接的協議,所以使用上會比tcp簡單,他和tcp一樣,使用四元組來標記通信的雙方(單播的情況下)。我們看看udp作為服務器和客戶端的時候的流程。
1 在c語言中使用udp
1.1 服務器流程(偽代碼)
// 申請一個socket int fd = socket(...); // 綁定一個眾所周知的地址,像tcp一樣 bind(fd, ip, port); // 直接阻塞等待消息的到來,因為udp不是面向連接的,所以不需要listen recvmsg();1.2 客戶端流程
客戶端的流程有多種方式,原因在于源ip和端口可以有多種設置方式,不像服務器一樣,服務器的ip和端口是需要對外公布的,否則客戶端就無法找到目的地進行通信。這就意味著服務器的ip端口是需要用戶顯式指定的,而客戶端則不然,客戶端的ip端口是隨意選擇的,用戶可以自己指定,也可以由操作系統決定,下面我們看看各種使用方式。
1.2.1 顯式指定ip端口
// 申請一個socket int fd = socket(...); // 綁定一個客戶端的地址 bind(fd, ip, port); // 給服務器發送數據 sendto(fd, 服務器ip,服務器端口, data);1.2.2 由操作系統決定源ip和端口
// 申請一個socket int fd = socket(...); // 給服務器發送數據 sendto(fd, 服務器ip,服務器端口, data)我們看到這里直接就給服務器發送數據,如果用戶不指定ip和端口,則操作系統會提供默認的源ip和端口,不過端口是在第一個調用sendto的時候就設置了,并且不能修改,但是如果是多宿主主機,每次調用sendto的時候,操作系統會動態選擇源ip。另外還有另外一種使用方式。
// 申請一個socket int fd = socket(...); connect(fd, 服務器ip,服務器端口); // 給服務器發送數據,或者sendto(fd, null,null, data),調用sendto則不需要再指定服務器ip和端口 write(fd, data);我們可以先調用connect綁定服務器ip和端口到fd,然后直接調用write發送數據。 雖然使用方式很多,但是歸根到底還是對四元組設置的管理。bind是綁定源ip端口到fd,connect是綁定服務器ip端口到fd。我們可以主動調用他們來對fd進行設置,也可以讓操作系統隨機選擇。
1.3 發送數據
我們剛才看到使用udp之前都需要調用socket函數申請一個socket,雖然調用socket函數返回的是一個fd,但是在操作系統中,的確是新建了一個socket對象,fd只是一個索引,操作這個fd的時候,操作系統會根據這個fd找到對應的socket。socket是一個非常復雜的結構體,我們可以理解為一個對象。這個對象中有兩個屬性,一個是讀緩沖區大小,一個是寫緩沖區大小。當我們發送數據的時候,雖然理論上可以發送任意大小的數據,但是因為受限于發送緩沖區的大小,如果需要發送的數據比當前緩沖區大小大則會導致一些問題,我們分情況分析一下。 1 發送的數據大小比當前緩沖區大,如果設置了非阻塞模式,則返回EAGAIN,如果是阻塞模式,則會引起進程的阻塞。 2 如果發送的數據大小比緩沖區的最大值還大,則會導致一直阻塞或者返回EAGAIN。我們可能會想到修改緩沖區最大值的大小,但是這個大小也是有限制的。 講完一些邊界情況,我們再來看看正常的流程,我們看看發送一個數據包的流程 1 首先在socket的寫緩沖區申請一塊內存用于數據發送。 2 調用ip層發送接口,如果數據包大小超過了ip層的限制,則需要分包。因為udp不是可靠的,所以不需要緩存這個數據包。 這就是udp發送數據的流程。
1.4 接收數據
當收到一個udp數據包的時候,操作系統首先會把這個數據包緩存到socket的緩沖區,如果收到的數據包比當前緩沖區大小大,則丟棄數據包(關于大小的限制可以參考1.3章節),否則把數據包掛載到接收隊列,等用戶來讀取的時候,就逐個摘下接收隊列的節點。
2 udp模塊在nodejs中的實現
2.1 udp服務器
我們從一個使用例子開始看看udp模塊的實現。
const dgram = require('dgram'); // 創建一個socket對象 const server = dgram.createSocket('udp4'); // 監聽udp數據的到來 server.on('message', (msg, rinfo) => {// 處理數據 }); // 綁定端口 server.bind(41234);我們看到創建一個udp服務器很簡單,首先申請一個socket對象,在nodejs中和操作系統中一樣,socket是對網絡通信的一個抽象,我們可以把他理解成對傳輸層的抽象,他可以代表tcp也可以代表udp。我們看一下createSocket做了什么。
function createSocket(type, listener) {return new Socket(type, listener); } function Socket(type, listener) {EventEmitter.call(this);let lookup;let recvBufferSize;let sendBufferSize;let options;if (type !== null && typeof type === 'object') {options = type;type = options.type;lookup = options.lookup;recvBufferSize = options.recvBufferSize;sendBufferSize = options.sendBufferSize;}const handle = newHandle(type, lookup); this.type = type;if (typeof listener === 'function')this.on('message', listener);this[kStateSymbol] = {handle,receiving: false,bindState: BIND_STATE_UNBOUND,connectState: CONNECT_STATE_DISCONNECTED,queue: undefined,reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true.ipv6Only: options && options.ipv6Only,recvBufferSize,sendBufferSize}; }我們看到一個socket對象是對handle的一個封裝。我們看看handle是什么。
function newHandle(type, lookup) {// 用于dns解析的函數,比如我們調send的時候,傳的是一個域名if (lookup === undefined) {if (dns === undefined) {dns = require('dns');}lookup = dns.lookup;} if (type === 'udp4') {const handle = new UDP();handle.lookup = lookup4.bind(handle, lookup);return handle;}// 忽略ipv6的處理 }handle又是對UDP模塊的封裝,UDP是c++模塊,我們看看該c++模塊的定義。
// 定義一個v8函數模塊 Local<FunctionTemplate> t = env->NewFunctionTemplate(New);// t新建的對象需要額外拓展的內存t->InstanceTemplate()->SetInternalFieldCount(1);// 導出給js層使用的名字Local<String> udpString = FIXED_ONE_BYTE_STRING(env->isolate(), "UDP");t->SetClassName(udpString);// 屬性的存取屬性enum PropertyAttribute attributes =static_cast<PropertyAttribute>(ReadOnly | DontDelete);Local<Signature> signature = Signature::New(env->isolate(), t);// 新建一個函數模塊Local<FunctionTemplate> get_fd_templ =FunctionTemplate::New(env->isolate(),UDPWrap::GetFD,env->as_callback_data(),signature);// 設置一個訪問器,訪問fd屬性的時候,執行get_fd_templ,從而執行UDPWrap::GetFDt->PrototypeTemplate()->SetAccessorProperty(env->fd_string(),get_fd_templ,Local<FunctionTemplate>(),attributes);// 導出的函數env->SetProtoMethod(t, "open", Open);// 忽略一系列函數// 導出給js層使用target->Set(env->context(),udpString,t->GetFunction(env->context()).ToLocalChecked()).Check();在c++層通用邏輯中我們講過相關的知識,這里就不詳細講述了,當我們在js層new UDP的時候,會新建一個c++對象。
UDPWrap::UDPWrap(Environment* env, Local<Object> object): HandleWrap(env,object,reinterpret_cast<uv_handle_t*>(&handle_),AsyncWrap::PROVIDER_UDPWRAP) {int r = uv_udp_init(env->event_loop(), &handle_); }執行了uv_udp_init初始化udp對應的handle。我們看一下libuv的定義。
int uv_udp_init_ex(uv_loop_t* loop, uv_udp_t* handle, unsigned int flags) {int domain;int err;int fd;/* Use the lower 8 bits for the domain */domain = flags & 0xFF;// 申請一個socket,返回一個fdfd = uv__socket(domain, SOCK_DGRAM, 0);uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);handle->alloc_cb = NULL;handle->recv_cb = NULL;handle->send_queue_size = 0;handle->send_queue_count = 0;// 初始化io觀察者(還沒有注冊到事件循環的poll io階段),監聽的文件描述符是fd,回調是uv__udp_iouv__io_init(&handle->io_watcher, uv__udp_io, fd);// 初始化寫隊列QUEUE_INIT(&handle->write_queue);QUEUE_INIT(&handle->write_completed_queue);return 0; }到這里,就是我們在js層執行dgram.createSocket('udp4')的時候,在nodejs中主要的執行過程。回到最開始的例子,我們看一下執行bind的時候的邏輯。
Socket.prototype.bind = function(port_, address_ /* , callback */) {let port = port_;// socket的狀態const state = this[kStateSymbol];// 已經綁定過了則報錯if (state.bindState !== BIND_STATE_UNBOUND)throw new ERR_SOCKET_ALREADY_BOUND();// 否則標記已經綁定了state.bindState = BIND_STATE_BINDING;// 沒傳地址則默認綁定所有地址if (!address) {if (this.type === 'udp4')address = '0.0.0.0';elseaddress = '::';}// dns解析后在綁定,如果需要的話state.handle.lookup(address, (err, ip) => {if (err) {state.bindState = BIND_STATE_UNBOUND;this.emit('error', err);return;}const err = state.handle.bind(ip, port || 0, flags);if (err) {const ex = exceptionWithHostPort(err, 'bind', ip, port);state.bindState = BIND_STATE_UNBOUND;this.emit('error', ex);// Todo: close?return;}startListening(this);return this; }bind函數主要的邏輯是handle.bind和startListening。我們一個個看。我們看一下c++層的bind。
void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {UDPWrap* wrap;ASSIGN_OR_RETURN_UNWRAP(&wrap,args.Holder(),args.GetReturnValue().Set(UV_EBADF));// bind(ip, port, flags)CHECK_EQ(args.Length(), 3);node::Utf8Value address(args.GetIsolate(), args[0]);Local<Context> ctx = args.GetIsolate()->GetCurrentContext();uint32_t port, flags;if (!args[1]->Uint32Value(ctx).To(&port) ||!args[2]->Uint32Value(ctx).To(&flags))return;struct sockaddr_storage addr_storage;int err = sockaddr_for_family(family, address.out(), port, &addr_storage);if (err == 0) {err = uv_udp_bind(&wrap->handle_,reinterpret_cast<const sockaddr*>(&addr_storage),flags);}args.GetReturnValue().Set(err); }也沒有太多邏輯,處理參數然后執行uv_udp_bind,uv_udp_bind就不具體展開了,和tcp類似,設置一些標記和屬性,然后執行操作系統bind的函數把本端的ip和端口保存到socket中。我們繼續看startListening。
function startListening(socket) {const state = socket[kStateSymbol];// 有數據時的回調,觸發message事件state.handle.onmessage = onMessage;// 重點,開始監聽數據state.handle.recvStart();state.receiving = true;state.bindState = BIND_STATE_BOUND;if (state.recvBufferSize)bufferSize(socket, state.recvBufferSize, RECV_BUFFER);if (state.sendBufferSize)bufferSize(socket, state.sendBufferSize, SEND_BUFFER);socket.emit('listening'); }重點是recvStart函數,我們到c++的實現。
void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {UDPWrap* wrap;ASSIGN_OR_RETURN_UNWRAP(&wrap,args.Holder(),args.GetReturnValue().Set(UV_EBADF));int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);// UV_EALREADY means that the socket is already bound but that's okayif (err == UV_EALREADY)err = 0;args.GetReturnValue().Set(err); }OnAlloc, OnRecv分別是分配內存接收數據的函數和數據到來時執行的回調。繼續看libuv
int uv__udp_recv_start(uv_udp_t* handle,uv_alloc_cb alloc_cb,uv_udp_recv_cb recv_cb) {int err;err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);if (err)return err;// 保存一些上下文handle->alloc_cb = alloc_cb;handle->recv_cb = recv_cb;// 注冊io觀察者到loop,如果事件到來,等到poll io階段處理uv__io_start(handle->loop, &handle->io_watcher, POLLIN);uv__handle_start(handle);return 0; }uvudp_recv_start主要是注冊io觀察者到loop,等待事件到來的時候,在poll io階段處理。前面我們講過,回調函數是uvudp_io。我們看一下事件觸發的時候,該函數怎么處理的。
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {uv_udp_t* handle;handle = container_of(w, uv_udp_t, io_watcher);// 可讀事件觸發if (revents & POLLIN)uv__udp_recvmsg(handle);// 可寫事件觸發if (revents & POLLOUT) {uv__udp_sendmsg(handle);uv__udp_run_completed(handle);} }我們這里先分析可讀事件的邏輯。我們看uv__udp_recvmsg。
static void uv__udp_recvmsg(uv_udp_t* handle) {struct sockaddr_storage peer;struct msghdr h;ssize_t nread;uv_buf_t buf;int flags;int count;count = 32;do {// 分配內存接收數據,c++層設置的buf = uv_buf_init(NULL, 0);handle->alloc_cb((uv_handle_t*) handle, 64 * 1024, &buf);memset(&h, 0, sizeof(h));memset(&peer, 0, sizeof(peer));h.msg_name = &peer;h.msg_namelen = sizeof(peer);h.msg_iov = (void*) &buf;h.msg_iovlen = 1;// 調操作系統的函數讀取數據do {nread = recvmsg(handle->io_watcher.fd, &h, 0);}while (nread == -1 && errno == EINTR);// 調用c++層回調handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);} }最終通過操作系統調用recvmsg讀取數據,操作系統收到一個udp數據包的時候,會掛載到socket的接收隊列,如果滿了則會丟棄,當用戶調用recvmsg函數的時候,操作系統就把接收隊列中節點逐個返回給用戶。讀取完后,libuv會回調c++層,然后c++層回調到js層,最后觸發message事件,這就是對應開始那段代碼的message事件。
2.2 客戶端
udp客戶端的流程是
1 調用bind綁定客戶端的地址信息
2 調用connect綁定服務器的地址信息
3 調用sendmsg和recvmsg進行數據通信
我們看一下nodejs里的流程
我們看到nodejs首先調用connect綁定服務器的地址,然后調用send發送信息,最后調用close。我們一個個分析。首先看connect。
Socket.prototype.connect = function(port, address, callback) {port = validatePort(port);// 參數處理if (typeof address === 'function') {callback = address;address = '';} else if (address === undefined) {address = '';}validateString(address, 'address');const state = this[kStateSymbol];// 不是初始化狀態if (state.connectState !== CONNECT_STATE_DISCONNECTED)throw new ERR_SOCKET_DGRAM_IS_CONNECTED();// 設置socket狀態state.connectState = CONNECT_STATE_CONNECTING;// 還沒有綁定客戶端地址信息,則先綁定隨機地址(操作系統決定)if (state.bindState === BIND_STATE_UNBOUND)this.bind({ port: 0, exclusive: true }, null);// 執行bind的時候,state.bindState不是同步設置的if (state.bindState !== BIND_STATE_BOUND) {enqueue(this, _connect.bind(this, port, address, callback));return;}_connect.call(this, port, address, callback); };這里分為兩種情況,一種是在connect之前已經調用了bind,第二種是沒有調用bind,如果沒有調用bind,則在connect之前先要調用bind。我們只分析沒有調用bind的情況,因為這是最長的鏈路。我們看一下bind的邏輯。
// port = {posrt: 0, exclusive : true}, address_ = null Socket.prototype.bind = function(port_, address_ /* , callback */) {let port = port_;const state = this[kStateSymbol];state.bindState = BIND_STATE_BINDING;let address;let exclusive;// 修正參數,這里的port是0,address是nullif (port !== null && typeof port === 'object') {address = port.address || '';exclusive = !!port.exclusive;port = port.port;} else {address = typeof address_ === 'function' ? '' : address_;exclusive = false;}// 沒傳地址默認取全部ipif (!address) {if (this.type === 'udp4')address = '0.0.0.0';elseaddress = '::';}// 這里的地址是ip地址,所以不需要dns解析,但是lookup會在nexttick的時候執行回調state.handle.lookup(address, (err, ip) => {const err = state.handle.bind(ip, port || 0, flags);startListening(this);});return this; };因為bind函數中的lookup不是同步執行傳入的callback,所以這時候會先返回到connect函數。從而connect函數執行以下代碼。
if (state.bindState !== BIND_STATE_BOUND) {enqueue(this, _connect.bind(this, port, address, callback));return;}connect函數先把回調加入隊列。
function enqueue(self, toEnqueue) {const state = self[kStateSymbol];if (state.queue === undefined) {state.queue = [];self.once('error', onListenError);self.once('listening', onListenSuccess);}state.queue.push(toEnqueue); }enqueue把回調加入隊列,并且監聽了listening事件,該事件在bind成功后觸發。這時候connect函數就執行完了,等待bind成功后(nexttick)會執行 startListening(this)。
function startListening(socket) {const state = socket[kStateSymbol];state.handle.onmessage = onMessage;// 注冊等待可讀事件state.handle.recvStart();state.receiving = true;// 標記已bind成功state.bindState = BIND_STATE_BOUND;if (state.recvBufferSize)bufferSize(socket, state.recvBufferSize, RECV_BUFFER);if (state.sendBufferSize)bufferSize(socket, state.sendBufferSize, SEND_BUFFER);// 觸發listening事件socket.emit('listening'); }我們看到這里(bind成功后)觸發了listening事件,從而執行我們剛才入隊的回調onListenSuccess。
function onListenSuccess() {this.removeListener('error', onListenError);clearQueue.call(this); }function clearQueue() {const state = this[kStateSymbol];const queue = state.queue;state.queue = undefined;for (const queueEntry of queue)queueEntry(); }回調就是把隊列中的回調執行一遍,connect函數設置的回調是_connect。
function _connect(port, address, callback) {const state = this[kStateSymbol];if (callback)this.once('connect', callback);const afterDns = (ex, ip) => {defaultTriggerAsyncIdScope(this[async_id_symbol],doConnect,ex, this, ip, address, port, callback);};state.handle.lookup(address, afterDns); }這里的address是服務器地址,_connect函數主要邏輯是
1 監聽connect事件
2 對服務器地址進行dns解析(只能是本地的配的域名)。解析成功后執行afterDns,最后執行doConnect,并傳入解析出來的ip。我們看看doConnect
connect函數通過c++層,然后調用libuv,到操作系統的connect。作用是把服務器地址保存到socket中。connect的流程就走完了。接下來我們就可以調用send和recv發送和接收數據。
2.3 發送數據
發送數據接口是sendto,他是對send的封裝。
Socket.prototype.send = function(buffer,offset,length,port,address,callback) {let list;const state = this[kStateSymbol];const connected = state.connectState === CONNECT_STATE_CONNECTED;// 沒有調用connect綁定過服務端地址,則需要傳服務端地址信息if (!connected) {if (address || (port && typeof port !== 'function')) {buffer = sliceBuffer(buffer, offset, length);} else {callback = port;port = offset;address = length;}} else {if (typeof length === 'number') {buffer = sliceBuffer(buffer, offset, length);if (typeof port === 'function') {callback = port;port = null;}} else {callback = offset;}// 已經綁定了服務端地址,則不能再傳了if (port || address)throw new ERR_SOCKET_DGRAM_IS_CONNECTED();}// 如果沒有綁定服務器端口,則這里需要傳,并且校驗if (!connected)port = validatePort(port);// 忽略一些參數處理邏輯// 沒有綁定客戶端地址信息,則需要先綁定,值由操作系統決定if (state.bindState === BIND_STATE_UNBOUND)this.bind({ port: 0, exclusive: true }, null);// bind還沒有完成,則先入隊,等待bind完成再執行if (state.bindState !== BIND_STATE_BOUND) {enqueue(this, this.send.bind(this, list, port, address, callback));return;}// 已經綁定了,設置服務端地址后發送數據const afterDns = (ex, ip) => {defaultTriggerAsyncIdScope(this[async_id_symbol],doSend,ex, this, ip, list, address, port, callback);};// 傳了地址則可能需要dns解析if (!connected) {state.handle.lookup(address, afterDns);} else {afterDns(null, null);} }我們繼續看doSend函數。
function doSend(ex, self, ip, list, address, port, callback) {const state = self[kStateSymbol];// dns解析出錯if (ex) {if (typeof callback === 'function') {process.nextTick(callback, ex);return;}process.nextTick(() => self.emit('error', ex));return;}// 定義一個請求對象const req = new SendWrap();req.list = list; // Keep reference alive.req.address = address;req.port = port;// 設置nodejs和用戶的回調,oncomplete由c++層調用,callback由oncomplete調用if (callback) {req.callback = callback;req.oncomplete = afterSend;}let err;// 根據是否需要設置服務端地址,調c++層函數if (port)err = state.handle.send(req, list, list.length, port, ip, !!callback);elseerr = state.handle.send(req, list, list.length, !!callback);// err大于等于1說明同步發送成功了,直接執行回調,否則等待異步回調if (err >= 1) {if (callback)process.nextTick(callback, null, err - 1);return;}// 發送失敗if (err && callback) {// Don't emit as error, dgram_legacy.js compatibilityconst ex = exceptionWithHostPort(err, 'send', address, port);process.nextTick(callback, ex);} }我們穿過c++層,直接看libuv的代碼。
int uv__udp_send(uv_udp_send_t* req,uv_udp_t* handle,const uv_buf_t bufs[],unsigned int nbufs,const struct sockaddr* addr,unsigned int addrlen,uv_udp_send_cb send_cb) {int err;int empty_queue;assert(nbufs > 0);// 還沒有綁定服務端地址,則綁定if (addr) {err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);if (err)return err;}// 當前寫隊列是否為空empty_queue = (handle->send_queue_count == 0);// 初始化一個寫請求uv__req_init(handle->loop, req, UV_UDP_SEND);if (addr == NULL)req->addr.ss_family = AF_UNSPEC;elsememcpy(&req->addr, addr, addrlen);// 保存上下文req->send_cb = send_cb;req->handle = handle;req->nbufs = nbufs;// 初始化數據,預分配的內存不夠,則分配新的堆內存req->bufs = req->bufsml;if (nbufs > ARRAY_SIZE(req->bufsml))req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));// 復制過去堆中memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));// 更新寫隊列數據handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);handle->send_queue_count++;// 插入寫隊列,等待可寫事件的發生QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);uv__handle_start(handle);// 當前寫隊列為空,則直接開始寫,否則設置等待可寫隊列if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {// 發送數據uv__udp_sendmsg(handle);// 寫隊列是否非空,則設置等待可寫事件,可寫的時候接著寫if (!QUEUE_EMPTY(&handle->write_queue))uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);} else {uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);}return 0; }該函數把寫請求插入寫隊列中等待可寫事件的到來。然后注冊等待可寫事件。當可寫事件觸發的時候,執行的函數是uv__udp_io。
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {uv_udp_t* handle;if (revents & POLLOUT) {uv__udp_sendmsg(handle);uv__udp_run_completed(handle);} }我們先看uv__udp_sendmsg
static void uv__udp_sendmsg(uv_udp_t* handle) {uv_udp_send_t* req;QUEUE* q;struct msghdr h;ssize_t size;// 逐個節點發送while (!QUEUE_EMPTY(&handle->write_queue)) {q = QUEUE_HEAD(&handle->write_queue);req = QUEUE_DATA(q, uv_udp_send_t, queue);memset(&h, 0, sizeof h);// 忽略參數處理h.msg_iov = (struct iovec*) req->bufs;h.msg_iovlen = req->nbufs;do {size = sendmsg(handle->io_watcher.fd, &h, 0);} while (size == -1 && errno == EINTR);if (size == -1) {// 繁忙則先不發了,等到可寫事件if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)break;}// 記錄發送結果req->status = (size == -1 ? UV__ERR(errno) : size);// 發送“完”移出寫隊列QUEUE_REMOVE(&req->queue);// 加入寫完成隊列QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);// 有節點數據寫完了,把io觀察者插入pending隊列,pending階段執行回調uv__udp_iouv__io_feed(handle->loop, &handle->io_watcher);} }該函數遍歷寫隊列,然后逐個發送節點中的數據,并記錄發送結果, 1 如果寫繁忙則結束寫邏輯,等待下一次寫事件觸發。
2 如果寫成功則把節點插入寫完成隊列中,并且把io觀察者插入pending隊列,等待pending階段執行回調uvudp_io。 我們再次回到uvudp_io中
當寫事件觸發時,執行完數據發送的邏輯后還會處理寫完成隊列。我們看uv__udp_run_completed。
static void uv__udp_run_completed(uv_udp_t* handle) {uv_udp_send_t* req;QUEUE* q;handle->flags |= UV_HANDLE_UDP_PROCESSING;// 逐個節點處理while (!QUEUE_EMPTY(&handle->write_completed_queue)) {q = QUEUE_HEAD(&handle->write_completed_queue);QUEUE_REMOVE(q);req = QUEUE_DATA(q, uv_udp_send_t, queue);uv__req_unregister(handle->loop, req);// 更新待寫數據大小handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);handle->send_queue_count--;// 如果重新申請了堆內存,則需要釋放if (req->bufs != req->bufsml)uv__free(req->bufs);req->bufs = NULL;if (req->send_cb == NULL)continue;// 執行回調if (req->status >= 0)req->send_cb(req, 0);elsereq->send_cb(req, req->status);}// 寫隊列為空,則注銷等待可寫事件if (QUEUE_EMPTY(&handle->write_queue)) {uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);if (!uv__io_active(&handle->io_watcher, POLLIN))uv__handle_stop(handle);}handle->flags &= ~UV_HANDLE_UDP_PROCESSING; }這就是發送的邏輯,發送完后libuv會調用c++回調,最后回調js層回調。具體到操作系統也是類似的實現,操作系統首先判斷數據的大小是否小于寫緩沖區,是的話申請一塊內存,然后構造udp協議數據包,再逐層往下調,最后發送出來,但是如果數據超過了底層的報文大小限制,則會被分片。
2.4 多播
udp支持多播,tcp則不支持,因為tcp是基于連接和可靠的,多播則會帶來過多的連接和流量。多播分為局域網多播和廣域網多播,我們知道在局域網內發生一個數據,是會以廣播的形式發送到各個主機的,主機根據目的地址判斷是否需要處理該數據包。如果udp是單播的模式,則只會有一個主機會處理該數據包。如果udp是多播的模式,則有多個主機處理該數據包。多播的時候,存在一個多播組的概念,只有加入這個組的主機才能處理該組的數據包。假設有以下局域網
當主機1給多播組1發送數據的時候,主機2,4可以收到,主機3則無法收到。我們再來看看廣域網的多播。廣域網的多播需要路由器的支持,多個路由器之間會使用多播路由協議交換多播組的信息。假設有以下廣域網。
當主機1給多播組1發送數據的時候,路由器1會給路由器2發送一份數據(通過多播路由協議交換了信息,路由1知道路由器2的主機4在多播組1中),但是路由器2不會給路由器3發送數據,因為他知道路由器3對應的網絡中沒有主機在多播組1。以上是多播的一些概念。nodejs中關于多播的實現,基本是對操作系統api的封裝,所以就不打算講解,我們直接看操作系統中對于多播的實現。
2.4.1 加入一個多播組
可以通過以下代碼加入一個多播組。
setsockopt(fd,IPPROTO_IP,IP_ADD_MEMBERSHIP,&mreq, // device對應的ip和加入多播組的ipsizeof(mreq));mreq的結構體定義如下
struct ip_mreq {struct in_addr imr_multiaddr; /* IP multicast address of group */struct in_addr imr_interface; /* local IP address of interface */ };我們看一下setsockopt的實現(只列出相關部分代碼)
case IP_ADD_MEMBERSHIP: {struct ip_mreq mreq;static struct options optmem;unsigned long route_src;struct rtable *rt;struct device *dev=NULL;err=verify_area(VERIFY_READ, optval, sizeof(mreq));memcpy_fromfs(&mreq,optval,sizeof(mreq));// 沒有設置device則根據多播組ip選擇一個deviceif(mreq.imr_interface.s_addr==INADDR_ANY) {if((rt=ip_rt_route(mreq.imr_multiaddr.s_addr,&optmem, &route_src))!=NULL){dev=rt->rt_dev;rt->rt_use--;}}else{// 根據device ip找到,找到對應的devicefor(dev = dev_base; dev; dev = dev->next){// 在工作狀態、支持多播,ip一樣if((dev->flags&IFF_UP)&&(dev->flags&IFF_MULTICAST)&&(dev->pa_addr==mreq.imr_interface.s_addr))break;}}// 加入多播組return ip_mc_join_group(sk,dev,mreq.imr_multiaddr.s_addr);}拿到加入的多播組ip和device后,調用ip_mc_join_group,在socket結構體中,有一個字段維護了該socket加入的多播組信息。
int ip_mc_join_group(struct sock *sk , struct device *dev, unsigned long addr) {int unused= -1;int i;// 還沒有加入過多播組if(sk->ip_mc_list==NULL){if((sk->ip_mc_list=(struct ip_mc_socklist *)kmalloc(sizeof(*sk->ip_mc_list), GFP_KERNEL))==NULL)return -ENOMEM;memset(sk->ip_mc_list,'0',sizeof(*sk->ip_mc_list));}// 遍歷加入的多播組隊列,判斷是否已經加入過for(i=0;i<IP_MAX_MEMBERSHIPS;i++){if(sk->ip_mc_list->multiaddr[i]==addr && sk->ip_mc_list->multidev[i]==dev)return -EADDRINUSE;if(sk->ip_mc_list->multidev[i]==NULL)unused=i;}// 到這說明沒有加入過當前設置的多播組,則記錄并且加入if(unused==-1)return -ENOBUFS;sk->ip_mc_list->multiaddr[unused]=addr;sk->ip_mc_list->multidev[unused]=dev;// addr為多播組ipip_mc_inc_group(dev,addr);return 0; }ip_mc_join_group函數的主要邏輯是把socket想加入的多播組信息記錄到socket的ip_mc_list字段中(如果還沒有加入過該多播組的話)。接著調ip_mc_inc_group往下走。device層維護了主機中使用了該device的多播組信息。
static void ip_mc_inc_group(struct device *dev, unsigned long addr) {struct ip_mc_list *i;// 遍歷該設置維護的多播組隊列,判斷是否已經有socket加入過該多播組,是則引用數加一for(i=dev->ip_mc_list;i!=NULL;i=i->next){if(i->multiaddr==addr){i->users++;return;}}// 到這說明,還沒有socket加入過當前多播組,則記錄并加入i=(struct ip_mc_list *)kmalloc(sizeof(*i), GFP_KERNEL);if(!i)return;i->users=1;i->interface=dev;i->multiaddr=addr;i->next=dev->ip_mc_list;// 通過igmp通知其他方igmp_group_added(i);dev->ip_mc_list=i; }ip_mc_inc_group函數的主要邏輯是判斷socket想要加入的多播組是不是已經存在于當前device中,如果不是則新增一個節點。繼續調用igmp_group_added
static void igmp_group_added(struct ip_mc_list *im) {// 初始化定時器igmp_init_timer(im);// 發送一個igmp數據包,同步多播組信息(socket加入了一個新的多播組)igmp_send_report(im->interface, im->multiaddr, IGMP_HOST_MEMBERSHIP_REPORT);// 轉換多播組ip到多播mac地址,并記錄到device中ip_mc_filter_add(im->interface, im->multiaddr); }我們看看igmp_send_report和ip_mc_filter_add的具體邏輯。
static void igmp_send_report(struct device *dev, unsigned long address, int type) {// 申請一個skb表示一個數據包struct sk_buff *skb=alloc_skb(MAX_IGMP_SIZE, GFP_ATOMIC);int tmp;struct igmphdr *igh;// 構建ip頭,ip協議頭的源ip是INADDR_ANY,即隨機選擇一個本機的,目的ip為多播組ip(address)tmp=ip_build_header(skb, INADDR_ANY, address, &dev, IPPROTO_IGMP, NULL,skb->mem_len, 0, 1);// data表示所有的數據部分,tmp表示ip頭大小,所以igh就是ip協議的數據部分,即igmp報文的內容igh=(struct igmphdr *)(skb->data+tmp);skb->len=tmp+sizeof(*igh);igh->csum=0;igh->unused=0;igh->type=type;igh->group=address;igh->csum=ip_compute_csum((void *)igh,sizeof(*igh));// 調用ip層發送出去ip_queue_xmit(NULL,dev,skb,1); }igmp_send_report其實就是構造一個igmp協議數據包,然后發送出去,igmp的協議格式如下
struct igmphdr {// 類型unsigned char type;unsigned char unused;// 校驗和unsigned short csum;// igmp的數據部分,比如加入多播組的時候,group表示多播組ipunsigned long group; };接著我們看ip_mc_filter_add
void ip_mc_filter_add(struct device *dev, unsigned long addr) {char buf[6];// 把多播組ip轉成mac多播地址addr=ntohl(addr);buf[0]=0x01;buf[1]=0x00;buf[2]=0x5e;buf[5]=addr&0xFF;addr>>=8;buf[4]=addr&0xFF;addr>>=8;buf[3]=addr&0x7F;dev_mc_add(dev,buf,ETH_ALEN,0); }我們知道ip地址是32位,mac地址是48位,但是IANA規定,ipv4組播MAC地址的高24位是0x01005E,第25位是0,低23位是ipv4組播地址的低23位。而多播的ip地址高四位固定是1110。另外低23位被映射到mac多播地址的23位,所以多播ip地址中,有5位是可以隨機組合的。這就意味著,每32個多播ip地址,映射到一個mac地址。這會帶來一些問題,假設主機x加入了多播組a,主機y加入了多播組b,而a和b對應的mac多播地址是一樣的。當主機z給多播組a發送一個數據包的時候,這時候主機x和y的網卡都會處理該數據包,并上報到上層,但是多播組a對應的mac多播地址和多播組b是一樣的。我們拿到一個多播組ip的時候,可以計算出他的多播mac地址,但是反過來就不行,因為一個多播mac地址對應了32個多播ip地址。那主機x和y怎么判斷是不是發給自己的數據包?因為device維護了一個本device上的多播ip列表,操作系統根據收到的數據包中的ip目的地址和device的多播ip列表對比。如果在列表中,則說明是發給自己的。最后我們看看dev_mc_add。device中維護了當前的mac多播地址列表,他會把這個列表信息同步到網卡中,使得網卡可以處理該列表中多播mac地址的數據包。
void dev_mc_add(struct device *dev, void *addr, int alen, int newonly) {struct dev_mc_list *dmi;// device維護的多播mac地址列表for(dmi=dev->mc_list;dmi!=NULL;dmi=dmi->next){// 已存在,則引用計數加一if(memcmp(dmi->dmi_addr,addr,dmi->dmi_addrlen)==0 && dmi->dmi_addrlen==alen){if(!newonly)dmi->dmi_users++;return;}}// 不存在則新增一個項到device列表中dmi=(struct dev_mc_list *)kmalloc(sizeof(*dmi),GFP_KERNEL);memcpy(dmi->dmi_addr, addr, alen);dmi->dmi_addrlen=alen;dmi->next=dev->mc_list;dmi->dmi_users=1;dev->mc_list=dmi;dev->mc_count++;// 通知網卡需要處理該多播mac地址dev_mc_upload(dev); }網卡的工作模式有幾種,分別是正常模式(只接收發給自己的數據包)、混雜模式(接收所有數據包)、多播模式(接收一般數據包和多播數據包)。網卡默認是只處理發給自己的數據包,所以當我們加入一個多播組的時候,我們需要告訴網卡,當收到該多播組的數據包時,需要處理,而不是忽略。dev_mc_upload函數就是通知網卡。
void dev_mc_upload(struct device *dev) {struct dev_mc_list *dmi;char *data, *tmp;// 不工作了if(!(dev->flags&IFF_UP))return;// 當前是混雜模式,則不需要設置多播了,因為網卡會處理所有收到的數據,不管是不是發給自己的if(dev->flags&IFF_PROMISC){dev->set_multicast_list(dev, -1, NULL);return;}// 多播地址個數,為0,則設置網卡工作模式為正常模式,因為不需要處理多播了if(dev->mc_count==0){dev->set_multicast_list(dev,0,NULL);return;}data=kmalloc(dev->mc_count*dev->addr_len, GFP_KERNEL);// 復制所有的多播mac地址信息for(tmp = data, dmi=dev->mc_list;dmi!=NULL;dmi=dmi->next){memcpy(tmp,dmi->dmi_addr, dmi->dmi_addrlen);tmp+=dev->addr_len;}// 告訴網卡dev->set_multicast_list(dev,dev->mc_count,data);kfree(data); }最后我們看一下set_multicast_list
static void set_multicast_list(struct device *dev, int num_addrs, void *addrs) {int ioaddr = dev->base_addr;// 多播模式if (num_addrs > 0) {outb(RX_MULT, RX_CMD);inb(RX_STATUS); /* Clear status. */} else if (num_addrs < 0) { // 混雜模式outb(RX_PROM, RX_CMD);inb(RX_STATUS);} else { // 正常模式outb(RX_NORM, RX_CMD);inb(RX_STATUS);} }set_multicast_list就是設置網卡工作模式的函數。至此,我們就成功加入了一個多播組。離開一個多播組也是類似的過程。
2.4.2 開啟多播
udp的多播能力是需要用戶主動開啟的,原因是防止用戶發送udp數據包的時候,誤傳了一個多播地址,但其實用戶是想發送一個單播的數據包。我們可以通過setBroadcast開啟多播能力。我們看libuv的代碼。
int uv_udp_set_broadcast(uv_udp_t* handle, int on) {if (setsockopt(handle->io_watcher.fd,SOL_SOCKET,SO_BROADCAST,&on,sizeof(on))) {return UV__ERR(errno);}return 0; }再看看操作系統的實現。
int sock_setsockopt(struct sock *sk, int level, int optname,char *optval, int optlen){...case SO_BROADCAST:sk->broadcast=val?1:0; }我們看到實現很簡單,就是設置一個標記位。當我們發送消息的時候,如果目的地址是多播地址,但是又沒有設置這個標記,則會報錯。
if(!sk->broadcast && ip_chk_addr(sin.sin_addr.s_addr)==IS_BROADCAST)return -EACCES;上面代碼來自調用udp的發送函數(例如sendto)時,進行的校驗,如果發送的目的ip是多播地址,但是沒有設置多播標記,則報錯。
2.4.3 其他功能
udp模塊還提供了其他一些功能
1 獲取本端地址address
如果用戶沒有顯示調用bind綁定自己設置的ip和端口,那么操作系統就會隨機選擇。通過address函數就可以獲取操作系統選擇的源ip和端口。
2 獲取對端的地址
通過remoteAddress函數可以獲取對端地址。該地址由用戶調用connect或sendto函數時設置。
3 獲取/設置緩沖區大小get/setRecvBufferSize,get/setSendBufferSize
4 setMulticastLoopback
發送多播數據包的時候,如果多播ip在出口設備的多播列表中,則給回環設備也發一份。
5 setMulticastInterface
設置多播數據的出口設備
6 加入或退出多播組addMembership/dropMembership
7 addSourceSpecificMembership/dropSourceSpecificMembership
這兩個函數是設置本端只接收特性源(主機)的多播數據包。
8 setTTL
單播ttl(單播的時候,ip協議頭中的ttl字段)。
9 setMulticastTTL
多播ttl(多播的時候,ip協議的ttl字段)。
10 ref/unref
這兩個函數設置如果nodejs主進程中只有udp對應的handle時,是否允許nodejs退出。nodejs事件循環的退出的條件之一是是否還有ref狀態的handle。 這些都是對操作系統api的封裝,就不一一分析。
3 具體例子
局域網中有兩個局域網ip,分別是192.168.8.164和192.168.8.226
單播
服務器端
const dgram = require('dgram'); const udp = dgram.createSocket('udp4'); udp.bind(1234); udp.on('message', (msg, remoteInfo) => {console.log(`receive msg: ${msg} from ${remoteInfo.address}:${remoteInfo.port}`); });客戶端
const dgram = require('dgram'); const udp = dgram.createSocket('udp4'); udp.bind(1234); udp.send('test', 1234, '192.168.8.226');我們會看到服務端會顯示receive msg test from 192.168.8.164:1234。
多播
服務器
const dgram = require('dgram'); const udp = dgram.createSocket('udp4');udp.bind(1234, () => {// 局域網多播地址(224.0.0.0~224.0.0.255,該范圍的多播數據包,路由器不會轉發)udp.addMembership('224.0.0.114'); });udp.on('message', (msg, rinfo) => {console.log(`receive msg: ${msg} from ${rinfo.address}:${rinfo.port}`); });服務器綁定1234端口后,加入多播組224.0.0.114,然后等待多播數據的到來。
客戶端
const dgram = require('dgram'); const udp = dgram.createSocket('udp4'); udp.bind(1234, () => {udp.addMembership('224.0.0.114'); }); udp.send('test', 1234, '224.0.0.114', (err) => {});客戶端綁定1234端口后,也加入了多播組224.0.0.114,然后發送數據,但是發現服務端沒有收到數據,客戶端打印了receive msg test from 169.254.167.41:1234。這怎么多了一個ip出來?原來我主機有兩個局域網地址。當我們加入多播組的時候,不僅可以設置加入哪個多播組,還能設置出口的設備和ip。當我們調用udp.addMembership('224.0.0.114')的時候,我們只是設置了我們加入的多播組,沒有設置出口。這時候操作系統會為我們選擇一個。根據輸出,我們發現操作系統選擇的是169.254.167.41(子網掩碼是255.255.0.0)。因為這個ip和192開頭的那個不是同一子網,但是我們加入的是局域網的多播ip,所有服務端無法收到客戶端發出的數據包。下面是nodejs文檔的解釋。
Tells the kernel to join a multicast group at the given multicastAddress and multicastInterface using the IP_ADD_MEMBERSHIP socket option. If the multicastInterface argument is not specified, the operating system will choose one interface and will add membership to it. To add membership to every available interface, call addMembership multiple times, once per interface.我們看一下操作系統的相關邏輯。
if(MULTICAST(daddr) && *dev==NULL && skb->sk && *skb->sk->ip_mc_name)*dev=dev_get(skb->sk->ip_mc_name);上面的代碼來自操作系統發送ip數據包時的邏輯,如果目的ip似乎多播地址并且ip_mc_name非空(即我們通過addMembership第二個參數設置的值),則出口設備就是我們設置的值。否則操作系統自己選。所以我們需要顯示指定這個出口,把代碼改成udp.addMembership('224.0.0.114', '192.168.8.164');重新執行發現客戶端和服務器都顯示了receive msg test from 192.168.8.164:1234。為什么客戶端自己也會收到呢?原來操作系統發送多播數據的時候,也會給自己發送一份。我們看看相關邏輯
// 目的地是多播地址,并且不是回環設備 if (MULTICAST(iph->daddr) && !(dev->flags&IFF_LOOPBACK)) {// 是否需要給自己一份,默認為trueif(sk==NULL || sk->ip_mc_loop){ // 給所有多播組的所有主機的數據包,則直接給自己一份if(iph->daddr==IGMP_ALL_HOSTS)ip_loopback(dev,skb);else{ // 判斷目的ip是否在當前設備的多播ip列表中,是的回傳一份struct ip_mc_list *imc=dev->ip_mc_list;while(imc!=NULL){if(imc->multiaddr==iph->daddr){ip_loopback(dev,skb);break;}imc=imc->next;}}} }以上代碼來自ip層發送數據包時的邏輯。如果我們設置了sk->ip_mc_loop字段為1,并且數據包的目的ip在出口設備的多播列表中,則需要給自己回傳一份。那么我們如何關閉這個特性呢?調用udp.setMulticastLoopback(false)就可以了。
更多參考
1 通過源碼理解IGMP v1的實現(基于linux1.2.13)
2 UDP協議源碼解析之接收
3 UDP協議源碼解析之發送
總結
以上是生活随笔為你收集整理的udp 使用connect优点_nodejs源码分析第十九章 -- udp模块的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux删除Python(linux删
- 下一篇: 安卓设置动态锁屏(安卓动态屏锁)