diff src/event/ngx_event_pipe.c @ 5883:973ee2276300

Upstream: proxy_limit_rate and friends. The directives limit the upstream read rate. For example, "proxy_limit_rate 42" limits proxy upstream read rate to 42 bytes per second.
author Roman Arutyunyan <arut@nginx.com>
date Tue, 28 Oct 2014 12:29:59 +0300
parents ec81934727a1
children b8926ba4d087
line wrap: on
line diff
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -66,11 +66,13 @@ ngx_event_pipe(ngx_event_pipe_t *p, ngx_
             return NGX_ABORT;
         }
 
-        if (rev->active && !rev->ready) {
-            ngx_add_timer(rev, p->read_timeout);
+        if (!rev->delayed) {
+            if (rev->active && !rev->ready) {
+                ngx_add_timer(rev, p->read_timeout);
 
-        } else if (rev->timer_set) {
-            ngx_del_timer(rev);
+            } else if (rev->timer_set) {
+                ngx_del_timer(rev);
+            }
         }
     }
 
@@ -99,9 +101,11 @@ ngx_event_pipe(ngx_event_pipe_t *p, ngx_
 static ngx_int_t
 ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
 {
+    off_t         limit;
     ssize_t       n, size;
     ngx_int_t     rc;
     ngx_buf_t    *b;
+    ngx_msec_t    delay;
     ngx_chain_t  *chain, *cl, *ln;
 
     if (p->upstream_eof || p->upstream_error || p->upstream_done) {
@@ -169,6 +173,25 @@ ngx_event_pipe_read_upstream(ngx_event_p
             }
 #endif
 
+            if (p->limit_rate) {
+                if (p->upstream->read->delayed) {
+                    break;
+                }
+
+                limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
+                        - p->read_length;
+
+                if (limit <= 0) {
+                    p->upstream->read->delayed = 1;
+                    delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
+                    ngx_add_timer(p->upstream->read, delay);
+                    break;
+                }
+
+            } else {
+                limit = 0;
+            }
+
             if (p->free_raw_bufs) {
 
                 /* use the free bufs if they exist */
@@ -270,7 +293,7 @@ ngx_event_pipe_read_upstream(ngx_event_p
                 break;
             }
 
-            n = p->upstream->recv_chain(p->upstream, chain, 0);
+            n = p->upstream->recv_chain(p->upstream, chain, limit);
 
             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                            "pipe recv chain: %z", n);
@@ -301,6 +324,8 @@ ngx_event_pipe_read_upstream(ngx_event_p
             }
         }
 
+        delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;
+
         p->read_length += n;
         cl = chain;
         p->free_raw_bufs = NULL;
@@ -337,6 +362,12 @@ ngx_event_pipe_read_upstream(ngx_event_p
             ln->next = p->free_raw_bufs;
             p->free_raw_bufs = cl;
         }
+
+        if (delay > 0) {
+            p->upstream->read->delayed = 1;
+            ngx_add_timer(p->upstream->read, delay);
+            break;
+        }
     }
 
 #if (NGX_DEBUG)