# HG changeset patch # User Igor Sysoev # Date 1051023778 0 # Node ID eacfdd1c31b96793f7d17de772f84c3d88bda674 # Parent 9f81437e0ad3f4d528352217abca0d985eea7ea5 nginx-0.0.1-2003-04-22-19:02:58 import diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c --- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -12,19 +12,15 @@ #include #include -#if (USE_KQUEUE) && !(HAVE_KQUEUE) -#error "kqueue is not supported on this platform" -#endif - /* STUB */ #define KQUEUE_NCHANGES 512 #define KQUEUE_NEVENTS 512 -/* should be per-thread */ +/* should be per-thread if threads are used without thread pool */ #if 1 -int kq; +int kq; #else static int kq; #endif @@ -60,7 +56,6 @@ int ngx_kqueue_init(int max_connections, return NGX_ERROR; } -#if !(USE_KQUEUE) ngx_event_actions.add = ngx_kqueue_add_event; ngx_event_actions.del = ngx_kqueue_del_event; ngx_event_actions.timer = ngx_event_add_timer; @@ -91,8 +86,6 @@ int ngx_kqueue_init(int max_connections, #endif -#endif - return NGX_OK; } diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -146,31 +146,33 @@ typedef struct { } ngx_event_actions_t; -/* Event filter requires to read/write the whole data - +/* The event filter requires to read/write the whole data - select, poll, /dev/poll, kqueue. */ #define NGX_HAVE_LEVEL_EVENT 1 -/* Event filter is deleted after notification - select, poll, kqueue. - Using /dev/poll it can be implemented with additional syscall */ +/* The event filter is deleted after a notification without an additional + syscall - select, poll, kqueue. */ #define NGX_HAVE_ONESHOT_EVENT 2 -/* Event filter notifies only changes and initial level - kqueue */ +/* The event filter notifies only the changes and an initial level - kqueue */ #define NGX_HAVE_CLEAR_EVENT 4 -/* Event filter has kqueue features - eof flag, errno, available data, etc */ +/* The event filter has kqueue features - the eof flag, errno, + available data, etc */ #define NGX_HAVE_KQUEUE_EVENT 8 -/* Event filter supports low water mark - kqueue's NOTE_LOWAT, - early kqueue implementations have no NOTE_LOWAT so we need a separate flag */ +/* The event filter supports low water mark - kqueue's NOTE_LOWAT. + Early kqueue implementations have no NOTE_LOWAT so we need a separate flag */ #define NGX_HAVE_LOWAT_EVENT 0x00000010 -/* Event filter notifies only changes (edges) but not initial level - epoll */ +/* The event filter notifies only the changes (the edges) + but not an initial level - epoll */ #define NGX_HAVE_EDGE_EVENT 0x00000020 -/* No need to add or delete event filters - rt signals */ +/* No need to add or delete the event filters - rt signals */ #define NGX_HAVE_SIGIO_EVENT 0x00000040 -/* No need to add or delete event filters - overlapped, aio_read, aioread */ +/* No need to add or delete the event filters - overlapped, aio_read, aioread */ #define NGX_HAVE_AIO_EVENT 0x00000080 /* Need to add socket or handle only once - i/o completion port. @@ -284,6 +286,11 @@ extern int ngx_event_f #endif +#if !(HAVE_EPOLL) +#define ngx_edge_add_event(ev) NGX_ERROR +#endif + + ssize_t ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size); int ngx_event_close_connection(ngx_event_t *ev); diff --git a/src/event/ngx_event_proxy.c b/src/event/ngx_event_proxy.c --- a/src/event/ngx_event_proxy.c +++ b/src/event/ngx_event_proxy.c @@ -40,6 +40,27 @@ ngx_log_debug(p->log, "read upstream"); } else { +#if (HAVE_KQUEUE) /* kqueue notifies about the end of file or a pending error */ + + if (ngx_event_type == NGX_HAVE_KQUEUE_EVENT) { + + if (p->upstream->read->error) { + ngx_log_error(NGX_LOG_ERR, p->log, + p->upstream->read->error, + "readv() failed"); + p->upstream_error = 1; + + return NGX_ERROR; + + } else if (p->upstream->read->eof + && p->upstream->read->available == 0) { + p->upstream_eof = 1; + p->block_upstream = 0; + + break; + } + } +#endif /* use the free hunks if they exist */ if (p->free_hunks) { @@ -151,6 +172,7 @@ ngx_log_debug(p->log, "recv_chain: %d" _ } p->upstream_eof = 1; p->block_upstream = 0; + break; } @@ -399,7 +421,8 @@ int ngx_event_proxy_write_to_downstream( ngx_hunk_t *h; ngx_chain_t *entry; - if (p->downstream_level == 0 + if (p->upstream_level == 0 + && p->downstream_level == 0 && p->busy_hunk == NULL && p->out_hunks == NULL && p->in_hunks == NULL diff --git a/src/http/modules/proxy/ngx_http_event_proxy_handler.c b/src/http/modules/proxy/ngx_http_event_proxy_handler.c --- a/src/http/modules/proxy/ngx_http_event_proxy_handler.c +++ b/src/http/modules/proxy/ngx_http_event_proxy_handler.c @@ -332,7 +332,8 @@ static int ngx_http_proxy_process_upstre if (c) { p->cached_connection = 1; p->connection = c; - c->write->event_handler = ngx_http_proxy_process_upstream_event; + c->write->event_handler = c->read->event_handler = + ngx_http_proxy_process_upstream_event; rc = ngx_http_proxy_send_request(p); } else { @@ -370,10 +371,13 @@ static int ngx_http_proxy_process_upstre return NGX_DONE; } - if (rc == NGX_HTTP_BAD_GATEWAY || rc == NGX_HTTP_GATEWAY_TIME_OUT - || (rc == NGX_OK - && p->status == NGX_HTTP_INTERNAL_SERVER_ERROR - && p->lcf->retry_500_error)) + if (p->tries /* STUB !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! */ + + && (rc == NGX_HTTP_BAD_GATEWAY + || rc == NGX_HTTP_GATEWAY_TIME_OUT + || (rc == NGX_OK + && p->status == NGX_HTTP_INTERNAL_SERVER_ERROR + && p->lcf->retry_500_error))) { if (ev) { ngx_event_close_connection(ev); @@ -403,6 +407,8 @@ static int ngx_http_proxy_process_upstre return NGX_ERROR; } + /* reinitialize the proxy context for the next upstream */ + p->headers_in.server->key.len = 0; p->headers_in.connection->key.len = 0; p->headers_in.content_type->key.len = 0; @@ -441,8 +447,8 @@ static int ngx_http_proxy_connect(ngx_ht if (p->tries == p->upstreams->number) { /* Here is the race condition - when the upstreams are shared between threads or processes - but it should not be serious */ + when the upstreams are shared between + the threads or the processes but it should not be serious */ p->cur_upstream = p->upstreams->current++; @@ -465,9 +471,10 @@ static int ngx_http_proxy_connect(ngx_ht for ( ;; ) { u = &p->upstreams->u[p->cur_upstream]; + /* Here is the race condition - when the upstreams are shared between threads or processes - but it should not be serious */ + when the upstreams are shared between + the threads or the processes but it should not be serious */ if (u->fails > p->upstreams->max_fails || u->accessed < p->upstreams->fail_timeout) @@ -502,7 +509,7 @@ static int ngx_http_proxy_connect(ngx_ht if (s == -1) { ngx_log_error(NGX_LOG_ALERT, p->log, ngx_socket_errno, ngx_socket_n " failed"); - return NGX_ERROR; + return NGX_HTTP_INTERNAL_SERVER_ERROR; } if (p->lcf->rcvbuf) { @@ -516,7 +523,7 @@ static int ngx_http_proxy_connect(ngx_ht ngx_close_socket_n " failed"); } - return NGX_ERROR; + return NGX_HTTP_INTERNAL_SERVER_ERROR; } } @@ -529,7 +536,7 @@ static int ngx_http_proxy_connect(ngx_ht ngx_close_socket_n " failed"); } - return NGX_ERROR; + return NGX_HTTP_INTERNAL_SERVER_ERROR; } c = &ngx_connections[s]; @@ -549,26 +556,18 @@ static int ngx_http_proxy_connect(ngx_ht c->fd = s; wev->close_handler = rev->close_handler = ngx_event_close_connection; -#if !(USE_KQUEUE) - -#if (HAVE_EDGE_EVENT) /* epoll */ - if (ngx_event_flags & NGX_HAVE_EDGE_EVENT) { if (ngx_edge_add_event(wev) != NGX_OK) { - return NGX_ERROR; + return NGX_HTTP_INTERNAL_SERVER_ERROR; } } -#endif - -#endif - ngx_test_null(c->pool, ngx_create_pool(p->lcf->conn_pool_size, p->log), - NGX_ERROR); + NGX_HTTP_INTERNAL_SERVER_ERROR); ngx_test_null(p->sockaddr, ngx_pcalloc(c->pool, sizeof(struct sockaddr_in)), - NGX_ERROR); + NGX_HTTP_INTERNAL_SERVER_ERROR); addr = (struct sockaddr_in *) p->sockaddr; @@ -588,8 +587,6 @@ static int ngx_http_proxy_connect(ngx_ht ngx_close_socket_n " failed"); } - ngx_destroy_pool(c->pool); - return NGX_HTTP_BAD_GATEWAY; } } @@ -597,35 +594,22 @@ static int ngx_http_proxy_connect(ngx_ht c->data = p->request; p->connection = c; - ngx_test_null(c->pool, ngx_create_pool(p->lcf->conn_pool_size, p->log), - NGX_ERROR); - -#if (USE_KQUEUE) + if ((ngx_event_flags & NGX_HAVE_EDGE_EVENT) == 0) { /* not epoll */ - if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT) != NGX_OK) { - return NGX_ERROR; - } - -#else - - if ((ngx_event_flags & NGX_HAVE_EDGE_EVENT) == 0) { /* not epoll */ - - if (ngx_event_flags & NGX_HAVE_CLEAR_EVENT) { /* kqueue */ + if (ngx_event_flags & NGX_HAVE_CLEAR_EVENT) { /* kqueue */ event = NGX_CLEAR_EVENT; - } else { /* select, poll, /dev/poll */ + } else { /* select, poll, /dev/poll */ event = NGX_LEVEL_EVENT; } /* TODO: aio, iocp */ if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) { - return NGX_ERROR; + return NGX_HTTP_INTERNAL_SERVER_ERROR; } } -#endif /* USE_KQUEUE */ - wev->event_handler = rev->event_handler = ngx_http_proxy_process_upstream_event; @@ -645,14 +629,6 @@ static int ngx_http_proxy_connect(ngx_ht wev->timer_set = 1; ngx_add_timer(wev, p->lcf->connect_timeout); -#if (USE_KQUEUE) - - if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_CLEAR_EVENT) != NGX_OK) { - return NGX_ERROR; - } - -#else - /* TODO: aio, iocp */ if (ngx_event_flags & NGX_HAVE_EDGE_EVENT) { @@ -660,11 +636,9 @@ static int ngx_http_proxy_connect(ngx_ht } if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) { - return NGX_ERROR; + return NGX_HTTP_INTERNAL_SERVER_ERROR; } -#endif /* USE_KQUEUE */ - return NGX_DONE; } @@ -1123,436 +1097,6 @@ static int ngx_http_proxy_write_upstream } -#if 0 -static int ngx_http_proxy_read_upstream_body(ngx_event_t *rev) -{ - int rc, n, size, eof; - ngx_hunk_t *h; - ngx_chain_t *chain, *ce, *tce; - ngx_event_t *wev; - ngx_connection_t *c; - ngx_http_request_t *r; - ngx_http_proxy_ctx_t *p; - - c = (ngx_connection_t *) rev->data; - r = (ngx_http_request_t *) c->data; - p = (ngx_http_proxy_ctx_t *) - ngx_http_get_module_ctx(r, ngx_http_proxy_module_ctx); - - eof = 0; - - for ( ;; ) { - -#if (USE_KQUEUE) - - if (ev->eof && ev->available == 0) { - eof = 1; - break; - } - -#elif (HAVE_KQUEUE0) - - if (ngx_event_type == NGX_HAVE_KQUEUE_EVENT - && ev->eof && ev->available == 0) - { - eof = 1; - break; - } - -#endif - - /* use the free hunks if they exist */ - - if (p->free_hunks) { - chain = p->free_hunks; - p->free_hunks = NULL; - - /* allocate a new hunk if it's still allowed */ - - } else if (p->allocated < p->lcf->max_block_size) { - ngx_test_null(h, - ngx_create_temp_hunk(r->pool, p->block_size, 50, 50), - NGX_ERROR); - - p->allocated += p->block_size; - - ngx_test_null(tce, ngx_create_chain_entry(r->pool), NGX_ERROR); - tce->hunk = h; - tce->next = NULL; - chain = tce; - - /* use the shadow hunks if they exist */ - - } else if (p->shadow_hunks) { - chain = p->shadow_hunks; - p->shadow_hunks = NULL; - - /* write all the incoming hunks or the first hunk only - to a temporary file and convert them to the shadow hunks */ - - } else { - if (r->cachable) { - rc = ngx_http_proxy_write_chain_to_temp_file(p); - if (rc != NGX_OK) { - return rc; - } - - } else { - tce = p->in_hunks->next; - p->in_hunks->next = NULL; - - rc = ngx_http_proxy_write_chain_to_temp_file(p); - if (rc != NGX_OK) { - p->in_hunks = tce; - return rc; - } - - p->in_hunks = tce; - } - } - - n = ngx_recv_chain(c, chain); - - if (n == NGX_ERROR) { - return NGX_ERROR; - } - - if (n == NGX_AGAIN) { - return NGX_AGAIN; - } - - if (n == 0) { - eof = 1; - break; - } - - for (ce = chain; ce && n > 0; ce = ce->next) { - ngx_test_null(tce, ngx_create_chain_entry(r->pool), NGX_ERROR); - tce->hunk = ce->hunk; - tce->next = NULL; - - if (p->last_in_hunk) { - p->last_in_hunk->next = tce; - p->last_in_hunk = tce; - - } else { - p->last_in_hunk = tce; - } - - size = ce->hunk->end - ce->hunk->last; - - if (n >= size) { - n -= size; - ce->hunk->last = ce->hunk->end; - if (ce->hunk->shadow) { - ce->hunk->shadow->type &= ~(NGX_HUNK_TEMP - |NGX_HUNK_IN_MEMORY - |NGX_HUNK_RECYCLED); - ce->hunk->shadow->shadow = NULL; - - } - - continue; - } - - ce->hunk->last += n; - if (ce->hunk->shadow) { - ce->hunk->shadow->type &= ~(NGX_HUNK_TEMP - |NGX_HUNK_IN_MEMORY - |NGX_HUNK_RECYCLED); - ce->hunk->shadow->shadow = NULL; - } - - break; - } - - if (ce) { - ce->next = p->free_hunks; - p->free_hunks = ce; - break; - } - } - - wev = p->request->connection->write; - - if (r->cachable) { - if (p->in_hunks) { - rc = ngx_http_proxy_write_chain_to_temp_file(p); - if (rc != NGX_OK) { - return rc; - } - } - - if (p->out_hunks && wev->ready) { - return ngx_http_proxy_write_upstream_body(wev); - } - - } else { - if ((p->out_hunks || p->in_hunks) && wev->ready) { - return ngx_http_proxy_write_upstream_body(wev); - } - } - - return NGX_OK; -} - - -static int ngx_http_proxy_write_chain_to_temp_file(ngx_http_proxy_ctx_t *p) -{ - int i, rc; - ngx_hunk_t *h; - ngx_chain_t *ce, *tce; - - if (p->temp_file->fd == NGX_INVALID_FILE) { - rc = ngx_create_temp_file(p->temp_file, p->lcf->temp_path, - p->request->pool, - 0, 2, - p->request->cachable); - - if (rc != NGX_OK) { - return rc; - } - - if (p->lcf->temp_file_warn) { - ngx_log_error(NGX_LOG_WARN, p->log, 0, - "an upstream response is buffered " - "to a temporary file"); - } - } - - if (ngx_write_chain_to_file(p->temp_file, p->in_hunks, - p->temp_offset, p->request->pool) == NGX_ERROR) { - return NGX_ERROR; - } - - for (ce = p->in_hunks; ce; ce = ce->next) { - ngx_test_null(h, ngx_pcalloc(p->request->pool, sizeof(ngx_hunk_t)), - NGX_ERROR); - - h->type = NGX_HUNK_FILE - |NGX_HUNK_TEMP|NGX_HUNK_IN_MEMORY|NGX_HUNK_RECYCLED; - - ce->hunk->shadow = h; - h->shadow = ce->hunk; - - h->file_pos = p->temp_offset; - p->temp_offset += ce->hunk->last - ce->hunk->pos; - h->file_last = p->temp_offset; - - h->file->fd = p->temp_file->fd; - h->file->log = p->log; - - h->pos = ce->hunk->pos; - h->last = ce->hunk->last; - h->start = ce->hunk->start; - h->end = ce->hunk->end; - h->pre_start = ce->hunk->pre_start; - h->post_end = ce->hunk->post_end; - - ngx_test_null(tce, ngx_create_chain_entry(p->request->pool), NGX_ERROR); - tce->hunk = h; - tce->next = NULL; - - if (p->last_out_hunk) { - p->last_out_hunk->next = tce; - p->last_out_hunk = tce; - - } else { - p->last_out_hunk = tce; - } - } - - return NGX_OK; -} - -static int ngx_http_proxy_write_upstream_body(ngx_event_t *wev) -{ - int rc; - ngx_hunk_t *h, *sh; - ngx_chain_t *ce; - ngx_connection_t *c; - ngx_http_request_t *r; - ngx_http_proxy_ctx_t *p; - - c = (ngx_connection_t *) wev->data; - r = (ngx_http_request_t *) c->data; - p = (ngx_http_proxy_ctx_t *) - ngx_http_get_module_ctx(r, ngx_http_proxy_module_ctx); - - while (p->out_hunks) { - h = p->out_hunks->hunk; - rc = ngx_http_output_filter(r, h); - - if (rc == NGX_ERROR) { - return NGX_ERROR; - } - - if (rc == NGX_AGAIN || h->pos < h->last) { - return NGX_AGAIN; - } - - p->out_hunks = p->out_hunks->next; - - /* if the complete hunk has a shadow hunk - then add a shadow hunk to p->free_hunks chain */ - - sh = h->shadow; - - if (sh) { - sh->pos = sh->last = sh->start; - ngx_test_null(ce, ngx_create_chain_entry(r->pool), NGX_ERROR); - ce->hunk = sh; - ce->next = p->free_hunks; - p->free_hunks = ce; - } - } - - return NGX_OK; -} - - -#endif - - - - -static int ngx_http_proxy_read_response_body(ngx_event_t *ev) -{ - int n; - char *buf; - size_t left, size; - ngx_hunk_t *h, **ph; - ngx_connection_t *c; - ngx_http_request_t *r; - ngx_http_proxy_ctx_t *p; - - if (ev->timedout) { - return NGX_ERROR; - } - - c = (ngx_connection_t *) ev->data; - r = (ngx_http_request_t *) c->data; - p = (ngx_http_proxy_ctx_t *) - ngx_http_get_module_ctx(r, ngx_http_proxy_module_ctx); - - if (p->hunks.nelts > 0) { - h = ((ngx_hunk_t **) p->hunks.elts)[p->hunks.nelts - 1]; - left = h->end - h->last; - - } else { - h = NULL; - left = 0; - } - - do { - -#if (USE_KQUEUE) - - /* do not allocate new block if there is EOF */ - if (ev->eof && ev->available == 0) { - left = 1; - } - -#elif (HAVE_KQUEUE) - - if (ngx_event_type == NGX_HAVE_KQUEUE_EVENT) { - /* do not allocate new block if there is EOF */ - if (ev->eof && ev->available == 0) { - left = 1; - } - } - -#endif - - if (left == 0) { - ngx_test_null(ph, ngx_push_array(&p->hunks), NGX_ERROR); - ngx_test_null(h, - ngx_create_temp_hunk(r->pool, - /* STUB */ 4096 /**/, 0, 0), - NGX_ERROR); - - h->type = NGX_HUNK_MEMORY|NGX_HUNK_IN_MEMORY; - *ph = h; - } - - if (h != NULL) { - buf = h->last; - size = h->end - h->last; - - } else { - buf = (char *) &buf; - size = 0; - } - - n = ngx_event_recv(c, buf, size); - - ngx_log_debug(c->log, "READ:%d" _ n); - - if (n == NGX_AGAIN) { - return NGX_DONE; - } - - if (n == NGX_ERROR) { - return NGX_ERROR; - } - - h->last += n; - left = h->end - h->last; - - /* STUB */ - *h->last = '\0'; - ngx_log_debug(c->log, "PROXY:\n'%s'" _ h->pos); - /**/ - - } while (n > 0 && left == 0); - - if (n == 0) { - ngx_log_debug(c->log, "CLOSE proxy"); -#if 0 - ngx_del_event(ev, NGX_READ_EVENT, NGX_CLOSE_EVENT); -#endif - ngx_event_close_connection(ev); - - p->hunk_n = 0; - c->write->event_handler = ngx_http_proxy_write_to_client; - return ngx_http_proxy_write_to_client(c->write); - } - - /* STUB */ return NGX_DONE; -} - - -static int ngx_http_proxy_write_to_client(ngx_event_t *ev) -{ - int rc; - ngx_hunk_t *h; - ngx_connection_t *c; - ngx_http_request_t *r; - ngx_http_proxy_ctx_t *p; - - c = (ngx_connection_t *) ev->data; - r = (ngx_http_request_t *) c->data; - p = (ngx_http_proxy_ctx_t *) - ngx_http_get_module_ctx(r, ngx_http_proxy_module_ctx); - - do { - h = ((ngx_hunk_t **) p->hunks.elts)[p->hunk_n]; - - rc = ngx_http_output_filter(r, h); - if (rc != NGX_OK) { - return rc; - } - - if (p->hunk_n >= p->hunks.nelts) { - break; - } - - p->hunk_n++; - - } while (rc == NGX_OK); - - return NGX_OK; -} static int ngx_http_proxy_finalize_request(ngx_http_proxy_ctx_t *p, int error)