changeset 8211:6bc18966b8c1 quic

Stream "connection" read/write methods.
author Vladimir Homutov <vl@nginx.com>
date Fri, 13 Mar 2020 14:39:23 +0300
parents b761ca7df7d0
children e3c0b19a3a8a
files src/event/ngx_event_quic.c src/http/ngx_http_request.c
diffstat 2 files changed, 256 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/ngx_event_quic.c
+++ b/src/event/ngx_event_quic.c
@@ -60,7 +60,7 @@
     (*(uint32_t *) (p) = htonl((uint32_t) (s)), (p) + sizeof(uint32_t))
 
 #define ngx_quic_varint_len(value)                                            \
-    ((value) <= 63 ? 1 : (value) <= 16383 ? 2 : (value) <= 1073741823 ?  4 : 8)
+    ((value) <= 63 ? 1 : ((uint32_t)value) <= 16383 ? 2 : ((uint64_t)value) <= 1073741823 ?  4 : 8)
 
 
 #if (NGX_DEBUG)
@@ -105,7 +105,7 @@ do {                                    
 #define NGX_QUIC_FT_STOP_SENDING           0x05
 #define NGX_QUIC_FT_CRYPTO                 0x06
 #define NGX_QUIC_FT_NEW_TOKEN              0x07
-#define NGX_QUIC_FT_STREAM                 0x08
+#define NGX_QUIC_FT_STREAM0                0x08
 #define NGX_QUIC_FT_STREAM1                0x09
 #define NGX_QUIC_FT_STREAM2                0x0A
 #define NGX_QUIC_FT_STREAM3                0x0B
@@ -130,6 +130,12 @@ do {                                    
 #define NGX_QUIC_FT_HANDSHAKE_DONE         0x1e
 
 
+#define ngx_quic_stream_bit_off(val)  (((val) & 0x04) ? 1 : 0)
+#define ngx_quic_stream_bit_len(val)  (((val) & 0x02) ? 1 : 0)
+#define ngx_quic_stream_bit_fin(val)  (((val) & 0x01) ? 1 : 0)
+
+
+
 /* TODO: real states, these are stubs */
 typedef enum  {
     NGX_QUIC_ST_INITIAL,
@@ -184,6 +190,7 @@ typedef struct {
 
 
 typedef struct {
+    uint8_t                      type;
     uint64_t                     stream_id;
     uint64_t                     offset;
     uint64_t                     length;
@@ -350,6 +357,13 @@ static ngx_int_t ngx_quic_tls_hp(ngx_con
 static ngx_int_t ngx_quic_ciphers(ngx_connection_t *c,
     ngx_quic_ciphers_t *ciphers, enum ssl_encryption_level_t level);
 
+static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
+    size_t size);
+static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
+    size_t size);
+static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
+    ngx_chain_t *in, off_t limit);
+
 static SSL_QUIC_METHOD quic_method = {
 #if BORINGSSL_API_VERSION >= 10
     ngx_quic_set_read_secret,
@@ -640,6 +654,57 @@ ngx_quic_create_crypto(u_char *p, ngx_qu
     return p - start;
 }
 
+
+static size_t
+ngx_quic_create_stream(u_char *p, ngx_quic_stream_frame_t *sf)
+{
+    size_t   len;
+    u_char  *start;
+
+    if (!ngx_quic_stream_bit_len(sf->type)) {
+#if 0
+        ngx_log_error(NGX_LOG_INFO, c->log, 0,
+                      "attempt to generate a stream frame without length");
+#endif
+        // XXX: handle error in caller
+        return NGX_ERROR;
+    }
+
+    if (p == NULL) {
+        len = ngx_quic_varint_len(sf->type);
+
+        if (ngx_quic_stream_bit_off(sf->type)) {
+            len += ngx_quic_varint_len(sf->offset);
+        }
+
+        len += ngx_quic_varint_len(sf->stream_id);
+
+        /* length is always present in generated frames */
+        len += ngx_quic_varint_len(sf->length);
+
+        len += sf->length;
+
+        return len;
+    }
+
+    start = p;
+
+    ngx_quic_build_int(&p, sf->type);
+    ngx_quic_build_int(&p, sf->stream_id);
+
+    if (ngx_quic_stream_bit_off(sf->type)) {
+        ngx_quic_build_int(&p, sf->offset);
+    }
+
+    /* length is always present in generated frames */
+    ngx_quic_build_int(&p, sf->length);
+
+    p = ngx_cpymem(p, sf->data, sf->length);
+
+    return p - start;
+}
+
+
 size_t
 ngx_quic_frame_len(ngx_quic_frame_t *frame)
 {
@@ -648,6 +713,16 @@ ngx_quic_frame_len(ngx_quic_frame_t *fra
             return ngx_quic_create_ack(NULL, &frame->u.ack);
         case NGX_QUIC_FT_CRYPTO:
             return ngx_quic_create_crypto(NULL, &frame->u.crypto);
+
+        case NGX_QUIC_FT_STREAM0:
+        case NGX_QUIC_FT_STREAM1:
+        case NGX_QUIC_FT_STREAM2:
+        case NGX_QUIC_FT_STREAM3:
+        case NGX_QUIC_FT_STREAM4:
+        case NGX_QUIC_FT_STREAM5:
+        case NGX_QUIC_FT_STREAM6:
+        case NGX_QUIC_FT_STREAM7:
+            return ngx_quic_create_stream(NULL, &frame->u.stream);
         default:
             /* BUG: unsupported frame type generated */
             return 0;
@@ -687,6 +762,17 @@ ngx_quic_frames_send(ngx_connection_t *c
             p += ngx_quic_create_crypto(p, &f->u.crypto);
             break;
 
+        case NGX_QUIC_FT_STREAM0:
+        case NGX_QUIC_FT_STREAM1:
+        case NGX_QUIC_FT_STREAM2:
+        case NGX_QUIC_FT_STREAM3:
+        case NGX_QUIC_FT_STREAM4:
+        case NGX_QUIC_FT_STREAM5:
+        case NGX_QUIC_FT_STREAM6:
+        case NGX_QUIC_FT_STREAM7:
+            p += ngx_quic_create_stream(p, &f->u.stream);
+            break;
+
         default:
             /* BUG: unsupported frame type generated */
             return NGX_ERROR;
@@ -1653,7 +1739,7 @@ ngx_quic_read_frame(ngx_connection_t *c,
         return NGX_ERROR;
         break;
 
-    case NGX_QUIC_FT_STREAM:
+    case NGX_QUIC_FT_STREAM0:
     case NGX_QUIC_FT_STREAM1:
     case NGX_QUIC_FT_STREAM2:
     case NGX_QUIC_FT_STREAM3:
@@ -1665,6 +1751,8 @@ ngx_quic_read_frame(ngx_connection_t *c,
         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
                        "STREAM frame, type: 0x%xi", frame->type);
 
+        frame->u.stream.type = frame->type;
+
         frame->u.stream.stream_id = ngx_quic_parse_int(&p);
         if (frame->type & 0x04) {
             frame->u.stream.offset = ngx_quic_parse_int(&p);
@@ -1797,6 +1885,109 @@ ngx_quic_init_connection(ngx_connection_
 }
 
 
+static ssize_t
+ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
+{
+    ssize_t                  len;
+    ngx_buf_t               *b;
+    ngx_quic_stream_t       *qs;
+    ngx_quic_connection_t   *qc;
+    ngx_quic_stream_node_t  *sn;
+
+    qs = c->qs;
+    qc = qs->parent->quic;
+
+    // XXX: get direct pointer from stream structure?
+    sn = ngx_quic_stream_lookup(&qc->stree, qs->id);
+
+    if (sn == NULL) {
+        return NGX_ERROR;
+    }
+
+    // XXX: how to return EOF?
+
+    b = sn->b;
+
+    if (b->last - b->pos == 0) {
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic recv() not ready");
+        return NGX_AGAIN; // ?
+    }
+
+    len = ngx_min(b->last - b->pos, (ssize_t) size);
+
+    ngx_memcpy(buf, b->pos, len);
+
+    b->pos += len;
+
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                  "quic recv: %z of %uz", len, size);
+
+    return len;
+}
+
+
+static ssize_t
+ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
+{
+    u_char                  *p;
+    ngx_quic_frame_t        *frame;
+    ngx_quic_stream_t       *qs;
+    ngx_quic_connection_t   *qc;
+    ngx_quic_stream_node_t  *sn;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic send: %uz", size);
+
+    qs = c->qs;
+    qc = qs->parent->quic;
+
+
+    // XXX: get direct pointer from stream structure?
+    sn = ngx_quic_stream_lookup(&qc->stree, qs->id);
+
+    if (sn == NULL) {
+        return NGX_ERROR;
+    }
+
+    frame = ngx_pcalloc(c->pool, sizeof(ngx_quic_frame_t));
+    if (frame == NULL) {
+        return 0;
+    }
+
+    p = ngx_pnalloc(c->pool, size);
+    if (p == NULL) {
+        return 0;
+    }
+
+    ngx_memcpy(p, buf, size);
+
+    frame->level = ssl_encryption_application;
+    frame->type = NGX_QUIC_FT_STREAM2; /* OFF=0 LEN=1 FIN=0 */
+
+    frame->u.stream.type = frame->type;
+    frame->u.stream.stream_id = qs->id;
+    frame->u.stream.offset = 0;
+    frame->u.stream.length = size;
+    frame->u.stream.data = p;
+
+    ngx_sprintf(frame->info, "stream %xi len=%ui level=%d",
+                qs->id, size, frame->level);
+
+    ngx_quic_queue_frame(qc, frame);
+
+    return size;
+}
+
+
+static ngx_chain_t *
+ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in,
+    off_t limit)
+{
+    // TODO
+    return NULL;
+}
+
+
 /* process all payload from the current packet and generate ack if required */
 static ngx_int_t
 ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
@@ -1805,6 +1996,8 @@ ngx_quic_payload_handler(ngx_connection_
     ssize_t                  len;
     ngx_buf_t               *b;
     ngx_uint_t               ack_this;
+    ngx_pool_t              *pool;
+    ngx_event_t             *rev, *wev;
     ngx_quic_frame_t         frame, *ack_frame;
     ngx_quic_connection_t   *qc;
     ngx_quic_stream_node_t  *sn;
@@ -1873,7 +2066,7 @@ ngx_quic_payload_handler(ngx_connection_
                            frame.u.ncid.len);
             continue;
 
-        case NGX_QUIC_FT_STREAM:
+        case NGX_QUIC_FT_STREAM0:
         case NGX_QUIC_FT_STREAM1:
         case NGX_QUIC_FT_STREAM2:
         case NGX_QUIC_FT_STREAM3:
@@ -1882,12 +2075,15 @@ ngx_quic_payload_handler(ngx_connection_
         case NGX_QUIC_FT_STREAM6:
         case NGX_QUIC_FT_STREAM7:
 
-            ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                           "STREAM frame 0x%xi id 0x%xi off 0x%xi len 0x%xi",
+            ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                           "STREAM frame 0x%xi id 0x%xi offset 0x%xi len 0x%xi bits:off=%d len=%d fin=%d",
                            frame.type,
                            frame.u.stream.stream_id,
                            frame.u.stream.offset,
-                           frame.u.stream.length);
+                           frame.u.stream.length,
+                           ngx_quic_stream_bit_off(frame.u.stream.type),
+                           ngx_quic_stream_bit_len(frame.u.stream.type),
+                           ngx_quic_stream_bit_fin(frame.u.stream.type));
 
 
             sn = ngx_quic_stream_lookup(&qc->stree, frame.u.stream.stream_id);
@@ -1899,13 +2095,28 @@ ngx_quic_payload_handler(ngx_connection_
                     return NGX_ERROR;
                 }
 
+                pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
+                if (pool == NULL) {
+                    return NGX_ERROR;
+                }
+
                 sn->c = ngx_get_connection(-1, c->log); // TODO: free on connection termination
                 if (sn->c == NULL) {
                     return NGX_ERROR;
                 }
 
+                sn->c->pool = pool;
+
+                rev = sn->c->read;
+                wev = sn->c->write;
+
+                rev->log = c->log;
+                wev->log = c->log;
+
+                sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
+
                 sn->node.key = frame.u.stream.stream_id;
-                sn->b = ngx_create_temp_buf(c->pool, 16 * 1024); // XXX enough for everyone
+                sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone
                 if (sn->b == NULL) {
                     return NGX_ERROR;
                 }
@@ -1917,9 +2128,14 @@ ngx_quic_payload_handler(ngx_connection_
                 ngx_rbtree_insert(&qc->stree, &sn->node);
 
                 sn->s.id = frame.u.stream.stream_id;
+                sn->s.unidirectional = (sn->s.id & 0x02) ? 1 : 0;
                 sn->s.parent = c;
                 sn->c->qs = &sn->s;
 
+                sn->c->recv = ngx_quic_stream_recv;
+                sn->c->send = ngx_quic_stream_send;
+                sn->c->send_chain = ngx_quic_stream_send_chain;
+
                 qc->stream_handler(sn->c);
 
             } else {
@@ -1973,7 +2189,7 @@ ngx_quic_payload_handler(ngx_connection_
     ack_frame->type = NGX_QUIC_FT_ACK;
     ack_frame->u.ack.pn = pkt->pn;
 
-    ngx_sprintf(ack_frame->info, "ACK for PN=%d from frame handler", pkt->pn);
+    ngx_sprintf(ack_frame->info, "ACK for PN=%d from frame handler level=%d", pkt->pn, pkt->level);
     ngx_quic_queue_frame(qc, ack_frame);
 
     return ngx_quic_output(c);
--- a/src/http/ngx_http_request.c
+++ b/src/http/ngx_http_request.c
@@ -395,8 +395,39 @@ ngx_http_quic_stream_handler(ngx_connect
 {
     ngx_quic_stream_t *qs = c->qs;
 
+    // STUB for stream read/write
+
     ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                    "quic stream: 0x%uXL", qs->id);
+    ssize_t    n;
+    ngx_buf_t  b;
+
+    u_char     buf[512];
+
+    b.start = buf;
+    b.end = buf + 512;
+    b.pos = b.last = b.start;
+
+    n = c->recv(c, b.pos, b.end - b.start);
+    if (n < 0) {
+        ngx_log_error(NGX_LOG_INFO, c->log, 0, "stream read failed");
+        return;
+    }
+
+    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
+                   "quic stream: 0x%uXL %ui bytes read", qs->id, n);
+
+    b.last += n;
+
+    n = c->send(c, b.start, n);
+
+    if (n < 0) {
+        ngx_log_error(NGX_LOG_INFO, c->log, 0, "stream write failed");
+        return;
+    }
+
+    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
+                   "quic stream: 0x%uXL %ui bytes written", qs->id, n);
 }