Mercurial > hg > nginx
comparison src/event/ngx_event_pipe.c @ 153:c71aeb75c071
nginx-0.0.1-2003-10-21-20:49:56 import
author | Igor Sysoev <igor@sysoev.ru> |
---|---|
date | Tue, 21 Oct 2003 16:49:56 +0000 |
parents | src/event/ngx_event_proxy.c@fb48bf4fea1c |
children | eac26585476e |
comparison
equal
deleted
inserted
replaced
152:fb48bf4fea1c | 153:c71aeb75c071 |
---|---|
1 | |
2 #include <ngx_config.h> | |
3 #include <ngx_core.h> | |
4 #include <ngx_event.h> | |
5 #include <ngx_event_pipe.h> | |
6 | |
7 | |
8 static int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p); | |
9 static int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p); | |
10 | |
11 static int ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p); | |
12 ngx_inline static void ngx_remove_shadow_links(ngx_hunk_t *hunk); | |
13 ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free, | |
14 ngx_hunk_t *h); | |
15 ngx_inline static void ngx_add_after_partially_filled_hunk(ngx_chain_t **chain, | |
16 ngx_chain_t *ce); | |
17 static int ngx_drain_chains(ngx_event_pipe_t *p); | |
18 | |
19 | |
20 int ngx_event_pipe(ngx_event_pipe_t *p, int do_write) | |
21 { | |
22 for ( ;; ) { | |
23 if (do_write) { | |
24 if (ngx_event_pipe_write_to_downstream(p) == NGX_ABORT) { | |
25 return NGX_ABORT; | |
26 } | |
27 } | |
28 | |
29 p->read = 0; | |
30 | |
31 if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) { | |
32 return NGX_ABORT; | |
33 } | |
34 | |
35 if (!p->read) { | |
36 break; | |
37 } | |
38 | |
39 do_write = 1; | |
40 } | |
41 | |
42 if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) { | |
43 return NGX_ABORT; | |
44 } | |
45 | |
46 if (ngx_handle_write_event(p->downstream->write, | |
47 /* TODO: lowat */ 0) == NGX_ERROR) { | |
48 return NGX_ABORT; | |
49 } | |
50 | |
51 return NGX_OK; | |
52 } | |
53 | |
54 | |
55 int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) | |
56 { | |
57 int n, rc, size; | |
58 ngx_hunk_t *h; | |
59 ngx_chain_t *chain, *ce, *te; | |
60 | |
61 if (p->upstream_eof || p->upstream_error || p->upstream_done) { | |
62 return NGX_OK; | |
63 } | |
64 | |
65 ngx_log_debug(p->log, "read upstream: %d" _ p->upstream->read->ready); | |
66 | |
67 for ( ;; ) { | |
68 | |
69 if (p->upstream_eof || p->upstream_error || p->upstream_done) { | |
70 break; | |
71 } | |
72 | |
73 if (p->preread_hunks == NULL && !p->upstream->read->ready) { | |
74 break; | |
75 } | |
76 | |
77 if (p->preread_hunks) { | |
78 | |
79 /* use the pre-read hunks if they exist */ | |
80 | |
81 p->read = 1; | |
82 chain = p->preread_hunks; | |
83 p->preread_hunks = NULL; | |
84 n = p->preread_size; | |
85 | |
86 ngx_log_debug(p->log, "preread: %d" _ n); | |
87 | |
88 } else { | |
89 | |
90 #if (HAVE_KQUEUE) | |
91 | |
92 /* | |
93 * kqueue notifies about the end of file or a pending error. | |
94 * This test allows not to allocate a hunk on these conditions | |
95 * and not to call ngx_recv_chain(). | |
96 */ | |
97 | |
98 if (ngx_event_flags == NGX_HAVE_KQUEUE_EVENT) { | |
99 | |
100 if (p->upstream->read->error) { | |
101 ngx_log_error(NGX_LOG_ERR, p->log, p->upstream->read->error, | |
102 "readv() failed"); | |
103 p->upstream_error = 1; | |
104 | |
105 break; | |
106 | |
107 } else if (p->upstream->read->eof | |
108 && p->upstream->read->available == 0) { | |
109 p->upstream_eof = 1; | |
110 p->read = 1; | |
111 | |
112 break; | |
113 } | |
114 } | |
115 #endif | |
116 | |
117 if (p->free_raw_hunks) { | |
118 | |
119 /* use the free hunks if they exist */ | |
120 | |
121 chain = p->free_raw_hunks; | |
122 p->free_raw_hunks = NULL; | |
123 | |
124 } else if (p->hunks < p->bufs.num) { | |
125 | |
126 /* allocate a new hunk if it's still allowed */ | |
127 | |
128 ngx_test_null(h, ngx_create_temp_hunk(p->pool, | |
129 p->bufs.size, 0, 0), | |
130 NGX_ABORT); | |
131 p->hunks++; | |
132 | |
133 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT); | |
134 chain = te; | |
135 | |
136 } else if (!p->cachable && p->downstream->write->ready) { | |
137 | |
138 /* | |
139 * if the hunks are not needed to be saved in a cache and | |
140 * a downstream is ready then write the hunks to a downstream | |
141 */ | |
142 | |
143 ngx_log_debug(p->log, "downstream ready"); | |
144 | |
145 break; | |
146 | |
147 } else if (p->cachable || p->temp_offset < p->max_temp_file_size) { | |
148 | |
149 /* | |
150 * if it's allowed then save some hunks from r->in | |
151 * to a temporary file, and add them to a r->out chain | |
152 */ | |
153 | |
154 rc = ngx_event_pipe_write_chain_to_temp_file(p); | |
155 | |
156 ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset); | |
157 | |
158 if (rc == NGX_AGAIN) { | |
159 if (ngx_event_flags & NGX_USE_LEVEL_EVENT | |
160 && p->upstream->read->active | |
161 && p->upstream->read->ready) | |
162 { | |
163 if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0) | |
164 == NGX_ERROR) | |
165 { | |
166 return NGX_ABORT; | |
167 } | |
168 } | |
169 } | |
170 | |
171 if (rc != NGX_OK) { | |
172 return rc; | |
173 } | |
174 | |
175 chain = p->free_raw_hunks; | |
176 p->free_raw_hunks = NULL; | |
177 | |
178 } else { | |
179 | |
180 /* if there're no hunks to read in then disable a level event */ | |
181 | |
182 ngx_log_debug(p->log, "no hunks to read in"); | |
183 | |
184 break; | |
185 } | |
186 | |
187 n = ngx_recv_chain(p->upstream, chain); | |
188 | |
189 ngx_log_debug(p->log, "recv_chain: %d" _ n); | |
190 | |
191 p->free_raw_hunks = chain; | |
192 | |
193 if (n == NGX_ERROR) { | |
194 p->upstream_error = 1; | |
195 return NGX_ERROR; | |
196 } | |
197 | |
198 if (n == NGX_AGAIN) { | |
199 break; | |
200 } | |
201 | |
202 p->read = 1; | |
203 | |
204 if (n == 0) { | |
205 p->upstream_eof = 1; | |
206 break; | |
207 } | |
208 } | |
209 | |
210 ce = chain; | |
211 | |
212 while (ce && n > 0) { | |
213 | |
214 ngx_remove_shadow_links(ce->hunk); | |
215 | |
216 size = ce->hunk->end - ce->hunk->last; | |
217 | |
218 if (n >= size) { | |
219 ce->hunk->last = ce->hunk->end; | |
220 | |
221 if (p->input_filter(p, ce->hunk) == NGX_ERROR) { | |
222 return NGX_ABORT; | |
223 } | |
224 | |
225 n -= size; | |
226 ce = ce->next; | |
227 | |
228 } else { | |
229 ce->hunk->last += n; | |
230 n = 0; | |
231 } | |
232 } | |
233 | |
234 p->free_raw_hunks = ce; | |
235 } | |
236 | |
237 if ((p->upstream_eof || p->upstream_error) && p->free_raw_hunks) { | |
238 if (p->input_filter(p, p->free_raw_hunks->hunk) == NGX_ERROR) { | |
239 return NGX_ABORT; | |
240 } | |
241 | |
242 /* TODO: p->free_raw_hunk->next can be free()ed */ | |
243 p->free_raw_hunks = p->free_raw_hunks->next; | |
244 } | |
245 | |
246 if (p->cachable && p->in) { | |
247 if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) { | |
248 return NGX_ABORT; | |
249 } | |
250 } | |
251 | |
252 return NGX_OK; | |
253 } | |
254 | |
255 | |
256 int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p) | |
257 { | |
258 size_t busy_len; | |
259 ngx_hunk_t *h; | |
260 ngx_chain_t *out, **le, *ce, *te; | |
261 | |
262 ngx_log_debug(p->log, "write downstream: %d" _ p->downstream->write->ready); | |
263 | |
264 for ( ;; ) { | |
265 if (p->downstream_error) { | |
266 return ngx_drain_chains(p); | |
267 } | |
268 | |
269 if ((p->upstream_eof || p->upstream_error || p->upstream_done) | |
270 && p->out == NULL && p->in == NULL) | |
271 { | |
272 p->downstream_done = 1; | |
273 break; | |
274 } | |
275 | |
276 if (!p->downstream->write->ready) { | |
277 break; | |
278 } | |
279 | |
280 busy_len = 0; | |
281 | |
282 if (!(p->upstream_eof || p->upstream_error || p->upstream_done)) { | |
283 /* calculate p->busy_len */ | |
284 for (ce = p->busy; ce; ce = ce->next) { | |
285 busy_len += ngx_hunk_size(ce->hunk); | |
286 } | |
287 } | |
288 | |
289 out = NULL; | |
290 le = NULL; | |
291 | |
292 for ( ;; ) { | |
293 if (p->out) { | |
294 ce = p->out; | |
295 | |
296 if (!(p->upstream_eof || p->upstream_error || p->upstream_done) | |
297 && (busy_len + ngx_hunk_size(ce->hunk) > p->max_busy_len)) | |
298 { | |
299 break; | |
300 } | |
301 | |
302 p->out = p->out->next; | |
303 ngx_remove_shadow_free_raw_hunk(&p->free_raw_hunks, ce->hunk); | |
304 | |
305 } else if (!p->cachable && p->in) { | |
306 ce = p->in; | |
307 | |
308 if (!(p->upstream_eof || p->upstream_error || p->upstream_done) | |
309 && (busy_len + ngx_hunk_size(ce->hunk) > p->max_busy_len)) | |
310 { | |
311 break; | |
312 } | |
313 | |
314 p->in = p->in->next; | |
315 | |
316 } else { | |
317 break; | |
318 } | |
319 | |
320 busy_len += ngx_hunk_size(ce->hunk); | |
321 ce->next = NULL; | |
322 ngx_chain_add_ce(out, le, ce); | |
323 } | |
324 | |
325 if (out == NULL) { | |
326 break; | |
327 } | |
328 | |
329 if (p->output_filter(p->output_ctx, out) == NGX_ERROR) { | |
330 p->downstream_error = 1; | |
331 continue; | |
332 } | |
333 | |
334 ngx_chain_update_chains(&p->free, &p->busy, &out); | |
335 | |
336 /* add the free shadow raw hunks to p->free_raw_hunks */ | |
337 | |
338 for (ce = p->free; ce; ce = ce->next) { | |
339 if (ce->hunk->type & NGX_HUNK_LAST_SHADOW) { | |
340 h = ce->hunk->shadow; | |
341 /* THINK NEEDED ??? */ h->pos = h->last = h->start; | |
342 h->shadow = NULL; | |
343 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT); | |
344 ngx_add_after_partially_filled_hunk(&p->free_raw_hunks, te); | |
345 | |
346 ce->hunk->type &= ~NGX_HUNK_LAST_SHADOW; | |
347 } | |
348 ce->hunk->shadow = NULL; | |
349 } | |
350 } | |
351 | |
352 return NGX_OK; | |
353 } | |
354 | |
355 | |
356 static int ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p) | |
357 { | |
358 int rc, size, hunk_size; | |
359 ngx_hunk_t *h; | |
360 ngx_chain_t *ce, *te, *next, *out, **le, **last_free; | |
361 | |
362 ngx_log_debug(p->log, "write to file"); | |
363 | |
364 if (p->temp_file->fd == NGX_INVALID_FILE) { | |
365 rc = ngx_create_temp_file(p->temp_file, p->temp_path, p->pool, | |
366 p->cachable); | |
367 | |
368 if (rc == NGX_ERROR) { | |
369 return NGX_ABORT; | |
370 } | |
371 | |
372 if (rc == NGX_AGAIN) { | |
373 return NGX_AGAIN; | |
374 } | |
375 | |
376 if (!p->cachable && p->temp_file_warn) { | |
377 ngx_log_error(NGX_LOG_WARN, p->log, 0, p->temp_file_warn); | |
378 } | |
379 } | |
380 | |
381 out = p->in; | |
382 | |
383 if (!p->cachable) { | |
384 | |
385 size = 0; | |
386 ce = p->in; | |
387 le = NULL; | |
388 | |
389 ngx_log_debug(p->log, "offset: %d" _ p->temp_offset); | |
390 | |
391 do { | |
392 hunk_size = ce->hunk->last - ce->hunk->pos; | |
393 | |
394 ngx_log_debug(p->log, "hunk size: %d" _ hunk_size); | |
395 | |
396 if ((size + hunk_size > p->temp_file_write_size) | |
397 || (p->temp_offset + hunk_size > p->max_temp_file_size)) | |
398 { | |
399 break; | |
400 } | |
401 | |
402 size += hunk_size; | |
403 le = &ce->next; | |
404 ce = ce->next; | |
405 | |
406 } while (ce); | |
407 | |
408 ngx_log_debug(p->log, "size: %d" _ size); | |
409 | |
410 if (ce) { | |
411 p->in = ce; | |
412 *le = NULL; | |
413 | |
414 } else { | |
415 p->in = NULL; | |
416 p->last_in = &p->in; | |
417 } | |
418 | |
419 } else { | |
420 p->in = NULL; | |
421 p->last_in = &p->in; | |
422 } | |
423 | |
424 if (ngx_write_chain_to_file(p->temp_file, out, p->temp_offset, | |
425 p->pool) == NGX_ERROR) { | |
426 return NGX_ABORT; | |
427 } | |
428 | |
429 for (last_free = &p->free_raw_hunks; | |
430 *last_free != NULL; | |
431 last_free = &(*last_free)->next) | |
432 { | |
433 /* void */ | |
434 } | |
435 | |
436 for (ce = out; ce; ce = next) { | |
437 next = ce->next; | |
438 ce->next = NULL; | |
439 | |
440 h = ce->hunk; | |
441 h->type |= NGX_HUNK_FILE; | |
442 h->file = p->temp_file; | |
443 h->file_pos = p->temp_offset; | |
444 p->temp_offset += h->last - h->pos; | |
445 h->file_last = p->temp_offset; | |
446 | |
447 ngx_chain_add_ce(p->out, p->last_out, ce); | |
448 | |
449 if (h->type & NGX_HUNK_LAST_SHADOW) { | |
450 h->shadow->last = h->shadow->pos = h->shadow->start; | |
451 ngx_alloc_ce_and_set_hunk(te, h->shadow, p->pool, NGX_ABORT); | |
452 *last_free = te; | |
453 last_free = &te->next; | |
454 } | |
455 } | |
456 | |
457 return NGX_OK; | |
458 } | |
459 | |
460 | |
461 /* the copy input filter */ | |
462 | |
463 int ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_hunk_t *hunk) | |
464 { | |
465 ngx_hunk_t *h; | |
466 ngx_chain_t *ce; | |
467 | |
468 if (hunk->pos == hunk->last) { | |
469 return NGX_OK; | |
470 } | |
471 | |
472 if (p->free) { | |
473 h = p->free->hunk; | |
474 p->free = p->free->next; | |
475 | |
476 } else { | |
477 ngx_test_null(h, ngx_alloc_hunk(p->pool), NGX_ERROR); | |
478 } | |
479 | |
480 ngx_memcpy(h, hunk, sizeof(ngx_hunk_t)); | |
481 h->shadow = hunk; | |
482 h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED; | |
483 hunk->shadow = h; | |
484 | |
485 ngx_alloc_ce_and_set_hunk(ce, h, p->pool, NGX_ERROR); | |
486 ngx_chain_add_ce(p->in, p->last_in, ce); | |
487 | |
488 return NGX_OK; | |
489 } | |
490 | |
491 | |
492 ngx_inline static void ngx_remove_shadow_links(ngx_hunk_t *hunk) | |
493 { | |
494 ngx_hunk_t *h, *next; | |
495 | |
496 if (hunk->shadow == NULL) { | |
497 return; | |
498 } | |
499 | |
500 h = hunk->shadow; | |
501 | |
502 while (!(h->type & NGX_HUNK_LAST_SHADOW)) { | |
503 next = h->shadow; | |
504 h->type &= ~(NGX_HUNK_TEMP|NGX_HUNK_IN_MEMORY|NGX_HUNK_RECYCLED); | |
505 h->shadow = NULL; | |
506 h = next; | |
507 } | |
508 | |
509 h->type &= ~(NGX_HUNK_TEMP | |
510 |NGX_HUNK_IN_MEMORY | |
511 |NGX_HUNK_RECYCLED | |
512 |NGX_HUNK_LAST_SHADOW); | |
513 h->shadow = NULL; | |
514 | |
515 hunk->shadow = NULL; | |
516 } | |
517 | |
518 | |
519 ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free, | |
520 ngx_hunk_t *h) | |
521 { | |
522 ngx_hunk_t *s; | |
523 ngx_chain_t *ce, **le; | |
524 | |
525 if (h->shadow == NULL) { | |
526 return; | |
527 } | |
528 | |
529 for (s = h->shadow; !(s->type & NGX_HUNK_LAST_SHADOW); s = s->shadow) { | |
530 /* void */ | |
531 } | |
532 | |
533 le = free; | |
534 | |
535 for (ce = *free ; ce; ce = ce->next) { | |
536 if (ce->hunk == s) { | |
537 *le = ce->next; | |
538 break; | |
539 } | |
540 | |
541 if (ce->hunk->shadow) { | |
542 break; | |
543 } | |
544 | |
545 le = &ce->next; | |
546 } | |
547 } | |
548 | |
549 | |
550 ngx_inline static void ngx_add_after_partially_filled_hunk(ngx_chain_t **chain, | |
551 ngx_chain_t *ce) | |
552 { | |
553 if (*chain == NULL) { | |
554 *chain = ce; | |
555 return; | |
556 } | |
557 | |
558 if ((*chain)->hunk->pos != (*chain)->hunk->last) { | |
559 ce->next = (*chain)->next; | |
560 (*chain)->next = ce; | |
561 | |
562 } else { | |
563 ce->next = (*chain); | |
564 (*chain) = ce; | |
565 } | |
566 } | |
567 | |
568 | |
569 static int ngx_drain_chains(ngx_event_pipe_t *p) | |
570 { | |
571 ngx_hunk_t *h; | |
572 ngx_chain_t *ce, *te; | |
573 | |
574 for ( ;; ) { | |
575 if (p->busy) { | |
576 ce = p->busy; | |
577 | |
578 } else if (p->out) { | |
579 ce = p->out; | |
580 | |
581 } else if (p->in) { | |
582 ce = p->in; | |
583 | |
584 } else { | |
585 return NGX_OK; | |
586 } | |
587 | |
588 while (ce) { | |
589 if (ce->hunk->type & NGX_HUNK_LAST_SHADOW) { | |
590 h = ce->hunk->shadow; | |
591 /* THINK NEEDED ??? */ h->pos = h->last = h->start; | |
592 h->shadow = NULL; | |
593 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT); | |
594 ngx_add_after_partially_filled_hunk(&p->free_raw_hunks, te); | |
595 | |
596 ce->hunk->type &= ~NGX_HUNK_LAST_SHADOW; | |
597 } | |
598 | |
599 ce->hunk->shadow = NULL; | |
600 te = ce->next; | |
601 ce->next = p->free; | |
602 p->free = ce; | |
603 ce = te; | |
604 } | |
605 } | |
606 } |