changeset 150:ad5f382c9e7d

nginx-0.0.1-2003-10-19-23:57:23 import
author Igor Sysoev <igor@sysoev.ru>
date Sun, 19 Oct 2003 19:57:23 +0000
parents 86404ba5c517
children 2d9e4a8b6d11
files src/event/ngx_event_proxy.c src/event/ngx_event_proxy.h src/http/modules/proxy/ngx_http_proxy_handler.c src/os/unix/ngx_readv_chain.c
diffstat 4 files changed, 95 insertions(+), 52 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/ngx_event_proxy.c
+++ b/src/event/ngx_event_proxy.c
@@ -12,14 +12,48 @@ ngx_inline static void ngx_add_after_par
                                                            ngx_chain_t *ce);
 
 
+
+int ngx_event_proxy(ngx_event_proxy_t *p, int do_write)
+{
+    for ( ;; ) {
+        if (do_write) {
+            if (ngx_event_proxy_write_to_downstream(p) == NGX_ABORT) {
+                return NGX_ABORT;
+            }
+        }
+
+        p->read = 0;
+
+        if (ngx_event_proxy_read_upstream(p) == NGX_ABORT) {
+            return NGX_ABORT;
+        }
+
+        if (!p->read) {
+            break;
+        }
+
+        do_write = 1;
+    }
+
+    if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) {
+        return NGX_ABORT;
+    }
+
+    if (ngx_handle_write_event(p->downstream->write,
+                                           /* TODO: lowat */ 0) == NGX_ERROR) {
+        return NGX_ABORT;
+    }
+
+    return NGX_OK;
+}
+
+
 int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p)
 {
     int           n, rc, size;
     ngx_hunk_t   *h;
     ngx_chain_t  *chain, *ce, *te;
 
-    p->upstream_level++;
-
     ngx_log_debug(p->log, "read upstream: %d" _ p->upstream->read->ready);
 
     while (p->preread_hunks
@@ -29,6 +63,7 @@ int ngx_event_proxy_read_upstream(ngx_ev
 
             /* use the pre-read hunks if they exist */
 
+            p->read = 1;
             chain = p->preread_hunks;
             p->preread_hunks = NULL;
             n = p->preread_size;
@@ -57,6 +92,7 @@ int ngx_event_proxy_read_upstream(ngx_ev
                 } else if (p->upstream->read->eof
                            && p->upstream->read->available == 0) {
                     p->upstream_eof = 1;
+                    p->read = 1;
 
                     break;
                 }
@@ -146,6 +182,9 @@ int ngx_event_proxy_read_upstream(ngx_ev
                 break;
             }
 
+            /* TODO THINK about eof */
+            p->read = 1;
+
             if (n == 0) {
                 p->free_raw_hunks = chain;
                 p->upstream_eof = 1;
@@ -190,33 +229,8 @@ int ngx_event_proxy_read_upstream(ngx_ev
         p->free_raw_hunks = p->free_raw_hunks->next;
     }
 
-    if (p->cachable) {
-        if (p->in) {
-            rc = ngx_event_proxy_write_chain_to_temp_file(p);
-
-            if (rc != NGX_OK) {
-                return rc;
-            }
-        }
-
-        if (p->out && p->downstream->write->ready) {
-            if (ngx_event_proxy_write_to_downstream(p) == NGX_ABORT) {
-                return NGX_ABORT;
-            }
-        }
-
-    } else if ((p->out || p->in) && p->downstream->write->ready) {
-        if (ngx_event_proxy_write_to_downstream(p) == NGX_ABORT) {
-            return NGX_ABORT;
-        }
-    }
-
-    p->upstream_level--;
-
-ngx_log_debug(p->log, "upstream level: %d" _ p->upstream_level);
-
-    if (p->upstream_level == 0) {
-        if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) {
+    if (p->cachable && p->in) {
+        if (ngx_event_proxy_write_chain_to_temp_file(p) == NGX_ABORT) {
             return NGX_ABORT;
         }
     }
@@ -231,7 +245,20 @@ int ngx_event_proxy_write_to_downstream(
     ngx_hunk_t   *h;
     ngx_chain_t  *out, *ce, *te;
 
-    while (p->downstream->write->ready) {
+    ngx_log_debug(p->log, "write downstream: %d" _ p->downstream->write->ready);
+
+    for ( ;; ) {
+
+        if ((p->upstream_eof || p->upstream_error || p->upstream_done)
+            && p->out == NULL && p->in == NULL)
+        {
+            p->downstream_done = 1;
+            break;
+        }
+
+        if (!p->downstream->write->ready) {
+            break;
+        }
 
         if (p->out) {
             out = p->out;
@@ -262,6 +289,10 @@ int ngx_event_proxy_write_to_downstream(
 
         rc = p->output_filter(p->output_ctx, out->hunk);
 
+        if (rc == NGX_ERROR) {
+            /* TODO */
+        }
+
         ngx_chain_update_chains(&p->free, &p->busy, &out);
 
         /* calculate p->busy_len */
@@ -287,18 +318,12 @@ int ngx_event_proxy_write_to_downstream(
             ce->hunk->shadow = NULL;
         }
 
+#if 0 /* TODO THINK p->read_priority ??? */
         if (p->upstream->read->ready) {
-            if (ngx_event_proxy_read_upstream(p) == NGX_ERROR) {
-                return NGX_ABORT;
-            }
+            return;
         }
-    }
+#endif
 
-    if (p->downstream_level == 0) {
-        if (ngx_handle_write_event(p->downstream->write,
-                                           /* TODO: lowat */ 0) == NGX_ERROR) {
-            return NGX_ABORT;
-        }
     }
 
     if (p->upstream_done && p->in == NULL && p->out == NULL) {
@@ -313,7 +338,7 @@ static int ngx_event_proxy_write_chain_t
 {
     int           rc, size;
     ngx_hunk_t   *h;
-    ngx_chain_t  *ce, *next, *in, **last;
+    ngx_chain_t  *ce, *te, *next, *in, **last, **last_free;
 
     ngx_log_debug(p->log, "write to file");
 
@@ -362,6 +387,13 @@ static int ngx_event_proxy_write_chain_t
         return NGX_ABORT;
     }
 
+    for (last_free = &p->free_raw_hunks;
+         *last_free != NULL;
+         last_free = &(*last)->next)
+    {
+        /* void */
+    }
+
     for (ce = p->in; ce; ce = next) {
         next = ce->next;
         ce->next = NULL;
@@ -373,11 +405,14 @@ static int ngx_event_proxy_write_chain_t
         p->temp_offset += h->last - h->pos;
         h->file_last = p->temp_offset;
 
+        ngx_chain_add_ce(p->out, p->last_out, ce);
+
         if (h->type & NGX_HUNK_LAST_SHADOW) {
             h->shadow->last = h->shadow->pos = h->shadow->start;
+            ngx_alloc_ce_and_set_hunk(te, h->shadow, p->pool, NGX_ABORT);
+            *last_free = te;
+            last_free = &te->next;
         }
-
-        ngx_chain_add_ce(p->out, p->last_out, ce);
     }
 
     p->in = in;
--- a/src/event/ngx_event_proxy.h
+++ b/src/event/ngx_event_proxy.h
@@ -36,6 +36,7 @@ struct ngx_event_proxy_s {
     ngx_event_proxy_output_filter_pt   output_filter;
     void                              *output_ctx;
 
+    unsigned           read:1;
     unsigned           cachable:1;
     unsigned           upstream_done:1;
     unsigned           upstream_eof:1;
@@ -43,9 +44,6 @@ struct ngx_event_proxy_s {
     unsigned           downstream_done:1;
     unsigned           downstream_error:1;
 
-    int                upstream_level;
-    int                downstream_level;
-
     int                hunks;
     ngx_bufs_t         bufs;
 
@@ -71,9 +69,12 @@ struct ngx_event_proxy_s {
 };
 
 
+int ngx_event_proxy(ngx_event_proxy_t *p, int do_write);
+int ngx_event_proxy_copy_input_filter(ngx_event_proxy_t *p, ngx_hunk_t *hunk);
+
+/* STUB */
 int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p);
 int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p);
-int ngx_event_proxy_copy_input_filter(ngx_event_proxy_t *p, ngx_hunk_t *hunk);
 
 
 #endif /* _NGX_EVENT_PROXY_H_INCLUDED_ */
--- a/src/http/modules/proxy/ngx_http_proxy_handler.c
+++ b/src/http/modules/proxy/ngx_http_proxy_handler.c
@@ -775,7 +775,7 @@ static void ngx_http_proxy_process_upstr
         return;
     }
 
-    if (ngx_event_proxy_read_upstream(p->event_proxy) == NGX_ABORT) {
+    if (ngx_event_proxy(p->event_proxy, 0) == NGX_ABORT) {
         ngx_http_proxy_finalize_request(p, 0);
         return;
     }
@@ -784,7 +784,8 @@ static void ngx_http_proxy_process_upstr
         || p->event_proxy->upstream_error
         || p->event_proxy->upstream_done)
     {
-        ngx_http_proxy_finalize_request(p, ngx_http_send_last(p->request));
+        ngx_http_proxy_close_connection(c);
+        p->upstream.connection = NULL;
         return;
     }
 
@@ -795,11 +796,13 @@ static void ngx_http_proxy_process_upstr
 static void ngx_http_proxy_process_downstream(ngx_event_t *wev)
 {
     ngx_connection_t      *c;
+    ngx_http_request_t    *r;
     ngx_http_proxy_ctx_t  *p;
 
     c = wev->data;
-    p = c->data;
-
+    r = c->data;
+    p = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+ 
     ngx_log_debug(wev->log, "http proxy process downstream");
 
     if (wev->timedout) {
@@ -807,13 +810,15 @@ static void ngx_http_proxy_process_downs
         return;
     }
 
-    if (ngx_event_proxy_write_to_downstream(p->event_proxy) == NGX_ABORT) {
+    if (ngx_event_proxy(p->event_proxy, 1) == NGX_ABORT) {
         ngx_http_proxy_finalize_request(p, 0);
         return;
     }
 
     if (p->event_proxy->downstream_done) {
-        ngx_http_proxy_finalize_request(p, 0);
+ngx_log_debug(wev->log, "http proxy downstream done");
+        ngx_http_proxy_finalize_request(p, r->main ? 0:
+                                           ngx_http_send_last(p->request));
         return;
     }
 
--- a/src/os/unix/ngx_readv_chain.c
+++ b/src/os/unix/ngx_readv_chain.c
@@ -17,6 +17,8 @@ ssize_t ngx_readv_chain(ngx_connection_t
 
     ngx_init_array(io, c->pool, 10, sizeof(struct iovec), NGX_ERROR);
 
+    /* TODO: coalesce the neighbouring chain entries */
+
     while (entry) {
         ngx_test_null(iov, ngx_push_array(&io), NGX_ERROR);
         iov->iov_base = entry->hunk->pos;