comparison src/event/ngx_event_pipe.c @ 153:c71aeb75c071

nginx-0.0.1-2003-10-21-20:49:56 import
author Igor Sysoev <igor@sysoev.ru>
date Tue, 21 Oct 2003 16:49:56 +0000
parents src/event/ngx_event_proxy.c@fb48bf4fea1c
children eac26585476e
comparison
equal deleted inserted replaced
152:fb48bf4fea1c 153:c71aeb75c071
1
2 #include <ngx_config.h>
3 #include <ngx_core.h>
4 #include <ngx_event.h>
5 #include <ngx_event_pipe.h>
6
7
8 static int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
9 static int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
10
11 static int ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
12 ngx_inline static void ngx_remove_shadow_links(ngx_hunk_t *hunk);
13 ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free,
14 ngx_hunk_t *h);
15 ngx_inline static void ngx_add_after_partially_filled_hunk(ngx_chain_t **chain,
16 ngx_chain_t *ce);
17 static int ngx_drain_chains(ngx_event_pipe_t *p);
18
19
20 int ngx_event_pipe(ngx_event_pipe_t *p, int do_write)
21 {
22 for ( ;; ) {
23 if (do_write) {
24 if (ngx_event_pipe_write_to_downstream(p) == NGX_ABORT) {
25 return NGX_ABORT;
26 }
27 }
28
29 p->read = 0;
30
31 if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
32 return NGX_ABORT;
33 }
34
35 if (!p->read) {
36 break;
37 }
38
39 do_write = 1;
40 }
41
42 if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) {
43 return NGX_ABORT;
44 }
45
46 if (ngx_handle_write_event(p->downstream->write,
47 /* TODO: lowat */ 0) == NGX_ERROR) {
48 return NGX_ABORT;
49 }
50
51 return NGX_OK;
52 }
53
54
55 int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
56 {
57 int n, rc, size;
58 ngx_hunk_t *h;
59 ngx_chain_t *chain, *ce, *te;
60
61 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
62 return NGX_OK;
63 }
64
65 ngx_log_debug(p->log, "read upstream: %d" _ p->upstream->read->ready);
66
67 for ( ;; ) {
68
69 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
70 break;
71 }
72
73 if (p->preread_hunks == NULL && !p->upstream->read->ready) {
74 break;
75 }
76
77 if (p->preread_hunks) {
78
79 /* use the pre-read hunks if they exist */
80
81 p->read = 1;
82 chain = p->preread_hunks;
83 p->preread_hunks = NULL;
84 n = p->preread_size;
85
86 ngx_log_debug(p->log, "preread: %d" _ n);
87
88 } else {
89
90 #if (HAVE_KQUEUE)
91
92 /*
93 * kqueue notifies about the end of file or a pending error.
94 * This test allows not to allocate a hunk on these conditions
95 * and not to call ngx_recv_chain().
96 */
97
98 if (ngx_event_flags == NGX_HAVE_KQUEUE_EVENT) {
99
100 if (p->upstream->read->error) {
101 ngx_log_error(NGX_LOG_ERR, p->log, p->upstream->read->error,
102 "readv() failed");
103 p->upstream_error = 1;
104
105 break;
106
107 } else if (p->upstream->read->eof
108 && p->upstream->read->available == 0) {
109 p->upstream_eof = 1;
110 p->read = 1;
111
112 break;
113 }
114 }
115 #endif
116
117 if (p->free_raw_hunks) {
118
119 /* use the free hunks if they exist */
120
121 chain = p->free_raw_hunks;
122 p->free_raw_hunks = NULL;
123
124 } else if (p->hunks < p->bufs.num) {
125
126 /* allocate a new hunk if it's still allowed */
127
128 ngx_test_null(h, ngx_create_temp_hunk(p->pool,
129 p->bufs.size, 0, 0),
130 NGX_ABORT);
131 p->hunks++;
132
133 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT);
134 chain = te;
135
136 } else if (!p->cachable && p->downstream->write->ready) {
137
138 /*
139 * if the hunks are not needed to be saved in a cache and
140 * a downstream is ready then write the hunks to a downstream
141 */
142
143 ngx_log_debug(p->log, "downstream ready");
144
145 break;
146
147 } else if (p->cachable || p->temp_offset < p->max_temp_file_size) {
148
149 /*
150 * if it's allowed then save some hunks from r->in
151 * to a temporary file, and add them to a r->out chain
152 */
153
154 rc = ngx_event_pipe_write_chain_to_temp_file(p);
155
156 ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset);
157
158 if (rc == NGX_AGAIN) {
159 if (ngx_event_flags & NGX_USE_LEVEL_EVENT
160 && p->upstream->read->active
161 && p->upstream->read->ready)
162 {
163 if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
164 == NGX_ERROR)
165 {
166 return NGX_ABORT;
167 }
168 }
169 }
170
171 if (rc != NGX_OK) {
172 return rc;
173 }
174
175 chain = p->free_raw_hunks;
176 p->free_raw_hunks = NULL;
177
178 } else {
179
180 /* if there're no hunks to read in then disable a level event */
181
182 ngx_log_debug(p->log, "no hunks to read in");
183
184 break;
185 }
186
187 n = ngx_recv_chain(p->upstream, chain);
188
189 ngx_log_debug(p->log, "recv_chain: %d" _ n);
190
191 p->free_raw_hunks = chain;
192
193 if (n == NGX_ERROR) {
194 p->upstream_error = 1;
195 return NGX_ERROR;
196 }
197
198 if (n == NGX_AGAIN) {
199 break;
200 }
201
202 p->read = 1;
203
204 if (n == 0) {
205 p->upstream_eof = 1;
206 break;
207 }
208 }
209
210 ce = chain;
211
212 while (ce && n > 0) {
213
214 ngx_remove_shadow_links(ce->hunk);
215
216 size = ce->hunk->end - ce->hunk->last;
217
218 if (n >= size) {
219 ce->hunk->last = ce->hunk->end;
220
221 if (p->input_filter(p, ce->hunk) == NGX_ERROR) {
222 return NGX_ABORT;
223 }
224
225 n -= size;
226 ce = ce->next;
227
228 } else {
229 ce->hunk->last += n;
230 n = 0;
231 }
232 }
233
234 p->free_raw_hunks = ce;
235 }
236
237 if ((p->upstream_eof || p->upstream_error) && p->free_raw_hunks) {
238 if (p->input_filter(p, p->free_raw_hunks->hunk) == NGX_ERROR) {
239 return NGX_ABORT;
240 }
241
242 /* TODO: p->free_raw_hunk->next can be free()ed */
243 p->free_raw_hunks = p->free_raw_hunks->next;
244 }
245
246 if (p->cachable && p->in) {
247 if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
248 return NGX_ABORT;
249 }
250 }
251
252 return NGX_OK;
253 }
254
255
256 int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
257 {
258 size_t busy_len;
259 ngx_hunk_t *h;
260 ngx_chain_t *out, **le, *ce, *te;
261
262 ngx_log_debug(p->log, "write downstream: %d" _ p->downstream->write->ready);
263
264 for ( ;; ) {
265 if (p->downstream_error) {
266 return ngx_drain_chains(p);
267 }
268
269 if ((p->upstream_eof || p->upstream_error || p->upstream_done)
270 && p->out == NULL && p->in == NULL)
271 {
272 p->downstream_done = 1;
273 break;
274 }
275
276 if (!p->downstream->write->ready) {
277 break;
278 }
279
280 busy_len = 0;
281
282 if (!(p->upstream_eof || p->upstream_error || p->upstream_done)) {
283 /* calculate p->busy_len */
284 for (ce = p->busy; ce; ce = ce->next) {
285 busy_len += ngx_hunk_size(ce->hunk);
286 }
287 }
288
289 out = NULL;
290 le = NULL;
291
292 for ( ;; ) {
293 if (p->out) {
294 ce = p->out;
295
296 if (!(p->upstream_eof || p->upstream_error || p->upstream_done)
297 && (busy_len + ngx_hunk_size(ce->hunk) > p->max_busy_len))
298 {
299 break;
300 }
301
302 p->out = p->out->next;
303 ngx_remove_shadow_free_raw_hunk(&p->free_raw_hunks, ce->hunk);
304
305 } else if (!p->cachable && p->in) {
306 ce = p->in;
307
308 if (!(p->upstream_eof || p->upstream_error || p->upstream_done)
309 && (busy_len + ngx_hunk_size(ce->hunk) > p->max_busy_len))
310 {
311 break;
312 }
313
314 p->in = p->in->next;
315
316 } else {
317 break;
318 }
319
320 busy_len += ngx_hunk_size(ce->hunk);
321 ce->next = NULL;
322 ngx_chain_add_ce(out, le, ce);
323 }
324
325 if (out == NULL) {
326 break;
327 }
328
329 if (p->output_filter(p->output_ctx, out) == NGX_ERROR) {
330 p->downstream_error = 1;
331 continue;
332 }
333
334 ngx_chain_update_chains(&p->free, &p->busy, &out);
335
336 /* add the free shadow raw hunks to p->free_raw_hunks */
337
338 for (ce = p->free; ce; ce = ce->next) {
339 if (ce->hunk->type & NGX_HUNK_LAST_SHADOW) {
340 h = ce->hunk->shadow;
341 /* THINK NEEDED ??? */ h->pos = h->last = h->start;
342 h->shadow = NULL;
343 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT);
344 ngx_add_after_partially_filled_hunk(&p->free_raw_hunks, te);
345
346 ce->hunk->type &= ~NGX_HUNK_LAST_SHADOW;
347 }
348 ce->hunk->shadow = NULL;
349 }
350 }
351
352 return NGX_OK;
353 }
354
355
356 static int ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
357 {
358 int rc, size, hunk_size;
359 ngx_hunk_t *h;
360 ngx_chain_t *ce, *te, *next, *out, **le, **last_free;
361
362 ngx_log_debug(p->log, "write to file");
363
364 if (p->temp_file->fd == NGX_INVALID_FILE) {
365 rc = ngx_create_temp_file(p->temp_file, p->temp_path, p->pool,
366 p->cachable);
367
368 if (rc == NGX_ERROR) {
369 return NGX_ABORT;
370 }
371
372 if (rc == NGX_AGAIN) {
373 return NGX_AGAIN;
374 }
375
376 if (!p->cachable && p->temp_file_warn) {
377 ngx_log_error(NGX_LOG_WARN, p->log, 0, p->temp_file_warn);
378 }
379 }
380
381 out = p->in;
382
383 if (!p->cachable) {
384
385 size = 0;
386 ce = p->in;
387 le = NULL;
388
389 ngx_log_debug(p->log, "offset: %d" _ p->temp_offset);
390
391 do {
392 hunk_size = ce->hunk->last - ce->hunk->pos;
393
394 ngx_log_debug(p->log, "hunk size: %d" _ hunk_size);
395
396 if ((size + hunk_size > p->temp_file_write_size)
397 || (p->temp_offset + hunk_size > p->max_temp_file_size))
398 {
399 break;
400 }
401
402 size += hunk_size;
403 le = &ce->next;
404 ce = ce->next;
405
406 } while (ce);
407
408 ngx_log_debug(p->log, "size: %d" _ size);
409
410 if (ce) {
411 p->in = ce;
412 *le = NULL;
413
414 } else {
415 p->in = NULL;
416 p->last_in = &p->in;
417 }
418
419 } else {
420 p->in = NULL;
421 p->last_in = &p->in;
422 }
423
424 if (ngx_write_chain_to_file(p->temp_file, out, p->temp_offset,
425 p->pool) == NGX_ERROR) {
426 return NGX_ABORT;
427 }
428
429 for (last_free = &p->free_raw_hunks;
430 *last_free != NULL;
431 last_free = &(*last_free)->next)
432 {
433 /* void */
434 }
435
436 for (ce = out; ce; ce = next) {
437 next = ce->next;
438 ce->next = NULL;
439
440 h = ce->hunk;
441 h->type |= NGX_HUNK_FILE;
442 h->file = p->temp_file;
443 h->file_pos = p->temp_offset;
444 p->temp_offset += h->last - h->pos;
445 h->file_last = p->temp_offset;
446
447 ngx_chain_add_ce(p->out, p->last_out, ce);
448
449 if (h->type & NGX_HUNK_LAST_SHADOW) {
450 h->shadow->last = h->shadow->pos = h->shadow->start;
451 ngx_alloc_ce_and_set_hunk(te, h->shadow, p->pool, NGX_ABORT);
452 *last_free = te;
453 last_free = &te->next;
454 }
455 }
456
457 return NGX_OK;
458 }
459
460
461 /* the copy input filter */
462
463 int ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_hunk_t *hunk)
464 {
465 ngx_hunk_t *h;
466 ngx_chain_t *ce;
467
468 if (hunk->pos == hunk->last) {
469 return NGX_OK;
470 }
471
472 if (p->free) {
473 h = p->free->hunk;
474 p->free = p->free->next;
475
476 } else {
477 ngx_test_null(h, ngx_alloc_hunk(p->pool), NGX_ERROR);
478 }
479
480 ngx_memcpy(h, hunk, sizeof(ngx_hunk_t));
481 h->shadow = hunk;
482 h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED;
483 hunk->shadow = h;
484
485 ngx_alloc_ce_and_set_hunk(ce, h, p->pool, NGX_ERROR);
486 ngx_chain_add_ce(p->in, p->last_in, ce);
487
488 return NGX_OK;
489 }
490
491
492 ngx_inline static void ngx_remove_shadow_links(ngx_hunk_t *hunk)
493 {
494 ngx_hunk_t *h, *next;
495
496 if (hunk->shadow == NULL) {
497 return;
498 }
499
500 h = hunk->shadow;
501
502 while (!(h->type & NGX_HUNK_LAST_SHADOW)) {
503 next = h->shadow;
504 h->type &= ~(NGX_HUNK_TEMP|NGX_HUNK_IN_MEMORY|NGX_HUNK_RECYCLED);
505 h->shadow = NULL;
506 h = next;
507 }
508
509 h->type &= ~(NGX_HUNK_TEMP
510 |NGX_HUNK_IN_MEMORY
511 |NGX_HUNK_RECYCLED
512 |NGX_HUNK_LAST_SHADOW);
513 h->shadow = NULL;
514
515 hunk->shadow = NULL;
516 }
517
518
519 ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free,
520 ngx_hunk_t *h)
521 {
522 ngx_hunk_t *s;
523 ngx_chain_t *ce, **le;
524
525 if (h->shadow == NULL) {
526 return;
527 }
528
529 for (s = h->shadow; !(s->type & NGX_HUNK_LAST_SHADOW); s = s->shadow) {
530 /* void */
531 }
532
533 le = free;
534
535 for (ce = *free ; ce; ce = ce->next) {
536 if (ce->hunk == s) {
537 *le = ce->next;
538 break;
539 }
540
541 if (ce->hunk->shadow) {
542 break;
543 }
544
545 le = &ce->next;
546 }
547 }
548
549
550 ngx_inline static void ngx_add_after_partially_filled_hunk(ngx_chain_t **chain,
551 ngx_chain_t *ce)
552 {
553 if (*chain == NULL) {
554 *chain = ce;
555 return;
556 }
557
558 if ((*chain)->hunk->pos != (*chain)->hunk->last) {
559 ce->next = (*chain)->next;
560 (*chain)->next = ce;
561
562 } else {
563 ce->next = (*chain);
564 (*chain) = ce;
565 }
566 }
567
568
569 static int ngx_drain_chains(ngx_event_pipe_t *p)
570 {
571 ngx_hunk_t *h;
572 ngx_chain_t *ce, *te;
573
574 for ( ;; ) {
575 if (p->busy) {
576 ce = p->busy;
577
578 } else if (p->out) {
579 ce = p->out;
580
581 } else if (p->in) {
582 ce = p->in;
583
584 } else {
585 return NGX_OK;
586 }
587
588 while (ce) {
589 if (ce->hunk->type & NGX_HUNK_LAST_SHADOW) {
590 h = ce->hunk->shadow;
591 /* THINK NEEDED ??? */ h->pos = h->last = h->start;
592 h->shadow = NULL;
593 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT);
594 ngx_add_after_partially_filled_hunk(&p->free_raw_hunks, te);
595
596 ce->hunk->type &= ~NGX_HUNK_LAST_SHADOW;
597 }
598
599 ce->hunk->shadow = NULL;
600 te = ce->next;
601 ce->next = p->free;
602 p->free = ce;
603 ce = te;
604 }
605 }
606 }