diff src/http/ngx_http_spdy_filter_module.c @ 5549:39d7eef2e332

SPDY: protocol implementation switched to spdy/3.1.
author Valentin Bartenev <vbart@nginx.com>
date Fri, 31 Jan 2014 19:17:26 +0400
parents 827e53c136b0
children 2bc609a4b516
line wrap: on
line diff
--- a/src/http/ngx_http_spdy_filter_module.c
+++ b/src/http/ngx_http_spdy_filter_module.c
@@ -17,9 +17,9 @@
 #define ngx_http_spdy_nv_nsize(h)  (NGX_SPDY_NV_NLEN_SIZE + sizeof(h) - 1)
 #define ngx_http_spdy_nv_vsize(h)  (NGX_SPDY_NV_VLEN_SIZE + sizeof(h) - 1)
 
-#define ngx_http_spdy_nv_write_num   ngx_spdy_frame_write_uint16
-#define ngx_http_spdy_nv_write_nlen  ngx_spdy_frame_write_uint16
-#define ngx_http_spdy_nv_write_vlen  ngx_spdy_frame_write_uint16
+#define ngx_http_spdy_nv_write_num   ngx_spdy_frame_write_uint32
+#define ngx_http_spdy_nv_write_nlen  ngx_spdy_frame_write_uint32
+#define ngx_http_spdy_nv_write_vlen  ngx_spdy_frame_write_uint32
 
 #define ngx_http_spdy_nv_write_name(p, h)                                     \
     ngx_cpymem(ngx_http_spdy_nv_write_nlen(p, sizeof(h) - 1), h, sizeof(h) - 1)
@@ -33,6 +33,10 @@ static ngx_chain_t *ngx_http_spdy_send_c
 
 static ngx_inline ngx_int_t ngx_http_spdy_filter_send(
     ngx_connection_t *fc, ngx_http_spdy_stream_t *stream);
+static ngx_inline ngx_int_t ngx_http_spdy_flow_control(
+    ngx_http_spdy_connection_t *sc, ngx_http_spdy_stream_t *stream);
+static void ngx_http_spdy_waiting_queue(ngx_http_spdy_connection_t *sc,
+    ngx_http_spdy_stream_t *stream);
 
 static ngx_chain_t *ngx_http_spdy_filter_get_shadow(
     ngx_http_spdy_stream_t *stream, ngx_buf_t *buf, off_t offset, off_t size);
@@ -162,9 +166,9 @@ ngx_http_spdy_header_filter(ngx_http_req
     }
 
     len = NGX_SPDY_NV_NUM_SIZE
-          + ngx_http_spdy_nv_nsize("version")
+          + ngx_http_spdy_nv_nsize(":version")
           + ngx_http_spdy_nv_vsize("HTTP/1.1")
-          + ngx_http_spdy_nv_nsize("status")
+          + ngx_http_spdy_nv_nsize(":status")
           + (r->headers_out.status_line.len
              ? NGX_SPDY_NV_VLEN_SIZE + r->headers_out.status_line.len
              : ngx_http_spdy_nv_vsize("418"));
@@ -305,10 +309,10 @@ ngx_http_spdy_header_filter(ngx_http_req
 
     last = buf + NGX_SPDY_NV_NUM_SIZE;
 
-    last = ngx_http_spdy_nv_write_name(last, "version");
+    last = ngx_http_spdy_nv_write_name(last, ":version");
     last = ngx_http_spdy_nv_write_val(last, "HTTP/1.1");
 
-    last = ngx_http_spdy_nv_write_name(last, "status");
+    last = ngx_http_spdy_nv_write_name(last, ":status");
 
     if (r->headers_out.status_line.len) {
         last = ngx_http_spdy_nv_write_vlen(last,
@@ -457,17 +461,6 @@ ngx_http_spdy_header_filter(ngx_http_req
             continue;
         }
 
-        if ((header[i].key.len == 6
-             && ngx_strncasecmp(header[i].key.data,
-                                (u_char *) "status", 6) == 0)
-            || (header[i].key.len == 7
-                && ngx_strncasecmp(header[i].key.data,
-                                   (u_char *) "version", 7) == 0))
-        {
-            header[i].hash = 0;
-            continue;
-        }
-
         last = ngx_http_spdy_nv_write_nlen(last, header[i].key.len);
 
         ngx_strlow(last, header[i].key.data, header[i].key.len);
@@ -620,13 +613,14 @@ ngx_http_spdy_header_filter(ngx_http_req
 static ngx_chain_t *
 ngx_http_spdy_send_chain(ngx_connection_t *fc, ngx_chain_t *in, off_t limit)
 {
-    off_t                       size, offset;
-    size_t                      rest, frame_size;
-    ngx_chain_t                *cl, *out, **ln;
-    ngx_http_request_t         *r;
-    ngx_http_spdy_stream_t     *stream;
-    ngx_http_spdy_loc_conf_t   *slcf;
-    ngx_http_spdy_out_frame_t  *frame;
+    off_t                        size, offset;
+    size_t                       rest, frame_size;
+    ngx_chain_t                 *cl, *out, **ln;
+    ngx_http_request_t          *r;
+    ngx_http_spdy_stream_t      *stream;
+    ngx_http_spdy_loc_conf_t    *slcf;
+    ngx_http_spdy_out_frame_t   *frame;
+    ngx_http_spdy_connection_t  *sc;
 
     r = fc->data;
     stream = r->spdy_stream;
@@ -642,8 +636,23 @@ ngx_http_spdy_send_chain(ngx_connection_
         return NULL;
     }
 
+    sc = stream->connection;
+
     size = ngx_buf_size(in->buf);
 
+    if (size && ngx_http_spdy_flow_control(sc, stream) == NGX_DECLINED) {
+        fc->write->delayed = 1;
+        return in;
+    }
+
+    if (limit == 0 || limit > (off_t) sc->send_window) {
+        limit = sc->send_window;
+    }
+
+    if (limit > stream->send_window) {
+        limit = (stream->send_window > 0) ? stream->send_window : 0;
+    }
+
     if (in->buf->tag == (ngx_buf_tag_t) &ngx_http_spdy_filter_get_shadow) {
         cl = ngx_alloc_chain_link(r->pool);
         if (cl == NULL) {
@@ -670,9 +679,8 @@ ngx_http_spdy_send_chain(ngx_connection_
 
     slcf = ngx_http_get_module_loc_conf(r, ngx_http_spdy_module);
 
-    frame_size = (limit && limit <= (off_t) slcf->chunk_size)
-                 ? (size_t) limit
-                 : slcf->chunk_size;
+    frame_size = (limit <= (off_t) slcf->chunk_size) ? (size_t) limit
+                                                     : slcf->chunk_size;
 
     for ( ;; ) {
         ln = &out;
@@ -735,24 +743,25 @@ ngx_http_spdy_send_chain(ngx_connection_
             return NGX_CHAIN_ERROR;
         }
 
-        ngx_http_spdy_queue_frame(stream->connection, frame);
+        ngx_http_spdy_queue_frame(sc, frame);
 
+        sc->send_window -= frame_size;
+
+        stream->send_window -= frame_size;
         stream->queued++;
 
         if (in == NULL) {
             break;
         }
 
-        if (limit) {
-            limit -= frame_size;
+        limit -= frame_size;
 
-            if (limit == 0) {
-                break;
-            }
+        if (limit == 0) {
+            break;
+        }
 
-            if (limit < (off_t) slcf->chunk_size) {
-                frame_size = (size_t) limit;
-            }
+        if (limit < (off_t) slcf->chunk_size) {
+            frame_size = (size_t) limit;
         }
     }
 
@@ -770,6 +779,10 @@ ngx_http_spdy_send_chain(ngx_connection_
         return NGX_CHAIN_ERROR;
     }
 
+    if (in && ngx_http_spdy_flow_control(sc, stream) == NGX_DECLINED) {
+        fc->write->delayed = 1;
+    }
+
     return in;
 }
 
@@ -876,6 +889,8 @@ ngx_http_spdy_filter_get_data_frame(ngx_
 
         cl->next = first;
         first = cl;
+
+        last->buf->flush = 1;
     }
 
     frame->first = first;
@@ -915,6 +930,52 @@ ngx_http_spdy_filter_send(ngx_connection
 }
 
 
+static ngx_inline ngx_int_t
+ngx_http_spdy_flow_control(ngx_http_spdy_connection_t *sc,
+    ngx_http_spdy_stream_t *stream)
+{
+    if (stream->send_window <= 0) {
+        stream->exhausted = 1;
+        return NGX_DECLINED;
+    }
+
+    if (sc->send_window == 0) {
+        ngx_http_spdy_waiting_queue(sc, stream);
+        return NGX_DECLINED;
+    }
+
+    return NGX_OK;
+}
+
+
+static void
+ngx_http_spdy_waiting_queue(ngx_http_spdy_connection_t *sc,
+    ngx_http_spdy_stream_t *stream)
+{
+    ngx_queue_t             *q;
+    ngx_http_spdy_stream_t  *s;
+
+    if (stream->handled) {
+        return;
+    }
+
+    stream->handled = 1;
+
+    for (q = ngx_queue_last(&sc->waiting);
+         q != ngx_queue_sentinel(&sc->waiting);
+         q = ngx_queue_prev(q))
+    {
+        s = ngx_queue_data(q, ngx_http_spdy_stream_t, queue);
+
+        if (s->priority >= stream->priority) {
+            break;
+        }
+    }
+
+    ngx_queue_insert_after(q, &stream->queue);
+}
+
+
 static ngx_int_t
 ngx_http_spdy_syn_frame_handler(ngx_http_spdy_connection_t *sc,
     ngx_http_spdy_out_frame_t *frame)
@@ -1063,7 +1124,7 @@ ngx_http_spdy_handle_stream(ngx_http_spd
 {
     ngx_event_t  *wev;
 
-    if (stream->handled || stream->blocked) {
+    if (stream->handled || stream->blocked || stream->exhausted) {
         return;
     }
 
@@ -1083,13 +1144,22 @@ ngx_http_spdy_filter_cleanup(void *data)
 {
     ngx_http_spdy_stream_t *stream = data;
 
-    ngx_http_spdy_out_frame_t  *frame, **fn;
+    size_t                       delta;
+    ngx_http_spdy_out_frame_t   *frame, **fn;
+    ngx_http_spdy_connection_t  *sc;
+
+    if (stream->handled) {
+        stream->handled = 0;
+        ngx_queue_remove(&stream->queue);
+    }
 
     if (stream->queued == 0) {
         return;
     }
 
-    fn = &stream->connection->last_out;
+    delta = 0;
+    sc = stream->connection;
+    fn = &sc->last_out;
 
     for ( ;; ) {
         frame = *fn;
@@ -1099,14 +1169,26 @@ ngx_http_spdy_filter_cleanup(void *data)
         }
 
         if (frame->stream == stream && !frame->blocked) {
-            stream->queued--;
+            *fn = frame->next;
+
+            delta += frame->length;
 
-            *fn = frame->next;
+            if (--stream->queued == 0) {
+                break;
+            }
+
             continue;
         }
 
         fn = &frame->next;
     }
+
+    if (sc->send_window == 0 && delta && !ngx_queue_empty(&sc->waiting)) {
+        ngx_queue_add(&sc->posted, &sc->waiting);
+        ngx_queue_init(&sc->waiting);
+    }
+
+    sc->send_window += delta;
 }