changeset 8266:f92e583fc256 quic

Better flow control and buffering for QUIC streams.
author Roman Arutyunyan <arut@nginx.com>
date Mon, 23 Mar 2020 15:49:31 +0300
parents d45325e90221
children a8349cc72c64
files src/event/ngx_event_quic.c src/event/ngx_event_quic_transport.c
diffstat 2 files changed, 114 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/ngx_event_quic.c
+++ b/src/event/ngx_event_quic.c
@@ -16,6 +16,9 @@ typedef enum {
 } ngx_quic_state_t;
 
 
+#define NGX_QUIC_STREAM_BUFSIZE        16384
+
+
 typedef struct {
     ngx_rbtree_node_t                  node;
     ngx_buf_t                         *b;
@@ -106,6 +109,8 @@ static ngx_int_t ngx_quic_handle_stream_
     ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame);
 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
+static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f);
 
 static void ngx_quic_queue_frame(ngx_quic_connection_t *qc,
     ngx_quic_frame_t *frame);
@@ -885,6 +890,18 @@ ngx_quic_payload_handler(ngx_connection_
             ack_this = 1;
             break;
 
+        case NGX_QUIC_FT_STREAM_DATA_BLOCKED:
+
+            if (ngx_quic_handle_stream_data_blocked_frame(c, pkt,
+                                                  &frame.u.stream_data_blocked)
+                != NGX_OK)
+            {
+                return NGX_ERROR;
+            }
+
+            ack_this = 1;
+            break;
+
         default:
             return NGX_ERROR;
         }
@@ -1002,6 +1019,7 @@ ngx_quic_handle_stream_frame(ngx_connect
     ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f)
 {
     ngx_buf_t               *b;
+    ngx_event_t             *rev;
     ngx_quic_connection_t   *qc;
     ngx_quic_stream_node_t  *sn;
 
@@ -1013,15 +1031,24 @@ ngx_quic_handle_stream_frame(ngx_connect
         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
         b = sn->b;
 
-        if ((size_t) (b->end - b->pos) < f->length) {
+        if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
             ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
             return NGX_ERROR;
         }
 
-        ngx_memcpy(b->pos, f->data, f->length);
-        b->pos += f->length;
+        if ((size_t) (b->end - b->last) < f->length) {
+            b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
+            b->pos = b->start;
+        }
+
+        b->last = ngx_cpymem(b->last, f->data, f->length);
 
-        // TODO: notify
+        rev = sn->c->read;
+        rev->ready = 1;
+
+        if (rev->active) {
+            rev->handler(rev);
+        }
 
         return NGX_OK;
     }
@@ -1071,6 +1098,48 @@ ngx_quic_handle_streams_blocked_frame(ng
 }
 
 
+static ngx_int_t
+ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
+{
+    size_t                   n;
+    ngx_buf_t               *b;
+    ngx_quic_frame_t        *frame;
+    ngx_quic_connection_t   *qc;
+    ngx_quic_stream_node_t  *sn;
+
+    qc = c->quic;
+    sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
+
+    if (sn == NULL) {
+        ngx_log_error(NGX_LOG_INFO, c->log, 0, "unknown stream id:%uL", f->id);
+        return NGX_ERROR;
+    }
+
+    b = sn->b;
+    n = (b->pos - b->start) + (b->end - b->last);
+
+    frame = ngx_pcalloc(c->pool, sizeof(ngx_quic_frame_t));
+    if (frame == NULL) {
+        return NGX_ERROR;
+    }
+
+    frame->level = pkt->level;
+    frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
+    frame->u.max_stream_data.id = f->id;
+    frame->u.max_stream_data.limit = n;
+
+    ngx_sprintf(frame->info, "MAX_STREAM_DATA id:%d limit:%d level=%d",
+                (int) frame->u.max_stream_data.id,
+                (int) frame->u.max_stream_data.limit,
+                frame->level);
+
+    ngx_quic_queue_frame(c->quic, frame);
+
+    return NGX_OK;
+}
+
+
 static void
 ngx_quic_queue_frame(ngx_quic_connection_t *qc, ngx_quic_frame_t *frame)
 {
@@ -1349,6 +1418,7 @@ ngx_quic_find_stream(ngx_rbtree_t *rbtre
 static ngx_quic_stream_node_t *
 ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id)
 {
+    size_t                   n;
     ngx_log_t               *log;
     ngx_pool_t              *pool;
     ngx_event_t             *rev, *wev;
@@ -1402,8 +1472,11 @@ ngx_quic_create_stream(ngx_connection_t 
 
     sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
 
+    n = ngx_max(NGX_QUIC_STREAM_BUFSIZE,
+                qc->tp.initial_max_stream_data_bidi_remote);
+
     sn->node.key =id;
-    sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone
+    sn->b = ngx_create_temp_buf(pool, n);
     if (sn->b == NULL) {
         return NULL;
     }
@@ -1456,11 +1529,10 @@ ngx_quic_stream_recv(ngx_connection_t *c
 
     b = sn->b;
 
-    if (b->last - b->pos == 0) {
+    if (b->pos == b->last) {
         c->read->ready = 0;
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic recv() not ready");
-        return NGX_AGAIN; // ?
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic recv() not ready");
+        return NGX_AGAIN;
     }
 
     len = ngx_min(b->last - b->pos, (ssize_t) size);
@@ -1469,6 +1541,11 @@ ngx_quic_stream_recv(ngx_connection_t *c
 
     b->pos += len;
 
+    if (b->pos == b->last) {
+        b->pos = b->start;
+        b->last = b->start;
+    }
+
     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
                   "quic recv: %z of %uz", len, size);
 
--- a/src/event/ngx_event_quic_transport.c
+++ b/src/event/ngx_event_quic_transport.c
@@ -69,6 +69,8 @@ static size_t ngx_quic_create_crypto(u_c
 static size_t ngx_quic_create_stream(u_char *p, ngx_quic_stream_frame_t *sf);
 static size_t ngx_quic_create_max_streams(u_char *p,
     ngx_quic_max_streams_frame_t *ms);
+static size_t ngx_quic_create_max_stream_data(u_char *p,
+    ngx_quic_max_stream_data_frame_t *ms);
 static size_t ngx_quic_create_close(u_char *p, ngx_quic_close_frame_t *cl);
 
 static ngx_int_t ngx_quic_parse_transport_param(u_char *p, u_char *end,
@@ -1079,6 +1081,9 @@ ngx_quic_create_frame(u_char *p, u_char 
     case NGX_QUIC_FT_MAX_STREAMS:
         return ngx_quic_create_max_streams(p, &f->u.max_streams);
 
+    case NGX_QUIC_FT_MAX_STREAM_DATA:
+        return ngx_quic_create_max_stream_data(p, &f->u.max_stream_data);
+
     default:
         /* BUG: unsupported frame type generated */
         return NGX_ERROR;
@@ -1459,6 +1464,29 @@ ngx_quic_parse_transport_params(u_char *
 }
 
 
+static size_t
+ngx_quic_create_max_stream_data(u_char *p, ngx_quic_max_stream_data_frame_t *ms)
+{
+    size_t   len;
+    u_char  *start;
+
+    if (p == NULL) {
+        len = ngx_quic_varint_len(NGX_QUIC_FT_MAX_STREAM_DATA);
+        len += ngx_quic_varint_len(ms->id);
+        len += ngx_quic_varint_len(ms->limit);
+        return len;
+    }
+
+    start = p;
+
+    ngx_quic_build_int(&p, NGX_QUIC_FT_MAX_STREAM_DATA);
+    ngx_quic_build_int(&p, ms->id);
+    ngx_quic_build_int(&p, ms->limit);
+
+    return p - start;
+}
+
+
 ssize_t
 ngx_quic_create_transport_params(u_char *pos, u_char *end, ngx_quic_tp_t *tp)
 {