真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Node.js中網(wǎng)絡(luò)與流的示例分析

這篇文章給大家分享的是有關(guān)Node.js中網(wǎng)絡(luò)與流的示例分析的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過來看看吧。

成都創(chuàng)新互聯(lián)專注于企業(yè)全網(wǎng)營(yíng)銷推廣、網(wǎng)站重做改版、潁州網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5高端網(wǎng)站建設(shè)電子商務(wù)商城網(wǎng)站建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為潁州等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

涉及的知識(shí)點(diǎn)

  • libuv 中網(wǎng)絡(luò)的實(shí)現(xiàn)

  • libuv 解決 accept (EMFILE錯(cuò)誤)

  • BSD 套接字

  • SOCKADDR_IN

  • UNIX 域協(xié)議使用! 在進(jìn)程間傳遞“文件描述符”

例子 tcp-echo-server/main.c

libuv 異步使用 BSD 套接字 的例子

libuv 中的網(wǎng)絡(luò)和直接使用 BSD 套接字接口沒有什么不同,有些事情更簡(jiǎn)單,都是無阻塞的,但概念都是一樣的。此外,libuv 還提供了一些實(shí)用的函數(shù)來抽象出那些煩人的、重復(fù)的、低級(jí)的任務(wù),比如使用BSD套接字結(jié)構(gòu)設(shè)置套接字、DNS查詢以及調(diào)整各種套接字參數(shù)。

int main() {
    loop = uv_default_loop();

    uv_tcp_t server;
    uv_tcp_init(loop, &server);

    uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);

    uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
    int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
    if (r) {
        fprintf(stderr, "Listen error %s\n", uv_strerror(r));
        return 1;
    }
    return uv_run(loop, UV_RUN_DEFAULT);
}

void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        // error!
        return;
    }

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
        uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}

同步的例子

這是一個(gè)正常同步使用 BSD 套接字 的例子。

作為參照可以發(fā)現(xiàn)主要有如下幾步

  • 首先調(diào)用 socket() 為通訊創(chuàng)建一個(gè)端點(diǎn),為套接字返回一個(gè)文件描述符。

  • 接著調(diào)用 bind() 為一個(gè)套接字分配地址。當(dāng)使用 socket() 創(chuàng)建套接字后,只賦予其所使用的協(xié)議,并未分配地址。在接受其它主機(jī)的連接前,必須先調(diào)用 bind() 為套接字分配一個(gè)地址。

  • 當(dāng) socket 和一個(gè)地址綁定之后,再調(diào)用 listen() 函數(shù)會(huì)開始監(jiān)聽可能的連接請(qǐng)求。

  • 最后調(diào)用 accept, 當(dāng)應(yīng)用程序監(jiān)聽來自其他主機(jī)的面對(duì)數(shù)據(jù)流的連接時(shí),通過事件(比如Unix select()系統(tǒng)調(diào)用)通知它。必須用 accept()函數(shù)初始化連接。 accept() 為每個(gè)連接創(chuàng)立新的套接字并從監(jiān)聽隊(duì)列中移除這個(gè)連接。

int main(void)
  {
    struct sockaddr_in stSockAddr;
    int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
  
    if(-1 == SocketFD)
    {
      perror("can not create socket");
      exit(EXIT_FAILURE);
    }
  
    memset(&stSockAddr, 0, sizeof(struct sockaddr_in));
  
    stSockAddr.sin_family = AF_INET;
    stSockAddr.sin_port = htons(1100);
    stSockAddr.sin_addr.s_addr = INADDR_ANY;
  
    if(-1 == bind(SocketFD,(const struct sockaddr *)&stSockAddr, sizeof(struct sockaddr_in)))
    {
      perror("error bind failed");
      close(SocketFD);
      exit(EXIT_FAILURE);
    }
  
    if(-1 == listen(SocketFD, 10))
    {
      perror("error listen failed");
      close(SocketFD);
      exit(EXIT_FAILURE);
    }
  
    for(;;)
    {
      int ConnectFD = accept(SocketFD, NULL, NULL);
  
      if(0 > ConnectFD)
      {
        perror("error accept failed");
        close(SocketFD);
        exit(EXIT_FAILURE);
      }
  
     /* perform read write operations ... */
  
      shutdown(ConnectFD, SHUT_RDWR);
  
      close(ConnectFD);
    }

    close(SocketFD);
    return 0;
  }

uv_tcp_init

main > uv_tcp_init

1、對(duì) domain 進(jìn)行了驗(yàn)證, 需要是下面3種的一種

  • AF_INET 表示 IPv4 網(wǎng)絡(luò)協(xié)議

  • AF_INET6 表示 IPv6

  • AF_UNSPEC 表示適用于指定主機(jī)名和服務(wù)名且適合任何協(xié)議族的地址

2、tcp 也是一種流, 調(diào)用 uv__stream_init 對(duì)流數(shù)據(jù)進(jìn)行初始化

int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
  return uv_tcp_init_ex(loop, tcp, AF_UNSPEC);
}

int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) {
  int domain;

  /* Use the lower 8 bits for the domain */
  domain = flags & 0xFF;
  if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
    return UV_EINVAL;

  if (flags & ~0xFF)
    return UV_EINVAL;

  uv__stream_init(loop, (uv_stream_t*)tcp, UV_TCP);

  ...

  return 0;
}

uv__stream_init

main > uv_tcp_init > uv__stream_init

流的初始化函數(shù)使用的地方還是特別多的, 也特別重要。下述 i/o 的完整實(shí)現(xiàn)參考 【libuv 源碼學(xué)習(xí)筆記】線程池與i/o

1、對(duì)流會(huì)被調(diào)用的回調(diào)函數(shù)等進(jìn)行一個(gè)初始化

  • 如 read_cb 函數(shù), 在本例子中 on_new_connection > uv_read_start 函數(shù)就會(huì)真實(shí)的設(shè)置該 read_cb 為用戶傳入的參數(shù) echo_read, 其被調(diào)用時(shí)機(jī)是該 stream 上設(shè)置的 io_watcher.fd 有數(shù)據(jù)寫入時(shí), 在事件循環(huán)階段被 epoll 捕獲后。

  • alloc_cb 函數(shù)的調(diào)用過程同 read_cb, alloc 類型函數(shù)一般是設(shè)置當(dāng)前需要讀取的內(nèi)容長(zhǎng)度, 在流數(shù)據(jù)傳輸時(shí)通常首先會(huì)寫入本次傳輸數(shù)據(jù)的長(zhǎng)度, 然后是具體的內(nèi)容, 主要是為了接收方能夠合理的申請(qǐng)內(nèi)存進(jìn)行存儲(chǔ)。如 grpc, thread-loader 中都有詳細(xì)的應(yīng)用。

  • close_cb 函數(shù)被調(diào)用在 stream 數(shù)據(jù)結(jié)束時(shí)或者出錯(cuò)時(shí)。

  • connection_cb 函數(shù)如本例子 tcp 流, 當(dāng) accept 接收到新連接時(shí)被調(diào)用。本例子中即為 on_new_connection

  • connect_req 結(jié)構(gòu)主要用于 tcp 客戶端相關(guān)連接回調(diào)等數(shù)據(jù)的掛載使用。

  • shutdown_req 結(jié)構(gòu)主要用于流 destroy 時(shí)回調(diào)等數(shù)據(jù)的掛載使用。

  • accepted_fd 當(dāng) accept 接收到新連接時(shí), 存儲(chǔ) accept(SocketFD, NULL, NULL) 返回的 ConnectFD。

  • queued_fds 用于保存等待處理的連接, 其主要用于 node cluster 集群 的實(shí)現(xiàn)。

// queued_fds

1. 當(dāng)收到其他進(jìn)程通過 ipc 寫入的數(shù)據(jù)時(shí), 調(diào)用 uv__stream_recv_cmsg 函數(shù)
2. uv__stream_recv_cmsg 函數(shù)讀取到進(jìn)程傳遞過來的 fd 引用, 調(diào)用 uv__stream_queue_fd 函數(shù)保存。
3. queued_fds 被消費(fèi)主要在 src/stream_wrap.cc LibuvStreamWrap::OnUvRead > AcceptHandle 函數(shù)中。

2、其中專門為 loop->emfile_fd 通過 uv__open_cloexec 方法創(chuàng)建一個(gè)指向空文件(/dev/null)的 idlefd 文件描述符, 追蹤發(fā)現(xiàn)原來是解決 accept (EMFILE錯(cuò)誤), 下面我們講 uv__accept 的時(shí)候再細(xì)說這個(gè) loop->emfile_fd 的妙用。

accept處理連接時(shí),若出現(xiàn) EMFILE 錯(cuò)誤不進(jìn)行處理,則內(nèi)核間隔性嘗試連接,導(dǎo)致整個(gè)網(wǎng)絡(luò)設(shè)計(jì)程序崩潰

3、調(diào)用 uv__io_init 初始化的該 stream 的 i/o 觀察者的回調(diào)函數(shù)為 uv__stream_io

void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        /* In the rare case that "/dev/null" isn't mounted open "/"
         * instead.
         */
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE_) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

uv__open_cloexec

main > uv_tcp_init > uv__stream_init > uv__open_cloexec

同步調(diào)用 open 方法拿到了 fd, 也許你會(huì)問為啥不像 【libuv 源碼學(xué)習(xí)筆記】線程池與i/o 中調(diào)用 uv_fs_open 異步獲取 fd, 其實(shí) libuv 中并不全部是異步的實(shí)現(xiàn), 比如當(dāng)前的例子啟動(dòng) tcp 服務(wù)前的一些初始化, 而不是用戶請(qǐng)求過程中發(fā)生的任務(wù), 同步也是能接受的。

int uv__open_cloexec(const char* path, int flags) {
#if defined(O_CLOEXEC)
  int fd;

  fd = open(path, flags | O_CLOEXEC);
  if (fd == -1)
    return UV__ERR(errno);

  return fd;
#else  /* O_CLOEXEC */
  int err;
  int fd;

  fd = open(path, flags);
  if (fd == -1)
    return UV__ERR(errno);

  err = uv__cloexec(fd, 1);
  if (err) {
    uv__close(fd);
    return err;
  }

  return fd;
#endif  /* O_CLOEXEC */
}

uv__stream_io

main > uv_tcp_init > uv__stream_init > uv__stream_io

雙工流的 i/o 觀察者回調(diào)函數(shù), 如調(diào)用的 stream->connect_req 函數(shù), 其值是例子中 uv_listen 函數(shù)的最后一個(gè)參數(shù) on_new_connection。

  • 當(dāng)發(fā)生 POLLIN | POLLERR | POLLHUP 事件時(shí): 該 fd 有可讀數(shù)據(jù)時(shí)調(diào)用 uv__read 函數(shù)

  • 當(dāng)發(fā)生 POLLOUT | POLLERR | POLLHUP 事件時(shí): 該 fd 有可讀數(shù)據(jù)時(shí)調(diào)用 uv__write 函數(shù)

static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}

uv_ip4_addr

main > uv_ip4_addr

uv_ip4_addr 用于將人類可讀的 IP 地址、端口對(duì)轉(zhuǎn)換為 BSD 套接字 API 所需的 sockaddr_in 結(jié)構(gòu)。

int uv_ip4_addr(const char* ip, int port, struct sockaddr_in* addr) {
  memset(addr, 0, sizeof(*addr));
  addr->sin_family = AF_INET;
  addr->sin_port = htons(port);
#ifdef SIN6_LEN
  addr->sin_len = sizeof(*addr);
#endif
  return uv_inet_pton(AF_INET, ip, &(addr->sin_addr.s_addr));
}

uv_tcp_bind

main > uv_tcp_bind

從 uv_ip4_addr 函數(shù)的實(shí)現(xiàn), 其實(shí)是在 addr 的 sin_family 上面設(shè)置值為 AF_INET, 但在 uv_tcp_bind 函數(shù)里面卻是從 addr 的 sa_family屬性上面取的值, 這讓 c 初學(xué)者的我又陷入了一陣思考 ...

sockaddr_in 和 sockaddr 是并列的結(jié)構(gòu),指向 sockaddr_in 的結(jié)構(gòu)體的指針也可以指向 sockaddr 的結(jié)構(gòu)體,并代替它。也就是說,你可以使用 sockaddr_in 建立你所需要的信息,然后用 memset 函數(shù)初始化就可以了memset((char*)&mysock,0,sizeof(mysock));//初始化

原來是這樣, 這里通過強(qiáng)制指針類型轉(zhuǎn)換 const struct sockaddr* addr 達(dá)到的目的, 函數(shù)的最后調(diào)用了 uv__tcp_bind 函數(shù)。

int uv_tcp_bind(uv_tcp_t* handle,
                const struct sockaddr* addr,
                unsigned int flags) {
  unsigned int addrlen;

  if (handle->type != UV_TCP)
    return UV_EINVAL;

  if (addr->sa_family == AF_INET)
    addrlen = sizeof(struct sockaddr_in);
  else if (addr->sa_family == AF_INET6)
    addrlen = sizeof(struct sockaddr_in6);
  else
    return UV_EINVAL;

  return uv__tcp_bind(handle, addr, addrlen, flags);
}

uv__tcp_bind

main > uv_tcp_bind > uv__tcp_bind

  • 調(diào)用 maybe_new_socket, 如果當(dāng)前未設(shè)置 socketfd, 則調(diào)用 new_socket 獲取

  • 調(diào)用 setsockopt 用于為指定的套接字設(shè)定一個(gè)特定的套接字選項(xiàng)

  • 調(diào)用 bind 為一個(gè)套接字分配地址。當(dāng)使用socket()創(chuàng)建套接字后,只賦予其所使用的協(xié)議,并未分配地址。

int uv__tcp_bind(uv_tcp_t* tcp,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int err;
  int on;

  /* Cannot set IPv6-only mode on non-IPv6 socket. */
  if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL;

  err = maybe_new_socket(tcp, addr->sa_family, 0);
  if (err)
    return err;

  on = 1;
  if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
    return UV__ERR(errno);

...

  errno = 0;
  if (bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE) {
    if (errno == EAFNOSUPPORT)
      return UV_EINVAL;
    return UV__ERR(errno);
  }
...
}

new_socket

main > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket

  • 通過 uv__socket 其本質(zhì)調(diào)用 socket 獲取到 sockfd

  • 調(diào)用 uv__stream_open 設(shè)置 stream i/o 觀察的 fd 為步驟1 拿到的 sockfd

static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
  struct sockaddr_storage saddr;
  socklen_t slen;
  int sockfd;
  int err;

  err = uv__socket(domain, SOCK_STREAM, 0);
  if (err < 0)
    return err;
  sockfd = err;

  err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
  
  ...

  return 0;
}

uv__stream_open

main > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket > uv__stream_open

主要用于設(shè)置 stream->io_watcher.fd 為參數(shù)傳入的 fd。

int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO Use delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}

uv_listen

main > uv_listen

主要調(diào)用了 uv_tcp_listen 函數(shù)。

int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  err = ERROR_INVALID_PARAMETER;
  switch (stream->type) {
    case UV_TCP:
      err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
      break;
    case UV_NAMED_PIPE:
      err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
      break;
    default:
      assert(0);
  }

  return uv_translate_sys_error(err);
}

uv_tcp_listen

main > uv_listen > uv_tcp_listen

  • 調(diào)用 listen 開始監(jiān)聽可能的連接請(qǐng)求

  • 掛載例子中傳入的回調(diào) on_new_connection

  • 暴力改寫 i/o 觀察者的回調(diào), 在上面的 uv__stream_init 函數(shù)中, 通過 uv__io_init 設(shè)置了 i/o 觀察者的回調(diào)為 uv__stream_io, 作為普通的雙工流是適用的, 這里 tcp 流直接通過 tcp->io_watcher.cb = uv__server_io 賦值語句設(shè)置 i/o 觀察者回調(diào)為 uv__server_io

  • 調(diào)用 uv__io_start 注冊(cè) i/o 觀察者, 開始監(jiān)聽工作。

int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
  ...

  if (listen(tcp->io_watcher.fd, backlog))
    return UV__ERR(errno);

  tcp->connection_cb = cb;
  tcp->flags |= UV_HANDLE_BOUND;

  /* Start listening for connections. */
  tcp->io_watcher.cb = uv__server_io;
  uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);

  return 0;
}

uv__server_io

main > uv_listen > uv_tcp_listen > uv__server_io

tcp 流的 i/o 觀察者回調(diào)函數(shù)

  • 調(diào)用 uv__accept, 拿到該連接的 ConnectFD

  • 此時(shí)如果出現(xiàn)了上面 uv__stream_init 時(shí)說的 accept (EMFILE錯(cuò)誤), 則調(diào)用 uv__emfile_trick 函數(shù)

  • 把步驟1拿到的 ConnectFD 掛載在了 stream->accepted_fd 上面

  • 調(diào)用例子中傳入的回調(diào) on_new_connection

void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  ...
  
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return;  /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue;  /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    ...
}

uv__emfile_trick

main > uv_listen > uv_tcp_listen > uv__server_io > uv__emfile_trick

在上面的 uv__stream_init 函數(shù)中, 我們發(fā)現(xiàn) loop 的 emfile_fd 屬性上通過 uv__open_cloexec 方法創(chuàng)建一個(gè)指向空文件(/dev/null)的 idlefd 文件描述符。

當(dāng)出現(xiàn) accept (EMFILE錯(cuò)誤)即文件描述符用盡時(shí)的錯(cuò)誤時(shí)

首先將 loop->emfile_fd 文件描述符, 使其能 accept 新連接, 然后我們新連接將其關(guān)閉,以使其低于EMFILE的限制。接下來,我們接受所有等待的連接并關(guān)閉它們以向客戶發(fā)出信號(hào),告訴他們我們已經(jīng)超載了--我們確實(shí)超載了,但是我們?nèi)栽诶^續(xù)工作。

static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}

on_new_connection

當(dāng)收到一個(gè)新連接, 例子中的 on_new_connection 函數(shù)被調(diào)用

  • 通過 uv_tcp_init 初始化了一個(gè) tcp 客戶端流

  • 調(diào)用 uv_accept 函數(shù)

void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        // error!
        return;
    }

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
        uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}

uv_accept

on_new_connection > uv_accept

根據(jù)不同的協(xié)議調(diào)用不同的方法, 該例子 tcp 調(diào)用 uv__stream_open 方法

uv__stream_open 設(shè)置給初始化完成的 client 流設(shè)置了 i/o 觀察者的 fd。該 fd 即是 uv__server_io 中提到的 ConnectFD 。

int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}

uv_read_start

on_new_connection > uv_read_start

開啟一個(gè)流的監(jiān)聽工作

  • 掛載回調(diào)函數(shù) read_cb 為例子中的 echo_read, 當(dāng)流有數(shù)據(jù)寫入時(shí)被調(diào)用

  • 掛載回調(diào)函數(shù) alloc_cb 為例子中的 alloc_buffer

  • 調(diào)用 uv__io_start 函數(shù), 這可是老朋友了, 通常用在 uv__io_init 初始化 i/o 觀察者后面, 用于注冊(cè) i/o 觀察者。

uv_read_start 主要是調(diào)用了 uv__read_start 函數(shù)。開始了普通流的 i/o 過程。

  • 初始化 i/o 觀察者在 uv_tcp_init > uv_tcp_init_ex > uv__stream_init > uv__io_init 設(shè)置其觀察者回調(diào)函數(shù)為 uv__stream_io

  • 注冊(cè) i/o 觀察者為 uv__io_start 開始監(jiān)聽工作。

int uv__read_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
      stream->type == UV_TTY);

  /* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just
   * expresses the desired state of the user.
   */
  stream->flags |= UV_HANDLE_READING;

  /* TODO: try to do the read inline? */
  /* TODO: keep track of tcp state. If we've gotten a EOF then we should
   * not start the IO watcher.
   */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}

感謝各位的閱讀!關(guān)于“Node.js中網(wǎng)絡(luò)與流的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!


網(wǎng)頁標(biāo)題:Node.js中網(wǎng)絡(luò)與流的示例分析
瀏覽路徑:http://weahome.cn/article/ihposc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部