comparison src/event/quic/ngx_event_quic_streams.c @ 8779:f52a2b77d406 quic

QUIC: generic buffering for stream input. Previously each stream had an input buffer. Now memory is allocated as bytes arrive. The generic buffering mechanism is used for this.
author Roman Arutyunyan <arut@nginx.com>
date Wed, 05 May 2021 17:15:20 +0300
parents 225e9f1dfe7c
children b3f6ad181df4
comparison
equal deleted inserted replaced
8778:5186ee5a94b9 8779:f52a2b77d406
14 14
15 15
16 static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c, 16 static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c,
17 uint64_t id); 17 uint64_t id);
18 static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c, 18 static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
19 uint64_t id, size_t rcvbuf_size); 19 uint64_t id);
20 static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, 20 static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
21 size_t size); 21 size_t size);
22 static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, 22 static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
23 size_t size); 23 size_t size);
24 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c, 24 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
28 28
29 29
30 ngx_connection_t * 30 ngx_connection_t *
31 ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi) 31 ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
32 { 32 {
33 size_t rcvbuf_size;
34 uint64_t id; 33 uint64_t id;
35 ngx_quic_stream_t *qs, *nqs; 34 ngx_quic_stream_t *qs, *nqs;
36 ngx_quic_connection_t *qc; 35 ngx_quic_connection_t *qc;
37 36
38 qs = c->quic; 37 qs = c->quic;
56 " streams:%uL max:%uL id:0x%xL", 55 " streams:%uL max:%uL id:0x%xL",
57 qc->streams.server_streams_bidi, 56 qc->streams.server_streams_bidi,
58 qc->streams.server_max_streams_bidi, id); 57 qc->streams.server_max_streams_bidi, id);
59 58
60 qc->streams.server_streams_bidi++; 59 qc->streams.server_streams_bidi++;
61 rcvbuf_size = qc->tp.initial_max_stream_data_bidi_local;
62 60
63 } else { 61 } else {
64 if (qc->streams.server_streams_uni 62 if (qc->streams.server_streams_uni
65 >= qc->streams.server_max_streams_uni) 63 >= qc->streams.server_max_streams_uni)
66 { 64 {
79 " streams:%uL max:%uL id:0x%xL", 77 " streams:%uL max:%uL id:0x%xL",
80 qc->streams.server_streams_uni, 78 qc->streams.server_streams_uni,
81 qc->streams.server_max_streams_uni, id); 79 qc->streams.server_max_streams_uni, id);
82 80
83 qc->streams.server_streams_uni++; 81 qc->streams.server_streams_uni++;
84 rcvbuf_size = 0; 82 }
85 } 83
86 84 nqs = ngx_quic_create_stream(qs->parent, id);
87 nqs = ngx_quic_create_stream(qs->parent, id, rcvbuf_size);
88 if (nqs == NULL) { 85 if (nqs == NULL) {
89 return NULL; 86 return NULL;
90 } 87 }
91 88
92 return nqs->connection; 89 return nqs->connection;
233 230
234 231
235 static ngx_quic_stream_t * 232 static ngx_quic_stream_t *
236 ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id) 233 ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
237 { 234 {
238 size_t n;
239 uint64_t min_id; 235 uint64_t min_id;
240 ngx_quic_stream_t *qs; 236 ngx_quic_stream_t *qs;
241 ngx_quic_connection_t *qc; 237 ngx_quic_connection_t *qc;
242 238
243 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, 239 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
270 } 266 }
271 267
272 min_id = (qc->streams.client_streams_uni << 2) 268 min_id = (qc->streams.client_streams_uni << 2)
273 | NGX_QUIC_STREAM_UNIDIRECTIONAL; 269 | NGX_QUIC_STREAM_UNIDIRECTIONAL;
274 qc->streams.client_streams_uni = (id >> 2) + 1; 270 qc->streams.client_streams_uni = (id >> 2) + 1;
275 n = qc->tp.initial_max_stream_data_uni;
276 271
277 } else { 272 } else {
278 273
279 if (id & NGX_QUIC_STREAM_SERVER_INITIATED) { 274 if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
280 if ((id >> 2) < qc->streams.server_streams_bidi) { 275 if ((id >> 2) < qc->streams.server_streams_bidi) {
294 return NULL; 289 return NULL;
295 } 290 }
296 291
297 min_id = (qc->streams.client_streams_bidi << 2); 292 min_id = (qc->streams.client_streams_bidi << 2);
298 qc->streams.client_streams_bidi = (id >> 2) + 1; 293 qc->streams.client_streams_bidi = (id >> 2) + 1;
299 n = qc->tp.initial_max_stream_data_bidi_remote;
300 }
301
302 if (n < NGX_QUIC_STREAM_BUFSIZE) {
303 n = NGX_QUIC_STREAM_BUFSIZE;
304 } 294 }
305 295
306 /* 296 /*
307 * 2.1. Stream Types and Identifiers 297 * 2.1. Stream Types and Identifiers
308 * 298 *
312 * opened. 302 * opened.
313 */ 303 */
314 304
315 for ( /* void */ ; min_id < id; min_id += 0x04) { 305 for ( /* void */ ; min_id < id; min_id += 0x04) {
316 306
317 qs = ngx_quic_create_stream(c, min_id, n); 307 qs = ngx_quic_create_stream(c, min_id);
318 if (qs == NULL) { 308 if (qs == NULL) {
319 return NULL; 309 return NULL;
320 } 310 }
321 311
322 qs->connection->listening->handler(qs->connection); 312 qs->connection->listening->handler(qs->connection);
324 if (qc->shutdown) { 314 if (qc->shutdown) {
325 return NGX_QUIC_STREAM_GONE; 315 return NGX_QUIC_STREAM_GONE;
326 } 316 }
327 } 317 }
328 318
329 return ngx_quic_create_stream(c, id, n); 319 return ngx_quic_create_stream(c, id);
330 } 320 }
331 321
332 322
333 static ngx_quic_stream_t * 323 static ngx_quic_stream_t *
334 ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size) 324 ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
335 { 325 {
336 ngx_log_t *log; 326 ngx_log_t *log;
337 ngx_pool_t *pool; 327 ngx_pool_t *pool;
338 ngx_connection_t *sc; 328 ngx_connection_t *sc;
339 ngx_quic_stream_t *qs; 329 ngx_quic_stream_t *qs;
357 } 347 }
358 348
359 qs->node.key = id; 349 qs->node.key = id;
360 qs->parent = c; 350 qs->parent = c;
361 qs->id = id; 351 qs->id = id;
362
363 qs->b = ngx_create_temp_buf(pool, rcvbuf_size);
364 if (qs->b == NULL) {
365 ngx_destroy_pool(pool);
366 return NULL;
367 }
368 352
369 qs->fs = ngx_pcalloc(pool, sizeof(ngx_quic_frames_stream_t)); 353 qs->fs = ngx_pcalloc(pool, sizeof(ngx_quic_frames_stream_t));
370 if (qs->fs == NULL) { 354 if (qs->fs == NULL) {
371 ngx_destroy_pool(pool); 355 ngx_destroy_pool(pool);
372 return NULL; 356 return NULL;
418 } 402 }
419 403
420 if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) { 404 if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
421 if (id & NGX_QUIC_STREAM_SERVER_INITIATED) { 405 if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
422 qs->send_max_data = qc->ctp.initial_max_stream_data_uni; 406 qs->send_max_data = qc->ctp.initial_max_stream_data_uni;
407
408 } else {
409 qs->recv_max_data = qc->tp.initial_max_stream_data_uni;
423 } 410 }
424 411
425 } else { 412 } else {
426 if (id & NGX_QUIC_STREAM_SERVER_INITIATED) { 413 if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
427 qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote; 414 qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote;
415 qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_local;
416
428 } else { 417 } else {
429 qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local; 418 qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
419 qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote;
430 } 420 }
431 } 421 }
432 422
433 cln = ngx_pool_cleanup_add(pool, 0); 423 cln = ngx_pool_cleanup_add(pool, 0);
434 if (cln == NULL) { 424 if (cln == NULL) {
447 437
448 438
449 static ssize_t 439 static ssize_t
450 ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) 440 ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
451 { 441 {
452 ssize_t len; 442 ssize_t len, n;
453 ngx_buf_t *b; 443 ngx_buf_t *b;
444 ngx_chain_t *cl, **ll;
454 ngx_event_t *rev; 445 ngx_event_t *rev;
455 ngx_connection_t *pc; 446 ngx_connection_t *pc;
456 ngx_quic_frame_t *frame; 447 ngx_quic_frame_t *frame;
457 ngx_quic_stream_t *qs; 448 ngx_quic_stream_t *qs;
458 ngx_quic_connection_t *qc; 449 ngx_quic_connection_t *qc;
459 450
460 qs = c->quic; 451 qs = c->quic;
461 b = qs->b;
462 pc = qs->parent; 452 pc = qs->parent;
463 qc = ngx_quic_get_connection(pc); 453 qc = ngx_quic_get_connection(pc);
464 rev = c->read; 454 rev = c->read;
465 455
466 if (rev->error) { 456 if (rev->error) {
467 return NGX_ERROR; 457 return NGX_ERROR;
468 } 458 }
469 459
470 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0, 460 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
471 "quic stream recv id:0x%xL eof:%d avail:%z", 461 "quic stream recv id:0x%xL eof:%d",
472 qs->id, rev->pending_eof, b->last - b->pos); 462 qs->id, rev->pending_eof);
473 463
474 if (b->pos == b->last) { 464 if (qs->in == NULL) {
475 rev->ready = 0; 465 rev->ready = 0;
476 466
477 if (rev->pending_eof) { 467 if (rev->pending_eof) {
478 rev->eof = 1; 468 rev->eof = 1;
479 return 0; 469 return 0;
482 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, 472 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
483 "quic stream id:0x%xL recv() not ready", qs->id); 473 "quic stream id:0x%xL recv() not ready", qs->id);
484 return NGX_AGAIN; 474 return NGX_AGAIN;
485 } 475 }
486 476
487 len = ngx_min(b->last - b->pos, (ssize_t) size); 477 len = 0;
488 478 cl = qs->in;
489 ngx_memcpy(buf, b->pos, len); 479
490 480 for (ll = &cl; *ll; ll = &(*ll)->next) {
491 b->pos += len; 481 b = (*ll)->buf;
482
483 n = ngx_min(b->last - b->pos, (ssize_t) size);
484 buf = ngx_cpymem(buf, b->pos, n);
485
486 len += n;
487 size -= n;
488 b->pos += n;
489
490 if (b->pos != b->last) {
491 break;
492 }
493 }
494
495 qs->in = *ll;
496 *ll = NULL;
497
498 ngx_quic_free_bufs(pc, cl);
499
492 qc->streams.received += len; 500 qc->streams.received += len;
493 501 qs->recv_max_data += len;
494 if (b->pos == b->last) { 502
495 b->pos = b->start; 503 if (qs->in == NULL) {
496 b->last = b->start;
497 rev->ready = rev->pending_eof; 504 rev->ready = rev->pending_eof;
498 } 505 }
499 506
500 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0, 507 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
501 "quic stream id:0x%xL recv len:%z of size:%uz", 508 "quic stream id:0x%xL recv len:%z of size:%uz",
508 } 515 }
509 516
510 frame->level = ssl_encryption_application; 517 frame->level = ssl_encryption_application;
511 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA; 518 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
512 frame->u.max_stream_data.id = qs->id; 519 frame->u.max_stream_data.id = qs->id;
513 frame->u.max_stream_data.limit = qs->fs->received + (b->pos - b->start) 520 frame->u.max_stream_data.limit = qs->recv_max_data;
514 + (b->end - b->last);
515 521
516 ngx_quic_queue_frame(qc, frame); 522 ngx_quic_queue_frame(qc, frame);
517 } 523 }
518 524
519 if ((qc->streams.recv_max_data / 2) < qc->streams.received) { 525 if ((qc->streams.recv_max_data / 2) < qc->streams.received) {
712 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, 718 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
713 "quic stream id:0x%xL cleanup", qs->id); 719 "quic stream id:0x%xL cleanup", qs->id);
714 720
715 ngx_rbtree_delete(&qc->streams.tree, &qs->node); 721 ngx_rbtree_delete(&qc->streams.tree, &qs->node);
716 ngx_quic_free_frames(pc, &qs->fs->frames); 722 ngx_quic_free_frames(pc, &qs->fs->frames);
723 ngx_quic_free_bufs(pc, qs->in);
717 724
718 if (qc->closing) { 725 if (qc->closing) {
719 /* schedule handler call to continue ngx_quic_close_connection() */ 726 /* schedule handler call to continue ngx_quic_close_connection() */
720 ngx_post_event(pc->read, &ngx_posted_events); 727 ngx_post_event(pc->read, &ngx_posted_events);
721 return; 728 return;
806 813
807 ngx_int_t 814 ngx_int_t
808 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, 815 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
809 ngx_quic_frame_t *frame) 816 ngx_quic_frame_t *frame)
810 { 817 {
811 size_t window;
812 uint64_t last; 818 uint64_t last;
813 ngx_buf_t *b;
814 ngx_pool_t *pool; 819 ngx_pool_t *pool;
815 ngx_connection_t *sc; 820 ngx_connection_t *sc;
816 ngx_quic_stream_t *qs; 821 ngx_quic_stream_t *qs;
817 ngx_quic_connection_t *qc; 822 ngx_quic_connection_t *qc;
818 ngx_quic_stream_frame_t *f; 823 ngx_quic_stream_frame_t *f;
844 return NGX_OK; 849 return NGX_OK;
845 } 850 }
846 851
847 sc = qs->connection; 852 sc = qs->connection;
848 fs = qs->fs; 853 fs = qs->fs;
849 b = qs->b; 854
850 window = b->end - b->last; 855 if (last > qs->recv_max_data) {
851
852 if (last > window) {
853 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; 856 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
854 goto cleanup; 857 goto cleanup;
855 } 858 }
856 859
857 if (ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input, 860 if (ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
865 868
866 return NGX_OK; 869 return NGX_OK;
867 } 870 }
868 871
869 fs = qs->fs; 872 fs = qs->fs;
870 b = qs->b; 873
871 window = (b->pos - b->start) + (b->end - b->last); 874 if (last > qs->recv_max_data) {
872
873 if (last > fs->received && last - fs->received > window) {
874 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; 875 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
875 return NGX_ERROR; 876 return NGX_ERROR;
876 } 877 }
877 878
878 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input, 879 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
890 891
891 892
892 ngx_int_t 893 ngx_int_t
893 ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data) 894 ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
894 { 895 {
896 ssize_t n;
895 uint64_t id; 897 uint64_t id;
896 ngx_buf_t *b; 898 ngx_buf_t *b;
897 ngx_event_t *rev; 899 ngx_event_t *rev;
898 ngx_chain_t *cl; 900 ngx_chain_t *cl, **ll;
899 ngx_quic_stream_t *qs; 901 ngx_quic_stream_t *qs;
900 ngx_quic_connection_t *qc; 902 ngx_quic_connection_t *qc;
901 ngx_quic_stream_frame_t *f; 903 ngx_quic_stream_frame_t *f;
902 904
903 qc = ngx_quic_get_connection(c); 905 qc = ngx_quic_get_connection(c);
904 qs = data; 906 qs = data;
905 907
906 f = &frame->u.stream; 908 f = &frame->u.stream;
907 id = f->stream_id; 909 id = f->stream_id;
908 910 cl = frame->data;
909 b = qs->b; 911
910 912 for (ll = &qs->in; *ll; ll = &(*ll)->next) {
911 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) { 913 if ((*ll)->next) {
912 ngx_log_error(NGX_LOG_INFO, c->log, 0, 914 continue;
913 "quic no space in stream buffer"); 915 }
916
917 /* append to last buffer */
918
919 b = (*ll)->buf;
920
921 while (cl && b->last != b->end) {
922 n = ngx_min(cl->buf->last - cl->buf->pos, b->end - b->last);
923 b->last = ngx_cpymem(b->last, cl->buf->pos, n);
924 cl->buf->pos += n;
925
926 if (cl->buf->pos == cl->buf->last) {
927 cl = cl->next;
928 }
929 }
930 }
931
932 cl = ngx_quic_copy_chain(c, cl, 0);
933 if (cl == NGX_CHAIN_ERROR) {
914 return NGX_ERROR; 934 return NGX_ERROR;
915 } 935 }
916 936
917 if ((size_t) (b->end - b->last) < f->length) { 937 *ll = cl;
918 b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
919 b->pos = b->start;
920 }
921
922 for (cl = frame->data; cl; cl = cl->next) {
923 b->last = ngx_cpymem(b->last, cl->buf->pos,
924 cl->buf->last - cl->buf->pos);
925 }
926 938
927 rev = qs->connection->read; 939 rev = qs->connection->read;
928 rev->ready = 1; 940 rev->ready = 1;
929 941
930 if (f->fin) { 942 if (f->fin) {
993 1005
994 ngx_int_t 1006 ngx_int_t
995 ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, 1007 ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
996 ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f) 1008 ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
997 { 1009 {
998 size_t n; 1010 uint64_t limit;
999 ngx_buf_t *b;
1000 ngx_quic_frame_t *frame; 1011 ngx_quic_frame_t *frame;
1001 ngx_quic_stream_t *qs; 1012 ngx_quic_stream_t *qs;
1002 ngx_quic_connection_t *qc; 1013 ngx_quic_connection_t *qc;
1003 1014
1004 qc = ngx_quic_get_connection(c); 1015 qc = ngx_quic_get_connection(c);
1021 1032
1022 if (qs == NGX_QUIC_STREAM_GONE) { 1033 if (qs == NGX_QUIC_STREAM_GONE) {
1023 return NGX_OK; 1034 return NGX_OK;
1024 } 1035 }
1025 1036
1026 b = qs->b; 1037 limit = qs->recv_max_data;
1027 n = b->end - b->last;
1028 1038
1029 qs->connection->listening->handler(qs->connection); 1039 qs->connection->listening->handler(qs->connection);
1030 1040
1031 } else { 1041 } else {
1032 b = qs->b; 1042 limit = qs->recv_max_data;
1033 n = qs->fs->received + (b->pos - b->start) + (b->end - b->last);
1034 } 1043 }
1035 1044
1036 frame = ngx_quic_alloc_frame(c); 1045 frame = ngx_quic_alloc_frame(c);
1037 if (frame == NULL) { 1046 if (frame == NULL) {
1038 return NGX_ERROR; 1047 return NGX_ERROR;
1039 } 1048 }
1040 1049
1041 frame->level = pkt->level; 1050 frame->level = pkt->level;
1042 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA; 1051 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
1043 frame->u.max_stream_data.id = f->id; 1052 frame->u.max_stream_data.id = f->id;
1044 frame->u.max_stream_data.limit = n; 1053 frame->u.max_stream_data.limit = limit;
1045 1054
1046 ngx_quic_queue_frame(qc, frame); 1055 ngx_quic_queue_frame(qc, frame);
1047 1056
1048 return NGX_OK; 1057 return NGX_OK;
1049 } 1058 }