博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Nginx源码分析-事件循环
阅读量:6648 次
发布时间:2019-06-25

本文共 12101 字,大约阅读时间需要 40 分钟。

转自

 

事件循环这个概念貌似在windows编程中提得更多,Linux程序却很少提及这个概念。本文所提及的事件循环其实就是worker cycle,由于此处将关注的不再是worker进程,而是worker进程在循环过程中关于事件处理的环节,因此就盗用了事件循环这个概念。在具体看代码前,先看一下这个“循环”的概貌:

 

经过前面相关博文的介绍,我们了解到master进程创建好一个worker进程后,worker进程还会进行一个初始化工作,然后才会陷入“死”循环中。这个“死循环”也就是本文将谈及的事件循环,也就是上图中的黄色部分。整个黄色部份是由一个循环构成的,实际上,这个循环里将会做很多的事情,但本文将只关注图中红色标注的事件部分——ngx_process_events_and_timers。ngx_process_events_and_timers是一个函数(定义在src/event/ngx_event.c中)。接下来,就从这个函数开始进入事件驱动的核心。


 

[cpp]
  1. void  
  2. ngx_process_events_and_timers(ngx_cycle_t *cycle)  
  3. {  
  4.     ngx_uint_t  flags;  
  5.     ngx_msec_t  timer, delta;  
  6.     if (ngx_timer_resolution) {  
  7.         timer = NGX_TIMER_INFINITE;  
  8.         flags = 0;  
  9.     } else {  
  10.         timer = ngx_event_find_timer();  
  11.         flags = NGX_UPDATE_TIME;  
  12.     }  
  13.     /*ngx_use_accept_mutex变量代表是否使用accept互斥体 
  14.      默认是使用,accept_mutex off;指令关闭。 
  15.      accept mutex的作用就是避免惊群,同时实现负载均衡。 
  16.      */  
  17.     if (ngx_use_accept_mutex) {  
  18.       
  19.         /* 
  20.          ngx_accept_disabled变量在ngx_event_accept函数中计算。 
  21.          如果ngx_accept_disabled大于了0,就表示该进程接受的 
  22.          连接过多,因此就放弃一次争抢accept mutex的机会,同时将 
  23.          自己减1。然后,继续处理已有连接上的事件。Nginx就借用 
  24.          此变量实现了进程关于连接的基本负载均衡。 
  25.          */  
  26.         if (ngx_accept_disabled > 0) {  
  27.             ngx_accept_disabled--;  
  28.         } else {  
  29.             /* ngx_accept_disabled小于0,连接数没超载*/  
  30.               
  31.             /*尝试锁accept mutex,只有成功获取锁的进程,才会将listen 
  32.               套接字放入epoll中。因此,这就保证了只有一个进程拥有 
  33.               监听套接口,故所有进程阻塞在epoll_wait时,不会出现惊群现象。 
  34.             */  
  35.             if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {  
  36.                 return;  
  37.             }  
  38.             if (ngx_accept_mutex_held) {  
  39.                 /*获取锁的进程,将添加一个NGX_POST_EVENTS标志, 
  40.                   此标志的作用是将所有产生的事件放入一个队列中, 
  41.                   等释放锁后,再慢慢来处理事件。因为,处理事件可能 
  42.                   会很耗时,如果不先释放锁再处理的话,该进程就长 
  43.                   时间霸占了锁,导致其他进程无法获取锁,这样accept 
  44.                   的效率就低了。 
  45.                 */  
  46.                 flags |= NGX_POST_EVENTS;   
  47.             } else {  
  48.               
  49.                 /*没有获得锁的进程,当然不需要NGX_POST_EVENTS标志了。 
  50.                   但需要设置最长延迟多久,再次去争抢锁。 
  51.                 */  
  52.                 if (timer == NGX_TIMER_INFINITE  
  53.                     || timer > ngx_accept_mutex_delay)  
  54.                 {  
  55.                     timer = ngx_accept_mutex_delay;  
  56.                 }  
  57.             }  
  58.         }  
  59.     }  
  60.       
  61.     delta = ngx_current_msec;  
  62.     /*epoll开始wait事件了,ngx_process_events的具体实现是对应到 
  63.       epoll模块中的ngx_epoll_process_events函数。单独分析epoll 
  64.       模块的时候,再具体看看。 
  65.     */  
  66.     (void) ngx_process_events(cycle, timer, flags);  
  67.     delta = ngx_current_msec - delta; /*统计本次wait事件的耗时*/  
  68.     /*ngx_posted_accept_events是一个事件队列 
  69.       暂存epoll从监听套接口wait到的accept事件。 
  70.       前文提到的NGX_POST_EVENTS标志被使用后,就会将 
  71.       所有的accept事件暂存到这个队列。 
  72.        
  73.       这里完成对队列中的accept事件的处理,实际就是调用 
  74.       ngx_event_accept函数来获取一个新的连接,然后放入 
  75.       epoll中。 
  76.     */  
  77.     if (ngx_posted_accept_events) {  
  78.         ngx_event_process_posted(cycle, &ngx_posted_accept_events);  
  79.     }  
  80.     /*所有accept事件处理完成,如果拥有锁的话,就赶紧释放了。 
  81.       其他进程还等着抢了。 
  82.     */  
  83.     if (ngx_accept_mutex_held) {  
  84.         ngx_shmtx_unlock(&ngx_accept_mutex);  
  85.     }  
  86.     /*delta是上文对epoll wait事件的耗时统计,存在毫秒级的耗时 
  87.       就对所有事件的timer进行检查,如果time out就从timer rbtree中, 
  88.       删除到期的timer,同时调用相应事件的handler函数完成处理。 
  89.     */  
  90.     if (delta) {  
  91.         ngx_event_expire_timers();  
  92.     }  
  93.     /*处理普通事件(连接上获得的读写事件)队列上的所有事件, 
  94.       因为每个事件都有自己的handler方法,该怎么处理事件就 
  95.       依赖于事件的具体handler了。 
  96.     */  
  97.     if (ngx_posted_events) {  
  98.         if (ngx_threaded) {  
  99.             ngx_wakeup_worker_thread(cycle);  
  100.         } else {  
  101.             ngx_event_process_posted(cycle, &ngx_posted_events);  
  102.         }  
  103.     }  
  104. }  

void ngx_process_events_and_timers(ngx_cycle_t *cycle) { ngx_uint_t flags; ngx_msec_t timer, delta; if (ngx_timer_resolution) { timer = NGX_TIMER_INFINITE; flags = 0; } else { timer = ngx_event_find_timer(); flags = NGX_UPDATE_TIME; } /*ngx_use_accept_mutex变量代表是否使用accept互斥体 默认是使用,accept_mutex off;指令关闭。 accept mutex的作用就是避免惊群,同时实现负载均衡。 */ if (ngx_use_accept_mutex) { /* ngx_accept_disabled变量在ngx_event_accept函数中计算。 如果ngx_accept_disabled大于了0,就表示该进程接受的 连接过多,因此就放弃一次争抢accept mutex的机会,同时将 自己减1。然后,继续处理已有连接上的事件。Nginx就借用 此变量实现了进程关于连接的基本负载均衡。 */ if (ngx_accept_disabled > 0) { ngx_accept_disabled--; } else { /* ngx_accept_disabled小于0,连接数没超载*/ /*尝试锁accept mutex,只有成功获取锁的进程,才会将listen 套接字放入epoll中。因此,这就保证了只有一个进程拥有 监听套接口,故所有进程阻塞在epoll_wait时,不会出现惊群现象。 */ if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } if (ngx_accept_mutex_held) { /*获取锁的进程,将添加一个NGX_POST_EVENTS标志, 此标志的作用是将所有产生的事件放入一个队列中, 等释放锁后,再慢慢来处理事件。因为,处理事件可能 会很耗时,如果不先释放锁再处理的话,该进程就长 时间霸占了锁,导致其他进程无法获取锁,这样accept 的效率就低了。 */ flags |= NGX_POST_EVENTS; } else { /*没有获得锁的进程,当然不需要NGX_POST_EVENTS标志了。 但需要设置最长延迟多久,再次去争抢锁。 */ if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { timer = ngx_accept_mutex_delay; } } } } delta = ngx_current_msec; /*epoll开始wait事件了,ngx_process_events的具体实现是对应到 epoll模块中的ngx_epoll_process_events函数。单独分析epoll 模块的时候,再具体看看。 */ (void) ngx_process_events(cycle, timer, flags); delta = ngx_current_msec - delta; /*统计本次wait事件的耗时*/ /*ngx_posted_accept_events是一个事件队列 暂存epoll从监听套接口wait到的accept事件。 前文提到的NGX_POST_EVENTS标志被使用后,就会将 所有的accept事件暂存到这个队列。 这里完成对队列中的accept事件的处理,实际就是调用 ngx_event_accept函数来获取一个新的连接,然后放入 epoll中。 */ if (ngx_posted_accept_events) { ngx_event_process_posted(cycle, &ngx_posted_accept_events); } /*所有accept事件处理完成,如果拥有锁的话,就赶紧释放了。 其他进程还等着抢了。 */ if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } /*delta是上文对epoll wait事件的耗时统计,存在毫秒级的耗时 就对所有事件的timer进行检查,如果time out就从timer rbtree中, 删除到期的timer,同时调用相应事件的handler函数完成处理。 */ if (delta) { ngx_event_expire_timers(); } /*处理普通事件(连接上获得的读写事件)队列上的所有事件, 因为每个事件都有自己的handler方法,该怎么处理事件就 依赖于事件的具体handler了。 */ if (ngx_posted_events) { if (ngx_threaded) { ngx_wakeup_worker_thread(cycle); } else { ngx_event_process_posted(cycle, &ngx_posted_events); } } }

 ngx_process_events_and_timers一做完工作,就又回到了事件循环中去了,上图示;但会很快又会回到事件处理中来。


 

上文中,分析了事件循环中有关事件处理的过程;在分析的过程中,我们有提到对accept事件的处理,accept事件就是监听套接口上有新的连接到来的事件;接下来,我们分析一下accept事件的handler方法,看看accept事件的处理过程是如何的。accept事件的handler方法是ngx_event_accept(位于src/event/ngx_event_accept.c中),代码分析如下:

[cpp]
  1. void  
  2. ngx_event_accept(ngx_event_t *ev)  
  3. {  
  4.     socklen_t          socklen;  
  5.     ngx_err_t          err;  
  6.     ngx_log_t         *log;  
  7.     ngx_socket_t       s;  
  8.     ngx_event_t       *rev, *wev;  
  9.     ngx_listening_t   *ls;  
  10.     ngx_connection_t  *c, *lc;  
  11.     ngx_event_conf_t  *ecf;  
  12.     u_char             sa[NGX_SOCKADDRLEN];  
  13.     。。。。。。。。。。。。。。。。。  
  14.       
  15.     lc = ev->data;  
  16.     ls = lc->listening;  
  17.     ev->ready = 0;  
  18.     do {  
  •         socklen = NGX_SOCKADDRLEN;  
  •         /*accept一个新的连接*/  
  •         s = accept(lc->fd, (struct sockaddr *) sa, &socklen);  
  •         /*连接的错误处理*/  
  •         if (s == -1) {  
  •             err = ngx_socket_errno;  
  •             if (err == NGX_EAGAIN) {  
  •                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,  
  •                                "accept() not ready");  
  •                 return;  
  •             }  
  •             ngx_log_error((ngx_uint_t) ((err == NGX_ECONNABORTED) ?  
  •                                              NGX_LOG_ERR : NGX_LOG_ALERT),  
  •                           ev->log, err, "accept() failed");  
  •             if (err == NGX_ECONNABORTED) {  
  •                 if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {  
  •                     ev->available--;  
  •                 }  
  •                 if (ev->available) {  
  •                     continue;  
  •                 }  
  •             }  
  •             return;  
  •         }  
  •         /*accept到一个新的连接后,就重新计算ngx_accept_disabled的值 
  •          ngx_accept_disabled已经提及过了,它主要用来做负载均衡之用。 
  •           
  •          这里,我们能够看到它的求值方式是“总连接数的八分之一,减去 
  •          剩余的连接数”。总连接数是指每个进程设定的最大连接数,这个数字 
  •          可以在配置文件中指定。由此处的计算方式,可以看出:每个进程accept 
  •          到总连接数的7/8后,ngx_accept_disabled就大于0了,连接也就 
  •          超载了。 
  •         */  
  •         ngx_accept_disabled = ngx_cycle->connection_n / 8  
  •                               - ngx_cycle->free_connection_n;  
  •         /*从connections数组中获取一个connecttion slot来维护新的连接*/  
  •         c = ngx_get_connection(s, ev->log);  
  •         if (c == NULL) {  
  •             if (ngx_close_socket(s) == -1) {  
  •                 ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,  
  •                               ngx_close_socket_n " failed");  
  •             }  
  •             return;  
  •         }  
  •         /*为新的连接创建起一个memory pool,连接关闭的时候,才释放这个pool*/  
  •         c->pool = ngx_create_pool(ls->pool_size, ev->log);  
  •         if (c->pool == NULL) {  
  •             ngx_close_accepted_connection(c);  
  •             return;  
  •         }  
  •         。。。。。。。。。。。。。。  
  •         /* set a blocking mode for aio and non-blocking mode for others */  
  •         if (ngx_inherited_nonblocking) {  
  •             if (ngx_event_flags & NGX_USE_AIO_EVENT) {  
  •                 if (ngx_blocking(s) == -1) {  
  •                     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,  
  •                                   ngx_blocking_n " failed");  
  •                     ngx_close_accepted_connection(c);  
  •                     return;  
  •                 }  
  •             }  
  •         } else {  
  •             /*我们使用的epoll模型,在这里设置连接为nonblocking*/  
  •             if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {  
  •                 if (ngx_nonblocking(s) == -1) {  
  •                     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,  
  •                                   ngx_nonblocking_n " failed");  
  •                     ngx_close_accepted_connection(c);  
  •                     return;  
  •                 }  
  •             }  
  •         }  
  •         *log = ls->log;  
  •         /*初始化新连接*/  
  •         c->recv = ngx_recv;  
  •         c->send = ngx_send;  
  •         c->recv_chain = ngx_recv_chain;  
  •         c->send_chain = ngx_send_chain;  
  •         c->log = log;  
  •         c->pool->log = log;  
  •         c->socklen = socklen;  
  •         c->listening = ls;  
  •         c->local_sockaddr = ls->sockaddr;  
  •         c->local_socklen = ls->socklen;  
  •         c->unexpected_eof = 1;  
  •         rev = c->read;  
  •         wev = c->write;  
  •         wev->ready = 1;  
  •         if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) {  
  •             /* rtsig, aio, iocp */  
  •             rev->ready = 1;  
  •         }  
  •         if (ev->deferred_accept) {  
  •             rev->ready = 1;  
  •         }  
  •         rev->log = log;  
  •         wev->log = log;  
  •         /* 
  •          * TODO: MT: - ngx_atomic_fetch_add() 
  •          *             or protection by critical section or light mutex 
  •          * 
  •          * TODO: MP: - allocated in a shared memory 
  •          *           - ngx_atomic_fetch_add() 
  •          *             or protection by critical section or light mutex 
  •          */  
  •         c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);  
  •         。。。。。。。。。。。。。。。。。。。  
  •         if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {  
  •             if (ngx_add_conn(c) == NGX_ERROR) {  
  •                 ngx_close_accepted_connection(c);  
  •                 return;  
  •             }  
  •         }  
  •         log->data = NULL;  
  •         log->handler = NULL;  
  •         /*这里的listen handler很重要,它将完成新连接的最后初始化工作 
  •           同时将accept到的新连接放入epoll中;挂在这个handler上的函数 
  •           就是ngx_http_init_connection(位于src/http/ngx_http_request.c中); 
  •           这个函数放在分析http模块的时候再看吧。 
  •         */  
  •         ls->handler(c);  
  •         if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {  
  •             ev->available--;  
  •         }  
  •     } while (ev->available);  
  • }  

void ngx_event_accept(ngx_event_t *ev) { socklen_t socklen; ngx_err_t err; ngx_log_t *log; ngx_socket_t s; ngx_event_t *rev, *wev; ngx_listening_t *ls; ngx_connection_t *c, *lc; ngx_event_conf_t *ecf; u_char sa[NGX_SOCKADDRLEN]; 。。。。。。。。。。。。。。。。。 lc = ev->data; ls = lc->listening; ev->ready = 0; do { socklen = NGX_SOCKADDRLEN; /*accept一个新的连接*/ s = accept(lc->fd, (struct sockaddr *) sa, &socklen); /*连接的错误处理*/ if (s == -1) { err = ngx_socket_errno; if (err == NGX_EAGAIN) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err, "accept() not ready"); return; } ngx_log_error((ngx_uint_t) ((err == NGX_ECONNABORTED) ? NGX_LOG_ERR : NGX_LOG_ALERT), ev->log, err, "accept() failed"); if (err == NGX_ECONNABORTED) { if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } if (ev->available) { continue; } } return; } /*accept到一个新的连接后,就重新计算ngx_accept_disabled的值 ngx_accept_disabled已经提及过了,它主要用来做负载均衡之用。 这里,我们能够看到它的求值方式是“总连接数的八分之一,减去 剩余的连接数”。总连接数是指每个进程设定的最大连接数,这个数字 可以在配置文件中指定。由此处的计算方式,可以看出:每个进程accept 到总连接数的7/8后,ngx_accept_disabled就大于0了,连接也就 超载了。 */ ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; /*从connections数组中获取一个connecttion slot来维护新的连接*/ c = ngx_get_connection(s, ev->log); if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed"); } return; } /*为新的连接创建起一个memory pool,连接关闭的时候,才释放这个pool*/ c->pool = ngx_create_pool(ls->pool_size, ev->log); if (c->pool == NULL) { ngx_close_accepted_connection(c); return; } 。。。。。。。。。。。。。。 /* set a blocking mode for aio and non-blocking mode for others */ if (ngx_inherited_nonblocking) { if (ngx_event_flags & NGX_USE_AIO_EVENT) { if (ngx_blocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_blocking_n " failed"); ngx_close_accepted_connection(c); return; } } } else { /*我们使用的epoll模型,在这里设置连接为nonblocking*/ if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) { if (ngx_nonblocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed"); ngx_close_accepted_connection(c); return; } } } *log = ls->log; /*初始化新连接*/ c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen; c->unexpected_eof = 1; rev = c->read; wev = c->write; wev->ready = 1; if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) { /* rtsig, aio, iocp */ rev->ready = 1; } if (ev->deferred_accept) { rev->ready = 1; } rev->log = log; wev->log = log; /* * TODO: MT: - ngx_atomic_fetch_add() * or protection by critical section or light mutex * * TODO: MP: - allocated in a shared memory * - ngx_atomic_fetch_add() * or protection by critical section or light mutex */ c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); 。。。。。。。。。。。。。。。。。。。 if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { if (ngx_add_conn(c) == NGX_ERROR) { ngx_close_accepted_connection(c); return; } } log->data = NULL; log->handler = NULL; /*这里的listen handler很重要,它将完成新连接的最后初始化工作 同时将accept到的新连接放入epoll中;挂在这个handler上的函数 就是ngx_http_init_connection(位于src/http/ngx_http_request.c中); 这个函数放在分析http模块的时候再看吧。 */ ls->handler(c); if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } } while (ev->available); }

 

 

accept事件的handler方法也就是如此了。剩下的就是每个连接上的读写事件的handler方法没有分析了,这一部分的内容将直接带领我们进入http模块中,所以等我们把epoll看完了,再开始http模块的分析吧。

 

 

转载于:https://www.cnblogs.com/li-hao/archive/2013/03/14/2958934.html

你可能感兴趣的文章
java.io.IOException:stream closed 异常的原因及处理
查看>>
ACM HDU 1029Ignatius and the Princess IV
查看>>
iOS开发之一些字符串常用的代码
查看>>
Android开发笔记之adb参数指南
查看>>
SQL中sum(),avg()等统计结果为null的解决方法
查看>>
初学Java的几个tips
查看>>
cvDilate
查看>>
android照相及照片上传
查看>>
关于信息隐藏的感想及其它废话
查看>>
RCP学习:Bundle的生命周期
查看>>
现代 C++ 编程指南
查看>>
记录我的旅程8之JavaScript Dom学习笔记
查看>>
.NET中的加密算法总结(自定义加密Helper类续)
查看>>
sql 跨服务器数据库查询数据
查看>>
VBA SQLServer 基本操作
查看>>
在HTML语言网页中加载视频的代码
查看>>
POJ 1274 The Perfect Stall(二分图匹配)
查看>>
PHP全局错误处理
查看>>
数的1、2、3次方是否均为回文数
查看>>
kramdown 0.14.0,Ruby 的 Markdown 解析器
查看>>