comparison src/event/ngx_event_pipe.c @ 0:f0b350454894 NGINX_0_1_0

nginx 0.1.0 *) The first public version.
author Igor Sysoev <http://sysoev.ru>
date Mon, 04 Oct 2004 00:00:00 +0400
parents
children cc9f381affaa
comparison
equal deleted inserted replaced
-1:000000000000 0:f0b350454894
1
2 /*
3 * Copyright (C) Igor Sysoev
4 */
5
6
7 #include <ngx_config.h>
8 #include <ngx_core.h>
9 #include <ngx_event.h>
10 #include <ngx_event_pipe.h>
11
12
13 static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
14 static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
15
16 static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
17 ngx_inline static void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
18 ngx_inline static void ngx_event_pipe_free_shadow_raw_buf(ngx_chain_t **free,
19 ngx_buf_t *buf);
20 ngx_inline static void ngx_event_pipe_add_free_buf(ngx_chain_t **chain,
21 ngx_chain_t *cl);
22 static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
23
24
25 ngx_int_t ngx_event_pipe(ngx_event_pipe_t *p, int do_write)
26 {
27 u_int flags;
28 ngx_event_t *rev, *wev;
29
30 for ( ;; ) {
31 if (do_write) {
32 if (ngx_event_pipe_write_to_downstream(p) == NGX_ABORT) {
33 return NGX_ABORT;
34 }
35 }
36
37 p->read = 0;
38 p->upstream_blocked = 0;
39
40 if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
41 return NGX_ABORT;
42 }
43
44 if (!p->read && !p->upstream_blocked) {
45 break;
46 }
47
48 do_write = 1;
49 }
50
51 if (p->upstream->fd != -1) {
52 rev = p->upstream->read;
53
54 flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
55
56 if (ngx_handle_read_event(rev, flags) == NGX_ERROR) {
57 return NGX_ABORT;
58 }
59
60 if (rev->active) {
61 ngx_add_timer(rev, p->read_timeout);
62 }
63 }
64
65 if (p->downstream->fd != -1) {
66 wev = p->downstream->write;
67 wev->available = p->send_lowat;
68 if (ngx_handle_write_event(wev, NGX_LOWAT_EVENT) == NGX_ERROR) {
69 return NGX_ABORT;
70 }
71
72 if (wev->active) {
73 ngx_add_timer(wev, p->send_timeout);
74 }
75 }
76
77 return NGX_OK;
78 }
79
80
81 ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
82 {
83 int n, rc, size;
84 ngx_buf_t *b;
85 ngx_chain_t *chain, *cl, *tl;
86
87 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
88 return NGX_OK;
89 }
90
91 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
92 "pipe read upstream: %d", p->upstream->read->ready);
93
94 for ( ;; ) {
95
96 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
97 break;
98 }
99
100 if (p->preread_bufs == NULL && !p->upstream->read->ready) {
101 break;
102 }
103
104 if (p->preread_bufs) {
105
106 /* use the pre-read bufs if they exist */
107
108 chain = p->preread_bufs;
109 p->preread_bufs = NULL;
110 n = p->preread_size;
111
112 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
113 "pipe preread: %d", n);
114
115 if (n) {
116 p->read = 1;
117 }
118
119 } else {
120
121 /*
122 * kqueue notifies about the end of file or a pending error.
123 * This test allows not to allocate a buf on these conditions
124 * and not to call ngx_recv_chain().
125 */
126
127 if (p->upstream->read->available == 0
128 && p->upstream->read->pending_eof)
129 {
130 p->upstream->read->ready = 0;
131 p->upstream->read->eof = 0;
132 p->upstream_eof = 1;
133 p->read = 1;
134
135 #if (HAVE_KQUEUE)
136 if (p->upstream->read->kq_errno) {
137 p->upstream->read->error = 1;
138 p->upstream_error = 1;
139 p->upstream_eof = 0;
140
141 ngx_log_error(NGX_LOG_ERR, p->log,
142 p->upstream->read->kq_errno,
143 "readv() failed");
144 }
145 #endif
146
147 break;
148 }
149
150 if (p->free_raw_bufs) {
151
152 /* use the free bufs if they exist */
153
154 chain = p->free_raw_bufs;
155 if (p->single_buf) {
156 p->free_raw_bufs = p->free_raw_bufs->next;
157 chain->next = NULL;
158 } else {
159 p->free_raw_bufs = NULL;
160 }
161
162 } else if (p->allocated < p->bufs.num) {
163
164 /* allocate a new buf if it's still allowed */
165
166 if (!(b = ngx_create_temp_buf(p->pool, p->bufs.size))) {
167 return NGX_ABORT;
168 }
169
170 p->allocated++;
171
172 ngx_alloc_link_and_set_buf(tl, b, p->pool, NGX_ABORT);
173 chain = tl;
174
175 } else if (!p->cachable && p->downstream->write->ready) {
176
177 /*
178 * if the bufs are not needed to be saved in a cache and
179 * a downstream is ready then write the bufs to a downstream
180 */
181
182 p->upstream_blocked = 1;
183
184 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
185 "pipe downstream ready");
186
187 break;
188
189 } else if (p->cachable
190 || p->temp_file->offset < p->max_temp_file_size)
191 {
192
193 /*
194 * if it's allowed then save some bufs from r->in
195 * to a temporary file, and add them to a r->out chain
196 */
197
198 rc = ngx_event_pipe_write_chain_to_temp_file(p);
199
200 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
201 "pipe temp offset: %d", p->temp_file->offset);
202
203 if (rc == NGX_AGAIN) {
204 if (ngx_event_flags & NGX_USE_LEVEL_EVENT
205 && p->upstream->read->active
206 && p->upstream->read->ready)
207 {
208 if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
209 == NGX_ERROR)
210 {
211 return NGX_ABORT;
212 }
213 }
214 }
215
216 if (rc != NGX_OK) {
217 return rc;
218 }
219
220 chain = p->free_raw_bufs;
221 if (p->single_buf) {
222 p->free_raw_bufs = p->free_raw_bufs->next;
223 chain->next = NULL;
224 } else {
225 p->free_raw_bufs = NULL;
226 }
227
228 } else {
229
230 /* if there're no bufs to read in then disable a level event */
231
232 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
233 "no pipe bufs to read in");
234
235 break;
236 }
237
238 n = ngx_recv_chain(p->upstream, chain);
239
240 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
241 "pipe recv chain: %d", n);
242
243 if (p->free_raw_bufs) {
244 chain->next = p->free_raw_bufs;
245 }
246 p->free_raw_bufs = chain;
247
248 if (n == NGX_ERROR) {
249 p->upstream_error = 1;
250 return NGX_ERROR;
251 }
252
253 if (n == NGX_AGAIN) {
254 if (p->single_buf) {
255 ngx_event_pipe_remove_shadow_links(chain->buf);
256 }
257
258 break;
259 }
260
261 p->read = 1;
262
263 if (n == 0) {
264 p->upstream_eof = 1;
265 break;
266 }
267 }
268
269 p->read_length += n;
270 cl = chain;
271
272 while (cl && n > 0) {
273
274 ngx_event_pipe_remove_shadow_links(cl->buf);
275
276 size = cl->buf->end - cl->buf->last;
277
278 if (n >= size) {
279 cl->buf->last = cl->buf->end;
280
281 /* STUB */ cl->buf->num = p->num++;
282
283 if (p->input_filter(p, cl->buf) == NGX_ERROR) {
284 return NGX_ABORT;
285 }
286
287 n -= size;
288 cl = cl->next;
289
290 } else {
291 cl->buf->last += n;
292 n = 0;
293 }
294 }
295
296 p->free_raw_bufs = cl;
297 }
298
299 #if (NGX_DEBUG)
300
301 if (p->in || p->busy || p->free_raw_bufs) {
302 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe buf");
303 }
304
305 for (cl = p->in; cl; cl = cl->next) {
306 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
307 "pipe buf in " PTR_FMT ", pos " PTR_FMT ", size: %d",
308 cl->buf->start, cl->buf->pos,
309 cl->buf->last - cl->buf->pos);
310 }
311
312 for (cl = p->busy; cl; cl = cl->next) {
313 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
314 "pipe buf busy " PTR_FMT ", pos " PTR_FMT ", size: %d",
315 cl->buf->start, cl->buf->pos,
316 cl->buf->last - cl->buf->pos);
317 }
318
319 for (cl = p->free_raw_bufs; cl; cl = cl->next) {
320 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
321 "pipe buf free " PTR_FMT ", last " PTR_FMT ", size: %d",
322 cl->buf->start, cl->buf->last,
323 cl->buf->end - cl->buf->last);
324 }
325
326 #endif
327
328 if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
329
330 /* STUB */ p->free_raw_bufs->buf->num = p->num++;
331
332 if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
333 return NGX_ABORT;
334 }
335
336 p->free_raw_bufs = p->free_raw_bufs->next;
337
338 if (p->free_bufs) {
339 for (cl = p->free_raw_bufs; cl; cl = cl->next) {
340 ngx_pfree(p->pool, cl->buf->start);
341 }
342 }
343 }
344
345 if (p->cachable && p->in) {
346 if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
347 return NGX_ABORT;
348 }
349 }
350
351 return NGX_OK;
352 }
353
354
355 ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
356 {
357 size_t bsize;
358 ngx_uint_t flush;
359 ngx_buf_t *b;
360 ngx_chain_t *out, **ll, *cl, *tl;
361
362 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
363 "pipe write downstream: %d", p->downstream->write->ready);
364
365 for ( ;; ) {
366 if (p->downstream_error) {
367 return ngx_event_pipe_drain_chains(p);
368 }
369
370 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
371
372 /* pass the p->out and p->in chains to the output filter */
373
374 if (p->out) {
375 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
376 "pipe write downstream flush out");
377
378 if (p->output_filter(p->output_ctx, p->out) == NGX_ERROR) {
379 p->downstream_error = 1;
380 return ngx_event_pipe_drain_chains(p);
381 }
382
383 p->out = NULL;
384 }
385
386 if (p->in) {
387 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
388 "pipe write downstream flush in");
389
390 if (p->output_filter(p->output_ctx, p->in) == NGX_ERROR) {
391 p->downstream_error = 1;
392 return ngx_event_pipe_drain_chains(p);
393 }
394
395 p->in = NULL;
396 }
397
398 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
399 "pipe write downstream done");
400
401 /* TODO: free unused bufs */
402
403 p->downstream_done = 1;
404 break;
405 }
406
407 if (!p->downstream->write->ready) {
408 break;
409 }
410
411 /* bsize is the size of the busy bufs */
412
413 bsize = 0;
414
415 for (cl = p->busy; cl; cl = cl->next) {
416 bsize += cl->buf->end - cl->buf->start;
417 }
418
419 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
420 "pipe write busy: " SIZE_T_FMT, bsize);
421
422 out = NULL;
423 ll = NULL;
424 flush = 0;
425
426 for ( ;; ) {
427 if (p->out) {
428 cl = p->out;
429
430 if (bsize + ngx_buf_size(cl->buf) > p->busy_size) {
431 flush = 1;
432 break;
433 }
434
435 p->out = p->out->next;
436 ngx_event_pipe_free_shadow_raw_buf(&p->free_raw_bufs,
437 cl->buf);
438
439 } else if (!p->cachable && p->in) {
440 cl = p->in;
441
442 if (bsize + ngx_buf_size(cl->buf) > p->busy_size) {
443 flush = 1;
444 break;
445 }
446
447 p->in = p->in->next;
448
449 } else {
450 break;
451 }
452
453 bsize += ngx_buf_size(cl->buf);
454 cl->next = NULL;
455 ngx_chain_add_link(out, ll, cl);
456 }
457
458 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
459 "pipe write: out:" PTR_FMT ", f:%d", out, flush);
460
461 if (out == NULL && !flush) {
462 break;
463 }
464
465 if (p->output_filter(p->output_ctx, out) == NGX_ERROR) {
466 p->downstream_error = 1;
467 return ngx_event_pipe_drain_chains(p);
468 }
469
470 ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag);
471
472 for (cl = p->free; cl; cl = cl->next) {
473
474 if (cl->buf->temp_file) {
475 if (p->cachable || !p->cyclic_temp_file) {
476 continue;
477 }
478
479 /* reset p->temp_offset if all bufs had been sent */
480
481 if (cl->buf->file_last == p->temp_file->offset) {
482 p->temp_file->offset = 0;
483 }
484 }
485
486 /* TODO: free buf if p->free_bufs && upstream done */
487
488 /* add the free shadow raw buf to p->free_raw_bufs */
489
490 if (cl->buf->last_shadow) {
491 b = cl->buf->shadow;
492 b->pos = b->last = b->start;
493 b->shadow = NULL;
494 ngx_alloc_link_and_set_buf(tl, b, p->pool, NGX_ABORT);
495 ngx_event_pipe_add_free_buf(&p->free_raw_bufs, tl);
496
497 cl->buf->last_shadow = 0;
498 }
499
500 cl->buf->shadow = NULL;
501 }
502 }
503
504 return NGX_OK;
505 }
506
507
508 static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
509 {
510 ssize_t size, bsize;
511 ngx_buf_t *b;
512 ngx_chain_t *cl, *tl, *next, *out, **ll, **last_free, fl;
513
514 if (p->buf_to_file) {
515 fl.buf = p->buf_to_file;
516 fl.next = p->in;
517 out = &fl;
518
519 } else {
520 out = p->in;
521 }
522
523 if (!p->cachable) {
524
525 size = 0;
526 cl = out;
527 ll = NULL;
528
529 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
530 "pipe offset: %d", p->temp_file->offset);
531
532 do {
533 bsize = cl->buf->last - cl->buf->pos;
534
535 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
536 "pipe buf " PTR_FMT ", pos " PTR_FMT ", size: %d",
537 cl->buf->start, cl->buf->pos, bsize);
538
539 if ((size + bsize > p->temp_file_write_size)
540 || (p->temp_file->offset + size + bsize > p->max_temp_file_size))
541 {
542 break;
543 }
544
545 size += bsize;
546 ll = &cl->next;
547 cl = cl->next;
548
549 } while (cl);
550
551 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %d", size);
552
553 if (cl) {
554 p->in = cl;
555 *ll = NULL;
556
557 } else {
558 p->in = NULL;
559 p->last_in = &p->in;
560 }
561
562 } else {
563 p->in = NULL;
564 p->last_in = &p->in;
565 }
566
567 if (ngx_write_chain_to_temp_file(p->temp_file, out) == NGX_ERROR) {
568 return NGX_ABORT;
569 }
570
571 for (last_free = &p->free_raw_bufs;
572 *last_free != NULL;
573 last_free = &(*last_free)->next)
574 {
575 /* void */
576 }
577
578 if (p->buf_to_file) {
579 p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
580 p->buf_to_file = NULL;
581 out = out->next;
582 }
583
584 for (cl = out; cl; cl = next) {
585 next = cl->next;
586 cl->next = NULL;
587
588 b = cl->buf;
589 b->file = &p->temp_file->file;
590 b->file_pos = p->temp_file->offset;
591 p->temp_file->offset += b->last - b->pos;
592 b->file_last = p->temp_file->offset;
593
594 b->in_file = 1;
595 b->temp_file = 1;
596
597 ngx_chain_add_link(p->out, p->last_out, cl);
598
599 if (b->last_shadow) {
600 b->shadow->last = b->shadow->pos = b->shadow->start;
601 ngx_alloc_link_and_set_buf(tl, b->shadow, p->pool, NGX_ABORT);
602 *last_free = tl;
603 last_free = &tl->next;
604 }
605 }
606
607 return NGX_OK;
608 }
609
610
611 /* the copy input filter */
612
613 ngx_int_t ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
614 {
615 ngx_buf_t *b;
616 ngx_chain_t *cl;
617
618 if (buf->pos == buf->last) {
619 return NGX_OK;
620 }
621
622 if (p->free) {
623 b = p->free->buf;
624 p->free = p->free->next;
625
626 } else {
627 if (!(b = ngx_alloc_buf(p->pool))) {
628 return NGX_ERROR;
629 }
630 }
631
632 ngx_memcpy(b, buf, sizeof(ngx_buf_t));
633 b->shadow = buf;
634 b->tag = p->tag;
635 b->last_shadow = 1;
636 b->recycled = 1;
637 buf->shadow = b;
638
639 ngx_alloc_link_and_set_buf(cl, b, p->pool, NGX_ERROR);
640
641 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "buf #%d", b->num);
642
643 ngx_chain_add_link(p->in, p->last_in, cl);
644
645 return NGX_OK;
646 }
647
648
649 ngx_inline static void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
650 {
651 ngx_buf_t *b, *next;
652
653 if (buf->shadow == NULL) {
654 return;
655 }
656
657 b = buf->shadow;
658
659 while (!b->last_shadow) {
660 next = b->shadow;
661
662 b->in_file = 0;
663 b->temp_file = 0;
664 b->flush = 0;
665 b->zerocopy_busy = 0;
666
667 b->shadow = NULL;
668 b = next;
669 }
670
671 b->in_file = 0;
672 b->temp_file = 0;
673 b->flush = 0;
674 b->zerocopy_busy = 0;
675 b->last_shadow = 0;
676
677 b->shadow = NULL;
678
679 buf->shadow = NULL;
680 }
681
682
683 ngx_inline static void ngx_event_pipe_free_shadow_raw_buf(ngx_chain_t **free,
684 ngx_buf_t *buf)
685 {
686 ngx_buf_t *s;
687 ngx_chain_t *cl, **ll;
688
689 if (buf->shadow == NULL) {
690 return;
691 }
692
693 for (s = buf->shadow; !s->last_shadow; s = s->shadow) { /* void */ }
694
695 ll = free;
696
697 for (cl = *free ; cl; cl = cl->next) {
698 if (cl->buf == s) {
699 *ll = cl->next;
700 break;
701 }
702
703 if (cl->buf->shadow) {
704 break;
705 }
706
707 ll = &cl->next;
708 }
709 }
710
711
712 ngx_inline static void ngx_event_pipe_add_free_buf(ngx_chain_t **chain,
713 ngx_chain_t *cl)
714 {
715 if (*chain == NULL) {
716 *chain = cl;
717 return;
718 }
719
720 if ((*chain)->buf->pos != (*chain)->buf->last) {
721 cl->next = (*chain)->next;
722 (*chain)->next = cl;
723
724 } else {
725 cl->next = (*chain);
726 (*chain) = cl;
727 }
728 }
729
730
731 static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
732 {
733 ngx_buf_t *b;
734 ngx_chain_t *cl, *tl;
735
736 for ( ;; ) {
737 if (p->busy) {
738 cl = p->busy;
739 p->busy = NULL;
740
741 } else if (p->out) {
742 cl = p->out;
743 p->out = NULL;
744
745 } else if (p->in) {
746 cl = p->in;
747 p->in = NULL;
748
749 } else {
750 return NGX_OK;
751 }
752
753 while (cl) {
754 if (cl->buf->last_shadow) {
755 b = cl->buf->shadow;
756 b->pos = b->last = b->start;
757 b->shadow = NULL;
758 ngx_alloc_link_and_set_buf(tl, b, p->pool, NGX_ABORT);
759 ngx_event_pipe_add_free_buf(&p->free_raw_bufs, tl);
760
761 cl->buf->last_shadow = 0;
762 }
763
764 cl->buf->shadow = NULL;
765 tl = cl->next;
766 cl->next = p->free;
767 p->free = cl;
768 cl = tl;
769 }
770 }
771 }