Nginx訪問上游服務器的流程大致分以下幾個階段:啟動upstream、連接上游服務器、向上游發送請求、接收上游響應(包頭/包體)、結束請求。本篇主要從代碼流程的角度,梳理一下upstream的整個的數據的處理流程。下面先看一下upstream相關的兩個重要數據結構ngx_http_upstream_t和ngx_http_upstream_conf_t:
相關數據結構
typedef struct ngx_http_upstream_s ? ?ngx_http_upstream_t;struct?ngx_http_upstream_s {ngx_http_upstream_handler_pt read_event_handler; ? ?// 處理讀事件的回調方法ngx_http_upstream_handler_pt write_event_handler; ??// 處理寫事件的回調方法ngx_peer_connection_t peer; ? ? ? ? ? ? ? ? ? ? ? ??// 主動向上游發起的連接,稍后會詳細分析ngx_event_pipe_t?*pipe; ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 當開啟緩存配置,會用pipe來轉發響應,需要http模塊在使用upstream機制前構造pipe結構體ngx_chain_t?*request_bufs; ? ? ? ? ? ? ? ? ? ? ? ? ?//?用鏈表將ngx_buf_t緩沖區鏈接起來,表示所有需要發送到上游的請求內容,//?create_request回調在于構造request_buf鏈表ngx_output_chain_ctx_t output; ? ? ? ? ? ? ? ? ? ? ?// 向下游發送響應的方式,稍后會詳細分析ngx_chain_writer_ctx_t writer; ? ? ? ? ? ? ? ? ? ? ?// 向下游發送響應的方式,稍后會詳細分析ngx_http_upstream_conf_t?*conf; ? ? ? ? ? ? ? ? ? ??// upstream相關的配置信息
#if?(NGX_HTTP_CACHE)ngx_array_t?*caches; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 緩存數組,稍后會單獨介紹緩存相關內容
#endifngx_http_upstream_headers_in_t headers_in; ? ? ? ? ?// 當直接轉發時,process_header將解析的頭部適配為http頭部,同時將包頭信息放在headers_in中ngx_http_upstream_resolved_t?*resolved; ? ? ? ? ? ??// 用于解析主機域名,后面會詳細介紹ngx_buf_t from_client; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// ToDo....ngx_buf_t buffer; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 接收上游服務器響應包頭的緩存區,當不需要直接響應或buffering為0時,也作為轉發包體緩沖區??off_t?length; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 來自上游服務器的響應包體的長度ngx_chain_t?*out_bufs; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 使用時再具體介紹,不同場景下有不同意義ngx_chain_t?*busy_bufs; ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 當buffering為0時,表示上一次向下游轉發響應時沒有發送完成的內容ngx_chain_t?*free_bufs; ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 當buffering為0時,用于回收out_bufs中已經發送給下游的ngx_buf_t結構體ngx_int_t?(*input_filter_init)(void?*data); ? ? ? ? // 處理包體前的初始化方法,其中data用于傳遞用戶數據結構,即下方的input_filter_ctxngx_int_t?(*input_filter)(void?*data,?ssize_t bytes)// 處理包體的方法,bytes表示本次接收到的包體長度,data同上void?*input_filter_ctx; ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 傳遞http模塊的自定義的數據結構#if?(NGX_HTTP_CACHE)ngx_int_t?(*create_key)(ngx_http_request_t?*r);? ? ?// cache部分,后面再分析
#endifngx_int_t?(*create_request)(ngx_http_request_t?*r); // 用于構造發往上游服務器的請求ngx_int_t?(*reinit_request)(ngx_http_request_t?*r); // 與上游通訊失敗,需要重新發起連接時,用該方法重新初始化請求信息ngx_int_t?(*process_header)(ngx_http_request_t?*r); // 解析上游服務器返回響應的包頭,NGX_AGAIN接收不完整,NGX_OK解析到完整包頭void?(*abort_request)(ngx_http_request_t?*r); ? ? ? // 暫時沒有用到void?(*finalize_request)(ngx_http_request_t?*r, ? ? // 請求結束時會調用,目前沒有實際作用ngx_int_t rc);ngx_int_t?(*rewrite_redirect)(ngx_http_request_t?*r,// 上游返回響應中含Location或Refresh時,process_header會調用http模塊實現的該方法ngx_table_elt_t?*h,?size_t prefix);ngx_int_t?(*rewrite_cookie)(ngx_http_request_t?*r, ?// 同上,當響應中含Set-Cookie時,會調用http模塊實現的該方法ngx_table_elt_t?*h);ngx_msec_t timeout;????????????? ? ????????? ? ? ? ?// 暫時沒有用到ngx_http_upstream_state_t?*state; ? ? ? ? ? ? ? ? ??// 用于表示上游響應的錯誤碼、包體長度等信息ngx_str_t method; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 用于文件緩存,稍后再進行分析ngx_str_t schema; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 記錄日志時使用ngx_str_t uri; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 記錄日志時使用ngx_http_cleanup_pt?*cleanup; ? ? ? ? ? ? ? ? ? ? ??// 用于標識是否需要清理資源,相當于一個標志位,實際不會調用該方法unsigned store:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 是否指定文件緩存路徑的標志位unsigned cacheable:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 是否啟用文件緩存unsigned accel:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 目前沒有用到unsigned ssl:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 是否基于SSL協議訪問上游服務器unsigned buffering:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 向下游轉發響應包體時,是否開啟更大內存及臨時磁盤文件用于緩存來不及發送到下游的響應包體unsigned keepalive:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 標識與后端是否開啟keepalive ?unsigned upgrade:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 是否存在upgrade headerunsigned request_sent:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 是否向上游服務器發送了請求unsigned header_sent:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 為1時,表示包頭已經轉發給客戶端了
}
ngx_http_upstream_conf_t:指定了upstream的運行方式,必須在啟動upstream之前設置
點擊(此處)折疊或打開
typedef?struct?{ngx_http_upstream_srv_conf_t?*upstream; // 當上面沒有實現resolved成員時,用該結構體定義上游服務器的配置ngx_msec_t connect_timeout; ? ? ? // 建立tcp連接的超時時間,即寫事件添加到定時器中設置的超時時間ngx_msec_t send_timeout; ? ? ? ? ?// 發送請求的超時時間,即寫事件添加到定時器中設置的超時時間ngx_msec_t read_timeout; ? ? ? ? ?// 接收響應的超時時間,即讀事件添加到定時器中設置的超時時間ngx_msec_t timeout; ? ? ? ? ? ? ??// 暫時沒有使用ngx_msec_t next_upstream_timeout; //?size_t send_lowat; ? ? ? ? ? ? ? ?// 發送緩存區的下限,即TCP的SO_SNOLOWAT選項size_t buffer_size; ? ? ? ? ? ? ??//?指定接收頭部緩沖區分配的內存大小,當buffering為0時,由于上述buffer同時用于接收包體,也表示接收包體緩沖區大小size_t limit_rate; ? ? ? ? ? ? ? ?//?size_t busy_buffers_size; ? ? ? ??// 當buffering為1,且向下游轉發響應時生效,會設置到ngx_event_pipe_t結構體的busy_size中size_t max_temp_file_size; ? ? ? ?// 指定臨時文件的大小,限制ngx_event_pipe_t中的temp_filesize_t temp_file_write_size; ? ? ?// 將緩沖區的響應寫入臨時文件時,一次寫入字符流的最大長度......ngx_bufs_t bufs; ? ? ? ? ? ? ? ? ?// 以緩存響應的方式轉發上游服務器的包體時所使用的內存大小ngx_uint_t ignore_headers; ? ? ? ?// 以位圖的形式標識在轉發時需要忽略的headersngx_uint_t next_upstream; ? ? ? ? // 以位圖的方式表示一些錯誤碼,當處理上游響應時發現該錯誤碼,選擇下一個上游服務器重發請求ngx_uint_t store_access; ? ? ? ? ?// 表示創建的臨時目錄和文件的權限ngx_uint_t next_upstream_tries; ? //?ngx_flag_t buffering; ? ? ? ? ? ? //?為1時表示打開緩存,盡量在內存和磁盤中緩存來自上游的響應,為0時則開辟固定大小內存塊作為緩存來轉發響應......ngx_flag_t ignore_client_abort; ? // 為1時,表示與上游服務器交互時不檢查nginx與下游服務器是否斷開,即使下游主動關閉連接,也不會中斷與上游交互ngx_flag_t intercept_errors; ? ? ?// 詳見ngx_http_upstream_intercept_errorsngx_flag_t cyclic_temp_file; ? ? ?// 為1時,會嘗試復用臨時文件中已經使用過的空間......ngx_path_t?*temp_path; ? ? ? ? ? ?//?buffering為1的情況下轉發響應時,存放臨時文件的路徑ngx_hash_t hide_headers_hash; ? ? // 不轉發的頭部,根據hide_headers和pass_headers動態數組構造出的需要隱藏的http頭部散列表ngx_array_t?*hide_headers; ? ? ? ?// 當轉發上游頭部給下游時,如果不希望將某些頭部轉發給下游,則設置到該數組中ngx_array_t?*pass_headers; ? ? ? ?// 轉發頭部時upstream機制默認不會轉發某些頭部,當確定需要轉發時,需要設置到該數組中ngx_http_upstream_local_t?*local; // 連接上游服務器時,需要使用的本機地址ngx_array_t?*store_lengths; ? ? ??// 當需要將上游響應緩存到文件中時,表示存放路徑的長度ngx_array_t?*store_values; ? ? ? ?//?當需要將上游響應緩存到文件中時,表示存放路徑......signed store:2; ? ? ? ? ? ? ? ? ??// 同ngx_http_upstream_t中的storeunsigned intercept_404:1; ? ? ? ??// 如果該值設為1,當上游返回404時直接轉發該錯誤碼給下游,而不會去與error_page進行比較unsigned change_buffering:1; ? ? ?// 當為1時,根據上游服務器返回的響應頭部,動態決定是以上游網速優先,還是下游網速優先......ngx_str_t module; ? ? ? ? ? ? ? ??// 使用upstream的模塊名稱,僅用于記錄日志
} ngx_http_upstream_conf_t
啟動upstream
當收到請求后,http的代理模塊是ngx_http_proxy_module,其NGX_HTTP_CONTENT_PHASE階段的處理函數為ngx_http_proxy_handler
點擊(此處)折疊或打開
static ngx_int_t
ngx_http_proxy_handler(ngx_http_request_t?*r)
{// 創建ngx_http_upstream_t結構,并賦值給r->upstreamif?(ngx_http_upstream_create(r)?!=?NGX_OK)?{return NGX_HTTP_INTERNAL_SERVER_ERROR;}.....plcf?=?ngx_http_get_module_loc_conf(r,?ngx_http_proxy_module);.....u?=?r->upstream;.....// 給upstream的conf成員賦值,記錄相關的配置信息u->conf?=?&plcf->upstream;// 設置相關的回調信息u->create_request?=?ngx_http_proxy_create_request;u->reinit_request?=?ngx_http_proxy_reinit_request;u->process_header?=?ngx_http_proxy_process_status_line;u->abort_request?=?ngx_http_proxy_abort_request;u->finalize_request?=?ngx_http_proxy_finalize_request;......u->buffering?=?plcf->upstream.buffering;.....// 調用ngx_http_upstream_init函數rc?=?ngx_http_read_client_request_body(r,?ngx_http_upstream_init);.....return NGX_DONE;
}
首先創建upstream的結構并進行設置,然后設置ngx_http_upstream_conf_t配置結構體給upstream->conf。ngx_http_upstream_init函數會根據
ngx_http_upstream_conf_t配置的信息初始化upstream,同時開始連接上游服務器,由此展開整個upstream的處理流程。
點擊(此處)折疊或打開
void?ngx_http_upstream_init(ngx_http_request_t?*r)
{ngx_connection_t?*c;// 客戶端的連接c?=?r->connection;......// 當啟用upstream時,需要將客戶端對應的讀事件從定時器中刪除,此時主要關注上游的連接相關的事件if?(c->read->timer_set)?{ngx_del_timer(c->read);}......ngx_http_upstream_init_request(r);
}
繼續看ngx_http_upstream_init_request函數
點擊(此處)折疊或打開
static void?ngx_http_upstream_init_request(ngx_http_request_t?*r)
{u?=?r->upstream;u->store?=?u->conf->store;......// 設置Nginx與下游客戶端之間TCP連接的檢查方法,注意幾個條件,ignore來自之前配置屬性,是否忽略客戶端的連接狀態if?(!u->store &&?!r->post_action &&?!u->conf->ignore_client_abort)?{r->read_event_handler?=?ngx_http_upstream_rd_check_broken_connection;r->write_event_handler?=?ngx_http_upstream_wr_check_broken_connection;}......// 調用http模塊實現的create_request方法,即前面注冊的ngx_http_proxy_create_request函數,用于構造發到上游服務器的請求if?(u->create_request(r)?!=?NGX_OK)?{ngx_http_finalize_request(r,?NGX_HTTP_INTERNAL_SERVER_ERROR);return;}......// 向當前請求的main成員指向的原始請求中的cleanup鏈表末尾添加一個新成員cln?=?ngx_http_cleanup_add(r,?0);// 將handler的回調方法設置為ngx_http_upstream_cleanupcln->handler?=?ngx_http_upstream_cleanup;cln->data?=?r;u->cleanup?=?&cln->handler;......// 調用ngx_http_upstream_connect向上游服務器發起連接ngx_http_upstream_connect(r,?u);
}
與上游服務器建立連接
upstream機制與上游服務器之間通過tcp建立連接,為了保證三次握手的過程中不阻塞進程,Nginx采用了無阻塞的套接字來連接上游服務器。
ngx_http_upstream_connect負責發起建連動作,如果沒有立即返回成功,需要在epoll中監控該套接字,當出現可寫事件時,則說明連接已經建立成功。
點擊(此處)折疊或打開
static void?ngx_http_upstream_connect(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{// 建連的動作主要由下面函數進行.....rc?=?ngx_event_connect_peer(&u->peer);....
點擊(此處)折疊或打開
ngx_int_t?ngx_event_connect_peer(ngx_peer_connection_t?*pc)
{// 創建tcp socket套接字s?=?ngx_socket(pc->sockaddr->sa_family,?SOCK_STREAM,?0);......// 獲取空閑的ngx_connection_t結構來承載連接,從ngx_cycle_t的free_connections指向的空閑連接池中獲取c?=?ngx_get_connection(s,?pc->log);......// 設置連接為非阻塞的模式if?(ngx_nonblocking(s)?==?-1)?{......// 綁定地址和端口if?(pc->local)?{if?(bind(s,?pc->local->sockaddr,?pc->local->socklen)?==?-1)?{......// 設置連接收發相關的回調函數c->recv?=?ngx_recv;c->send?=?ngx_send;c->recv_chain?=?ngx_recv_chain;c->send_chain?=?ngx_send_chain;// 啟用sendfile的支持c->sendfile?=?1;......rev = c->read;wev = c->write;......pc->connection = c;// 調用ngx_event_actions.add_conn將tcp套接字以期待可讀、可寫的方式添加到事件搜集器中,這里是把套接字加到epoll中if (ngx_add_conn) {if (ngx_add_conn(c) == NGX_ERROR) {goto failed;}}// 向上游服務器發起連接,由于非阻塞,調用會立即返回rc = connect(s, pc->sockaddr, pc->socklen);......
回到ngx_http_upstream_connect繼續分析
點擊(此處)折疊或打開
static void?ngx_http_upstream_connect(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{......// 上面已經分析了,該函數主要進行上游服務器的連接rc?=?ngx_event_connect_peer(&u->peer);......c?=?u->peer.connection;c->data?=?r;// 將上游connection上讀寫事件的回調,都設置為ngx_http_upstream_handlerc->write->handler?=?ngx_http_upstream_handler;c->read->handler?=?ngx_http_upstream_handler;// 設置upstream機制的write_event_handler和read_event_handler,具體使用見后續的ngx_upstream_handler函數//?ngx_http_upstream_send_request_handler用于向上游發送請求u->write_event_handler?=?ngx_http_upstream_send_request_handler;//?ngx_http_upstream_process_header接收和解析上游服務器的響應u->read_event_handler?=?ngx_http_upstream_process_header;......if?(rc?==?NGX_AGAIN)?{// 當連接沒有建立成功時,套接字已經在epoll中了,將寫事件添加到定時器中,超時時間是ngx_http_upstream_conf_t中的connect_timeout成員ngx_add_timer(c->write,?u->conf->connect_timeout);return;}......// 當成功建立連接時,向上游服務器發送請求,注意:此處的函數與上面設置的定時器回調的函數有所不同,下文會進行說明ngx_http_upstream_send_request(r,?u);
}
下面先簡單看一下connection的讀寫回調函數——ngx_http_upstream_handler
點擊(此處)折疊或打開
static void
ngx_http_upstream_handler(ngx_event_t?*ev)
{......// 由事件的data成員取得ngx_connection_t連接,該連接是nginx與上游服務器之間的連接c?=?ev->data;// 由連接的data取得ngx_http_request_t結構體r?=?c->data;// 由請求的upstream成員取的表示upstream機制的ngx_http_upstream_t結構體u?=?r->upstream;// 此處ngx_http_request_t結構中的connection成員代表的是客戶端與nginx之間連接c?=?r->connection;......if?(ev->write)?{// nginx與上游服務器間的tcp連接的可寫事件被觸發時,該方法被調用u->write_event_handler(r,?u);}?else?{//?nginx與上游服務器間的tcp連接的可讀事件被觸發時,該方法被調用u->read_event_handler(r,?u);}// 與nginx_http_request_handler相同,最后一步執行post請求ngx_http_run_posted_requests(c);
}
發送請求到上游服務器
前面在介紹ngx_http_upstream_connect函數時,我們看到將ngx_http_upstream_t中的write_event_handler設置為了ngx_http_upstream_send_request_handler,而ngx_http_upstream_connect的最后直接調用了ngx_http_upstream_send_request發送請求。
下面先來看一下兩者的區別
點擊(此處)折疊或打開
static void?ngx_http_upstream_send_request_handler(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{ngx_connection_t?*c;// 獲取與上游服務器間表示連接的ngx_connection_t結構體c?=?u->peer.connection;// 當寫事件的timeout被設置為1時,則代表向上游發送http請求已經超時if?(c->write->timedout)?{// 將超時錯誤傳給next方法,next方法根據允許的重傳策略決定:重新發起連接執行upstream請求,還是結束upstream請求ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_TIMEOUT);return;}......// header_sent為1時,表示上游服務器的響應需要直接轉發給客戶端,而且此時響應包頭已經轉給客戶端了if?(u->header_sent)?{// 由于此時已經收到了上游服務器的完整包頭,此時不需要再向上游發送請求,因此將write回調設置為空函數(只記錄日志)u->write_event_handler?=?ngx_http_upstream_dummy_handler;// 將寫事件添加到epoll中(void)?ngx_handle_write_event(c->write,?0);return;}// 調用下面函數向上游發送http請求ngx_http_upstream_send_request(r,?u);
}
通過上面的分析,現在很容易看出兩者的區別,ngx_http_upstream_send_request_handler更多的是在檢測請求的狀態,而實際的發送函數是
ngx_http_upstream_send_request,下面繼續看一下該函數。
點擊(此處)折疊或打開
static void?ngx_http_upstream_send_request(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{......// 發送u->request_bufs鏈表上的請求內容,該函數會把未一次發送完的鏈表緩沖區保存下來,再次調用時不需要request_bufs參數rc?=?ngx_output_chain(&u->output,?u->request_sent???NULL?:?u->request_bufs);// 標識已經向上游發送了請求,實際上是為了標識是否調用過ngx_output_chain,除了第一次,其他時候不需要再傳送request_bufs,直接設置為NULLu->request_sent?=?1;......// 當寫事件仍在定時器中時,先將寫事件從定時器中移出,由ngx_output_chain的返回值決定是否需要向定時器中增加寫事件if?(c->write->timer_set)?{ngx_del_timer(c->write);}// 當ngx_output_chain返回NGX_AGAIN時,說明請求還沒有發完,此時需要設置寫事件定時器if?(rc?==?NGX_AGAIN)?{ngx_add_timer(c->write,?u->conf->send_timeout);// 將寫事件添加到epoll中if?(ngx_handle_write_event(c->write,?u->conf->send_lowat)?!=?NGX_OK)?{ngx_http_upstream_finalize_request(r,?u,NGX_HTTP_INTERNAL_SERVER_ERROR);return;}// 結束ngx_http_upstream_send_request的執行,等待epoll事件觸發return;}/*?rc?==?NGX_OK?*/// 當ngx_output_chain返回NGX_OK時,表示向上游服務器發送完了所有的請求,將寫事件的回調設置為空函數......u->write_event_handler?=?ngx_http_upstream_dummy_handler;// 重新添加到epoll中if?(ngx_handle_write_event(c->write,?0)?!=?NGX_OK)?{ngx_http_upstream_finalize_request(r,?u,NGX_HTTP_INTERNAL_SERVER_ERROR);return;}// 發送完請求后,需要開始讀上游返回的響應,設置讀事件的超時時間ngx_add_timer(c->read,?u->conf->read_timeout);// 當ready已經設置時,說明應答已經到位,調用process_header開始處理來自上游的響應if?(c->read->ready)?{ngx_http_upstream_process_header(r,?u);return;}
}
接收上游服務器的響應
Nginx的upstream機制支持三種響應包體的處理方式:不轉發響應、轉發響應時以下游網速優先、轉發響應時以上游網速優先。當ngx_http_request_t結構體的
subrequest_in_memory標志位為1時,即不轉發響應;當subrequest_in_memory為0時,則轉發響應;而ngx_http_upstream_conf_t配置結構中的buffering
為0時,則以下游網速優先,即使用固定大小的內存作為緩存;當buffering為1時,則以上游網速優先,即采用更多的內存、硬盤文件作為緩存。
下面看一下用于接收、解析響應頭部的ngx_http_upstream_process_header方法
點擊(此處)折疊或打開
static void
ngx_http_upstream_process_header(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{......// 獲取到上游服務器的連接信息c?=?u->peer.connection;......// 檢查是否發生了讀事件超時,如果發生了超時,則調用ngx_http_upstream_next函數決定下一步動作if?(c->read->timedout)?{ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_TIMEOUT);return;}// request_sent為1則代表已經向上游發過請求;為0則代表還沒有發送請求,沒有發送請求卻收到上游的響應時,則不符合邏輯,進行下一步動作// ngx_http_upstream_next會根據配置信息決定是否直接結束請求,還是尋找下一個上游服務器if?(!u->request_sent && ngx_http_upstream_test_connect(c)?!=?NGX_OK)?{ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_ERROR);return;}// 檢查用于接收上游響應的buffer,當start為NULL時,代表該緩沖區尚未進行分配,此時會按照配置指定的buffer_size進行緩沖區的分配if?(u->buffer.start?==?NULL)?{u->buffer.start?=?ngx_palloc(r->pool,?u->conf->buffer_size);if?(u->buffer.start?==?NULL)?{ngx_http_upstream_finalize_request(r,?u,?NGX_HTTP_INTERNAL_SERVER_ERROR);return;}// 針對新申請的緩沖區進行初始化,省略.......}for?(?;;?)?{// 讀取響應的內容存儲在buffer中,每次讀取的最大不超過buffer_size,即當前緩沖區的剩余空間大小n?=?c->recv(c,?u->buffer.last,?u->buffer.end?-?u->buffer.last);// NGX_AGAIN代表響應還沒有讀完,設置讀事件到epoll中,等待下一次讀取if?(n?==?NGX_AGAIN)?{if?(ngx_handle_read_event(c->read,?0)?!=?NGX_OK)?{ngx_http_upstream_finalize_request(r,?u,?NGX_HTTP_INTERNAL_SERVER_ERROR);return;}return;}// 讀取出錯或者連接已關閉,則調用next函數決定是終止連接,還是重新選擇上游服務器if?(n?==?NGX_ERROR?||?n?==?0)?{ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_ERROR);return;}// n 大于0時,代表讀取到的數據,此時last游標需要往后移動n個字節,last初始化時與start相同,指向buffer起始地址u->buffer.last?+=?n;// 開始處理讀取到的響應頭部信息rc?=?u->process_header(r);// 檢查process_header的返回值,當返回NGX_AGAIN時,需要判斷一下是否當前的緩沖區已經被用盡,如果被用盡說明一個buffer_size無法容納整個響應頭部if?(rc?==?NGX_AGAIN)?{// 當buffer無法容納整個響應頭部時,調用next決定是終止連接還是選擇下一個上游服務器if?(u->buffer.last?==?u->buffer.end)?{ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);......}// 當process_header處理的是完整的響應頭部時,會進一步判斷其返回值,檢測到無效的響應頭部時,進行next的進一步決策處理if?(rc?==?NGX_HTTP_UPSTREAM_INVALID_HEADER)?{ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);return;}// 當process_header返回ERROR時,直接終止當前的請求if?(rc?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_HTTP_INTERNAL_SERVER_ERROR);return;}// 走到目前位置,當前的process_header至少是執行成功了,完整的解析了響應的頭部信息/*?rc?==?NGX_OK?*/.......// 處理已經解析出的頭部,該函數會把已經解析出的頭部,設置到ngx_http_request_t結構體的headers_out成員中// 當調用ngx_http_send_header時,可以將設置到headers_out中的響應頭部發送給客戶端if?(ngx_http_upstream_process_headers(r,?u)?!=?NGX_OK)?{return;}// subrequest_in_memory字段為0時,表示需要轉發響應到客戶端;為1時,表示不需要轉發響應到客戶端if?(!r->subrequest_in_memory)?{// 發送響應給客戶端ngx_http_upstream_send_response(r,?u);return;}// 以下的邏輯是不需要轉發響應給客戶端,即subrequest_in_memory為1的情況/*?subrequest content?in?memory?*/// 檢查一下input_filter是否為NULL,input_filter用于處理響應的包體,當沒有定義自己的實現方法時,使用默認的處理方法if?(u->input_filter?==?NULL)?{u->input_filter_init?=?ngx_http_upstream_non_buffered_filter_init;?u->input_filter?=?ngx_http_upstream_non_buffered_filter; ?u->input_filter_ctx?=?r;?}// 調用init方法為即將進行的包體處理做一些初始化的工作,默認的init函數是空的,什么也沒做? ??if?(u->input_filter_init(u->input_filter_ctx)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// pos與last之間的內容是已經讀取但尚未處理的數據n?=?u->buffer.last?-?u->buffer.pos;// 當process_header處理完后,如果還有尚未處理的數據,那說明除了讀到了包頭之外,還讀到部分包體信息if?(n)?{u->buffer.last?=?u->buffer.pos;u->state->response_length?+=?n;// 調用input_filter處理已經讀到的包體信息if?(u->input_filter(u->input_filter_ctx,?n)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}}......// 設置處理上游響應包體的回調函數u->read_event_handler?=?ngx_http_upstream_process_body_in_memory;// 開始處理包體的信息ngx_http_upstream_process_body_in_memory(r,?u);
}
下面繼續分析一下,不用upstream直接轉發響應時的具體處理流程,主要是上面subrequest_memory為1的場景,此時該請求屬于一個子請求。
我們看一下上面分析時提到的默認的input_filter的處理方法,在上面的分析中,如果讀取包頭時同時讀到了包體信息,會調用input_filter方法處理:
點擊(此處)折疊或打開
static ngx_int_t?ngx_http_upstream_non_buffered_filter(void?*data,?ssize_t bytes)
{// data指向了請求的ngx_http_request_t結構,前面函數中當沒有定義input_filter時,對input_filter_ctx進行了重新初始化,指向了ngx_http_request_tngx_http_request_t?*r?=?data;......u?=?r->upstream;// 遍歷out_bufs使ll指向最后一個緩沖區->next的地址for?(cl?=?u->out_bufs,?ll?=?&u->out_bufs;?cl;?cl?=?cl->next)?{ll?=?&cl->next;}// 申請新的緩沖區cl?=?ngx_chain_get_free_buf(r->pool,?&u->free_bufs);if?(cl?==?NULL)?{return NGX_ERROR;}// 將新申請的緩沖區掛在out_bufs的鏈表末尾*ll?=?cl;......// buffer為接收上游響應包體的緩沖區b?=?&u->buffer;// b->last在調用該函數時,已經指向了接收到的包體的首地址,cl->buf->pos指向首地址后,將b->last和cl->buf->last設置為保存包體尾部cl->buf->pos?=?b->last;b->last?+=?bytes;cl->buf->last?=?b->last;cl->buf->tag?=?u->output.tag;// 如果沒有設置包體長度,則到此可以結束了if?(u->length?==?-1)?{return NGX_OK;}// 計算還需要接收的包體的長度u->length?-=?bytes;return NGX_OK;
}
繼續向下分析,process_header調用input_filter處理完包體后,最后調用的函數時ngx_http_upstream_process_body_in_memory,
該函數實際上會接收上游服務器的包體內容,下面看一下具體實現。
點擊(此處)折疊或打開
static void?ngx_http_upstream_process_body_in_memory(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{......// 獲取到上游的連接信息c?=?u->peer.connection;// 獲取該連接的讀事件,判斷是否發生了讀事件的超時,如果超時,則直接結束連接rev?=?c->read;if?(rev->timedout)?{ngx_connection_error(c,?NGX_ETIMEDOUT,?"upstream timed out");ngx_http_upstream_finalize_request(r,?u,?NGX_HTTP_GATEWAY_TIME_OUT);return;}// buffer為存儲上游響應包體的緩沖區b?=?&u->buffer;for?(?;;?)?{// 計算剩余空閑緩沖區的大小size?=?b->end?-?b->last;......// 如果還有空閑的空間,調用recv方法繼續讀取響應n?=?c->recv(c,?b->last,?size);// 此處NGX_AGAIN代表需要等待下一次的讀事件if?(n?==?NGX_AGAIN)?{break;}// 如果上游主動關閉連接,或者讀取出現錯誤,則直接關閉連接? ??if?(n?==?0?||?n?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?n);return;}// 更新讀到的響應包體的長度u->state->response_length?+=?n;// 處理讀到的包體內容if?(u->input_filter(u->input_filter_ctx,?n)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}if?(!rev->ready)?{break;}}// 如果包體長度沒有設置,則可以直接結束請求了if?(u->length?==?0)?{ngx_http_upstream_finalize_request(r,?u,?0);return;}// 將讀事件增加到Epoll中if?(ngx_handle_read_event(rev,?0)?!=?NGX_OK)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 將讀事件同時添加到定時器中,超時時間為配置的read_timeout,避免長時間等待if?(rev->active)?{ngx_add_timer(rev,?u->conf->read_timeout);}?else?if?(rev->timer_set)?{ngx_del_timer(rev);}
}
上面流程很容易看出一個問題,那就是讀取響應頭的Buffer的空間可能不足,導致處理出現問題。使用時關鍵還在于Input_filter方法中對buffer的管理。
分析完不轉發響應的過程后,繼續看一下轉發響應的兩種實現方式,下游網速優先和上游網速優先的實現。由于上游網速優先的方式,實現較為復雜,下面先看一下下游網速優先的方式,即采用固定的內存大小,作為響應的緩沖區。代碼上也刪減不必要的邏輯。
下游網速優先
點擊(此處)折疊或打開
static void
ngx_http_upstream_send_response(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{......// 向下游的客戶端發送響應頭部,前面process_header處理時先將響應頭部設置到了headers_in中,然后upstream_process_headers將headers_in中的//?頭部設置到headers_out中,ngx_http_send_header就是將headers_out中的http包頭發送給客戶端rc?=?ngx_http_send_header(r);......// 設置頭部已發送的標志u->header_sent?=?1;......// 如果早期的請求攜帶了包體信息,且用到了臨時文件,則先清理臨時文件,因為已經收到響應了,請求的臨時文件肯定用不到了if?(r->request_body && r->request_body->temp_file)?{ngx_pool_run_cleanup_file(r->pool,?r->request_body->temp_file->file.fd);r->request_body->temp_file->file.fd?=?NGX_INVALID_FILE;}......// buffering為1代表上游網速優先,為0代表下游網速優先if?(!u->buffering)?{// 看一下用戶有沒有設置input_filter,沒有的話使用默認的input_filter函數if?(u->input_filter?==?NULL)?{u->input_filter_init?=?ngx_http_upstream_non_buffered_filter_init;u->input_filter?=?ngx_http_upstream_non_buffered_filter;u->input_filter_ctx?=?r;}// 設置接收上游響應的回調函數u->read_event_handler?=?ngx_http_upstream_process_non_buffered_upstream;// 設置向下游客戶端發送報文的回調函數r->write_event_handler?=?ngx_http_upstream_process_non_buffered_downstream;r->limit_rate?=?0;// 為input_filter處理包體做初始化的準備函數,默認實現是空的if?(u->input_filter_init(u->input_filter_ctx)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}......// 看一下解析完包頭后,是否還有未解析的包體信息,如果存在包體,則先處理一次包體,和前面分析不轉發響應的邏輯是一樣的n?=?u->buffer.last?-?u->buffer.pos;if?(n)?{// last指向代處理的包體的起始地址,更新response_length,調用input_filter處理當前的包體u->buffer.last?=?u->buffer.pos;u->state->response_length?+=?n;if?(u->input_filter(u->input_filter_ctx,?n)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 調用downstream將本次的包體轉發給客戶端ngx_http_upstream_process_non_buffered_downstream(r);}?else?{// 清空buff,實際上是將pos和last指針復位u->buffer.pos?=?u->buffer.start;u->buffer.last?=?u->buffer.start;// Todo....if?(ngx_http_send_special(r,?NGX_HTTP_FLUSH)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 如果連接上的讀事件已經準備好或者響應頭部沒有指定包體長度時,直接調用downstream方法處理響應if?(u->peer.connection->read->ready?||?u->length?==?0)?{ngx_http_upstream_process_non_buffered_upstream(r,?u);}}// 將控制權轉交給Nginx框架return;}......
}
ngx_http_upstream_process_non_buffered_downstream函數,用于處理上游服務器響應的讀事件
點擊(此處)折疊或打開
static void?ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t?*r)
{// 獲取與上游服務器的連接信息,和寫事件信息c?=?r->connection;u?=?r->upstream;wev?=?c->write;......// 如果出現寫事件超時,則設置超時標簽,同時終止連接if?(wev->timedout)?{c->timedout?=?1;ngx_connection_error(c,?NGX_ETIMEDOUT,?"client timed out");ngx_http_upstream_finalize_request(r,?u,?NGX_HTTP_REQUEST_TIME_OUT);return;}// non_buffered即固定內存,用固定內存處理轉發響應,其中第二個參數是個標簽,為1時代表向下游發送響應,為0時代表讀取上游的響應ngx_http_upstream_process_non_buffered_request(r,?1);
}
下面繼續分析一下ngx_http_upstream_process_non_buffered_request
點擊(此處)折疊或打開
static void
ngx_http_upstream_process_non_buffered_request(ngx_http_request_t?*r,?ngx_uint_t do_write)
{// 獲取上游和現有的連接信息,記錄為downstream和upstreamu?=?r->upstream;downstream?=?r->connection;upstream?=?u->peer.connection;b?=?&u->buffer;// 判斷是否向下游寫,do_write是調用方設置的,而u->length表示還需要接收的上游響應的長度,為0則代表不需要繼續接收do_write?=?do_write?||?u->length?==?0;for?(?;;?)?{// 判斷是否需要向客戶端寫數據if?(do_write)?{// out_bufs中記錄的是需要向下游寫的數據,而busy_bufs用于記錄當out_bufs無法一次發完時指向out_bufs,從而將out_bufs置空if?(u->out_bufs?||?u->busy_bufs)?{// 向下游發送out_bufs指向的內容,busy_bufs中記錄的是上一次的out_bufs的內容,現在已經合并當前的out_bufs中了rc?=?ngx_http_output_filter(r,?u->out_bufs);// 回收out_bufs上已經發送的buf,將未發送完的buf設置到busy_buf上,清空out_bufsngx_chain_update_chains(r->pool,?&u->free_bufs,?&u->busy_bufs,?&u->out_bufs,?u->output.tag);}// 當busy_bufs為空時,說明當前沒有需要發送到客戶端的內容了if?(u->busy_bufs?==?NULL)?{......// 將pos和last重新置位b->pos?=?b->start;b->last?=?b->start;}}// 計算一下buffer的可用空間size?=?b->end?-?b->last;// 繼續讀取響應if?(size?&& upstream->read->ready)?{n?=?upstream->recv(upstream,?b->last,?size);// NGX_AGAIN代表需要等待下一次讀事件if?(n?==?NGX_AGAIN)?{break;}// n > 0表示讀到的n字節的正常響應包體if?(n?>?0)?{// 更新response_lengthu->state->response_length?+=?n;// 調用input_filter處理包體if?(u->input_filter(u->input_filter_ctx,?n)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}}// 設置do_write標簽,標識已經讀到新的響應包體,需要向客戶端轉發數據do_write?=?1;continue;}break;}......if?(downstream->data?==?r)?{// 將下游的寫事件添加到epoll中if?(ngx_handle_write_event(downstream->write,?clcf->send_lowat)?!=?NGX_OK)......}// 同時設置超時定時器,控制等待時間,超時時間為配置的send_timeoutif?(downstream->write->active &&?!downstream->write->ready)?{ngx_add_timer(downstream->write,?clcf->send_timeout);}?......// 將連接上游的讀事件也添加到epoll中去if?(ngx_handle_read_event(upstream->read,?0)?!=?NGX_OK)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 設置讀定時器,控制等待時間,超時時間為配置的read_timeoutif?(upstream->read->active &&?!upstream->read->ready)?{ngx_add_timer(upstream->read,?u->conf->read_timeout);}......
}
上游網速優先
上游網速優先的實現比較復雜,目前官方攜帶的ngx_http_proxy_module即采用了上游網速優先的實現方式。當ngx_http_upstream_conf_t中的Buffering設置
為1時,則說明需要使用上游網速優先的方式。此時需要用ngx_event_pipe_t結構,這個結構維護著上下游間轉發的響應包體,用于解決內存復制的問題。
點擊(此處)折疊或打開
typedef?struct?ngx_event_pipe_s ngx_event_pipe_t;struct?ngx_event_pipe_s {ngx_connection_t?*upstream;????????????????????? ??// 與上游服務器間的連接ngx_connection_t?*downstream;????????????????? ? ??// 與下游客戶端間的連接????????????????????????ngx_chain_t?*free_raw_bufs;????????????????????????// 用于接收上游服務器響應的緩沖區鏈表,新收到的響應向鏈表頭部插入ngx_chain_t?*in;???????????????????????????????????// 接收到上游響應的緩沖區,ngx_event_pipe_copy_input_filter將buffer中的數據設置到in中ngx_chain_t?**last_in;????????????????????????? ? ?// 指向剛剛接收到的緩沖區ngx_chain_t?*out;????????????????????????????????? //?將要發給客戶端的緩沖區鏈表,ngx_chain_t?*free;????????????????????????????? ? ?//?等待釋放的緩沖區ngx_chain_t?*busy;????????????????????????????? ???//?表示上次發送響應時未發完的緩沖區鏈表,下一次發送時會合并到out鏈表中/**?the input filter i.e.?that moves HTTP/1.1 chunks*?from the raw bufs to an incoming chain*/ngx_event_pipe_input_filter_pt input_filter;? ? ? ?//?處理接收到的來自上游服務器的緩沖區,接收響應的處理方法void?*input_ctx;????????????????????????????????? ?//?input_filter函數的參數,通常設置為ngx_http_request_tngx_event_pipe_output_filter_pt output_filter; ? ??//?向下游發送響應的方法,默認為ngx_http_output_filtervoid?*output_ctx;??????????????????????????????????// output_filter函數的參數,通常設置為ngx_http_request_tunsigned read:1;????????????????????????????????? ?// 為1表示當前已經讀到來自上游的響應unsigned cacheable:1;????????????????????????? ? ??//?為1時表示啟用文件緩存unsigned single_buf:1;????????????????????????? ? ?//?為1時表示接收上游的響應時一次只能接收一個ngx_buf_t緩沖區unsigned free_bufs:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 為1時表示當不再接收上游的響應包體時,盡可能快的釋放緩沖區unsigned upstream_done:1;??????????????????????????//?input_filter中用到的標識位,表示Nginx與上游間的交互已經結束unsigned upstream_error:1;????????????????????? ? ?//?與上游連接出現錯誤時,將該標識為置為1,比如超時,解析錯誤等unsigned upstream_eof:1;???????????????????????????//?與上游的連接已經關閉時,該標志位置為1unsigned upstream_blocked:1;????????????????????? ?//?表示暫時阻塞讀取上游響應的流程,先發送響應,再用釋放的緩沖區接收響應unsigned downstream_done:1;????????????????????????//?為1時表示與下游的交互已經結束unsigned downstream_error:1; ? ? ? ? ? ? ? ? ? ? ??//?與下游連接出現錯誤時,設置為1unsigned cyclic_temp_file:1;????????????????????? ?//?為1時會試圖復用臨時文件中曾用過的空間ngx_int_t allocated; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??//?表示已經分配的緩沖區的數目,其受bufs.num成員的限制ngx_bufs_t bufs;????????????????????????????????? ?//?記錄了接收上游響應的內存緩沖區的大小,bufs.size記錄每個緩沖區大小,bufs.num記錄緩沖區個數ngx_buf_tag_t tag;????????????????????????????? ? ?//?用于設置、比較緩沖區鏈表中ngx_buf_t結構體的tag標志位ssize_t busy_size;off_t read_length;?????????????????????????????????//?已經接收到上游響應包體長度off_t?length;????????????????????????????????? ? ??// 表示臨時文件的最大長度off_t max_temp_file_size;??????????????????????????//?表示臨時文件的最大長度ssize_t temp_file_write_size;????????????????????? //?表示一次寫入文件時的最大長度ngx_msec_t read_timeout;????????????????????????? ?//?讀取上游響應的超時時間ngx_msec_t send_timeout;????????????????????????? ?//?向下游發送響應的超時時間ssize_t send_lowat;????????????????????????????????//?向下游發送響應時,TCP連接中設置的參數ngx_pool_t?*pool;????????????????????????????? ? ??//?用于分配內存緩沖區的連接池對象ngx_log_t?*log;????????????????????????????????????//?用于記錄日志的ngx_log_t對象ngx_chain_t?*preread_bufs;????????????????????? ???// 表示接收上游服務器響應頭部的階段,已經讀到的響應包體size_t preread_size;???????????????????????????????// 表示接收上游服務器響應頭部的階段,已經讀到的響應包體長度ngx_buf_t?*buf_to_file;????????????????????? ? ? ? //?size_t limit_rate;?????????????????????????????????// 發送速率的限制time_t start_sec;??????????????????????????????????// 連接的啟動時間ngx_temp_file_t?*temp_file;????????????????????? ??// 存放上游響應的臨時文件/*?STUB?*/?int?num;????????????????????????????????// 已經使用的ngx_buf_t的數目
}
不管上游網速優先還是下游網速優先,響應的轉發都是通過ngx_http_upstream_send_response函數進行的。前面分析過下游網速優先的部分流程,
下面再繼續分析一下剩下的部分
點擊(此處)折疊或打開
static void?ngx_http_upstream_send_response(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{//?發送設置到r->headers_out中的響應頭部rc?=?ngx_http_send_header(r);......// 如果客戶端的請求攜帶了包體,且包體已經保存到了臨時文件中,則清理臨時文件,前面分析過了if?(r->request_body && r->request_body->temp_file)?{ngx_pool_run_cleanup_file(r->pool,?r->request_body->temp_file->file.fd);r->request_body->temp_file->file.fd?=?NGX_INVALID_FILE;}......// buffering為1時走的是上游網速優先的流程,為0時走的是下游網速優先的流程if?(!u->buffering)?{......return ;}/*?TODO:?preallocate event_pipe bufs,?look?"Content-Length"?*/// pipe的內存在upstream啟動時已經分配了,這里直接使用,對pipe進行初始化p?=?u->pipe;// 設置向下游發送響應的方法p->output_filter?=?(ngx_event_pipe_output_filter_pt)?ngx_http_output_filter;// 將pipe的output_ctx指向ngx_http_request_t結構,后續傳入的參數都是pipe,通過pipe->output_ctx找到ngx_http_request_tp->output_ctx?=?r;// 設置轉發響應時啟用的每個緩沖區的tag標志位p->tag?=?u->output.tag;// bufs指定了內存緩沖區的限制p->bufs?=?u->conf->bufs;// 設置busy緩沖區中待發送的響應長度觸發值p->busy_size?=?u->conf->busy_buffers_size;// upstream指向nginx與上游服務器的連接p->upstream?=?u->peer.connection;// downstream指向nginx與客戶端之間的連接p->downstream?=?c;// 初始化用于分配內存緩沖區的內存池p->pool?=?r->pool;// 初始化用于記錄日志的log成員p->log?=?c->log;// 初始化速率閥值p->limit_rate?=?u->conf->limit_rate;// 記錄當前的時間p->start_sec?=?ngx_time();// 記錄是否進行文件緩存p->cacheable?=?u->cacheable?||?u->store;// 申請臨時文件結構p->temp_file?=?ngx_pcalloc(r->pool,?sizeof(ngx_temp_file_t));if?(p->temp_file?==?NULL)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 初始化臨時文件的結構信息p->temp_file->file.fd?=?NGX_INVALID_FILE;p->temp_file->file.log?=?c->log;p->temp_file->path?=?u->conf->temp_path;p->temp_file->pool?=?r->pool;......// 設置臨時存放上游響應的單個緩存文件的最大長度p->max_temp_file_size?=?u->conf->max_temp_file_size;// 設置一次寫入臨時文件時寫入的最大長度p->temp_file_write_size?=?u->conf->temp_file_write_size;// 申請預讀緩沖區鏈表,該鏈表的緩沖區不會分配內存來存放上游的響應內容,而用ngx_buf_t指向實際存放包體的內容p->preread_bufs?=?ngx_alloc_chain_link(r->pool);if?(p->preread_bufs?==?NULL)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 初始化預讀緩沖區的鏈表,(預讀是在讀取包頭時,同時讀到了包體的情況)p->preread_bufs->buf?=?&u->buffer;p->preread_bufs->next?=?NULL;u->buffer.recycled?=?1;p->preread_size?=?u->buffer.last?-?u->buffer.pos;.......// 設置讀取上游服務器響應的超時時間p->read_timeout?=?u->conf->read_timeout;// 設置發送到下游客戶端的超時時間p->send_timeout?=?clcf->send_timeout;// 設置向客戶端發送響應時TCP的send_lowat選項p->send_lowat?=?clcf->send_lowat;.......// 設置處理上游讀事件的回調u->read_event_handler?=?ngx_http_upstream_process_upstream;// 設置處理下游寫事件的回調r->write_event_handler?=?ngx_http_upstream_process_downstream;// 處理上游發來的響應包體ngx_http_upstream_process_upstream(r,?u);
}
不管是讀取上游的響應事件process_upstream,還是向客戶端寫數據的process_downstream,最終都是通過ngx_event_pipe實現緩存轉發響應的。
下面來看一下ngx_event_pipe的具體實現:
點擊(此處)折疊或打開
ngx_int_t?ngx_event_pipe(ngx_event_pipe_t?*p,?ngx_int_t do_write)
{// do_write為1表示需要向下游客戶端發送響應,為0表示需要從上游客戶端接收響應for?(?;;?)?{if?(do_write)?{// do_write為1,向下游發送響應包體,并檢查其返回值rc?=?ngx_event_pipe_write_to_downstream(p);......// 返回NGX_OK時繼續讀取上游的響應事件,返回其他值需要終止ngx_event_pipe函數}......// 從上游讀取響應數據if?(ngx_event_pipe_read_upstream(p)?==?NGX_ABORT)?{return NGX_ABORT;}// 當沒有讀取到響應數據,并且也不需要暫停讀取響應的讀取時,跳出當前循環,即不對do_write進行設置if?(!p->read &&?!p->upstream_blocked)?{break;}// 當讀到的響應數據,或者需要暫停讀取數據,先給客戶端發送響應以釋放緩沖區時,設置do_write進行響應的發送do_write?=?1;}if?(p->upstream->fd?!=?(ngx_socket_t)?-1)?{// 將上游讀事件添加到epoll中if?(ngx_handle_read_event(rev,?flags)?!=?NGX_OK)?{return NGX_ABORT;}// 同時設置讀事件的超時定時器if?(!rev->delayed)?{if?(rev->active &&?!rev->ready)?{ngx_add_timer(rev,?p->read_timeout);......}// 將下游的寫事件添加到epoll中,并且設置寫事件的定時器if?(p->downstream->fd?!=?(ngx_socket_t)?-1?&& p->downstream->data?==?p->output_ctx){wev?=?p->downstream->write;if?(ngx_handle_write_event(wev,?p->send_lowat)?!=?NGX_OK)?{return NGX_ABORT;}if?(!wev->delayed)?{if?(wev->active &&?!wev->ready)?{ngx_add_timer(wev,?p->send_timeout);.......}return NGX_OK;
}
上面函數中提到的ngx_event_pipe_read_upstream用于接收上游的響應,下面來具體看一下:
點擊(此處)折疊或打開
static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t?*p)
{......for?(?;;?)?{// 檢查上游連接是否結束,如果已經結束,不再接收新的響應,跳出循環if?(p->upstream_eof?||?p->upstream_error?||?p->upstream_done)?{break;}// 如果preread_bufs為NULL代表讀包頭時沒有讀到包體信息或者已經處理完成,ready為0表示沒有上游響應可以接收,跳出循環if?(p->preread_bufs?==?NULL &&?!p->upstream->read->ready)?{break;}// preread_bufs存放著接收包頭時可能讀取到的包體信息,如果不為空,則先要優先處理這部分包體信息if?(p->preread_bufs)?{chain?=?p->preread_bufs;// 用chain保存待處理的緩沖區,重置preread_bufs,下次循環則不會再走到該邏輯p->preread_bufs?=?NULL;n?=?p->preread_size;// 有待處理的包體信息,將read設置為1,表示接收到的包體待處理if?(n)?{p->read?=?1;}}?else?{.......}?else?{limit?=?0;}// free_raw_bufs用于表示一次ngx_event_pipe_read_upstream方法調用過程中接收到的上游響應if?(p->free_raw_bufs)?{chain?=?p->free_raw_bufs;if?(p->single_buf)?{p->free_raw_bufs?=?p->free_raw_bufs->next;chain->next?=?NULL;}?else?{p->free_raw_bufs?=?NULL;}// 判斷當前已分配的緩沖區的數量是否超過了bufs.num,沒有超過時可以繼續分配}?else?if?(p->allocated?<?p->bufs.num)?{b?=?ngx_create_temp_buf(p->pool,?p->bufs.size);if?(b?==?NULL)?{return NGX_ABORT;}p->allocated++;chain?=?ngx_alloc_chain_link(p->pool);if?(chain?==?NULL)?{return NGX_ABORT;}chain->buf?=?b;chain->next?=?NULL;// 緩沖區已經達到上限,如果寫事件的ready為1時表示可以向下游發送響應,而delay為0代表并不是由于限速的原因導致寫事件就緒// 當ready為1,且delay為0時,可以向下游發送響應來釋放緩沖區了}?else?if?(!p->cacheable&& p->downstream->data?==?p->output_ctx&& p->downstream->write->ready&&?!p->downstream->write->delayed){p->upstream_blocked?=?1;break;// offset表示臨時文件中已經寫入的響應內容的長度,檢查是否達到了配置的上限,當達到上限時,暫時不再接收上游響應// 沒有達到上限時,調用下面write方法將響應寫入臨時文件中}?else?if?(p->cacheable||?p->temp_file->offset?<?p->max_temp_file_size){/**?if?it is allowed,?then save some bufs from p->in*?to a temporary file,?and?add?them to a p->out?chain*/// 該函數將in緩沖區鏈表中的內容寫入temp_file臨時文件中,再將寫入臨時文件的ngx_buf_t緩沖區由in緩沖區鏈表中移出,添加到out緩沖區鏈表中rc?=?ngx_event_pipe_write_chain_to_temp_file(p);......// 調用recv_chain接收上游的響應n?=?p->upstream->recv_chain(p->upstream,?chain,?limit);// 將新接收到的緩沖區放置到free_raw_bufs鏈表的最后if?(p->free_raw_bufs)?{chain->next?=?p->free_raw_bufs;}......while?(cl?&& n?>?0)?{// 從接收到的緩沖區鏈表中取出一塊緩沖區,將其shadow域釋放掉ngx_event_pipe_remove_shadow_links(cl->buf);// 檢查當前收到的包體長度是否小于緩沖區的大小,小于時當前緩沖區可以繼續接收響應包體,否則緩沖區已滿,需要調用input_filter函數處理size?=?cl->buf->end?-?cl->buf->last;if?(n?>=?size)?{// 當前緩沖區已滿,需要處理,下面的input_filter方法是ngx_event_pipe_copy_input_filter函數,其主要在in鏈表中增加這個緩沖區cl->buf->last?=?cl->buf->end;if?(p->input_filter(p,?cl->buf)?==?NGX_ERROR)?{return NGX_ABORT;}// 更新待處理的包體的長度,釋放已經處理的緩沖區n?-=?size;ln?=?cl;cl?=?cl->next;ngx_free_chain(p->pool,?ln);}?else?{// 緩沖區沒有滿,更新last位置,n可以設置為0了,因為last的位置已經包含當前讀到的包體信息cl->buf->last?+=?n;n?=?0;}}// 走到這里時cl的鏈表中一定有緩沖區沒有用滿(最后一個?),此時cl不為NULL;或者cl的所有緩沖區都已經被處理回收了,此時cl為NULLif?(cl)?{for?(ln?=?cl;?ln->next;?ln?=?ln->next)?{ /*?void?*/?}// 此時的p->free_raw_bufs已經為NULL了,將p->free_raw_bufs指向當前待處理的緩沖區鏈表ln->next?=?p->free_raw_bufs;p->free_raw_bufs?=?cl;}......}......// upstream_eof為1時表示上游服務器關閉了連接,upstream_error表示處理過程中出現了錯誤,而free_raw_bufs不為空代表還有需要處理的包體信息if?((p->upstream_eof?||?p->upstream_error)?&& p->free_raw_bufs)?{// 調用input_filter處理剩余的包體信息if?(p->input_filter(p,?p->free_raw_bufs->buf)?==?NGX_ERROR)?{return NGX_ABORT;}p->free_raw_bufs?=?p->free_raw_bufs->next;// free_bufs為1時代表需要盡快釋放緩沖區中用到內存,此時應該調用ngx_pfree盡快釋放shadow域為空的緩沖區if?(p->free_bufs && p->buf_to_file?==?NULL)?{for?(cl?=?p->free_raw_bufs;?cl;?cl?=?cl->next)?{if?(cl->buf->shadow?==?NULL)?{ngx_pfree(p->pool,?cl->buf->start);......}.......
}
看完接收響應的處理過程,再來看一下發送響應的處理流程,對應的函數是ngx_event_pipe_write_to_downstream
點擊(此處)折疊或打開
static ngx_int_t?ngx_event_pipe_write_to_downstream(ngx_event_pipe_t?*p)
{......for?(?;;?)?{if?(p->downstream_error)?{return ngx_event_pipe_drain_chains(p);}// 檢查與上游的連接是否結束if?(p->upstream_eof?||?p->upstream_error?||?p->upstream_done)?{// 發送out鏈表中的緩沖區給客戶端if?(p->out)?{for?(cl?=?p->out;?cl;?cl?=?cl->next)?{cl->buf->recycled?=?0;}rc?=?p->output_filter(p->output_ctx,?p->out);......}// 發送in鏈表中的緩沖區給客戶端if?(p->in)?{for?(cl?=?p->in;?cl;?cl?=?cl->next)?{cl->buf->recycled?=?0;}rc?=?p->output_filter(p->output_ctx,?p->in);......}// 標識需要向下游發送的響應已經完成p->downstream_done?=?1;break;}......// 計算busy緩沖區中待發送的響應長度for?(cl?=?p->busy;?cl;?cl?=?cl->next)?{if?(cl->buf->recycled)?{......bsize?+=?cl->buf->end?-?cl->buf->start;prev?=?cl->buf->start;}}......// 檢查是否超過了busy_size的配置,當超過配置值時跳轉至flush處檢查和發送out緩沖區if?(bsize?>=?(size_t)?p->busy_size)?{flush?=?1;goto?flush;}......for?(?;;?)?{// 先檢查out鏈表是否為NULL,不為空則先發送out鏈表的緩沖區if?(p->out)?{cl?=?p->out;p->out?=?p->out->next;// 當out鏈表中的數據被處理完成后,開始處理in鏈表中的數據}?else?if?(!p->cacheable && p->in)?{cl?=?p->in;.....}?else?{break;}cl->next?=?NULL;if?(out)?{*ll?=?cl;}?else?{out?=?cl;}ll?=?&cl->next;}flush:......// 發送響應給客戶端rc?=?p->output_filter(p->output_ctx,?out);// 更新free、busy和out緩沖區ngx_chain_update_chains(p->pool,?&p->free,?&p->busy,?&out,?p->tag);......// 遍歷free鏈表中的緩沖區,釋放緩沖區中shadow域for?(cl?=?p->free;?cl;?cl?=?cl->next)?{......if?(cl->buf->last_shadow)?{if?(ngx_event_pipe_add_free_buf(p,?cl->buf->shadow)?!=?NGX_OK)?{return NGX_ABORT;}cl->buf->last_shadow?=?0;}cl->buf->shadow?=?NULL;}}return NGX_OK;
}
終于快要結束了,upstream的流程還是比較復雜的,最后看一下結束upstream的請求
結束upstream的請求
upstream請求的結束的流程,有三個函數可以進來,ngx_http_upstream_finalize_request、ngx_http_upstream_cleanup、ngx_http_upstream_next。
其中cleanup和next真正終止upstream時還是會調用到finalize_request函數。ngx_http_upstream_cleanup函數在啟動upstream時,會掛在到請求的cleanup
鏈表中,當HTTP框架結束http請求時一定會調用到upstream_cleanup函數。
點擊(此處)折疊或打開
static void?ngx_http_upstream_cleanup(void?*data)
{ngx_http_request_t?*r?=?data;ngx_http_upstream_finalize_request(r,?r->upstream,?NGX_DONE);
}
可以看到upstream_cleanup的實現,其實是直接調用了ngx_http_upstream_finalize_request,這個流程是我們期待的關閉方式。
而ngx_http_upstream_next函數,是在處理請求的的流程中出現錯誤才會主動調用到,該函數通過重連服務器、選取新的服務器等策略來提高服務的可用性。目前
nginx的負載均衡的功能就是通過next函數來實現的,我們后面會進行詳細分析,這里只簡單說明一下。
點擊(此處)折疊或打開
static void?ngx_http_upstream_next(ngx_http_request_t?*r,?ngx_http_upstream_t?*u,?ngx_uint_t ft_type)
{......if?(status)?{u->state->status?=?status;timeout?=?u->conf->next_upstream_timeout;// 當tries為0時,才最終結束upstream的請求if?(u->peer.tries?==?0||?!(u->conf->next_upstream & ft_type)||?(timeout && ngx_current_msec?-?u->peer.start_time?>=?timeout)){ngx_http_upstream_finalize_request(r,?u,?status);return;}}// 由于要發起新的連接,所以需要先關閉和上游服務器的已有連接if?(u->peer.connection)?{? ? ?if?(u->peer.connection->pool)?{ngx_destroy_pool(u->peer.connection->pool);}ngx_close_connection(u->peer.connection);u->peer.connection?=?NULL;}// 重新發起連接ngx_http_upstream_connect(r,?u);
}
最后看一下ngx_http_upstream_finalize_request的具體實現
點擊(此處)折疊或打開
static void?ngx_http_upstream_finalize_request(ngx_http_request_t?*r,?ngx_http_upstream_t?*u,?ngx_int_t rc)
{// 將cleanup指向的清理資源回調方法設置為NULLif?(u->cleanup)?{*u->cleanup?=?NULL;u->cleanup?=?NULL;}// 釋放解析主機域名時分配的資源if?(u->resolved && u->resolved->ctx)?{ngx_resolve_name_done(u->resolved->ctx);u->resolved->ctx?=?NULL;}......// 調用http模塊實現的finalize_request方法u->finalize_request(r,?rc);// 釋放與上游的連接if?(u->peer.connection)?{if?(u->peer.connection->pool)?{ngx_destroy_pool(u->peer.connection->pool);}ngx_close_connection(u->peer.connection);}u->peer.connection?=?NULL;// 刪除用于緩存響應的臨時文件if?(u->store && u->pipe && u->pipe->temp_file&& u->pipe->temp_file->file.fd?!=?NGX_INVALID_FILE){if?(ngx_delete_file(u->pipe->temp_file->file.name.data)==?NGX_FILE_ERROR)......}......// 最后還是調用HTTP框架提供的方法結束請求ngx_http_finalize_request(r,?rc);
}
至此,大概的梳理了一下upstream的處理流程,后面會針對目前已經實現的負載均衡各類算法,以及Nginx cache功能進行分析。。
總結
以上是生活随笔為你收集整理的Nginx upstream (一) 整体流程分析的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。