# HG changeset patch # User Igor Sysoev # Date 1066593443 0 # Node ID ad5f382c9e7df685e90555c6425f1bd3ae05b3e3 # Parent 86404ba5c517627e3cdc2b3318b23874dcbee49d nginx-0.0.1-2003-10-19-23:57:23 import diff --git a/src/event/ngx_event_proxy.c b/src/event/ngx_event_proxy.c --- a/src/event/ngx_event_proxy.c +++ b/src/event/ngx_event_proxy.c @@ -12,14 +12,48 @@ ngx_inline static void ngx_add_after_par ngx_chain_t *ce); + +int ngx_event_proxy(ngx_event_proxy_t *p, int do_write) +{ + for ( ;; ) { + if (do_write) { + if (ngx_event_proxy_write_to_downstream(p) == NGX_ABORT) { + return NGX_ABORT; + } + } + + p->read = 0; + + if (ngx_event_proxy_read_upstream(p) == NGX_ABORT) { + return NGX_ABORT; + } + + if (!p->read) { + break; + } + + do_write = 1; + } + + if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) { + return NGX_ABORT; + } + + if (ngx_handle_write_event(p->downstream->write, + /* TODO: lowat */ 0) == NGX_ERROR) { + return NGX_ABORT; + } + + return NGX_OK; +} + + int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p) { int n, rc, size; ngx_hunk_t *h; ngx_chain_t *chain, *ce, *te; - p->upstream_level++; - ngx_log_debug(p->log, "read upstream: %d" _ p->upstream->read->ready); while (p->preread_hunks @@ -29,6 +63,7 @@ int ngx_event_proxy_read_upstream(ngx_ev /* use the pre-read hunks if they exist */ + p->read = 1; chain = p->preread_hunks; p->preread_hunks = NULL; n = p->preread_size; @@ -57,6 +92,7 @@ int ngx_event_proxy_read_upstream(ngx_ev } else if (p->upstream->read->eof && p->upstream->read->available == 0) { p->upstream_eof = 1; + p->read = 1; break; } @@ -146,6 +182,9 @@ int ngx_event_proxy_read_upstream(ngx_ev break; } + /* TODO THINK about eof */ + p->read = 1; + if (n == 0) { p->free_raw_hunks = chain; p->upstream_eof = 1; @@ -190,33 +229,8 @@ int ngx_event_proxy_read_upstream(ngx_ev p->free_raw_hunks = p->free_raw_hunks->next; } - if (p->cachable) { - if (p->in) { - rc = ngx_event_proxy_write_chain_to_temp_file(p); - - if (rc != NGX_OK) { - return rc; - } - } - - if (p->out && p->downstream->write->ready) { - if (ngx_event_proxy_write_to_downstream(p) == NGX_ABORT) { - return NGX_ABORT; - } - } - - } else if ((p->out || p->in) && p->downstream->write->ready) { - if (ngx_event_proxy_write_to_downstream(p) == NGX_ABORT) { - return NGX_ABORT; - } - } - - p->upstream_level--; - -ngx_log_debug(p->log, "upstream level: %d" _ p->upstream_level); - - if (p->upstream_level == 0) { - if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) { + if (p->cachable && p->in) { + if (ngx_event_proxy_write_chain_to_temp_file(p) == NGX_ABORT) { return NGX_ABORT; } } @@ -231,7 +245,20 @@ int ngx_event_proxy_write_to_downstream( ngx_hunk_t *h; ngx_chain_t *out, *ce, *te; - while (p->downstream->write->ready) { + ngx_log_debug(p->log, "write downstream: %d" _ p->downstream->write->ready); + + for ( ;; ) { + + if ((p->upstream_eof || p->upstream_error || p->upstream_done) + && p->out == NULL && p->in == NULL) + { + p->downstream_done = 1; + break; + } + + if (!p->downstream->write->ready) { + break; + } if (p->out) { out = p->out; @@ -262,6 +289,10 @@ int ngx_event_proxy_write_to_downstream( rc = p->output_filter(p->output_ctx, out->hunk); + if (rc == NGX_ERROR) { + /* TODO */ + } + ngx_chain_update_chains(&p->free, &p->busy, &out); /* calculate p->busy_len */ @@ -287,18 +318,12 @@ int ngx_event_proxy_write_to_downstream( ce->hunk->shadow = NULL; } +#if 0 /* TODO THINK p->read_priority ??? */ if (p->upstream->read->ready) { - if (ngx_event_proxy_read_upstream(p) == NGX_ERROR) { - return NGX_ABORT; - } + return; } - } +#endif - if (p->downstream_level == 0) { - if (ngx_handle_write_event(p->downstream->write, - /* TODO: lowat */ 0) == NGX_ERROR) { - return NGX_ABORT; - } } if (p->upstream_done && p->in == NULL && p->out == NULL) { @@ -313,7 +338,7 @@ static int ngx_event_proxy_write_chain_t { int rc, size; ngx_hunk_t *h; - ngx_chain_t *ce, *next, *in, **last; + ngx_chain_t *ce, *te, *next, *in, **last, **last_free; ngx_log_debug(p->log, "write to file"); @@ -362,6 +387,13 @@ static int ngx_event_proxy_write_chain_t return NGX_ABORT; } + for (last_free = &p->free_raw_hunks; + *last_free != NULL; + last_free = &(*last)->next) + { + /* void */ + } + for (ce = p->in; ce; ce = next) { next = ce->next; ce->next = NULL; @@ -373,11 +405,14 @@ static int ngx_event_proxy_write_chain_t p->temp_offset += h->last - h->pos; h->file_last = p->temp_offset; + ngx_chain_add_ce(p->out, p->last_out, ce); + if (h->type & NGX_HUNK_LAST_SHADOW) { h->shadow->last = h->shadow->pos = h->shadow->start; + ngx_alloc_ce_and_set_hunk(te, h->shadow, p->pool, NGX_ABORT); + *last_free = te; + last_free = &te->next; } - - ngx_chain_add_ce(p->out, p->last_out, ce); } p->in = in; diff --git a/src/event/ngx_event_proxy.h b/src/event/ngx_event_proxy.h --- a/src/event/ngx_event_proxy.h +++ b/src/event/ngx_event_proxy.h @@ -36,6 +36,7 @@ struct ngx_event_proxy_s { ngx_event_proxy_output_filter_pt output_filter; void *output_ctx; + unsigned read:1; unsigned cachable:1; unsigned upstream_done:1; unsigned upstream_eof:1; @@ -43,9 +44,6 @@ struct ngx_event_proxy_s { unsigned downstream_done:1; unsigned downstream_error:1; - int upstream_level; - int downstream_level; - int hunks; ngx_bufs_t bufs; @@ -71,9 +69,12 @@ struct ngx_event_proxy_s { }; +int ngx_event_proxy(ngx_event_proxy_t *p, int do_write); +int ngx_event_proxy_copy_input_filter(ngx_event_proxy_t *p, ngx_hunk_t *hunk); + +/* STUB */ int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p); int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p); -int ngx_event_proxy_copy_input_filter(ngx_event_proxy_t *p, ngx_hunk_t *hunk); #endif /* _NGX_EVENT_PROXY_H_INCLUDED_ */ diff --git a/src/http/modules/proxy/ngx_http_proxy_handler.c b/src/http/modules/proxy/ngx_http_proxy_handler.c --- a/src/http/modules/proxy/ngx_http_proxy_handler.c +++ b/src/http/modules/proxy/ngx_http_proxy_handler.c @@ -775,7 +775,7 @@ static void ngx_http_proxy_process_upstr return; } - if (ngx_event_proxy_read_upstream(p->event_proxy) == NGX_ABORT) { + if (ngx_event_proxy(p->event_proxy, 0) == NGX_ABORT) { ngx_http_proxy_finalize_request(p, 0); return; } @@ -784,7 +784,8 @@ static void ngx_http_proxy_process_upstr || p->event_proxy->upstream_error || p->event_proxy->upstream_done) { - ngx_http_proxy_finalize_request(p, ngx_http_send_last(p->request)); + ngx_http_proxy_close_connection(c); + p->upstream.connection = NULL; return; } @@ -795,11 +796,13 @@ static void ngx_http_proxy_process_upstr static void ngx_http_proxy_process_downstream(ngx_event_t *wev) { ngx_connection_t *c; + ngx_http_request_t *r; ngx_http_proxy_ctx_t *p; c = wev->data; - p = c->data; - + r = c->data; + p = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + ngx_log_debug(wev->log, "http proxy process downstream"); if (wev->timedout) { @@ -807,13 +810,15 @@ static void ngx_http_proxy_process_downs return; } - if (ngx_event_proxy_write_to_downstream(p->event_proxy) == NGX_ABORT) { + if (ngx_event_proxy(p->event_proxy, 1) == NGX_ABORT) { ngx_http_proxy_finalize_request(p, 0); return; } if (p->event_proxy->downstream_done) { - ngx_http_proxy_finalize_request(p, 0); +ngx_log_debug(wev->log, "http proxy downstream done"); + ngx_http_proxy_finalize_request(p, r->main ? 0: + ngx_http_send_last(p->request)); return; } diff --git a/src/os/unix/ngx_readv_chain.c b/src/os/unix/ngx_readv_chain.c --- a/src/os/unix/ngx_readv_chain.c +++ b/src/os/unix/ngx_readv_chain.c @@ -17,6 +17,8 @@ ssize_t ngx_readv_chain(ngx_connection_t ngx_init_array(io, c->pool, 10, sizeof(struct iovec), NGX_ERROR); + /* TODO: coalesce the neighbouring chain entries */ + while (entry) { ngx_test_null(iov, ngx_push_array(&io), NGX_ERROR); iov->iov_base = entry->hunk->pos;