# HG changeset patch # User Igor Sysoev # Date 1050339898 0 # Node ID 869b10be682f6320e9fdc6a7d6a4c28dab60613f # Parent 17ab1af8c3dd89b38b717f4a629ae954d4db80e0 nginx-0.0.1-2003-04-14-21:04:58 import diff --git a/src/core/ngx_connection.h b/src/core/ngx_connection.h --- a/src/core/ngx_connection.h +++ b/src/core/ngx_connection.h @@ -85,6 +85,7 @@ extern ngx_chain_t *(*ngx_write_chain_pr (ngx_connection_t *c, ngx_chain_t *in); +ssize_t ngx_recv_chain(ngx_connection_t *c, ngx_chain_t *ce); ngx_chain_t *ngx_write_chain(ngx_connection_t *c, ngx_chain_t *in, off_t flush); diff --git a/src/core/ngx_file.c b/src/core/ngx_file.c --- a/src/core/ngx_file.c +++ b/src/core/ngx_file.c @@ -28,7 +28,7 @@ int ngx_create_temp_file(ngx_file_t *fil for ( ;; ) { snprintf(file->name.data + path->name.len + 1 + path->len, 11, - "%010d", num); + "%010u", num); ngx_create_hashed_filename(file, path); @@ -49,6 +49,8 @@ int ngx_create_temp_file(ngx_file_t *fil file->fd = ngx_open_tempfile(file->name.data, persistent); +ngx_log_debug(file->log, "temp fd: %d" _ file->fd); + if (file->fd != NGX_INVALID_FILE) { return NGX_OK; } @@ -56,7 +58,7 @@ int ngx_create_temp_file(ngx_file_t *fil err = ngx_errno; if (err == NGX_EEXIST) { - num *= step; + num = (num + 1) * step; continue; } diff --git a/src/core/ngx_hunk.h b/src/core/ngx_hunk.h --- a/src/core/ngx_hunk.h +++ b/src/core/ngx_hunk.h @@ -11,26 +11,27 @@ /* hunk type */ /* the hunk is in memory */ -#define NGX_HUNK_IN_MEMORY 0x0001 +#define NGX_HUNK_IN_MEMORY 0x0001 /* the hunk's content can be changed */ -#define NGX_HUNK_TEMP 0x0002 +#define NGX_HUNK_TEMP 0x0002 /* the hunk's content is in cache and can not be changed */ -#define NGX_HUNK_MEMORY 0x0004 +#define NGX_HUNK_MEMORY 0x0004 /* the hunk's content is mmap()ed and can not be changed */ -#define NGX_HUNK_MMAP 0x0008 +#define NGX_HUNK_MMAP 0x0008 -#define NGX_HUNK_RECYCLED 0x0010 +#define NGX_HUNK_RECYCLED 0x0010 /* the hunk is in file */ -#define NGX_HUNK_FILE 0x0100 +#define NGX_HUNK_FILE 0x0100 /* hunk flags */ /* in thread state flush means to write the hunk completely before return */ /* in event state flush means to start to write the hunk */ -#define NGX_HUNK_FLUSH 0x1000 +#define NGX_HUNK_FLUSH 0x1000 /* last hunk */ -#define NGX_HUNK_LAST 0x2000 +#define NGX_HUNK_LAST 0x2000 +#define NGX_HUNK_LAST_SHADOW 0x4000 @@ -69,7 +70,12 @@ struct ngx_chain_s { ngx_hunk_t *ngx_create_temp_hunk(ngx_pool_t *pool, int size, int before, int after); +#define ngx_alloc_hunk(pool) ngx_palloc(pool, sizeof(ngx_hunk_t)) +#define ngx_alloc_chain_entry(pool) ngx_palloc(pool, sizeof(ngx_chain_t)) + +/* STUB */ #define ngx_create_chain_entry(pool) ngx_palloc(pool, sizeof(ngx_chain_t)) +/**/ #define ngx_add_hunk_to_chain(chain, h, pool, error) \ do { \ diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -178,6 +178,7 @@ typedef struct { #define NGX_USE_LEVEL_EVENT 0x00010000 +#define NGX_USE_AIO_EVENT 0x00020000 /* Event filter is deleted before closing file. diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c --- a/src/event/ngx_event_accept.c +++ b/src/event/ngx_event_accept.c @@ -119,8 +119,10 @@ int ngx_event_accept(ngx_event_t *ev) wev->write = 1; rev->first = wev->first = 1; -#if (HAVE_AIO_EVENT) - if (!(ngx_event_flags & NGX_HAVE_AIO_EVENT)) { +#if (USE_KQUEUE) + wev->ready = 1; +#else + if ((ngx_event_flags & NGX_USE_AIO_EVENT) == 0) { wev->ready = 1; } #endif diff --git a/src/event/ngx_event_proxy.c b/src/event/ngx_event_proxy.c new file mode 100644 --- /dev/null +++ b/src/event/ngx_event_proxy.c @@ -0,0 +1,510 @@ + +#include + + +int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p) +{ + int n, rc, size; + ngx_hunk_t *h, *nh; + ngx_chain_t *chain, *temp, *entry, *next; + + p->level++; + +ngx_log_debug(p->log, "read upstream"); + + for ( ;; ) { + + /* use the free hunks if they exist */ + + if (p->free_hunks) { + chain = p->free_hunks; + p->free_hunks = NULL; + +ngx_log_debug(p->log, "free hunk: %08X:%d" _ chain->hunk _ + chain->hunk->end - chain->hunk->last); + + /* allocate a new hunk if it's still allowed */ + + } else if (p->allocated < p->max_block_size) { + ngx_test_null(h, + ngx_create_temp_hunk(p->pool, p->block_size, 20, 20), + NGX_ERROR); + + p->allocated += p->block_size; + + ngx_test_null(temp, ngx_alloc_chain_entry(p->pool), NGX_ERROR); + temp->hunk = h; + temp->next = NULL; + chain = temp; + +ngx_log_debug(p->log, "new hunk: %08X" _ chain->hunk); + + /* use the shadow hunks if they exist */ + + } else if (p->shadow_hunks) { + chain = p->shadow_hunks; + p->shadow_hunks = NULL; + +ngx_log_debug(p->log, "shadow hunk: %08X" _ chain->hunk); + + /* if it's allowed then save the incoming hunks to a temporary file, + move the saved hunks to a shadow chain, + and add the file hunks to an outgoing chain */ + + } else if (p->temp_offset < p->max_temp_size) { + rc = ngx_event_proxy_write_chain_to_temp_file(p); + +ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset); + + if (rc != NGX_OK) { + return rc; + } + + chain = p->shadow_hunks; + p->shadow_hunks = NULL; + +ngx_log_debug(p->log, "new shadow hunk: %08X" _ chain->hunk); + + /* if there're no hunks to read in then disable a level event */ + + } else { + if (ngx_event_flags & NGX_USE_LEVEL_EVENT) { + p->block_upstream = 1; + } + + break; + } + + n = ngx_recv_chain(p->upstream, chain); + +ngx_log_debug(p->log, "recv_chain: %d" _ n); + + if (n == NGX_ERROR) { + p->upstream_error = 1; + return NGX_ERROR; + } + + if (n == NGX_AGAIN) { + if (p->upstream->read->blocked) { + if (ngx_add_event(p->upstream->read, NGX_READ_EVENT, + NGX_LEVEL_EVENT) == NGX_ERROR) { + return NGX_ERROR; + } + p->upstream->read->blocked = 0; + } + + return NGX_AGAIN; + } + + if (n == 0) { + p->free_hunks = chain; + p->upstream_eof = 1; + p->block_upstream = 0; + break; + } + + /* move the full hunks to a read chain + and the partial filled hunk to a free chain + and remove the shadow links for these hunks */ + + for (entry = chain; entry && n > 0; entry = next) { + next = entry->next; + entry->next = NULL; + + if (entry->hunk->shadow) { + for (h = entry->hunk->shadow; + (h->type & NGX_HUNK_LAST_SHADOW) == 0; + h = nh) + { + nh = h->shadow; + h->shadow = NULL; + h->type &= ~(NGX_HUNK_TEMP + |NGX_HUNK_IN_MEMORY + |NGX_HUNK_RECYCLED); + } + + h->shadow = NULL; + h->type &= ~(NGX_HUNK_TEMP + |NGX_HUNK_IN_MEMORY + |NGX_HUNK_RECYCLED + |NGX_HUNK_LAST_SHADOW); + entry->hunk->shadow = NULL; + } + + size = entry->hunk->end - entry->hunk->last; + + if (n >= size) { + entry->hunk->last = entry->hunk->end; + + if (p->read_hunks) { + p->last_read_hunk->next = entry; + + } else { + p->read_hunks = entry; + } + + p->last_read_hunk = entry; + + n -= size; + + /* the copy input filter */ + + if (p->input_filter == NULL) { + ngx_test_null(h, ngx_alloc_hunk(p->pool), NGX_ERROR); + ngx_memcpy(h, entry->hunk, sizeof(ngx_hunk_t)); + h->shadow = entry->hunk; + h->type |= NGX_HUNK_LAST_SHADOW; + + ngx_test_null(temp, ngx_alloc_chain_entry(p->pool), + NGX_ERROR); + temp->hunk = h; + temp->next = NULL; + + if (p->in_hunks) { + p->last_in_hunk->next = temp; + + } else { + p->in_hunks = temp; + } + + p->last_in_hunk = temp; + } + + } else { + entry->hunk->last += n; + p->free_hunks = entry; + + n = 0; + } + } + +ngx_log_debug(p->log, "rest chain: %08X" _ entry); + + /* if the rest hunks are shadow then move them to a shadow chain + otherwise add them to a free chain */ + + if (entry) { + if (entry->hunk->shadow) { + p->shadow_hunks = entry; + + } else { + if (p->free_hunks) { + p->free_hunks->next = entry; + + } else { + p->free_hunks = entry; + } + } + + p->block_upstream = 0; + break; + } + + /* the input filter i.e. that moves HTTP/1.1 chunks + from a read chain to an incoming chain */ + + if (p->input_filter) { + if (p->input_filter(p) == NGX_ERROR) { + return NGX_ERROR; + } + } + } + +ngx_log_debug(p->log, "eof: %d block: %d" _ + p->upstream_eof _ p->block_upstream); + + /* if there's the end of upstream response then move + the partially filled hunk from a free chain to an incoming chain */ + + if (p->upstream_eof) { + p->upstream->read->ready = 0; + + if (p->free_hunks + && p->free_hunks->hunk->pos < p->free_hunks->hunk->last) + { + if (p->input_filter) { + if (p->input_filter(p) == NGX_ERROR) { + return NGX_ERROR; + } + + } else { + entry = p->free_hunks; + + if (p->in_hunks) { + p->last_in_hunk->next = entry; + + } else { + p->in_hunks = entry; + } + + p->last_in_hunk = entry; + } + + p->free_hunks = entry->next; + entry->next = NULL; + } + +#if 0 + /* free the unneeded hunks */ + + for (entry = p->free_hunks; entry; entry = ce->next) { + ngx_free_hunk(p->pool, entry->hunk); + } +#endif + + if (p->in_hunks) { + p->last_in_hunk->hunk->type |= NGX_HUNK_LAST; + + } else if (p->out_hunks) { + p->last_out_hunk->hunk->type |= NGX_HUNK_LAST; + } + } + + if (p->cachable) { + if (p->in_hunks) { + rc = ngx_event_proxy_write_chain_to_temp_file(p); + if (rc != NGX_OK) { + return rc; + } + } + + if (p->out_hunks && p->client->write->ready) { + rc = ngx_event_proxy_write_to_client(p); + } + + } else if ((p->out_hunks || p->in_hunks) && p->client->write->ready) { + rc = ngx_event_proxy_write_to_client(p); + } + + p->level--; + +ngx_log_debug(p->log, "level: %d" _ p->level); + + if (p->level == 0 && p->block_upstream) { + p->upstream->read->blocked = 1; + if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0) == NGX_ERROR) { + return NGX_ERROR; + } + + return NGX_AGAIN; + } + + if (p->upstream_eof) { + return NGX_OK; + } else { + return NGX_AGAIN; + } +} + + +int ngx_event_proxy_write_to_client(ngx_event_proxy_t *p) +{ + int rc; + ngx_hunk_t *h; + ngx_chain_t *entry; + +ngx_log_debug(p->log, "write to client"); + + h = p->busy_hunk; + + for ( ;; ) { + + if (h == NULL) { + if (p->out_hunks) { + entry = p->out_hunks; + p->out_hunks = entry->next; + h = entry->hunk; + + if (p->shadow_hunks) { + if (p->shadow_hunks->hunk == h->shadow) { + p->shadow_hunks = p->shadow_hunks->next; + } + } + + entry->next = NULL; + + } else if (p->cachable == 0 && p->in_hunks) { + entry = p->in_hunks; + p->in_hunks = entry->next; + h = entry->hunk; + entry->next = NULL; + } + +ngx_log_debug(p->log, "event proxy write hunk: %08X:%08X" _ h _ h->pos); + + if (h == NULL) { + if (p->upstream->read->ready) { + rc = ngx_event_proxy_read_upstream(p); + } + + return NGX_OK; + } + } + +ngx_log_debug(p->log, "event proxy write: %d" _ h->last - h->pos); + + rc = p->output_filter(p->output_data, h); + +ngx_log_debug(p->log, "event proxy: %d" _ rc); + + if (rc == NGX_ERROR) { + return NGX_ERROR; + } + + if (rc == NGX_AGAIN + || (h->type & NGX_HUNK_IN_MEMORY && h->pos < h->last) + || (h->type & NGX_HUNK_FILE && h->file_pos < h->file_last)) + { + if (p->busy_hunk == NULL) { + p->busy_hunk = h; + } + return NGX_AGAIN; + } + + p->busy_hunk = NULL; + + /* if the complete hunk is the file hunk and it has a shadow hunk + then add a shadow hunk to a free chain */ + + if (h->type & NGX_HUNK_FILE) { + if (p->cachable == 0 && p->out_hunks == NULL) { + p->temp_offset = 0; + } + } + + if ((h->type & NGX_HUNK_LAST_SHADOW) == 0) { + h = NULL; + continue; + } + + + h->shadow->shadow = NULL; + h = h->shadow; + +#if 0 + /* free the unneeded hunk */ + + if (p->upstream_eof) { + ngx_free_hunk(p->pool, h); + continue; + } +#endif + + h->pos = h->last = h->start; + + entry->hunk = h; + + /* if the first hunk in a free chain is partially filled + then add the complete hunk after the first free hunk */ + + if (p->free_hunks + && p->free_hunks->hunk->start != p->free_hunks->hunk->last) + { + entry->next = p->free_hunks->next; + p->free_hunks->next = entry; + + } else { + entry->next = p->free_hunks; + p->free_hunks = entry; + } + + h = NULL; + } +} + + +int ngx_event_proxy_write_chain_to_temp_file(ngx_event_proxy_t *p) +{ + int i, rc, size; + ngx_hunk_t *h; + ngx_chain_t *entry, *next, *saved_in, *saved_read; + +ngx_log_debug(p->log, "write to file"); + + if (p->temp_file->fd == NGX_INVALID_FILE) { + rc = ngx_create_temp_file(p->temp_file, p->temp_path, p->pool, + p->number, p->random, p->cachable); + + if (rc != NGX_OK) { + return rc; + } + + if (p->cachable == 0 && p->temp_file_warn) { + ngx_log_error(NGX_LOG_WARN, p->log, 0, p->temp_file_warn); + } + } + + if (p->cachable == 0) { + + entry = p->read_hunks; + size = 0; + + do { + size += entry->hunk->last - entry->hunk->pos; + if (size >= p->file_block_size) { + break; + } + entry = entry->next; + + } while (entry); + + saved_read = entry->next; + entry->next = NULL; + + if (saved_read) { + for (entry = p->in_hunks; entry->next; entry = entry->next) { + if (entry->next->hunk->shadow == saved_read->hunk) { + break; + } + } + saved_in = entry->next; + entry->next = NULL; + + } else { + saved_in = NULL; + } + + } else { + saved_read = NULL; + saved_in = NULL; + } + + if (ngx_write_chain_to_file(p->temp_file, p->in_hunks, p->temp_offset, + p->pool) == NGX_ERROR) { + return NGX_ERROR; + } + + for (entry = p->in_hunks; entry; entry = next) { + next = entry->next; + entry->next = NULL; + + h = entry->hunk; + h->type |= NGX_HUNK_FILE; + h->file = p->temp_file; + h->file_pos = p->temp_offset; + p->temp_offset += h->last - h->pos; + h->file_last = p->temp_offset; + +ngx_log_debug(p->log, "event proxy file hunk: %08X:%08X" _ h _ h->pos); + + if (entry->hunk->type & NGX_HUNK_LAST_SHADOW) { + entry->hunk->shadow->last = entry->hunk->shadow->pos; + } + + if (p->out_hunks) { + p->last_out_hunk->next = entry; + + } else { + p->out_hunks = entry; + } + + p->last_out_hunk = entry; + } + + p->shadow_hunks = p->read_hunks; + + p->read_hunks = saved_read; + p->in_hunks = saved_in; + + return NGX_OK; +} diff --git a/src/event/ngx_event_proxy.h b/src/event/ngx_event_proxy.h new file mode 100644 --- /dev/null +++ b/src/event/ngx_event_proxy.h @@ -0,0 +1,72 @@ +#ifndef _NGX_EVENT_PROXY_H_INCLUDED_ +#define _NGX_EVENT_PROXY_H_INCLUDED_ + + +#include +#include +#include +#include +#include +#include + + +typedef struct ngx_event_proxy_s ngx_event_proxy_t; + +typedef int (*ngx_event_proxy_input_filter_pt)(ngx_event_proxy_t *p); +typedef int (*ngx_event_proxy_output_filter_pt)(void *data, ngx_hunk_t *hunk); + + +struct ngx_event_proxy_s { + ngx_chain_t *read_hunks; + ngx_chain_t *last_read_hunk; + ngx_chain_t *in_hunks; + ngx_chain_t *last_in_hunk; + ngx_chain_t *shadow_hunks; + ngx_chain_t *out_hunks; + ngx_chain_t *last_out_hunk; + ngx_chain_t *free_hunks; + ngx_hunk_t *busy_hunk; + + ngx_event_proxy_input_filter_pt input_filter; + void *input_data; + + ngx_event_proxy_output_filter_pt output_filter; + void *output_data; + + unsigned cachable:1; + unsigned block_upstream:1; + unsigned upstream_eof:1; + unsigned upstream_error:1; + unsigned client_eof:1; + unsigned client_error:1; + + int level; + + int allocated; + int block_size; + int max_block_size; + + off_t temp_offset; + off_t max_temp_size; + int file_block_size; + + ngx_connection_t *upstream; + ngx_connection_t *client; + + ngx_pool_t *pool; + ngx_log_t *log; + + ngx_file_t *temp_file; + ngx_path_t *temp_path; + int number; + int random; + char *temp_file_warn; +}; + + +int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p); +int ngx_event_proxy_write_to_client(ngx_event_proxy_t *p); +int ngx_event_proxy_write_chain_to_temp_file(ngx_event_proxy_t *p); + + +#endif /* _NGX_EVENT_PROXY_H_INCLUDED_ */ diff --git a/src/http/modules/proxy/ngx_http_event_proxy_handler.c b/src/http/modules/proxy/ngx_http_event_proxy_handler.c --- a/src/http/modules/proxy/ngx_http_event_proxy_handler.c +++ b/src/http/modules/proxy/ngx_http_event_proxy_handler.c @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -15,6 +16,11 @@ #include +/* STUB */ +typedef struct { + int dummy; +} ngx_cache_header_t; + static int ngx_http_proxy_handler(ngx_http_request_t *r); static ngx_chain_t *ngx_http_proxy_create_request(ngx_http_proxy_ctx_t *p); @@ -30,8 +36,8 @@ static int ngx_http_proxy_process_upstre static int ngx_http_proxy_process_upstream_headers(ngx_http_proxy_ctx_t *p); static int ngx_http_proxy_process_upstream_header_line(ngx_http_proxy_ctx_t *p); - -static int ngx_http_proxy_write_upstream_body(ngx_event_t *wev); +static int ngx_http_proxy_read_upstream_body(ngx_http_proxy_ctx_t *p); +static int ngx_http_proxy_write_upstream_body(ngx_http_proxy_ctx_t *p); static int ngx_http_proxy_read_response_body(ngx_event_t *ev); @@ -715,7 +721,11 @@ static int ngx_http_proxy_init_upstream( r = p->request; ngx_test_null(p->header_in, - ngx_create_temp_hunk(r->pool, p->lcf->header_size, 0, 0), + ngx_create_temp_hunk(r->pool, + p->lcf->header_size + - sizeof(ngx_cache_header_t), + sizeof(ngx_cache_header_t), + 0), NGX_ERROR); p->header_in->type = NGX_HUNK_MEMORY|NGX_HUNK_IN_MEMORY; @@ -748,7 +758,9 @@ static int ngx_http_proxy_read_upstream_ int i, n, rc; ngx_event_t *rev; ngx_table_elt_t *ch, *ph; + ngx_event_proxy_t *ep; ngx_http_request_t *r; + ngx_http_proxy_log_ctx_t *lcx; rev = p->connection->read; @@ -833,10 +845,53 @@ static int ngx_http_proxy_read_upstream_ rc = ngx_http_send_header(r); +#if 1 + rc = ngx_http_output_filter(r, p->header_in); + + ngx_test_null(ep, ngx_pcalloc(r->pool, sizeof(ngx_event_proxy_t)), + NGX_ERROR); + + ep->output_filter = (ngx_event_proxy_output_filter_pt) + ngx_http_output_filter; + ep->output_data = r; + ep->block_size = p->lcf->block_size; + ep->max_block_size = p->lcf->max_block_size; + ep->file_block_size = p->lcf->file_block_size; + ep->upstream = p->connection; + ep->client = r->connection; + ep->pool = r->pool; + ep->log = p->log; + ep->temp_path = p->lcf->temp_path; + + ngx_test_null(ep->temp_file, ngx_palloc(r->pool, sizeof(ngx_file_t)), + NGX_ERROR); + ep->temp_file->fd = NGX_INVALID_FILE; + ep->temp_file->log = p->log; + + ep->number = 10; + ep->random = 5; + + ep->max_temp_size = 6000; + ep->temp_file_warn = "an upstream response is buffered " + "to a temporary file"; + + p->event_proxy = ep; + + lcx = p->log->data; + lcx->action = "reading an upstream"; + + p->state_read_upstream_handler = ngx_http_proxy_read_upstream_body; + p->state_write_upstream_handler = ngx_http_proxy_write_upstream_body; + + ngx_http_proxy_read_upstream_body(p); +#endif + +#if 0 /* STUB */ p->header_in->type |= NGX_HUNK_LAST; rc = ngx_http_output_filter(r, p->header_in); ngx_http_proxy_finalize_request(p, NGX_OK); +#endif /* STUB */ return NGX_DONE; } @@ -1030,11 +1085,32 @@ static int ngx_http_proxy_process_upstre } +static int ngx_http_proxy_read_upstream_body(ngx_http_proxy_ctx_t *p) +{ + int rc; + + rc = ngx_event_proxy_read_upstream(p->event_proxy); + if (rc == NGX_OK) { + rc = ngx_event_close_connection(p->connection->read); + } + + return rc; +} + + +static int ngx_http_proxy_write_upstream_body(ngx_http_proxy_ctx_t *p) +{ + return ngx_event_proxy_write_to_client(p->event_proxy); +} + + +#if 0 static int ngx_http_proxy_read_upstream_body(ngx_event_t *rev) { - int rc, n, size; + int rc, n, size, eof; ngx_hunk_t *h; - ngx_chain_t *chain, chain_entry, *ce, *te; + ngx_chain_t *chain, *ce, *tce; + ngx_event_t *wev; ngx_connection_t *c; ngx_http_request_t *r; ngx_http_proxy_ctx_t *p; @@ -1044,13 +1120,14 @@ static int ngx_http_proxy_read_upstream_ p = (ngx_http_proxy_ctx_t *) ngx_http_get_module_ctx(r, ngx_http_proxy_module_ctx); - chain_entry.next = NULL; + eof = 0; for ( ;; ) { #if (USE_KQUEUE) if (ev->eof && ev->available == 0) { + eof = 1; break; } @@ -1059,82 +1136,59 @@ static int ngx_http_proxy_read_upstream_ if (ngx_event_type == NGX_HAVE_KQUEUE_EVENT && ev->eof && ev->available == 0) { + eof = 1; break; } #endif + /* use the free hunks if they exist */ + if (p->free_hunks) { chain = p->free_hunks; p->free_hunks = NULL; + /* allocate a new hunk if it's still allowed */ + } else if (p->allocated < p->lcf->max_block_size) { ngx_test_null(h, ngx_create_temp_hunk(r->pool, p->block_size, 50, 50), NGX_ERROR); p->allocated += p->block_size; - chain_entry.hunk = h; - chain = &chain_entry; + + ngx_test_null(tce, ngx_create_chain_entry(r->pool), NGX_ERROR); + tce->hunk = h; + tce->next = NULL; + chain = tce; + + /* use the shadow hunks if they exist */ + + } else if (p->shadow_hunks) { + chain = p->shadow_hunks; + p->shadow_hunks = NULL; + + /* write all the incoming hunks or the first hunk only + to a temporary file and convert them to the shadow hunks */ } else { - if (p->temp_file->fd == NGX_INVALID_FILE) { - rc = ngx_create_temp_file(p->temp_file, p->lcf->temp_path, - r->pool, 0, 2, r->cachable); - + if (r->cachable) { + rc = ngx_http_proxy_write_chain_to_temp_file(p); if (rc != NGX_OK) { return rc; } - if (p->lcf->temp_file_warn) { - ngx_log_error(NGX_LOG_WARN, p->log, 0, - "an upstream response is buffered " - "to a temporary file"); + } else { + tce = p->in_hunks->next; + p->in_hunks->next = NULL; + + rc = ngx_http_proxy_write_chain_to_temp_file(p); + if (rc != NGX_OK) { + p->in_hunks = tce; + return rc; } - } - - n = ngx_write_chain_to_file(p->temp_file, p->in_hunks, - p->temp_offset, r->pool); - - if (n == NGX_ERROR) { - return NGX_ERROR; - } - - for (ce = p->in_hunks; ce; ce = ce->next) { - ngx_test_null(h, ngx_pcalloc(r->pool, sizeof(ngx_hunk_t)), - NGX_ERROR); - - h->type = NGX_HUNK_FILE - |NGX_HUNK_TEMP|NGX_HUNK_IN_MEMORY|NGX_HUNK_RECYCLED; - - ce->hunk->shadow = h; - h->shadow = ce->hunk; - - h->file_pos = p->temp_offset; - p->temp_offset += ce->hunk->last - ce->hunk->pos; - h->file_last = p->temp_offset; - - h->file->fd = p->temp_file->fd; - h->file->log = p->log; - - h->pos = ce->hunk->pos; - h->last = ce->hunk->last; - h->start = ce->hunk->start; - h->end = ce->hunk->end; - h->pre_start = ce->hunk->pre_start; - h->post_end = ce->hunk->post_end; - - ngx_test_null(te, ngx_create_chain_entry(r->pool), NGX_ERROR); - te->hunk = h; - te->next = NULL; - - if (p->last_out_hunk) { - p->last_out_hunk->next = te; - p->last_out_hunk = te; - - } else { - p->last_out_hunk = te; - } + + p->in_hunks = tce; } } @@ -1149,20 +1203,21 @@ static int ngx_http_proxy_read_upstream_ } if (n == 0) { + eof = 1; break; } for (ce = chain; ce && n > 0; ce = ce->next) { - ngx_test_null(te, ngx_create_chain_entry(r->pool), NGX_ERROR); - te->hunk = ce->hunk; - te->next = NULL; + ngx_test_null(tce, ngx_create_chain_entry(r->pool), NGX_ERROR); + tce->hunk = ce->hunk; + tce->next = NULL; if (p->last_in_hunk) { - p->last_in_hunk->next = te; - p->last_in_hunk = te; + p->last_in_hunk->next = tce; + p->last_in_hunk = tce; } else { - p->last_in_hunk = te; + p->last_in_hunk = tce; } size = ce->hunk->end - ce->hunk->last; @@ -1196,20 +1251,101 @@ static int ngx_http_proxy_read_upstream_ ce->next = p->free_hunks; p->free_hunks = ce; break; - - return NGX_OK; } } - if (p->out_hunks && p->request->connection->write->ready) { - return - ngx_http_proxy_write_upstream_body(p->request->connection->write); + wev = p->request->connection->write; + + if (r->cachable) { + if (p->in_hunks) { + rc = ngx_http_proxy_write_chain_to_temp_file(p); + if (rc != NGX_OK) { + return rc; + } + } + + if (p->out_hunks && wev->ready) { + return ngx_http_proxy_write_upstream_body(wev); + } + + } else { + if ((p->out_hunks || p->in_hunks) && wev->ready) { + return ngx_http_proxy_write_upstream_body(wev); + } } return NGX_OK; } +static int ngx_http_proxy_write_chain_to_temp_file(ngx_http_proxy_ctx_t *p) +{ + int i, rc; + ngx_hunk_t *h; + ngx_chain_t *ce, *tce; + + if (p->temp_file->fd == NGX_INVALID_FILE) { + rc = ngx_create_temp_file(p->temp_file, p->lcf->temp_path, + p->request->pool, + 0, 2, + p->request->cachable); + + if (rc != NGX_OK) { + return rc; + } + + if (p->lcf->temp_file_warn) { + ngx_log_error(NGX_LOG_WARN, p->log, 0, + "an upstream response is buffered " + "to a temporary file"); + } + } + + if (ngx_write_chain_to_file(p->temp_file, p->in_hunks, + p->temp_offset, p->request->pool) == NGX_ERROR) { + return NGX_ERROR; + } + + for (ce = p->in_hunks; ce; ce = ce->next) { + ngx_test_null(h, ngx_pcalloc(p->request->pool, sizeof(ngx_hunk_t)), + NGX_ERROR); + + h->type = NGX_HUNK_FILE + |NGX_HUNK_TEMP|NGX_HUNK_IN_MEMORY|NGX_HUNK_RECYCLED; + + ce->hunk->shadow = h; + h->shadow = ce->hunk; + + h->file_pos = p->temp_offset; + p->temp_offset += ce->hunk->last - ce->hunk->pos; + h->file_last = p->temp_offset; + + h->file->fd = p->temp_file->fd; + h->file->log = p->log; + + h->pos = ce->hunk->pos; + h->last = ce->hunk->last; + h->start = ce->hunk->start; + h->end = ce->hunk->end; + h->pre_start = ce->hunk->pre_start; + h->post_end = ce->hunk->post_end; + + ngx_test_null(tce, ngx_create_chain_entry(p->request->pool), NGX_ERROR); + tce->hunk = h; + tce->next = NULL; + + if (p->last_out_hunk) { + p->last_out_hunk->next = tce; + p->last_out_hunk = tce; + + } else { + p->last_out_hunk = tce; + } + } + + return NGX_OK; +} + static int ngx_http_proxy_write_upstream_body(ngx_event_t *wev) { int rc; @@ -1256,7 +1392,7 @@ static int ngx_http_proxy_write_upstream } - +#endif @@ -1628,7 +1764,8 @@ static void *ngx_http_proxy_create_loc_c conf->read_timeout = 10000; conf->header_size = 1024; conf->block_size = 4096; - conf->max_block_size = 32768; + conf->max_block_size = 4096 * 3; + conf->file_block_size = 4096; ngx_test_null(conf->temp_path, ngx_pcalloc(pool, sizeof(ngx_path_t)), NULL); diff --git a/src/http/modules/proxy/ngx_http_event_proxy_handler.h b/src/http/modules/proxy/ngx_http_event_proxy_handler.h --- a/src/http/modules/proxy/ngx_http_event_proxy_handler.h +++ b/src/http/modules/proxy/ngx_http_event_proxy_handler.h @@ -4,6 +4,7 @@ #include #include +#include #include @@ -70,6 +71,7 @@ typedef struct { int block_size; int max_block_size; + int file_block_size; ngx_path_t *temp_path; int temp_file_warn; @@ -94,9 +96,13 @@ typedef struct { typedef struct ngx_http_proxy_ctx_s ngx_http_proxy_ctx_t; struct ngx_http_proxy_ctx_s { + ngx_event_proxy_t *event_proxy; + ngx_chain_t *in_hunks; ngx_chain_t *last_in_hunk; + ngx_chain_t *shadow_hunks; + ngx_chain_t *out_hunks; ngx_chain_t *last_out_hunk; diff --git a/src/http/ngx_http_core_module.c b/src/http/ngx_http_core_module.c --- a/src/http/ngx_http_core_module.c +++ b/src/http/ngx_http_core_module.c @@ -674,6 +674,8 @@ static char *ngx_server_block(ngx_conf_t } } +ngx_log_debug(cf->pool->log, "main merge"); + if (module->merge_loc_conf) { if (module->merge_loc_conf(cf->pool, prev->loc_conf[module->index], @@ -682,6 +684,8 @@ static char *ngx_server_block(ngx_conf_t return NGX_CONF_ERROR; } +ngx_log_debug(cf->pool->log, "server merge"); + for (j = 0; j < scf->locations.nelts; j++) { if (module->merge_loc_conf(cf->pool, ctx->loc_conf[module->index], @@ -690,6 +694,7 @@ static char *ngx_server_block(ngx_conf_t return NGX_CONF_ERROR; } } +ngx_log_debug(cf->pool->log, "server merge done"); } } diff --git a/src/http/ngx_http_output_filter.c b/src/http/ngx_http_output_filter.c --- a/src/http/ngx_http_output_filter.c +++ b/src/http/ngx_http_output_filter.c @@ -265,7 +265,7 @@ static int ngx_http_output_filter_copy_h src->file_pos += n; } - if (src->type & NGX_HUNK_LAST && src->pos == src->last) { + if ((src->type & NGX_HUNK_LAST) && src->pos == src->last) { dst->type |= NGX_HUNK_LAST; } @@ -294,7 +294,7 @@ static int ngx_http_output_filter_copy_h src->file_pos += n; dst->last += n; - if (src->type & NGX_HUNK_LAST && src->file_pos == src->file_last) { + if ((src->type & NGX_HUNK_LAST) && src->file_pos == src->file_last) { dst->type |= NGX_HUNK_LAST; } } diff --git a/src/http/ngx_http_write_filter.c b/src/http/ngx_http_write_filter.c --- a/src/http/ngx_http_write_filter.c +++ b/src/http/ngx_http_write_filter.c @@ -122,8 +122,9 @@ int ngx_http_write_filter(ngx_http_reque ngx_http_write_filter_module_ctx); #if (NGX_DEBUG_WRITE_FILTER) - ngx_log_debug(r->connection->log, "write filter: last:%d flush:%d" _ - last _ flush); + ngx_log_debug(r->connection->log, + "write filter: last:%d flush:%qd size:%qd" _ + last _ flush _ size); #endif /* avoid the output if there is no last hunk, no flush point and @@ -164,6 +165,8 @@ static void *ngx_http_write_filter_creat conf->buffer_output = NGX_CONF_UNSET; +ngx_log_debug(pool->log, "write conf %08X %08X" _ conf _ conf->buffer_output); + return conf; } @@ -178,6 +181,8 @@ static char *ngx_http_write_filter_merge ngx_conf_size_merge(conf->buffer_output, prev->buffer_output, 1460); +ngx_log_debug(pool->log, "write merge %08X %08X %08X" _ prev _ conf _ conf->buffer_output); + return NULL; } diff --git a/src/os/unix/ngx_files.c b/src/os/unix/ngx_files.c --- a/src/os/unix/ngx_files.c +++ b/src/os/unix/ngx_files.c @@ -11,7 +11,8 @@ ssize_t ngx_read_file(ngx_file_t *file, { ssize_t n; - ngx_log_debug(file->log, "read: %x, %d, %qd" _ buf _ size _ offset); + ngx_log_debug(file->log, "read: %d, %x, %d, %qd" _ + file->fd _ buf _ size _ offset); n = pread(file->fd, buf, size, offset); diff --git a/src/os/unix/ngx_files.h b/src/os/unix/ngx_files.h --- a/src/os/unix/ngx_files.h +++ b/src/os/unix/ngx_files.h @@ -22,7 +22,7 @@ #define ngx_close_file_n "close()" #define ngx_open_tempfile(name, persistent) \ - open(name, O_CREAT|O_EXCL|O_WRONLY, 0600) + open(name, O_CREAT|O_EXCL|O_RDWR, 0600) #define ngx_open_tempfile_n "open()" ssize_t ngx_read_file(ngx_file_t *file, char *buf, size_t size, off_t offset); diff --git a/src/os/unix/ngx_recv_chain.c b/src/os/unix/ngx_recv_chain.c --- a/src/os/unix/ngx_recv_chain.c +++ b/src/os/unix/ngx_recv_chain.c @@ -18,10 +18,12 @@ ssize_t ngx_recv_chain(ngx_connection_t while (ce) { ngx_test_null(iov, ngx_push_array(&io), NGX_ERROR); iov->iov_base = ce->hunk->pos; - iov->iov_len = ce->hunk->last - ce->hunk->pos; + iov->iov_len = ce->hunk->end - ce->hunk->last; ce = ce->next; } +ngx_log_debug(c->log, "recv: %d:%d" _ io.nelts _ iov->iov_len); + n = readv(c->fd, (struct iovec *) io.elts, io.nelts); ngx_destroy_array(&io); @@ -29,6 +31,7 @@ ssize_t ngx_recv_chain(ngx_connection_t if (n == -1) { c->read->ready = 0; + err = ngx_errno; if (err == NGX_EAGAIN) { ngx_log_error(NGX_LOG_INFO, c->log, err, "readv() returned EAGAIN"); return NGX_AGAIN;