# HG changeset patch # User Roman Arutyunyan # Date 1620224120 -10800 # Node ID f52a2b77d406ce355ad0fc83b1af4e3ac229f1a9 # Parent 5186ee5a94b9a5cb05f599c41cb8bd7ef482ff2c QUIC: generic buffering for stream input. Previously each stream had an input buffer. Now memory is allocated as bytes arrive. Generic buffering mechanism is used for this. diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h --- a/src/event/quic/ngx_event_quic.h +++ b/src/event/quic/ngx_event_quic.h @@ -79,7 +79,8 @@ struct ngx_quic_stream_s { uint64_t id; uint64_t acked; uint64_t send_max_data; - ngx_buf_t *b; + uint64_t recv_max_data; + ngx_chain_t *in; ngx_quic_frames_stream_t *fs; ngx_uint_t cancelable; /* unsigned cancelable:1; */ }; diff --git a/src/event/quic/ngx_event_quic_ack.c b/src/event/quic/ngx_event_quic_ack.c --- a/src/event/quic/ngx_event_quic_ack.c +++ b/src/event/quic/ngx_event_quic_ack.c @@ -432,8 +432,6 @@ ngx_quic_detect_lost(ngx_connection_t *c void ngx_quic_resend_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx) { - size_t n; - ngx_buf_t *b; ngx_queue_t *q; ngx_quic_frame_t *f, *start; ngx_quic_stream_t *qs; @@ -497,13 +495,7 @@ ngx_quic_resend_frames(ngx_connection_t break; } - b = qs->b; - n = qs->fs->received + (b->pos - b->start) + (b->end - b->last); - - if (f->u.max_stream_data.limit < n) { - f->u.max_stream_data.limit = n; - } - + f->u.max_stream_data.limit = qs->recv_max_data; ngx_quic_queue_frame(qc, f); break; diff --git a/src/event/quic/ngx_event_quic_frames.c b/src/event/quic/ngx_event_quic_frames.c --- a/src/event/quic/ngx_event_quic_frames.c +++ b/src/event/quic/ngx_event_quic_frames.c @@ -13,7 +13,6 @@ #define NGX_QUIC_BUFFER_SIZE 4096 -static void ngx_quic_free_bufs(ngx_connection_t *c, ngx_chain_t *in); static ngx_chain_t *ngx_quic_split_bufs(ngx_connection_t *c, ngx_chain_t *in, size_t len); @@ -84,7 +83,7 @@ ngx_quic_free_frame(ngx_connection_t *c, } -static void +void ngx_quic_free_bufs(ngx_connection_t *c, ngx_chain_t *in) { ngx_buf_t *b, *shadow; diff --git a/src/event/quic/ngx_event_quic_frames.h b/src/event/quic/ngx_event_quic_frames.h --- a/src/event/quic/ngx_event_quic_frames.h +++ b/src/event/quic/ngx_event_quic_frames.h @@ -28,6 +28,7 @@ ngx_chain_t *ngx_quic_copy_buf(ngx_conne size_t len); ngx_chain_t *ngx_quic_copy_chain(ngx_connection_t *c, ngx_chain_t *in, size_t limit); +void ngx_quic_free_bufs(ngx_connection_t *c, ngx_chain_t *in); ngx_int_t ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs, ngx_quic_frame_t *frame, diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c --- a/src/event/quic/ngx_event_quic_streams.c +++ b/src/event/quic/ngx_event_quic_streams.c @@ -16,7 +16,7 @@ static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id); static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c, - uint64_t id, size_t rcvbuf_size); + uint64_t id); static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size); static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, @@ -30,7 +30,6 @@ static void ngx_quic_stream_cleanup_hand ngx_connection_t * ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi) { - size_t rcvbuf_size; uint64_t id; ngx_quic_stream_t *qs, *nqs; ngx_quic_connection_t *qc; @@ -58,7 +57,6 @@ ngx_quic_open_stream(ngx_connection_t *c qc->streams.server_max_streams_bidi, id); qc->streams.server_streams_bidi++; - rcvbuf_size = qc->tp.initial_max_stream_data_bidi_local; } else { if (qc->streams.server_streams_uni @@ -81,10 +79,9 @@ ngx_quic_open_stream(ngx_connection_t *c qc->streams.server_max_streams_uni, id); qc->streams.server_streams_uni++; - rcvbuf_size = 0; } - nqs = ngx_quic_create_stream(qs->parent, id, rcvbuf_size); + nqs = ngx_quic_create_stream(qs->parent, id); if (nqs == NULL) { return NULL; } @@ -235,7 +232,6 @@ ngx_quic_reset_stream(ngx_connection_t * static ngx_quic_stream_t * ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id) { - size_t n; uint64_t min_id; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; @@ -272,7 +268,6 @@ ngx_quic_create_client_stream(ngx_connec min_id = (qc->streams.client_streams_uni << 2) | NGX_QUIC_STREAM_UNIDIRECTIONAL; qc->streams.client_streams_uni = (id >> 2) + 1; - n = qc->tp.initial_max_stream_data_uni; } else { @@ -296,11 +291,6 @@ ngx_quic_create_client_stream(ngx_connec min_id = (qc->streams.client_streams_bidi << 2); qc->streams.client_streams_bidi = (id >> 2) + 1; - n = qc->tp.initial_max_stream_data_bidi_remote; - } - - if (n < NGX_QUIC_STREAM_BUFSIZE) { - n = NGX_QUIC_STREAM_BUFSIZE; } /* @@ -314,7 +304,7 @@ ngx_quic_create_client_stream(ngx_connec for ( /* void */ ; min_id < id; min_id += 0x04) { - qs = ngx_quic_create_stream(c, min_id, n); + qs = ngx_quic_create_stream(c, min_id); if (qs == NULL) { return NULL; } @@ -326,12 +316,12 @@ ngx_quic_create_client_stream(ngx_connec } } - return ngx_quic_create_stream(c, id, n); + return ngx_quic_create_stream(c, id); } static ngx_quic_stream_t * -ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size) +ngx_quic_create_stream(ngx_connection_t *c, uint64_t id) { ngx_log_t *log; ngx_pool_t *pool; @@ -360,12 +350,6 @@ ngx_quic_create_stream(ngx_connection_t qs->parent = c; qs->id = id; - qs->b = ngx_create_temp_buf(pool, rcvbuf_size); - if (qs->b == NULL) { - ngx_destroy_pool(pool); - return NULL; - } - qs->fs = ngx_pcalloc(pool, sizeof(ngx_quic_frames_stream_t)); if (qs->fs == NULL) { ngx_destroy_pool(pool); @@ -420,13 +404,19 @@ ngx_quic_create_stream(ngx_connection_t if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) { if (id & NGX_QUIC_STREAM_SERVER_INITIATED) { qs->send_max_data = qc->ctp.initial_max_stream_data_uni; + + } else { + qs->recv_max_data = qc->tp.initial_max_stream_data_uni; } } else { if (id & NGX_QUIC_STREAM_SERVER_INITIATED) { qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote; + qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_local; + } else { qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local; + qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote; } } @@ -449,8 +439,9 @@ ngx_quic_create_stream(ngx_connection_t static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) { - ssize_t len; + ssize_t len, n; ngx_buf_t *b; + ngx_chain_t *cl, **ll; ngx_event_t *rev; ngx_connection_t *pc; ngx_quic_frame_t *frame; @@ -458,7 +449,6 @@ ngx_quic_stream_recv(ngx_connection_t *c ngx_quic_connection_t *qc; qs = c->quic; - b = qs->b; pc = qs->parent; qc = ngx_quic_get_connection(pc); rev = c->read; @@ -467,11 +457,11 @@ ngx_quic_stream_recv(ngx_connection_t *c return NGX_ERROR; } - ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic stream recv id:0x%xL eof:%d avail:%z", - qs->id, rev->pending_eof, b->last - b->pos); + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic stream recv id:0x%xL eof:%d", + qs->id, rev->pending_eof); - if (b->pos == b->last) { + if (qs->in == NULL) { rev->ready = 0; if (rev->pending_eof) { @@ -484,16 +474,33 @@ ngx_quic_stream_recv(ngx_connection_t *c return NGX_AGAIN; } - len = ngx_min(b->last - b->pos, (ssize_t) size); + len = 0; + cl = qs->in; + + for (ll = &cl; *ll; ll = &(*ll)->next) { + b = (*ll)->buf; - ngx_memcpy(buf, b->pos, len); + n = ngx_min(b->last - b->pos, (ssize_t) size); + buf = ngx_cpymem(buf, b->pos, n); + + len += n; + size -= n; + b->pos += n; - b->pos += len; - qc->streams.received += len; + if (b->pos != b->last) { + break; + } + } - if (b->pos == b->last) { - b->pos = b->start; - b->last = b->start; + qs->in = *ll; + *ll = NULL; + + ngx_quic_free_bufs(pc, cl); + + qc->streams.received += len; + qs->recv_max_data += len; + + if (qs->in == NULL) { rev->ready = rev->pending_eof; } @@ -510,8 +517,7 @@ ngx_quic_stream_recv(ngx_connection_t *c frame->level = ssl_encryption_application; frame->type = NGX_QUIC_FT_MAX_STREAM_DATA; frame->u.max_stream_data.id = qs->id; - frame->u.max_stream_data.limit = qs->fs->received + (b->pos - b->start) - + (b->end - b->last); + frame->u.max_stream_data.limit = qs->recv_max_data; ngx_quic_queue_frame(qc, frame); } @@ -714,6 +720,7 @@ ngx_quic_stream_cleanup_handler(void *da ngx_rbtree_delete(&qc->streams.tree, &qs->node); ngx_quic_free_frames(pc, &qs->fs->frames); + ngx_quic_free_bufs(pc, qs->in); if (qc->closing) { /* schedule handler call to continue ngx_quic_close_connection() */ @@ -808,9 +815,7 @@ ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, ngx_quic_frame_t *frame) { - size_t window; uint64_t last; - ngx_buf_t *b; ngx_pool_t *pool; ngx_connection_t *sc; ngx_quic_stream_t *qs; @@ -846,10 +851,8 @@ ngx_quic_handle_stream_frame(ngx_connect sc = qs->connection; fs = qs->fs; - b = qs->b; - window = b->end - b->last; - if (last > window) { + if (last > qs->recv_max_data) { qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; goto cleanup; } @@ -867,10 +870,8 @@ ngx_quic_handle_stream_frame(ngx_connect } fs = qs->fs; - b = qs->b; - window = (b->pos - b->start) + (b->end - b->last); - if (last > fs->received && last - fs->received > window) { + if (last > qs->recv_max_data) { qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; return NGX_ERROR; } @@ -892,10 +893,11 @@ cleanup: ngx_int_t ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data) { + ssize_t n; uint64_t id; ngx_buf_t *b; ngx_event_t *rev; - ngx_chain_t *cl; + ngx_chain_t *cl, **ll; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; ngx_quic_stream_frame_t *f; @@ -905,24 +907,34 @@ ngx_quic_stream_input(ngx_connection_t * f = &frame->u.stream; id = f->stream_id; + cl = frame->data; - b = qs->b; + for (ll = &qs->in; *ll; ll = &(*ll)->next) { + if ((*ll)->next) { + continue; + } + + /* append to last buffer */ + + b = (*ll)->buf; - if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) { - ngx_log_error(NGX_LOG_INFO, c->log, 0, - "quic no space in stream buffer"); + while (cl && b->last != b->end) { + n = ngx_min(cl->buf->last - cl->buf->pos, b->end - b->last); + b->last = ngx_cpymem(b->last, cl->buf->pos, n); + cl->buf->pos += n; + + if (cl->buf->pos == cl->buf->last) { + cl = cl->next; + } + } + } + + cl = ngx_quic_copy_chain(c, cl, 0); + if (cl == NGX_CHAIN_ERROR) { return NGX_ERROR; } - if ((size_t) (b->end - b->last) < f->length) { - b->last = ngx_movemem(b->start, b->pos, b->last - b->pos); - b->pos = b->start; - } - - for (cl = frame->data; cl; cl = cl->next) { - b->last = ngx_cpymem(b->last, cl->buf->pos, - cl->buf->last - cl->buf->pos); - } + *ll = cl; rev = qs->connection->read; rev->ready = 1; @@ -995,8 +1007,7 @@ ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f) { - size_t n; - ngx_buf_t *b; + uint64_t limit; ngx_quic_frame_t *frame; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; @@ -1023,14 +1034,12 @@ ngx_quic_handle_stream_data_blocked_fram return NGX_OK; } - b = qs->b; - n = b->end - b->last; + limit = qs->recv_max_data; qs->connection->listening->handler(qs->connection); } else { - b = qs->b; - n = qs->fs->received + (b->pos - b->start) + (b->end - b->last); + limit = qs->recv_max_data; } frame = ngx_quic_alloc_frame(c); @@ -1041,7 +1050,7 @@ ngx_quic_handle_stream_data_blocked_fram frame->level = pkt->level; frame->type = NGX_QUIC_FT_MAX_STREAM_DATA; frame->u.max_stream_data.id = f->id; - frame->u.max_stream_data.limit = n; + frame->u.max_stream_data.limit = limit; ngx_quic_queue_frame(qc, frame);