Mercurial > hg > nginx
comparison src/event/ngx_event_quic.c @ 8266:f92e583fc256 quic
Better flow control and buffering for QUIC streams.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Mon, 23 Mar 2020 15:49:31 +0300 |
parents | d45325e90221 |
children | a8349cc72c64 |
comparison
legend: equal | deleted | inserted | replaced
8265:d45325e90221 | 8266:f92e583fc256 |
---|---|
12 typedef enum { | 12 typedef enum { |
13 NGX_QUIC_ST_INITIAL, /* connection just created */ | 13 NGX_QUIC_ST_INITIAL, /* connection just created */ |
14 NGX_QUIC_ST_HANDSHAKE, /* handshake started */ | 14 NGX_QUIC_ST_HANDSHAKE, /* handshake started */ |
15 NGX_QUIC_ST_APPLICATION /* handshake complete */ | 15 NGX_QUIC_ST_APPLICATION /* handshake complete */ |
16 } ngx_quic_state_t; | 16 } ngx_quic_state_t; |
17 | |
18 | |
19 #define NGX_QUIC_STREAM_BUFSIZE 16384 | |
17 | 20 |
18 | 21 |
19 typedef struct { | 22 typedef struct { |
20 ngx_rbtree_node_t node; | 23 ngx_rbtree_node_t node; |
21 ngx_buf_t *b; | 24 ngx_buf_t *b; |
104 ngx_quic_header_t *pkt, ngx_quic_crypto_frame_t *frame); | 107 ngx_quic_header_t *pkt, ngx_quic_crypto_frame_t *frame); |
105 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, | 108 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, |
106 ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame); | 109 ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame); |
107 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, | 110 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, |
108 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); | 111 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); |
112 static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, | |
113 ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f); | |
109 | 114 |
110 static void ngx_quic_queue_frame(ngx_quic_connection_t *qc, | 115 static void ngx_quic_queue_frame(ngx_quic_connection_t *qc, |
111 ngx_quic_frame_t *frame); | 116 ngx_quic_frame_t *frame); |
112 | 117 |
113 static ngx_int_t ngx_quic_output(ngx_connection_t *c); | 118 static ngx_int_t ngx_quic_output(ngx_connection_t *c); |
883 } | 888 } |
884 | 889 |
885 ack_this = 1; | 890 ack_this = 1; |
886 break; | 891 break; |
887 | 892 |
893 case NGX_QUIC_FT_STREAM_DATA_BLOCKED: | |
894 | |
895 if (ngx_quic_handle_stream_data_blocked_frame(c, pkt, | |
896 &frame.u.stream_data_blocked) | |
897 != NGX_OK) | |
898 { | |
899 return NGX_ERROR; | |
900 } | |
901 | |
902 ack_this = 1; | |
903 break; | |
904 | |
888 default: | 905 default: |
889 return NGX_ERROR; | 906 return NGX_ERROR; |
890 } | 907 } |
891 } | 908 } |
892 | 909 |
1000 static ngx_int_t | 1017 static ngx_int_t |
1001 ngx_quic_handle_stream_frame(ngx_connection_t *c, | 1018 ngx_quic_handle_stream_frame(ngx_connection_t *c, |
1002 ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f) | 1019 ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f) |
1003 { | 1020 { |
1004 ngx_buf_t *b; | 1021 ngx_buf_t *b; |
1022 ngx_event_t *rev; | |
1005 ngx_quic_connection_t *qc; | 1023 ngx_quic_connection_t *qc; |
1006 ngx_quic_stream_node_t *sn; | 1024 ngx_quic_stream_node_t *sn; |
1007 | 1025 |
1008 qc = c->quic; | 1026 qc = c->quic; |
1009 | 1027 |
1011 | 1029 |
1012 if (sn) { | 1030 if (sn) { |
1013 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); | 1031 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); |
1014 b = sn->b; | 1032 b = sn->b; |
1015 | 1033 |
1016 if ((size_t) (b->end - b->pos) < f->length) { | 1034 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) { |
1017 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); | 1035 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); |
1018 return NGX_ERROR; | 1036 return NGX_ERROR; |
1019 } | 1037 } |
1020 | 1038 |
1021 ngx_memcpy(b->pos, f->data, f->length); | 1039 if ((size_t) (b->end - b->last) < f->length) { |
1022 b->pos += f->length; | 1040 b->last = ngx_movemem(b->start, b->pos, b->last - b->pos); |
1023 | 1041 b->pos = b->start; |
1024 // TODO: notify | 1042 } |
1043 | |
1044 b->last = ngx_cpymem(b->last, f->data, f->length); | |
1045 | |
1046 rev = sn->c->read; | |
1047 rev->ready = 1; | |
1048 | |
1049 if (rev->active) { | |
1050 rev->handler(rev); | |
1051 } | |
1025 | 1052 |
1026 return NGX_OK; | 1053 return NGX_OK; |
1027 } | 1054 } |
1028 | 1055 |
1029 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new"); | 1056 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new"); |
1061 frame->u.max_streams.bidi = f->bidi; | 1088 frame->u.max_streams.bidi = f->bidi; |
1062 | 1089 |
1063 ngx_sprintf(frame->info, "MAX_STREAMS limit:%d bidi:%d level=%d", | 1090 ngx_sprintf(frame->info, "MAX_STREAMS limit:%d bidi:%d level=%d", |
1064 (int) frame->u.max_streams.limit, | 1091 (int) frame->u.max_streams.limit, |
1065 (int) frame->u.max_streams.bidi, | 1092 (int) frame->u.max_streams.bidi, |
1093 frame->level); | |
1094 | |
1095 ngx_quic_queue_frame(c->quic, frame); | |
1096 | |
1097 return NGX_OK; | |
1098 } | |
1099 | |
1100 | |
1101 static ngx_int_t | |
1102 ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, | |
1103 ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f) | |
1104 { | |
1105 size_t n; | |
1106 ngx_buf_t *b; | |
1107 ngx_quic_frame_t *frame; | |
1108 ngx_quic_connection_t *qc; | |
1109 ngx_quic_stream_node_t *sn; | |
1110 | |
1111 qc = c->quic; | |
1112 sn = ngx_quic_find_stream(&qc->streams.tree, f->id); | |
1113 | |
1114 if (sn == NULL) { | |
1115 ngx_log_error(NGX_LOG_INFO, c->log, 0, "unknown stream id:%uL", f->id); | |
1116 return NGX_ERROR; | |
1117 } | |
1118 | |
1119 b = sn->b; | |
1120 n = (b->pos - b->start) + (b->end - b->last); | |
1121 | |
1122 frame = ngx_pcalloc(c->pool, sizeof(ngx_quic_frame_t)); | |
1123 if (frame == NULL) { | |
1124 return NGX_ERROR; | |
1125 } | |
1126 | |
1127 frame->level = pkt->level; | |
1128 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA; | |
1129 frame->u.max_stream_data.id = f->id; | |
1130 frame->u.max_stream_data.limit = n; | |
1131 | |
1132 ngx_sprintf(frame->info, "MAX_STREAM_DATA id:%d limit:%d level=%d", | |
1133 (int) frame->u.max_stream_data.id, | |
1134 (int) frame->u.max_stream_data.limit, | |
1066 frame->level); | 1135 frame->level); |
1067 | 1136 |
1068 ngx_quic_queue_frame(c->quic, frame); | 1137 ngx_quic_queue_frame(c->quic, frame); |
1069 | 1138 |
1070 return NGX_OK; | 1139 return NGX_OK; |
1347 | 1416 |
1348 | 1417 |
1349 static ngx_quic_stream_node_t * | 1418 static ngx_quic_stream_node_t * |
1350 ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id) | 1419 ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id) |
1351 { | 1420 { |
1421 size_t n; | |
1352 ngx_log_t *log; | 1422 ngx_log_t *log; |
1353 ngx_pool_t *pool; | 1423 ngx_pool_t *pool; |
1354 ngx_event_t *rev, *wev; | 1424 ngx_event_t *rev, *wev; |
1355 ngx_pool_cleanup_t *cln; | 1425 ngx_pool_cleanup_t *cln; |
1356 ngx_quic_connection_t *qc; | 1426 ngx_quic_connection_t *qc; |
1400 rev->log = c->log; | 1470 rev->log = c->log; |
1401 wev->log = c->log; | 1471 wev->log = c->log; |
1402 | 1472 |
1403 sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); | 1473 sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); |
1404 | 1474 |
1475 n = ngx_max(NGX_QUIC_STREAM_BUFSIZE, | |
1476 qc->tp.initial_max_stream_data_bidi_remote); | |
1477 | |
1405 sn->node.key =id; | 1478 sn->node.key =id; |
1406 sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone | 1479 sn->b = ngx_create_temp_buf(pool, n); |
1407 if (sn->b == NULL) { | 1480 if (sn->b == NULL) { |
1408 return NULL; | 1481 return NULL; |
1409 } | 1482 } |
1410 | 1483 |
1411 ngx_rbtree_insert(&qc->streams.tree, &sn->node); | 1484 ngx_rbtree_insert(&qc->streams.tree, &sn->node); |
1454 | 1527 |
1455 // XXX: how to return EOF? | 1528 // XXX: how to return EOF? |
1456 | 1529 |
1457 b = sn->b; | 1530 b = sn->b; |
1458 | 1531 |
1459 if (b->last - b->pos == 0) { | 1532 if (b->pos == b->last) { |
1460 c->read->ready = 0; | 1533 c->read->ready = 0; |
1461 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, | 1534 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic recv() not ready"); |
1462 "quic recv() not ready"); | 1535 return NGX_AGAIN; |
1463 return NGX_AGAIN; // ? | |
1464 } | 1536 } |
1465 | 1537 |
1466 len = ngx_min(b->last - b->pos, (ssize_t) size); | 1538 len = ngx_min(b->last - b->pos, (ssize_t) size); |
1467 | 1539 |
1468 ngx_memcpy(buf, b->pos, len); | 1540 ngx_memcpy(buf, b->pos, len); |
1469 | 1541 |
1470 b->pos += len; | 1542 b->pos += len; |
1543 | |
1544 if (b->pos == b->last) { | |
1545 b->pos = b->start; | |
1546 b->last = b->start; | |
1547 } | |
1471 | 1548 |
1472 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, | 1549 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, |
1473 "quic recv: %z of %uz", len, size); | 1550 "quic recv: %z of %uz", len, size); |
1474 | 1551 |
1475 return len; | 1552 return len; |