這篇文章給大家分享的是有關(guān)nginx中worker進(jìn)程循環(huán)的實(shí)現(xiàn)示例的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過來看看吧。
成都創(chuàng)新互聯(lián)公司成立于2013年,先為萬州等服務(wù)建站,萬州等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為萬州企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。
worker進(jìn)程啟動(dòng)后,其首先會初始化自身運(yùn)行所需要的環(huán)境,然后會進(jìn)入一個(gè)循環(huán),在該循環(huán)中不斷檢查是否有需要執(zhí)行的事件,然后處理事件。在這個(gè)過程中,worker進(jìn)程也是需要與master進(jìn)程交互的,更有甚者,worker進(jìn)程作為一個(gè)子進(jìn)程,也是可以接收命令行指令(比如kill等)以進(jìn)行相應(yīng)邏輯的處理的。那么worker進(jìn)程是如何與master或者命令行指令進(jìn)行交互的呢?本文首先會對worker進(jìn)程與master進(jìn)程交互方式,以及worker進(jìn)程如何處理命令行指令的流程進(jìn)行講解,然后會從源碼上對worker進(jìn)程交互的整個(gè)工作流程進(jìn)行介紹。
1. worker與master進(jìn)程交互方式
這里首先需要說明的是,無論是master還是外部命令的方式,nginx都是通過標(biāo)志位的方式來處理相應(yīng)的指令的,也即在接收到一個(gè)指令(無論是master還是外部命令)的時(shí)候,worker會在其回調(diào)方法中設(shè)置與該指令相對應(yīng)的標(biāo)志位,然后在worker進(jìn)程在其自身的循環(huán)中處理完事件之后會依次檢查這些標(biāo)志位是否為真,是則根據(jù)該標(biāo)志位的作用執(zhí)行相應(yīng)的邏輯。
對于worker進(jìn)程與master進(jìn)程的交互,其是通過socket管道的方式進(jìn)行的。在ngx_process.h文件中聲明了一個(gè)ngx_process_t結(jié)構(gòu)體,這里我們主要關(guān)注其channel屬性:
typedef struct { // 其余屬性... ngx_socket_t channel[2]; } ngx_process_t;
這里的ngx_process_t結(jié)構(gòu)體的作用是存儲某個(gè)進(jìn)程相關(guān)的信息的,比如pid、channel、status等。每個(gè)進(jìn)程中都有一個(gè)ngx_processes數(shù)組,數(shù)組元素就是這里的ngx_process_t結(jié)構(gòu)體,也就是說每個(gè)進(jìn)程都會通過ngx_processes數(shù)組保存其余進(jìn)程的基本信息。其聲明如下:
// 存儲了nginx中所有的子進(jìn)程數(shù)組,每個(gè)子進(jìn)程都有一個(gè)對應(yīng)的ngx_process_t結(jié)構(gòu)體進(jìn)行標(biāo)記
extern ngx_process_t ngx_processes[NGX_MAX_PROCESSES];
這里我們就可以看出,每個(gè)進(jìn)程都會一個(gè)與之對應(yīng)的channel數(shù)組,這個(gè)數(shù)組的長度為2,其是與master進(jìn)程進(jìn)行交互的管道流。在master進(jìn)程創(chuàng)建每一個(gè)子進(jìn)程的之前,都會創(chuàng)建一個(gè)channel數(shù)組,該數(shù)組的創(chuàng)建方法為:
int socketpair(int domain, int type, int protocol, int sv[2]);
這個(gè)方法的主要作用是創(chuàng)建一對匿名的已經(jīng)連接的套接字,也就是說,如果在一個(gè)套接字中寫入數(shù)據(jù),那么在另一個(gè)套接字中就可以接收到寫入的數(shù)據(jù)。通過這種方式,如果在父進(jìn)程中往管道的一邊寫入數(shù)據(jù),那么在子進(jìn)程就可以在另一邊接收到數(shù)據(jù),這樣就可以實(shí)現(xiàn)父子進(jìn)程的數(shù)據(jù)通信了。
在master進(jìn)程啟動(dòng)完子進(jìn)程之后,子進(jìn)程會保有master進(jìn)程中相應(yīng)的數(shù)據(jù),也包括這里的channel數(shù)組。如此,master進(jìn)程就可以通過channel數(shù)組實(shí)現(xiàn)與子進(jìn)程的通信了。
2. worker處理外部命令
對于外部命令,其本質(zhì)上是通過signals數(shù)組中定義的各個(gè)信號以及回調(diào)方法進(jìn)行處理的。在master進(jìn)程初始化基本環(huán)境的時(shí)候,會將signals數(shù)組中指定的信號回調(diào)方法設(shè)置到對應(yīng)的信號中。由于worker進(jìn)程會繼承master進(jìn)程的基本環(huán)境,因而worker進(jìn)程在接收到這里設(shè)置的信號之后,也會調(diào)用對應(yīng)的回調(diào)方法。而該回調(diào)方法的主要邏輯也僅僅只是設(shè)置相應(yīng)的標(biāo)志位的值。關(guān)于nginx接收到信號之后如何設(shè)置對應(yīng)的標(biāo)志位,可以參照本人前面的文章(nginx master工作循環(huán) 超鏈接),這里不再贅述。
3. 源碼講解
master進(jìn)程是通過ngx_start_worker_processes()方法啟動(dòng)各個(gè)子進(jìn)程的,如下是該方法源碼:
/** * 啟動(dòng)n個(gè)worker子進(jìn)程,并設(shè)置好每個(gè)子進(jìn)程與master父進(jìn)程之間使用socketpair * 系統(tǒng)調(diào)用建立起來的socket句柄通信機(jī)制 */ static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) { ngx_int_t i; ngx_channel_t ch; ngx_memzero(&ch, sizeof(ngx_channel_t)); ch.command = NGX_CMD_OPEN_CHANNEL; for (i = 0; i < n; i++) { // spawn是產(chǎn)卵的意思,這里就是生成一個(gè)子進(jìn)程的意思,而該子進(jìn)程所進(jìn)行的事件循環(huán)就是 // ngx_worker_process_cycle()方法,這里的ngx_worker_process_cycle是worker進(jìn)程處理事件的循環(huán), // worker進(jìn)程在一個(gè)無限for循環(huán)中,不斷的檢查相應(yīng)的事件模型中是否存在對應(yīng)的事件, // 然后將accept事件和read、write事件分開放入兩個(gè)隊(duì)列中,最后在事件循環(huán)中不斷的處理事件 ngx_spawn_process(cycle, ngx_worker_process_cycle, (void *) (intptr_t) i, "worker process", type); // 下面的這段代碼的主要作用是將新建進(jìn)程這個(gè)事件通知到其他的進(jìn)程,上面的 // ch.command = NGX_CMD_OPEN_CHANNEL;中NGX_CMD_OPEN_CHANNEL表示的就是當(dāng)前是新建了一個(gè)進(jìn)程, // 而ngx_process_slot存儲的就是該新建進(jìn)程所存放的數(shù)組位置,這里需要進(jìn)行廣播的原因在于, // 每個(gè)子進(jìn)程被創(chuàng)建后,其內(nèi)存數(shù)據(jù)都是復(fù)制的父進(jìn)程的,但是ngx_processes數(shù)組是每個(gè)進(jìn)程都有一份的, // 因而數(shù)組中先創(chuàng)建的子進(jìn)程是沒有后創(chuàng)建的子進(jìn)程的數(shù)據(jù)的,但是master進(jìn)程是有所有子進(jìn)程的數(shù)據(jù)的, // 因而這里master進(jìn)程創(chuàng)建子進(jìn)程之后,其就會向ngx_processes數(shù)組的每個(gè)進(jìn)程的channel[0]上 // 寫入當(dāng)前廣播的事件,也即這里的ch,通過這種方式,每個(gè)子進(jìn)程接收到這個(gè)事件之后, // 都會嘗試更新其所保存的ngx_processes數(shù)據(jù)信息 ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; ch.fd = ngx_processes[ngx_process_slot].channel[0]; // 廣播事件 ngx_pass_open_channel(cycle, &ch); } }
這里我們主要需要關(guān)注上面的啟動(dòng)子進(jìn)程的方法調(diào)用,也即這里的ngx_spawn_process()方法,該方法的第二個(gè)參數(shù)是一個(gè)方法,在啟動(dòng)子進(jìn)程之后,子進(jìn)程就會進(jìn)入該方法所指定的循環(huán)中。而在ngx_spawn_process()方法中,master進(jìn)程會為當(dāng)前新創(chuàng)建的子進(jìn)程創(chuàng)建一個(gè)channel數(shù)組,以用于與當(dāng)前子進(jìn)程進(jìn)行通信。如下是ngx_spawn_process()方法的源碼:
ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) { u_long on; ngx_pid_t pid; ngx_int_t s; if (respawn >= 0) { s = respawn; } else { // 在ngx_processes數(shù)組中存儲了當(dāng)前創(chuàng)建的所有進(jìn)程,而ngx_last_process則是當(dāng)前當(dāng)前記錄的最后一個(gè) // process在ngx_processes中的下一個(gè)位置的索引,只不過ngx_processes中記錄的進(jìn)程有可能有部分 // 已經(jīng)失效了。當(dāng)前循環(huán)就是從頭開始查找是否有某個(gè)進(jìn)程已經(jīng)失效了,如果已經(jīng)失效了,則復(fù)用該進(jìn)程位置, // 否則直接使用ngx_last_process所指向的位置 for (s = 0; s < ngx_last_process; s++) { if (ngx_processes[s].pid == -1) { break; } } // 這里說明所創(chuàng)建的進(jìn)程數(shù)達(dá)到了最大限度 if (s == NGX_MAX_PROCESSES) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "no more than %d processes can be spawned", NGX_MAX_PROCESSES); return NGX_INVALID_PID; } } // NGX_PROCESS_DETACHED標(biāo)志表示當(dāng)前fork出來的進(jìn)程與原來的父進(jìn)程沒有任何關(guān)系,比如進(jìn)行nginx升級時(shí), // 新生成的master進(jìn)程就與原先的master進(jìn)程沒有關(guān)系 if (respawn != NGX_PROCESS_DETACHED) { /* Solaris 9 still has no AF_LOCAL */ // 這里的socketpair()方法的主要作用是生成一對套接字流,用于主進(jìn)程和子進(jìn)程的通信,這一對套接字會 // 存儲在ngx_processes[s].channel中,本質(zhì)上這個(gè)字段是一個(gè)長度為2的整型數(shù)組。在主進(jìn)程和子進(jìn)程 // 進(jìn)行通信的之前,主進(jìn)程會關(guān)閉其中一個(gè),而子進(jìn)程會關(guān)閉另一個(gè),然后相互之間往未關(guān)閉的另一個(gè)文件描述符中 // 寫入或讀取數(shù)據(jù)即可實(shí)現(xiàn)通信。 // AF_UNIX表示當(dāng)前使用的是UNIX文件形式的socket地址族 // SOCK_STREAM指定了當(dāng)前套接字建立的通信方式是管道流,并且這個(gè)管道流是雙向的, // 即管道雙方都可以進(jìn)行讀寫操作 // 第三個(gè)參數(shù)protocol必須為0 if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed while spawning \"%s\"", name); return NGX_INVALID_PID; } ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0, "channel %d:%d", ngx_processes[s].channel[0], ngx_processes[s].channel[1]); // 將ngx_processes[s].channel[0]設(shè)置為非阻塞模式 if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 將ngx_processes[s].channel[1]設(shè)置為非阻塞模式 if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } on = 1; // 將ngx_processes[s].channel[0]套接字管道設(shè)置為異步模式 if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 當(dāng)前還處于主進(jìn)程中,這里的ngx_pid指向了主進(jìn)程的進(jìn)程id,當(dāng)前方法的作用主要是將 // ngx_processes[s].channel[0]的操作權(quán)限設(shè)置給主進(jìn)程,也就是說主進(jìn)程通過向 // ngx_processes[s].channel[0]寫入和讀取數(shù)據(jù)來與子進(jìn)程進(jìn)行通信 if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // FD_CLOEXEC表示當(dāng)前指定的套接字管道在子進(jìn)程中可以使用,但是在execl()執(zhí)行的程序中不可使用 if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // FD_CLOEXEC表示當(dāng)前指定的套接字管道在子進(jìn)程中可以使用,但是在execl()執(zhí)行的程序中不可使用 if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // ngx_processes[s].channel[1]是用于給子進(jìn)程監(jiān)聽相關(guān)事件使用的,當(dāng)父進(jìn)程向 // ngx_processes[s].channel[0]發(fā)布事件之后,ngx_processes[s].channel[1]中就會接收到 // 對應(yīng)的事件,從而進(jìn)行相應(yīng)的處理 ngx_channel = ngx_processes[s].channel[1]; } else { // 如果是NGX_PROCESS_DETACHED模式,則表示當(dāng)前是另外新起的一個(gè)master進(jìn)程,因而將其管道值都置為-1 ngx_processes[s].channel[0] = -1; ngx_processes[s].channel[1] = -1; } ngx_process_slot = s; // fork()方法將產(chǎn)生一個(gè)新的進(jìn)程,這個(gè)進(jìn)程與父進(jìn)程的關(guān)系是子進(jìn)程的內(nèi)存數(shù)據(jù)將完全復(fù)制父進(jìn)程的。 // 還需要注意的是,fork()出來的子進(jìn)程執(zhí)行的代碼是從fork()之后開始執(zhí)行的,而對于父進(jìn)程而言, // 該方法的返回值為父進(jìn)程id,而對于子進(jìn)程而言,該方法返回值為0,因而通過if-else語句就可以讓父進(jìn)程 // 和子進(jìn)程分別調(diào)用后續(xù)不同的代碼片段 pid = fork(); switch (pid) { case -1: // fork出錯(cuò) ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fork() failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; case 0: // 子進(jìn)程執(zhí)行的分支,這里的proc()方法是外部傳進(jìn)來的,也就是說,當(dāng)前方法只是創(chuàng)建一個(gè)新的進(jìn)程, // 具體的進(jìn)程處理邏輯,將交由外部代碼塊進(jìn)行定義ngx_getpid()方法獲取的就是當(dāng)前新創(chuàng)建的子進(jìn)程的進(jìn)程id ngx_pid = ngx_getpid(); proc(cycle, data); break; default: // 父進(jìn)程會走到這里 break; } ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid); // 父進(jìn)程會走到這里,當(dāng)前的pid是fork()之后父進(jìn)程得到的新創(chuàng)建的子進(jìn)程的pid ngx_processes[s].pid = pid; ngx_processes[s].exited = 0; if (respawn >= 0) { return pid; } // 設(shè)置當(dāng)前進(jìn)程的各個(gè)屬性,并且存儲到ngx_processes數(shù)組中的對應(yīng)位置 ngx_processes[s].proc = proc; ngx_processes[s].data = data; ngx_processes[s].name = name; ngx_processes[s].exiting = 0; switch (respawn) { case NGX_PROCESS_NORESPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break; case NGX_PROCESS_JUST_SPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break; case NGX_PROCESS_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break; case NGX_PROCESS_JUST_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break; case NGX_PROCESS_DETACHED: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 1; break; } if (s == ngx_last_process) { ngx_last_process++; } return pid; }
ngx_spawn_process()方法最后會fork()一個(gè)子進(jìn)程以執(zhí)行其第二個(gè)參數(shù)所指定的回調(diào)方法。但是在這之前,我們需要說明的是,其通過socketpair()方法調(diào)用會創(chuàng)建一對匿名的socket,然后將其存儲在當(dāng)前進(jìn)程的channel數(shù)組中,如此就完成了channel數(shù)組的創(chuàng)建。
worker進(jìn)程啟動(dòng)之后會執(zhí)行ngx_worker_process_cycle()方法,該方法首先會對worker進(jìn)程進(jìn)行初始化,其中就包括對繼承而來的channel數(shù)組的處理。由于master進(jìn)程和worker進(jìn)程都保有channel數(shù)組所指代的socket描述符,而本質(zhì)上master進(jìn)程和各個(gè)worker進(jìn)程只需要保有該數(shù)組的某一邊的描述符即可。因而這里worker進(jìn)程在初始化過程中,會關(guān)閉其所保存的另一邊的描述符。在nginx中,master進(jìn)程統(tǒng)一的會保留channel數(shù)組的0號位的socket描述符,關(guān)閉1號位的socket描述符,而worker進(jìn)程則會關(guān)閉0號位的socket描述符,保留1號位的描述符。這樣master進(jìn)程需要與worker進(jìn)程通信時(shí),就只需要往channel[0]中寫入數(shù)據(jù),而worker進(jìn)程則會監(jiān)聽channel[1],從而接收到master進(jìn)程的數(shù)據(jù)寫入。這里我們首先看一下worker進(jìn)程的初始化方法ngx_worker_process_init()的源碼:
/** * 這里主要是對當(dāng)前進(jìn)程進(jìn)行初始化,為其設(shè)置優(yōu)先級和打開的文件限制等參數(shù)。 * 最后會為當(dāng)前進(jìn)程添加一個(gè)監(jiān)聽channel[1]的連接,以不斷讀取master進(jìn)程的消息,從而進(jìn)行相應(yīng)的處理 */ static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) { sigset_t set; ngx_int_t n; ngx_time_t *tp; ngx_uint_t i; ngx_cpuset_t *cpu_affinity; struct rlimit rlmt; ngx_core_conf_t *ccf; ngx_listening_t *ls; // 設(shè)置時(shí)區(qū)相關(guān)的信息 if (ngx_set_environment(cycle, NULL) == NULL) { /* fatal */ exit(2); } ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); // 設(shè)置當(dāng)前進(jìn)程的優(yōu)先級 if (worker >= 0 && ccf->priority != 0) { if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setpriority(%d) failed", ccf->priority); } } // 設(shè)置當(dāng)前進(jìn)程能夠打開的文件句柄數(shù) if (ccf->rlimit_nofile != NGX_CONF_UNSET) { rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile; rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile; if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setrlimit(RLIMIT_NOFILE, %i) failed", ccf->rlimit_nofile); } } // Changes the limit on the largest size of a core file(RLIMIT_CORE) for worker processes. // 簡而言之就是設(shè)置核心文件能夠使用的最大大小 if (ccf->rlimit_core != NGX_CONF_UNSET) { rlmt.rlim_cur = (rlim_t) ccf->rlimit_core; rlmt.rlim_max = (rlim_t) ccf->rlimit_core; if (setrlimit(RLIMIT_CORE, &rlmt) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setrlimit(RLIMIT_CORE, %O) failed", ccf->rlimit_core); } } // geteuid()返回執(zhí)行當(dāng)前程序的用戶id,這里的0表示是否為root用戶 if (geteuid() == 0) { // setgid()方法的作用是更改組的id if (setgid(ccf->group) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "setgid(%d) failed", ccf->group); /* fatal */ exit(2); } // initgroups()是更改附加組的id if (initgroups(ccf->username, ccf->group) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "initgroups(%s, %d) failed", ccf->username, ccf->group); } // 更改用戶的id if (setuid(ccf->user) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "setuid(%d) failed", ccf->user); /* fatal */ exit(2); } } // 需要注意的是,對于cache manager和cache loader進(jìn)程,這里的worker傳入的是-1, // 表示這兩個(gè)進(jìn)程不需要設(shè)置親核性 if (worker >= 0) { // 獲取當(dāng)前worker的CPU親核性 cpu_affinity = ngx_get_cpu_affinity(worker); if (cpu_affinity) { // 設(shè)置worker的親核心 ngx_setaffinity(cpu_affinity, cycle->log); } } #if (NGX_HAVE_PR_SET_DUMPABLE) if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "prctl(PR_SET_DUMPABLE) failed"); } #endif if (ccf->working_directory.len) { // chdir()的作用是將當(dāng)前的工作目錄更改為其參數(shù)所傳入的路徑 if (chdir((char *) ccf->working_directory.data) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "chdir(\"%s\") failed", ccf->working_directory.data); /* fatal */ exit(2); } } // 初始化空的set指令集合 sigemptyset(&set); // ◆ SIG_BLOCK:將 set 參數(shù)指向信號集中的信號加入到信號掩碼中。 // ◆ SIG_UNBLOCK:將 set 參數(shù)指向的信號集中的信號從信號掩碼中刪除。 // ◆ SIG_SETMASK:將 set 參數(shù)指向信號集設(shè)置為信號掩碼。 // 這里就是直接初始化要阻塞的信號集,默認(rèn)為空集 if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "sigprocmask() failed"); } tp = ngx_timeofday(); srandom(((unsigned) ngx_pid << 16) ^ tp->sec ^ tp->msec); ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { ls[i].previous = NULL; } // 這里調(diào)用各個(gè)模塊的init_process()方法進(jìn)行進(jìn)程模塊的初始化 for (i = 0; cycle->modules[i]; i++) { if (cycle->modules[i]->init_process) { if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) { /* fatal */ exit(2); } } } // 這里主要是關(guān)閉當(dāng)前進(jìn)程中其他各個(gè)進(jìn)程的channel[1]管道句柄 for (n = 0; n < ngx_last_process; n++) { if (ngx_processes[n].pid == -1) { continue; } if (n == ngx_process_slot) { continue; } if (ngx_processes[n].channel[1] == -1) { continue; } if (close(ngx_processes[n].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "close() channel failed"); } } // 關(guān)閉當(dāng)前進(jìn)程的channel[0]管道句柄 if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "close() channel failed"); } #if 0 ngx_last_process = 0; #endif // ngx_channel指向的是當(dāng)前進(jìn)程的channel[1]句柄,也即監(jiān)聽master進(jìn)程發(fā)送消息的句柄。 // 當(dāng)前方法中,首先會為當(dāng)前的句柄創(chuàng)建一個(gè)connection對象,并且將其封裝為一個(gè)事件,然后將該事件添加到 // 對應(yīng)的事件模型隊(duì)列中以監(jiān)聽當(dāng)前句柄的事件,事件的處理邏輯則主要有這里的ngx_channel_handler() // 方法進(jìn)行。這里的ngx_channel_handler的主要處理邏輯是,根據(jù)當(dāng)前收到的消息設(shè)置當(dāng)前進(jìn)程的一些標(biāo)志位, // 或者更新某些緩存數(shù)據(jù),如此,在當(dāng)前進(jìn)行的事件循環(huán)中,通過不斷檢查這些標(biāo)志位,從而實(shí)現(xiàn)在事件進(jìn)程中 // 處理真正的邏輯。因而這里的ngx_channel_handler的處理效率是非常高的 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT, ngx_channel_handler) == NGX_ERROR) { /* fatal */ exit(2); } }
該方法主要是對worker進(jìn)程進(jìn)行初始化,這里我們主要需要關(guān)注最后會遍歷ngx_processes數(shù)組,這個(gè)數(shù)組中保存了當(dāng)前nginx中各個(gè)進(jìn)程的相關(guān)信息。在遍歷過程中,會關(guān)閉當(dāng)前進(jìn)程保有的其余進(jìn)程的channel[1]句柄,而保留有channel[0]句柄,這樣當(dāng)前進(jìn)程如果需要與其他進(jìn)程通信,也只需要往目標(biāo)進(jìn)程的channel[0]中寫入數(shù)據(jù)即可。在遍歷完成之后,當(dāng)前進(jìn)程就會關(guān)閉自身的channel[0]句柄,而保留channel[1]句柄。最后,會通過ngx_add_channel_event()方法為當(dāng)前進(jìn)程添加對channel[1]的監(jiān)聽事件,這里在調(diào)用ngx_add_channel_event()方法時(shí)傳入的第二個(gè)參數(shù)是ngx_channel,該參數(shù)是在前面的ngx_spawn_process()方法中賦值的,指向的就是當(dāng)前進(jìn)程的channel[1]的socket句柄。
關(guān)于ngx_add_channel_event()方法,其本質(zhì)就是創(chuàng)建一個(gè)ngx_event_t結(jié)構(gòu)體的事件,然后將其添加到當(dāng)前所使用的事件模型(比如epoll)句柄中。這里不再贅述該方法的實(shí)現(xiàn)源碼,不過我們需要關(guān)注的是該事件觸發(fā)時(shí)的回調(diào)方法,即調(diào)用ngx_add_channel_event()方法時(shí)傳入的第三個(gè)參數(shù)ngx_channel_handler()方法。如下是該方法的源碼:
static void ngx_channel_handler(ngx_event_t *ev) { ngx_int_t n; ngx_channel_t ch; ngx_connection_t *c; if (ev->timedout) { ev->timedout = 0; return; } c = ev->data; for (;;) { // 在無限for循環(huán)中不斷讀取master進(jìn)程發(fā)過來的消息 n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log); // 如果讀取消息出錯(cuò),說明當(dāng)前的句柄可能失效了,就需要關(guān)閉當(dāng)前連接 if (n == NGX_ERROR) { if (ngx_event_flags & NGX_USE_EPOLL_EVENT) { ngx_del_conn(c, 0); } ngx_close_connection(c); return; } if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) { if (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR) { return; } } if (n == NGX_AGAIN) { return; } // 對發(fā)送過來的消息進(jìn)行處理 switch (ch.command) { // 如果是quit消息,則設(shè)置quit標(biāo)志位 case NGX_CMD_QUIT: ngx_quit = 1; break; // 如果terminate消息,則設(shè)置terminate標(biāo)志位 case NGX_CMD_TERMINATE: ngx_terminate = 1; break; // 如果是reopen消息,則設(shè)置reopen標(biāo)志位 case NGX_CMD_REOPEN: ngx_reopen = 1; break; // 如果是新建進(jìn)程消息,則更新當(dāng)前ngx_processes數(shù)組對應(yīng)位置的數(shù)據(jù) case NGX_CMD_OPEN_CHANNEL: ngx_processes[ch.slot].pid = ch.pid; ngx_processes[ch.slot].channel[0] = ch.fd; break; // 如果是關(guān)閉channel的消息,則關(guān)閉ngx_processes數(shù)組對應(yīng)位置的句柄 case NGX_CMD_CLOSE_CHANNEL: if (close(ngx_processes[ch.slot].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "close() channel failed"); } ngx_processes[ch.slot].channel[0] = -1; break; } } }
在ngx_channel_handler()方法中,主要是讀取所監(jiān)聽的socket句柄中的數(shù)據(jù),而數(shù)據(jù)是以一個(gè)ngx_channel_t結(jié)構(gòu)體所承載的,這個(gè)ngx_channel_t是nginx所統(tǒng)一使用的master與worker進(jìn)程進(jìn)行通信的結(jié)構(gòu)體,其會指定當(dāng)前發(fā)生的事件類型,以及發(fā)生該事件的進(jìn)程信息。如下是ngx_channel_t結(jié)構(gòu)體的聲明:
typedef struct { // 當(dāng)前發(fā)生的事件類型 ngx_uint_t command; // 發(fā)生事件的pid ngx_pid_t pid; // 發(fā)生事件的進(jìn)程在ngx_processes數(shù)組中的下標(biāo) ngx_int_t slot; // 發(fā)生事件的進(jìn)程的channel[0]描述符的值 ngx_fd_t fd; } ngx_channel_t;
在從當(dāng)前進(jìn)程的channel[1]中讀取了ngx_channel_t結(jié)構(gòu)體的數(shù)據(jù)之后,ngx_channel_handler()方法會根據(jù)發(fā)生的事件類型更新相應(yīng)的標(biāo)志位的狀態(tài),并且會更新當(dāng)前進(jìn)程的ngx_processes數(shù)組中對應(yīng)的發(fā)生事件的進(jìn)程的狀態(tài)信息。
在處理了master進(jìn)程所發(fā)送的事件之后,worker進(jìn)程就會繼續(xù)其循環(huán),在該循環(huán)中會檢查其所關(guān)注的標(biāo)志位的狀態(tài),然后會根據(jù)這些狀態(tài)執(zhí)行對應(yīng)的邏輯。如下是worker進(jìn)程工作的循環(huán)的源碼:
/** * 進(jìn)入worker進(jìn)程工作的循環(huán) */ static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) { ngx_int_t worker = (intptr_t) data; ngx_process = NGX_PROCESS_WORKER; ngx_worker = worker; // 初始化worker進(jìn)程,前面對該方法的源碼進(jìn)行了講解 ngx_worker_process_init(cycle, worker); ngx_setproctitle("worker process"); for (;;) { if (ngx_exiting) { // 這里主要是檢查有沒有事件是非cancelable狀態(tài)的,也就是說是否所有的事件都已經(jīng)取消了,如果取消了, // 就會返回NGX_OK。這里的邏輯可以理解為,如果被標(biāo)記為了ngx_exiting,那么此時(shí),如果還有未取消的 // 事件存在,則會走到下面的ngx_process_events_and_timers()方法,如此就會處理未完成的事件, // 然后在循環(huán)中再次走到這個(gè)位置,最終if條件為true,從而執(zhí)行退出worker進(jìn)程的工作 if (ngx_event_no_timers_left() == NGX_OK) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting"); ngx_worker_process_exit(cycle); } } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle"); // 這里通過檢查相應(yīng)的事件模型中是否存在對應(yīng)的事件,然后將其放入隊(duì)列中進(jìn)行處理, // 這里是worker進(jìn)程處理事件的核心方法 ngx_process_events_and_timers(cycle); // 這里ngx_terminate是強(qiáng)制關(guān)閉nginx的選項(xiàng),如果向nginx發(fā)送了強(qiáng)制關(guān)閉nginx命令,則當(dāng)前進(jìn)程會直接退出 if (ngx_terminate) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting"); ngx_worker_process_exit(cycle); } // 這里ngx_quit是優(yōu)雅退出的選項(xiàng)。這里主要是將ngx_exiting置為1,用于表征當(dāng)前進(jìn)程需要退出, // 然后會執(zhí)行如下三個(gè)工作: // 1. 往事件隊(duì)列中添加一個(gè)事件,用于處理當(dāng)前處于活躍狀態(tài)的連接,將其close標(biāo)志位置為1,并且執(zhí)行該連接 // 當(dāng)前的處理方法,以盡快完成連接事件; // 2. 關(guān)閉當(dāng)前cycle中監(jiān)聽的socket句柄; // 3. 將當(dāng)前所有處于空閑狀態(tài)的連接的close狀態(tài)標(biāo)記為1,然后調(diào)用其連接處理方法. if (ngx_quit) { ngx_quit = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "gracefully shutting down"); ngx_setproctitle("worker process is shutting down"); if (!ngx_exiting) { ngx_exiting = 1; ngx_set_shutdown_timer(cycle); ngx_close_listening_sockets(cycle); ngx_close_idle_connections(cycle); } } // ngx_reopen主要是重新打開nginx的所有文件,比如切換nginx的日志文件等等 if (ngx_reopen) { ngx_reopen = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs"); ngx_reopen_files(cycle, -1); } } }
可以看到,worker進(jìn)程主要處理了nginx是否退出相關(guān)的標(biāo)志位,還處理了nginx是否重新讀取了配置文件的標(biāo)志位。
感謝各位的閱讀!關(guān)于“nginx中worker進(jìn)程循環(huán)的實(shí)現(xiàn)示例”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!