? 在實現(xiàn)http服務器之前,我們先需要一個可以接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的tcp服務器。網(wǎng)絡連接上消息處理可以分為兩個階段:1. 等待消息準備好??2. 消息處理。
創(chuàng)新互聯(lián)專注于企業(yè)營銷型網(wǎng)站建設、網(wǎng)站重做改版、泉州網(wǎng)站定制設計、自適應品牌網(wǎng)站建設、H5技術、商城開發(fā)、集團公司官網(wǎng)建設、成都外貿(mào)網(wǎng)站制作、高端網(wǎng)站制作、響應式網(wǎng)頁設計等建站業(yè)務,價格優(yōu)惠性價比高,為泉州等各大城市提供網(wǎng)站開發(fā)制作服務。? 對于高并發(fā)tcp服務器來說,將上述兩個階段分開是最好的選擇。等待消息如何做到?就引出了I/O多路復用,使用一個I/O復用來監(jiān)督多個網(wǎng)絡連接,當網(wǎng)絡連接上出現(xiàn)可用事件時,則I/O多路復用可返回相應的連接。
? 對于一個I/O復用來說,需要關注的是三種事件:1. I/O事件 2. 定時器事件 3. 信號。
二. reactor模型? 對于Linux來說,底層提供了三種I/O復用,分別是:1. select 2. poll 3. epoll。其中epoll是一個高效的選擇,但是epoll太底層,它只是一個Linux的系統(tǒng)調(diào)用,需要通過某種事件處理機制進行進一步的封裝。
? reactor作為一種高效的事件處理模型,對epoll系統(tǒng)調(diào)用進行了進一步的封裝。在普通的事件處理機制中,首先程序調(diào)用某個函數(shù),然后函數(shù)執(zhí)行,程序等待,當函數(shù)執(zhí)行完畢后函數(shù)將結(jié)果和控制權返回給程序,最后程序繼續(xù)處理。與普通函數(shù)不同的是,reactor模式中,并不是主動取調(diào)用某個API完成處理,而是相反。對于事件處理流程進行了逆置,應用程序需要提供相應的接口并且注冊到reactor上,如果相應的事件發(fā)生,reactor將主動調(diào)用應用程序注冊的接口,這些接口又被稱為“回調(diào)函數(shù)”。reactor模型分為以下幾個部分:
事件源(handle):由操作系統(tǒng)提供,用于識別每一個事件,如Socket描述符、文件描述符等。在服務端系統(tǒng)中用一個整數(shù)表示。該事件可能來自外部,如來自客戶端的連接請求、數(shù)據(jù)等。也可能來自內(nèi)部,如定時器事件。
事件反應器(reactor):定義和應用程序控制事件調(diào)度,以及應用程序注冊、刪除事件處理器和相關描述符相關的接口。它是事件處理器的調(diào)度核心,使用事件分離器來等待事件的發(fā)生。一旦事件發(fā)生,反應器先是分離每個事件,然后調(diào)度具體事件的事件處理器中的回調(diào)函數(shù)處理事件。
事件分離器(demultiplexer):是一個有操作系統(tǒng)提供的I/O復用函數(shù),在此我們選用epoll。用來等待一個或多個事件的發(fā)生。調(diào)用者將會被阻塞,直到分離器分離的描述符集上有事件發(fā)生。
事件處理器(even handler):事件處理程序提供了一組接口,每個接口對應了一種類型的事件,供reactor在相應的事件發(fā)生時調(diào)用,執(zhí)行相應的事件處理。一般每個具體的事件處理器總是綁定一個有效的描述符句柄,用來識別事件和服務。
三. 基于reactor模型tcp服務器的實現(xiàn)(單線程)#include#include#include#include#include#include#include
#include#define BUFFER_LENGTH 2048
#define MAX_EPOLLSIZE 1024
typedef int (*HTTPCALLBACK)(int, int, void *);
struct _http_base;
struct _http_event;
struct _http_eventblock;
typedef struct _http_base
{
int _epoll_fd; // epoll句柄
struct _http_eventblock *_block; // 存儲事件塊鏈
int _block_cnt; // 塊鏈塊數(shù)
} http_base;
typedef struct _http_event
{
int _fd; // 事件句柄
int _event; // 事件
HTTPCALLBACK _callback; // 對應回調(diào)函數(shù)
void *_arg; // 回調(diào)函數(shù)參數(shù)
struct _http_base *_base; // 所屬base
char _send_buffer[BUFFER_LENGTH]; // 發(fā)送緩沖區(qū)
int _send_length; // 待發(fā)送字節(jié)數(shù)
char _receive_buffer[BUFFER_LENGTH]; // 接收緩沖區(qū)
int _receive_length; // 接收數(shù)據(jù)長度
int _status; // 當前事件存在狀態(tài)(0-epoll不管理狀態(tài) 1-epoll管理狀態(tài))
int _is_listenfd; // 是否為監(jiān)聽套接字
} http_event;
typedef struct _http_eventblock
{
struct _http_event *events;
struct _http_eventblock *next;
} http_eventblock;
// 分配一個http_eventblock
http_eventblock *http_eventblock_new();
// 初始化http_base
http_base *http_base_new();
// 銷毀http_base
void http_base_free(http_base *base);
// 事件循環(huán)
void dispatch(http_base *base);
// 增加http_eventblock塊
int http_eventblock_alloc(http_base *base);
// 返回fd對應的http_event地址
struct _http_event *http_eventblock_index(http_base *base, int fd);
// 設置http_event
int http_event_set(http_base *base, http_event *h_event, int fd, int event, int is_listenfd, HTTPCALLBACK callback, void *arg);
// 重新設置http_event
int http_event_reset(http_base *base, http_event *h_event, int fd, int event, int is_listenfd, HTTPCALLBACK callback, void *arg);
// http_event加入epoll管理
int http_event_add(http_event *hpev);
// http_event修改epoll管理
int http_event_mod(http_event *hpev);
// http_event刪除epoll管理
int http_event_del(http_event *hpev);
// 分配一個http_eventblock塊(內(nèi)部函數(shù))
http_eventblock *http_eventblock_new()
{
// 分配events
http_event *new_events = malloc(sizeof(http_event) * MAX_EPOLLSIZE);
if (!new_events)
{
printf("create events in %s err %s\n", __func__, strerror(errno));
return NULL;
}
memset(new_events, 0, sizeof(http_event) * MAX_EPOLLSIZE);
// 分配block塊
http_eventblock *new_block = malloc(sizeof(http_eventblock));
if (!new_block)
{
printf("create block in %s err %s\n", __func__, strerror(errno));
free(new_events);
return NULL;
}
memset(new_block, 0, sizeof(http_eventblock));
// 整合
new_block->events = new_events;
new_block->next = NULL;
return new_block;
}
http_base *http_base_new()
{
// 初始化base
http_base *base = malloc(sizeof(http_base));
if (!base)
{
printf("create base in %s err %s\n", __func__, strerror(errno));
return NULL;
}
memset(base, 0, sizeof(http_base));
// 初始化epoll_fd
base->_epoll_fd = epoll_create(1);
if (base->_epoll_fd<= 0)
{
printf("create epoll in %s err %s\n", __func__, strerror(errno));
free(base);
return NULL;
}
// 初始化塊
base->_block = http_eventblock_new();
if (!base->_block)
{
printf("create block in %s err %s\n", __func__, strerror(errno));
close(base->_epoll_fd);
free(base);
return NULL;
}
// 初始化塊數(shù)
base->_block_cnt = 1;
return base;
}
void http_base_free(http_base *base)
{
// 銷毀epoll_fd;
close(base->_epoll_fd);
// 銷毀httpevent_block
http_eventblock *block = base->_block;
while (block)
{
http_eventblock *next_block = block->next;
free(block->events);
free(block);
base->_block_cnt--;
block = next_block;
}
// 銷毀base
free(base);
}
void dispatch(http_base *base)
{
if (!base)
{
return;
}
if (!base->_block)
{
return;
}
if (base->_epoll_fd<= 0)
{
return;
}
struct epoll_event events[MAX_EPOLLSIZE];
while (1)
{
int n_ready = epoll_wait(base->_epoll_fd, events, MAX_EPOLLSIZE, -1);
for (int i = 0; i< n_ready; ++i)
{
http_event *hpev = events[i].data.ptr;
if (hpev->_is_listenfd)
{
hpev->_callback(hpev->_fd, events[i].events, hpev->_arg);
}
else if (hpev->_event & EPOLLIN)
{
if (!hpev->_callback(hpev->_fd, events[i].events, hpev->_arg))
{
http_event_del(hpev);
close(hpev->_fd);
}
}
else if (hpev->_event & EPOLLOUT)
{
if (!hpev->_callback(hpev->_fd, events[i].events, hpev->_arg))
{
http_event_del(hpev);
close(hpev->_fd);
}
}
else if (hpev->_event & (EPOLLERR | EPOLLRDHUP | EPOLLHUP))
{
http_event_del(hpev);
close(hpev->_fd);
}
else
{
}
}
}
}
int http_eventblock_alloc(http_base *base)
{
if (!base)
{
printf("!base in %s err %s\n", __func__, strerror(errno));
return 0;
}
if (!base->_block)
{
printf("!block in %s err %s\n", __func__, strerror(errno));
return 0;
}
http_eventblock *block = base->_block;
while (block->next)
{
block = block->next;
}
http_eventblock *new_block = http_eventblock_new();
if (!new_block)
{
return 0;
}
block->next = new_block;
base->_block_cnt++;
return 1;
}
struct _http_event *http_eventblock_index(http_base *base, int fd)
{
int block_index = fd / MAX_EPOLLSIZE;
// 塊不夠分配
while (base->_block_cnt<= block_index)
{
http_eventblock_alloc(base);
}
// 尋找塊
http_eventblock *block = base->_block;
while (block_index)
{
block = block->next;
block_index--;
}
return &(block->events[fd % MAX_EPOLLSIZE]);
}
int http_event_set(http_base *base, http_event *hpev, int fd, int event, int is_listenfd, HTTPCALLBACK callback, void *arg)
{
if (!base)
{
return 0;
}
if (!hpev)
{
return 0;
}
hpev->_base = base;
hpev->_fd = fd;
hpev->_event = event;
hpev->_is_listenfd = is_listenfd;
hpev->_callback = callback;
hpev->_arg = arg;
hpev->_status = 0;
memset(hpev->_receive_buffer, 0, BUFFER_LENGTH);
memset(hpev->_send_buffer, 0, BUFFER_LENGTH);
hpev->_receive_length = 0;
hpev->_send_length = 0;
return 1;
}
int http_event_reset(http_base *base, http_event *hpev, int fd, int event, int is_listenfd, HTTPCALLBACK callback, void *arg)
{
if (!base)
{
return 0;
}
if (!hpev)
{
return 0;
}
hpev->_base = base;
hpev->_fd = fd;
hpev->_event = event;
hpev->_is_listenfd = is_listenfd;
hpev->_callback = callback;
hpev->_arg = arg;
return 1;
}
int http_event_add(http_event *hpev)
{
if (!hpev)
{
printf("event in %s err %s\n", __func__, strerror(errno));
return 0;
}
if (hpev->_status == 1)
{
printf("event in %s exist %s\n", __func__, strerror(errno));
return 0;
}
hpev->_status = 1;
struct epoll_event ev = {0, {0}};
ev.data.ptr = hpev;
ev.events = hpev->_event;
http_base *base = hpev->_base;
if (epoll_ctl(base->_epoll_fd, EPOLL_CTL_ADD, hpev->_fd, &ev)< 0)
{
printf("event add failed [fd=%d], events[%d]\n", hpev->_fd, hpev->_event);
return 0;
}
return 1;
}
int http_event_mod(http_event *hpev)
{
if (!hpev)
{
printf("event in %s err %s\n", __func__, strerror(errno));
return 0;
}
if (hpev->_status == 0)
{
printf("event in %s is not exist %s\n", __func__, strerror(errno));
return 0;
}
struct epoll_event ev = {0, {0}};
ev.data.ptr = hpev;
ev.events = hpev->_event;
http_base *base = hpev->_base;
if (epoll_ctl(base->_epoll_fd, EPOLL_CTL_MOD, hpev->_fd, &ev)< 0)
{
printf("event mod failed [fd=%d], events[%d]\n", hpev->_fd, hpev->_event);
return 0;
}
return 1;
}
int http_event_del(http_event *hpev)
{
if (!hpev)
{
printf("event in %s err %s\n", __func__, strerror(errno));
return 0;
}
if (hpev->_status == 0)
{
printf("event in %s is not exist %s\n", __func__, strerror(errno));
return 0;
}
hpev->_status = 0;
http_base *base = hpev->_base;
struct epoll_event ev = {0, {0}};
ev.data.ptr = hpev;
ev.events = hpev->_event;
epoll_ctl(base->_epoll_fd, EPOLL_CTL_DEL, hpev->_fd, &ev);
printf("disconnect, pos[%d]\n", hpev->_fd);
return 1;
}
int accept_callback(int fd, int event, void *arg);
int read_callback(int fd, int event, void *arg);
int send_callback(int fd, int event, void *arg);
int accept_callback(int fd, int event, void *arg)
{
http_base *base = (http_base *)arg;
if (!base)
{
return 0;
}
struct sockaddr_in client_address;
socklen_t client_address_len = sizeof(client_address);
int client_fd;
if ((client_fd = accept(fd, (struct sockaddr *)&client_address, &client_address_len)) == -1)
{
printf("accept: %s\n", strerror(errno));
return 0;
}
int flag = 0;
if ((flag = fcntl(client_fd, F_SETFL, O_NONBLOCK))< 0)
{
printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLLSIZE);
}
http_event *hpev = http_eventblock_index(base, client_fd);
http_event_set(base, hpev, client_fd, EPOLLIN | EPOLLET | EPOLLRDHUP, 0, read_callback, base);
http_event_add(hpev);
printf("new connect [%s:%d], pos[%d]\n",
inet_ntoa(client_address.sin_addr), ntohs(client_address.sin_port), client_fd);
return 1;
}
int read_callback(int fd, int event, void *arg)
{
http_base *base = (http_base *)arg;
http_event *hpev = http_eventblock_index(base, fd);
if (hpev->_receive_length >= BUFFER_LENGTH)
{
return 0;
}
int byte_read = 0;
while (1)
{
byte_read = recv(fd,
hpev->_receive_buffer + hpev->_receive_length,
BUFFER_LENGTH - hpev->_receive_length, 0);
if (byte_read == -1)
{
if (errno == EINTR) {
continue;
}
else if (errno == EAGAIN || errno == EWOULDBLOCK)
{
http_event_reset(base, hpev, fd, EPOLLOUT | EPOLLET | EPOLLRDHUP, 0, send_callback, base);
http_event_mod(hpev);
break;
}
return 0;
}
else if (byte_read == 0)
{
return 0;
}
hpev->_receive_length += byte_read;
}
printf("receive %s\n", hpev->_receive_buffer);
memcpy(hpev->_send_buffer, hpev->_receive_buffer, hpev->_receive_length);
hpev->_send_length = hpev->_receive_length;
memset(hpev->_receive_buffer, 0, BUFFER_LENGTH);
hpev->_receive_length = 0;
return 1;
}
int send_callback(int fd, int event, void *arg)
{
http_base *base = (http_base *)arg;
http_event *hpev = http_eventblock_index(base, fd);
int temp = 0;
int byte_have_send = 0;
int byte_to_send = hpev->_send_length;
while (1)
{
if (byte_to_send == 0)
{
http_event_reset(base, hpev, fd, EPOLLIN | EPOLLET | EPOLLRDHUP, 0, read_callback, base);
http_event_mod(hpev);
break;
}
temp = send(hpev->_fd, hpev->_send_buffer, byte_to_send, 0);
if (temp == -1)
{
// 中斷
if (errno == EINTR)
{
continue;
}
// 寫緩沖區(qū)無數(shù)據(jù)
else if (errno == EAGAIN || errno == EWOULDBLOCK)
{
http_event_reset(base, hpev, fd, EPOLLIN | EPOLLET | EPOLLRDHUP, 0, read_callback, base);
http_event_mod(hpev);
break;
}
// 錯誤
else {
return 0;
}
}
byte_to_send -= temp;
byte_have_send += temp;
}
memset(hpev->_send_buffer, 0, BUFFER_LENGTH);
hpev->_send_length = 0;
return 1;
}
int init_sock(short port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(fd, F_SETFL, O_NONBLOCK);
int reuse = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(port);
bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (listen(fd, 20)< 0)
{
printf("listen failed : %s\n", strerror(errno));
}
return fd;
}
int main()
{
// 創(chuàng)建base
http_base *base = http_base_new();
// 創(chuàng)建監(jiān)聽事件
int listen_fd = init_sock(8080);
// 加入epoll管理
http_event *listen_event = http_eventblock_index(base, listen_fd);
http_event_set(base, listen_event, listen_fd, EPOLLIN, 1, accept_callback, base);
http_event_add(listen_event);
// 事件循環(huán)
dispatch(base);
// 消除事件管理
http_event_del(listen_event);
// 銷毀base
http_base_free(base);
return 0;
}
??
你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧