真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

nginx中worker進(jìn)程循環(huán)的實(shí)現(xiàn)示例

這篇文章給大家分享的是有關(guān)nginx中worker進(jìn)程循環(huán)的實(shí)現(xiàn)示例的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過來看看吧。

成都創(chuàng)新互聯(lián)公司成立于2013年,先為萬州等服務(wù)建站,萬州等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為萬州企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。

worker進(jìn)程啟動(dòng)后,其首先會初始化自身運(yùn)行所需要的環(huán)境,然后會進(jìn)入一個(gè)循環(huán),在該循環(huán)中不斷檢查是否有需要執(zhí)行的事件,然后處理事件。在這個(gè)過程中,worker進(jìn)程也是需要與master進(jìn)程交互的,更有甚者,worker進(jìn)程作為一個(gè)子進(jìn)程,也是可以接收命令行指令(比如kill等)以進(jìn)行相應(yīng)邏輯的處理的。那么worker進(jìn)程是如何與master或者命令行指令進(jìn)行交互的呢?本文首先會對worker進(jìn)程與master進(jìn)程交互方式,以及worker進(jìn)程如何處理命令行指令的流程進(jìn)行講解,然后會從源碼上對worker進(jìn)程交互的整個(gè)工作流程進(jìn)行介紹。

1. worker與master進(jìn)程交互方式

這里首先需要說明的是,無論是master還是外部命令的方式,nginx都是通過標(biāo)志位的方式來處理相應(yīng)的指令的,也即在接收到一個(gè)指令(無論是master還是外部命令)的時(shí)候,worker會在其回調(diào)方法中設(shè)置與該指令相對應(yīng)的標(biāo)志位,然后在worker進(jìn)程在其自身的循環(huán)中處理完事件之后會依次檢查這些標(biāo)志位是否為真,是則根據(jù)該標(biāo)志位的作用執(zhí)行相應(yīng)的邏輯。

對于worker進(jìn)程與master進(jìn)程的交互,其是通過socket管道的方式進(jìn)行的。在ngx_process.h文件中聲明了一個(gè)ngx_process_t結(jié)構(gòu)體,這里我們主要關(guān)注其channel屬性:

typedef struct {
  // 其余屬性...
  
  ngx_socket_t channel[2];
} ngx_process_t;

        這里的ngx_process_t結(jié)構(gòu)體的作用是存儲某個(gè)進(jìn)程相關(guān)的信息的,比如pid、channel、status等。每個(gè)進(jìn)程中都有一個(gè)ngx_processes數(shù)組,數(shù)組元素就是這里的ngx_process_t結(jié)構(gòu)體,也就是說每個(gè)進(jìn)程都會通過ngx_processes數(shù)組保存其余進(jìn)程的基本信息。其聲明如下:

// 存儲了nginx中所有的子進(jìn)程數(shù)組,每個(gè)子進(jìn)程都有一個(gè)對應(yīng)的ngx_process_t結(jié)構(gòu)體進(jìn)行標(biāo)記
extern ngx_process_t ngx_processes[NGX_MAX_PROCESSES];
        這里我們就可以看出,每個(gè)進(jìn)程都會一個(gè)與之對應(yīng)的channel數(shù)組,這個(gè)數(shù)組的長度為2,其是與master進(jìn)程進(jìn)行交互的管道流。在master進(jìn)程創(chuàng)建每一個(gè)子進(jìn)程的之前,都會創(chuàng)建一個(gè)channel數(shù)組,該數(shù)組的創(chuàng)建方法為:

int socketpair(int domain, int type, int protocol, int sv[2]);
        這個(gè)方法的主要作用是創(chuàng)建一對匿名的已經(jīng)連接的套接字,也就是說,如果在一個(gè)套接字中寫入數(shù)據(jù),那么在另一個(gè)套接字中就可以接收到寫入的數(shù)據(jù)。通過這種方式,如果在父進(jìn)程中往管道的一邊寫入數(shù)據(jù),那么在子進(jìn)程就可以在另一邊接收到數(shù)據(jù),這樣就可以實(shí)現(xiàn)父子進(jìn)程的數(shù)據(jù)通信了。

        在master進(jìn)程啟動(dòng)完子進(jìn)程之后,子進(jìn)程會保有master進(jìn)程中相應(yīng)的數(shù)據(jù),也包括這里的channel數(shù)組。如此,master進(jìn)程就可以通過channel數(shù)組實(shí)現(xiàn)與子進(jìn)程的通信了。

2. worker處理外部命令

        對于外部命令,其本質(zhì)上是通過signals數(shù)組中定義的各個(gè)信號以及回調(diào)方法進(jìn)行處理的。在master進(jìn)程初始化基本環(huán)境的時(shí)候,會將signals數(shù)組中指定的信號回調(diào)方法設(shè)置到對應(yīng)的信號中。由于worker進(jìn)程會繼承master進(jìn)程的基本環(huán)境,因而worker進(jìn)程在接收到這里設(shè)置的信號之后,也會調(diào)用對應(yīng)的回調(diào)方法。而該回調(diào)方法的主要邏輯也僅僅只是設(shè)置相應(yīng)的標(biāo)志位的值。關(guān)于nginx接收到信號之后如何設(shè)置對應(yīng)的標(biāo)志位,可以參照本人前面的文章(nginx master工作循環(huán) 超鏈接),這里不再贅述。

3. 源碼講解

        master進(jìn)程是通過ngx_start_worker_processes()方法啟動(dòng)各個(gè)子進(jìn)程的,如下是該方法源碼:

/**
 * 啟動(dòng)n個(gè)worker子進(jìn)程,并設(shè)置好每個(gè)子進(jìn)程與master父進(jìn)程之間使用socketpair
 * 系統(tǒng)調(diào)用建立起來的socket句柄通信機(jī)制
 */
static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) {
 ngx_int_t i;
 ngx_channel_t ch;
 
 ngx_memzero(&ch, sizeof(ngx_channel_t));
 ch.command = NGX_CMD_OPEN_CHANNEL;

 for (i = 0; i < n; i++) {

  // spawn是產(chǎn)卵的意思,這里就是生成一個(gè)子進(jìn)程的意思,而該子進(jìn)程所進(jìn)行的事件循環(huán)就是
  // ngx_worker_process_cycle()方法,這里的ngx_worker_process_cycle是worker進(jìn)程處理事件的循環(huán),
  // worker進(jìn)程在一個(gè)無限for循環(huán)中,不斷的檢查相應(yīng)的事件模型中是否存在對應(yīng)的事件,
  // 然后將accept事件和read、write事件分開放入兩個(gè)隊(duì)列中,最后在事件循環(huán)中不斷的處理事件
  ngx_spawn_process(cycle, ngx_worker_process_cycle, 
           (void *) (intptr_t) i, "worker process", type);

  // 下面的這段代碼的主要作用是將新建進(jìn)程這個(gè)事件通知到其他的進(jìn)程,上面的
  // ch.command = NGX_CMD_OPEN_CHANNEL;中NGX_CMD_OPEN_CHANNEL表示的就是當(dāng)前是新建了一個(gè)進(jìn)程,
  // 而ngx_process_slot存儲的就是該新建進(jìn)程所存放的數(shù)組位置,這里需要進(jìn)行廣播的原因在于,
  // 每個(gè)子進(jìn)程被創(chuàng)建后,其內(nèi)存數(shù)據(jù)都是復(fù)制的父進(jìn)程的,但是ngx_processes數(shù)組是每個(gè)進(jìn)程都有一份的,
  // 因而數(shù)組中先創(chuàng)建的子進(jìn)程是沒有后創(chuàng)建的子進(jìn)程的數(shù)據(jù)的,但是master進(jìn)程是有所有子進(jìn)程的數(shù)據(jù)的,
  // 因而這里master進(jìn)程創(chuàng)建子進(jìn)程之后,其就會向ngx_processes數(shù)組的每個(gè)進(jìn)程的channel[0]上
  // 寫入當(dāng)前廣播的事件,也即這里的ch,通過這種方式,每個(gè)子進(jìn)程接收到這個(gè)事件之后,
  // 都會嘗試更新其所保存的ngx_processes數(shù)據(jù)信息
  ch.pid = ngx_processes[ngx_process_slot].pid;
  ch.slot = ngx_process_slot;
  ch.fd = ngx_processes[ngx_process_slot].channel[0];

  // 廣播事件
  ngx_pass_open_channel(cycle, &ch);
 }
}

        這里我們主要需要關(guān)注上面的啟動(dòng)子進(jìn)程的方法調(diào)用,也即這里的ngx_spawn_process()方法,該方法的第二個(gè)參數(shù)是一個(gè)方法,在啟動(dòng)子進(jìn)程之后,子進(jìn)程就會進(jìn)入該方法所指定的循環(huán)中。而在ngx_spawn_process()方法中,master進(jìn)程會為當(dāng)前新創(chuàng)建的子進(jìn)程創(chuàng)建一個(gè)channel數(shù)組,以用于與當(dāng)前子進(jìn)程進(jìn)行通信。如下是ngx_spawn_process()方法的源碼:

ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) {
 u_long on;
 ngx_pid_t pid;
 ngx_int_t s;

 if (respawn >= 0) {
  s = respawn;

 } else {
  // 在ngx_processes數(shù)組中存儲了當(dāng)前創(chuàng)建的所有進(jìn)程,而ngx_last_process則是當(dāng)前當(dāng)前記錄的最后一個(gè)
  // process在ngx_processes中的下一個(gè)位置的索引,只不過ngx_processes中記錄的進(jìn)程有可能有部分
  // 已經(jīng)失效了。當(dāng)前循環(huán)就是從頭開始查找是否有某個(gè)進(jìn)程已經(jīng)失效了,如果已經(jīng)失效了,則復(fù)用該進(jìn)程位置,
  // 否則直接使用ngx_last_process所指向的位置
  for (s = 0; s < ngx_last_process; s++) {
   if (ngx_processes[s].pid == -1) {
    break;
   }
  }

  // 這里說明所創(chuàng)建的進(jìn)程數(shù)達(dá)到了最大限度
  if (s == NGX_MAX_PROCESSES) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
          "no more than %d processes can be spawned",
          NGX_MAX_PROCESSES);
   return NGX_INVALID_PID;
  }
 }

 // NGX_PROCESS_DETACHED標(biāo)志表示當(dāng)前fork出來的進(jìn)程與原來的父進(jìn)程沒有任何關(guān)系,比如進(jìn)行nginx升級時(shí),
 // 新生成的master進(jìn)程就與原先的master進(jìn)程沒有關(guān)系
 if (respawn != NGX_PROCESS_DETACHED) {

  /* Solaris 9 still has no AF_LOCAL */

  // 這里的socketpair()方法的主要作用是生成一對套接字流,用于主進(jìn)程和子進(jìn)程的通信,這一對套接字會
  // 存儲在ngx_processes[s].channel中,本質(zhì)上這個(gè)字段是一個(gè)長度為2的整型數(shù)組。在主進(jìn)程和子進(jìn)程
  // 進(jìn)行通信的之前,主進(jìn)程會關(guān)閉其中一個(gè),而子進(jìn)程會關(guān)閉另一個(gè),然后相互之間往未關(guān)閉的另一個(gè)文件描述符中
  // 寫入或讀取數(shù)據(jù)即可實(shí)現(xiàn)通信。
  // AF_UNIX表示當(dāng)前使用的是UNIX文件形式的socket地址族
  // SOCK_STREAM指定了當(dāng)前套接字建立的通信方式是管道流,并且這個(gè)管道流是雙向的,
  // 即管道雙方都可以進(jìn)行讀寫操作
  // 第三個(gè)參數(shù)protocol必須為0
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "socketpair() failed while spawning \"%s\"", name);
   return NGX_INVALID_PID;
  }

  ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
          "channel %d:%d",
          ngx_processes[s].channel[0],
          ngx_processes[s].channel[1]);

  // 將ngx_processes[s].channel[0]設(shè)置為非阻塞模式
  if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          ngx_nonblocking_n
            " failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // 將ngx_processes[s].channel[1]設(shè)置為非阻塞模式
  if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          ngx_nonblocking_n
            " failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  on = 1;
  // 將ngx_processes[s].channel[0]套接字管道設(shè)置為異步模式
  if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "ioctl(FIOASYNC) failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // 當(dāng)前還處于主進(jìn)程中,這里的ngx_pid指向了主進(jìn)程的進(jìn)程id,當(dāng)前方法的作用主要是將
  // ngx_processes[s].channel[0]的操作權(quán)限設(shè)置給主進(jìn)程,也就是說主進(jìn)程通過向
  // ngx_processes[s].channel[0]寫入和讀取數(shù)據(jù)來與子進(jìn)程進(jìn)行通信
  if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(F_SETOWN) failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // FD_CLOEXEC表示當(dāng)前指定的套接字管道在子進(jìn)程中可以使用,但是在execl()執(zhí)行的程序中不可使用
  if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // FD_CLOEXEC表示當(dāng)前指定的套接字管道在子進(jìn)程中可以使用,但是在execl()執(zhí)行的程序中不可使用
  if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // ngx_processes[s].channel[1]是用于給子進(jìn)程監(jiān)聽相關(guān)事件使用的,當(dāng)父進(jìn)程向
  // ngx_processes[s].channel[0]發(fā)布事件之后,ngx_processes[s].channel[1]中就會接收到
  // 對應(yīng)的事件,從而進(jìn)行相應(yīng)的處理
  ngx_channel = ngx_processes[s].channel[1];

 } else {
  // 如果是NGX_PROCESS_DETACHED模式,則表示當(dāng)前是另外新起的一個(gè)master進(jìn)程,因而將其管道值都置為-1
  ngx_processes[s].channel[0] = -1;
  ngx_processes[s].channel[1] = -1;
 }

 ngx_process_slot = s;


 // fork()方法將產(chǎn)生一個(gè)新的進(jìn)程,這個(gè)進(jìn)程與父進(jìn)程的關(guān)系是子進(jìn)程的內(nèi)存數(shù)據(jù)將完全復(fù)制父進(jìn)程的。
 // 還需要注意的是,fork()出來的子進(jìn)程執(zhí)行的代碼是從fork()之后開始執(zhí)行的,而對于父進(jìn)程而言,
 // 該方法的返回值為父進(jìn)程id,而對于子進(jìn)程而言,該方法返回值為0,因而通過if-else語句就可以讓父進(jìn)程
 // 和子進(jìn)程分別調(diào)用后續(xù)不同的代碼片段
 pid = fork();

 switch (pid) {

  case -1:
   // fork出錯(cuò)
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fork() failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;

  case 0:
   // 子進(jìn)程執(zhí)行的分支,這里的proc()方法是外部傳進(jìn)來的,也就是說,當(dāng)前方法只是創(chuàng)建一個(gè)新的進(jìn)程,
   // 具體的進(jìn)程處理邏輯,將交由外部代碼塊進(jìn)行定義ngx_getpid()方法獲取的就是當(dāng)前新創(chuàng)建的子進(jìn)程的進(jìn)程id
   ngx_pid = ngx_getpid();
   proc(cycle, data);
   break;

  default:
   // 父進(jìn)程會走到這里
   break;
 }

 ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);

 // 父進(jìn)程會走到這里,當(dāng)前的pid是fork()之后父進(jìn)程得到的新創(chuàng)建的子進(jìn)程的pid
 ngx_processes[s].pid = pid;
 ngx_processes[s].exited = 0;

 if (respawn >= 0) {
  return pid;
 }

 // 設(shè)置當(dāng)前進(jìn)程的各個(gè)屬性,并且存儲到ngx_processes數(shù)組中的對應(yīng)位置
 ngx_processes[s].proc = proc;
 ngx_processes[s].data = data;
 ngx_processes[s].name = name;
 ngx_processes[s].exiting = 0;

 switch (respawn) {

  case NGX_PROCESS_NORESPAWN:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_JUST_SPAWN:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 1;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_RESPAWN:
   ngx_processes[s].respawn = 1;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_JUST_RESPAWN:
   ngx_processes[s].respawn = 1;
   ngx_processes[s].just_spawn = 1;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_DETACHED:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 1;
   break;
 }

 if (s == ngx_last_process) {
  ngx_last_process++;
 }

 return pid;
}

        ngx_spawn_process()方法最后會fork()一個(gè)子進(jìn)程以執(zhí)行其第二個(gè)參數(shù)所指定的回調(diào)方法。但是在這之前,我們需要說明的是,其通過socketpair()方法調(diào)用會創(chuàng)建一對匿名的socket,然后將其存儲在當(dāng)前進(jìn)程的channel數(shù)組中,如此就完成了channel數(shù)組的創(chuàng)建。

        worker進(jìn)程啟動(dòng)之后會執(zhí)行ngx_worker_process_cycle()方法,該方法首先會對worker進(jìn)程進(jìn)行初始化,其中就包括對繼承而來的channel數(shù)組的處理。由于master進(jìn)程和worker進(jìn)程都保有channel數(shù)組所指代的socket描述符,而本質(zhì)上master進(jìn)程和各個(gè)worker進(jìn)程只需要保有該數(shù)組的某一邊的描述符即可。因而這里worker進(jìn)程在初始化過程中,會關(guān)閉其所保存的另一邊的描述符。在nginx中,master進(jìn)程統(tǒng)一的會保留channel數(shù)組的0號位的socket描述符,關(guān)閉1號位的socket描述符,而worker進(jìn)程則會關(guān)閉0號位的socket描述符,保留1號位的描述符。這樣master進(jìn)程需要與worker進(jìn)程通信時(shí),就只需要往channel[0]中寫入數(shù)據(jù),而worker進(jìn)程則會監(jiān)聽channel[1],從而接收到master進(jìn)程的數(shù)據(jù)寫入。這里我們首先看一下worker進(jìn)程的初始化方法ngx_worker_process_init()的源碼:

/**
 * 這里主要是對當(dāng)前進(jìn)程進(jìn)行初始化,為其設(shè)置優(yōu)先級和打開的文件限制等參數(shù)。
 * 最后會為當(dāng)前進(jìn)程添加一個(gè)監(jiān)聽channel[1]的連接,以不斷讀取master進(jìn)程的消息,從而進(jìn)行相應(yīng)的處理
 */
static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) {
 sigset_t set;
 ngx_int_t n;
 ngx_time_t *tp;
 ngx_uint_t i;
 ngx_cpuset_t *cpu_affinity;
 struct rlimit rlmt;
 ngx_core_conf_t *ccf;
 ngx_listening_t *ls;

 // 設(shè)置時(shí)區(qū)相關(guān)的信息
 if (ngx_set_environment(cycle, NULL) == NULL) {
  /* fatal */
  exit(2);
 }

 ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

 // 設(shè)置當(dāng)前進(jìn)程的優(yōu)先級
 if (worker >= 0 && ccf->priority != 0) {
  if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "setpriority(%d) failed", ccf->priority);
  }
 }

 // 設(shè)置當(dāng)前進(jìn)程能夠打開的文件句柄數(shù)
 if (ccf->rlimit_nofile != NGX_CONF_UNSET) {
  rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile;
  rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile;

  if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "setrlimit(RLIMIT_NOFILE, %i) failed",
          ccf->rlimit_nofile);
  }
 }

 // Changes the limit on the largest size of a core file(RLIMIT_CORE) for worker processes.
 // 簡而言之就是設(shè)置核心文件能夠使用的最大大小
 if (ccf->rlimit_core != NGX_CONF_UNSET) {
  rlmt.rlim_cur = (rlim_t) ccf->rlimit_core;
  rlmt.rlim_max = (rlim_t) ccf->rlimit_core;

  if (setrlimit(RLIMIT_CORE, &rlmt) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "setrlimit(RLIMIT_CORE, %O) failed",
          ccf->rlimit_core);
  }
 }

 // geteuid()返回執(zhí)行當(dāng)前程序的用戶id,這里的0表示是否為root用戶
 if (geteuid() == 0) {
  // setgid()方法的作用是更改組的id
  if (setgid(ccf->group) == -1) {
   ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
          "setgid(%d) failed", ccf->group);
   /* fatal */
   exit(2);
  }

  // initgroups()是更改附加組的id
  if (initgroups(ccf->username, ccf->group) == -1) {
   ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
          "initgroups(%s, %d) failed",
          ccf->username, ccf->group);
  }

  // 更改用戶的id
  if (setuid(ccf->user) == -1) {
   ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
          "setuid(%d) failed", ccf->user);
   /* fatal */
   exit(2);
  }
 }

 // 需要注意的是,對于cache manager和cache loader進(jìn)程,這里的worker傳入的是-1,
 // 表示這兩個(gè)進(jìn)程不需要設(shè)置親核性
 if (worker >= 0) {
  // 獲取當(dāng)前worker的CPU親核性
  cpu_affinity = ngx_get_cpu_affinity(worker);

  if (cpu_affinity) {
   // 設(shè)置worker的親核心
   ngx_setaffinity(cpu_affinity, cycle->log);
  }
 }

#if (NGX_HAVE_PR_SET_DUMPABLE)
 if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "prctl(PR_SET_DUMPABLE) failed");
 }

#endif

 if (ccf->working_directory.len) {
  // chdir()的作用是將當(dāng)前的工作目錄更改為其參數(shù)所傳入的路徑
  if (chdir((char *) ccf->working_directory.data) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "chdir(\"%s\") failed", ccf->working_directory.data);
   /* fatal */
   exit(2);
  }
 }

 // 初始化空的set指令集合
 sigemptyset(&set);

 // ◆ SIG_BLOCK:將 set 參數(shù)指向信號集中的信號加入到信號掩碼中。
 // ◆ SIG_UNBLOCK:將 set 參數(shù)指向的信號集中的信號從信號掩碼中刪除。
 // ◆ SIG_SETMASK:將 set 參數(shù)指向信號集設(shè)置為信號掩碼。
 // 這里就是直接初始化要阻塞的信號集,默認(rèn)為空集
 if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {
  ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
         "sigprocmask() failed");
 }

 tp = ngx_timeofday();
 srandom(((unsigned) ngx_pid << 16) ^ tp->sec ^ tp->msec);

 ls = cycle->listening.elts;
 for (i = 0; i < cycle->listening.nelts; i++) {
  ls[i].previous = NULL;
 }

 // 這里調(diào)用各個(gè)模塊的init_process()方法進(jìn)行進(jìn)程模塊的初始化
 for (i = 0; cycle->modules[i]; i++) {
  if (cycle->modules[i]->init_process) {
   if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
    /* fatal */
    exit(2);
   }
  }
 }

 // 這里主要是關(guān)閉當(dāng)前進(jìn)程中其他各個(gè)進(jìn)程的channel[1]管道句柄
 for (n = 0; n < ngx_last_process; n++) {

  if (ngx_processes[n].pid == -1) {
   continue;
  }

  if (n == ngx_process_slot) {
   continue;
  }

  if (ngx_processes[n].channel[1] == -1) {
   continue;
  }

  if (close(ngx_processes[n].channel[1]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "close() channel failed");
  }
 }

 // 關(guān)閉當(dāng)前進(jìn)程的channel[0]管道句柄
 if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
  ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
         "close() channel failed");
 }

#if 0
 ngx_last_process = 0;
#endif

 // ngx_channel指向的是當(dāng)前進(jìn)程的channel[1]句柄,也即監(jiān)聽master進(jìn)程發(fā)送消息的句柄。
 // 當(dāng)前方法中,首先會為當(dāng)前的句柄創(chuàng)建一個(gè)connection對象,并且將其封裝為一個(gè)事件,然后將該事件添加到
 // 對應(yīng)的事件模型隊(duì)列中以監(jiān)聽當(dāng)前句柄的事件,事件的處理邏輯則主要有這里的ngx_channel_handler()
 // 方法進(jìn)行。這里的ngx_channel_handler的主要處理邏輯是,根據(jù)當(dāng)前收到的消息設(shè)置當(dāng)前進(jìn)程的一些標(biāo)志位,
 // 或者更新某些緩存數(shù)據(jù),如此,在當(dāng)前進(jìn)行的事件循環(huán)中,通過不斷檢查這些標(biāo)志位,從而實(shí)現(xiàn)在事件進(jìn)程中
 // 處理真正的邏輯。因而這里的ngx_channel_handler的處理效率是非常高的
 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
              ngx_channel_handler)
   == NGX_ERROR) {
  /* fatal */
  exit(2);
 }
}

        該方法主要是對worker進(jìn)程進(jìn)行初始化,這里我們主要需要關(guān)注最后會遍歷ngx_processes數(shù)組,這個(gè)數(shù)組中保存了當(dāng)前nginx中各個(gè)進(jìn)程的相關(guān)信息。在遍歷過程中,會關(guān)閉當(dāng)前進(jìn)程保有的其余進(jìn)程的channel[1]句柄,而保留有channel[0]句柄,這樣當(dāng)前進(jìn)程如果需要與其他進(jìn)程通信,也只需要往目標(biāo)進(jìn)程的channel[0]中寫入數(shù)據(jù)即可。在遍歷完成之后,當(dāng)前進(jìn)程就會關(guān)閉自身的channel[0]句柄,而保留channel[1]句柄。最后,會通過ngx_add_channel_event()方法為當(dāng)前進(jìn)程添加對channel[1]的監(jiān)聽事件,這里在調(diào)用ngx_add_channel_event()方法時(shí)傳入的第二個(gè)參數(shù)是ngx_channel,該參數(shù)是在前面的ngx_spawn_process()方法中賦值的,指向的就是當(dāng)前進(jìn)程的channel[1]的socket句柄。

        關(guān)于ngx_add_channel_event()方法,其本質(zhì)就是創(chuàng)建一個(gè)ngx_event_t結(jié)構(gòu)體的事件,然后將其添加到當(dāng)前所使用的事件模型(比如epoll)句柄中。這里不再贅述該方法的實(shí)現(xiàn)源碼,不過我們需要關(guān)注的是該事件觸發(fā)時(shí)的回調(diào)方法,即調(diào)用ngx_add_channel_event()方法時(shí)傳入的第三個(gè)參數(shù)ngx_channel_handler()方法。如下是該方法的源碼:

static void ngx_channel_handler(ngx_event_t *ev) {
 ngx_int_t n;
 ngx_channel_t ch;
 ngx_connection_t *c;

 if (ev->timedout) {
  ev->timedout = 0;
  return;
 }

 c = ev->data;

 for (;;) {

  // 在無限for循環(huán)中不斷讀取master進(jìn)程發(fā)過來的消息
  n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);

  // 如果讀取消息出錯(cuò),說明當(dāng)前的句柄可能失效了,就需要關(guān)閉當(dāng)前連接
  if (n == NGX_ERROR) {
   if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
    ngx_del_conn(c, 0);
   }

   ngx_close_connection(c);
   return;
  }

  if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {
   if (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR) {
    return;
   }
  }

  if (n == NGX_AGAIN) {
   return;
  }

  // 對發(fā)送過來的消息進(jìn)行處理
  switch (ch.command) {
   // 如果是quit消息,則設(shè)置quit標(biāo)志位
   case NGX_CMD_QUIT:
    ngx_quit = 1;
    break;

    // 如果terminate消息,則設(shè)置terminate標(biāo)志位
   case NGX_CMD_TERMINATE:
    ngx_terminate = 1;
    break;

    // 如果是reopen消息,則設(shè)置reopen標(biāo)志位
   case NGX_CMD_REOPEN:
    ngx_reopen = 1;
    break;

    // 如果是新建進(jìn)程消息,則更新當(dāng)前ngx_processes數(shù)組對應(yīng)位置的數(shù)據(jù)
   case NGX_CMD_OPEN_CHANNEL:
    ngx_processes[ch.slot].pid = ch.pid;
    ngx_processes[ch.slot].channel[0] = ch.fd;
    break;

    // 如果是關(guān)閉channel的消息,則關(guān)閉ngx_processes數(shù)組對應(yīng)位置的句柄
   case NGX_CMD_CLOSE_CHANNEL:
    if (close(ngx_processes[ch.slot].channel[0]) == -1) {
     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
            "close() channel failed");
    }

    ngx_processes[ch.slot].channel[0] = -1;
    break;
  }
 }
}

        在ngx_channel_handler()方法中,主要是讀取所監(jiān)聽的socket句柄中的數(shù)據(jù),而數(shù)據(jù)是以一個(gè)ngx_channel_t結(jié)構(gòu)體所承載的,這個(gè)ngx_channel_t是nginx所統(tǒng)一使用的master與worker進(jìn)程進(jìn)行通信的結(jié)構(gòu)體,其會指定當(dāng)前發(fā)生的事件類型,以及發(fā)生該事件的進(jìn)程信息。如下是ngx_channel_t結(jié)構(gòu)體的聲明:

typedef struct {
  // 當(dāng)前發(fā)生的事件類型
  ngx_uint_t command;
  // 發(fā)生事件的pid
  ngx_pid_t pid;
  // 發(fā)生事件的進(jìn)程在ngx_processes數(shù)組中的下標(biāo)
  ngx_int_t slot;
  // 發(fā)生事件的進(jìn)程的channel[0]描述符的值
  ngx_fd_t fd;
} ngx_channel_t;

       在從當(dāng)前進(jìn)程的channel[1]中讀取了ngx_channel_t結(jié)構(gòu)體的數(shù)據(jù)之后,ngx_channel_handler()方法會根據(jù)發(fā)生的事件類型更新相應(yīng)的標(biāo)志位的狀態(tài),并且會更新當(dāng)前進(jìn)程的ngx_processes數(shù)組中對應(yīng)的發(fā)生事件的進(jìn)程的狀態(tài)信息。

        在處理了master進(jìn)程所發(fā)送的事件之后,worker進(jìn)程就會繼續(xù)其循環(huán),在該循環(huán)中會檢查其所關(guān)注的標(biāo)志位的狀態(tài),然后會根據(jù)這些狀態(tài)執(zhí)行對應(yīng)的邏輯。如下是worker進(jìn)程工作的循環(huán)的源碼:

/**
 * 進(jìn)入worker進(jìn)程工作的循環(huán)
 */
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) {
 ngx_int_t worker = (intptr_t) data;

 ngx_process = NGX_PROCESS_WORKER;
 ngx_worker = worker;

 // 初始化worker進(jìn)程,前面對該方法的源碼進(jìn)行了講解
 ngx_worker_process_init(cycle, worker);

 ngx_setproctitle("worker process");

 for (;;) {

  if (ngx_exiting) {
   // 這里主要是檢查有沒有事件是非cancelable狀態(tài)的,也就是說是否所有的事件都已經(jīng)取消了,如果取消了,
   // 就會返回NGX_OK。這里的邏輯可以理解為,如果被標(biāo)記為了ngx_exiting,那么此時(shí),如果還有未取消的
   // 事件存在,則會走到下面的ngx_process_events_and_timers()方法,如此就會處理未完成的事件,
   // 然后在循環(huán)中再次走到這個(gè)位置,最終if條件為true,從而執(zhí)行退出worker進(jìn)程的工作
   if (ngx_event_no_timers_left() == NGX_OK) {
    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
    ngx_worker_process_exit(cycle);
   }
  }

  ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

  // 這里通過檢查相應(yīng)的事件模型中是否存在對應(yīng)的事件,然后將其放入隊(duì)列中進(jìn)行處理,
  // 這里是worker進(jìn)程處理事件的核心方法
  ngx_process_events_and_timers(cycle);

  // 這里ngx_terminate是強(qiáng)制關(guān)閉nginx的選項(xiàng),如果向nginx發(fā)送了強(qiáng)制關(guān)閉nginx命令,則當(dāng)前進(jìn)程會直接退出
  if (ngx_terminate) {
   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
   ngx_worker_process_exit(cycle);
  }

  // 這里ngx_quit是優(yōu)雅退出的選項(xiàng)。這里主要是將ngx_exiting置為1,用于表征當(dāng)前進(jìn)程需要退出,
  // 然后會執(zhí)行如下三個(gè)工作:
  // 1. 往事件隊(duì)列中添加一個(gè)事件,用于處理當(dāng)前處于活躍狀態(tài)的連接,將其close標(biāo)志位置為1,并且執(zhí)行該連接
  //  當(dāng)前的處理方法,以盡快完成連接事件;
  // 2. 關(guān)閉當(dāng)前cycle中監(jiān)聽的socket句柄;
  // 3. 將當(dāng)前所有處于空閑狀態(tài)的連接的close狀態(tài)標(biāo)記為1,然后調(diào)用其連接處理方法.
  if (ngx_quit) {
   ngx_quit = 0;
   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "gracefully shutting down");
   ngx_setproctitle("worker process is shutting down");

   if (!ngx_exiting) {
    ngx_exiting = 1;
    ngx_set_shutdown_timer(cycle);
    ngx_close_listening_sockets(cycle);
    ngx_close_idle_connections(cycle);
   }
  }

  // ngx_reopen主要是重新打開nginx的所有文件,比如切換nginx的日志文件等等
  if (ngx_reopen) {
   ngx_reopen = 0;
   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
   ngx_reopen_files(cycle, -1);
  }
 }
}

        可以看到,worker進(jìn)程主要處理了nginx是否退出相關(guān)的標(biāo)志位,還處理了nginx是否重新讀取了配置文件的標(biāo)志位。

感謝各位的閱讀!關(guān)于“nginx中worker進(jìn)程循環(huán)的實(shí)現(xiàn)示例”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!


新聞名稱:nginx中worker進(jìn)程循環(huán)的實(shí)現(xiàn)示例
文章網(wǎng)址:http://weahome.cn/article/gsegdh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部