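Nginx-rtmp live streaming: a fairly detailed analysis of the workflow
1. Overview
1.1 How the live stream works
OBS pushes a live stream to nginx. The ngx_rtmp_live_module of nginx-rtmp forwards that stream to the application named live,
and VLC then connects to live to play the stream.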
1.2 nginx.conf
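# number of worker processes to create
worker_processes 1;
error_log stderr debug;
daemon off;
master_process off;

events {
    worker_connections 1024;
}

rtmp {
    server {
        listen 1935;        # RTMP port
        chunk_size 4096;    # chunk size used for data transfer

        application live {  # live streaming application
            live on;
        }

        # OBS pushes the stream to this push application, which republishes it to the live application
        application push {
            live on;
            push rtmp://192.168.1.82:1935/live;  # push to the live application above
        }
    }
}

1.3 OBS push settings
Click "+" to choose a media source, confirm, and then configure that media source.
Click "Settings", choose "Stream", set the push URL and confirm; OBS can then start pushing.
1.4 Playing the live stream with VLC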
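2. Source code analysis: application push
We start by analysing the whole flow of OBS pushing an RTMP stream to the nginx server.
2.1 Listening for connections
After nginx starts, it sleeps in epoll_wait inside ngx_epoll_process_events, waiting for client connections:

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    ...
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);

    /* when nginx first runs, timer is -1, so it waits indefinitely for a client connection */
    events = epoll_wait(ep, event_list, (int) nevents, timer);
    ...
    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;

        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        /* the read event being monitored */
        rev = c->read;

        /* the event flags returned by epoll_wait */
        revents = event_list[i].events;
        ...
        /* the monitored fd is readable; on the listening socket this
         * means a new connection has arrived */
        if ((revents & EPOLLIN) && rev->active) {
            ...
            rev->ready = 1;

            /* if load balancing is enabled, first add the event to the
             * ngx_posted_accept_events (or ngx_posted_events) deferred queue */
            if (flags & NGX_POST_EVENTS) {
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;

                ngx_post_event(rev, queue);

            } else {
                /* otherwise call the read event handler directly; for a
                 * new connection this is ngx_event_accept */
                rev->handler(rev);
            }
        }
        ...
    }

    return NGX_OK;
}

ngx_event_accept mainly accepts the client connection and calls the handler registered for the listening port:

void
ngx_event_accept(ngx_event_t *ev)
{
    ...
    do {
        ...
        s = accept(lc->fd, &sa.sockaddr, &socklen);
        ...
        /* call the handler registered for this listening port; for the
         * rtmp module it is always ngx_rtmp_init_connection */
        ls->handler(c);
        ...
    } while (ev->available);
}

In ngx_rtmp_init_connection, after a series of initialisation steps, the server starts the RTMP handshake with the client.
From the handshake up to the first RTMP message received after it succeeds, the flow is illustrated with packet captures only; the source code of that part is not analysed.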
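The two lines in ngx_epoll_process_events that compute instance and then mask c rely on keeping a one-bit flag in the lowest bit of an aligned pointer. The following is a minimal sketch of that idea only; conn_t, tag_ptr, instance_of and ptr_of are hypothetical names used for illustration and are not nginx identifiers.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a connection object; not an nginx type. */
typedef struct {
    int fd;
} conn_t;

/* Pack a 1-bit instance flag into the low bit of an aligned pointer. */
static void *tag_ptr(conn_t *c, unsigned instance)
{
    /* The trick is only valid if the low bit of the pointer is free. */
    assert(((uintptr_t) c & 1) == 0);
    return (void *) ((uintptr_t) c | (uintptr_t) (instance & 1));
}

/* Recover the flag, as in: instance = (uintptr_t) c & 1; */
static unsigned instance_of(void *tagged)
{
    return (unsigned) ((uintptr_t) tagged & 1);
}

/* Recover the pointer, as in: c = (uintptr_t) c & (uintptr_t) ~1; */
static conn_t *ptr_of(void *tagged)
{
    return (conn_t *) ((uintptr_t) tagged & ~(uintptr_t) 1);
}
```

The sketch works because such objects are at least 2-byte aligned, so the lowest bit of a valid pointer is always zero and can carry the flag.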
2.2 handshake
2.2.1 hs_stage: SERVER_RECV_CHALLENGE(1)
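In this handshake stage the server waits for the client's C0 and C1.
receive: Handshake C0+C1 Figure (1)
After receiving C0 and C1 from the client, the server enters the NGX_RTMP_HANDSHAKE_SERVER_SEND_CHALLENGE(2) stage,
in which it sends S0 and S1.
2.2.2 hs_stage: SERVER_SEND_CHALLENGE(2) and SERVER_SEND_RESPONSE(3)
SERVER_SEND_CHALLENGE is the stage in which the server sends S0 and S1 to the client. In practice, as soon as S0 and
S1 have been sent, the server enters SERVER_SEND_RESPONSE(3) and immediately sends S2 as well, so the capture looks like this:
send: Handshake S0+S1+S2 Figure (2)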
2.2.3 hs_stage: SERVER_RECV_RESPONSE(4)
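In this stage the server waits for the client's C2.
receive: Handshake C2 Figure (3)
At this point the RTMP handshake between server and client is complete, and normal message exchange begins.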
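The four server-side stages above can be summarised by how many bytes each one moves. The sketch below assumes the packet sizes from the RTMP specification (C0/S0 are 1 byte; C1, S1, C2 and S2 are 1536 bytes each), which agree with the "1537 of 1537" and "1536 of 1536" entries in the nginx log quoted later in this article. The HS_* names and hs_stage_bytes are hypothetical; only the stage numbers 1 to 4 come from the headings above.

```c
#include <stddef.h>

#define HS_VERSION_SIZE 1      /* C0 / S0: a single version byte */
#define HS_PACKET_SIZE  1536   /* C1, S1, C2, S2: 1536 bytes each */

/* Hypothetical enum mirroring the server stage numbers used above. */
typedef enum {
    HS_SERVER_RECV_CHALLENGE = 1,  /* wait for C0 + C1 */
    HS_SERVER_SEND_CHALLENGE = 2,  /* send S0 + S1 */
    HS_SERVER_SEND_RESPONSE  = 3,  /* send S2 */
    HS_SERVER_RECV_RESPONSE  = 4   /* wait for C2 */
} hs_stage_t;

/* Number of bytes received or sent while in the given stage. */
static size_t hs_stage_bytes(hs_stage_t stage)
{
    switch (stage) {
    case HS_SERVER_RECV_CHALLENGE:
    case HS_SERVER_SEND_CHALLENGE:
        return HS_VERSION_SIZE + HS_PACKET_SIZE;
    case HS_SERVER_SEND_RESPONSE:
    case HS_SERVER_RECV_RESPONSE:
        return HS_PACKET_SIZE;
    }
    return 0;
}
```

Under these assumptions, stages 2 and 3 together account for the single S0+S1+S2 burst seen in Figure (2).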
As the code below shows, once C2 has been received the server enters ngx_rtmp_cycle, the loop that handles client requests:

static void
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
{
    ngx_rtmp_free_handshake_buffers(s);

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "handshake: done");

    if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE,
                            NULL, NULL) != NGX_OK)
    {
        ngx_rtmp_finalize_session(s);
        return;
    }

    ngx_rtmp_cycle(s);
}

ngx_rtmp_cycle resets the read and write event handlers of the current RTMP connection; when data from the client is detected,
ngx_rtmp_recv is called to process it.
ngx_rtmp_recv receives RTMP data from the client in a loop. Once a complete RTMP message has been received, it calls
the handler that matches the RTMP message type. For example, type 20 is an AMF0 command message, which is handled by
ngx_rtmp_amf_message_handler.
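To make the dispatch by message type concrete, here is a small lookup sketch. The numeric type ids are those defined by the RTMP specification; rtmp_msg_type_name and rtmp_is_command are hypothetical helper names and do not exist in nginx-rtmp, which registers its handlers in its own tables.

```c
#include <stdint.h>

/* Map an RTMP message type id to a readable name (ids per the RTMP spec). */
static const char *rtmp_msg_type_name(uint8_t type)
{
    switch (type) {
    case 1:  return "Set Chunk Size";
    case 2:  return "Abort Message";
    case 3:  return "Acknowledgement";
    case 4:  return "User Control Message";
    case 5:  return "Window Acknowledgement Size";
    case 6:  return "Set Peer Bandwidth";
    case 8:  return "Audio";
    case 9:  return "Video";
    case 15: return "AMF3 Data";
    case 16: return "AMF3 Shared Object";
    case 17: return "AMF3 Command";
    case 18: return "AMF0 Data";
    case 19: return "AMF0 Shared Object";
    case 20: return "AMF0 Command";
    default: return "Unknown";
    }
}

/* Returns 1 if the message carries a command such as connect or publish. */
static int rtmp_is_command(uint8_t type)
{
    return type == 17 || type == 20;
}
```

With this table, the connect, createStream and publish messages discussed below all fall under type 20.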
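2.3 connect('push')
After the handshake succeeds, the first RTMP message received from the client is a request to connect to the
application push{} block under rtmp{} in nginx.conf, as shown below:
receive: connect('push') Figure (4)
The figure shows that the message type is 20, an AMF0 Command, so ngx_rtmp_amf_message_handler is called to parse it,
and the preset callback ngx_rtmp_cmd_connect_init is then invoked for the connect command. ngx_rtmp_cmd_connect_init
parses the rest of the connect message and then starts walking the connect handler chain built around ngx_rtmp_connect.
The chain holds what each rtmp module wants to do for the connect command. (Note: only some rtmp modules install a
callback for connect, and even when a callback is installed, the module must be enabled in the configuration file before
its connect handling actually does anything.) For the connect command, therefore, the only callback that does real work
here is ngx_rtmp_cmd_connect, installed by ngx_rtmp_cmd_module.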
2.3.1 ngx_rtmp_cmd_connect
static ngx_int_t
ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
{
    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                  "rtmp cmd: connect");

    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_core_app_conf_t  **cacfp;
    ngx_uint_t                  n;
    ngx_rtmp_header_t           h;
    u_char                     *p;

    static double               trans;
    static double               capabilities = NGX_RTMP_CAPABILITIES;
    static double               object_encoding = 0;

    /* the AMF-encoded response the server will send for the client's connect command */
    static ngx_rtmp_amf_elt_t  out_obj[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("fmsVer"),
          NGX_RTMP_FMS_VERSION, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("capabilities"),
          &capabilities, 0 },
    };

    static ngx_rtmp_amf_elt_t  out_inf[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          "status", 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("code"),
          "NetConnection.Connect.Success", 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("description"),
          "Connection succeeded.", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("objectEncoding"),
          &object_encoding, 0 }
    };

    static ngx_rtmp_amf_elt_t  out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "_result", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_obj, sizeof(out_obj) },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_inf, sizeof(out_inf) },
    };

    if (s->connected) {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "connect: duplicate connection");
        return NGX_ERROR;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    trans = v->trans;

    /* fill session parameters */
    s->connected = 1;

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

#define NGX_RTMP_SET_STRPAR(name)                                             \
    s->name.len = ngx_strlen(v->name);                                        \
    s->name.data = ngx_palloc(s->connection->pool, s->name.len);              \
    ngx_memcpy(s->name.data, v->name, s->name.len)

    NGX_RTMP_SET_STRPAR(app);
    NGX_RTMP_SET_STRPAR(args);
    NGX_RTMP_SET_STRPAR(flashver);
    NGX_RTMP_SET_STRPAR(swf_url);
    NGX_RTMP_SET_STRPAR(tc_url);
    NGX_RTMP_SET_STRPAR(page_url);

#undef NGX_RTMP_SET_STRPAR

    p = ngx_strlchr(s->app.data, s->app.data + s->app.len, '?');
    if (p) {
        s->app.len = (p - s->app.data);
    }

    s->acodecs = (uint32_t) v->acodecs;
    s->vcodecs = (uint32_t) v->vcodecs;

    /* look up the configuration of the application the client connects to */
    /* find application & set app_conf */
    cacfp = cscf->applications.elts;
    for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) {
        if ((*cacfp)->name.len == s->app.len &&
            ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0)
        {
            /* found app! */
            s->app_conf = (*cacfp)->app_conf;
            break;
        }
    }

    if (s->app_conf == NULL) {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "connect: application not found: '%V'", &s->app);
        return NGX_ERROR;
    }

    object_encoding = v->object_encoding;

    /* Send the window acknowledgement size (ack_size) to the client. This
     * message tells the peer how large the acknowledgement window is: after
     * sending a window's worth of data, the sender expects an acknowledgement
     * from the peer. The receiver must send an acknowledgement once it has
     * received a window's worth of data since its last acknowledgement, or
     * since the start of the session if none has been sent yet. */
    return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK ||
           /* Send Set Peer Bandwidth. This message limits the peer's output
            * bandwidth: the receiver limits the amount of sent but
            * unacknowledged data accordingly. If the window size differs from
            * the last one it sent, the receiver should reply with a Window
            * Acknowledgement Size message. */
           ngx_rtmp_send_bandwidth(s, cscf->ack_window,
                                   NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK ||
           /* Send Set Chunk Size to tell the peer the new maximum chunk size. */
           ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK ||
           ngx_rtmp_send_amf(s, &h, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0]))
           != NGX_OK ? NGX_ERROR : NGX_OK;
}

send: ack_size Figure (5)
send: peer bandwidth Figure (6)
send: chunk_size Figure (7)
send: _result('NetConnection.Connect.Success') Figure (8)
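The application lookup inside ngx_rtmp_cmd_connect (cut the name at '?', then compare length and bytes against each configured application) can be sketched in isolation as follows. This is an illustrative reduction using plain C strings; app_name_len and find_app are hypothetical helpers, not functions from nginx-rtmp.

```c
#include <stddef.h>
#include <string.h>

/* Length of the application name with any "?args" suffix cut off. */
static size_t app_name_len(const char *app, size_t len)
{
    const char *q = memchr(app, '?', len);
    return q ? (size_t) (q - app) : len;
}

/* Return the index of the configured application matching app, or -1. */
static int find_app(const char *const *names, size_t n,
                    const char *app, size_t len)
{
    size_t i;

    len = app_name_len(app, len);

    for (i = 0; i < n; i++) {
        /* same test as the source: equal length, then equal bytes */
        if (strlen(names[i]) == len && memcmp(names[i], app, len) == 0) {
            return (int) i;
        }
    }

    return -1;  /* corresponds to "connect: application not found" */
}
```

Comparing the length first is what prevents a prefix such as "pus" from matching "push".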
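2.4 releaseStream('test')
After the server has answered the connect command, the client sends a releaseStream command message. No rtmp module
in nginx-rtmp installs a callback for this command, so it is not processed and the server waits for the next message.
receive: releaseStream('test') Figure (9)
2.5 createStream('')
Next the server receives the createStream command message from the client.
receive: createStream('') Figure (10)
As in the earlier analysis, ngx_rtmp_cmd_create_stream_init is called at this point.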
2.5.1 ngx_rtmp_cmd_create_stream_init
static ngx_int_t
ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    static ngx_rtmp_create_stream_t     v;

    static ngx_rtmp_amf_elt_t  in_elts[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.trans, sizeof(v.trans) },
    };

    /* parse the createStream command message to obtain v.trans; Figure (10) shows it is 4 */
    if (ngx_rtmp_receive_amf(s, in, in_elts,
                             sizeof(in_elts) / sizeof(in_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");

    return ngx_rtmp_create_stream(s, &v);
}

From here the handler chain built around ngx_rtmp_create_stream is invoked. The function reached in this case is
ngx_rtmp_cmd_create_stream.
2.5.2 ngx_rtmp_cmd_create_stream
static ngx_int_t
ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
{
    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: create stream");

    /* support one message stream per connection */
    static double               stream;
    static double               trans;
    ngx_rtmp_header_t           h;

    static ngx_rtmp_amf_elt_t  out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "_result", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &stream, sizeof(stream) },
    };

    trans = v->trans;
    stream = NGX_RTMP_MSID;

    ngx_memzero(&h, sizeof(h));

    h.csid = NGX_RTMP_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    return ngx_rtmp_send_amf(s, &h, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ?
           NGX_DONE : NGX_ERROR;
}

This function mainly sends the server's response to createStream.
send: _result()
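The out_elts array in ngx_rtmp_cmd_create_stream describes four AMF0 values: the string "_result", the transaction number, a null, and the stream id. As a rough illustration of what such a payload looks like on the wire, the sketch below encodes the same four values using the AMF0 markers (0x00 number, 0x02 string, 0x05 null). The amf0_put_* helpers and build_create_stream_result are hypothetical, the code assumes a 64-bit IEEE-754 double, and it is not how ngx_rtmp_send_amf is implemented.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* AMF0 type markers */
#define AMF0_NUMBER 0x00
#define AMF0_STRING 0x02
#define AMF0_NULL   0x05

/* Write an AMF0 string: marker, 16-bit big-endian length, then the bytes. */
static size_t amf0_put_string(uint8_t *out, const char *s)
{
    size_t len = strlen(s);

    out[0] = AMF0_STRING;
    out[1] = (uint8_t) (len >> 8);
    out[2] = (uint8_t) len;
    memcpy(out + 3, s, len);

    return 3 + len;
}

/* Write an AMF0 number: marker, then the double in big-endian byte order. */
static size_t amf0_put_number(uint8_t *out, double d)
{
    uint64_t bits;
    int      i;

    memcpy(&bits, &d, sizeof(bits));

    out[0] = AMF0_NUMBER;
    for (i = 0; i < 8; i++) {
        out[1 + i] = (uint8_t) (bits >> (56 - 8 * i));
    }

    return 9;
}

/* Write an AMF0 null: just the marker. */
static size_t amf0_put_null(uint8_t *out)
{
    out[0] = AMF0_NULL;
    return 1;
}

/* Encode "_result", trans, null, stream id; returns the bytes written. */
static size_t build_create_stream_result(uint8_t *out, double trans,
                                         double stream_id)
{
    size_t n = 0;

    n += amf0_put_string(out + n, "_result");
    n += amf0_put_number(out + n, trans);
    n += amf0_put_null(out + n);
    n += amf0_put_number(out + n, stream_id);

    return n;
}
```

For the transaction number 4 seen in Figure (10) and an assumed stream id of 1, the payload is 29 bytes long; the caller must supply a buffer at least that large.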
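2.6 publish('test')
Next the client sends publish to the server in order to publish a named stream. Other clients can use that stream name
to play the stream and receive the published audio, video and other data messages.
receive: publish('test') Figure (11)
The figure shows that the publish type is 'live', meaning the server does not record the published stream to a file.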
2.6.1 ngx_rtmp_cmd_publish_init
static ngx_int_t
ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    static ngx_rtmp_publish_t       v;

    static ngx_rtmp_amf_elt_t      in_elts[] = {

        /* transaction is always 0 */
        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.name, sizeof(v.name) },

        { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.type, sizeof(v.type) },
    };

    ngx_memzero(&v, sizeof(v));

    /* read the values listed in in_elts from the publish command message */
    if (ngx_rtmp_receive_amf(s, in, in_elts,
                             sizeof(in_elts) / sizeof(in_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_rtmp_cmd_fill_args(v.name, v.args);

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                  "publish: name='%s' args='%s' type=%s silent=%d",
                  v.name, v.args, v.type, v.silent);

    return ngx_rtmp_publish(s, &v);
}

The function then invokes the handler chain built around ngx_rtmp_publish. From the nginx-rtmp source and the nginx.conf
above, the two functions that matter are ngx_rtmp_relay_publish and ngx_rtmp_live_publish.
Because of the order of the rtmp modules, ngx_rtmp_relay_publish is called first.
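The "handler chain" mentioned above is a chain of function pointers: each module saves the current head in its own next_publish variable and then installs itself as the new head, so the module installed last runs first. The sketch below shows the pattern on its own. All names are hypothetical stand-ins, and it assumes the relay handler is installed after the live handler, which is consistent with ngx_rtmp_relay_publish running first.

```c
#include <stddef.h>
#include <string.h>

typedef int (*publish_pt)(const char *name);

static publish_pt publish_chain;          /* chain head, in the role of ngx_rtmp_publish */
static publish_pt live_next, relay_next;  /* each module's saved next_publish */
static char       call_trace[16];         /* records the order of calls */

/* End of the chain: nothing left to call. */
static int terminal_publish(const char *name)
{
    (void) name;
    return 0;
}

static int live_publish(const char *name)
{
    strcat(call_trace, "L");
    return live_next(name);    /* like "return next_publish(s, v);" */
}

static int relay_publish(const char *name)
{
    strcat(call_trace, "R");
    return relay_next(name);
}

/* Install the handlers in module order; later modules end up in front. */
static void install_handlers(void)
{
    call_trace[0] = '\0';
    publish_chain = terminal_publish;

    live_next = publish_chain;      /* live module installs itself */
    publish_chain = live_publish;

    relay_next = publish_chain;     /* relay module installs itself later */
    publish_chain = relay_publish;
}
```

Calling publish_chain("test") after install_handlers() runs relay_publish, then live_publish, then the terminal handler, which is why every handler in the source ends with return next_publish(s, v).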
2.6.2 ngx_rtmp_relay_publish
static ngx_int_t
ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
    ngx_rtmp_relay_app_conf_t      *racf;
    ngx_rtmp_relay_target_t        *target, **t;
    ngx_str_t                       name;
    size_t                          n;
    ngx_rtmp_relay_ctx_t           *ctx;

    if (s->auto_pushed) {
        goto next;
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx && s->relay) {
        goto next;
    }

    racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
    if (racf == NULL || racf->pushes.nelts == 0) {
        goto next;
    }

    /* v->name holds the stream name extracted from the client's publish command */
    name.len = ngx_strlen(v->name);
    name.data = v->name;

    /* take the first element of the pushes array and iterate over the array */
    t = racf->pushes.elts;
    for (n = 0; n < racf->pushes.nelts; ++n, ++t) {
        target = *t;

        /* if the configuration names the stream to push, check that it
         * matches the stream name of the publish just received */
        if (target->name.len && (name.len != target->name.len ||
            ngx_memcmp(name.data, target->name.data, name.len)))
        {
            continue;
        }

        if (ngx_rtmp_relay_push(s, &name, target) == NGX_OK) {
            continue;
        }

        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                "relay: push failed name='%V' app='%V' "
                "playpath='%V' url='%V'",
                &name, &target->app, &target->play_path,
                &target->url.url);

        if (!ctx->push_evt.timer_set) {
            ngx_add_timer(&ctx->push_evt, racf->push_reconnect);
        }
    }

next:
    return next_publish(s, v);
}

2.6.3 ngx_rtmp_relay_push
ngx_int_t
ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
            "relay: create push name='%V' app='%V' playpath='%V' url='%V'",
            name, &target->app, &target->play_path, &target->url.url);

    return ngx_rtmp_relay_create(s, name, target,
            ngx_rtmp_relay_create_local_ctx,
            ngx_rtmp_relay_create_remote_ctx);
}

2.6.4 ngx_rtmp_relay_create
static ngx_int_t
ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target,
    ngx_rtmp_relay_create_ctx_pt create_publish_ctx,
    ngx_rtmp_relay_create_ctx_pt create_play_ctx)
{
    ngx_rtmp_relay_app_conf_t      *racf;
    ngx_rtmp_relay_ctx_t           *publish_ctx, *play_ctx, **cctx;
    ngx_uint_t                      hash;

    racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
    if (racf == NULL) {
        return NGX_ERROR;
    }

    /* this call creates a new connection to the address given in the push
     * URL, i.e. it treats that address as the upstream server and starts
     * connecting to it */
    play_ctx = create_play_ctx(s, name, target);
    if (play_ctx == NULL) {
        return NGX_ERROR;
    }

    hash = ngx_hash_key(name->data, name->len);
    cctx = &racf->ctx[hash % racf->nbuckets];
    for (; *cctx; cctx = &(*cctx)->next) {
        if ((*cctx)->name.len == name->len
            && !ngx_memcmp(name->data, (*cctx)->name.data,
                name->len))
        {
            break;
        }
    }

    if (*cctx) {
        play_ctx->publish = (*cctx)->publish;
        play_ctx->next = (*cctx)->play;
        (*cctx)->play = play_ctx;
        return NGX_OK;
    }

    /* create a local ngx_rtmp_relay_ctx_t */
    publish_ctx = create_publish_ctx(s, name, target);
    if (publish_ctx == NULL) {
        ngx_rtmp_finalize_session(play_ctx->session);
        return NGX_ERROR;
    }

    publish_ctx->publish = publish_ctx;
    publish_ctx->play = play_ctx;
    play_ctx->publish = publish_ctx;
    *cctx = publish_ctx;

    return NGX_OK;
}

2.6.4.1 ngx_rtmp_relay_create_remote_ctx
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_conf_ctx_t         cctx;

    cctx.app_conf = s->app_conf;
    cctx.srv_conf = s->srv_conf;
    cctx.main_conf = s->main_conf;

    return ngx_rtmp_relay_create_connection(&cctx, name, target);
}

2.6.4.2 ngx_rtmp_relay_create_connection
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_relay_app_conf_t      *racf;
    ngx_rtmp_relay_ctx_t           *rctx;
    ngx_rtmp_addr_conf_t           *addr_conf;
    ngx_rtmp_conf_ctx_t            *addr_ctx;
    ngx_rtmp_session_t             *rs;
    ngx_peer_connection_t          *pc;
    ngx_connection_t               *c;
    ngx_addr_t                     *addr;
    ngx_pool_t                     *pool;
    ngx_int_t                       rc;
    ngx_str_t                       v, *uri;
    u_char                         *first, *last, *p;

    racf = ngx_rtmp_get_module_app_conf(cctx, ngx_rtmp_relay_module);

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                   "relay: create remote context");

    pool = NULL;
    /* allocate a memory pool */
    pool = ngx_create_pool(4096, racf->log);
    if (pool == NULL) {
        return NULL;
    }

    /* allocate the ngx_rtmp_relay_ctx_t structure from the pool */
    rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
    if (rctx == NULL) {
        goto clear;
    }

    /* copy the published stream name into the name member of the new ngx_rtmp_relay_ctx_t */
    if (name && ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK) {
        goto clear;
    }

    /* copy the push address from the configuration file, i.e. the url,
     * into the url member of the new ngx_rtmp_relay_ctx_t */
    if (ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) {
        goto clear;
    }

    /* target->tag points to the ngx_rtmp_relay_module structure */
    rctx->tag = target->tag;
    /* target->data points to the structure that owns this data */
    rctx->data = target->data;

#define NGX_RTMP_RELAY_STR_COPY(to, from)                                     \
    if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from) != NGX_OK) {  \
        goto clear;                                                           \
    }

    /* copy the following target values into the matching members of the new ngx_rtmp_relay_ctx_t */
    NGX_RTMP_RELAY_STR_COPY(app,        app);
    NGX_RTMP_RELAY_STR_COPY(tc_url,     tc_url);
    NGX_RTMP_RELAY_STR_COPY(page_url,   page_url);
    NGX_RTMP_RELAY_STR_COPY(swf_url,    swf_url);
    NGX_RTMP_RELAY_STR_COPY(flash_ver,  flash_ver);
    NGX_RTMP_RELAY_STR_COPY(play_path,  play_path);

    rctx->live  = target->live;
    rctx->start = target->start;
    rctx->stop  = target->stop;

#undef NGX_RTMP_RELAY_STR_COPY

    /* if app or play_path is still unknown */
    if (rctx->app.len == 0 || rctx->play_path.len == 0) {
        /* extract app from the push address; taking
         * "push rtmp://192.168.1.82:1935/live;" as the example,
         * the extracted "live" is assigned to rctx->app */
        /* parse uri */
        uri = &target->url.uri;
        first = uri->data;
        last  = uri->data + uri->len;
        if (first != last && *first == '/') {
            ++first;
        }

        if (first != last) {

            /* deduce app */
            p = ngx_strlchr(first, last, '/');
            if (p == NULL) {
                p = last;
            }

            if (rctx->app.len == 0 && first != p) {
                /* here v.data points to "live" */
                v.data = first;
                v.len = p - first;
                /* assign "live" to rctx->app */
                if (ngx_rtmp_relay_copy_str(pool, &rctx->app, &v) != NGX_OK) {
                    goto clear;
                }
            }

            /* deduce play_path */
            if (p != last) {
                ++p;
            }

            /* if play_path is empty and p != last (p != last would mean
             * that there is more data after "live" in
             * "push rtmp://192.168.1.82:1935/live;", which is not the
             * case here) */
            if (rctx->play_path.len == 0 && p != last) {
                v.data = p;
                v.len = last - p;
                if (ngx_rtmp_relay_copy_str(pool, &rctx->play_path, &v)
                        != NGX_OK)
                {
                    goto clear;
                }
            }
        }
    }

    /* allocate the outgoing-connection structure ngx_peer_connection_t from the pool */
    pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
    if (pc == NULL) {
        goto clear;
    }

    if (target->url.naddrs == 0) {
        ngx_log_error(NGX_LOG_ERR, racf->log, 0,
                      "relay: no address");
        goto clear;
    }

    /* get address */
    /* the server address given in the push URL (the push destination), e.g.
     * "192.168.1.82:1935" in "push rtmp://192.168.1.82:1935/live;" */
    addr = &target->url.addrs[target->counter % target->url.naddrs];
    target->counter++;

    /* copy log to keep shared log unchanged */
    rctx->log = *racf->log;
    pc->log = &rctx->log;
    /* with keepalive upstream connections, this method obtains a connection from the pool */
    pc->get = ngx_rtmp_relay_get_peer;
    /* with keepalive upstream connections, this method returns a finished connection to the pool */
    pc->free = ngx_rtmp_relay_free_peer;
    /* name of the remote server; here it is simply the string "192.168.1.82:1935" */
    pc->name = &addr->name;
    pc->socklen = addr->socklen;
    pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen);
    if (pc->sockaddr == NULL) {
        goto clear;
    }
    /* copy the remote server address stored in addr->sockaddr into pc->sockaddr */
    ngx_memcpy(pc->sockaddr, addr->sockaddr, pc->socklen);

    /* start connecting to the upstream server */
    rc = ngx_event_connect_peer(pc);
    /* as the ngx_event_connect_peer source shows, the socket is non-blocking,
     * so the initial connect() normally cannot complete immediately and the
     * function returns NGX_AGAIN */
    if (rc != NGX_OK && rc != NGX_AGAIN ) {
        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                       "relay: connection failed");
        goto clear;
    }
    c = pc->connection;
    c->pool = pool;
    /* the push URL */
    c->addr_text = rctx->url;

    addr_conf = ngx_pcalloc(pool, sizeof(ngx_rtmp_addr_conf_t));
    if (addr_conf == NULL) {
        goto clear;
    }
    addr_ctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_conf_ctx_t));
    if (addr_ctx == NULL) {
        goto clear;
    }
    addr_conf->ctx = addr_ctx;
    addr_ctx->main_conf = cctx->main_conf;
    addr_ctx->srv_conf  = cctx->srv_conf;
    ngx_str_set(&addr_conf->addr_text, "ngx-relay");

    /* initialise a session for this outgoing connection */
    rs = ngx_rtmp_init_session(c, addr_conf);
    if (rs == NULL) {
        /* no need to destroy pool */
        return NULL;
    }
    rs->app_conf = cctx->app_conf;
    /* set this flag to 1 */
    rs->relay = 1;
    rctx->session = rs;
    ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module);
    ngx_str_set(&rs->flashver, "ngx-local-relay");

#if (NGX_STAT_STUB)
    (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
#endif

    /* now acting as a client, start sending the handshake packets C0 + C1 to the upstream server */
    ngx_rtmp_client_handshake(rs, 1);
    return rctx;

clear:
    if (pool) {
        ngx_destroy_pool(pool);
    }
    return NULL;
}

2.6.4.3 ngx_event_connect_peer
ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
    int                rc, type;
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
    in_port_t          port;
#endif
    ngx_int_t          event;
    ngx_err_t          err;
    ngx_uint_t         level;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_connection_t  *c;

    /* this get method does not actually do any work here */
    rc = pc->get(pc, pc->data);
    if (rc != NGX_OK) {
        return rc;
    }

    type = (pc->type ? pc->type : SOCK_STREAM);

    /* create a socket */
    s = ngx_socket(pc->sockaddr->sa_family, type, 0);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "%s socket %d",
                   (type == SOCK_STREAM) ? "stream" : "dgram", s);

    if (s == (ngx_socket_t) -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_socket_n " failed");
        return NGX_ERROR;
    }

    /* take a free connection from the connection pool */
    c = ngx_get_connection(s, pc->log);

    if (c == NULL) {
        if (ngx_close_socket(s) == -1) {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_close_socket_n "failed");
        }

        return NGX_ERROR;
    }

    /* socket type, STREAM or DGRAM; here it is STREAM */
    c->type = type;

    /* only if a receive buffer size was set; as seen above, it was not */
    if (pc->rcvbuf) {
        if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
                       (const void *) &pc->rcvbuf, sizeof(int)) == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          "setsockopt(SO_RCVBUF) failed");
            goto failed;
        }
    }

    /* make the socket non-blocking */
    if (ngx_nonblocking(s) == -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_nonblocking_n " failed");

        goto failed;
    }

    /* local holds the local address; as seen above, it was not set */
    if (pc->local) {

#if (NGX_HAVE_TRANSPARENT_PROXY)
        if (pc->transparent) {
            if (ngx_event_connect_set_transparent(pc, s) != NGX_OK) {
                goto failed;
            }
        }
#endif

#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
        port = ngx_inet_get_port(pc->local->sockaddr);
#endif

#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT)

        if (pc->sockaddr->sa_family != AF_UNIX && port == 0) {
            static int  bind_address_no_port = 1;

            if (bind_address_no_port) {
                if (setsockopt(s, IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT,
                               (const void *) &bind_address_no_port,
                               sizeof(int)) == -1)
                {
                    err = ngx_socket_errno;

                    if (err != NGX_EOPNOTSUPP && err != NGX_ENOPROTOOPT) {
                        ngx_log_error(NGX_LOG_ALERT, pc->log, err,
                                      "setsockopt(IP_BIND_ADDRESS_NO_PORT) "
                                      "failed, ignored");

                    } else {
                        bind_address_no_port = 0;
                    }
                }
            }
        }

#endif

#if (NGX_LINUX)

        if (pc->type == SOCK_DGRAM && port != 0) {
            int  reuse_addr = 1;

            if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
                           (const void *) &reuse_addr, sizeof(int))
                 == -1)
            {
                ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                              "setsockopt(SO_REUSEADDR) failed");
                goto failed;
            }
        }

#endif

        if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
            ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
                          "bind(%V) failed", &pc->local->name);

            goto failed;
        }
    }

    if (type == SOCK_STREAM) {
        /* set the I/O callbacks of this connection */
        c->recv = ngx_recv;
        c->send = ngx_send;
        c->recv_chain = ngx_recv_chain;
        c->send_chain = ngx_send_chain;

        /* use sendfile */
        c->sendfile = 1;

        if (pc->sockaddr->sa_family == AF_UNIX) {
            c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
            c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;

#if (NGX_SOLARIS)
            /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
            c->sendfile = 0;
#endif
        }

    } else { /* type == SOCK_DGRAM */
        c->recv = ngx_udp_recv;
        c->send = ngx_send;
        c->send_chain = ngx_udp_send_chain;
    }

    c->log_error = pc->log_error;

    /* fetch the read and write events of this outgoing connection and set their logs */
    rev = c->read;
    wev = c->write;

    rev->log = pc->log;
    wev->log = pc->log;

    pc->connection = c;

    c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

    /* add the read and write events of this outgoing connection to epoll (or another event mechanism) */
    if (ngx_add_conn) {
        if (ngx_add_conn(c) == NGX_ERROR) {
            goto failed;
        }
    }

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
                   "connect to %V, fd:%d #%uA", pc->name, s, c->number);

    /* connect to the upstream server; because the socket is non-blocking,
     * the initial connect() normally returns -1 with EINPROGRESS */
    rc = connect(s, pc->sockaddr, pc->socklen);

    if (rc == -1) {
        err = ngx_socket_errno;

        if (err != NGX_EINPROGRESS
#if (NGX_WIN32)
            /* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
            && err != NGX_EAGAIN
#endif
            )
        {
            if (err == NGX_ECONNREFUSED
#if (NGX_LINUX)
                /*
                 * Linux returns EAGAIN instead of ECONNREFUSED
                 * for unix sockets if listen queue is full
                 */
                || err == NGX_EAGAIN
#endif
                || err == NGX_ECONNRESET
                || err == NGX_ENETDOWN
                || err == NGX_ENETUNREACH
                || err == NGX_EHOSTDOWN
                || err == NGX_EHOSTUNREACH)
            {
                level = NGX_LOG_ERR;

            } else {
                level = NGX_LOG_CRIT;
            }

            ngx_log_error(level, c->log, err, "connect() to %V failed",
                          pc->name);

            ngx_close_connection(c);
            pc->connection = NULL;

            return NGX_DECLINED;
        }
    }

    /* so NGX_AGAIN is returned from here */
    if (ngx_add_conn) {
        if (rc == -1) {

            /* NGX_EINPROGRESS */

            return NGX_AGAIN;
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

        wev->ready = 1;

        return NGX_OK;
    }

    if (ngx_event_flags & NGX_USE_IOCP_EVENT) {

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, ngx_socket_errno,
                       "connect(): %d", rc);

        if (ngx_blocking(s) == -1) {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_blocking_n " failed");
            goto failed;
        }

        /*
         * FreeBSD's aio allows to post an operation on non-connected socket.
         * NT does not support it.
         *
         * TODO: check in Win32, etc. As workaround we can use NGX_ONESHOT_EVENT
         */

        rev->ready = 1;
        wev->ready = 1;

        return NGX_OK;
    }

    if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {

        /* kqueue */

        event = NGX_CLEAR_EVENT;

    } else {

        /* select, poll, /dev/poll */

        event = NGX_LEVEL_EVENT;
    }

    if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
        goto failed;
    }

    if (rc == -1) {

        /* NGX_EINPROGRESS */

        if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) {
            goto failed;
        }

        return NGX_AGAIN;
    }

    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

    wev->ready = 1;

    return NGX_OK;

failed:

    ngx_close_connection(c);
    pc->connection = NULL;

    return NGX_ERROR;
}

2.6.4.4 ngx_rtmp_client_handshake
void
ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async)
{
    ngx_connection_t           *c;

    c = s->connection;
    /* set the read and write event handlers of the current connection */
    c->read->handler =  ngx_rtmp_handshake_recv;
    c->write->handler = ngx_rtmp_handshake_send;

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "handshake: start client handshake");

    /* allocate the buffer that holds the handshake packets received or sent during the handshake */
    s->hs_buf = ngx_rtmp_alloc_handshake_buffer(s);
    /* set the current handshake stage: client send C0 + C1 */
    s->hs_stage = NGX_RTMP_HANDSHAKE_CLIENT_SEND_CHALLENGE;

    /* build the C0 + C1 packet */
    if (ngx_rtmp_handshake_create_challenge(s,
                ngx_rtmp_client_version,
                &ngx_rtmp_client_partial_key) != NGX_OK)
    {
        ngx_rtmp_finalize_session(s);
        return;
    }

    /* the caller passed 1, i.e. asynchronous, so the handshake is not sent
     * to the upstream server yet; instead the write event is added to the
     * timer and to epoll, and C0 + C1 is sent once a later loop iteration
     * reports the write event as writable */
    if (async) {
        /* add the write event to the timer with timeout s->timeout */
        ngx_add_timer(c->write, s->timeout);
        /* add the write event to epoll (or another event mechanism) */
        if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
            ngx_rtmp_finalize_session(s);
        }
        return;
    }

    ngx_rtmp_handshake_send(c->write);
}

2.6.4.5 ngx_rtmp_relay_create_local_ctx
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_relay_ctx_t           *ctx;

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "relay: create local context");

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL) {
        ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t));
        if (ctx == NULL) {
            return NULL;
        }
        ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module);
    }
    ctx->session = s;

    ctx->push_evt.data = s;
    ctx->push_evt.log = s->connection->log;
    /* set the handler of the push_evt event */
    ctx->push_evt.handler = ngx_rtmp_relay_push_reconnect;

    if (ctx->publish) {
        return NULL;
    }

    if (ngx_rtmp_relay_copy_str(s->connection->pool, &ctx->name, name)
            != NGX_OK)
    {
        return NULL;
    }

    return ctx;
}

After ngx_rtmp_relay_create_local_ctx returns, control unwinds all the way back to ngx_rtmp_relay_publish, which then calls
the next function in the chain through next_publish. Here that is ngx_rtmp_live_publish.
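The way ngx_rtmp_relay_create_connection deduces app and play_path from the URI part of the push address can be followed more easily in a standalone form. The sketch below mirrors the same steps (skip the leading '/', take everything up to the next '/' as app, take the remainder as play_path) on a plain C string. str_view and split_uri are hypothetical names, and unlike the original this version does not copy the substrings into a pool.

```c
#include <stddef.h>
#include <string.h>

/* A non-owning view into part of a string (hypothetical helper type). */
typedef struct {
    const char *data;
    size_t      len;
} str_view;

/* Split "/app/play/path" into app and play_path. */
static void split_uri(const char *uri, str_view *app, str_view *play_path)
{
    const char *first = uri;
    const char *last = uri + strlen(uri);
    const char *p;

    app->data = NULL;
    app->len = 0;
    play_path->data = NULL;
    play_path->len = 0;

    if (first != last && *first == '/') {
        ++first;                       /* skip the leading slash */
    }

    if (first == last) {
        return;                        /* nothing to deduce */
    }

    /* deduce app: everything up to the next '/' */
    p = memchr(first, '/', (size_t) (last - first));
    if (p == NULL) {
        p = last;
    }

    if (first != p) {
        app->data = first;
        app->len = (size_t) (p - first);
    }

    /* deduce play_path: whatever follows that '/' */
    if (p != last) {
        ++p;
    }

    if (p != last) {
        play_path->data = p;
        play_path->len = (size_t) (last - p);
    }
}
```

For the URI "/live" from the configuration in this article, app becomes "live" and play_path stays empty, which matches the comments in the source above.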
2.6.5 ngx_rtmp_live_publish
static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
    ngx_rtmp_live_app_conf_t       *lacf;
    ngx_rtmp_live_ctx_t            *ctx;

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

    if (lacf == NULL || !lacf->live) {
        goto next;
    }

    ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "live: publish: name='%s' type='%s'",
                   v->name, v->type);

    /* join stream as publisher */

    /* build an ngx_rtmp_live_ctx_t structure in the role of publisher */
    ngx_rtmp_live_join(s, v->name, 1);

    /* this returns the ngx_rtmp_live_ctx_t built just above */
    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
    if (ctx == NULL || !ctx->publishing) {
        goto next;
    }

    ctx->silent = v->silent;

    if (!ctx->silent) {
        /* send a response to the publish command the client sent earlier */
        ngx_rtmp_send_status(s, "NetStream.Publish.Start",
                             "status", "Start publishing");
    }

next:
    return next_publish(s, v);
}

send: onStatus('NetStream.Publish.Start') Figure (12)
Control then returns to epoll_wait to wait for the monitored events to fire. The analysis continues with a section of nginx debug output:
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59761
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0004 d:088F6950
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705070
ngx_send.c:ngx_unix_send:37 send: fd:9 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:3 ev:00002001
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 7
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 9: 60000:958705071
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:5 ev:0001 d:088F67E8
ngx_event_accept.c:ngx_event_accept:58 accept on 0.0.0.0:1935, ready: 0
ngx_alloc.c:ngx_memalign:66 posix_memalign: 08930870:4096 @16
ngx_event_accept.c:ngx_event_accept:293 *3 accept: 192.168.1.82:39334 fd:10
ngx_rtmp_init.c:ngx_rtmp_init_connection:124 *3 client connected '192.168.1.82'
ngx_rtmp_handler.c:ngx_rtmp_set_chunk_size:823 setting chunk_size=128
ngx_alloc.c:ngx_memalign:66 posix_memalign: 089318A0:4096 @16
ngx_rtmp_limit_module.c:ngx_rtmp_limit_connect:87 rtmp limit: connect
ngx_rtmp_handshake.c:ngx_rtmp_handshake:589 handshake: start server handshake
ngx_rtmp_handshake.c:ngx_rtmp_alloc_handshake_buffer:208 handshake: allocating buffer
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071
ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001
ngx_event.c:ngx_process_events_and_timers:247 timer delta: 1
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:10 ev:0001 d:088F69C8
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 10: 958705071
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:10 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:10 op:2 ev:00000000
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 2
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=14.13.0.12 epoch=958645070
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=638
ngx_send.c:ngx_unix_send:37 send: fd:10 1537 of 1537
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 3
ngx_send.c:ngx_unix_send:37 send: fd:10 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 4
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:10 -1 of 1536
ngx_recv.c:ngx_unix_recv:150 recv() not ready (11: Resource temporarily unavailable)
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071
ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001
ngx_event.c:ngx_process_events_and_timers:247 timer delta: 0
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0001 d:088F6950
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705071
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:2 ev:00000000
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 8
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=13.10.14.13 epoch=958645071
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=557
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 9
ngx_send.c:ngx_unix_send:37 send: fd:9 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 10
ngx_rtmp_handshake.c:ngx_rtmp_handshake_done:362 handshake: done
ngx_rtmp_relay_module.c:ngx_rtmp_relay_handshake_done:1319 rtmp relay module: handhshake done

First, fd = 9 is the client-side STREAM socket created when connecting to the upstream server (192.168.1.82:1935), and fd = 5 is
the STREAM listening socket created when nginx started. The log above therefore shows the following sequence:
- epoll reports the write event of fd:9 as ready, so its handler ngx_rtmp_handshake_send is called. It sends the prepared C0 and
  C1 to the upstream server (192.168.1.82:1935) through the send function of that connection, ngx_unix_send. After sending,
  the client enters the CLIENT_RECV_CHALLENGE(7) stage, in which it waits for S0 and S1 from the server;
- epoll reports the listening socket fd:5 as readable, which means a client is requesting a connection. After accepting it,
  the server uses fd:10 to talk to that client and then enters its own handshake;
- the rest of the handshake is not described again; it is the same as the handshake analysed earlier.
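When reading the log above it helps to decode the hexadecimal ev: values. The sketch below does that for the four flags that occur in this log, using the Linux epoll flag values (EPOLLIN 0x001, EPOLLOUT 0x004, EPOLLRDHUP 0x2000, EPOLLET 0x80000000). The constants are redefined locally under EV_* names so that the example builds on any platform; describe_ev is a hypothetical helper and not part of nginx.

```c
#include <stdint.h>
#include <stdio.h>

/* Linux epoll flag values, redefined locally so the sketch is portable. */
#define EV_IN     0x00000001u   /* EPOLLIN:    readable */
#define EV_OUT    0x00000004u   /* EPOLLOUT:   writable */
#define EV_RDHUP  0x00002000u   /* EPOLLRDHUP: peer closed its write side */
#define EV_ET     0x80000000u   /* EPOLLET:    edge-triggered */

/* Write a readable form of an "ev:" value into buf and return buf. */
static char *describe_ev(uint32_t ev, char *buf, size_t cap)
{
    snprintf(buf, cap, "%s%s%s%s",
             (ev & EV_IN)    ? "IN "    : "",
             (ev & EV_OUT)   ? "OUT "   : "",
             (ev & EV_RDHUP) ? "RDHUP " : "",
             (ev & EV_ET)    ? "ET "    : "");
    return buf;
}
```

Read this way, "fd:9 ev:0004" is a writable event on the relay socket, "fd:5 ev:0001" is a readable event on the listening socket, and "ev:80002001" registers edge-triggered read monitoring with peer-close detection.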
After the client sends C2 it enters the NGX_RTMP_HANDSHAKE_CLIENT_DONE(10) stage and then calls ngx_rtmp_handshake_done, the
same function already listed in section 2.2.3. Through ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE, NULL, NULL), that function
goes on to call ngx_rtmp_relay_handshake_done:
static ngx_int_t
ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: handhshake done");

    ngx_rtmp_relay_ctx_t   *ctx;

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL || !s->relay) {
        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: return");
        return NGX_OK;
    }

    /* mainly sends the connect command to the server */
    return ngx_rtmp_relay_send_connect(s);
}

2.7 Client (fd = 9) sends: connect
After its handshake succeeds, the client (192.168.1.82:39334, fd = 9) sends the connect command to the server.
2.7.1 ngx_rtmp_relay_send_connect
static ngx_int_t
ngx_rtmp_relay_send_connect(ngx_rtmp_session_t *s)
{
    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "rtmp relay module: send connect");

    static double               trans = NGX_RTMP_RELAY_CONNECT_TRANS;
    static double               acodecs = 3575;
    static double               vcodecs = 252;

    static ngx_rtmp_amf_elt_t   out_cmd[] = {
        { NGX_RTMP_AMF_STRING, ngx_string("app"), NULL, 0 },      /* <-- fill */
        { NGX_RTMP_AMF_STRING, ngx_string("tcUrl"), NULL, 0 },    /* <-- fill */
        { NGX_RTMP_AMF_STRING, ngx_string("pageUrl"), NULL, 0 },  /* <-- fill */
        { NGX_RTMP_AMF_STRING, ngx_string("swfUrl"), NULL, 0 },   /* <-- fill */
        { NGX_RTMP_AMF_STRING, ngx_string("flashVer"), NULL, 0 }, /* <-- fill */
        { NGX_RTMP_AMF_NUMBER, ngx_string("audioCodecs"), &acodecs, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("videoCodecs"), &vcodecs, 0 }
    };

    static ngx_rtmp_amf_elt_t   out_elts[] = {
        { NGX_RTMP_AMF_STRING, ngx_null_string, "connect", 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
        { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_cmd, sizeof(out_cmd) }
    };

    ngx_rtmp_core_app_conf_t   *cacf;
    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_relay_ctx_t       *ctx;
    ngx_rtmp_header_t           h;
    size_t                      len, url_len;
    u_char                     *p, *url_end;

    cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (cacf == NULL || ctx == NULL) {
        return NGX_ERROR;
    }

    /* app */
    if (ctx->app.len) {
        out_cmd[0].data = ctx->app.data;
        out_cmd[0].len  = ctx->app.len;
    } else {
        out_cmd[0].data = cacf->name.data;
        out_cmd[0].len  = cacf->name.len;
    }

    /* tcUrl */
    if (ctx->tc_url.len) {
        out_cmd[1].data = ctx->tc_url.data;
        out_cmd[1].len  = ctx->tc_url.len;
    } else {
        len = sizeof("rtmp://") - 1 + ctx->url.len +
              sizeof("/") - 1 + ctx->app.len;
        p = ngx_palloc(s->connection->pool, len);
        if (p == NULL) {
            return NGX_ERROR;
        }
        out_cmd[1].data = p;
        p = ngx_cpymem(p, "rtmp://", sizeof("rtmp://") - 1);

        url_len = ctx->url.len;
        url_end = ngx_strlchr(ctx->url.data, ctx->url.data + ctx->url.len, '/');
        if (url_end) {
            url_len = (size_t) (url_end - ctx->url.data);
        }

        p = ngx_cpymem(p, ctx->url.data, url_len);
        *p++ = '/';
        p = ngx_cpymem(p, ctx->app.data, ctx->app.len);
        out_cmd[1].len = p - (u_char *)out_cmd[1].data;
    }

    /* pageUrl */
    out_cmd[2].data = ctx->page_url.data;
    out_cmd[2].len  = ctx->page_url.len;

    /* swfUrl */
    out_cmd[3].data = ctx->swf_url.data;
    out_cmd[3].len  = ctx->swf_url.len;

    /* flashVer */
    if (ctx->flash_ver.len) {
        out_cmd[4].data = ctx->flash_ver.data;
        out_cmd[4].len  = ctx->flash_ver.len;
    } else {
        out_cmd[4].data = NGX_RTMP_RELAY_FLASHVER;
        out_cmd[4].len  = sizeof(NGX_RTMP_RELAY_FLASHVER) - 1;
    }

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    return ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK
        || ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK
        || ngx_rtmp_send_amf(s, &h, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
        ? NGX_ERROR
        : NGX_OK;
}
After these RTMP packets (chunk_size, ack_size and the connect command) have been sent, control returns to epoll_wait to wait for further events.
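The tcUrl branch above builds "rtmp://" + the host part of the relay URL + "/" + the app name. A minimal standalone sketch of that string construction follows; build_tc_url is a hypothetical helper, and the assumption that the relay URL has the form "host:port/app" (with the scheme already stripped) is mine, not something the listing states.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Build "rtmp://<host[:port]>/<app>" from a relay URL such as
 * "192.168.1.82:1935/live" (assumed form). Returns the number of
 * characters written, excluding the terminating NUL, or -1 if the
 * output buffer is too small. */
int build_tc_url(const char *url, const char *app, char *out, size_t cap)
{
    const char *slash = strchr(url, '/');
    size_t host_len = slash ? (size_t) (slash - url) : strlen(url);
    int n;

    assert(out != NULL && cap > 0);

    /* keep only the part of the URL before the first '/' */
    n = snprintf(out, cap, "rtmp://%.*s/%s", (int) host_len, url, app);

    return (n < 0 || (size_t) n >= cap) ? -1 : n;
}
```

With the configuration from section 1.2, the url "192.168.1.82:1935/live" and the app "live" would yield rtmp://192.168.1.82:1935/live.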
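The analysis below distinguishes one server and two clients:
- Server: 192.168.1.82:1935
- Client: obs (pushing the stream)
- Client: 192.168.1.82:xxxx (the relay client that connected to the upstream server above)
2.8 Server receives from client obs: amf_meta(18)
At this point the server sees an rtmp message of type amf_meta(18) sent by the obs client.
Figure 13: receive: @setDataFrame(meta_data 18)
For "@setDataFrame", only the ngx_rtmp_codec_module module registers a callback, namely ngx_rtmp_codec_meta_data: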
2.8.1 ngx_rtmp_codec_meta_data
static ngx_int_t
ngx_rtmp_codec_meta_data(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                         ngx_chain_t *in)
{
    ngx_rtmp_codec_app_conf_t      *cacf;
    ngx_rtmp_codec_ctx_t           *ctx;
    ngx_uint_t                      skip;

    static struct {
        double                      width;
        double                      height;
        double                      duration;
        double                      frame_rate;
        double                      video_data_rate;
        double                      video_codec_id_n;
        u_char                      video_codec_id_s[32];
        double                      audio_data_rate;
        double                      audio_codec_id_n;
        u_char                      audio_codec_id_s[32];
        u_char                      profile[32];
        u_char                      level[32];
    }                               v;

    static ngx_rtmp_amf_elt_t       in_video_codec_id[] = {
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.video_codec_id_n, 0 },
        { NGX_RTMP_AMF_STRING, ngx_null_string,
          &v.video_codec_id_s, sizeof(v.video_codec_id_s) },
    };

    static ngx_rtmp_amf_elt_t       in_audio_codec_id[] = {
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.audio_codec_id_n, 0 },
        { NGX_RTMP_AMF_STRING, ngx_null_string,
          &v.audio_codec_id_s, sizeof(v.audio_codec_id_s) },
    };

    static ngx_rtmp_amf_elt_t       in_inf[] = {
        { NGX_RTMP_AMF_NUMBER, ngx_string("width"), &v.width, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("height"), &v.height, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("duration"), &v.duration, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("framerate"), &v.frame_rate, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("fps"), &v.frame_rate, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("videodatarate"),
          &v.video_data_rate, 0 },
        { NGX_RTMP_AMF_VARIANT, ngx_string("videocodecid"),
          in_video_codec_id, sizeof(in_video_codec_id) },
        { NGX_RTMP_AMF_NUMBER, ngx_string("audiodatarate"),
          &v.audio_data_rate, 0 },
        { NGX_RTMP_AMF_VARIANT, ngx_string("audiocodecid"),
          in_audio_codec_id, sizeof(in_audio_codec_id) },
        { NGX_RTMP_AMF_STRING, ngx_string("profile"),
          &v.profile, sizeof(v.profile) },
        { NGX_RTMP_AMF_STRING, ngx_string("level"),
          &v.level, sizeof(v.level) },
    };

    static ngx_rtmp_amf_elt_t       in_elts[] = {
        { NGX_RTMP_AMF_STRING, ngx_null_string, NULL, 0 },
        { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_inf, sizeof(in_inf) },
    };

    cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_codec_module);

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
    if (ctx == NULL) {
        ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
        ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
    }

    ngx_memzero(&v, sizeof(v));

    /* use -1 as a sign of unchanged data;
     * 0 is a valid value for uncompressed audio */
    v.audio_codec_id_n = -1;

    /* FFmpeg sends a string in front of actual metadata; ignore it */
    skip = !(in->buf->last > in->buf->pos
            && *in->buf->pos == NGX_RTMP_AMF_STRING);
    if (ngx_rtmp_receive_amf(s, in, in_elts + skip,
                sizeof(in_elts) / sizeof(in_elts[0]) - skip))
    {
        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                "codec: error parsing data frame");
        return NGX_OK;
    }

    ctx->width = (ngx_uint_t) v.width;
    ctx->height = (ngx_uint_t) v.height;
    ctx->duration = (ngx_uint_t) v.duration;
    ctx->frame_rate = (ngx_uint_t) v.frame_rate;
    ctx->video_data_rate = (ngx_uint_t) v.video_data_rate;
    ctx->video_codec_id = (ngx_uint_t) v.video_codec_id_n;
    ctx->audio_data_rate = (ngx_uint_t) v.audio_data_rate;
    ctx->audio_codec_id = (v.audio_codec_id_n == -1
            ? 0 : v.audio_codec_id_n == 0
            ? NGX_RTMP_AUDIO_UNCOMPRESSED : (ngx_uint_t) v.audio_codec_id_n);
    ngx_memcpy(ctx->profile, v.profile, sizeof(v.profile));
    ngx_memcpy(ctx->level, v.level, sizeof(v.level));

    ngx_log_debug8(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "codec: data frame: "
            "width=%ui height=%ui duration=%ui frame_rate=%ui "
            "video=%s (%ui) audio=%s (%ui)",
            ctx->width, ctx->height, ctx->duration, ctx->frame_rate,
            ngx_rtmp_get_video_codec_name(ctx->video_codec_id),
            ctx->video_codec_id,
            ngx_rtmp_get_audio_codec_name(ctx->audio_codec_id),
            ctx->audio_codec_id);

    switch (cacf->meta) {
        case NGX_RTMP_CODEC_META_ON:    /* the value it is initialized to */
            return ngx_rtmp_codec_reconstruct_meta(s);
        case NGX_RTMP_CODEC_META_COPY:
            return ngx_rtmp_codec_copy_meta(s, h, in);
    }

    /* NGX_RTMP_CODEC_META_OFF */
    return NGX_OK;
}
This function mainly parses the setDataFrame data and then, since cacf->meta is NGX_RTMP_CODEC_META_ON here, calls ngx_rtmp_codec_reconstruct_meta.
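Two small decisions in this function are easy to miss: whether to skip the leading string template, and how the parsed audiocodecid is normalized. A minimal sketch of both, with hypothetical helper names (meta_skip, map_audio_codec_id). The string marker 0x02 is the AMF0 string type; the value 16 for the "uncompressed" codec id is an assumption about NGX_RTMP_AUDIO_UNCOMPRESSED, not taken from the listing.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define AMF0_STRING_MARKER   0x02   /* AMF0 type marker for a string */
#define AUDIO_UNCOMPRESSED   16     /* assumed value, for illustration only */

/* Returns 1 when the leading string template must be skipped because
 * the payload does not start with a string, 0 otherwise. */
int meta_skip(const uint8_t *buf, size_t len)
{
    return !(len > 0 && buf[0] == AMF0_STRING_MARKER);
}

/* Mirrors the nested conditional above: -1 means "field not present",
 * 0 means uncompressed audio, any other value is used as-is. */
unsigned map_audio_codec_id(double n)
{
    if (n == -1) {
        return 0;
    }
    if (n == 0) {
        return AUDIO_UNCOMPRESSED;
    }
    return (unsigned) n;
}
```

The -1 sentinel is needed because 0 is itself a valid codec id, so "absent" and "uncompressed" could not otherwise be told apart.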
2.8.2 ngx_rtmp_codec_reconstruct_meta
static ngx_int_t
ngx_rtmp_codec_reconstruct_meta(ngx_rtmp_session_t *s)
{
    ngx_rtmp_codec_ctx_t           *ctx;
    ngx_rtmp_core_srv_conf_t       *cscf;
    ngx_int_t                       rc;

    static struct {
        double                      width;
        double                      height;
        double                      duration;
        double                      frame_rate;
        double                      video_data_rate;
        double                      video_codec_id;
        double                      audio_data_rate;
        double                      audio_codec_id;
        u_char                      profile[32];
        u_char                      level[32];
    }                               v;

    static ngx_rtmp_amf_elt_t       out_inf[] = {
        { NGX_RTMP_AMF_STRING, ngx_string("Server"),
          "NGINX RTMP (github.com/arut/nginx-rtmp-module)", 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("width"), &v.width, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("height"), &v.height, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("displayWidth"), &v.width, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("displayHeight"), &v.height, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("duration"), &v.duration, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("framerate"), &v.frame_rate, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("fps"), &v.frame_rate, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("videodatarate"),
          &v.video_data_rate, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("videocodecid"),
          &v.video_codec_id, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("audiodatarate"),
          &v.audio_data_rate, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("audiocodecid"),
          &v.audio_codec_id, 0 },
        { NGX_RTMP_AMF_STRING, ngx_string("profile"),
          &v.profile, sizeof(v.profile) },
        { NGX_RTMP_AMF_STRING, ngx_string("level"),
          &v.level, sizeof(v.level) },
    };

    static ngx_rtmp_amf_elt_t       out_elts[] = {
        { NGX_RTMP_AMF_STRING, ngx_null_string, "onMetaData", 0 },
        { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_inf, sizeof(out_inf) },
    };

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
    if (ctx == NULL) {
        return NGX_OK;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    if (ctx->meta) {
        ngx_rtmp_free_shared_chain(cscf, ctx->meta);
        ctx->meta = NULL;
    }

    v.width = ctx->width;
    v.height = ctx->height;
    v.duration = ctx->duration;
    v.frame_rate = ctx->frame_rate;
    v.video_data_rate = ctx->video_data_rate;
    v.video_codec_id = ctx->video_codec_id;
    v.audio_data_rate = ctx->audio_data_rate;
    v.audio_codec_id = ctx->audio_codec_id;
    ngx_memcpy(v.profile, ctx->profile, sizeof(ctx->profile));
    ngx_memcpy(v.level, ctx->level, sizeof(ctx->level));

    rc = ngx_rtmp_append_amf(s, &ctx->meta, NULL, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0]));
    if (rc != NGX_OK || ctx->meta == NULL) {
        return NGX_ERROR;
    }

    return ngx_rtmp_codec_prepare_meta(s, 0);
}
2.8.3 ngx_rtmp_codec_prepare_meta
static ngx_int_t
ngx_rtmp_codec_prepare_meta(ngx_rtmp_session_t *s, uint32_t timestamp)
{
    ngx_rtmp_header_t      h;
    ngx_rtmp_codec_ctx_t  *ctx;

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_CSID_AMF;
    h.msid = NGX_RTMP_MSID;
    h.type = NGX_RTMP_MSG_AMF_META;
    h.timestamp = timestamp;

    /* build the complete rtmp message */
    ngx_rtmp_prepare_message(s, &h, NULL, ctx->meta);

    ctx->meta_version = ngx_rtmp_codec_get_next_version();

    return NGX_OK;
}
2.9 Server receives from client (192.168.1.82:xxx): chunk_size(1)
The server receives the Set Chunk Size message sent by the client and calls ngx_rtmp_set_chunk_size to apply the new chunk size.
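To make the effect of the chunk size concrete, here is a minimal sketch of how a message body is divided into chunks. The helper names (chunk_count, last_chunk_len) are hypothetical; the value 4096 is the chunk_size from the nginx.conf in section 1.2, and 128 is the default chunk size defined by the RTMP specification.

```c
#include <assert.h>
#include <stddef.h>

/* Number of chunks needed to carry a message body of msg_len bytes. */
size_t chunk_count(size_t msg_len, size_t chunk_size)
{
    assert(chunk_size > 0);
    return (msg_len + chunk_size - 1) / chunk_size;
}

/* Payload length of the final chunk (0 for an empty message). */
size_t last_chunk_len(size_t msg_len, size_t chunk_size)
{
    size_t rem;

    assert(chunk_size > 0);
    if (msg_len == 0) {
        return 0;
    }
    rem = msg_len % chunk_size;
    return rem ? rem : chunk_size;
}
```

A 10000-byte video message takes 79 chunks at the default size of 128 but only 3 chunks at 4096, which is why the receive buffers have to be re-laid out when the chunk size changes.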
2.9.1 ngx_rtmp_set_chunk_size
ngx_int_t
ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size)
{
    ngx_rtmp_core_srv_conf_t           *cscf;
    ngx_chain_t                        *li, *fli, *lo, *flo;
    ngx_buf_t                          *bi, *bo;
    ngx_int_t                           n;

    ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
        "setting chunk_size=%ui", size);

    if (size > NGX_RTMP_MAX_CHUNK_SIZE) {
        ngx_log_error(NGX_LOG_ALERT, s->connection->log, 0,
                      "too big RTMP chunk size:%ui", size);
        return NGX_ERROR;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    s->in_old_pool = s->in_pool;
    s->in_chunk_size = size;
    s->in_pool = ngx_create_pool(4096, s->connection->log);

    /* copy existing chunk data */
    if (s->in_old_pool) {
        s->in_chunk_size_changing = 1;
        s->in_streams[0].in = NULL;

        for(n = 1; n < cscf->max_streams; ++n) {
            /* stream buffer is circular
             * for all streams except for the current one
             * (which caused this chunk size change);
             * we can simply ignore it */
            li = s->in_streams[n].in;
            if (li == NULL || li->next == NULL) {
                s->in_streams[n].in = NULL;
                continue;
            }
            /* move from last to the first */
            li = li->next;
            fli = li;
            lo = ngx_rtmp_alloc_in_buf(s);
            if (lo == NULL) {
                return NGX_ERROR;
            }
            flo = lo;
            for ( ;; ) {
                bi = li->buf;
                bo = lo->buf;

                if (bo->end - bo->last >= bi->last - bi->pos) {
                    bo->last = ngx_cpymem(bo->last, bi->pos,
                            bi->last - bi->pos);
                    li = li->next;
                    if (li == fli)  {
                        lo->next = flo;
                        s->in_streams[n].in = lo;
                        break;
                    }
                    continue;
                }

                bi->pos += (ngx_cpymem(bo->last, bi->pos,
                            bo->end - bo->last) - bo->last);
                lo->next = ngx_rtmp_alloc_in_buf(s);
                lo = lo->next;
                if (lo == NULL) {
                    return NGX_ERROR;
                }
            }
        }
    }

    return NGX_OK;
}
2.10 Server receives from client (192.168.1.82:xxx): ack_size(5)
The server receives the Window Acknowledgement Size message sent by the client.
2.10.1 ngx_rtmp_protocol_message_handler
ngx_int_t
ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
        ngx_rtmp_header_t *h, ngx_chain_t *in)
{
    ...

    switch(h->type) {
        ...

        case NGX_RTMP_MSG_ACK_SIZE:
            /* receive window size =val */
            ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                "receive ack_size=%uD", val);

            /* simply store the acknowledgement window size */
            s->ack_size = val;
            break;

        ...
    }
}
2.11 Server receives from client (192.168.1.82:xxx): amf_cmd(20), connect
The server receives the connect command sent by the client. The app this client wants to connect to is live.
The packet could not be captured, so only the log output is available:
2.11.1 ngx_rtmp_cmd_connect
static ngx_int_t
ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
{
    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
            "rtmp cmd: connect");

    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_core_app_conf_t  **cacfp;
    ngx_uint_t                  n;
    ngx_rtmp_header_t           h;
    u_char                     *p;

    static double               trans;
    static double               capabilities = NGX_RTMP_CAPABILITIES;
    static double               object_encoding = 0;

    /* the AMF response the server will return for the client's connect command */
    static ngx_rtmp_amf_elt_t   out_obj[] = {
        { NGX_RTMP_AMF_STRING, ngx_string("fmsVer"), NGX_RTMP_FMS_VERSION, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("capabilities"), &capabilities, 0 },
    };

    static ngx_rtmp_amf_elt_t   out_inf[] = {
        { NGX_RTMP_AMF_STRING, ngx_string("level"), "status", 0 },
        { NGX_RTMP_AMF_STRING, ngx_string("code"),
          "NetConnection.Connect.Success", 0 },
        { NGX_RTMP_AMF_STRING, ngx_string("description"),
          "Connection succeeded.", 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_string("objectEncoding"),
          &object_encoding, 0 }
    };

    static ngx_rtmp_amf_elt_t   out_elts[] = {
        { NGX_RTMP_AMF_STRING, ngx_null_string, "_result", 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
        { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_obj, sizeof(out_obj) },
        { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_inf, sizeof(out_inf) },
    };

    if (s->connected) {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                "connect: duplicate connection");
        return NGX_ERROR;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    trans = v->trans;

    /* fill session parameters */
    s->connected = 1;

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

#define NGX_RTMP_SET_STRPAR(name)                                             \
    s->name.len = ngx_strlen(v->name);                                        \
    s->name.data = ngx_palloc(s->connection->pool, s->name.len);              \
    ngx_memcpy(s->name.data, v->name, s->name.len)

    NGX_RTMP_SET_STRPAR(app);
    NGX_RTMP_SET_STRPAR(args);
    NGX_RTMP_SET_STRPAR(flashver);
    NGX_RTMP_SET_STRPAR(swf_url);
    NGX_RTMP_SET_STRPAR(tc_url);
    NGX_RTMP_SET_STRPAR(page_url);

#undef NGX_RTMP_SET_STRPAR

    p = ngx_strlchr(s->app.data, s->app.data + s->app.len, '?');
    if (p) {
        s->app.len = (p - s->app.data);
    }

    s->acodecs = (uint32_t) v->acodecs;
    s->vcodecs = (uint32_t) v->vcodecs;

    /* look up the application configuration the client connects to */
    /* find application & set app_conf */
    cacfp = cscf->applications.elts;
    for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) {
        if ((*cacfp)->name.len == s->app.len &&
            ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0)
        {
            /* found app! */
            s->app_conf = (*cacfp)->app_conf;
            break;
        }
    }

    if (s->app_conf == NULL) {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "connect: application not found: '%V'", &s->app);
        return NGX_ERROR;
    }

    object_encoding = v->object_encoding;

    /* Send the Window Acknowledgement Size (ack_size) to the client. It
     * tells the peer the size of the acknowledgement window: after sending
     * a window's worth of data the sender waits for an acknowledgement
     * (and stops sending until it arrives). The receiver must send an
     * acknowledgement once it has received a window's worth of data since
     * the start of the session or since its last acknowledgement. */
    return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK ||
           /* Send Set Peer Bandwidth. It states the output bandwidth limit
            * of the peer; the receiver limits its output bandwidth by
            * capping the amount of sent but unacknowledged data. If the
            * window size differs from the last one it sent, the receiver
            * should reply with a Window Acknowledgement Size message. */
           ngx_rtmp_send_bandwidth(s, cscf->ack_window,
                                   NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK ||
           /* Send Set Chunk Size to announce the new maximum chunk size. */
           ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK ||
           ngx_rtmp_send_amf(s, &h, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0]))
           != NGX_OK ? NGX_ERROR : NGX_OK;
}
Here the server sends four packets to the client (192.168.1.82:xxxx): ack_size, bandwidth, chunk_size, and the response to connect.
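The application lookup in this function does two things: it cuts any "?args" suffix off the app name, then compares the remainder against each configured application by length and content. A minimal standalone sketch of that logic, using plain C strings instead of ngx_str_t; app_name_len and find_app are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Length of the app name with any "?args" suffix removed. */
size_t app_name_len(const char *app)
{
    const char *q = strchr(app, '?');
    return q ? (size_t) (q - app) : strlen(app);
}

/* Index of the configured application whose name equals the app name
 * (ignoring "?args"), or -1 if there is no such application. */
int find_app(const char *const *names, size_t n, const char *app)
{
    size_t len = app_name_len(app);
    size_t i;

    for (i = 0; i < n; i++) {
        /* both the length and the bytes must match, so "liv" != "live" */
        if (strlen(names[i]) == len && strncmp(names[i], app, len) == 0) {
            return (int) i;
        }
    }
    return -1;
}
```

With the two applications from section 1.2, a connect to "live" resolves to the live application, while an unknown name corresponds to the "application not found" error path.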
2.12 Client (192.168.1.82:xxx) receives from server: ack_size(5)
The client receives the Window Acknowledgement Size message sent by the server.
2.12.1 ngx_rtmp_protocol_message_handler
ngx_int_t
ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
        ngx_rtmp_header_t *h, ngx_chain_t *in)
{
    ...

    switch(h->type) {
        ...

        case NGX_RTMP_MSG_ACK_SIZE:
            /* receive window size =val */
            ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                "receive ack_size=%uD", val);

            /* simply store the acknowledgement window size */
            s->ack_size = val;
            break;

        ...
    }
}
2.13 Client (192.168.1.82:xxx) receives from server: bandwidth(6)
The client receives the Set Peer Bandwidth message sent by the server.
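For reference, the payload of a Set Peer Bandwidth message is a 4-byte big-endian window size followed by a 1-byte limit type (0 hard, 1 soft, 2 dynamic). The following is a minimal sketch of decoding it, with hypothetical helper names (read_be32, parse_bandwidth); it is not the module's own parsing code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Read a 32-bit big-endian integer. */
uint32_t read_be32(const uint8_t *p)
{
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16)
         | ((uint32_t) p[2] << 8)  |  (uint32_t) p[3];
}

/* Decode a Set Peer Bandwidth payload.
 * Returns 0 on success, -1 if fewer than 5 bytes are available. */
int parse_bandwidth(const uint8_t *buf, size_t len,
                    uint32_t *window, uint8_t *limit)
{
    assert(window != NULL && limit != NULL);

    if (len < 5) {
        return -1;
    }
    *window = read_be32(buf);   /* acknowledgement window size */
    *limit = buf[4];            /* limit type */
    return 0;
}
```

The Window Acknowledgement Size message (type 5) from the previous section carries only the 4-byte value, so read_be32 alone is enough to decode it.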
2.13.1 ngx_rtmp_protocol_message_handler
ngx_int_t
ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
        ngx_rtmp_header_t *h, ngx_chain_t *in)
{
    ...

    switch(h->type) {
        ...

        case NGX_RTMP_MSG_BANDWIDTH:
            if (b->last - b->pos >= 5) {
                limit = *(uint8_t*)&b->pos[4];

                (void)val;
                (void)limit;

                ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                    "receive bandwidth=%uD limit=%d",
                    val, (int)limit);

                /* receive window size =val
                 * && limit */
            }
            break;

        ...
    }
}
2.13 Client (192.168.1.82:xxx) receives from server: chunk_size(1)
The client receives the Set Chunk Size message sent by the server and therefore calls ngx_rtmp_set_chunk_size to apply it.
2.13 Client (192.168.1.82:xxx) receives from server: amf_cmd(20), _result()
The client receives the server's response to connect: _result(NetConnection.Connect.Success).
2.13.1 ngx_rtmp_relay_on_result
static ngx_int_t
ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t       *ctx;
    static struct {
        double                  trans;
        u_char                  level[32];
        u_char                  code[128];
        u_char                  desc[1024];
    } v;

    static ngx_rtmp_amf_elt_t   in_inf[] = {
        { NGX_RTMP_AMF_STRING, ngx_string("level"),
          &v.level, sizeof(v.level) },
        { NGX_RTMP_AMF_STRING, ngx_string("code"),
          &v.code, sizeof(v.code) },
        { NGX_RTMP_AMF_STRING, ngx_string("description"),
          &v.desc, sizeof(v.desc) },
    };

    static ngx_rtmp_amf_elt_t   in_elts[] = {
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, 0 },
        { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 },
        { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_inf, sizeof(in_inf) },
    };

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL || !s->relay) {
        return NGX_OK;
    }

    ngx_memzero(&v, sizeof(v));
    if (ngx_rtmp_receive_amf(s, in, in_elts,
                sizeof(in_elts) / sizeof(in_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "relay: _result: level='%s' code='%s' description='%s'",
            v.level, v.code, v.desc);

    switch ((ngx_int_t)v.trans) {
        case NGX_RTMP_RELAY_CONNECT_TRANS:
            /* send the createStream command to the server */
            return ngx_rtmp_relay_send_create_stream(s);

        case NGX_RTMP_RELAY_CREATE_STREAM_TRANS:
            if (ctx->publish != ctx && !s->static_relay) {
                if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
                    return NGX_ERROR;
                }
                return ngx_rtmp_relay_play_local(s);

            } else {
                if (ngx_rtmp_relay_send_play(s) != NGX_OK) {
                    return NGX_ERROR;
                }
                return ngx_rtmp_relay_publish_local(s);
            }

        default:
            return NGX_OK;
    }
}
This function first parses the received response and then dispatches on v.trans. Here the response carries the transaction id of connect, so ngx_rtmp_relay_send_create_stream is called.
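The dispatch on the transaction id can be reduced to a small pure function. This is a minimal sketch under stated assumptions: the numeric values 1 and 2 for the two transaction ids are assumed, relay_action and relay_on_result_action are hypothetical names, and the is_push flag stands in for the condition ctx->publish != ctx && !s->static_relay.

```c
#include <assert.h>

/* Transaction ids the relay client puts into its own requests
 * (values assumed for illustration). */
enum {
    RELAY_CONNECT_TRANS       = 1,
    RELAY_CREATE_STREAM_TRANS = 2
};

typedef enum {
    ACT_NONE,
    ACT_SEND_CREATE_STREAM,
    ACT_SEND_PUBLISH,       /* push relay: publish upstream, play locally */
    ACT_SEND_PLAY           /* pull relay: play upstream, publish locally */
} relay_action;

/* Decide what to do next from the transaction id echoed in _result. */
relay_action relay_on_result_action(double trans, int is_push)
{
    switch ((int) trans) {
    case RELAY_CONNECT_TRANS:
        return ACT_SEND_CREATE_STREAM;
    case RELAY_CREATE_STREAM_TRANS:
        return is_push ? ACT_SEND_PUBLISH : ACT_SEND_PLAY;
    default:
        return ACT_NONE;
    }
}
```

Because the server echoes the transaction id of the request it answers, the same _result handler serves both the connect response and the later createStream response.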
2.13.2 ngx_rtmp_relay_send_create_stream
static ngx_int_t
ngx_rtmp_relay_send_create_stream(ngx_rtmp_session_t *s)
{
    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
            "rtmp relay: send create stream");

    static double               trans = NGX_RTMP_RELAY_CREATE_STREAM_TRANS;

    static ngx_rtmp_amf_elt_t   out_elts[] = {
        { NGX_RTMP_AMF_STRING, ngx_null_string, "createStream", 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
        { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 }
    };

    ngx_rtmp_header_t           h;

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    return ngx_rtmp_send_amf(s, &h, out_elts,
            sizeof(out_elts) / sizeof(out_elts[0]));
}
This function builds the createStream packet and sends it to the server.
2.14 Server receives from client (192.168.1.82:xxx): amf_cmd(20), createStream
The server receives the createStream command message sent by the client.
2.14.1 ngx_rtmp_cmd_create_stream_init
static ngx_int_t
ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                                ngx_chain_t *in)
{
    static ngx_rtmp_create_stream_t     v;

    static ngx_rtmp_amf_elt_t  in_elts[] = {
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, sizeof(v.trans) },
    };

    /* parse the createStream command message to obtain v.trans */
    if (ngx_rtmp_receive_amf(s, in, in_elts,
                sizeof(in_elts) / sizeof(in_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");

    return ngx_rtmp_create_stream(s, &v);
}
From this function, the chain of handlers hanging off ngx_rtmp_create_stream is invoked. The handler reached here is ngx_rtmp_cmd_create_stream.
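The "function chain" works by each module saving the current head of the chain and installing its own handler in its place, the same pattern as the next_play and next_publish calls seen in the listings of this article. A minimal sketch with hypothetical handler names follows; each handler records a number in a trace so the call order is visible.

```c
#include <assert.h>
#include <stddef.h>

typedef int (*handler_pt)(int *trace, size_t *n);

/* Head of the chain; callers always invoke the chain through this pointer. */
handler_pt create_stream_chain;

/* Tail of the chain: does the real work and calls nobody. */
static int core_handler(int *trace, size_t *n)
{
    trace[(*n)++] = 0;
    return 0;
}

static handler_pt next_after_live;
static int live_handler(int *trace, size_t *n)
{
    trace[(*n)++] = 1;
    return next_after_live(trace, n);    /* pass control down the chain */
}

static handler_pt next_after_relay;
static int relay_handler(int *trace, size_t *n)
{
    trace[(*n)++] = 2;
    return next_after_relay(trace, n);
}

/* Each "module" saves the current head and installs itself in front. */
void init_chain(void)
{
    create_stream_chain = core_handler;

    next_after_live = create_stream_chain;
    create_stream_chain = live_handler;

    next_after_relay = create_stream_chain;
    create_stream_chain = relay_handler;
}
```

Handlers run in reverse order of registration, and a handler that returns without calling its saved next pointer ends the chain early.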
2.14.2 ngx_rtmp_cmd_create_stream
static ngx_int_t
ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
{
    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
            "rtmp cmd: create stream");

    /* support one message stream per connection */
    static double               stream;
    static double               trans;
    ngx_rtmp_header_t           h;

    static ngx_rtmp_amf_elt_t  out_elts[] = {
        { NGX_RTMP_AMF_STRING, ngx_null_string, "_result", 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
        { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &stream, sizeof(stream) },
    };

    trans = v->trans;
    stream = NGX_RTMP_MSID;

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    return ngx_rtmp_send_amf(s, &h, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ?
           NGX_DONE : NGX_ERROR;
}
This function builds the response to createStream: _result, the echoed transaction id, a null, and the message stream id.
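To show what the four elements of this response look like on the wire, here is a minimal AMF0 encoder sketch. It is not the module's ngx_rtmp_send_amf; the function names are hypothetical. It relies only on the AMF0 rules that a number is marker 0x00 plus an 8-byte big-endian IEEE 754 double, a string is marker 0x02 plus a 2-byte big-endian length plus the bytes, and null is marker 0x05.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

enum { AMF0_NUMBER = 0x00, AMF0_STRING = 0x02, AMF0_NULL = 0x05 };

/* Marker + 8-byte big-endian double (assumes IEEE 754 binary64). */
size_t amf0_put_number(uint8_t *out, double v)
{
    uint64_t bits;
    int i;

    memcpy(&bits, &v, sizeof bits);
    out[0] = AMF0_NUMBER;
    for (i = 0; i < 8; i++) {
        out[1 + i] = (uint8_t) (bits >> (56 - 8 * i));
    }
    return 9;
}

/* Marker + 2-byte big-endian length + bytes (no terminating NUL). */
size_t amf0_put_string(uint8_t *out, const char *s)
{
    size_t len = strlen(s);

    assert(len <= 0xffff);
    out[0] = AMF0_STRING;
    out[1] = (uint8_t) (len >> 8);
    out[2] = (uint8_t) len;
    memcpy(out + 3, s, len);
    return 3 + len;
}

size_t amf0_put_null(uint8_t *out)
{
    out[0] = AMF0_NULL;
    return 1;
}

/* "_result", transaction id, null, stream id; the caller provides a
 * buffer of at least 29 bytes. */
size_t build_create_stream_result(uint8_t *out, double trans, double stream_id)
{
    size_t n = 0;

    n += amf0_put_string(out + n, "_result");
    n += amf0_put_number(out + n, trans);
    n += amf0_put_null(out + n);
    n += amf0_put_number(out + n, stream_id);
    return n;
}
```

The resulting body is 29 bytes long: 10 for the string, 9 for each number and 1 for the null.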
2.15 Client (192.168.1.82:xxx) receives from server: amf_cmd(20), _result()
The client receives the server's response packet to createStream: _result().
2.15.1 ngx_rtmp_relay_on_result
static ngx_int_t
ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t       *ctx;
    static struct {
        double                  trans;
        u_char                  level[32];
        u_char                  code[128];
        u_char                  desc[1024];
    } v;

    static ngx_rtmp_amf_elt_t   in_inf[] = {
        { NGX_RTMP_AMF_STRING, ngx_string("level"),
          &v.level, sizeof(v.level) },
        { NGX_RTMP_AMF_STRING, ngx_string("code"),
          &v.code, sizeof(v.code) },
        { NGX_RTMP_AMF_STRING, ngx_string("description"),
          &v.desc, sizeof(v.desc) },
    };

    static ngx_rtmp_amf_elt_t   in_elts[] = {
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, 0 },
        { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 },
        { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_inf, sizeof(in_inf) },
    };

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL || !s->relay) {
        return NGX_OK;
    }

    ngx_memzero(&v, sizeof(v));
    if (ngx_rtmp_receive_amf(s, in, in_elts,
                sizeof(in_elts) / sizeof(in_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "relay: _result: level='%s' code='%s' description='%s'",
            v.level, v.code, v.desc);

    switch ((ngx_int_t)v.trans) {
        case NGX_RTMP_RELAY_CONNECT_TRANS:
            return ngx_rtmp_relay_send_create_stream(s);

        case NGX_RTMP_RELAY_CREATE_STREAM_TRANS:
            if (ctx->publish != ctx && !s->static_relay) {
                /* send the publish command to the server */
                if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
                    return NGX_ERROR;
                }
                return ngx_rtmp_relay_play_local(s);

            } else {
                if (ngx_rtmp_relay_send_play(s) != NGX_OK) {
                    return NGX_ERROR;
                }
                return ngx_rtmp_relay_publish_local(s);
            }

        default:
            return NGX_OK;
    }
}
This is the same handler as in 2.13.1. It parses the received response and dispatches on v.trans; this time the response carries the transaction id of createStream, so ngx_rtmp_relay_send_publish is called.
2.15.2 ngx_rtmp_relay_send_publish
static ngx_int_t
ngx_rtmp_relay_send_publish(ngx_rtmp_session_t *s)
{
    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
            "rtmp relay: send publish");

    static double               trans;

    static ngx_rtmp_amf_elt_t   out_elts[] = {
        { NGX_RTMP_AMF_STRING, ngx_null_string, "publish", 0 },
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
        { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 },
        { NGX_RTMP_AMF_STRING, ngx_null_string, NULL, 0 }, /* <- to fill */
        { NGX_RTMP_AMF_STRING, ngx_null_string, "live", 0 }
    };

    ngx_rtmp_header_t           h;
    ngx_rtmp_relay_ctx_t       *ctx;

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL) {
        return NGX_ERROR;
    }

    if (ctx->play_path.len) {
        out_elts[3].data = ctx->play_path.data;
        out_elts[3].len  = ctx->play_path.len;
    } else {
        out_elts[3].data = ctx->name.data;
        out_elts[3].len  = ctx->name.len;
    }

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_RELAY_CSID_AMF;
    h.msid = NGX_RTMP_RELAY_MSID;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    return ngx_rtmp_send_amf(s, &h, out_elts,
            sizeof(out_elts) / sizeof(out_elts[0]));
}
2.15.3 ngx_rtmp_relay_play_local
static ngx_int_t
ngx_rtmp_relay_play_local(ngx_rtmp_session_t *s)
{
    ngx_rtmp_play_t             v;
    ngx_rtmp_relay_ctx_t       *ctx;

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL) {
        return NGX_ERROR;
    }

    ngx_memzero(&v, sizeof(ngx_rtmp_play_t));
    v.silent = 1;
    *(ngx_cpymem(v.name, ctx->name.data,
            ngx_min(sizeof(v.name) - 1, ctx->name.len))) = 0;

    return ngx_rtmp_play(s, &v);
}
This function in turn invokes the chain of handlers hanging off ngx_rtmp_play; the one that matters here is ngx_rtmp_live_play.
2.15.4 ngx_rtmp_live_play
static ngx_int_t
ngx_rtmp_live_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
{
    ngx_rtmp_live_app_conf_t       *lacf;
    ngx_rtmp_live_ctx_t            *ctx;

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

    if (lacf == NULL || !lacf->live) {
        goto next;
    }

    ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "live: play: name='%s' start=%uD duration=%uD reset=%d",
                   v->name, (uint32_t) v->start,
                   (uint32_t) v->duration, (uint32_t) v->reset);

    /* join stream as subscriber */
    ngx_rtmp_live_join(s, v->name, 0);

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
    if (ctx == NULL) {
        goto next;
    }

    ctx->silent = v->silent;

    if (!ctx->silent && !lacf->play_restart) {
        ngx_rtmp_send_status(s, "NetStream.Play.Start",
                             "status", "Start live");
        ngx_rtmp_send_sample_access(s);
    }

next:
    return next_play(s, v);
}
2.16 Server receives from client (192.168.1.82:xxx): amf_cmd(20), publish('test')
The server receives the publish command from the client.
2.16.1 ngx_rtmp_cmd_publish_init
static ngx_int_t
ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    static ngx_rtmp_publish_t       v;

    static ngx_rtmp_amf_elt_t      in_elts[] = {
        /* transaction is always 0 */
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, NULL, 0 },
        { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 },
        { NGX_RTMP_AMF_STRING, ngx_null_string, &v.name, sizeof(v.name) },
        { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING, ngx_null_string,
          &v.type, sizeof(v.type) },
    };

    ngx_memzero(&v, sizeof(v));

    if (ngx_rtmp_receive_amf(s, in, in_elts,
                             sizeof(in_elts) / sizeof(in_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_rtmp_cmd_fill_args(v.name, v.args);

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                  "publish: name='%s' args='%s' type=%s silent=%d",
                  v.name, v.args, v.type, v.silent);

    return ngx_rtmp_publish(s, &v);
}
The application this client connected to is live, and that application{} block contains no push directive, so the handler that matters here is ngx_rtmp_live_publish.
2.16.2 ngx_rtmp_live_publish
static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
    ngx_rtmp_live_app_conf_t       *lacf;
    ngx_rtmp_live_ctx_t            *ctx;

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

    if (lacf == NULL || !lacf->live) {
        goto next;
    }

    ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "live: publish: name='%s' type='%s'",
                   v->name, v->type);

    /* join stream as publisher */
    ngx_rtmp_live_join(s, v->name, 1);

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
    if (ctx == NULL || !ctx->publishing) {
        goto next;
    }

    ctx->silent = v->silent;

    if (!ctx->silent) {
        /* send the response to publish */
        ngx_rtmp_send_status(s, "NetStream.Publish.Start",
                             "status", "Start publishing");
    }

next:
    return next_publish(s, v);
}
2.17 Client (192.168.1.82:xxx) receives from server: amf_cmd(20), onStatus
The client receives the server's response to publish, which means the client may now publish the stream to the server.
2.17.1 ngx_rtmp_relay_on_status
static ngx_int_t
ngx_rtmp_relay_on_status(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t       *ctx;
    static struct {
        double                  trans;
        u_char                  level[32];
        u_char                  code[128];
        u_char                  desc[1024];
    } v;

    static ngx_rtmp_amf_elt_t   in_inf[] = {
        { NGX_RTMP_AMF_STRING, ngx_string("level"),
          &v.level, sizeof(v.level) },
        { NGX_RTMP_AMF_STRING, ngx_string("code"),
          &v.code, sizeof(v.code) },
        { NGX_RTMP_AMF_STRING, ngx_string("description"),
          &v.desc, sizeof(v.desc) },
    };

    static ngx_rtmp_amf_elt_t   in_elts[] = {
        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, 0 },
        { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 },
        { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_inf, sizeof(in_inf) },
    };

    static ngx_rtmp_amf_elt_t   in_elts_meta[] = {
        { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_inf, sizeof(in_inf) },
    };

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL || !s->relay) {
        return NGX_OK;
    }

    ngx_memzero(&v, sizeof(v));
    if (h->type == NGX_RTMP_MSG_AMF_META) {
        ngx_rtmp_receive_amf(s, in, in_elts_meta,
                sizeof(in_elts_meta) / sizeof(in_elts_meta[0]));
    } else {
        ngx_rtmp_receive_amf(s, in, in_elts,
                sizeof(in_elts) / sizeof(in_elts[0]));
    }

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "relay: onStatus: level='%s' code='%s' description='%s'",
            v.level, v.code, v.desc);

    return NGX_OK;
}
2.18 Server receives from client (obs): audio(8)
The server receives an audio packet sent by the obs client.
Figure 14: receive: audio(8)
For NGX_RTMP_MSG_AUDIO(8), the following rtmp modules register callbacks:
- ngx_rtmp_dash_module
- ngx_rtmp_hls_module
- ngx_rtmp_live_module
- ngx_rtmp_record_module
- ngx_rtmp_codec_module
Here the callbacks that matter are those of the codec and live modules. The first one called is ngx_rtmp_codec_av, registered by ngx_rtmp_codec_module.
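The codec callback starts by decoding the first byte of the audio payload. A minimal standalone sketch of that bit layout, using the same masks as the listing in 2.18.1; the struct and function names (audio_fmt, parse_audio_fmt) are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

typedef struct {
    unsigned codec_id;      /* high 4 bits, e.g. 10 for AAC */
    unsigned sample_rate;   /* looked up from bits 2-3 */
    unsigned sample_size;   /* bytes per sample: 1 or 2 */
    unsigned channels;      /* 1 = mono, 2 = stereo */
} audio_fmt;

/* Decode the first byte of an RTMP audio message payload. */
audio_fmt parse_audio_fmt(uint8_t fmt)
{
    static const unsigned rates[] = { 5512, 11025, 22050, 44100 };
    audio_fmt a;

    a.codec_id    = (fmt & 0xf0) >> 4;
    a.sample_rate = rates[(fmt & 0x0c) >> 2];
    a.sample_size = (fmt & 0x02) ? 2 : 1;
    a.channels    = (fmt & 0x01) + 1;
    return a;
}
```

For AAC the rate in this byte is only a placeholder; the real sample rate comes from the AAC sequence header parsed in 2.18.2.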
2.18.1 ngx_rtmp_codec_av
static ngx_int_t
ngx_rtmp_codec_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    ngx_rtmp_core_srv_conf_t           *cscf;
    ngx_rtmp_codec_ctx_t               *ctx;
    ngx_chain_t                       **header;
    uint8_t                             fmt;
    static ngx_uint_t                   sample_rates[] =
                                        { 5512, 11025, 22050, 44100 };

    if (h->type != NGX_RTMP_MSG_AUDIO && h->type != NGX_RTMP_MSG_VIDEO) {
        return NGX_OK;
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
    if (ctx == NULL) {
        ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
        ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
    }

    /* save codec */
    if (in->buf->last - in->buf->pos < 1) {
        return NGX_OK;
    }

    fmt =  in->buf->pos[0];
    if (h->type == NGX_RTMP_MSG_AUDIO) {
        ctx->audio_codec_id = (fmt & 0xf0) >> 4;
        ctx->audio_channels = (fmt & 0x01) + 1;
        ctx->sample_size = (fmt & 0x02) ? 2 : 1;

        if (ctx->sample_rate == 0) {
            ctx->sample_rate = sample_rates[(fmt & 0x0c) >> 2];
        }
    } else {
        ctx->video_codec_id = (fmt & 0x0f);
    }

    /* save AVC/AAC header */
    if (in->buf->last - in->buf->pos < 3) {
        return NGX_OK;
    }

    /* no conf */
    if (!ngx_rtmp_is_codec_header(in)) {
        return NGX_OK;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
    header = NULL;

    if (h->type == NGX_RTMP_MSG_AUDIO) {
        if (ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC) {
            header = &ctx->aac_header;
            ngx_rtmp_codec_parse_aac_header(s, in);
        }
    } else {
        if (ctx->video_codec_id == NGX_RTMP_VIDEO_H264) {
            header = &ctx->avc_header;
            ngx_rtmp_codec_parse_avc_header(s, in);
        }
    }

    if (header == NULL) {
        return NGX_OK;
    }

    if (*header) {
        ngx_rtmp_free_shared_chain(cscf, *header);
    }

    *header = ngx_rtmp_append_shared_bufs(cscf, NULL, in);

    return NGX_OK;
}
2.18.2 ngx_rtmp_codec_parse_aac_header
static void
ngx_rtmp_codec_parse_aac_header(ngx_rtmp_session_t *s, ngx_chain_t *in)
{
    ngx_uint_t              idx;
    ngx_rtmp_codec_ctx_t   *ctx;
    ngx_rtmp_bit_reader_t   br;

    static ngx_uint_t      aac_sample_rates[] =
        { 96000, 88200, 64000, 48000,
          44100, 32000, 24000, 22050,
          16000, 12000, 11025,  8000,
           7350,     0,     0,     0 };

#if (NGX_DEBUG)
    ngx_rtmp_codec_dump_header(s, "aac", in);
#endif

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);

    ngx_rtmp_bit_init_reader(&br, in->buf->pos, in->buf->last);

    /* read 16 bits; the value is discarded, i.e. 16 bits are skipped */
    ngx_rtmp_bit_read(&br, 16);

    /* read the 5-bit aac_profile value */
    ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 5);
    if (ctx->aac_profile == 31) {
        ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 6) + 32;
    }

    idx = (ngx_uint_t) ngx_rtmp_bit_read(&br, 4);
    if (idx == 15) {
        ctx->sample_rate = (ngx_uint_t) ngx_rtmp_bit_read(&br, 24);
    } else {
        ctx->sample_rate = aac_sample_rates[idx];
    }

    ctx->aac_chan_conf = (ngx_uint_t) ngx_rtmp_bit_read(&br, 4);

    if (ctx->aac_profile == 5 || ctx->aac_profile == 29) {

        if (ctx->aac_profile == 29) {
            ctx->aac_ps = 1;
        }

        ctx->aac_sbr = 1;

        idx = (ngx_uint_t) ngx_rtmp_bit_read(&br, 4);
        if (idx == 15) {
            ctx->sample_rate = (ngx_uint_t) ngx_rtmp_bit_read(&br, 24);
        } else {
            ctx->sample_rate = aac_sample_rates[idx];
        }

        ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 5);
        if (ctx->aac_profile == 31) {
            ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 6) + 32;
        }
    }

    /* MPEG-4 Audio Specific Config

       5 bits: object type
       if (object type == 31)
         6 bits + 32: object type
       4 bits: frequency index
       if (frequency index == 15)
         24 bits: frequency
       4 bits: channel configuration

       if (object_type == 5)
           4 bits: frequency index
           if (frequency index == 15)
             24 bits: frequency
           5 bits: object type
           if (object type == 31)
             6 bits + 32: object type

       var bits: AOT Specific Config
     */

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "codec: aac header profile=%ui, "
                   "sample_rate=%ui, chan_conf=%ui",
                   ctx->aac_profile, ctx->sample_rate, ctx->aac_chan_conf);
}
2.18.3 ngx_rtmp_bit_init_reader
void
ngx_rtmp_bit_init_reader(ngx_rtmp_bit_reader_t *br, u_char *pos, u_char *last)
{
    ngx_memzero(br, sizeof(ngx_rtmp_bit_reader_t));

    br->pos = pos;
    br->last = last;
}
This function initializes a bit reader over the byte range [pos, last).
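As a worked example of what the bit reader is used for, the sketch below parses a typical AAC sequence header with a self-contained MSB-first bit reader. It is an illustration, not the module's code: the names (bit_reader, bit_read, parse_aac_config) are hypothetical, the SBR/PS extension branch is omitted, and the sample bytes AF 00 12 10 are a commonly seen header that I chose for the example, not data taken from the capture in this article.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct {
    const uint8_t *pos, *last;  /* current byte and end of data */
    unsigned       offs;        /* bits already consumed in *pos */
    int            err;         /* set when reading past the end */
} bit_reader;

/* Read n bits, most significant bit first. */
uint64_t bit_read(bit_reader *br, unsigned n)
{
    uint64_t v = 0;

    assert(n <= 64);
    while (n) {
        unsigned d;

        if (br->pos >= br->last) {
            br->err = 1;
            return 0;
        }
        /* take at most the bits left in the current byte */
        d = (br->offs + n > 8) ? 8 - br->offs : n;
        v <<= d;
        v += (*br->pos >> (8 - br->offs - d)) & (0xffu >> (8 - d));
        br->offs += d;
        n -= d;
        if (br->offs == 8) {
            br->pos++;
            br->offs = 0;
        }
    }
    return v;
}

typedef struct { unsigned profile, sample_rate, chan_conf; } aac_cfg;

/* Parse the basic fields of an AAC sequence header (without SBR/PS).
 * Returns 0 on success, -1 if the data is too short. */
int parse_aac_config(const uint8_t *buf, size_t len, aac_cfg *out)
{
    static const unsigned rates[16] = {
        96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050,
        16000, 12000, 11025, 8000, 7350, 0, 0, 0 };
    bit_reader br = { buf, buf + len, 0, 0 };
    unsigned idx;

    bit_read(&br, 16);                        /* skip the 2-byte tag header */
    out->profile = (unsigned) bit_read(&br, 5);
    if (out->profile == 31) {
        out->profile = (unsigned) bit_read(&br, 6) + 32;
    }
    idx = (unsigned) bit_read(&br, 4);
    out->sample_rate = (idx == 15) ? (unsigned) bit_read(&br, 24) : rates[idx];
    out->chan_conf = (unsigned) bit_read(&br, 4);
    return br.err ? -1 : 0;
}
```

For the bytes AF 00 12 10 this gives profile 2 (AAC-LC), a sample rate of 44100 and channel configuration 2. Note that the 4-bit frequency index straddles the boundary between the bytes 0x12 and 0x10, which is exactly the case the d computation in the read loop handles.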
2.18.4 ngx_rtmp_bit_read
uint64_t ngx_rtmp_bit_read(ngx_rtmp_bit_reader_t *br, ngx_uint_t n) {uint64_t v;ngx_uint_t d;v = 0;while (n) {/* 若已經讀取到尾部,則置位錯誤標志位 */if (br->pos >= br->last) {br->err = 1;return 0;}/* 控制一次讀取的 bit 數不超過 8 bit */d = (br->offs + n > 8 ? (ngx_uint_t) (8 - br->offs) : n);v <<= d;/* 將讀取到的值追加到 v 中 */v += (*br->pos >> (8 - br->offs - d)) & ((u_char) 0xff >> (8 - d));/* 更新 bit reader 的 偏移值 offs */br->offs += d;n -= d;/* 若偏移值為8,則重置該偏移值 */if (br->offs == 8) {br->pos++;br->offs = 0;}}return v; }2.18.5 ngx_rtmp_live_av
static ngx_int_t
ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_rtmp_live_ctx_t *ctx, *pctx;
    ngx_rtmp_codec_ctx_t *codec_ctx;
    ngx_chain_t *header, *coheader, *meta,
                *apkt, *aapkt, *acopkt, *rpkt;
    ngx_rtmp_core_srv_conf_t *cscf;
    ngx_rtmp_live_app_conf_t *lacf;
    ngx_rtmp_session_t *ss;
    ngx_rtmp_header_t ch, lh, clh;
    ngx_int_t rc, mandatory, dummy_audio;
    ngx_uint_t prio;
    ngx_uint_t peers;
    ngx_uint_t meta_version;
    ngx_uint_t csidx;
    uint32_t delta;
    ngx_rtmp_live_chunk_stream_t *cs;
#ifdef NGX_DEBUG
    const char *type_s;

    type_s = (h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio");
#endif

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
    if (lacf == NULL) {
        return NGX_ERROR;
    }

    if (!lacf->live || in == NULL || in->buf == NULL) {
        return NGX_OK;
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
    if (ctx == NULL || ctx->stream == NULL) {
        return NGX_OK;
    }

    if (ctx->publishing == 0) {
        ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: %s from non-publisher", type_s);
        return NGX_OK;
    }

    /* If the stream is not active yet, start it */
    if (!ctx->stream->active) {
        ngx_rtmp_live_start(s);
    }

    if (ctx->idle_evt.timer_set) {
        ngx_add_timer(&ctx->idle_evt, lacf->idle_timeout);
    }

    ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "live: %s packet timestamp=%uD",
                   type_s, h->timestamp);

    s->current_time = h->timestamp;

    peers = 0;
    apkt = NULL;
    aapkt = NULL;
    acopkt = NULL;
    header = NULL;
    coheader = NULL;
    meta = NULL;
    meta_version = 0;
    mandatory = 0;

    prio = (h->type == NGX_RTMP_MSG_VIDEO ?
            ngx_rtmp_get_video_frame_type(in) : 0);

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO);

    cs = &ctx->cs[csidx];

    ngx_memzero(&ch, sizeof(ch));

    ch.timestamp = h->timestamp;
    ch.msid = NGX_RTMP_MSID;
    ch.csid = cs->csid;
    ch.type = h->type;

    lh = ch;

    if (cs->active) {
        lh.timestamp = cs->timestamp;
    }

    clh = lh;
    clh.type = (h->type == NGX_RTMP_MSG_AUDIO ? NGX_RTMP_MSG_VIDEO :
                                                 NGX_RTMP_MSG_AUDIO);

    cs->active = 1;
    cs->timestamp = ch.timestamp;

    delta = ch.timestamp - lh.timestamp;
/*
    if (delta >> 31) {
        ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: clipping non-monotonical timestamp %uD->%uD",
                       lh.timestamp, ch.timestamp);

        delta = 0;

        ch.timestamp = lh.timestamp;
    }
*/
    rpkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);

    ngx_rtmp_prepare_message(s, &ch, &lh, rpkt);

    codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);

    if (codec_ctx) {

        if (h->type == NGX_RTMP_MSG_AUDIO) {
            header = codec_ctx->aac_header;

            if (lacf->interleave) {
                coheader = codec_ctx->avc_header;
            }

            if (codec_ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC &&
                ngx_rtmp_is_codec_header(in))
            {
                prio = 0;
                mandatory = 1;
            }

        } else {
            header = codec_ctx->avc_header;

            if (lacf->interleave) {
                coheader = codec_ctx->aac_header;
            }

            if (codec_ctx->video_codec_id == NGX_RTMP_VIDEO_H264 &&
                ngx_rtmp_is_codec_header(in))
            {
                prio = 0;
                mandatory = 1;
            }
        }

        if (codec_ctx->meta) {
            meta = codec_ctx->meta;
            meta_version = codec_ctx->meta_version;
        }
    }

    /* broadcast to all subscribers */

    for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
        if (pctx == ctx || pctx->paused) {
            continue;
        }

        ss = pctx->session;
        cs = &pctx->cs[csidx];

        /* send metadata */

        if (meta && meta_version != pctx->meta_version) {
            ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                           "live: meta");

            if (ngx_rtmp_send_message(ss, meta, 0) == NGX_OK) {
                pctx->meta_version = meta_version;
            }
        }

        /* sync stream */

        if (cs->active && (lacf->sync && cs->dropped > lacf->sync)) {
            ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                           "live: sync %s dropped=%uD", type_s, cs->dropped);

            cs->active = 0;
            cs->dropped = 0;
        }

        /* absolute packet */

        if (!cs->active) {

            if (mandatory) {
                ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: skipping header");
                continue;
            }

            if (lacf->wait_video && h->type == NGX_RTMP_MSG_AUDIO &&
                !pctx->cs[0].active)
            {
                ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: waiting for video");
                continue;
            }

            if (lacf->wait_key && prio != NGX_RTMP_VIDEO_KEY_FRAME &&
                (lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO))
            {
                ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: skip non-key");
                continue;
            }

            dummy_audio = 0;
            if (lacf->wait_video && h->type == NGX_RTMP_MSG_VIDEO &&
                !pctx->cs[1].active)
            {
                dummy_audio = 1;
                if (aapkt == NULL) {
                    aapkt = ngx_rtmp_alloc_shared_buf(cscf);
                    ngx_rtmp_prepare_message(s, &clh, NULL, aapkt);
                }
            }

            if (header || coheader) {

                /* send absolute codec header */

                ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: abs %s header timestamp=%uD",
                               type_s, lh.timestamp);

                if (header) {
                    if (apkt == NULL) {
                        apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header);
                        ngx_rtmp_prepare_message(s, &lh, NULL, apkt);
                    }

                    rc = ngx_rtmp_send_message(ss, apkt, 0);
                    if (rc != NGX_OK) {
                        continue;
                    }
                }

                if (coheader) {
                    if (acopkt == NULL) {
                        acopkt = ngx_rtmp_append_shared_bufs(cscf, NULL, coheader);
                        ngx_rtmp_prepare_message(s, &clh, NULL, acopkt);
                    }

                    rc = ngx_rtmp_send_message(ss, acopkt, 0);
                    if (rc != NGX_OK) {
                        continue;
                    }

                } else if (dummy_audio) {
                    ngx_rtmp_send_message(ss, aapkt, 0);
                }

                cs->timestamp = lh.timestamp;
                cs->active = 1;
                ss->current_time = cs->timestamp;

            } else {

                /* send absolute packet */

                ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: abs %s packet timestamp=%uD",
                               type_s, ch.timestamp);

                if (apkt == NULL) {
                    apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
                    ngx_rtmp_prepare_message(s, &ch, NULL, apkt);
                }

                rc = ngx_rtmp_send_message(ss, apkt, prio);
                if (rc != NGX_OK) {
                    continue;
                }

                cs->timestamp = ch.timestamp;
                cs->active = 1;
                ss->current_time = cs->timestamp;

                ++peers;

                if (dummy_audio) {
                    ngx_rtmp_send_message(ss, aapkt, 0);
                }

                continue;
            }
        }

        /* send relative packet */

        ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                       "live: rel %s packet delta=%uD",
                       type_s, delta);

        if (ngx_rtmp_send_message(ss, rpkt, prio) != NGX_OK) {
            ++pctx->ndropped;

            cs->dropped += delta;

            if (mandatory) {
                ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: mandatory packet failed");
                ngx_rtmp_finalize_session(ss);
            }

            continue;
        }

        cs->timestamp += delta;
        ++peers;
        ss->current_time = cs->timestamp;
    }

    if (rpkt) {
        ngx_rtmp_free_shared_chain(cscf, rpkt);
    }

    if (apkt) {
        ngx_rtmp_free_shared_chain(cscf, apkt);
    }

    if (aapkt) {
        ngx_rtmp_free_shared_chain(cscf, aapkt);
    }

    if (acopkt) {
        ngx_rtmp_free_shared_chain(cscf, acopkt);
    }

    ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen);
    ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers);

    ngx_rtmp_update_bandwidth(h->type == NGX_RTMP_MSG_AUDIO ?
                              &ctx->stream->bw_in_audio :
                              &ctx->stream->bw_in_video,
                              h->mlen);

    return NGX_OK;
}
The main job of this function is to forward the audio and video data received from the client (obs) to the subscribers of the stream, which in this setup means application live. A subscriber that is not yet active first gets an absolute packet (the codec header or the full packet); after that it receives relative packets that carry only the timestamp delta.
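The per-subscriber bookkeeping in the broadcast loop (absolute packet first, relative packets afterwards, resync once too much has been dropped) can be modelled in a few lines. The sketch below is a simplified illustration under stated assumptions, not the module's code: `demo_cs_t`, `demo_deliver` and the `send_ok` flag are hypothetical, and codec headers, `wait_key`, `wait_video` and metadata are ignored.

```c
#include <stdint.h>

/* Hypothetical per-subscriber chunk stream state. */
typedef struct {
    int      active;     /* has an absolute packet been delivered? */
    uint32_t timestamp;  /* timestamp of the last delivered packet */
    uint32_t dropped;    /* accumulated timestamp delta of dropped packets */
} demo_cs_t;

typedef enum {
    DEMO_SENT_ABSOLUTE,
    DEMO_SENT_RELATIVE,
    DEMO_DROPPED
} demo_result_t;

/*
 * ts      : absolute timestamp of the incoming packet
 * delta   : difference to the publisher's previous packet
 * sync    : resync threshold (0 disables resync)
 * send_ok : stands in for the result of sending to this subscriber
 */
static demo_result_t
demo_deliver(demo_cs_t *cs, uint32_t ts, uint32_t delta, uint32_t sync,
    int send_ok)
{
    /* sync stream: too much was dropped, force a fresh absolute packet */
    if (cs->active && sync && cs->dropped > sync) {
        cs->active = 0;
        cs->dropped = 0;
    }

    if (!cs->active) {
        if (!send_ok) {
            return DEMO_DROPPED;        /* state unchanged, retry next time */
        }
        cs->timestamp = ts;
        cs->active = 1;
        return DEMO_SENT_ABSOLUTE;
    }

    if (!send_ok) {
        cs->dropped += delta;           /* remember how far we fell behind */
        return DEMO_DROPPED;
    }

    cs->timestamp += delta;
    return DEMO_SENT_RELATIVE;
}
```

With a threshold of 100, a subscriber that misses three consecutive packets of delta 40 accumulates `dropped = 120`, so the next packet is sent as an absolute packet again and its timestamp is taken directly from that packet.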
2.18.6 ngx_rtmp_live_start
static void
ngx_rtmp_live_start(ngx_rtmp_session_t *s)
{
    ngx_rtmp_core_srv_conf_t *cscf;
    ngx_rtmp_live_app_conf_t *lacf;
    ngx_chain_t *control;
    ngx_chain_t *status[3];
    size_t n, nstatus;

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

    /* Build the stream_begin RTMP packet */
    control = ngx_rtmp_create_stream_begin(s, NGX_RTMP_MSID);

    nstatus = 0;

    if (lacf->play_restart) {
        status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.Start",
                                                   "status", "Start live");
        status[nstatus++] = ngx_rtmp_create_sample_access(s);
    }

    if (lacf->publish_notify) {
        status[nstatus++] = ngx_rtmp_create_status(s,
                                                   "NetStream.Play.PublishNotify",
                                                   "status", "Start publishing");
    }

    ngx_rtmp_live_set_status(s, control, status, nstatus, 1);

    if (control) {
        ngx_rtmp_free_shared_chain(cscf, control);
    }

    for (n = 0; n < nstatus; ++n) {
        ngx_rtmp_free_shared_chain(cscf, status[n]);
    }
}
2.19 Server receives from client (obs): video(9)
receive: video(9), Figure (15)
2.19.1 ngx_rtmp_codec_av
static ngx_int_t
ngx_rtmp_codec_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_rtmp_core_srv_conf_t *cscf;
    ngx_rtmp_codec_ctx_t *ctx;
    ngx_chain_t **header;
    uint8_t fmt;
    static ngx_uint_t sample_rates[] =
        { 5512, 11025, 22050, 44100 };

    if (h->type != NGX_RTMP_MSG_AUDIO && h->type != NGX_RTMP_MSG_VIDEO) {
        return NGX_OK;
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
    if (ctx == NULL) {
        ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
        ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
    }

    /* save codec */
    if (in->buf->last - in->buf->pos < 1) {
        return NGX_OK;
    }

    fmt = in->buf->pos[0];
    if (h->type == NGX_RTMP_MSG_AUDIO) {
        ctx->audio_codec_id = (fmt & 0xf0) >> 4;
        ctx->audio_channels = (fmt & 0x01) + 1;
        ctx->sample_size = (fmt & 0x02) ? 2 : 1;

        if (ctx->sample_rate == 0) {
            ctx->sample_rate = sample_rates[(fmt & 0x0c) >> 2];
        }
    } else {
        ctx->video_codec_id = (fmt & 0x0f);
    }

    /* save AVC/AAC header */
    if (in->buf->last - in->buf->pos < 3) {
        return NGX_OK;
    }

    /* no conf */
    if (!ngx_rtmp_is_codec_header(in)) {
        return NGX_OK;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
    header = NULL;

    if (h->type == NGX_RTMP_MSG_AUDIO) {
        if (ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC) {
            header = &ctx->aac_header;
            ngx_rtmp_codec_parse_aac_header(s, in);
        }
    } else {
        if (ctx->video_codec_id == NGX_RTMP_VIDEO_H264) {
            header = &ctx->avc_header;
            ngx_rtmp_codec_parse_avc_header(s, in);
        }
    }

    if (header == NULL) {
        return NGX_OK;
    }

    if (*header) {
        ngx_rtmp_free_shared_chain(cscf, *header);
    }

    *header = ngx_rtmp_append_shared_bufs(cscf, NULL, in);

    return NGX_OK;
}
2.19.2 ngx_rtmp_codec_parse_avc_header
static void
ngx_rtmp_codec_parse_avc_header(ngx_rtmp_session_t *s, ngx_chain_t *in)
{
    ngx_uint_t profile_idc, width, height, crop_left, crop_right,
               crop_top, crop_bottom, frame_mbs_only, n, cf_idc,
               num_ref_frames;
    ngx_rtmp_codec_ctx_t *ctx;
    ngx_rtmp_bit_reader_t br;

#if (NGX_DEBUG)
    ngx_rtmp_codec_dump_header(s, "avc", in);
#endif

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);

    ngx_rtmp_bit_init_reader(&br, in->buf->pos, in->buf->last);

    ngx_rtmp_bit_read(&br, 48);

    ctx->avc_profile = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);
    ctx->avc_compat = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);
    ctx->avc_level = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);

    /* nal bytes */
    ctx->avc_nal_bytes = (ngx_uint_t) ((ngx_rtmp_bit_read_8(&br) & 0x03) + 1);

    /* nnals */
    if ((ngx_rtmp_bit_read_8(&br) & 0x1f) == 0) {
        return;
    }

    /* nal size */
    ngx_rtmp_bit_read(&br, 16);

    /* nal type */
    if (ngx_rtmp_bit_read_8(&br) != 0x67) {
        return;
    }

    /* SPS */

    /* profile idc */
    profile_idc = (ngx_uint_t) ngx_rtmp_bit_read(&br, 8);

    /* flags */
    ngx_rtmp_bit_read(&br, 8);

    /* level idc */
    ngx_rtmp_bit_read(&br, 8);

    /* SPS id */
    ngx_rtmp_bit_read_golomb(&br);

    if (profile_idc == 100 || profile_idc == 110 ||
        profile_idc == 122 || profile_idc == 244 || profile_idc == 44 ||
        profile_idc == 83 || profile_idc == 86 || profile_idc == 118)
    {
        /* chroma format idc */
        cf_idc = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

        if (cf_idc == 3) {

            /* separate color plane */
            ngx_rtmp_bit_read(&br, 1);
        }

        /* bit depth luma - 8 */
        ngx_rtmp_bit_read_golomb(&br);

        /* bit depth chroma - 8 */
        ngx_rtmp_bit_read_golomb(&br);

        /* qpprime y zero transform bypass */
        ngx_rtmp_bit_read(&br, 1);

        /* seq scaling matrix present */
        if (ngx_rtmp_bit_read(&br, 1)) {

            for (n = 0; n < (cf_idc != 3 ? 8u : 12u); n++) {

                /* seq scaling list present */
                if (ngx_rtmp_bit_read(&br, 1)) {

                    /* TODO: scaling_list()
                    if (n < 6) {
                    } else {
                    }
                    */
                }
            }
        }
    }

    /* log2 max frame num */
    ngx_rtmp_bit_read_golomb(&br);

    /* pic order cnt type */
    switch (ngx_rtmp_bit_read_golomb(&br)) {
    case 0:

        /* max pic order cnt */
        ngx_rtmp_bit_read_golomb(&br);
        break;

    case 1:

        /* delta pic order always zero */
        ngx_rtmp_bit_read(&br, 1);

        /* offset for non-ref pic */
        ngx_rtmp_bit_read_golomb(&br);

        /* offset for top to bottom field */
        ngx_rtmp_bit_read_golomb(&br);

        /* num ref frames in pic order */
        num_ref_frames = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

        for (n = 0; n < num_ref_frames; n++) {

            /* offset for ref frame */
            ngx_rtmp_bit_read_golomb(&br);
        }
    }

    /* num ref frames */
    ctx->avc_ref_frames = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

    /* gaps in frame num allowed */
    ngx_rtmp_bit_read(&br, 1);

    /* pic width in mbs - 1 */
    width = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

    /* pic height in map units - 1 */
    height = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

    /* frame mbs only flag */
    frame_mbs_only = (ngx_uint_t) ngx_rtmp_bit_read(&br, 1);

    if (!frame_mbs_only) {

        /* mbs adaptive frame field */
        ngx_rtmp_bit_read(&br, 1);
    }

    /* direct 8x8 inference flag */
    ngx_rtmp_bit_read(&br, 1);

    /* frame cropping */
    if (ngx_rtmp_bit_read(&br, 1)) {

        crop_left = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
        crop_right = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
        crop_top = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
        crop_bottom = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

    } else {

        crop_left = 0;
        crop_right = 0;
        crop_top = 0;
        crop_bottom = 0;
    }

    ctx->width = (width + 1) * 16 - (crop_left + crop_right) * 2;
    ctx->height = (2 - frame_mbs_only) * (height + 1) * 16 -
                  (crop_top + crop_bottom) * 2;

    ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "codec: avc header "
                   "profile=%ui, compat=%ui, level=%ui, "
                   "nal_bytes=%ui, ref_frames=%ui, width=%ui, height=%ui",
                   ctx->avc_profile, ctx->avc_compat, ctx->avc_level,
                   ctx->avc_nal_bytes, ctx->avc_ref_frames,
                   ctx->width, ctx->height);
}
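The last two assignments of ngx_rtmp_codec_parse_avc_header turn the SPS fields into a picture size. A small worked example may help; the helper below simply restates those two formulas with plain integers. The names `demo_dim_t` and `demo_avc_dimensions` are hypothetical, and the fixed crop factor of 2 is taken from the code above (as far as I understand H.264, that factor matches 4:2:0 frame-coded content, so other chroma formats or interlaced streams may need a different crop unit).

```c
#include <stdint.h>

/* Hypothetical result type for the illustration. */
typedef struct {
    unsigned width;
    unsigned height;
} demo_dim_t;

/*
 * width_mbs_minus1        : "pic width in mbs - 1" from the SPS
 * height_map_units_minus1 : "pic height in map units - 1" from the SPS
 * frame_mbs_only          : 1 for progressive frames, 0 otherwise
 * crop_*                  : frame cropping offsets (0 when cropping is off)
 */
static demo_dim_t
demo_avc_dimensions(unsigned width_mbs_minus1,
    unsigned height_map_units_minus1, unsigned frame_mbs_only,
    unsigned crop_left, unsigned crop_right,
    unsigned crop_top, unsigned crop_bottom)
{
    demo_dim_t d;

    /* one macroblock is 16x16 luma samples */
    d.width = (width_mbs_minus1 + 1) * 16 - (crop_left + crop_right) * 2;
    d.height = (2 - frame_mbs_only) * (height_map_units_minus1 + 1) * 16
               - (crop_top + crop_bottom) * 2;

    return d;
}
```

For a typical 1080p stream the SPS carries 119 and 67 with a bottom crop of 4: (119 + 1) * 16 = 1920 and (67 + 1) * 16 - 4 * 2 = 1088 - 8 = 1080. A 720p stream with 79 and 44 needs no cropping, since 80 * 16 = 1280 and 45 * 16 = 720.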
Original article: https://www.cnblogs.com/jimodetiantang/p/8994061.html