當需要同時監聽多個文件描述符時,就需要I/O復用函數,I/O復用函數有select、poll、epoll,今天主要使用poll函數。
成都創新互聯是由多位在大型網絡公司、廣告設計公司的優秀設計人員和策劃人員組成的一個具有豐富經驗的團隊,其中包括網站策劃、網頁美工、網站程序員、網頁設計師、平面廣告設計師、網絡營銷人員及形象策劃。承接:成都網站設計、成都做網站、網站改版、網頁設計制作、網站建設與維護、網絡推廣、數據庫開發,以高性價比制作企業網站、行業門戶平臺等全方位的服務。
poll()接受一個指向結構'struct pollfd'列表的指針,其中包括了你想測試的文件描述符和事件。事件由一個在結構中事件域的比特掩碼確定。當前的結構在調用后將被填寫并在事件發生后返回。
函數原型:
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
    int fd;         /* file descriptor */
    short events;   /* requested events */
    short revents;  /* returned events */
};
函數參數:fds是要監聽的fd的數組,nfds是數組元素個數,timeout是超時時間(單位為毫秒),-1表示一直阻塞;
函數說明:poll返回后,檢查每個fd的revents是否包含傳入的events中所請求的事件類型,如果包含就處理該事件。
events:
常量 | 說明 |
POLLIN | 普通或優先級帶數據可讀 |
POLLRDNORM | 普通數據可讀 |
POLLRDBAND | 優先級帶數據可讀 |
POLLPRI | 高優先級數據可讀 |
POLLOUT | 普通數據可寫 |
POLLWRNORM | 普通數據可寫 |
POLLWRBAND | 優先級帶數據可寫 |
POLLERR | 發生錯誤 |
POLLHUP | 發生掛起 |
POLLNVAL | 描述字不是一個打開的文件 |
接下來是一個服務器監聽兩個socket的例子:
服務器代碼:
/*
 * poll() demo server: listens on two TCP ports (DEFAULT_PORT and
 * DEFAULT_PORT + 1), waits on both listening sockets with a single poll()
 * call, and hands every accepted connection to a detached worker thread.
 *
 * Build: cc -pthread server.c -o server
 */
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <poll.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define MAX_POLLFD_NUM 2
#define DEFAULT_PORT 8000
#define BUFF_MAX 1024

/* Thread start routine; must have the signature pthread_create() expects. */
typedef void *(*Server_Rrocess_Thread_Fun)(void *arg);

/* One worker entry point per listening socket. */
typedef struct sever_thread_fun {
    Server_Rrocess_Thread_Fun Server_Process_Client_Conn1;
    Server_Rrocess_Thread_Fun Server_Process_Client_Conn2;
} Poll_Server_Process_Clinet_FUN_t;

/* Print "<what>: <strerror>(errno: N)" for the current errno and exit with failure. */
static void die(const char *what)
{
    printf("%s: %s(errno: %d)\n", what, strerror(errno), errno);
    exit(EXIT_FAILURE);
}

/*
 * Serve one accepted connection: read a single message, answer with
 * `greeting`, print what was received, then close the socket.
 * Takes ownership of connect_fd.
 */
static void handle_client(int connect_fd, const char *greeting)
{
    char buf[BUFF_MAX] = {0};
    /* Read at most BUFF_MAX - 1 bytes so the terminator always fits. */
    ssize_t received = recv(connect_fd, buf, sizeof buf - 1, 0);

    if (received == -1) {
        perror("recv error");
        close(connect_fd);
        return;
    }

    /* Send the complete greeting, including its trailing newline. */
    if (send(connect_fd, greeting, strlen(greeting), 0) == -1) {
        perror("send error");
    }

    buf[received] = '\0';
    printf("recv msg from client: %s\n", buf);
    close(connect_fd);
}

/*
 * Worker for connections accepted on the first listening socket.
 * arg carries the already-accepted connection fd, cast through intptr_t.
 */
void *Poll_Conn1_Process(void *arg)
{
    printf("come pthread conn1 fun to proess conn1\n");
    handle_client((int)(intptr_t)arg, "Hello,you are connected 1!\n");
    return (void *)1;
}

/*
 * Worker for connections accepted on the second listening socket.
 * arg carries the already-accepted connection fd, cast through intptr_t.
 */
void *Poll_Conn2_Process(void *arg)
{
    printf("come pthread conn2 fun to proess conn2\n");
    handle_client((int)(intptr_t)arg, "Hello,you are connected 2!\n");
    return (void *)2;
}

int main(void)
{
    /* Per-socket error labels; both tables have MAX_POLLFD_NUM entries. */
    static const char *const create_err[MAX_POLLFD_NUM] = {
        "create socket conn1 error",
        "create socket conn2 error",
    };
    static const char *const bind_err[MAX_POLLFD_NUM] = {
        "bind socket error1",
        "bind socket error2",
    };

    Poll_Server_Process_Clinet_FUN_t server_conn_handel = {
        .Server_Process_Client_Conn1 = Poll_Conn1_Process,
        .Server_Process_Client_Conn2 = Poll_Conn2_Process,
    };
    Server_Rrocess_Thread_Fun handlers[MAX_POLLFD_NUM] = {
        server_conn_handel.Server_Process_Client_Conn1,
        server_conn_handel.Server_Process_Client_Conn2,
    };
    struct pollfd pollfd_arr[MAX_POLLFD_NUM];
    int i;

    /* Create both server sockets. */
    for (i = 0; i < MAX_POLLFD_NUM; i++) {
        pollfd_arr[i].fd = socket(AF_INET, SOCK_STREAM, 0);
        if (pollfd_arr[i].fd == -1) {
            die(create_err[i]);
        }
        /* POLLIN is reported when a connection is waiting to be accepted. */
        pollfd_arr[i].events = POLLIN;
        pollfd_arr[i].revents = 0;
    }

    /* Bind socket i to every local address on port DEFAULT_PORT + i. */
    for (i = 0; i < MAX_POLLFD_NUM; i++) {
        struct sockaddr_in addr;

        memset(&addr, 0, sizeof addr);
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = htons((uint16_t)(DEFAULT_PORT + i));
        if (bind(pollfd_arr[i].fd, (struct sockaddr *)&addr, sizeof addr) == -1) {
            die(bind_err[i]);
        }
    }

    for (i = 0; i < MAX_POLLFD_NUM; i++) {
        if (listen(pollfd_arr[i].fd, 10) == -1) {
            die("listen socket error");
        }
    }

    for (;;) {
        if (poll(pollfd_arr, MAX_POLLFD_NUM, -1) == -1) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal: just retry */
            }
            die("poll error");
        }

        for (i = 0; i < MAX_POLLFD_NUM; i++) {
            pthread_t worker;
            int connect_fd;
            int rc;

            if (!(pollfd_arr[i].revents & POLLIN)) {
                continue;
            }

            /*
             * Accept here, before polling again, so the pending connection
             * is consumed and the next poll() cannot report it a second
             * time while a worker is still starting up.
             */
            connect_fd = accept(pollfd_arr[i].fd, NULL, NULL);
            if (connect_fd == -1) {
                printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
                continue;
            }

            rc = pthread_create(&worker, NULL, handlers[i],
                                (void *)(intptr_t)connect_fd);
            if (rc != 0) {
                /* pthread_create returns the error number instead of setting errno. */
                printf("pthread_create error: %s(errno: %d)\n", strerror(rc), rc);
                close(connect_fd);
                continue;
            }
            /* Nobody joins the workers; detach so their resources are reclaimed. */
            pthread_detach(worker);
        }
    }
}
客戶端代碼:
/*
 * poll() demo client: connects to the server at the IPv4 address given on
 * the command line, sends one line read from stdin, prints the reply.
 *
 * Usage: ./client <ipaddress>
 */
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define BUFF_MAX 1024
#define DEFAULT_PORT 8000

int main(int argc, char **argv)
{
    int sockfd;
    ssize_t rec_len;
    char sendline[BUFF_MAX];
    char buf[BUFF_MAX];
    struct sockaddr_in servaddr;

    if (argc != 2) {
        printf("usage: ./client <ipaddress>\n");
        exit(EXIT_FAILURE);
    }

    /* Create the socket. */
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        exit(EXIT_FAILURE);
    }

    memset(&servaddr, 0, sizeof servaddr);
    servaddr.sin_family = AF_INET;
    /*
     * DEFAULT_PORT reaches the server's connection 1;
     * use DEFAULT_PORT + 1 to reach connection 2.
     */
    servaddr.sin_port = htons(DEFAULT_PORT);

    /* Convert the dotted-decimal string into a binary IPv4 address. */
    if (inet_pton(AF_INET, argv[1], &servaddr.sin_addr) <= 0) {
        printf("inet_pton error for %s\n", argv[1]);
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    /* Connect to the server. */
    if (connect(sockfd, (struct sockaddr *)&servaddr, sizeof servaddr) < 0) {
        printf("connect error: %s(errno: %d)\n", strerror(errno), errno);
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    printf("send msg to server: \n");
    /* Without this check, EOF on stdin would leave sendline uninitialized. */
    if (fgets(sendline, sizeof sendline, stdin) == NULL) {
        printf("no input to send\n");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    /* Write the line to the socket. */
    if (send(sockfd, sendline, strlen(sendline), 0) < 0) {
        printf("send msg error: %s(errno: %d)\n", strerror(errno), errno);
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    /* Read the reply; keep one byte free for the terminator. */
    rec_len = recv(sockfd, buf, sizeof buf - 1, 0);
    if (rec_len == -1) {
        perror("recv error");
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    buf[rec_len] = '\0';
    printf("Received : %s \n", buf);

    /* Close the socket. */
    close(sockfd);
    return 0;
}
解釋:服務器端綁定INADDR_ANY,即監聽本機所有地址(例如 ip 127.0.0.1)的 port 8000 和 port 8001。
結果演示: