上一节我们讲到了事件驱动的模块,它把我们引入epoll模块,今天我们主要学习下nginx如何使用epoll完成时间驱动,实现高并发;这里不详细讲解epoll原理,如果有机会再做一次单独的epoll的学习。
本文来自于:http://blog.csdn.net/lengzijian
回忆一下上一节的内容,在我们讲到ngx_process_events_and_timers时,在源码最后提到了ngx_process_events,这里是把我们引入epoll的入口:
1.先来看下ngx_process_events的宏定义:
src/event/ngx_event.h #define ngx_process_events ngx_event_actions.process_events
2.继续查找ngx_event_actions,我们找到如下结构体:
src/event/ngx_event.h typedef struct { ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); ngx_int_t (*add_conn)(ngx_connection_t *c); ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags); ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait); ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags); ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer); void (*done)(ngx_cycle_t *cycle); } ngx_event_actions_t;
a.我们去源代码中搜索下关键字ngx_event_actions:
modules/ngx_epoll_module.c: ngx_event_actions = ngx_epoll_module_ctx.actions; modules/ngx_select_module.c: ngx_event_actions = ngx_select_module_ctx.actions; modules/ngx_poll_module.c: ngx_event_actions = ngx_poll_module_ctx.actions; ngx_event.c:ngx_event_actions_t ngx_event_actions; 前面三行表示:所有event模块对象中的actions就是ngx_event_actions_t对象,而ngx_event_action在第四行定义为全局变量,用于同一接口,下面又存在一个疑问,event模块到底做了些什么?
b.先找到ngx_event_module_t的结构体:
src/event/ngx_event.h typedef struct { ngx_str_t *name; //模块名 void *(*create_conf)(ngx_cycle_t *cycle); //钩子函数,之前讲过 char *(*init_conf)(ngx_cycle_t *cycle, void *conf);//同上 ngx_event_actions_t actions; //接下来主要看 } ngx_event_module_t;
我们找一个例子来详细讲解下
src/event/modules/ngx_epoll_module.c ngx_event_module_t ngx_epoll_module_ctx = { &epoll_name, ngx_epoll_create_conf, /* create configuration */ ngx_epoll_init_conf, /* init configuration */ { ngx_epoll_add_event, /* add an event */ ngx_epoll_del_event, /* delete an event */ ngx_epoll_add_event, /* enable an event */ ngx_epoll_del_event, /* disable an event */ ngx_epoll_add_connection, /* add an connection */ ngx_epoll_del_connection, /* delete an connection */ NULL, /* process the changes */ ngx_epoll_process_events, /* process the events */ ngx_epoll_init, /* init the events */ ngx_epoll_done, /* done the events */ } }; 这里有注释就不详细讲解了。
ngx_process_events这个函数就是我们要找的,要了好大一圈,ngx_process_events实际上就是调用这个函数,此处本人纠结,为什么作者不加点注释呢。
3.下面正式观察下ngx_epoll_init函数:
src/event/modules/ngx_epoll_module.c static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ngx_epoll_conf_t *epcf; //获取epoll模块的配置结构 epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module); //ep是epoll模块定义的一个全局变量,初始化为-1 if (ep == -1) { //创建一个epoll对象,容量为总连接数的一半 ep = epoll_create(cycle->connection_n / 2); if (ep == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "epoll_create() failed"); return NGX_ERROR; } } //nevents也是epoll模块的全局变量,初始化为0 if (nevents < epcf->events) { if (event_list) { ngx_free(event_list); } //event_list存储产生时间的数组 event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events, cycle->log); if (event_list == NULL) { return NGX_ERROR; } } nevents = epcf->events; /*初始化全局变量ngx_io,ngx_os_io定义为: ngx_os_io_t ngx_os_io = { ngx_unix_recv, ngx_readv_chain, ngx_udp_unix_recv, ngx_unix_send, ngx_writev_chain, 0 };(src/os/unix/ngx_posix_init.c) */ ngx_io = ngx_os_io; //这里之前讲过 ngx_event_actions = ngx_epoll_module_ctx.actions; #if (NGX_HAVE_CLEAR_EVENT) //实现边沿触发 ngx_event_flags = NGX_USE_CLEAR_EVENT #else //实现水平出发 ngx_event_flags = NGX_USE_LEVEL_EVENT #endif |NGX_USE_GREEDY_EVENT //io是知道收到DAGAIN为止 |NGX_USE_EPOLL_EVENT; //epoll标志 return NGX_OK; }
4.下面观察下主要的函数ngx_epoll_process_events:
src/event/modules/ngx_epoll_module.c static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int events; uint32_t revents; ngx_int_t instance, i; ngx_uint_t level; ngx_err_t err; ngx_event_t *rev, *wev, **queue; ngx_connection_t *c; /* NGX_TIMER_INFINITE == INFTIM */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer); //一开支就等待时间,最长等待时间为timer,这里的timer下一节会详细讲解 events = epoll_wait(ep, event_list, (int) nevents, timer); err = (events == -1) ? ngx_errno : 0; if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { //执行一次时间更新,nginx将时间缓存到一组全局变量中,方便程序高效获取事件 ngx_time_update(); } //wait出错 if (err) { if (err == NGX_EINTR) { if (ngx_event_timer_alarm) { ngx_event_timer_alarm = 0; return NGX_OK; } level = NGX_LOG_INFO; } else { level = NGX_LOG_ALERT; } ngx_log_error(level, cycle->log, err, "epoll_wait() failed"); return NGX_ERROR; } //wait返回事件数0,可能是timeout返回,如果不是timeout返回,那么就是error if (events == 0) { //这里限定timer不是无线超时 if (timer != NGX_TIMER_INFINITE) { return NGX_OK; } ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "epoll_wait() returned no events without timeout"); return NGX_ERROR; } //ngx_posted_events_mutex上锁 ngx_mutex_lock(ngx_posted_events_mutex); // for (i = 0; i < events; i++) { c = event_list[i].data.ptr; instance = (uintptr_t) c & 1; //从发生的epoll事件对象中取得ngx_connection_t对象 c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); //取出读事件 rev = c->read; //.... //取得发生的时间 revents = event_list[i].events; //记录wait的错误返回状态 if (revents & (EPOLLERR|EPOLLHUP)) { ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD", c->fd, revents); } if ((revents & (EPOLLERR|EPOLLHUP)) && (revents & (EPOLLIN|EPOLLOUT)) == 0) { /* * if the error events were returned without EPOLLIN or EPOLLOUT, * then add these flags to handle the events at least in one * active handler */ revents |= EPOLLIN|EPOLLOUT; } //读取一个事件,并且该连接上注册的读时间是active的 if ((revents & EPOLLIN) && rev->active) { if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) { rev->posted_ready = 1; } else { rev->ready = 1; } //时间放入相应的队列中;之前有说过nginx是先放入队列,释放锁之后再做处理 if (flags & NGX_POST_EVENTS) { queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events); /* 这里根据accept状态 如果accept为真:加入到ngx_posted_accept_events事件队列中 如果accept为假:加入到ngx_posted_events事件队列中 */ ngx_locked_post_event(rev, queue);//加入到ngx_posted_accept_events队里面 } else { rev->handler(rev);//调用读事件处理函数,通常就是读取事件了 } } //取出写事件 wev = c->write; //如果是写事件并且active状态 if ((revents & EPOLLOUT) && wev->active) { if (c->fd == -1 || wev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } if (flags & NGX_POST_THREAD_EVENTS) { //写事件设为posted_ready状态 wev->posted_ready = 1; } else { 写事件设为ready状态 wev->ready = 1; } if (flags & NGX_POST_EVENTS) { //不立即处理,要时间排队,加入队列 ngx_locked_post_event(wev, &ngx_posted_events); } else { //调用写事件处理函数,通常就是写入数据 wev->handler(wev); } } } //ngx_mutex_unlock解锁 ngx_mutex_unlock(ngx_posted_events_mutex); return NGX_OK; } //其中用到了ngx_locked_post_event()这个宏,它把事件放到事件队列的头部。