# HG changeset patch # User Roman Arutyunyan # Date 1584967771 -10800 # Node ID f92e583fc256cd844ce5d6b1a20fb12afe60834e # Parent d45325e9022150e7740172c7b673467e2e6bdf85 Better flow control and buffering for QUIC streams. diff --git a/src/event/ngx_event_quic.c b/src/event/ngx_event_quic.c --- a/src/event/ngx_event_quic.c +++ b/src/event/ngx_event_quic.c @@ -16,6 +16,9 @@ typedef enum { } ngx_quic_state_t; +#define NGX_QUIC_STREAM_BUFSIZE 16384 + + typedef struct { ngx_rbtree_node_t node; ngx_buf_t *b; @@ -106,6 +109,8 @@ static ngx_int_t ngx_quic_handle_stream_ ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame); static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); +static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, + ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f); static void ngx_quic_queue_frame(ngx_quic_connection_t *qc, ngx_quic_frame_t *frame); @@ -885,6 +890,18 @@ ngx_quic_payload_handler(ngx_connection_ ack_this = 1; break; + case NGX_QUIC_FT_STREAM_DATA_BLOCKED: + + if (ngx_quic_handle_stream_data_blocked_frame(c, pkt, + &frame.u.stream_data_blocked) + != NGX_OK) + { + return NGX_ERROR; + } + + ack_this = 1; + break; + default: return NGX_ERROR; } @@ -1002,6 +1019,7 @@ ngx_quic_handle_stream_frame(ngx_connect ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f) { ngx_buf_t *b; + ngx_event_t *rev; ngx_quic_connection_t *qc; ngx_quic_stream_node_t *sn; @@ -1013,15 +1031,24 @@ ngx_quic_handle_stream_frame(ngx_connect ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); b = sn->b; - if ((size_t) (b->end - b->pos) < f->length) { + if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); return NGX_ERROR; } - ngx_memcpy(b->pos, f->data, f->length); - b->pos += f->length; + if ((size_t) (b->end - b->last) < f->length) { + b->last = ngx_movemem(b->start, b->pos, b->last - b->pos); + b->pos = b->start; + } + + b->last = ngx_cpymem(b->last, f->data, f->length); - // TODO: notify + rev = sn->c->read; + rev->ready = 1; + + if (rev->active) { + rev->handler(rev); + } return NGX_OK; } @@ -1071,6 +1098,48 @@ ngx_quic_handle_streams_blocked_frame(ng } +static ngx_int_t +ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, + ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f) +{ + size_t n; + ngx_buf_t *b; + ngx_quic_frame_t *frame; + ngx_quic_connection_t *qc; + ngx_quic_stream_node_t *sn; + + qc = c->quic; + sn = ngx_quic_find_stream(&qc->streams.tree, f->id); + + if (sn == NULL) { + ngx_log_error(NGX_LOG_INFO, c->log, 0, "unknown stream id:%uL", f->id); + return NGX_ERROR; + } + + b = sn->b; + n = (b->pos - b->start) + (b->end - b->last); + + frame = ngx_pcalloc(c->pool, sizeof(ngx_quic_frame_t)); + if (frame == NULL) { + return NGX_ERROR; + } + + frame->level = pkt->level; + frame->type = NGX_QUIC_FT_MAX_STREAM_DATA; + frame->u.max_stream_data.id = f->id; + frame->u.max_stream_data.limit = n; + + ngx_sprintf(frame->info, "MAX_STREAM_DATA id:%d limit:%d level=%d", + (int) frame->u.max_stream_data.id, + (int) frame->u.max_stream_data.limit, + frame->level); + + ngx_quic_queue_frame(c->quic, frame); + + return NGX_OK; +} + + static void ngx_quic_queue_frame(ngx_quic_connection_t *qc, ngx_quic_frame_t *frame) { @@ -1349,6 +1418,7 @@ ngx_quic_find_stream(ngx_rbtree_t *rbtre static ngx_quic_stream_node_t * ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id) { + size_t n; ngx_log_t *log; ngx_pool_t *pool; ngx_event_t *rev, *wev; @@ -1402,8 +1472,11 @@ ngx_quic_create_stream(ngx_connection_t sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); + n = ngx_max(NGX_QUIC_STREAM_BUFSIZE, + qc->tp.initial_max_stream_data_bidi_remote); + sn->node.key =id; - sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone + sn->b = ngx_create_temp_buf(pool, n); if (sn->b == NULL) { return NULL; } @@ -1456,11 +1529,10 @@ ngx_quic_stream_recv(ngx_connection_t *c b = sn->b; - if (b->last - b->pos == 0) { + if (b->pos == b->last) { c->read->ready = 0; - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic recv() not ready"); - return NGX_AGAIN; // ? + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic recv() not ready"); + return NGX_AGAIN; } len = ngx_min(b->last - b->pos, (ssize_t) size); @@ -1469,6 +1541,11 @@ ngx_quic_stream_recv(ngx_connection_t *c b->pos += len; + if (b->pos == b->last) { + b->pos = b->start; + b->last = b->start; + } + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic recv: %z of %uz", len, size); diff --git a/src/event/ngx_event_quic_transport.c b/src/event/ngx_event_quic_transport.c --- a/src/event/ngx_event_quic_transport.c +++ b/src/event/ngx_event_quic_transport.c @@ -69,6 +69,8 @@ static size_t ngx_quic_create_crypto(u_c static size_t ngx_quic_create_stream(u_char *p, ngx_quic_stream_frame_t *sf); static size_t ngx_quic_create_max_streams(u_char *p, ngx_quic_max_streams_frame_t *ms); +static size_t ngx_quic_create_max_stream_data(u_char *p, + ngx_quic_max_stream_data_frame_t *ms); static size_t ngx_quic_create_close(u_char *p, ngx_quic_close_frame_t *cl); static ngx_int_t ngx_quic_parse_transport_param(u_char *p, u_char *end, @@ -1079,6 +1081,9 @@ ngx_quic_create_frame(u_char *p, u_char case NGX_QUIC_FT_MAX_STREAMS: return ngx_quic_create_max_streams(p, &f->u.max_streams); + case NGX_QUIC_FT_MAX_STREAM_DATA: + return ngx_quic_create_max_stream_data(p, &f->u.max_stream_data); + default: /* BUG: unsupported frame type generated */ return NGX_ERROR; @@ -1459,6 +1464,29 @@ ngx_quic_parse_transport_params(u_char * } +static size_t +ngx_quic_create_max_stream_data(u_char *p, ngx_quic_max_stream_data_frame_t *ms) +{ + size_t len; + u_char *start; + + if (p == NULL) { + len = ngx_quic_varint_len(NGX_QUIC_FT_MAX_STREAM_DATA); + len += ngx_quic_varint_len(ms->id); + len += ngx_quic_varint_len(ms->limit); + return len; + } + + start = p; + + ngx_quic_build_int(&p, NGX_QUIC_FT_MAX_STREAM_DATA); + ngx_quic_build_int(&p, ms->id); + ngx_quic_build_int(&p, ms->limit); + + return p - start; +} + + ssize_t ngx_quic_create_transport_params(u_char *pos, u_char *end, ngx_quic_tp_t *tp) {