comparison src/event/quic/ngx_event_quic_streams.c @ 8782:b3f6ad181df4 quic

QUIC: refactored CRYPTO and STREAM buffer ordering. A generic function, ngx_quic_order_bufs(), is introduced. This function creates and maintains a chain of buffers with holes. Holes are marked with the b->sync flag. Several buffers and holes in this chain may share the same underlying memory buffer. When processing STREAM frames with this function, frame data is copied only once, to the right place in the stream input chain. Previously, data could be copied twice: first when buffering out-of-order frame data, and then when filling the stream buffer from the ordered frame queue. Now there is only one data chain for both tasks.
author Roman Arutyunyan <arut@nginx.com>
date Tue, 25 May 2021 13:55:12 +0300
parents f52a2b77d406
children 60c6e8d8d3ae
comparison
equal deleted inserted replaced
8781:81d491f0dc8c 8782:b3f6ad181df4
347 } 347 }
348 348
349 qs->node.key = id; 349 qs->node.key = id;
350 qs->parent = c; 350 qs->parent = c;
351 qs->id = id; 351 qs->id = id;
352 352 qs->final_size = (uint64_t) -1;
353 qs->fs = ngx_pcalloc(pool, sizeof(ngx_quic_frames_stream_t));
354 if (qs->fs == NULL) {
355 ngx_destroy_pool(pool);
356 return NULL;
357 }
358
359 ngx_queue_init(&qs->fs->frames);
360 353
361 log = ngx_palloc(pool, sizeof(ngx_log_t)); 354 log = ngx_palloc(pool, sizeof(ngx_log_t));
362 if (log == NULL) { 355 if (log == NULL) {
363 ngx_destroy_pool(pool); 356 ngx_destroy_pool(pool);
364 return NULL; 357 return NULL;
455 448
456 if (rev->error) { 449 if (rev->error) {
457 return NGX_ERROR; 450 return NGX_ERROR;
458 } 451 }
459 452
460 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, 453 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
461 "quic stream recv id:0x%xL eof:%d", 454 "quic stream id:0x%xL recv eof:%d buf:%uz",
462 qs->id, rev->pending_eof); 455 qs->id, rev->pending_eof, size);
463 456
464 if (qs->in == NULL) { 457 if (qs->in == NULL || qs->in->buf->sync) {
465 rev->ready = 0; 458 rev->ready = 0;
466 459
467 if (rev->pending_eof) { 460 if (qs->recv_offset == qs->final_size) {
468 rev->eof = 1; 461 rev->eof = 1;
469 return 0; 462 return 0;
470 } 463 }
471 464
472 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, 465 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
478 cl = qs->in; 471 cl = qs->in;
479 472
480 for (ll = &cl; *ll; ll = &(*ll)->next) { 473 for (ll = &cl; *ll; ll = &(*ll)->next) {
481 b = (*ll)->buf; 474 b = (*ll)->buf;
482 475
476 if (b->sync) {
477 /* hole */
478 break;
479 }
480
483 n = ngx_min(b->last - b->pos, (ssize_t) size); 481 n = ngx_min(b->last - b->pos, (ssize_t) size);
484 buf = ngx_cpymem(buf, b->pos, n); 482 buf = ngx_cpymem(buf, b->pos, n);
485 483
486 len += n; 484 len += n;
487 size -= n; 485 size -= n;
497 495
498 ngx_quic_free_bufs(pc, cl); 496 ngx_quic_free_bufs(pc, cl);
499 497
500 qc->streams.received += len; 498 qc->streams.received += len;
501 qs->recv_max_data += len; 499 qs->recv_max_data += len;
500 qs->recv_offset += len;
502 501
503 if (qs->in == NULL) { 502 if (qs->in == NULL) {
504 rev->ready = rev->pending_eof; 503 rev->ready = rev->pending_eof;
505 } 504 }
506 505
507 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0, 506 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
508 "quic stream id:0x%xL recv len:%z of size:%uz", 507 "quic stream id:0x%xL recv len:%z", qs->id, len);
509 qs->id, len, size);
510 508
511 if (!rev->pending_eof) { 509 if (!rev->pending_eof) {
512 frame = ngx_quic_alloc_frame(pc); 510 frame = ngx_quic_alloc_frame(pc);
513 if (frame == NULL) { 511 if (frame == NULL) {
514 return NGX_ERROR; 512 return NGX_ERROR;
717 715
718 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, 716 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
719 "quic stream id:0x%xL cleanup", qs->id); 717 "quic stream id:0x%xL cleanup", qs->id);
720 718
721 ngx_rbtree_delete(&qc->streams.tree, &qs->node); 719 ngx_rbtree_delete(&qc->streams.tree, &qs->node);
722 ngx_quic_free_frames(pc, &qs->fs->frames);
723 ngx_quic_free_bufs(pc, qs->in); 720 ngx_quic_free_bufs(pc, qs->in);
724 721
725 if (qc->closing) { 722 if (qc->closing) {
726 /* schedule handler call to continue ngx_quic_close_connection() */ 723 /* schedule handler call to continue ngx_quic_close_connection() */
727 ngx_post_event(pc->read, &ngx_posted_events); 724 ngx_post_event(pc->read, &ngx_posted_events);
813 810
814 ngx_int_t 811 ngx_int_t
815 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, 812 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
816 ngx_quic_frame_t *frame) 813 ngx_quic_frame_t *frame)
817 { 814 {
818 uint64_t last; 815 uint64_t last;
819 ngx_pool_t *pool; 816 ngx_pool_t *pool;
820 ngx_connection_t *sc; 817 ngx_connection_t *sc;
821 ngx_quic_stream_t *qs; 818 ngx_quic_stream_t *qs;
822 ngx_quic_connection_t *qc; 819 ngx_quic_connection_t *qc;
823 ngx_quic_stream_frame_t *f; 820 ngx_quic_stream_frame_t *f;
824 ngx_quic_frames_stream_t *fs;
825 821
826 qc = ngx_quic_get_connection(c); 822 qc = ngx_quic_get_connection(c);
827 f = &frame->u.stream; 823 f = &frame->u.stream;
828 824
829 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) 825 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
848 if (qs == NGX_QUIC_STREAM_GONE) { 844 if (qs == NGX_QUIC_STREAM_GONE) {
849 return NGX_OK; 845 return NGX_OK;
850 } 846 }
851 847
852 sc = qs->connection; 848 sc = qs->connection;
853 fs = qs->fs;
854 849
855 if (last > qs->recv_max_data) { 850 if (last > qs->recv_max_data) {
856 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; 851 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
857 goto cleanup; 852 goto cleanup;
858 } 853 }
859 854
860 if (ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input, 855 if (f->fin) {
861 qs) 856 sc->read->pending_eof = 1;
862 != NGX_OK) 857 qs->final_size = last;
863 { 858 }
859
860 if (f->offset == 0) {
861 sc->read->ready = 1;
862 }
863
864 if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) {
864 goto cleanup; 865 goto cleanup;
865 } 866 }
866 867
867 sc->listening->handler(sc); 868 sc->listening->handler(sc);
868 869
869 return NGX_OK; 870 return NGX_OK;
870 } 871 }
871
872 fs = qs->fs;
873 872
874 if (last > qs->recv_max_data) { 873 if (last > qs->recv_max_data) {
875 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; 874 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
876 return NGX_ERROR; 875 return NGX_ERROR;
877 } 876 }
878 877
879 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input, 878 if (qs->final_size != (uint64_t) -1 && last > qs->final_size) {
880 qs); 879 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
880 return NGX_ERROR;
881 }
882
883 if (last <= qs->recv_offset) {
884 return NGX_OK;
885 }
886
887 if (f->offset < qs->recv_offset) {
888 ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset);
889 f->offset = qs->recv_offset;
890 }
891
892 if (f->offset == qs->recv_offset) {
893 qs->connection->read->ready = 1;
894 }
895
896 if (f->fin) {
897 if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
898 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
899 return NGX_ERROR;
900 }
901
902 qs->connection->read->pending_eof = 1;
903 qs->final_size = last;
904 }
905
906 return ngx_quic_order_bufs(c, &qs->in, frame->data,
907 f->offset - qs->recv_offset);
881 908
882 cleanup: 909 cleanup:
883 910
884 pool = sc->pool; 911 pool = sc->pool;
885 912
886 ngx_close_connection(sc); 913 ngx_close_connection(sc);
887 ngx_destroy_pool(pool); 914 ngx_destroy_pool(pool);
888 915
889 return NGX_ERROR; 916 return NGX_ERROR;
890 }
891
892
893 ngx_int_t
894 ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
895 {
896 ssize_t n;
897 uint64_t id;
898 ngx_buf_t *b;
899 ngx_event_t *rev;
900 ngx_chain_t *cl, **ll;
901 ngx_quic_stream_t *qs;
902 ngx_quic_connection_t *qc;
903 ngx_quic_stream_frame_t *f;
904
905 qc = ngx_quic_get_connection(c);
906 qs = data;
907
908 f = &frame->u.stream;
909 id = f->stream_id;
910 cl = frame->data;
911
912 for (ll = &qs->in; *ll; ll = &(*ll)->next) {
913 if ((*ll)->next) {
914 continue;
915 }
916
917 /* append to last buffer */
918
919 b = (*ll)->buf;
920
921 while (cl && b->last != b->end) {
922 n = ngx_min(cl->buf->last - cl->buf->pos, b->end - b->last);
923 b->last = ngx_cpymem(b->last, cl->buf->pos, n);
924 cl->buf->pos += n;
925
926 if (cl->buf->pos == cl->buf->last) {
927 cl = cl->next;
928 }
929 }
930 }
931
932 cl = ngx_quic_copy_chain(c, cl, 0);
933 if (cl == NGX_CHAIN_ERROR) {
934 return NGX_ERROR;
935 }
936
937 *ll = cl;
938
939 rev = qs->connection->read;
940 rev->ready = 1;
941
942 if (f->fin) {
943 rev->pending_eof = 1;
944 }
945
946 if (rev->active) {
947 rev->handler(rev);
948 }
949
950 /* check if stream was destroyed by handler */
951 if (ngx_quic_find_stream(&qc->streams.tree, id) == NULL) {
952 return NGX_DONE;
953 }
954
955 return NGX_OK;
956 } 917 }
957 918
958 919
959 ngx_int_t 920 ngx_int_t
960 ngx_quic_handle_max_data_frame(ngx_connection_t *c, 921 ngx_quic_handle_max_data_frame(ngx_connection_t *c,
1148 1109
1149 if (qs == NGX_QUIC_STREAM_GONE) { 1110 if (qs == NGX_QUIC_STREAM_GONE) {
1150 return NGX_OK; 1111 return NGX_OK;
1151 } 1112 }
1152 1113
1114 qs->final_size = f->final_size;
1115
1153 sc = qs->connection; 1116 sc = qs->connection;
1154 1117
1155 rev = sc->read; 1118 rev = sc->read;
1156 rev->error = 1; 1119 rev->error = 1;
1157 rev->ready = 1; 1120 rev->ready = 1;
1158 1121
1159 sc->listening->handler(sc); 1122 sc->listening->handler(sc);
1160 1123
1161 return NGX_OK; 1124 return NGX_OK;
1162 } 1125 }
1126
1127 if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
1128 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
1129 return NGX_ERROR;
1130 }
1131
1132 qs->final_size = f->final_size;
1163 1133
1164 rev = qs->connection->read; 1134 rev = qs->connection->read;
1165 rev->error = 1; 1135 rev->error = 1;
1166 rev->ready = 1; 1136 rev->ready = 1;
1167 1137