Mercurial > hg > nginx
comparison src/event/quic/ngx_event_quic_streams.c @ 8791:af33d1ef1c3c quic
QUIC: stream flow control refactored.
- Function ngx_quic_control_flow() is introduced. This function performs
both MAX_DATA and MAX_STREAM_DATA flow control checks. The function is called
from the STREAM and RESET_STREAM frame handlers. Previously, flow control
was only checked for STREAM frames. Also, MAX_DATA flow control was not
checked at all.
- Function ngx_quic_update_flow() is introduced. This function advances the flow
control windows and sends MAX_DATA/MAX_STREAM_DATA frames. The function is called
from the RESET_STREAM frame handler, the stream cleanup handler and the stream
recv() handler.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Mon, 07 Jun 2021 10:12:46 +0300 |
parents | 7d32c3c93678 |
children | 4715f3e669f1 |
comparison
equal
deleted
inserted
replaced
8790:ac0398da8f23 | 8791:af33d1ef1c3c |
---|---|
23 size_t size); | 23 size_t size); |
24 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c, | 24 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c, |
25 ngx_chain_t *in, off_t limit); | 25 ngx_chain_t *in, off_t limit); |
26 static size_t ngx_quic_max_stream_flow(ngx_connection_t *c); | 26 static size_t ngx_quic_max_stream_flow(ngx_connection_t *c); |
27 static void ngx_quic_stream_cleanup_handler(void *data); | 27 static void ngx_quic_stream_cleanup_handler(void *data); |
28 static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last); | |
29 static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last); | |
28 | 30 |
29 | 31 |
30 ngx_connection_t * | 32 ngx_connection_t * |
31 ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi) | 33 ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi) |
32 { | 34 { |
411 qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local; | 413 qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local; |
412 qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote; | 414 qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote; |
413 } | 415 } |
414 } | 416 } |
415 | 417 |
418 qs->recv_window = qs->recv_max_data; | |
419 | |
416 cln = ngx_pool_cleanup_add(pool, 0); | 420 cln = ngx_pool_cleanup_add(pool, 0); |
417 if (cln == NULL) { | 421 if (cln == NULL) { |
418 ngx_close_connection(sc); | 422 ngx_close_connection(sc); |
419 ngx_destroy_pool(pool); | 423 ngx_destroy_pool(pool); |
420 return NULL; | 424 return NULL; |
430 | 434 |
431 | 435 |
432 static ssize_t | 436 static ssize_t |
433 ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) | 437 ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) |
434 { | 438 { |
435 ssize_t len, n; | 439 ssize_t len, n; |
436 ngx_buf_t *b; | 440 ngx_buf_t *b; |
437 ngx_chain_t *cl, **ll; | 441 ngx_chain_t *cl, **ll; |
438 ngx_event_t *rev; | 442 ngx_event_t *rev; |
439 ngx_connection_t *pc; | 443 ngx_connection_t *pc; |
440 ngx_quic_frame_t *frame; | 444 ngx_quic_stream_t *qs; |
441 ngx_quic_stream_t *qs; | |
442 ngx_quic_connection_t *qc; | |
443 | 445 |
444 qs = c->quic; | 446 qs = c->quic; |
445 pc = qs->parent; | 447 pc = qs->parent; |
446 qc = ngx_quic_get_connection(pc); | |
447 rev = c->read; | 448 rev = c->read; |
448 | 449 |
449 if (rev->error) { | 450 if (rev->error) { |
450 return NGX_ERROR; | 451 return NGX_ERROR; |
451 } | 452 } |
493 qs->in = *ll; | 494 qs->in = *ll; |
494 *ll = NULL; | 495 *ll = NULL; |
495 | 496 |
496 ngx_quic_free_bufs(pc, cl); | 497 ngx_quic_free_bufs(pc, cl); |
497 | 498 |
498 qc->streams.received += len; | |
499 qs->recv_max_data += len; | |
500 qs->recv_offset += len; | |
501 | |
502 if (qs->in == NULL) { | 499 if (qs->in == NULL) { |
503 rev->ready = rev->pending_eof; | 500 rev->ready = rev->pending_eof; |
504 } | 501 } |
505 | 502 |
506 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, | 503 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, |
507 "quic stream id:0x%xL recv len:%z", qs->id, len); | 504 "quic stream id:0x%xL recv len:%z", qs->id, len); |
508 | 505 |
509 if (!rev->pending_eof) { | 506 if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) { |
510 frame = ngx_quic_alloc_frame(pc); | 507 return NGX_ERROR; |
511 if (frame == NULL) { | |
512 return NGX_ERROR; | |
513 } | |
514 | |
515 frame->level = ssl_encryption_application; | |
516 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA; | |
517 frame->u.max_stream_data.id = qs->id; | |
518 frame->u.max_stream_data.limit = qs->recv_max_data; | |
519 | |
520 ngx_quic_queue_frame(qc, frame); | |
521 } | |
522 | |
523 if ((qc->streams.recv_max_data / 2) < qc->streams.received) { | |
524 | |
525 frame = ngx_quic_alloc_frame(pc); | |
526 | |
527 if (frame == NULL) { | |
528 return NGX_ERROR; | |
529 } | |
530 | |
531 qc->streams.recv_max_data *= 2; | |
532 | |
533 frame->level = ssl_encryption_application; | |
534 frame->type = NGX_QUIC_FT_MAX_DATA; | |
535 frame->u.max_data.max_data = qc->streams.recv_max_data; | |
536 | |
537 ngx_quic_queue_frame(qc, frame); | |
538 | |
539 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
540 "quic stream id:0x%xL recv: increased max_data:%uL", | |
541 qs->id, qc->streams.recv_max_data); | |
542 } | 508 } |
543 | 509 |
544 return len; | 510 return len; |
545 } | 511 } |
546 | 512 |
726 } | 692 } |
727 | 693 |
728 if (qc->error) { | 694 if (qc->error) { |
729 goto done; | 695 goto done; |
730 } | 696 } |
697 | |
698 c->read->pending_eof = 1; | |
699 | |
700 (void) ngx_quic_update_flow(c, qs->recv_last); | |
731 | 701 |
732 if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0 | 702 if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0 |
733 || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) | 703 || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) |
734 { | 704 { |
735 if (!c->read->pending_eof && !c->read->error) { | 705 if (!c->read->pending_eof && !c->read->error) { |
846 return NGX_OK; | 816 return NGX_OK; |
847 } | 817 } |
848 | 818 |
849 sc = qs->connection; | 819 sc = qs->connection; |
850 | 820 |
851 if (last > qs->recv_max_data) { | 821 if (ngx_quic_control_flow(sc, last) != NGX_OK) { |
852 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; | |
853 goto cleanup; | 822 goto cleanup; |
854 } | 823 } |
855 | 824 |
856 if (f->fin) { | 825 if (f->fin) { |
857 sc->read->pending_eof = 1; | 826 sc->read->pending_eof = 1; |
858 qs->final_size = last; | 827 qs->final_size = last; |
859 } | 828 } |
860 | 829 |
861 qs->recv_last = last; | |
862 | |
863 if (f->offset == 0) { | 830 if (f->offset == 0) { |
864 sc->read->ready = 1; | 831 sc->read->ready = 1; |
865 } | 832 } |
866 | 833 |
867 if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) { | 834 if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) { |
871 sc->listening->handler(sc); | 838 sc->listening->handler(sc); |
872 | 839 |
873 return NGX_OK; | 840 return NGX_OK; |
874 } | 841 } |
875 | 842 |
876 if (last > qs->recv_max_data) { | 843 sc = qs->connection; |
877 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; | 844 |
845 rev = sc->read; | |
846 | |
847 if (rev->error) { | |
848 return NGX_OK; | |
849 } | |
850 | |
851 if (ngx_quic_control_flow(sc, last) != NGX_OK) { | |
878 return NGX_ERROR; | 852 return NGX_ERROR; |
879 } | 853 } |
880 | 854 |
881 if (qs->final_size != (uint64_t) -1 && last > qs->final_size) { | 855 if (qs->final_size != (uint64_t) -1 && last > qs->final_size) { |
882 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; | 856 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; |
883 return NGX_ERROR; | 857 return NGX_ERROR; |
884 } | 858 } |
885 | 859 |
886 if (last <= qs->recv_offset) { | 860 if (last <= qs->recv_offset) { |
887 return NGX_OK; | 861 return NGX_OK; |
888 } | |
889 | |
890 if (qs->recv_last < last) { | |
891 qs->recv_last = last; | |
892 } | 862 } |
893 | 863 |
894 if (f->offset < qs->recv_offset) { | 864 if (f->offset < qs->recv_offset) { |
895 ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset); | 865 ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset); |
896 f->offset = qs->recv_offset; | 866 f->offset = qs->recv_offset; |
897 } | 867 } |
898 | |
899 rev = qs->connection->read; | |
900 | 868 |
901 if (f->fin) { | 869 if (f->fin) { |
902 if (qs->final_size != (uint64_t) -1 && qs->final_size != last) { | 870 if (qs->final_size != (uint64_t) -1 && qs->final_size != last) { |
903 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; | 871 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; |
904 return NGX_ERROR; | 872 return NGX_ERROR; |
1106 | 1074 |
1107 ngx_int_t | 1075 ngx_int_t |
1108 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, | 1076 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, |
1109 ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) | 1077 ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) |
1110 { | 1078 { |
1079 ngx_pool_t *pool; | |
1111 ngx_event_t *rev; | 1080 ngx_event_t *rev; |
1112 ngx_connection_t *sc; | 1081 ngx_connection_t *sc; |
1113 ngx_quic_stream_t *qs; | 1082 ngx_quic_stream_t *qs; |
1114 ngx_quic_connection_t *qc; | 1083 ngx_quic_connection_t *qc; |
1115 | 1084 |
1133 | 1102 |
1134 if (qs == NGX_QUIC_STREAM_GONE) { | 1103 if (qs == NGX_QUIC_STREAM_GONE) { |
1135 return NGX_OK; | 1104 return NGX_OK; |
1136 } | 1105 } |
1137 | 1106 |
1138 qs->final_size = f->final_size; | |
1139 | |
1140 sc = qs->connection; | 1107 sc = qs->connection; |
1141 | 1108 |
1142 rev = sc->read; | 1109 rev = sc->read; |
1143 rev->error = 1; | 1110 rev->error = 1; |
1144 rev->ready = 1; | 1111 rev->ready = 1; |
1145 | 1112 |
1113 if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { | |
1114 goto cleanup; | |
1115 } | |
1116 | |
1117 qs->final_size = f->final_size; | |
1118 | |
1119 if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) { | |
1120 goto cleanup; | |
1121 } | |
1122 | |
1146 sc->listening->handler(sc); | 1123 sc->listening->handler(sc); |
1147 | 1124 |
1148 return NGX_OK; | 1125 return NGX_OK; |
1126 } | |
1127 | |
1128 sc = qs->connection; | |
1129 | |
1130 rev = sc->read; | |
1131 rev->error = 1; | |
1132 rev->ready = 1; | |
1133 | |
1134 if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { | |
1135 return NGX_ERROR; | |
1149 } | 1136 } |
1150 | 1137 |
1151 if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) { | 1138 if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) { |
1152 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; | 1139 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; |
1153 return NGX_ERROR; | 1140 return NGX_ERROR; |
1158 return NGX_ERROR; | 1145 return NGX_ERROR; |
1159 } | 1146 } |
1160 | 1147 |
1161 qs->final_size = f->final_size; | 1148 qs->final_size = f->final_size; |
1162 | 1149 |
1163 rev = qs->connection->read; | 1150 if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) { |
1164 rev->error = 1; | 1151 return NGX_ERROR; |
1165 rev->ready = 1; | 1152 } |
1166 | 1153 |
1167 if (rev->active) { | 1154 if (rev->active) { |
1168 rev->handler(rev); | 1155 rev->handler(rev); |
1169 } | 1156 } |
1170 | 1157 |
1171 return NGX_OK; | 1158 return NGX_OK; |
1159 | |
1160 cleanup: | |
1161 | |
1162 pool = sc->pool; | |
1163 | |
1164 ngx_close_connection(sc); | |
1165 ngx_destroy_pool(pool); | |
1166 | |
1167 return NGX_ERROR; | |
1172 } | 1168 } |
1173 | 1169 |
1174 | 1170 |
1175 ngx_int_t | 1171 ngx_int_t |
1176 ngx_quic_handle_stop_sending_frame(ngx_connection_t *c, | 1172 ngx_quic_handle_stop_sending_frame(ngx_connection_t *c, |
1283 | 1279 |
1284 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0, | 1280 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0, |
1285 "quic stream ack len:%uL acked:%uL unacked:%uL", | 1281 "quic stream ack len:%uL acked:%uL unacked:%uL", |
1286 f->u.stream.length, qs->acked, sent - qs->acked); | 1282 f->u.stream.length, qs->acked, sent - qs->acked); |
1287 } | 1283 } |
1284 | |
1285 | |
1286 static ngx_int_t | |
1287 ngx_quic_control_flow(ngx_connection_t *c, uint64_t last) | |
1288 { | |
1289 uint64_t len; | |
1290 ngx_event_t *rev; | |
1291 ngx_quic_stream_t *qs; | |
1292 ngx_quic_connection_t *qc; | |
1293 | |
1294 rev = c->read; | |
1295 qs = c->quic; | |
1296 qc = ngx_quic_get_connection(qs->parent); | |
1297 | |
1298 if (last <= qs->recv_last) { | |
1299 return NGX_OK; | |
1300 } | |
1301 | |
1302 len = last - qs->recv_last; | |
1303 | |
1304 ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
1305 "quic flow control msd:%uL/%uL md:%uL/%uL", | |
1306 last, qs->recv_max_data, qc->streams.recv_last + len, | |
1307 qc->streams.recv_max_data); | |
1308 | |
1309 qs->recv_last += len; | |
1310 | |
1311 if (!rev->error && qs->recv_last > qs->recv_max_data) { | |
1312 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; | |
1313 return NGX_ERROR; | |
1314 } | |
1315 | |
1316 qc->streams.recv_last += len; | |
1317 | |
1318 if (qc->streams.recv_last > qc->streams.recv_max_data) { | |
1319 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; | |
1320 return NGX_ERROR; | |
1321 } | |
1322 | |
1323 return NGX_OK; | |
1324 } | |
1325 | |
1326 | |
1327 static ngx_int_t | |
1328 ngx_quic_update_flow(ngx_connection_t *c, uint64_t last) | |
1329 { | |
1330 uint64_t len; | |
1331 ngx_event_t *rev; | |
1332 ngx_connection_t *pc; | |
1333 ngx_quic_frame_t *frame; | |
1334 ngx_quic_stream_t *qs; | |
1335 ngx_quic_connection_t *qc; | |
1336 | |
1337 rev = c->read; | |
1338 qs = c->quic; | |
1339 pc = qs->parent; | |
1340 qc = ngx_quic_get_connection(pc); | |
1341 | |
1342 if (last <= qs->recv_offset) { | |
1343 return NGX_OK; | |
1344 } | |
1345 | |
1346 len = last - qs->recv_offset; | |
1347 | |
1348 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
1349 "quic flow update %uL", last); | |
1350 | |
1351 qs->recv_offset += len; | |
1352 | |
1353 if (!rev->pending_eof && !rev->error | |
1354 && qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) | |
1355 { | |
1356 qs->recv_max_data = qs->recv_offset + qs->recv_window; | |
1357 | |
1358 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
1359 "quic flow update msd:%uL", qs->recv_max_data); | |
1360 | |
1361 frame = ngx_quic_alloc_frame(pc); | |
1362 if (frame == NULL) { | |
1363 return NGX_ERROR; | |
1364 } | |
1365 | |
1366 frame->level = ssl_encryption_application; | |
1367 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA; | |
1368 frame->u.max_stream_data.id = qs->id; | |
1369 frame->u.max_stream_data.limit = qs->recv_max_data; | |
1370 | |
1371 ngx_quic_queue_frame(qc, frame); | |
1372 } | |
1373 | |
1374 qc->streams.recv_offset += len; | |
1375 | |
1376 if (qc->streams.recv_max_data | |
1377 <= qc->streams.recv_offset + qc->streams.recv_window / 2) | |
1378 { | |
1379 qc->streams.recv_max_data = qc->streams.recv_offset | |
1380 + qc->streams.recv_window; | |
1381 | |
1382 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, | |
1383 "quic flow update md:%uL", qc->streams.recv_max_data); | |
1384 | |
1385 frame = ngx_quic_alloc_frame(pc); | |
1386 if (frame == NULL) { | |
1387 return NGX_ERROR; | |
1388 } | |
1389 | |
1390 frame->level = ssl_encryption_application; | |
1391 frame->type = NGX_QUIC_FT_MAX_DATA; | |
1392 frame->u.max_data.max_data = qc->streams.recv_max_data; | |
1393 | |
1394 ngx_quic_queue_frame(qc, frame); | |
1395 } | |
1396 | |
1397 return NGX_OK; | |
1398 } |