comparison src/event/quic/ngx_event_quic_streams.c @ 8791:af33d1ef1c3c quic

QUIC: stream flow control refactored. - Function ngx_quic_control_flow() is introduced. This functions does both MAX_DATA and MAX_STREAM_DATA flow controls. The function is called from STREAM and RESET_STREAM frame handlers. Previously, flow control was only accounted for STREAM. Also, MAX_DATA flow control was not accounted at all. - Function ngx_quic_update_flow() is introduced. This function advances flow control windows and sends MAX_DATA/MAX_STREAM_DATA. The function is called from RESET_STREAM frame handler, stream cleanup handler and stream recv() handler.
author Roman Arutyunyan <arut@nginx.com>
date Mon, 07 Jun 2021 10:12:46 +0300
parents 7d32c3c93678
children 4715f3e669f1
comparison
equal deleted inserted replaced
8790:ac0398da8f23 8791:af33d1ef1c3c
23 size_t size); 23 size_t size);
24 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c, 24 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
25 ngx_chain_t *in, off_t limit); 25 ngx_chain_t *in, off_t limit);
26 static size_t ngx_quic_max_stream_flow(ngx_connection_t *c); 26 static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
27 static void ngx_quic_stream_cleanup_handler(void *data); 27 static void ngx_quic_stream_cleanup_handler(void *data);
28 static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
29 static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
28 30
29 31
30 ngx_connection_t * 32 ngx_connection_t *
31 ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi) 33 ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
32 { 34 {
411 qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local; 413 qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
412 qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote; 414 qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote;
413 } 415 }
414 } 416 }
415 417
418 qs->recv_window = qs->recv_max_data;
419
416 cln = ngx_pool_cleanup_add(pool, 0); 420 cln = ngx_pool_cleanup_add(pool, 0);
417 if (cln == NULL) { 421 if (cln == NULL) {
418 ngx_close_connection(sc); 422 ngx_close_connection(sc);
419 ngx_destroy_pool(pool); 423 ngx_destroy_pool(pool);
420 return NULL; 424 return NULL;
430 434
431 435
432 static ssize_t 436 static ssize_t
433 ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) 437 ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
434 { 438 {
435 ssize_t len, n; 439 ssize_t len, n;
436 ngx_buf_t *b; 440 ngx_buf_t *b;
437 ngx_chain_t *cl, **ll; 441 ngx_chain_t *cl, **ll;
438 ngx_event_t *rev; 442 ngx_event_t *rev;
439 ngx_connection_t *pc; 443 ngx_connection_t *pc;
440 ngx_quic_frame_t *frame; 444 ngx_quic_stream_t *qs;
441 ngx_quic_stream_t *qs;
442 ngx_quic_connection_t *qc;
443 445
444 qs = c->quic; 446 qs = c->quic;
445 pc = qs->parent; 447 pc = qs->parent;
446 qc = ngx_quic_get_connection(pc);
447 rev = c->read; 448 rev = c->read;
448 449
449 if (rev->error) { 450 if (rev->error) {
450 return NGX_ERROR; 451 return NGX_ERROR;
451 } 452 }
493 qs->in = *ll; 494 qs->in = *ll;
494 *ll = NULL; 495 *ll = NULL;
495 496
496 ngx_quic_free_bufs(pc, cl); 497 ngx_quic_free_bufs(pc, cl);
497 498
498 qc->streams.received += len;
499 qs->recv_max_data += len;
500 qs->recv_offset += len;
501
502 if (qs->in == NULL) { 499 if (qs->in == NULL) {
503 rev->ready = rev->pending_eof; 500 rev->ready = rev->pending_eof;
504 } 501 }
505 502
506 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, 503 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
507 "quic stream id:0x%xL recv len:%z", qs->id, len); 504 "quic stream id:0x%xL recv len:%z", qs->id, len);
508 505
509 if (!rev->pending_eof) { 506 if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
510 frame = ngx_quic_alloc_frame(pc); 507 return NGX_ERROR;
511 if (frame == NULL) {
512 return NGX_ERROR;
513 }
514
515 frame->level = ssl_encryption_application;
516 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
517 frame->u.max_stream_data.id = qs->id;
518 frame->u.max_stream_data.limit = qs->recv_max_data;
519
520 ngx_quic_queue_frame(qc, frame);
521 }
522
523 if ((qc->streams.recv_max_data / 2) < qc->streams.received) {
524
525 frame = ngx_quic_alloc_frame(pc);
526
527 if (frame == NULL) {
528 return NGX_ERROR;
529 }
530
531 qc->streams.recv_max_data *= 2;
532
533 frame->level = ssl_encryption_application;
534 frame->type = NGX_QUIC_FT_MAX_DATA;
535 frame->u.max_data.max_data = qc->streams.recv_max_data;
536
537 ngx_quic_queue_frame(qc, frame);
538
539 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
540 "quic stream id:0x%xL recv: increased max_data:%uL",
541 qs->id, qc->streams.recv_max_data);
542 } 508 }
543 509
544 return len; 510 return len;
545 } 511 }
546 512
726 } 692 }
727 693
728 if (qc->error) { 694 if (qc->error) {
729 goto done; 695 goto done;
730 } 696 }
697
698 c->read->pending_eof = 1;
699
700 (void) ngx_quic_update_flow(c, qs->recv_last);
731 701
732 if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0 702 if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0
733 || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) 703 || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
734 { 704 {
735 if (!c->read->pending_eof && !c->read->error) { 705 if (!c->read->pending_eof && !c->read->error) {
846 return NGX_OK; 816 return NGX_OK;
847 } 817 }
848 818
849 sc = qs->connection; 819 sc = qs->connection;
850 820
851 if (last > qs->recv_max_data) { 821 if (ngx_quic_control_flow(sc, last) != NGX_OK) {
852 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
853 goto cleanup; 822 goto cleanup;
854 } 823 }
855 824
856 if (f->fin) { 825 if (f->fin) {
857 sc->read->pending_eof = 1; 826 sc->read->pending_eof = 1;
858 qs->final_size = last; 827 qs->final_size = last;
859 } 828 }
860 829
861 qs->recv_last = last;
862
863 if (f->offset == 0) { 830 if (f->offset == 0) {
864 sc->read->ready = 1; 831 sc->read->ready = 1;
865 } 832 }
866 833
867 if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) { 834 if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) {
871 sc->listening->handler(sc); 838 sc->listening->handler(sc);
872 839
873 return NGX_OK; 840 return NGX_OK;
874 } 841 }
875 842
876 if (last > qs->recv_max_data) { 843 sc = qs->connection;
877 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; 844
845 rev = sc->read;
846
847 if (rev->error) {
848 return NGX_OK;
849 }
850
851 if (ngx_quic_control_flow(sc, last) != NGX_OK) {
878 return NGX_ERROR; 852 return NGX_ERROR;
879 } 853 }
880 854
881 if (qs->final_size != (uint64_t) -1 && last > qs->final_size) { 855 if (qs->final_size != (uint64_t) -1 && last > qs->final_size) {
882 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; 856 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
883 return NGX_ERROR; 857 return NGX_ERROR;
884 } 858 }
885 859
886 if (last <= qs->recv_offset) { 860 if (last <= qs->recv_offset) {
887 return NGX_OK; 861 return NGX_OK;
888 }
889
890 if (qs->recv_last < last) {
891 qs->recv_last = last;
892 } 862 }
893 863
894 if (f->offset < qs->recv_offset) { 864 if (f->offset < qs->recv_offset) {
895 ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset); 865 ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset);
896 f->offset = qs->recv_offset; 866 f->offset = qs->recv_offset;
897 } 867 }
898
899 rev = qs->connection->read;
900 868
901 if (f->fin) { 869 if (f->fin) {
902 if (qs->final_size != (uint64_t) -1 && qs->final_size != last) { 870 if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
903 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; 871 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
904 return NGX_ERROR; 872 return NGX_ERROR;
1106 1074
1107 ngx_int_t 1075 ngx_int_t
1108 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, 1076 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
1109 ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) 1077 ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
1110 { 1078 {
1079 ngx_pool_t *pool;
1111 ngx_event_t *rev; 1080 ngx_event_t *rev;
1112 ngx_connection_t *sc; 1081 ngx_connection_t *sc;
1113 ngx_quic_stream_t *qs; 1082 ngx_quic_stream_t *qs;
1114 ngx_quic_connection_t *qc; 1083 ngx_quic_connection_t *qc;
1115 1084
1133 1102
1134 if (qs == NGX_QUIC_STREAM_GONE) { 1103 if (qs == NGX_QUIC_STREAM_GONE) {
1135 return NGX_OK; 1104 return NGX_OK;
1136 } 1105 }
1137 1106
1138 qs->final_size = f->final_size;
1139
1140 sc = qs->connection; 1107 sc = qs->connection;
1141 1108
1142 rev = sc->read; 1109 rev = sc->read;
1143 rev->error = 1; 1110 rev->error = 1;
1144 rev->ready = 1; 1111 rev->ready = 1;
1145 1112
1113 if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
1114 goto cleanup;
1115 }
1116
1117 qs->final_size = f->final_size;
1118
1119 if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
1120 goto cleanup;
1121 }
1122
1146 sc->listening->handler(sc); 1123 sc->listening->handler(sc);
1147 1124
1148 return NGX_OK; 1125 return NGX_OK;
1126 }
1127
1128 sc = qs->connection;
1129
1130 rev = sc->read;
1131 rev->error = 1;
1132 rev->ready = 1;
1133
1134 if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
1135 return NGX_ERROR;
1149 } 1136 }
1150 1137
1151 if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) { 1138 if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
1152 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; 1139 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
1153 return NGX_ERROR; 1140 return NGX_ERROR;
1158 return NGX_ERROR; 1145 return NGX_ERROR;
1159 } 1146 }
1160 1147
1161 qs->final_size = f->final_size; 1148 qs->final_size = f->final_size;
1162 1149
1163 rev = qs->connection->read; 1150 if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
1164 rev->error = 1; 1151 return NGX_ERROR;
1165 rev->ready = 1; 1152 }
1166 1153
1167 if (rev->active) { 1154 if (rev->active) {
1168 rev->handler(rev); 1155 rev->handler(rev);
1169 } 1156 }
1170 1157
1171 return NGX_OK; 1158 return NGX_OK;
1159
1160 cleanup:
1161
1162 pool = sc->pool;
1163
1164 ngx_close_connection(sc);
1165 ngx_destroy_pool(pool);
1166
1167 return NGX_ERROR;
1172 } 1168 }
1173 1169
1174 1170
1175 ngx_int_t 1171 ngx_int_t
1176 ngx_quic_handle_stop_sending_frame(ngx_connection_t *c, 1172 ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
1283 1279
1284 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0, 1280 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
1285 "quic stream ack len:%uL acked:%uL unacked:%uL", 1281 "quic stream ack len:%uL acked:%uL unacked:%uL",
1286 f->u.stream.length, qs->acked, sent - qs->acked); 1282 f->u.stream.length, qs->acked, sent - qs->acked);
1287 } 1283 }
1284
1285
1286 static ngx_int_t
1287 ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
1288 {
1289 uint64_t len;
1290 ngx_event_t *rev;
1291 ngx_quic_stream_t *qs;
1292 ngx_quic_connection_t *qc;
1293
1294 rev = c->read;
1295 qs = c->quic;
1296 qc = ngx_quic_get_connection(qs->parent);
1297
1298 if (last <= qs->recv_last) {
1299 return NGX_OK;
1300 }
1301
1302 len = last - qs->recv_last;
1303
1304 ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
1305 "quic flow control msd:%uL/%uL md:%uL/%uL",
1306 last, qs->recv_max_data, qc->streams.recv_last + len,
1307 qc->streams.recv_max_data);
1308
1309 qs->recv_last += len;
1310
1311 if (!rev->error && qs->recv_last > qs->recv_max_data) {
1312 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
1313 return NGX_ERROR;
1314 }
1315
1316 qc->streams.recv_last += len;
1317
1318 if (qc->streams.recv_last > qc->streams.recv_max_data) {
1319 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
1320 return NGX_ERROR;
1321 }
1322
1323 return NGX_OK;
1324 }
1325
1326
1327 static ngx_int_t
1328 ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
1329 {
1330 uint64_t len;
1331 ngx_event_t *rev;
1332 ngx_connection_t *pc;
1333 ngx_quic_frame_t *frame;
1334 ngx_quic_stream_t *qs;
1335 ngx_quic_connection_t *qc;
1336
1337 rev = c->read;
1338 qs = c->quic;
1339 pc = qs->parent;
1340 qc = ngx_quic_get_connection(pc);
1341
1342 if (last <= qs->recv_offset) {
1343 return NGX_OK;
1344 }
1345
1346 len = last - qs->recv_offset;
1347
1348 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
1349 "quic flow update %uL", last);
1350
1351 qs->recv_offset += len;
1352
1353 if (!rev->pending_eof && !rev->error
1354 && qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2)
1355 {
1356 qs->recv_max_data = qs->recv_offset + qs->recv_window;
1357
1358 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
1359 "quic flow update msd:%uL", qs->recv_max_data);
1360
1361 frame = ngx_quic_alloc_frame(pc);
1362 if (frame == NULL) {
1363 return NGX_ERROR;
1364 }
1365
1366 frame->level = ssl_encryption_application;
1367 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
1368 frame->u.max_stream_data.id = qs->id;
1369 frame->u.max_stream_data.limit = qs->recv_max_data;
1370
1371 ngx_quic_queue_frame(qc, frame);
1372 }
1373
1374 qc->streams.recv_offset += len;
1375
1376 if (qc->streams.recv_max_data
1377 <= qc->streams.recv_offset + qc->streams.recv_window / 2)
1378 {
1379 qc->streams.recv_max_data = qc->streams.recv_offset
1380 + qc->streams.recv_window;
1381
1382 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
1383 "quic flow update md:%uL", qc->streams.recv_max_data);
1384
1385 frame = ngx_quic_alloc_frame(pc);
1386 if (frame == NULL) {
1387 return NGX_ERROR;
1388 }
1389
1390 frame->level = ssl_encryption_application;
1391 frame->type = NGX_QUIC_FT_MAX_DATA;
1392 frame->u.max_data.max_data = qc->streams.recv_max_data;
1393
1394 ngx_quic_queue_frame(qc, frame);
1395 }
1396
1397 return NGX_OK;
1398 }