diff src/event/quic/ngx_event_quic_streams.c @ 8782:b3f6ad181df4 quic

QUIC: refactored CRYPTO and STREAM buffer ordering. Generic function ngx_quic_order_bufs() is introduced. This function creates and maintains a chain of buffers with holes. Holes are marked with b->sync flag. Several buffers and holes in this chain may share the same underlying memory buffer. When processing STREAM frames with this function, frame data is copied only once to the right place in the stream input chain. Previously data could be copied twice. First when buffering an out-of-order frame data, and then when filling stream buffer from ordered frame queue. Now there's only one data chain for both tasks.
author Roman Arutyunyan <arut@nginx.com>
date Tue, 25 May 2021 13:55:12 +0300
parents f52a2b77d406
children 60c6e8d8d3ae
line wrap: on
line diff
--- a/src/event/quic/ngx_event_quic_streams.c
+++ b/src/event/quic/ngx_event_quic_streams.c
@@ -349,14 +349,7 @@ ngx_quic_create_stream(ngx_connection_t 
     qs->node.key = id;
     qs->parent = c;
     qs->id = id;
-
-    qs->fs = ngx_pcalloc(pool, sizeof(ngx_quic_frames_stream_t));
-    if (qs->fs == NULL) {
-        ngx_destroy_pool(pool);
-        return NULL;
-    }
-
-    ngx_queue_init(&qs->fs->frames);
+    qs->final_size = (uint64_t) -1;
 
     log = ngx_palloc(pool, sizeof(ngx_log_t));
     if (log == NULL) {
@@ -457,14 +450,14 @@ ngx_quic_stream_recv(ngx_connection_t *c
         return NGX_ERROR;
     }
 
-    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream recv id:0x%xL eof:%d",
-                   qs->id, rev->pending_eof);
+    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL recv eof:%d buf:%uz",
+                   qs->id, rev->pending_eof, size);
 
-    if (qs->in == NULL) {
+    if (qs->in == NULL || qs->in->buf->sync) {
         rev->ready = 0;
 
-        if (rev->pending_eof) {
+        if (qs->recv_offset == qs->final_size) {
             rev->eof = 1;
             return 0;
         }
@@ -480,6 +473,11 @@ ngx_quic_stream_recv(ngx_connection_t *c
     for (ll = &cl; *ll; ll = &(*ll)->next) {
         b = (*ll)->buf;
 
+        if (b->sync) {
+            /* hole */
+            break;
+        }
+
         n = ngx_min(b->last - b->pos, (ssize_t) size);
         buf = ngx_cpymem(buf, b->pos, n);
 
@@ -499,14 +497,14 @@ ngx_quic_stream_recv(ngx_connection_t *c
 
     qc->streams.received += len;
     qs->recv_max_data += len;
+    qs->recv_offset += len;
 
     if (qs->in == NULL) {
         rev->ready = rev->pending_eof;
     }
 
-    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id:0x%xL recv len:%z of size:%uz",
-                   qs->id, len, size);
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL recv len:%z", qs->id, len);
 
     if (!rev->pending_eof) {
         frame = ngx_quic_alloc_frame(pc);
@@ -719,7 +717,6 @@ ngx_quic_stream_cleanup_handler(void *da
                    "quic stream id:0x%xL cleanup", qs->id);
 
     ngx_rbtree_delete(&qc->streams.tree, &qs->node);
-    ngx_quic_free_frames(pc, &qs->fs->frames);
     ngx_quic_free_bufs(pc, qs->in);
 
     if (qc->closing) {
@@ -815,13 +812,12 @@ ngx_int_t
 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
     ngx_quic_frame_t *frame)
 {
-    uint64_t                   last;
-    ngx_pool_t                *pool;
-    ngx_connection_t          *sc;
-    ngx_quic_stream_t         *qs;
-    ngx_quic_connection_t     *qc;
-    ngx_quic_stream_frame_t   *f;
-    ngx_quic_frames_stream_t  *fs;
+    uint64_t                  last;
+    ngx_pool_t               *pool;
+    ngx_connection_t         *sc;
+    ngx_quic_stream_t        *qs;
+    ngx_quic_connection_t    *qc;
+    ngx_quic_stream_frame_t  *f;
 
     qc = ngx_quic_get_connection(c);
     f = &frame->u.stream;
@@ -850,17 +846,22 @@ ngx_quic_handle_stream_frame(ngx_connect
         }
 
         sc = qs->connection;
-        fs = qs->fs;
 
         if (last > qs->recv_max_data) {
             qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
             goto cleanup;
         }
 
-        if (ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
-                                          qs)
-            != NGX_OK)
-        {
+        if (f->fin) {
+            sc->read->pending_eof = 1;
+            qs->final_size = last;
+        }
+
+        if (f->offset == 0) {
+            sc->read->ready = 1;
+        }
+
+        if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) {
             goto cleanup;
         }
 
@@ -869,15 +870,41 @@ ngx_quic_handle_stream_frame(ngx_connect
         return NGX_OK;
     }
 
-    fs = qs->fs;
-
     if (last > qs->recv_max_data) {
         qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
         return NGX_ERROR;
     }
 
-    return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
-                                         qs);
+    if (qs->final_size != (uint64_t) -1 && last > qs->final_size) {
+        qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
+        return NGX_ERROR;
+    }
+
+    if (last <= qs->recv_offset) {
+        return NGX_OK;
+    }
+
+    if (f->offset < qs->recv_offset) {
+        ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset);
+        f->offset = qs->recv_offset;
+    }
+
+    if (f->offset == qs->recv_offset) {
+        qs->connection->read->ready = 1;
+    }
+
+    if (f->fin) {
+        if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
+            qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
+            return NGX_ERROR;
+        }
+
+        qs->connection->read->pending_eof = 1;
+        qs->final_size = last;
+    }
+
+    return ngx_quic_order_bufs(c, &qs->in, frame->data,
+                               f->offset - qs->recv_offset);
 
 cleanup:
 
@@ -891,72 +918,6 @@ cleanup:
 
 
 ngx_int_t
-ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
-{
-    ssize_t                   n;
-    uint64_t                  id;
-    ngx_buf_t                *b;
-    ngx_event_t              *rev;
-    ngx_chain_t              *cl, **ll;
-    ngx_quic_stream_t        *qs;
-    ngx_quic_connection_t    *qc;
-    ngx_quic_stream_frame_t  *f;
-
-    qc = ngx_quic_get_connection(c);
-    qs = data;
-
-    f = &frame->u.stream;
-    id = f->stream_id;
-    cl = frame->data;
-
-    for (ll = &qs->in; *ll; ll = &(*ll)->next) {
-        if ((*ll)->next) {
-            continue;
-        }
-
-        /* append to last buffer */
-
-        b = (*ll)->buf;
-
-        while (cl && b->last != b->end) {
-            n = ngx_min(cl->buf->last - cl->buf->pos, b->end - b->last);
-            b->last = ngx_cpymem(b->last, cl->buf->pos, n);
-            cl->buf->pos += n;
-
-            if (cl->buf->pos == cl->buf->last) {
-                cl = cl->next;
-            }
-        }
-    }
-
-    cl = ngx_quic_copy_chain(c, cl, 0);
-    if (cl == NGX_CHAIN_ERROR) {
-        return NGX_ERROR;
-    }
-
-    *ll = cl;
-
-    rev = qs->connection->read;
-    rev->ready = 1;
-
-    if (f->fin) {
-        rev->pending_eof = 1;
-    }
-
-    if (rev->active) {
-        rev->handler(rev);
-    }
-
-    /* check if stream was destroyed by handler */
-    if (ngx_quic_find_stream(&qc->streams.tree, id) == NULL) {
-        return NGX_DONE;
-    }
-
-    return NGX_OK;
-}
-
-
-ngx_int_t
 ngx_quic_handle_max_data_frame(ngx_connection_t *c,
     ngx_quic_max_data_frame_t *f)
 {
@@ -1150,6 +1111,8 @@ ngx_quic_handle_reset_stream_frame(ngx_c
             return NGX_OK;
         }
 
+        qs->final_size = f->final_size;
+
         sc = qs->connection;
 
         rev = sc->read;
@@ -1161,6 +1124,13 @@ ngx_quic_handle_reset_stream_frame(ngx_c
         return NGX_OK;
     }
 
+    if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
+        qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
+        return NGX_ERROR;
+    }
+
+    qs->final_size = f->final_size;
+
     rev = qs->connection->read;
     rev->error = 1;
     rev->ready = 1;