changeset 8509:bce9e9643444 quic

QUIC: coalesce neighbouring stream send buffers. Previously a single STREAM frame was created for each buffer in stream output chain which is wasteful with respect to memory. The following changes were made in the stream send code: - ngx_quic_stream_send_chain() no longer calls ngx_quic_stream_send() and got a separate implementation that coalesces neighbouring buffers into a single frame - the new ngx_quic_stream_send_chain() respects the limit argument, which fixes sendfile_max_chunk and limit_rate - ngx_quic_stream_send() is reimplemented to call ngx_quic_stream_send_chain() - stream frame size limit is moved out to a separate function ngx_quic_max_stream_frame() - flow control is moved out to a separate function ngx_quic_max_stream_flow() - ngx_quic_stream_send_chain() is relocated next to ngx_quic_stream_send()
author Roman Arutyunyan <arut@nginx.com>
date Tue, 18 Aug 2020 12:28:33 +0300
parents 4604e6043657
children 532fe796b0e2
files src/event/ngx_event_quic.c
diffstat 1 files changed, 183 insertions(+), 127 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/ngx_event_quic.c
+++ b/src/event/ngx_event_quic.c
@@ -280,9 +280,11 @@ static ssize_t ngx_quic_stream_recv(ngx_
     size_t size);
 static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
     size_t size);
-static void ngx_quic_stream_cleanup_handler(void *data);
 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
     ngx_chain_t *in, off_t limit);
+static size_t ngx_quic_max_stream_frame(ngx_quic_connection_t *qc);
+static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
+static void ngx_quic_stream_cleanup_handler(void *data);
 static ngx_quic_frame_t *ngx_quic_alloc_frame(ngx_connection_t *c, size_t size);
 static void ngx_quic_free_frame(ngx_connection_t *c, ngx_quic_frame_t *frame);
 
@@ -4228,10 +4230,44 @@ ngx_quic_stream_recv(ngx_connection_t *c
 static ssize_t
 ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
 {
-    u_char                 *p, *end;
-    size_t                  fsize, limit, n, len;
-    uint64_t                sent, unacked;
+    ngx_buf_t    b;
+    ngx_chain_t  cl;
+
+    ngx_memzero(&b, sizeof(ngx_buf_t));
+
+    b.memory = 1;
+    b.pos = buf;
+    b.last = buf + size;
+
+    cl.buf = &b;
+    cl.next = NULL;
+
+    if (ngx_quic_stream_send_chain(c, &cl, 0) == NGX_CHAIN_ERROR) {
+        return NGX_ERROR;
+    }
+
+    if (b.pos == buf) {
+        return NGX_AGAIN;
+    }
+
+    return b.pos - buf;
+}
+
+
+static ngx_chain_t *
+ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
+{
+    u_char                 *p;
+    size_t                  n, max, max_frame, max_flow, max_limit, len;
+#if (NGX_DEBUG)
+    size_t                  sent;
+#endif
+    ngx_buf_t              *b;
+#if (NGX_DEBUG)
+    ngx_uint_t              nframes;
+#endif
     ngx_event_t            *wev;
+    ngx_chain_t            *cl;
     ngx_connection_t       *pc;
     ngx_quic_frame_t       *frame;
     ngx_quic_stream_t      *qs;
@@ -4243,69 +4279,49 @@ ngx_quic_stream_send(ngx_connection_t *c
     wev = c->write;
 
     if (wev->error) {
-        return NGX_ERROR;
-    }
-
-    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id 0x%xL send: %uz", qs->id, size);
-
-    /*
-     * we need to fit at least 1 frame into a packet, thus account head/tail;
-     * 25 = 1 + 8x3 is max header for STREAM frame, with 1 byte for frame type
-     */
-    limit = qc->ctp.max_udp_payload_size - NGX_QUIC_MAX_SHORT_HEADER - 25
-            - EVP_GCM_TLS_TAG_LEN;
-
-    len = size;
-    sent = c->sent;
-    unacked = sent - qs->acked;
-
-    if (qc->streams.send_max_data == 0) {
-        qc->streams.send_max_data = qc->ctp.initial_max_data;
-    }
-
-    if (unacked >= NGX_QUIC_STREAM_BUFSIZE) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic send hit buffer size");
-        len = 0;
-
-    } else if (unacked + len > NGX_QUIC_STREAM_BUFSIZE) {
-        len = NGX_QUIC_STREAM_BUFSIZE - unacked;
-    }
-
-    if (qc->streams.sent >= qc->streams.send_max_data) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic send hit MAX_DATA");
-        len = 0;
-
-    } else if (qc->streams.sent + len > qc->streams.send_max_data) {
-        len = qc->streams.send_max_data - qc->streams.sent;
-    }
-
-    if (sent >= qs->send_max_data) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic send hit MAX_STREAM_DATA");
-        len = 0;
-
-    } else if (sent + len > qs->send_max_data) {
-        len = qs->send_max_data - sent;
-    }
-
-    p = (u_char *) buf;
-    end = (u_char *) buf + len;
-    n = 0;
-
-    while (p < end) {
-
-        fsize = ngx_min(limit, (size_t) (end - p));
-
-        frame = ngx_quic_alloc_frame(pc, fsize);
+        return NGX_CHAIN_ERROR;
+    }
+
+    max_frame = ngx_quic_max_stream_frame(qc);
+    max_flow = ngx_quic_max_stream_flow(c);
+    max_limit = limit;
+
+#if (NGX_DEBUG)
+    sent = 0;
+    nframes = 0;
+#endif
+
+    for ( ;; ) {
+        max = ngx_min(max_frame, max_flow);
+
+        if (limit) {
+            max = ngx_min(max, max_limit);
+        }
+
+        for (cl = in, n = 0; in; in = in->next) {
+
+            if (!ngx_buf_in_memory(in->buf)) {
+                continue;
+            }
+
+            n += ngx_buf_size(in->buf);
+
+            if (n > max) {
+                n = max;
+                break;
+            }
+        }
+
+        if (n == 0) {
+            wev->ready = (max_flow ? 1 : 0);
+            break;
+        }
+
+        frame = ngx_quic_alloc_frame(pc, n);
         if (frame == NULL) {
-            return 0;
+            return NGX_CHAIN_ERROR;
         }
 
-        ngx_memcpy(frame->data, p, fsize);
-
         frame->level = ssl_encryption_application;
         frame->type = NGX_QUIC_FT_STREAM6; /* OFF=1 LEN=1 FIN=0 */
         frame->u.stream.off = 1;
@@ -4315,33 +4331,115 @@ ngx_quic_stream_send(ngx_connection_t *c
         frame->u.stream.type = frame->type;
         frame->u.stream.stream_id = qs->id;
         frame->u.stream.offset = c->sent;
-        frame->u.stream.length = fsize;
+        frame->u.stream.length = n;
         frame->u.stream.data = frame->data;
 
-        c->sent += fsize;
-        qc->streams.sent += fsize;
-        p += fsize;
-        n += fsize;
-
-        ngx_sprintf(frame->info, "stream 0x%xL len=%ui level=%d",
-                    qs->id, fsize, frame->level);
+        ngx_sprintf(frame->info, "STREAM id:0x%xL len:%uz level:%d",
+                    qs->id, n, frame->level);
+
+        c->sent += n;
+        qc->streams.sent += n;
+        max_flow -= n;
+
+        if (limit) {
+            max_limit -= n;
+        }
+
+#if (NGX_DEBUG)
+        sent += n;
+        nframes++;
+#endif
+
+        for (p = frame->data; n > 0; cl = cl->next) {
+            b = cl->buf;
+
+            if (!ngx_buf_in_memory(b)) {
+                continue;
+            }
+
+            len = ngx_min(n, (size_t) (b->last - b->pos));
+            p = ngx_cpymem(p, b->pos, len);
+
+            b->pos += len;
+            n -= len;
+        }
 
         ngx_quic_queue_frame(qc, frame);
     }
 
-    ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic send %uz of %uz, sent:%O, unacked:%uL",
-                   n, size, c->sent, (uint64_t) c->sent - qs->acked);
-
-    if (n != size) {
-        c->write->ready = 0;
-    }
-
-    if (n == 0) {
-        return NGX_AGAIN;
-    }
-
-    return n;
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic send_chain sent:%uz, frames:%ui", sent, nframes);
+
+    return in;
+}
+
+
+static size_t
+ngx_quic_max_stream_frame(ngx_quic_connection_t *qc)
+{
+    /*
+     * we need to fit at least 1 frame into a packet, thus account head/tail;
+     * 25 = 1 + 8x3 is max header for STREAM frame, with 1 byte for frame type
+     */
+
+    return qc->ctp.max_udp_payload_size - NGX_QUIC_MAX_SHORT_HEADER - 25
+           - EVP_GCM_TLS_TAG_LEN;
+}
+
+
+static size_t
+ngx_quic_max_stream_flow(ngx_connection_t *c)
+{
+    size_t                  size;
+    uint64_t                sent, unacked;
+    ngx_quic_stream_t      *qs;
+    ngx_quic_connection_t  *qc;
+
+    qs = c->qs;
+    qc = qs->parent->quic;
+
+    size = NGX_QUIC_STREAM_BUFSIZE;
+    sent = c->sent;
+    unacked = sent - qs->acked;
+
+    if (qc->streams.send_max_data == 0) {
+        qc->streams.send_max_data = qc->ctp.initial_max_data;
+    }
+
+    if (unacked >= NGX_QUIC_STREAM_BUFSIZE) {
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic send flow hit buffer size");
+        return 0;
+    }
+
+    if (unacked + size > NGX_QUIC_STREAM_BUFSIZE) {
+        size = NGX_QUIC_STREAM_BUFSIZE - unacked;
+    }
+
+    if (qc->streams.sent >= qc->streams.send_max_data) {
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic send flow hit MAX_DATA");
+        return 0;
+    }
+
+    if (qc->streams.sent + size > qc->streams.send_max_data) {
+        size = qc->streams.send_max_data - qc->streams.sent;
+    }
+
+    if (sent >= qs->send_max_data) {
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic send flow hit MAX_STREAM_DATA");
+        return 0;
+    }
+
+    if (sent + size > qs->send_max_data) {
+        size = qs->send_max_data - sent;
+    }
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic send flow: %uz", size);
+
+    return size;
 }
 
 
@@ -4431,48 +4529,6 @@ ngx_quic_stream_cleanup_handler(void *da
 }
 
 
-static ngx_chain_t *
-ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in,
-    off_t limit)
-{
-    size_t      len;
-    ssize_t     n;
-    ngx_buf_t  *b;
-
-    for ( /* void */; in; in = in->next) {
-        b = in->buf;
-
-        if (!ngx_buf_in_memory(b)) {
-            continue;
-        }
-
-        if (ngx_buf_size(b) == 0) {
-            continue;
-        }
-
-        len = b->last - b->pos;
-
-        n = ngx_quic_stream_send(c, b->pos, len);
-
-        if (n == NGX_ERROR) {
-            return NGX_CHAIN_ERROR;
-        }
-
-        if (n == NGX_AGAIN) {
-            return in;
-        }
-
-        b->pos += n;
-
-        if (n != (ssize_t) len) {
-            return in;
-        }
-    }
-
-    return NULL;
-}
-
-
 static ngx_quic_frame_t *
 ngx_quic_alloc_frame(ngx_connection_t *c, size_t size)
 {