# HG changeset patch # User Vladimir Homutov # Date 1584099563 -10800 # Node ID 6bc18966b8c19b9f1c583751ce682e3b38c4be65 # Parent b761ca7df7d03168edae481c933cfe0928283e56 Stream "connection" read/write methods. diff --git a/src/event/ngx_event_quic.c b/src/event/ngx_event_quic.c --- a/src/event/ngx_event_quic.c +++ b/src/event/ngx_event_quic.c @@ -60,7 +60,7 @@ (*(uint32_t *) (p) = htonl((uint32_t) (s)), (p) + sizeof(uint32_t)) #define ngx_quic_varint_len(value) \ - ((value) <= 63 ? 1 : (value) <= 16383 ? 2 : (value) <= 1073741823 ? 4 : 8) + ((value) <= 63 ? 1 : ((uint32_t)value) <= 16383 ? 2 : ((uint64_t)value) <= 1073741823 ? 4 : 8) #if (NGX_DEBUG) @@ -105,7 +105,7 @@ do { #define NGX_QUIC_FT_STOP_SENDING 0x05 #define NGX_QUIC_FT_CRYPTO 0x06 #define NGX_QUIC_FT_NEW_TOKEN 0x07 -#define NGX_QUIC_FT_STREAM 0x08 +#define NGX_QUIC_FT_STREAM0 0x08 #define NGX_QUIC_FT_STREAM1 0x09 #define NGX_QUIC_FT_STREAM2 0x0A #define NGX_QUIC_FT_STREAM3 0x0B @@ -130,6 +130,12 @@ do { #define NGX_QUIC_FT_HANDSHAKE_DONE 0x1e +#define ngx_quic_stream_bit_off(val) (((val) & 0x04) ? 1 : 0) +#define ngx_quic_stream_bit_len(val) (((val) & 0x02) ? 1 : 0) +#define ngx_quic_stream_bit_fin(val) (((val) & 0x01) ? 1 : 0) + + + /* TODO: real states, these are stubs */ typedef enum { NGX_QUIC_ST_INITIAL, @@ -184,6 +190,7 @@ typedef struct { typedef struct { + uint8_t type; uint64_t stream_id; uint64_t offset; uint64_t length; @@ -350,6 +357,13 @@ static ngx_int_t ngx_quic_tls_hp(ngx_con static ngx_int_t ngx_quic_ciphers(ngx_connection_t *c, ngx_quic_ciphers_t *ciphers, enum ssl_encryption_level_t level); +static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, + size_t size); +static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, + size_t size); +static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c, + ngx_chain_t *in, off_t limit); + static SSL_QUIC_METHOD quic_method = { #if BORINGSSL_API_VERSION >= 10 ngx_quic_set_read_secret, @@ -640,6 +654,57 @@ ngx_quic_create_crypto(u_char *p, ngx_qu return p - start; } + +static size_t +ngx_quic_create_stream(u_char *p, ngx_quic_stream_frame_t *sf) +{ + size_t len; + u_char *start; + + if (!ngx_quic_stream_bit_len(sf->type)) { +#if 0 + ngx_log_error(NGX_LOG_INFO, c->log, 0, + "attempt to generate a stream frame without length"); +#endif + // XXX: handle error in caller + return NGX_ERROR; + } + + if (p == NULL) { + len = ngx_quic_varint_len(sf->type); + + if (ngx_quic_stream_bit_off(sf->type)) { + len += ngx_quic_varint_len(sf->offset); + } + + len += ngx_quic_varint_len(sf->stream_id); + + /* length is always present in generated frames */ + len += ngx_quic_varint_len(sf->length); + + len += sf->length; + + return len; + } + + start = p; + + ngx_quic_build_int(&p, sf->type); + ngx_quic_build_int(&p, sf->stream_id); + + if (ngx_quic_stream_bit_off(sf->type)) { + ngx_quic_build_int(&p, sf->offset); + } + + /* length is always present in generated frames */ + ngx_quic_build_int(&p, sf->length); + + p = ngx_cpymem(p, sf->data, sf->length); + + return p - start; +} + + size_t ngx_quic_frame_len(ngx_quic_frame_t *frame) { @@ -648,6 +713,16 @@ ngx_quic_frame_len(ngx_quic_frame_t *fra return ngx_quic_create_ack(NULL, &frame->u.ack); case NGX_QUIC_FT_CRYPTO: return ngx_quic_create_crypto(NULL, &frame->u.crypto); + + case NGX_QUIC_FT_STREAM0: + case NGX_QUIC_FT_STREAM1: + case NGX_QUIC_FT_STREAM2: + case NGX_QUIC_FT_STREAM3: + case NGX_QUIC_FT_STREAM4: + case NGX_QUIC_FT_STREAM5: + case NGX_QUIC_FT_STREAM6: + case NGX_QUIC_FT_STREAM7: + return ngx_quic_create_stream(NULL, &frame->u.stream); default: /* BUG: unsupported frame type generated */ return 0; @@ -687,6 +762,17 @@ ngx_quic_frames_send(ngx_connection_t *c p += ngx_quic_create_crypto(p, &f->u.crypto); break; + case NGX_QUIC_FT_STREAM0: + case NGX_QUIC_FT_STREAM1: + case NGX_QUIC_FT_STREAM2: + case NGX_QUIC_FT_STREAM3: + case NGX_QUIC_FT_STREAM4: + case NGX_QUIC_FT_STREAM5: + case NGX_QUIC_FT_STREAM6: + case NGX_QUIC_FT_STREAM7: + p += ngx_quic_create_stream(p, &f->u.stream); + break; + default: /* BUG: unsupported frame type generated */ return NGX_ERROR; @@ -1653,7 +1739,7 @@ ngx_quic_read_frame(ngx_connection_t *c, return NGX_ERROR; break; - case NGX_QUIC_FT_STREAM: + case NGX_QUIC_FT_STREAM0: case NGX_QUIC_FT_STREAM1: case NGX_QUIC_FT_STREAM2: case NGX_QUIC_FT_STREAM3: @@ -1665,6 +1751,8 @@ ngx_quic_read_frame(ngx_connection_t *c, ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, "STREAM frame, type: 0x%xi", frame->type); + frame->u.stream.type = frame->type; + frame->u.stream.stream_id = ngx_quic_parse_int(&p); if (frame->type & 0x04) { frame->u.stream.offset = ngx_quic_parse_int(&p); @@ -1797,6 +1885,109 @@ ngx_quic_init_connection(ngx_connection_ } +static ssize_t +ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) +{ + ssize_t len; + ngx_buf_t *b; + ngx_quic_stream_t *qs; + ngx_quic_connection_t *qc; + ngx_quic_stream_node_t *sn; + + qs = c->qs; + qc = qs->parent->quic; + + // XXX: get direct pointer from stream structure? + sn = ngx_quic_stream_lookup(&qc->stree, qs->id); + + if (sn == NULL) { + return NGX_ERROR; + } + + // XXX: how to return EOF? + + b = sn->b; + + if (b->last - b->pos == 0) { + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic recv() not ready"); + return NGX_AGAIN; // ? + } + + len = ngx_min(b->last - b->pos, (ssize_t) size); + + ngx_memcpy(buf, b->pos, len); + + b->pos += len; + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic recv: %z of %uz", len, size); + + return len; +} + + +static ssize_t +ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size) +{ + u_char *p; + ngx_quic_frame_t *frame; + ngx_quic_stream_t *qs; + ngx_quic_connection_t *qc; + ngx_quic_stream_node_t *sn; + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic send: %uz", size); + + qs = c->qs; + qc = qs->parent->quic; + + + // XXX: get direct pointer from stream structure? + sn = ngx_quic_stream_lookup(&qc->stree, qs->id); + + if (sn == NULL) { + return NGX_ERROR; + } + + frame = ngx_pcalloc(c->pool, sizeof(ngx_quic_frame_t)); + if (frame == NULL) { + return 0; + } + + p = ngx_pnalloc(c->pool, size); + if (p == NULL) { + return 0; + } + + ngx_memcpy(p, buf, size); + + frame->level = ssl_encryption_application; + frame->type = NGX_QUIC_FT_STREAM2; /* OFF=0 LEN=1 FIN=0 */ + + frame->u.stream.type = frame->type; + frame->u.stream.stream_id = qs->id; + frame->u.stream.offset = 0; + frame->u.stream.length = size; + frame->u.stream.data = p; + + ngx_sprintf(frame->info, "stream %xi len=%ui level=%d", + qs->id, size, frame->level); + + ngx_quic_queue_frame(qc, frame); + + return size; +} + + +static ngx_chain_t * +ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, + off_t limit) +{ + // TODO + return NULL; +} + + /* process all payload from the current packet and generate ack if required */ static ngx_int_t ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt) @@ -1805,6 +1996,8 @@ ngx_quic_payload_handler(ngx_connection_ ssize_t len; ngx_buf_t *b; ngx_uint_t ack_this; + ngx_pool_t *pool; + ngx_event_t *rev, *wev; ngx_quic_frame_t frame, *ack_frame; ngx_quic_connection_t *qc; ngx_quic_stream_node_t *sn; @@ -1873,7 +2066,7 @@ ngx_quic_payload_handler(ngx_connection_ frame.u.ncid.len); continue; - case NGX_QUIC_FT_STREAM: + case NGX_QUIC_FT_STREAM0: case NGX_QUIC_FT_STREAM1: case NGX_QUIC_FT_STREAM2: case NGX_QUIC_FT_STREAM3: @@ -1882,12 +2075,15 @@ ngx_quic_payload_handler(ngx_connection_ case NGX_QUIC_FT_STREAM6: case NGX_QUIC_FT_STREAM7: - ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, - "STREAM frame 0x%xi id 0x%xi off 0x%xi len 0x%xi", + ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0, + "STREAM frame 0x%xi id 0x%xi offset 0x%xi len 0x%xi bits:off=%d len=%d fin=%d", frame.type, frame.u.stream.stream_id, frame.u.stream.offset, - frame.u.stream.length); + frame.u.stream.length, + ngx_quic_stream_bit_off(frame.u.stream.type), + ngx_quic_stream_bit_len(frame.u.stream.type), + ngx_quic_stream_bit_fin(frame.u.stream.type)); sn = ngx_quic_stream_lookup(&qc->stree, frame.u.stream.stream_id); @@ -1899,13 +2095,28 @@ ngx_quic_payload_handler(ngx_connection_ return NGX_ERROR; } + pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log); + if (pool == NULL) { + return NGX_ERROR; + } + sn->c = ngx_get_connection(-1, c->log); // TODO: free on connection termination if (sn->c == NULL) { return NGX_ERROR; } + sn->c->pool = pool; + + rev = sn->c->read; + wev = sn->c->write; + + rev->log = c->log; + wev->log = c->log; + + sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); + sn->node.key = frame.u.stream.stream_id; - sn->b = ngx_create_temp_buf(c->pool, 16 * 1024); // XXX enough for everyone + sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone if (sn->b == NULL) { return NGX_ERROR; } @@ -1917,9 +2128,14 @@ ngx_quic_payload_handler(ngx_connection_ ngx_rbtree_insert(&qc->stree, &sn->node); sn->s.id = frame.u.stream.stream_id; + sn->s.unidirectional = (sn->s.id & 0x02) ? 1 : 0; sn->s.parent = c; sn->c->qs = &sn->s; + sn->c->recv = ngx_quic_stream_recv; + sn->c->send = ngx_quic_stream_send; + sn->c->send_chain = ngx_quic_stream_send_chain; + qc->stream_handler(sn->c); } else { @@ -1973,7 +2189,7 @@ ngx_quic_payload_handler(ngx_connection_ ack_frame->type = NGX_QUIC_FT_ACK; ack_frame->u.ack.pn = pkt->pn; - ngx_sprintf(ack_frame->info, "ACK for PN=%d from frame handler", pkt->pn); + ngx_sprintf(ack_frame->info, "ACK for PN=%d from frame handler level=%d", pkt->pn, pkt->level); ngx_quic_queue_frame(qc, ack_frame); return ngx_quic_output(c); diff --git a/src/http/ngx_http_request.c b/src/http/ngx_http_request.c --- a/src/http/ngx_http_request.c +++ b/src/http/ngx_http_request.c @@ -395,8 +395,39 @@ ngx_http_quic_stream_handler(ngx_connect { ngx_quic_stream_t *qs = c->qs; + // STUB for stream read/write + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, "quic stream: 0x%uXL", qs->id); + ssize_t n; + ngx_buf_t b; + + u_char buf[512]; + + b.start = buf; + b.end = buf + 512; + b.pos = b.last = b.start; + + n = c->recv(c, b.pos, b.end - b.start); + if (n < 0) { + ngx_log_error(NGX_LOG_INFO, c->log, 0, "stream read failed"); + return; + } + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, + "quic stream: 0x%uXL %ui bytes read", qs->id, n); + + b.last += n; + + n = c->send(c, b.start, n); + + if (n < 0) { + ngx_log_error(NGX_LOG_INFO, c->log, 0, "stream write failed"); + return; + } + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, + "quic stream: 0x%uXL %ui bytes written", qs->id, n); }