diff src/event/ngx_event_pipe.c @ 154:eac26585476e

nginx-0.0.1-2003-10-22-11:05:29 import
author Igor Sysoev <igor@sysoev.ru>
date Wed, 22 Oct 2003 07:05:29 +0000
parents c71aeb75c071
children 46eb23d9471d
line wrap: on
line diff
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -27,12 +27,13 @@ int ngx_event_pipe(ngx_event_pipe_t *p, 
         }
 
         p->read = 0;
+        p->upstream_blocked = 0;
 
         if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
             return NGX_ABORT;
         }
 
-        if (!p->read) {
+        if (!p->read && !p->upstream_blocked) {
             break;
         }
 
@@ -140,6 +141,8 @@ int ngx_event_pipe_read_upstream(ngx_eve
                  * a downstream is ready then write the hunks to a downstream
                  */
 
+                p->upstream_blocked = 1;
+
                 ngx_log_debug(p->log, "downstream ready");
 
                 break;
@@ -184,6 +187,8 @@ int ngx_event_pipe_read_upstream(ngx_eve
                 break;
             }
 
+            ngx_log_debug(p->log, "HUNK_FREE: %d" _ chain->hunk->num);
+
             n = ngx_recv_chain(p->upstream, chain);
 
             ngx_log_debug(p->log, "recv_chain: %d" _ n);
@@ -218,6 +223,8 @@ int ngx_event_pipe_read_upstream(ngx_eve
             if (n >= size) {
                 ce->hunk->last = ce->hunk->end;
 
+    /* STUB */ ce->hunk->num = p->num++;
+
                 if (p->input_filter(p, ce->hunk) == NGX_ERROR) {
                     return NGX_ABORT;
                 }
@@ -235,6 +242,7 @@ int ngx_event_pipe_read_upstream(ngx_eve
     }
 
     if ((p->upstream_eof || p->upstream_error) && p->free_raw_hunks) {
+    /* STUB */ p->free_raw_hunks->hunk->num = p->num++;
         if (p->input_filter(p, p->free_raw_hunks->hunk) == NGX_ERROR) {
             return NGX_ABORT;
         }
@@ -302,6 +310,8 @@ int ngx_event_pipe_write_to_downstream(n
                 p->out = p->out->next;
                 ngx_remove_shadow_free_raw_hunk(&p->free_raw_hunks, ce->hunk);
 
+ngx_log_debug(p->log, "HUNK OUT: %d %x" _ ce->hunk->num _ ce->hunk->type);
+
             } else if (!p->cachable && p->in) {
                 ce = p->in;
 
@@ -313,6 +323,8 @@ int ngx_event_pipe_write_to_downstream(n
 
                 p->in = p->in->next;
 
+ngx_log_debug(p->log, "HUNK IN: %d" _ ce->hunk->num);
+
             } else {
                 break;
             }
@@ -323,22 +335,32 @@ int ngx_event_pipe_write_to_downstream(n
         }
 
         if (out == NULL) {
-            break;
+            ngx_log_debug(p->log, "no hunks to write BUSY: %d" _ busy_len);
+
+            if (!p->upstream_blocked || busy_len == 0) {
+                break;
+            }
+
+            /* if the upstream is blocked then write the busy hunks */
         }
 
         if (p->output_filter(p->output_ctx, out) == NGX_ERROR) {
             p->downstream_error = 1;
+
+            /* handle the downstream error at the begin of the cycle.  */
+
             continue;
         }
 
-        ngx_chain_update_chains(&p->free, &p->busy, &out);
-
-        /* add the free shadow raw hunks to p->free_raw_hunks */
+        ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag);
 
         for (ce = p->free; ce; ce = ce->next) {
+
+            /* add the free shadow raw hunk to p->free_raw_hunks */
+
             if (ce->hunk->type & NGX_HUNK_LAST_SHADOW) {
                 h = ce->hunk->shadow;
-                /* THINK NEEDED ??? */ h->pos = h->last = h->start;
+                h->pos = h->last = h->start;
                 h->shadow = NULL;
                 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT);
                 ngx_add_after_partially_filled_hunk(&p->free_raw_hunks, te);
@@ -346,6 +368,15 @@ int ngx_event_pipe_write_to_downstream(n
                 ce->hunk->type &= ~NGX_HUNK_LAST_SHADOW;
             }
             ce->hunk->shadow = NULL;
+
+            if (p->cyclic_temp_file && (ce->hunk->type & NGX_HUNK_TEMP_FILE)) {
+
+                /* reset p->temp_offset if all hunks had been sent */
+
+                if (ce->hunk->file_last == p->temp_offset) {
+                    p->temp_offset = 0;
+                }
+            }
         }
     }
 
@@ -355,7 +386,7 @@ int ngx_event_pipe_write_to_downstream(n
 
 static int ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
 {
-    int           rc, size, hunk_size;
+    int           rc, size, hsize;
     ngx_hunk_t   *h;
     ngx_chain_t  *ce, *te, *next, *out, **le, **last_free;
 
@@ -389,17 +420,17 @@ static int ngx_event_pipe_write_chain_to
 ngx_log_debug(p->log, "offset: %d" _ p->temp_offset);
 
         do {
-            hunk_size = ce->hunk->last - ce->hunk->pos;
+            hsize = ce->hunk->last - ce->hunk->pos;
 
-ngx_log_debug(p->log, "hunk size: %d" _ hunk_size);
+ngx_log_debug(p->log, "hunk size: %d" _ hsize);
 
-            if ((size + hunk_size > p->temp_file_write_size)
-                || (p->temp_offset + hunk_size > p->max_temp_file_size))
+            if ((size + hsize > p->temp_file_write_size)
+                || (p->temp_offset + size + hsize > p->max_temp_file_size))
             {
                 break;
             }
 
-            size += hunk_size;
+            size += hsize;
             le = &ce->next;
             ce = ce->next;
 
@@ -438,12 +469,17 @@ ngx_log_debug(p->log, "size: %d" _ size)
         ce->next = NULL;
 
         h = ce->hunk;
-        h->type |= NGX_HUNK_FILE;
         h->file = p->temp_file;
         h->file_pos = p->temp_offset;
         p->temp_offset += h->last - h->pos;
         h->file_last = p->temp_offset;
 
+        if (p->cachable) {
+            h->type |= NGX_HUNK_FILE;
+        } else {
+            h->type |= NGX_HUNK_FILE|NGX_HUNK_TEMP_FILE;
+        }
+
         ngx_chain_add_ce(p->out, p->last_out, ce);
 
         if (h->type & NGX_HUNK_LAST_SHADOW) {
@@ -479,10 +515,12 @@ int ngx_event_pipe_copy_input_filter(ngx
 
     ngx_memcpy(h, hunk, sizeof(ngx_hunk_t));
     h->shadow = hunk;
+    h->tag = p->tag;
     h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED;
     hunk->shadow = h;
 
     ngx_alloc_ce_and_set_hunk(ce, h, p->pool, NGX_ERROR);
+ngx_log_debug(p->log, "HUNK %d" _ h->num);
     ngx_chain_add_ce(p->in, p->last_in, ce);
 
     return NGX_OK;