# HG changeset patch # User Roman Arutyunyan # Date 1623049966 -10800 # Node ID af33d1ef1c3ce28e85f41b8c6d4e38f6b114cf96 # Parent ac0398da8f23d5765f18b3763b308ab8de105ac6 QUIC: stream flow control refactored. - Function ngx_quic_control_flow() is introduced. This functions does both MAX_DATA and MAX_STREAM_DATA flow controls. The function is called from STREAM and RESET_STREAM frame handlers. Previously, flow control was only accounted for STREAM. Also, MAX_DATA flow control was not accounted at all. - Function ngx_quic_update_flow() is introduced. This function advances flow control windows and sends MAX_DATA/MAX_STREAM_DATA. The function is called from RESET_STREAM frame handler, stream cleanup handler and stream recv() handler. diff --git a/src/event/quic/ngx_event_quic.c b/src/event/quic/ngx_event_quic.c --- a/src/event/quic/ngx_event_quic.c +++ b/src/event/quic/ngx_event_quic.c @@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t ctp->active_connection_id_limit = 2; qc->streams.recv_max_data = qc->tp.initial_max_data; + qc->streams.recv_window = qc->streams.recv_max_data; qc->streams.client_max_streams_uni = qc->tp.initial_max_streams_uni; qc->streams.client_max_streams_bidi = qc->tp.initial_max_streams_bidi; diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h --- a/src/event/quic/ngx_event_quic.h +++ b/src/event/quic/ngx_event_quic.h @@ -75,6 +75,7 @@ struct ngx_quic_stream_s { uint64_t send_max_data; uint64_t recv_max_data; uint64_t recv_offset; + uint64_t recv_window; uint64_t recv_last; uint64_t final_size; ngx_chain_t *in; diff --git a/src/event/quic/ngx_event_quic_connection.h b/src/event/quic/ngx_event_quic_connection.h --- a/src/event/quic/ngx_event_quic_connection.h +++ b/src/event/quic/ngx_event_quic_connection.h @@ -115,8 +115,10 @@ typedef struct { ngx_rbtree_t tree; ngx_rbtree_node_t sentinel; - uint64_t received; uint64_t sent; + uint64_t recv_offset; + uint64_t recv_window; + uint64_t recv_last; uint64_t recv_max_data; uint64_t send_max_data; diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c --- a/src/event/quic/ngx_event_quic_streams.c +++ b/src/event/quic/ngx_event_quic_streams.c @@ -25,6 +25,8 @@ static ngx_chain_t *ngx_quic_stream_send ngx_chain_t *in, off_t limit); static size_t ngx_quic_max_stream_flow(ngx_connection_t *c); static void ngx_quic_stream_cleanup_handler(void *data); +static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last); +static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last); ngx_connection_t * @@ -413,6 +415,8 @@ ngx_quic_create_stream(ngx_connection_t } } + qs->recv_window = qs->recv_max_data; + cln = ngx_pool_cleanup_add(pool, 0); if (cln == NULL) { ngx_close_connection(sc); @@ -432,18 +436,15 @@ ngx_quic_create_stream(ngx_connection_t static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) { - ssize_t len, n; - ngx_buf_t *b; - ngx_chain_t *cl, **ll; - ngx_event_t *rev; - ngx_connection_t *pc; - ngx_quic_frame_t *frame; - ngx_quic_stream_t *qs; - ngx_quic_connection_t *qc; + ssize_t len, n; + ngx_buf_t *b; + ngx_chain_t *cl, **ll; + ngx_event_t *rev; + ngx_connection_t *pc; + ngx_quic_stream_t *qs; qs = c->quic; pc = qs->parent; - qc = ngx_quic_get_connection(pc); rev = c->read; if (rev->error) { @@ -495,10 +496,6 @@ ngx_quic_stream_recv(ngx_connection_t *c ngx_quic_free_bufs(pc, cl); - qc->streams.received += len; - qs->recv_max_data += len; - qs->recv_offset += len; - if (qs->in == NULL) { rev->ready = rev->pending_eof; } @@ -506,39 +503,8 @@ ngx_quic_stream_recv(ngx_connection_t *c ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic stream id:0x%xL recv len:%z", qs->id, len); - if (!rev->pending_eof) { - frame = ngx_quic_alloc_frame(pc); - if (frame == NULL) { - return NGX_ERROR; - } - - frame->level = ssl_encryption_application; - frame->type = NGX_QUIC_FT_MAX_STREAM_DATA; - frame->u.max_stream_data.id = qs->id; - frame->u.max_stream_data.limit = qs->recv_max_data; - - ngx_quic_queue_frame(qc, frame); - } - - if ((qc->streams.recv_max_data / 2) < qc->streams.received) { - - frame = ngx_quic_alloc_frame(pc); - - if (frame == NULL) { - return NGX_ERROR; - } - - qc->streams.recv_max_data *= 2; - - frame->level = ssl_encryption_application; - frame->type = NGX_QUIC_FT_MAX_DATA; - frame->u.max_data.max_data = qc->streams.recv_max_data; - - ngx_quic_queue_frame(qc, frame); - - ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic stream id:0x%xL recv: increased max_data:%uL", - qs->id, qc->streams.recv_max_data); + if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) { + return NGX_ERROR; } return len; @@ -729,6 +695,10 @@ ngx_quic_stream_cleanup_handler(void *da goto done; } + c->read->pending_eof = 1; + + (void) ngx_quic_update_flow(c, qs->recv_last); + if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0 || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) { @@ -848,8 +818,7 @@ ngx_quic_handle_stream_frame(ngx_connect sc = qs->connection; - if (last > qs->recv_max_data) { - qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; + if (ngx_quic_control_flow(sc, last) != NGX_OK) { goto cleanup; } @@ -858,8 +827,6 @@ ngx_quic_handle_stream_frame(ngx_connect qs->final_size = last; } - qs->recv_last = last; - if (f->offset == 0) { sc->read->ready = 1; } @@ -873,8 +840,15 @@ ngx_quic_handle_stream_frame(ngx_connect return NGX_OK; } - if (last > qs->recv_max_data) { - qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; + sc = qs->connection; + + rev = sc->read; + + if (rev->error) { + return NGX_OK; + } + + if (ngx_quic_control_flow(sc, last) != NGX_OK) { return NGX_ERROR; } @@ -887,17 +861,11 @@ ngx_quic_handle_stream_frame(ngx_connect return NGX_OK; } - if (qs->recv_last < last) { - qs->recv_last = last; - } - if (f->offset < qs->recv_offset) { ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset); f->offset = qs->recv_offset; } - rev = qs->connection->read; - if (f->fin) { if (qs->final_size != (uint64_t) -1 && qs->final_size != last) { qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; @@ -1108,6 +1076,7 @@ ngx_int_t ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) { + ngx_pool_t *pool; ngx_event_t *rev; ngx_connection_t *sc; ngx_quic_stream_t *qs; @@ -1135,19 +1104,37 @@ ngx_quic_handle_reset_stream_frame(ngx_c return NGX_OK; } - qs->final_size = f->final_size; - sc = qs->connection; rev = sc->read; rev->error = 1; rev->ready = 1; + if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { + goto cleanup; + } + + qs->final_size = f->final_size; + + if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) { + goto cleanup; + } + sc->listening->handler(sc); return NGX_OK; } + sc = qs->connection; + + rev = sc->read; + rev->error = 1; + rev->ready = 1; + + if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { + return NGX_ERROR; + } + if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) { qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; return NGX_ERROR; @@ -1160,15 +1147,24 @@ ngx_quic_handle_reset_stream_frame(ngx_c qs->final_size = f->final_size; - rev = qs->connection->read; - rev->error = 1; - rev->ready = 1; + if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) { + return NGX_ERROR; + } if (rev->active) { rev->handler(rev); } return NGX_OK; + +cleanup: + + pool = sc->pool; + + ngx_close_connection(sc); + ngx_destroy_pool(pool); + + return NGX_ERROR; } @@ -1285,3 +1281,118 @@ ngx_quic_handle_stream_ack(ngx_connectio "quic stream ack len:%uL acked:%uL unacked:%uL", f->u.stream.length, qs->acked, sent - qs->acked); } + + +static ngx_int_t +ngx_quic_control_flow(ngx_connection_t *c, uint64_t last) +{ + uint64_t len; + ngx_event_t *rev; + ngx_quic_stream_t *qs; + ngx_quic_connection_t *qc; + + rev = c->read; + qs = c->quic; + qc = ngx_quic_get_connection(qs->parent); + + if (last <= qs->recv_last) { + return NGX_OK; + } + + len = last - qs->recv_last; + + ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic flow control msd:%uL/%uL md:%uL/%uL", + last, qs->recv_max_data, qc->streams.recv_last + len, + qc->streams.recv_max_data); + + qs->recv_last += len; + + if (!rev->error && qs->recv_last > qs->recv_max_data) { + qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; + return NGX_ERROR; + } + + qc->streams.recv_last += len; + + if (qc->streams.recv_last > qc->streams.recv_max_data) { + qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_quic_update_flow(ngx_connection_t *c, uint64_t last) +{ + uint64_t len; + ngx_event_t *rev; + ngx_connection_t *pc; + ngx_quic_frame_t *frame; + ngx_quic_stream_t *qs; + ngx_quic_connection_t *qc; + + rev = c->read; + qs = c->quic; + pc = qs->parent; + qc = ngx_quic_get_connection(pc); + + if (last <= qs->recv_offset) { + return NGX_OK; + } + + len = last - qs->recv_offset; + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic flow update %uL", last); + + qs->recv_offset += len; + + if (!rev->pending_eof && !rev->error + && qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) + { + qs->recv_max_data = qs->recv_offset + qs->recv_window; + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic flow update msd:%uL", qs->recv_max_data); + + frame = ngx_quic_alloc_frame(pc); + if (frame == NULL) { + return NGX_ERROR; + } + + frame->level = ssl_encryption_application; + frame->type = NGX_QUIC_FT_MAX_STREAM_DATA; + frame->u.max_stream_data.id = qs->id; + frame->u.max_stream_data.limit = qs->recv_max_data; + + ngx_quic_queue_frame(qc, frame); + } + + qc->streams.recv_offset += len; + + if (qc->streams.recv_max_data + <= qc->streams.recv_offset + qc->streams.recv_window / 2) + { + qc->streams.recv_max_data = qc->streams.recv_offset + + qc->streams.recv_window; + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, + "quic flow update md:%uL", qc->streams.recv_max_data); + + frame = ngx_quic_alloc_frame(pc); + if (frame == NULL) { + return NGX_ERROR; + } + + frame->level = ssl_encryption_application; + frame->type = NGX_QUIC_FT_MAX_DATA; + frame->u.max_data.max_data = qc->streams.recv_max_data; + + ngx_quic_queue_frame(qc, frame); + } + + return NGX_OK; +}