nng源码阅读
Core
thread & mutex & cv
nni_mtx
互斥鎖, 提供nni_mtx_init,?nni_mtx_fini,?nni_mtx_lock,?nni_mtx_unlock接口.
nni_cv
條件變量, 提供nni_cv_init,?nni_cv_fini,?nni_cv_wake,?nni_cv_wake1,?nni_cv_until接口.
POSIX下就是pthread的條件變量.
nni_thr
int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg);nni_thr_run,?nni_thr_fini,?nni_thr_wait,?nni_thr_is_self
線程有init, start, stop, done4個(gè)狀態(tài), 執(zhí)行nni_thr_init后, 進(jìn)入init狀態(tài). 執(zhí)行nni_thr_run進(jìn)入start狀態(tài). 執(zhí)行nni_thr_wait后, 首先設(shè)置為stop狀態(tài), 然后等待線程變成done狀態(tài).
nni_thr并不直接執(zhí)行nni_thr_func函數(shù), 而是執(zhí)行nni_thr_wrap函數(shù), 該函數(shù)內(nèi)部再調(diào)用nni_thr_init提供的回調(diào)函數(shù).
nni_thr_wrap
taskq
taskq為任務(wù)隊(duì)列, 內(nèi)部包含一個(gè)鏈表, 鏈表中為需要執(zhí)行的task, 每個(gè)taskq里面會(huì)有多個(gè)線程來處理鏈表里面的task.
struct nni_taskq {nni_list tq_tasks; // 任務(wù)列表, 雙向鏈表實(shí)現(xiàn)nni_mtx tq_mtx; // 鎖nni_cv tq_sched_cv; // 條件變量nni_cv tq_wait_cv; // 條件變量nni_taskq_thr *tq_threads; // 線程數(shù)組int tq_nthreads; // 線程數(shù)bool tq_run; // true表示線程開始執(zhí)行. };struct nni_task {nni_list_node task_node; // tq_tasks的節(jié)點(diǎn)void * task_arg; // 任務(wù)的參數(shù)nni_cb task_cb; // 該任務(wù)的回調(diào)函數(shù)nni_taskq * task_tq; // 任務(wù)所屬taskqnni_thr * task_thr; // 執(zhí)行任務(wù)的線程. non-NULL if the task is runningunsigned task_busy;bool task_prep;bool task_reap; // task執(zhí)行完后是否需要銷毀.nni_mtx task_mtx; // task的鎖.nni_cv task_cv; };nni_taskq_init(nni_taskq **tqp, int nthr)
創(chuàng)建一個(gè)新的taskq, nthr指定taskq關(guān)聯(lián)的線程數(shù), 進(jìn)行nni_taskq的初始化. 啟動(dòng)全部關(guān)聯(lián)的線程, 線程的函數(shù)為nni_taskq_thread.
nni_taskq_thread
nni_taskq_sys_init
系統(tǒng)的taskq, 相當(dāng)于全局的taskq, 其中線程數(shù)最少為2, 默認(rèn)為CPU數(shù)乘2, 如果編譯設(shè)置NNG_NUM_TASKQ_THREADS宏, 那么線程數(shù)就是該宏的值.
nni_task_init
創(chuàng)建一個(gè)nni_task. 默認(rèn)情況: task_prep為false, task_reap為false, task_busy為0.
nni_task_prep
task_busy ++, task_prep = true
nni_task_wait
如果task_busy不為0, 那么等待task_cv條件變量
nni_task_dispatch
將task加入到taskq的任務(wù)鏈表, 然后執(zhí)行nni_cv_wake1喚醒tq_sched_cv條件變量, 相當(dāng)于是異步執(zhí)行任務(wù).
nni_task_exec
執(zhí)行task_cb回調(diào)函數(shù). 相當(dāng)于是同步執(zhí)行任務(wù).
aio
struct nng_aio {int a_result; // Result code (nng_errno)size_t a_count; // Bytes transferred (I/O only)nni_time a_expire; // Absolute timeoutnni_duration a_timeout; // Relative timeout// These fields are private to the aio framework.bool a_stop; // shutting down (no new operations)bool a_sleep; // sleeping with no actionint a_sleeprv; // result when sleep wakesnni_task *a_task;// Read/write operations.nni_iov *a_iov;unsigned a_niov;nni_iov a_iovinl[4]; // inline IOVs - when the IOV list is shortnni_iov *a_iovalloc; // dynamically allocated IOVsunsigned a_niovalloc; // number of allocated IOVs// Message operations.nni_msg *a_msg;// User scratch data. Consumers may store values here, which// must be preserved by providers and the framework.void *a_user_data[4];// Operation inputs & outputs. Up to 4 inputs and 4 outputs may be// specified. The semantics of these will vary, and depend on the// specific operation.void *a_inputs[4];void *a_outputs[4];// Provider-use fields.nni_aio_cancelfn a_cancel_fn;void * a_cancel_arg;void * a_prov_data;nni_list_node a_prov_node;void * a_prov_extra[4]; // Extra data used by provider// Socket address. This turns out to be very useful, as we wind up// needing socket addresses for numerous connection related routines.// It would be cleaner to not have this and avoid burning the space,// but having this hear dramatically simplifies lots of code.nng_sockaddr a_sockaddr;// Expire node.nni_list_node a_expire_node; };nni_aio_init
主要是創(chuàng)建a_task. 這個(gè)task關(guān)聯(lián)一個(gè)回調(diào)函數(shù).
a_expire = NNI_TIME_NEVER a_timeout = NNG_DURATION_INFINITE a_iov = a_iovinl a_niovalloc = 0
void a_user_data[4];
可以通過nni_aio_set_data和nni_aio_get_data設(shè)置和獲取數(shù)據(jù),
a_inputs, a_outputs
通過nni_aio_set_input,?nni_aio_get_input,?nni_aio_set_output,?nni_aio_get_output設(shè)置和獲取
nni_aio_begin
進(jìn)行一些初始化
- a_result = 0;
- a_count = 0;
- a_cancel_fn = 0;
- a_outputs = NULL
- nni_task_prep(aio->a_task)
nni_aio_finish
int nni_aio_finish(nni_aio *aio, int result, size_t count)
nni_aio_schedule
該函數(shù)設(shè)置aio的a_expire為當(dāng)前時(shí)間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會(huì)使用函數(shù)nni_aio_expire_add將aio 該函數(shù)設(shè)置aio的a_expire為當(dāng)前時(shí)間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會(huì)使用函數(shù)nni_aio_expire_add將aio 加入到全局nni_aio_expire_aios鏈表中, nni_aio_sys_init里面會(huì)創(chuàng)建線程執(zhí)行函數(shù)nni_aio_expire_loop. 該函數(shù)設(shè)置aio的a_expire為當(dāng)前時(shí)間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會(huì)使用函數(shù)nni_aio_expire_add將aio 該函數(shù)設(shè)置aio的a_expire為當(dāng)前時(shí)間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會(huì)使用函數(shù)nni_aio_expire_add將aio 加入到全局nni_aio_expire_aios鏈表中, nni_aio_sys_init里面會(huì)創(chuàng)建線程執(zhí)行函數(shù)nni_aio_expire_loop.
nni_aio_expire_loop會(huì)......
nni_pollable
struct nni_pollable {int p_rfd;int p_wfd;nni_mtx p_lock;bool p_raised;bool p_open; };nni_pollable_getfd會(huì)使用eventfd創(chuàng)建一個(gè)fd, p_rfd和p_wfd都等于這個(gè)fd. 設(shè)置p_open為true.
nni_pollable_raise是會(huì)對(duì)p_wfd寫入數(shù)字1, 這樣p_rfd會(huì)變?yōu)榭勺x狀態(tài). 會(huì)設(shè)置p_raised為true
nni_plat_pipe_clear會(huì)從p_rfd里面讀取數(shù)字1, 然后清除p_raised狀態(tài).
msgqueue
struct nni_msgq {nni_mtx mq_lock;int mq_cap;int mq_alloc; // alloc is cap + 2...int mq_len; // mq_msgs的當(dāng)前元素個(gè)數(shù).int mq_get;int mq_put; // mq_msgs當(dāng)前可以存放msg的位置(即數(shù)組下標(biāo))int mq_geterr;bool mq_closed;nni_msg **mq_msgs; // msg指針數(shù)組nni_list mq_aio_putq; // 等待寫的aionni_list mq_aio_getq; // 等待讀的aio// Pollable status.nni_pollable *mq_sendable;nni_pollable *mq_recvable; };nni_msgq_init
分配cap+2個(gè)nni_msg 指針, 賦值給mq_msgs.
其他進(jìn)行初始化.
nni_msgq_tryput
將msg放入msgqueue.
- 取出一個(gè)aio
- 執(zhí)行nni_aio_finish_msg(aio, msg)
- nni_msgq_run_notify(msgqueue)
- 實(shí)際就是將msg設(shè)置到aio的a_msg里面, 然后執(zhí)行aio的回調(diào)函數(shù).
- 將msg放入到mq_msgs數(shù)組中
nni_msgq_aio_put(msgqueue, waio)
- 如果mq_aio_getq鏈表里面有raio
- 那么直接將waio里面的msg交給raio處理
nni_msgq_aio_get(msgqueue, aio)
類似nni_msgq_aio_get, 從mq_aio_getq里面取出一個(gè)raio, 然后從mq_msg或者mq_aio_putq的waio里面獲取一個(gè)msg. 獲取到msg后就會(huì)執(zhí)行raio的回調(diào)接口.
如果沒有msg, 那么就將raio放入到mq_aio_getq里面, 等待后面的msg
nni_msgq_get_recvable和nni_msgq_get_sendable
protocol
用于表示一種協(xié)議, 實(shí)際的協(xié)議實(shí)現(xiàn), 必須提供下面的nni_proto對(duì)象.
struct nni_proto {uint32_t proto_version; // Ops vector versionnni_proto_id proto_self; // Our identitynni_proto_id proto_peer; // Peer identityuint32_t proto_flags; // Protocol flagsconst nni_proto_sock_ops *proto_sock_ops; // Per-socket opeationsconst nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations.const nni_proto_ctx_ops * proto_ctx_ops; // Context operations.// proto_init, if not NULL, provides a function that initializes// global values. The main purpose of this may be to initialize// protocol option values.int (*proto_init)(void);// proto_fini, if not NULL, is called at shutdown, to release// any resources allocated at proto_init time.void (*proto_fini)(void); };proto_flags的值是如下標(biāo)記
#define NNI_PROTO_FLAG_RCV 1 // Protocol can receive #define NNI_PROTO_FLAG_SND 2 // Protocol can send #define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv #define NNI_PROTO_FLAG_RAW 4 // Protocol is raw #define NNI_PROTO_FLAG_NOMSGQ 8 // Protocol bypasses the upper queuesnni_proto_open
該函數(shù)由協(xié)議調(diào)用, 用于創(chuàng)建一個(gè)socket.
每個(gè)協(xié)議都會(huì)有一次初始化過程, 初始化的時(shí)候會(huì)調(diào)用proto_init接口, 然后將nni_proto放入到全局鏈表nni_proto_list中.
nni_proto_open里面調(diào)用nni_sock_open進(jìn)行實(shí)際的創(chuàng)建工作.
socket
struct nni_socket {nni_list_node s_node;nni_mtx s_mx;nni_cv s_cv;nni_cv s_close_cv;uint32_t s_id;uint32_t s_flags;unsigned s_refcnt; // protected by global lockvoid * s_data; // Protocol privatenni_msgq *s_uwq; // Upper write queuenni_msgq *s_urq; // Upper read queuenni_proto_id s_self_id;nni_proto_id s_peer_id;nni_proto_pipe_ops s_pipe_ops;nni_proto_sock_ops s_sock_ops;nni_proto_ctx_ops s_ctx_ops;// optionsnni_duration s_sndtimeo; // send timeoutnni_duration s_rcvtimeo; // receive timeoutnni_duration s_reconn; // reconnect timenni_duration s_reconnmax; // max reconnect timesize_t s_rcvmaxsz; // max receive sizenni_list s_options; // opts not handled by sock/protochar s_name[64]; // socket name (legacy compat)char s_scope[24]; // socket scope ("socket%u", 32 bits max)nni_list s_listeners; // active listenersnni_list s_dialers; // active dialersnni_list s_pipes; // active pipes nni_list s_ctxs; // active contexts (protected by global sock_lk)bool s_closing; // Socket is closingbool s_closed; // Socket closed, protected by global lockbool s_ctxwait; // Waiting for contexts to close.nni_mtx s_pipe_cbs_mtx;nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];sock_stats s_stats; };nni_sock_create
創(chuàng)建一個(gè)nni_sock對(duì)象, 然后進(jìn)行初始化.
s->s_sndtimeo = -1;s->s_rcvtimeo = -1;s->s_reconn = NNI_SECOND;s->s_reconnmax = 0;s->s_rcvmaxsz = 0; // unlimited by defaults->s_id = 0;s->s_refcnt = 0;s->s_self_id = proto->proto_self;s->s_peer_id = proto->proto_peer;s->s_flags = proto->proto_flags;s->s_sock_ops = *proto->proto_sock_ops; // 用proto的proto_sock_opss->s_pipe_ops = *proto->proto_pipe_ops; // 用proto的proto_pipe_opss->s_closed = false;s->s_closing = false;if (proto->proto_ctx_ops != NULL) {s->s_ctx_ops = *proto->proto_ctx_ops;}// 鏈表相關(guān)初始化NNI_LIST_NODE_INIT(&s->s_node);NNI_LIST_INIT(&s->s_options, nni_sockopt, node);NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node);NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node);NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node);// 消息隊(duì)列nni_msgq_init(&s->s_uwq, 0)nni_msgq_init(&s->s_urq, 1)// 執(zhí)行sock_inits->s_sock_ops.sock_init(&s->s_data, s)// 其他為設(shè)置optionsnni_sock_open
dialer
struct nni_dialer {nni_tran_dialer_ops d_ops; // transport opsnni_tran * d_tran; // transport pointervoid * d_data; // transport privateuint32_t d_id; // endpoint idnni_list_node d_node; // per socket listnni_sock * d_sock;nni_url * d_url;nni_pipe * d_pipe; // active pipe (for redialer)int d_refcnt;bool d_closed; // full shutdownbool d_closing;nni_atomic_flag d_started;nni_mtx d_mtx;nni_list d_pipes;nni_aio * d_user_aio;nni_aio * d_con_aio;nni_aio * d_tmo_aio; // backoff timernni_duration d_maxrtime; // maximum time for reconnectnni_duration d_currtime; // current time for reconnectnni_duration d_inirtime; // initial time for reconnectnni_time d_conntime; // time of last good connectnni_reap_item d_reap;nni_dialer_stats d_stats; };nni_dialer_create
nni_dialer_start
分為2種情況, NONBLOCK和BLOCK模式
- nni_aio_init初始化一個(gè)新的aio.
- 執(zhí)行nni_aio_begin.
- 設(shè)置dialer的d_user_aio為aio
- 執(zhí)行dialer_connect_start.
- 等待aio執(zhí)行完成, nni_aio_wait
- 獲取nni_aio_result
- 執(zhí)行nni_aio_fini
- aio為NULL
- 設(shè)置dialer的d_user_aio為NULL
- 執(zhí)行: dialer_connect_start
在dialer_connect_start里面執(zhí)行連接操作:?d->d_ops.d_connect(d->d_data, d->d_conn_aio);
d_conn_aio的回調(diào)函數(shù)dialer_connect_cb, 其nni_aio_result的結(jié)果有以下情況.
listener
struct nni_listener {nni_tran_listener_ops l_ops; // transport opsnni_tran * l_tran; // transport pointervoid * l_data; // transport privateuint32_t l_id; // endpoint idnni_list_node l_node; // per socket listnni_sock * l_sock;nni_url * l_url;int l_refcnt;bool l_closed; // full shutdownbool l_closing; // close started (shutdown)nni_atomic_flag l_started;nni_list l_pipes; nni_aio * l_acc_aio;nni_aio * l_tmo_aio;nni_reap_item l_reap;nni_listener_stats l_stats; };nni_listener_create
其過程類似nni_dialer_create.
nni_listener_start
l_acc_aio的回調(diào)為listener_accept_cb, 其獲取aio的結(jié)果.
pipe
struct nni_pipe {uint32_t p_id;nni_tran_pipe_ops p_tran_ops;nni_proto_pipe_ops p_proto_ops;void * p_tran_data;void * p_proto_data;nni_list_node p_sock_node;nni_list_node p_ep_node;nni_sock * p_sock;nni_dialer * p_dialer;nni_listener * p_listener;bool p_closed;nni_atomic_flag p_stop;bool p_cbs;int p_refcnt;nni_mtx p_mtx;nni_cv p_cv;nni_reap_item p_reap;nni_pipe_stats p_stats; };pipe_create
pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
傳輸層inproc
傳輸層的實(shí)現(xiàn), 必須提供struct nni_tran對(duì)象, inproc的該對(duì)象內(nèi)主要是包含tran_dialer, tran_listener, tran_pipe三組回調(diào)函數(shù), 以及tran_init用于初始化的函數(shù).
inproc的實(shí)現(xiàn)比較簡單, 內(nèi)部主要是用nni_msgq進(jìn)行數(shù)據(jù)傳輸.
inproc->tran_dialer
tran_dialer里面主要是提供init以及connect接口.
inproc_dialer_init
inproc_ep_connect
inproc_accept_clients
inproc_pipe_recv
inproc_pipe_send
nng源碼閱讀 · GitHub
總結(jié)
- 上一篇: 洛谷—— P2934 [USACO09J
- 下一篇: php动漫随机图源码,随机图片API源码