changeset 8779:f52a2b77d406 quic

QUIC: generic buffering for stream input. Previously each stream had an input buffer. Now memory is allocated as bytes arrive. Generic buffering mechanism is used for this.
author Roman Arutyunyan <arut@nginx.com>
date Wed, 05 May 2021 17:15:20 +0300
parents 5186ee5a94b9
children 557dc6a06ba6
files src/event/quic/ngx_event_quic.h src/event/quic/ngx_event_quic_ack.c src/event/quic/ngx_event_quic_frames.c src/event/quic/ngx_event_quic_frames.h src/event/quic/ngx_event_quic_streams.c
diffstat 5 files changed, 79 insertions(+), 77 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/quic/ngx_event_quic.h
+++ b/src/event/quic/ngx_event_quic.h
@@ -79,7 +79,8 @@ struct ngx_quic_stream_s {
     uint64_t                   id;
     uint64_t                   acked;
     uint64_t                   send_max_data;
-    ngx_buf_t                 *b;
+    uint64_t                   recv_max_data;
+    ngx_chain_t               *in;
     ngx_quic_frames_stream_t  *fs;
     ngx_uint_t                 cancelable;  /* unsigned  cancelable:1; */
 };
--- a/src/event/quic/ngx_event_quic_ack.c
+++ b/src/event/quic/ngx_event_quic_ack.c
@@ -432,8 +432,6 @@ ngx_quic_detect_lost(ngx_connection_t *c
 void
 ngx_quic_resend_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
 {
-    size_t                  n;
-    ngx_buf_t              *b;
     ngx_queue_t            *q;
     ngx_quic_frame_t       *f, *start;
     ngx_quic_stream_t      *qs;
@@ -497,13 +495,7 @@ ngx_quic_resend_frames(ngx_connection_t 
                 break;
             }
 
-            b = qs->b;
-            n = qs->fs->received + (b->pos - b->start) + (b->end - b->last);
-
-            if (f->u.max_stream_data.limit < n) {
-                f->u.max_stream_data.limit = n;
-            }
-
+            f->u.max_stream_data.limit = qs->recv_max_data;
             ngx_quic_queue_frame(qc, f);
             break;
 
--- a/src/event/quic/ngx_event_quic_frames.c
+++ b/src/event/quic/ngx_event_quic_frames.c
@@ -13,7 +13,6 @@
 #define NGX_QUIC_BUFFER_SIZE  4096
 
 
-static void ngx_quic_free_bufs(ngx_connection_t *c, ngx_chain_t *in);
 static ngx_chain_t *ngx_quic_split_bufs(ngx_connection_t *c, ngx_chain_t *in,
     size_t len);
 
@@ -84,7 +83,7 @@ ngx_quic_free_frame(ngx_connection_t *c,
 }
 
 
-static void
+void
 ngx_quic_free_bufs(ngx_connection_t *c, ngx_chain_t *in)
 {
     ngx_buf_t              *b, *shadow;
--- a/src/event/quic/ngx_event_quic_frames.h
+++ b/src/event/quic/ngx_event_quic_frames.h
@@ -28,6 +28,7 @@ ngx_chain_t *ngx_quic_copy_buf(ngx_conne
     size_t len);
 ngx_chain_t *ngx_quic_copy_chain(ngx_connection_t *c, ngx_chain_t *in,
     size_t limit);
+void ngx_quic_free_bufs(ngx_connection_t *c, ngx_chain_t *in);
 
 ngx_int_t ngx_quic_handle_ordered_frame(ngx_connection_t *c,
     ngx_quic_frames_stream_t *fs, ngx_quic_frame_t *frame,
--- a/src/event/quic/ngx_event_quic_streams.c
+++ b/src/event/quic/ngx_event_quic_streams.c
@@ -16,7 +16,7 @@
 static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c,
     uint64_t id);
 static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
-    uint64_t id, size_t rcvbuf_size);
+    uint64_t id);
 static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
     size_t size);
 static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
@@ -30,7 +30,6 @@ static void ngx_quic_stream_cleanup_hand
 ngx_connection_t *
 ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
 {
-    size_t                  rcvbuf_size;
     uint64_t                id;
     ngx_quic_stream_t      *qs, *nqs;
     ngx_quic_connection_t  *qc;
@@ -58,7 +57,6 @@ ngx_quic_open_stream(ngx_connection_t *c
                        qc->streams.server_max_streams_bidi, id);
 
         qc->streams.server_streams_bidi++;
-        rcvbuf_size = qc->tp.initial_max_stream_data_bidi_local;
 
     } else {
         if (qc->streams.server_streams_uni
@@ -81,10 +79,9 @@ ngx_quic_open_stream(ngx_connection_t *c
                        qc->streams.server_max_streams_uni, id);
 
         qc->streams.server_streams_uni++;
-        rcvbuf_size = 0;
     }
 
-    nqs = ngx_quic_create_stream(qs->parent, id, rcvbuf_size);
+    nqs = ngx_quic_create_stream(qs->parent, id);
     if (nqs == NULL) {
         return NULL;
     }
@@ -235,7 +232,6 @@ ngx_quic_reset_stream(ngx_connection_t *
 static ngx_quic_stream_t *
 ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
 {
-    size_t                  n;
     uint64_t                min_id;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
@@ -272,7 +268,6 @@ ngx_quic_create_client_stream(ngx_connec
         min_id = (qc->streams.client_streams_uni << 2)
                  | NGX_QUIC_STREAM_UNIDIRECTIONAL;
         qc->streams.client_streams_uni = (id >> 2) + 1;
-        n = qc->tp.initial_max_stream_data_uni;
 
     } else {
 
@@ -296,11 +291,6 @@ ngx_quic_create_client_stream(ngx_connec
 
         min_id = (qc->streams.client_streams_bidi << 2);
         qc->streams.client_streams_bidi = (id >> 2) + 1;
-        n = qc->tp.initial_max_stream_data_bidi_remote;
-    }
-
-    if (n < NGX_QUIC_STREAM_BUFSIZE) {
-        n = NGX_QUIC_STREAM_BUFSIZE;
     }
 
     /*
@@ -314,7 +304,7 @@ ngx_quic_create_client_stream(ngx_connec
 
     for ( /* void */ ; min_id < id; min_id += 0x04) {
 
-        qs = ngx_quic_create_stream(c, min_id, n);
+        qs = ngx_quic_create_stream(c, min_id);
         if (qs == NULL) {
             return NULL;
         }
@@ -326,12 +316,12 @@ ngx_quic_create_client_stream(ngx_connec
         }
     }
 
-    return ngx_quic_create_stream(c, id, n);
+    return ngx_quic_create_stream(c, id);
 }
 
 
 static ngx_quic_stream_t *
-ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
+ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
 {
     ngx_log_t              *log;
     ngx_pool_t             *pool;
@@ -360,12 +350,6 @@ ngx_quic_create_stream(ngx_connection_t 
     qs->parent = c;
     qs->id = id;
 
-    qs->b = ngx_create_temp_buf(pool, rcvbuf_size);
-    if (qs->b == NULL) {
-        ngx_destroy_pool(pool);
-        return NULL;
-    }
-
     qs->fs = ngx_pcalloc(pool, sizeof(ngx_quic_frames_stream_t));
     if (qs->fs == NULL) {
         ngx_destroy_pool(pool);
@@ -420,13 +404,19 @@ ngx_quic_create_stream(ngx_connection_t 
     if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
             qs->send_max_data = qc->ctp.initial_max_stream_data_uni;
+
+        } else {
+            qs->recv_max_data = qc->tp.initial_max_stream_data_uni;
         }
 
     } else {
         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
             qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote;
+            qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_local;
+
         } else {
             qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
+            qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote;
         }
     }
 
@@ -449,8 +439,9 @@ ngx_quic_create_stream(ngx_connection_t 
 static ssize_t
 ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
 {
-    ssize_t                 len;
+    ssize_t                 len, n;
     ngx_buf_t              *b;
+    ngx_chain_t            *cl, **ll;
     ngx_event_t            *rev;
     ngx_connection_t       *pc;
     ngx_quic_frame_t       *frame;
@@ -458,7 +449,6 @@ ngx_quic_stream_recv(ngx_connection_t *c
     ngx_quic_connection_t  *qc;
 
     qs = c->quic;
-    b = qs->b;
     pc = qs->parent;
     qc = ngx_quic_get_connection(pc);
     rev = c->read;
@@ -467,11 +457,11 @@ ngx_quic_stream_recv(ngx_connection_t *c
         return NGX_ERROR;
     }
 
-    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream recv id:0x%xL eof:%d avail:%z",
-                   qs->id, rev->pending_eof, b->last - b->pos);
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream recv id:0x%xL eof:%d",
+                   qs->id, rev->pending_eof);
 
-    if (b->pos == b->last) {
+    if (qs->in == NULL) {
         rev->ready = 0;
 
         if (rev->pending_eof) {
@@ -484,16 +474,33 @@ ngx_quic_stream_recv(ngx_connection_t *c
         return NGX_AGAIN;
     }
 
-    len = ngx_min(b->last - b->pos, (ssize_t) size);
+    len = 0;
+    cl = qs->in;
+
+    for (ll = &cl; *ll; ll = &(*ll)->next) {
+        b = (*ll)->buf;
 
-    ngx_memcpy(buf, b->pos, len);
+        n = ngx_min(b->last - b->pos, (ssize_t) size);
+        buf = ngx_cpymem(buf, b->pos, n);
+
+        len += n;
+        size -= n;
+        b->pos += n;
 
-    b->pos += len;
-    qc->streams.received += len;
+        if (b->pos != b->last) {
+            break;
+        }
+    }
 
-    if (b->pos == b->last) {
-        b->pos = b->start;
-        b->last = b->start;
+    qs->in = *ll;
+    *ll = NULL;
+
+    ngx_quic_free_bufs(pc, cl);
+
+    qc->streams.received += len;
+    qs->recv_max_data += len;
+
+    if (qs->in == NULL) {
         rev->ready = rev->pending_eof;
     }
 
@@ -510,8 +517,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
         frame->level = ssl_encryption_application;
         frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
         frame->u.max_stream_data.id = qs->id;
-        frame->u.max_stream_data.limit = qs->fs->received + (b->pos - b->start)
-                                         + (b->end - b->last);
+        frame->u.max_stream_data.limit = qs->recv_max_data;
 
         ngx_quic_queue_frame(qc, frame);
     }
@@ -714,6 +720,7 @@ ngx_quic_stream_cleanup_handler(void *da
 
     ngx_rbtree_delete(&qc->streams.tree, &qs->node);
     ngx_quic_free_frames(pc, &qs->fs->frames);
+    ngx_quic_free_bufs(pc, qs->in);
 
     if (qc->closing) {
         /* schedule handler call to continue ngx_quic_close_connection() */
@@ -808,9 +815,7 @@ ngx_int_t
 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
     ngx_quic_frame_t *frame)
 {
-    size_t                     window;
     uint64_t                   last;
-    ngx_buf_t                 *b;
     ngx_pool_t                *pool;
     ngx_connection_t          *sc;
     ngx_quic_stream_t         *qs;
@@ -846,10 +851,8 @@ ngx_quic_handle_stream_frame(ngx_connect
 
         sc = qs->connection;
         fs = qs->fs;
-        b = qs->b;
-        window = b->end - b->last;
 
-        if (last > window) {
+        if (last > qs->recv_max_data) {
             qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
             goto cleanup;
         }
@@ -867,10 +870,8 @@ ngx_quic_handle_stream_frame(ngx_connect
     }
 
     fs = qs->fs;
-    b = qs->b;
-    window = (b->pos - b->start) + (b->end - b->last);
 
-    if (last > fs->received && last - fs->received > window) {
+    if (last > qs->recv_max_data) {
         qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
         return NGX_ERROR;
     }
@@ -892,10 +893,11 @@ cleanup:
 ngx_int_t
 ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
 {
+    ssize_t                   n;
     uint64_t                  id;
     ngx_buf_t                *b;
     ngx_event_t              *rev;
-    ngx_chain_t              *cl;
+    ngx_chain_t              *cl, **ll;
     ngx_quic_stream_t        *qs;
     ngx_quic_connection_t    *qc;
     ngx_quic_stream_frame_t  *f;
@@ -905,24 +907,34 @@ ngx_quic_stream_input(ngx_connection_t *
 
     f = &frame->u.stream;
     id = f->stream_id;
+    cl = frame->data;
 
-    b = qs->b;
+    for (ll = &qs->in; *ll; ll = &(*ll)->next) {
+        if ((*ll)->next) {
+            continue;
+        }
+
+        /* append to last buffer */
+
+        b = (*ll)->buf;
 
-    if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
-        ngx_log_error(NGX_LOG_INFO, c->log, 0,
-                      "quic no space in stream buffer");
+        while (cl && b->last != b->end) {
+            n = ngx_min(cl->buf->last - cl->buf->pos, b->end - b->last);
+            b->last = ngx_cpymem(b->last, cl->buf->pos, n);
+            cl->buf->pos += n;
+
+            if (cl->buf->pos == cl->buf->last) {
+                cl = cl->next;
+            }
+        }
+    }
+
+    cl = ngx_quic_copy_chain(c, cl, 0);
+    if (cl == NGX_CHAIN_ERROR) {
         return NGX_ERROR;
     }
 
-    if ((size_t) (b->end - b->last) < f->length) {
-        b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
-        b->pos = b->start;
-    }
-
-    for (cl = frame->data; cl; cl = cl->next) {
-        b->last = ngx_cpymem(b->last, cl->buf->pos,
-                             cl->buf->last - cl->buf->pos);
-    }
+    *ll = cl;
 
     rev = qs->connection->read;
     rev->ready = 1;
@@ -995,8 +1007,7 @@ ngx_int_t
 ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
 {
-    size_t                  n;
-    ngx_buf_t              *b;
+    uint64_t                limit;
     ngx_quic_frame_t       *frame;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
@@ -1023,14 +1034,12 @@ ngx_quic_handle_stream_data_blocked_fram
             return NGX_OK;
         }
 
-        b = qs->b;
-        n = b->end - b->last;
+        limit = qs->recv_max_data;
 
         qs->connection->listening->handler(qs->connection);
 
     } else {
-        b = qs->b;
-        n = qs->fs->received + (b->pos - b->start) + (b->end - b->last);
+        limit = qs->recv_max_data;
     }
 
     frame = ngx_quic_alloc_frame(c);
@@ -1041,7 +1050,7 @@ ngx_quic_handle_stream_data_blocked_fram
     frame->level = pkt->level;
     frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
     frame->u.max_stream_data.id = f->id;
-    frame->u.max_stream_data.limit = n;
+    frame->u.max_stream_data.limit = limit;
 
     ngx_quic_queue_frame(qc, frame);