# HG changeset patch # User Roman Arutyunyan # Date 1414488599 -10800 # Node ID 973ee2276300c49bb22810491b2faff50d431933 # Parent ec81934727a18a04b085f3c346a47aaed013111a Upstream: proxy_limit_rate and friends. The directives limit the upstream read rate. For example, "proxy_limit_rate 42" limits proxy upstream read rate to 42 bytes per second. diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c --- a/src/event/ngx_event_pipe.c +++ b/src/event/ngx_event_pipe.c @@ -66,11 +66,13 @@ ngx_event_pipe(ngx_event_pipe_t *p, ngx_ return NGX_ABORT; } - if (rev->active && !rev->ready) { - ngx_add_timer(rev, p->read_timeout); + if (!rev->delayed) { + if (rev->active && !rev->ready) { + ngx_add_timer(rev, p->read_timeout); - } else if (rev->timer_set) { - ngx_del_timer(rev); + } else if (rev->timer_set) { + ngx_del_timer(rev); + } } } @@ -99,9 +101,11 @@ ngx_event_pipe(ngx_event_pipe_t *p, ngx_ static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) { + off_t limit; ssize_t n, size; ngx_int_t rc; ngx_buf_t *b; + ngx_msec_t delay; ngx_chain_t *chain, *cl, *ln; if (p->upstream_eof || p->upstream_error || p->upstream_done) { @@ -169,6 +173,25 @@ ngx_event_pipe_read_upstream(ngx_event_p } #endif + if (p->limit_rate) { + if (p->upstream->read->delayed) { + break; + } + + limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1) + - p->read_length; + + if (limit <= 0) { + p->upstream->read->delayed = 1; + delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1); + ngx_add_timer(p->upstream->read, delay); + break; + } + + } else { + limit = 0; + } + if (p->free_raw_bufs) { /* use the free bufs if they exist */ @@ -270,7 +293,7 @@ ngx_event_pipe_read_upstream(ngx_event_p break; } - n = p->upstream->recv_chain(p->upstream, chain, 0); + n = p->upstream->recv_chain(p->upstream, chain, limit); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe recv chain: %z", n); @@ -301,6 +324,8 @@ ngx_event_pipe_read_upstream(ngx_event_p } } + delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0; + p->read_length += n; cl = chain; p->free_raw_bufs = NULL; @@ -337,6 +362,12 @@ ngx_event_pipe_read_upstream(ngx_event_p ln->next = p->free_raw_bufs; p->free_raw_bufs = cl; } + + if (delay > 0) { + p->upstream->read->delayed = 1; + ngx_add_timer(p->upstream->read, delay); + break; + } } #if (NGX_DEBUG) diff --git a/src/event/ngx_event_pipe.h b/src/event/ngx_event_pipe.h --- a/src/event/ngx_event_pipe.h +++ b/src/event/ngx_event_pipe.h @@ -80,6 +80,9 @@ struct ngx_event_pipe_s { size_t preread_size; ngx_buf_t *buf_to_file; + size_t limit_rate; + time_t start_sec; + ngx_temp_file_t *temp_file; /* STUB */ int num; diff --git a/src/http/modules/ngx_http_fastcgi_module.c b/src/http/modules/ngx_http_fastcgi_module.c --- a/src/http/modules/ngx_http_fastcgi_module.c +++ b/src/http/modules/ngx_http_fastcgi_module.c @@ -333,6 +333,13 @@ static ngx_command_t ngx_http_fastcgi_c offsetof(ngx_http_fastcgi_loc_conf_t, upstream.force_ranges), NULL }, + { ngx_string("fastcgi_limit_rate"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_fastcgi_loc_conf_t, upstream.limit_rate), + NULL }, + #if (NGX_HTTP_CACHE) { ngx_string("fastcgi_cache"), @@ -2350,6 +2357,7 @@ ngx_http_fastcgi_create_loc_conf(ngx_con conf->upstream.send_lowat = NGX_CONF_UNSET_SIZE; conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE; + conf->upstream.limit_rate = NGX_CONF_UNSET_SIZE; conf->upstream.busy_buffers_size_conf = NGX_CONF_UNSET_SIZE; conf->upstream.max_temp_file_size_conf = NGX_CONF_UNSET_SIZE; @@ -2446,6 +2454,9 @@ ngx_http_fastcgi_merge_loc_conf(ngx_conf prev->upstream.buffer_size, (size_t) ngx_pagesize); + ngx_conf_merge_size_value(conf->upstream.limit_rate, + prev->upstream.limit_rate, 0); + ngx_conf_merge_bufs_value(conf->upstream.bufs, prev->upstream.bufs, 8, ngx_pagesize); diff --git a/src/http/modules/ngx_http_proxy_module.c b/src/http/modules/ngx_http_proxy_module.c --- a/src/http/modules/ngx_http_proxy_module.c +++ b/src/http/modules/ngx_http_proxy_module.c @@ -396,6 +396,13 @@ static ngx_command_t ngx_http_proxy_com offsetof(ngx_http_proxy_loc_conf_t, upstream.force_ranges), NULL }, + { ngx_string("proxy_limit_rate"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_proxy_loc_conf_t, upstream.limit_rate), + NULL }, + #if (NGX_HTTP_CACHE) { ngx_string("proxy_cache"), @@ -2490,6 +2497,7 @@ ngx_http_proxy_create_loc_conf(ngx_conf_ conf->upstream.send_lowat = NGX_CONF_UNSET_SIZE; conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE; + conf->upstream.limit_rate = NGX_CONF_UNSET_SIZE; conf->upstream.busy_buffers_size_conf = NGX_CONF_UNSET_SIZE; conf->upstream.max_temp_file_size_conf = NGX_CONF_UNSET_SIZE; @@ -2601,6 +2609,9 @@ ngx_http_proxy_merge_loc_conf(ngx_conf_t prev->upstream.buffer_size, (size_t) ngx_pagesize); + ngx_conf_merge_size_value(conf->upstream.limit_rate, + prev->upstream.limit_rate, 0); + ngx_conf_merge_bufs_value(conf->upstream.bufs, prev->upstream.bufs, 8, ngx_pagesize); diff --git a/src/http/modules/ngx_http_scgi_module.c b/src/http/modules/ngx_http_scgi_module.c --- a/src/http/modules/ngx_http_scgi_module.c +++ b/src/http/modules/ngx_http_scgi_module.c @@ -190,6 +190,13 @@ static ngx_command_t ngx_http_scgi_comma offsetof(ngx_http_scgi_loc_conf_t, upstream.force_ranges), NULL }, + { ngx_string("scgi_limit_rate"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_scgi_loc_conf_t, upstream.limit_rate), + NULL }, + #if (NGX_HTTP_CACHE) { ngx_string("scgi_cache"), @@ -1109,6 +1116,7 @@ ngx_http_scgi_create_loc_conf(ngx_conf_t conf->upstream.send_lowat = NGX_CONF_UNSET_SIZE; conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE; + conf->upstream.limit_rate = NGX_CONF_UNSET_SIZE; conf->upstream.busy_buffers_size_conf = NGX_CONF_UNSET_SIZE; conf->upstream.max_temp_file_size_conf = NGX_CONF_UNSET_SIZE; @@ -1200,6 +1208,9 @@ ngx_http_scgi_merge_loc_conf(ngx_conf_t prev->upstream.buffer_size, (size_t) ngx_pagesize); + ngx_conf_merge_size_value(conf->upstream.limit_rate, + prev->upstream.limit_rate, 0); + ngx_conf_merge_bufs_value(conf->upstream.bufs, prev->upstream.bufs, 8, ngx_pagesize); diff --git a/src/http/modules/ngx_http_uwsgi_module.c b/src/http/modules/ngx_http_uwsgi_module.c --- a/src/http/modules/ngx_http_uwsgi_module.c +++ b/src/http/modules/ngx_http_uwsgi_module.c @@ -245,6 +245,13 @@ static ngx_command_t ngx_http_uwsgi_comm offsetof(ngx_http_uwsgi_loc_conf_t, upstream.force_ranges), NULL }, + { ngx_string("uwsgi_limit_rate"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_uwsgi_loc_conf_t, upstream.limit_rate), + NULL }, + #if (NGX_HTTP_CACHE) { ngx_string("uwsgi_cache"), @@ -1289,6 +1296,7 @@ ngx_http_uwsgi_create_loc_conf(ngx_conf_ conf->upstream.send_lowat = NGX_CONF_UNSET_SIZE; conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE; + conf->upstream.limit_rate = NGX_CONF_UNSET_SIZE; conf->upstream.busy_buffers_size_conf = NGX_CONF_UNSET_SIZE; conf->upstream.max_temp_file_size_conf = NGX_CONF_UNSET_SIZE; @@ -1387,6 +1395,9 @@ ngx_http_uwsgi_merge_loc_conf(ngx_conf_t prev->upstream.buffer_size, (size_t) ngx_pagesize); + ngx_conf_merge_size_value(conf->upstream.limit_rate, + prev->upstream.limit_rate, 0); + ngx_conf_merge_bufs_value(conf->upstream.bufs, prev->upstream.bufs, 8, ngx_pagesize); diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c --- a/src/http/ngx_http_upstream.c +++ b/src/http/ngx_http_upstream.c @@ -2579,6 +2579,8 @@ ngx_http_upstream_send_response(ngx_http p->downstream = c; p->pool = r->pool; p->log = c->log; + p->limit_rate = u->conf->limit_rate; + p->start_sec = ngx_time(); p->cacheable = u->cacheable || u->store; @@ -3253,21 +3255,61 @@ static void ngx_http_upstream_process_upstream(ngx_http_request_t *r, ngx_http_upstream_t *u) { + ngx_event_t *rev; + ngx_event_pipe_t *p; ngx_connection_t *c; c = u->peer.connection; + p = u->pipe; + rev = c->read; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http upstream process upstream"); c->log->action = "reading upstream"; - if (c->read->timedout) { - u->pipe->upstream_error = 1; - ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out"); + if (rev->timedout) { + + if (rev->delayed) { + + rev->timedout = 0; + rev->delayed = 0; + + if (!rev->ready) { + ngx_add_timer(rev, p->read_timeout); + + if (ngx_handle_read_event(rev, 0) != NGX_OK) { + ngx_http_upstream_finalize_request(r, u, NGX_ERROR); + } + + return; + } + + if (ngx_event_pipe(p, 0) == NGX_ABORT) { + ngx_http_upstream_finalize_request(r, u, NGX_ERROR); + return; + } + + } else { + p->upstream_error = 1; + ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out"); + } } else { - if (ngx_event_pipe(u->pipe, 0) == NGX_ABORT) { + + if (rev->delayed) { + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream delayed"); + + if (ngx_handle_read_event(rev, 0) != NGX_OK) { + ngx_http_upstream_finalize_request(r, u, NGX_ERROR); + } + + return; + } + + if (ngx_event_pipe(p, 0) == NGX_ABORT) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h --- a/src/http/ngx_http_upstream.h +++ b/src/http/ngx_http_upstream.h @@ -141,6 +141,7 @@ typedef struct { size_t send_lowat; size_t buffer_size; + size_t limit_rate; size_t busy_buffers_size; size_t max_temp_file_size;