Mercurial > hg > nginx-quic
view src/http/ngx_http_write_filter_module.c @ 4667:d05ab8793a69 stable-1.2
Merge of r4622, r4623: balancing changes.
*) Upstream: smooth weighted round-robin balancing.
For edge case weights like { 5, 1, 1 } we now produce { a, a, b, a, c, a, a }
sequence instead of { c, b, a, a, a, a, a } produced previously.
Algorithm is as follows: on each peer selection we increase current_weight
of each eligible peer by its weight, select peer with greatest current_weight
and reduce its current_weight by total number of weight points distributed
among peers.
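In case of { 5, 1, 1 } weights this gives the following sequence of
current_weight values (each selection is shown as two rows: the values after
every peer's weight has been added, then the values after the selected
peer's current_weight has been reduced by the total weight, 7):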
a b c
0 0 0 (initial state)
5 1 1 (a selected)
-2 1 1
3 2 2 (a selected)
-4 2 2
1 3 3 (b selected)
1 -4 3
6 -3 4 (a selected)
-1 -3 4
4 -2 5 (c selected)
4 -2 -2
9 -1 -1 (a selected)
2 -1 -1
7 0 0 (a selected)
0 0 0
To preserve weight reduction in case of failures the effective_weight
variable was introduced, which usually matches peer's weight, but is
reduced temporarily on peer failures.
This change also fixes loop with backup servers and proxy_next_upstream
http_404 (ticket #47), and skipping alive upstreams in some cases if there
are multiple dead ones (ticket #64).
*) Upstream: fixed ip_hash rebalancing with the "down" flag.
Due to weight being set to 0 for down peers, order of peers after sorting
wasn't the same as without the "down" flag (with down peers at the end),
resulting in client rebalancing for clients on other servers. The only
rebalancing which should happen after adding "down" to a server is one
for clients on the server.
The problem was introduced in r1377 (which fixed endless loop by setting
weight to 0 for down servers). The loop is no longer possible with new
smooth algorithm, so preserving original weight is safe.
author | Maxim Dounin <mdounin@mdounin.ru> |
---|---|
date | Mon, 04 Jun 2012 11:21:58 +0000 |
parents | d620f497c50f |
children | 3464d0b63300 |
line wrap: on
line source
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) Nginx, Inc.
 */


#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>


static ngx_int_t ngx_http_write_filter_init(ngx_conf_t *cf);


/*
 * HTTP module context: the only hook used is postconfiguration, which
 * installs the write filter (see ngx_http_write_filter_init() below).
 */
static ngx_http_module_t  ngx_http_write_filter_module_ctx = {
    NULL,                                  /* preconfiguration */
    ngx_http_write_filter_init,            /* postconfiguration */

    NULL,                                  /* create main configuration */
    NULL,                                  /* init main configuration */

    NULL,                                  /* create server configuration */
    NULL,                                  /* merge server configuration */

    NULL,                                  /* create location configuration */
    NULL,                                  /* merge location configuration */
};


/* module descriptor: no directives and no process/thread lifecycle hooks */
ngx_module_t  ngx_http_write_filter_module = {
    NGX_MODULE_V1,
    &ngx_http_write_filter_module_ctx,     /* module context */
    NULL,                                  /* module directives */
    NGX_HTTP_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};


/*
 * Appends the chain "in" to the request's saved output chain r->out and
 * passes the result to the connection's send_chain() handler.
 *
 * r   - the request; r->out holds bufs left over from previous calls
 * in  - new bufs to append, may be NULL
 *
 * Returns:
 *   NGX_ERROR - the connection is already in error, a chain link could not
 *               be allocated, a zero size non-special buf was found, the
 *               chain is empty with neither "last" nor "flush" set,
 *               or send_chain() returned NGX_CHAIN_ERROR;
 *   NGX_AGAIN - output is pending: the write event is delayed, the rate
 *               limit is exhausted, send_chain() left unsent bufs, or
 *               NGX_LOWLEVEL_BUFFERED is set while r->postponed is NULL;
 *   NGX_OK    - output was postponed (see "postpone_output" below)
 *               or nothing is left in r->out.
 */
ngx_int_t
ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    off_t                      size, sent, nsent, limit;
    ngx_uint_t                 last, flush;
    ngx_msec_t                 delay;
    ngx_chain_t               *cl, *ln, **ll, *chain;
    ngx_connection_t          *c;
    ngx_http_core_loc_conf_t  *clcf;

    c = r->connection;

    if (c->error) {
        return NGX_ERROR;
    }

    size = 0;
    flush = 0;
    last = 0;
    ll = &r->out;

    /* find the size, the flush point and the last link of the saved chain */

    for (cl = r->out; cl; cl = cl->next) {
        ll = &cl->next;

        ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
                       "write old buf t:%d f:%d %p, pos %p, size: %z "
                       "file: %O, size: %z",
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);

#if 1
        /* a buf carrying neither data nor a special flag is rejected */
        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
                          "zero size buf in writer "
                          "t:%d r:%d f:%d %p %p-%p %p %O-%O",
                          cl->buf->temporary,
                          cl->buf->recycled,
                          cl->buf->in_file,
                          cl->buf->start,
                          cl->buf->pos,
                          cl->buf->last,
                          cl->buf->file,
                          cl->buf->file_pos,
                          cl->buf->file_last);

            ngx_debug_point();
            return NGX_ERROR;
        }
#endif

        size += ngx_buf_size(cl->buf);

        if (cl->buf->flush || cl->buf->recycled) {
            flush = 1;
        }

        if (cl->buf->last_buf) {
            last = 1;
        }
    }

    /* add the new chain to the existing one */

    for (ln = in; ln; ln = ln->next) {
        cl = ngx_alloc_chain_link(r->pool);
        if (cl == NULL) {
            return NGX_ERROR;
        }

        /* only the link is new; the buf itself is shared with "in" */
        cl->buf = ln->buf;
        *ll = cl;
        ll = &cl->next;

        ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
                       "write new buf t:%d f:%d %p, pos %p, size: %z "
                       "file: %O, size: %z",
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);

#if 1
        /* a buf carrying neither data nor a special flag is rejected */
        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
                          "zero size buf in writer "
                          "t:%d r:%d f:%d %p %p-%p %p %O-%O",
                          cl->buf->temporary,
                          cl->buf->recycled,
                          cl->buf->in_file,
                          cl->buf->start,
                          cl->buf->pos,
                          cl->buf->last,
                          cl->buf->file,
                          cl->buf->file_pos,
                          cl->buf->file_last);

            ngx_debug_point();
            return NGX_ERROR;
        }
#endif

        size += ngx_buf_size(cl->buf);

        if (cl->buf->flush || cl->buf->recycled) {
            flush = 1;
        }

        if (cl->buf->last_buf) {
            last = 1;
        }
    }

    /* terminate the combined chain */
    *ll = NULL;

    ngx_log_debug3(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http write filter: l:%d f:%d s:%O", last, flush, size);

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    /*
     * postpone the output if there is no last buf and no flush point,
     * there are incoming bufs, and the total size of all bufs
     * is smaller than the "postpone_output" directive
     */

    if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
        return NGX_OK;
    }

    /* a timer armed by a previous call has not fired yet */
    if (c->write->delayed) {
        c->buffered |= NGX_HTTP_WRITE_BUFFERED;
        return NGX_AGAIN;
    }

    if (size == 0 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)) {
        if (last) {
            r->out = NULL;
            c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;

            return NGX_OK;
        }

        if (flush) {
            /* flush is set only by a buf in the chain, so r->out != NULL */
            do {
                r->out = r->out->next;
            } while (r->out);

            c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;

            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, c->log, 0,
                      "the http output chain is empty");

        ngx_debug_point();

        return NGX_ERROR;
    }

    if (r->limit_rate) {
        /*
         * Bytes that may still be sent now: the budget accumulated since
         * r->start_sec minus what was sent beyond limit_rate_after.
         *
         * The explicit (off_t) cast makes the multiplication happen in
         * off_t rather than in the operands' own type.
         * NOTE(review): r->limit_rate is declared outside this file; if it
         * is narrower than off_t (e.g. a 32-bit size_t with a 64-bit
         * off_t) the uncast product wraps before the subtraction.
         */
        limit = (off_t) r->limit_rate * (ngx_time() - r->start_sec + 1)
                - (c->sent - clcf->limit_rate_after);

        if (limit <= 0) {
            /* over budget: wait until the deficit has been "earned" back */
            c->write->delayed = 1;
            ngx_add_timer(c->write,
                          (ngx_msec_t) (- limit * 1000 / r->limit_rate + 1));

            c->buffered |= NGX_HTTP_WRITE_BUFFERED;

            return NGX_AGAIN;
        }

        if (clcf->sendfile_max_chunk
            && (off_t) clcf->sendfile_max_chunk < limit)
        {
            limit = clcf->sendfile_max_chunk;
        }

    } else {
        limit = clcf->sendfile_max_chunk;
    }

    /* remember c->sent to measure how much this call transmits */
    sent = c->sent;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http write filter limit %O", limit);

    chain = c->send_chain(c, r->out, limit);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http write filter %p", chain);

    if (chain == NGX_CHAIN_ERROR) {
        c->error = 1;
        return NGX_ERROR;
    }

    if (r->limit_rate) {

        nsent = c->sent;

        if (clcf->limit_rate_after) {

            /* bytes within the limit_rate_after allowance are not counted */

            sent -= clcf->limit_rate_after;
            if (sent < 0) {
                sent = 0;
            }

            nsent -= clcf->limit_rate_after;
            if (nsent < 0) {
                nsent = 0;
            }
        }

        /* milliseconds the counted bytes take at limit_rate bytes/second */
        delay = (ngx_msec_t) ((nsent - sent) * 1000 / r->limit_rate);

        if (delay > 0) {
            /* limit = 0 skips the sendfile_max_chunk check below */
            limit = 0;
            c->write->delayed = 1;
            ngx_add_timer(c->write, delay);
        }
    }

    /*
     * the socket is still writable and nearly the whole limit was sent:
     * arm a 1 ms timer and mark the write event delayed
     */

    if (limit
        && c->write->ready
        && c->sent - sent >= limit - (off_t) (2 * ngx_pagesize))
    {
        c->write->delayed = 1;
        ngx_add_timer(c->write, 1);
    }

    /* release the links that precede the chain returned by send_chain() */

    for (cl = r->out; cl && cl != chain; /* void */) {
        ln = cl;
        cl = cl->next;
        ngx_free_chain(r->pool, ln);
    }

    r->out = chain;

    if (chain) {
        c->buffered |= NGX_HTTP_WRITE_BUFFERED;
        return NGX_AGAIN;
    }

    c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;

    if ((c->buffered & NGX_LOWLEVEL_BUFFERED) && r->postponed == NULL) {
        return NGX_AGAIN;
    }

    return NGX_OK;
}


/*
 * Postconfiguration hook: makes ngx_http_write_filter() the top body filter.
 * The "cf" argument is not used.  Always returns NGX_OK.
 */
static ngx_int_t
ngx_http_write_filter_init(ngx_conf_t *cf)
{
    ngx_http_top_body_filter = ngx_http_write_filter;

    return NGX_OK;
}