comparison src/event/ngx_event_proxy.c @ 147:be71fca7f9d7

nginx-0.0.1-2003-10-14-19:06:38 import
author Igor Sysoev <igor@sysoev.ru>
date Tue, 14 Oct 2003 15:06:38 +0000
parents 5ac79e574285
children 5afee0074707
comparison
equal deleted inserted replaced
146:5ac79e574285 147:be71fca7f9d7
1 1
2 #include <ngx_config.h> 2 #include <ngx_config.h>
3 #include <ngx_core.h> 3 #include <ngx_core.h>
4 #include <ngx_event.h> 4 #include <ngx_event.h>
5 #include <ngx_event_proxy.h> 5 #include <ngx_event_proxy.h>
6
7
8 #define NGX_EVENT_COPY_FILTER 0
9
10 #if (NGX_EVENT_COPY_FILTER)
11 static int ngx_event_proxy_copy_input_filter(ngx_event_proxy_t *p,
12 ngx_chain_t *chain);
13 #endif
14
15 6
16 7
17 int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p) 8 int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p)
18 { 9 {
19 int n, rc, size; 10 int n, rc, size;
20 ngx_hunk_t *h, *nh; 11 ngx_hunk_t *h, *nh;
21 ngx_chain_t *chain, *rest, *ce, *next; 12 ngx_chain_t *chain, *rest, *ce, *next;
22 13
23 #if (NGX_SUPPRESS_WARN)
24 rest = NULL; 14 rest = NULL;
25 #endif
26
27 #if (NGX_EVENT_COPY_FILTER)
28
29 if (p->input_filter == NULL) {
30 p->input_filter = ngx_event_proxy_copy_input_filter;
31 }
32
33 #endif
34 15
35 p->upstream_level++; 16 p->upstream_level++;
36 17
37 ngx_log_debug(p->log, "read upstream"); 18 ngx_log_debug(p->log, "read upstream");
38 19
63 if (p->upstream->read->error) { 44 if (p->upstream->read->error) {
64 ngx_log_error(NGX_LOG_ERR, p->log, p->upstream->read->error, 45 ngx_log_error(NGX_LOG_ERR, p->log, p->upstream->read->error,
65 "readv() failed"); 46 "readv() failed");
66 p->upstream_error = 1; 47 p->upstream_error = 1;
67 48
68 return NGX_ERROR;
69
70 } else if (p->upstream->read->eof 49 } else if (p->upstream->read->eof
71 && p->upstream->read->available == 0) { 50 && p->upstream->read->available == 0) {
72 p->upstream_eof = 1; 51 p->upstream_eof = 1;
73 52 }
74 break; 53
75 } 54 if ((p->upstream_eof || p->upstream_error)
55 && p->free_raw_hunk)
56 {
57 if (p->input_filter(p->input_data, p->free_raw_hunk->hunk)
58 == NGX_ERROR) {
59 return NGX_ABORT;
60 }
61
62 /* TODO: p->free_raw_hunk->next can be free()ed */
63 p->free_raw_hunk = p->free_raw_hunk->next;
64 }
65
66 break;
76 } 67 }
77 #endif 68 #endif
78 69
79 if (p->free_hunks) { 70 if (p->free_raw_hunks) {
80 71
81 /* use the free hunks if they exist */ 72 /* use the free hunks if they exist */
82 73
83 chain = p->free_hunks; 74 chain = p->free_raw_hunks;
84 p->free_hunks = NULL; 75 p->free_raw_hunks = NULL;
85 76
86 ngx_log_debug(p->log, "free hunk: %08X:%d" _ chain->hunk _ 77 ngx_log_debug(p->log, "free hunk: %08X:%d" _ chain->hunk _
87 chain->hunk->end - chain->hunk->last); 78 chain->hunk->end - chain->hunk->last);
88 79
89 } else if (p->hunks < p->bufs.num) { 80 } else if (p->hunks < p->bufs.num) {
98 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT); 89 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT);
99 chain = te; 90 chain = te;
100 91
101 ngx_log_debug(p->log, "new hunk: %08X" _ chain->hunk); 92 ngx_log_debug(p->log, "new hunk: %08X" _ chain->hunk);
102 93
103 } else if (p->file_hunks) {
104
105 /* use the file hunks if they exist */
106
107 chain = p->file_hunks;
108 p->file_hunks = NULL;
109
110 ngx_log_debug(p->log, "file hunk: %08X" _ chain->hunk _
111 chain->hunk->end - chain->hunk->last);
112
113 } else if (!p->cachable && p->downstream->write->ready) { 94 } else if (!p->cachable && p->downstream->write->ready) {
114 95
115 /* 96 /*
116 * if the hunks are not needed to be saved in a cache and 97 * If the hunks are not needed to be saved in a cache and
117 * a downstream is ready then write the hunks to a downstream 98 * a downstream is ready then write the hunks to a downstream.
118 */ 99 */
119 100
120 ngx_log_debug(p->log, "downstream ready"); 101 ngx_log_debug(p->log, "downstream ready");
121 102
122 break; 103 break;
123 104
124 } else if (p->temp_offset < p->max_temp_file_size) { 105 } else if (p->temp_offset < p->max_temp_file_size) {
125 106
126 /* 107 /*
127 * if it's allowed then save the incoming hunks to a temporary 108 * If it's allowed then save some hunks from r->in
128 * file, move the saved read hunks to a file chain, 109 * to a temporary file, and add them to a r->out chain.
129 * convert the incoming hunks into the file hunks
130 * and add them to an outgoing chain
131 */ 110 */
132 111
133 rc = ngx_event_proxy_write_chain_to_temp_file(p); 112 rc = ngx_event_proxy_write_chain_to_temp_file(p);
134 113
135 ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset); 114 ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset);
115
116 if (rc == NGX_AGAIN) {
117 if (ngx_event_flags & NGX_USE_LEVEL_EVENT
118 && p->upstream->read->active
119 && p->upstream->read->ready)
120 {
121 if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
122 == NGX_ERROR)
123 {
124 return NGX_ABORT;
125 }
126 }
127 }
136 128
137 if (rc != NGX_OK) { 129 if (rc != NGX_OK) {
138 return rc; 130 return rc;
139 } 131 }
140 132
141 chain = p->file_hunks; 133 chain = p->free_raw_hunks;
142 p->file_hunks = NULL; 134 p->free_raw_hunks = NULL;
143 135
144 ngx_log_debug(p->log, "new file hunk: %08X:%d" _ chain->hunk _ 136 ngx_log_debug(p->log, "new file hunk: %08X:%d" _ chain->hunk _
145 chain->hunk->end - chain->hunk->last); 137 chain->hunk->end - chain->hunk->last);
146 138
147 } else { 139 } else {
161 p->upstream_error = 1; 153 p->upstream_error = 1;
162 return NGX_ERROR; 154 return NGX_ERROR;
163 } 155 }
164 156
165 if (n == NGX_AGAIN) { 157 if (n == NGX_AGAIN) {
166 if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) {
167 return NGX_ABORT;
168 }
169
170 break; 158 break;
171 } 159 }
172 160
173 if (n == 0) { 161 if (n == 0) {
162 THINK
174 if (chain->hunk->shadow == NULL) { 163 if (chain->hunk->shadow == NULL) {
175 p->free_hunks = chain; 164 p->free_hunks = chain;
176 } 165 }
177 p->upstream_eof = 1; 166 p->upstream_eof = 1;
178 167
179 break; 168 break;
180 } 169 }
181 170
182 } 171 }
183 172
184 /* 173 for (ce = chain; ce && n > 0; ce = ce->next) {
185 * move the full hunks to a read chain, the partial filled hunk 174
186 * to a free chain, and remove the shadow links for these hunks 175 size = ce->hunk->end - ce->hunk->last;
187 */ 176
188 177 if (n >= size) {
189 for (ce = chain; ce && n > 0; ce = next) { 178 ce->hunk->last = ce->hunk->end;
179
180 if (p->input_filter(p->input_data, ce->hunk) == NGX_ERROR) {
181 return NGX_ABORT;
182 }
183
184 n -= size;
185
186 } else if (p->upstream_eof || p->upstream_error) {
187
188 if (p->input_filter(p->input_data, ce->hunk) == NGX_ERROR) {
189 return NGX_ABORT;
190 }
191
192 } else {
193 ce->hunk->last += n;
194 n = 0;
195 }
196
197
198
199
200
201
202
203
190 next = ce->next; 204 next = ce->next;
191 ce->next = NULL; 205 ce->next = NULL;
192 206
193 if (ce->hunk->shadow) { 207 if (ce->hunk->shadow) {
194 for (h = ce->hunk->shadow; 208 for (h = ce->hunk->shadow;
224 238
225 p->last_read_hunk = ce; 239 p->last_read_hunk = ce;
226 240
227 n -= size; 241 n -= size;
228 242
229 #if !(NGX_EVENT_COPY_FILTER)
230
231 if (p->input_filter) {
232 continue;
233 }
234
235 /* the inline copy input filter */
236
237 ngx_test_null(h, ngx_alloc_hunk(p->pool), NGX_ABORT);
238
239 ngx_memcpy(h, ce->hunk, sizeof(ngx_hunk_t));
240 h->shadow = ce->hunk;
241 h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED;
242 ce->hunk->shadow = h;
243
244 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT);
245
246 ngx_chain_add_ce(p->in_hunks, p->last_in_hunk, te);
247 #endif
248
249 } else { 243 } else {
250 ce->hunk->last += n; 244 ce->hunk->last += n;
251 p->free_hunks = ce; 245 p->free_hunks = ce;
252 246
253 n = 0; 247 n = 0;
261 /* 255 /*
262 * the input filter i.e. that moves HTTP/1.1 chunks 256 * the input filter i.e. that moves HTTP/1.1 chunks
263 * from a read chain to an incoming chain 257 * from a read chain to an incoming chain
264 */ 258 */
265 259
266 if (p->input_filter) { 260 if (p->input_filter(p, chain) == NGX_ERROR) {
267 if (p->input_filter(p, chain) == NGX_ERROR) { 261 return NGX_ABORT;
268 return NGX_ABORT;
269 }
270 } 262 }
271 263
272 ngx_log_debug(p->log, "rest chain: %08X" _ ce); 264 ngx_log_debug(p->log, "rest chain: %08X" _ ce);
273 265
274 /* 266 /*
376 p->upstream_level--; 368 p->upstream_level--;
377 369
378 ngx_log_debug(p->log, "upstream level: %d" _ p->upstream_level); 370 ngx_log_debug(p->log, "upstream level: %d" _ p->upstream_level);
379 371
380 if (p->upstream_level == 0) { 372 if (p->upstream_level == 0) {
381 if (ngx_handler_read_event(p->upstream->read) == NGX_ERROR) { 373 if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) {
382 return NGX_ABORT; 374 return NGX_ABORT;
383 } 375 }
384 } 376 }
385 377
386 if (p->upstream_eof) { 378 if (p->upstream_eof) {
393 385
394 386
395 int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p) 387 int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p)
396 { 388 {
397 ngx_chain_t *out, *ce; 389 ngx_chain_t *out, *ce;
398 ngx_event_proxy_downstream_t *d; 390
399 391 while (p->downstream->write->ready) {
400 d = &p->downstream;
401
402 for ( ;; ) {
403
404 if (!d->write->ready || p->busy_hunks_num == p->max_busy_hunks) {
405 break;
406 }
407 392
408 if (p->out) { 393 if (p->out) {
409 out = p->out; 394 out = p->out;
395
396 if (p->busy_len + ngx_hunk_size(out->hunk)) > p->max_busy_len) {
397 break;
398 }
399
410 p->out = p->out->next; 400 p->out = p->out->next;
401 ngx_remove_shadow_free_raw_hunk(&p->free_raw_hunks, out->hunk);
411 402
412 } else if (!p->cachable && p->in) { 403 } else if (!p->cachable && p->in) {
413 out = p->in; 404 out = p->in;
405
406 if (p->busy_len + ngx_hunk_size(out->hunk)) > p->max_busy_len) {
407 break;
408 }
409
414 p->in = p->in->next; 410 p->in = p->in->next;
415 411
416 } else { 412 } else {
417 break; 413 break;
418 } 414 }
419 415
420 out->next = NULL; 416 out->next = NULL;
421 417
422 rc = p->output_filter(p->output_data, out->hunk); 418 rc = p->output_filter(p->output_data, out->hunk);
423 419
424 ngx_chain_update_chains(p->shadow_free, p->busy, out); 420 ngx_chain_update_chains(p->free, p->busy, out);
425 421
426 for (ce = p->shadow_free; ce; ce = ce->next) { 422 /* calculate p->busy_len */
427 423
428 if (ce->hunk->type & NGX_LAST_SHADOW_HUNK) { 424 p->busy_len = 0;
425 for (ce = p->busy; ce; ce = ce->next) {
426 p->busy_len += ngx_hunk_size(ce->hunk);
427 }
428
429 /* add the free shadow raw hunks to p->free_raw_hunks */
430
431 for (ce = p->free; ce; ce = ce->next) {
432 if (ce->hunk->type & NGX_HUNK_LAST_SHADOW) {
429 h = ce->hunk->shadow; 433 h = ce->hunk->shadow;
430 h->type = (NGX_HUNK_TEMP|NGX_HUNK_IN_MEMORY|NGX_HUNK_RECYCLED); 434 /* THINK NEEDED ??? */ h->pos = h->last = h->start;
431 h->pos = p->last = h->start;
432 h->shadow = NULL; 435 h->shadow = NULL;
433 436 ngx_alloc_ce_and_set_hunk(te, ce->hunk->shadow, p->pool,
434 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT); 437 NGX_ABORT);
435 te->next = p->free; 438 ngx_add_after_partially_filled_hunk(p->free_raw_hunks, te);
436 p->free = te; 439
437 } 440 ce->hunk->type &= ~NGX_HUNK_LAST_SHADOW;
438 } 441 }
439 442 ce->hunk->shadow = NULL;
440 p->busy_hunks_num = 0;
441 for (ce = p->busy; ce; ce = ce->next) {
442 if (ce->hunk->type & NGX_LAST_SHADOW_HUNK) {
443 p->busy_hunks_num++;
444 }
445 } 443 }
446 444
447 if (p->upstream.read->ready) 445 if (p->upstream.read->ready)
448 if (ngx_event_proxy_read_upstream(p) == NGX_ERROR) { 446 if (ngx_event_proxy_read_upstream(p) == NGX_ERROR) {
449 return NGX_ABORT; 447 return NGX_ABORT;
450 } 448 }
451 } 449 }
452 } 450 }
453 451
454 if (d->level == 0) { 452 if (p->downstream_level == 0) {
455 if (ngx_handler_write_event(d->write) == NGX_ERROR) { 453 if (ngx_handler_write_event(p->downstream->write) == NGX_ERROR) {
456 return NGX_ABORT; 454 return NGX_ABORT;
457 } 455 }
458 } 456 }
459 457
460 if (p->upstream_done && p->in == NULL && p->out == NULL) { 458 if (p->upstream_done && p->in == NULL && p->out == NULL) {
461 p->downstream_done = 1; 459 p->downstream_done = 1;
462 } 460 }
463
464 return NGX_OK;
465 }
466
467
468
469 int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p)
470 {
471 int rc;
472 ngx_hunk_t *h;
473 ngx_chain_t *entry;
474
475 #if 0
476
477 if (p->upstream_level == 0
478 && p->downstream_level == 0
479 && p->busy_hunk == NULL
480 && p->out_hunks == NULL
481 && p->in_hunks == NULL
482 && ngx_event_flags & NGX_USE_LEVEL_EVENT)
483 {
484 if (ngx_del_event(p->downstream->write, NGX_WRITE_EVENT, 0)
485 == NGX_ERROR) {
486 return NGX_ABORT;
487 }
488
489 p->downstream->write->blocked = 1;
490 return NGX_AGAIN;
491 }
492
493 #endif
494
495 p->downstream_level++;
496
497 ngx_log_debug(p->log, "write to downstream");
498
499 entry = NULL;
500 h = p->busy_hunk;
501
502 for ( ;; ) {
503
504 if (h == NULL) {
505 if (p->out_hunks) {
506 entry = p->out_hunks;
507 p->out_hunks = entry->next;
508 h = entry->hunk;
509 entry->next = NULL;
510
511 if (p->file_hunks) {
512 if (p->file_hunks->hunk == h->shadow) {
513 p->file_hunks = p->file_hunks->next;
514 }
515 }
516
517
518 } else if (p->cachable == 0 && p->in_hunks) {
519 entry = p->in_hunks;
520 p->in_hunks = entry->next;
521 h = entry->hunk;
522 entry->next = NULL;
523
524 if (p->read_hunks) {
525 if (p->read_hunks->hunk == h->shadow) {
526 p->read_hunks = p->read_hunks->next;
527
528 } else {
529 ngx_log_error(NGX_LOG_CRIT, p->log, 0, "ERROR0");
530 }
531 }
532 }
533
534 ngx_log_debug(p->log, "event proxy write hunk: %08X" _ h);
535
536 if (h == NULL) {
537 break;
538 }
539 }
540
541 #if 0
542 ngx_log_debug(p->log, "event proxy write: %d" _ h->last - h->pos);
543 #endif
544
545 rc = p->output_filter(p->output_data, h);
546
547 ngx_log_debug(p->log, "event proxy: %d" _ rc);
548
549 if (rc == NGX_ERROR) {
550 p->downstream_error = 1;
551 return NGX_ERROR;
552 }
553
554 if (rc == NGX_AGAIN) {
555 #if 0
556 || (h->type & NGX_HUNK_IN_MEMORY && h->pos < h->last)
557 || (h->type & NGX_HUNK_FILE && h->file_pos < h->file_last)
558 #endif
559 if (p->busy_hunk == NULL) {
560 p->busy_hunk = h;
561 }
562
563 if (p->downstream->write->blocked) {
564 if (ngx_add_event(p->downstream->write, NGX_WRITE_EVENT,
565 NGX_LEVEL_EVENT) == NGX_ERROR) {
566 return NGX_ABORT;
567 }
568 p->downstream->write->blocked = 0;
569 }
570
571 p->downstream_level--;
572
573 return NGX_AGAIN;
574 }
575
576 p->busy_hunk = NULL;
577
578 /* if the complete hunk is the file hunk and it has a shadow read hunk
579 then add a shadow read hunk to a free chain */
580
581 if (h->type & NGX_HUNK_FILE) {
582 if (p->cachable == 0 && p->out_hunks == NULL) {
583 p->temp_offset = 0;
584 }
585 }
586
587 if ((h->type & NGX_HUNK_LAST_SHADOW) == 0) {
588 h = NULL;
589 continue;
590 }
591
592
593 h->shadow->shadow = NULL;
594 h = h->shadow;
595
596 #if 0
597 /* free the unneeded hunk */
598
599 if (p->upstream_eof) {
600 ngx_free_hunk(p->pool, h);
601 h = NULL;
602 continue;
603 }
604 #endif
605
606 h->pos = h->last = h->start;
607
608 if (entry == NULL) {
609 h = NULL;
610 continue;
611 }
612
613 entry->hunk = h;
614
615 /* if the first hunk in a free chain is partially filled
616 then add the complete hunk after the first free hunk */
617
618 if (p->free_hunks
619 && p->free_hunks->hunk->start != p->free_hunks->hunk->last)
620 {
621 entry->next = p->free_hunks->next;
622 p->free_hunks->next = entry;
623
624 } else {
625 entry->next = p->free_hunks;
626 p->free_hunks = entry;
627 }
628
629 h = NULL;
630 }
631
632 if (p->upstream->read->ready) {
633 if (ngx_event_proxy_read_upstream(p) == NGX_ERROR) {
634 return NGX_ABORT;
635 }
636 }
637
638 p->downstream_level--;
639
640 ngx_log_debug(p->log, "downstream level: %d" _ p->downstream_level);
641 461
642 return NGX_OK; 462 return NGX_OK;
643 } 463 }
644 464
645 465
746 566
747 return NGX_OK; 567 return NGX_OK;
748 } 568 }
749 569
750 570
751 #if (NGX_EVENT_COPY_FILTER)
752
753 /* the copy input filter */ 571 /* the copy input filter */
754 572
755 static int ngx_event_proxy_copy_input_filter(ngx_event_proxy_t *p, 573 int ngx_event_proxy_copy_input_filter(ngx_event_proxy_t *p, ngx_hunk_t *hunk)
756 ngx_chain_t *chain)
757 { 574 {
758 ngx_hunk_t *h; 575 ngx_hunk_t *h;
759 ngx_chain_t *ce, *temp; 576 ngx_chain_t *ce;
760 577
761 if (p->upstream_eof) { 578 if (hunk->pos == hunk->last) {
762
763 /* TODO: comment */
764
765 ce = p->free_hunks;
766
767 ngx_chain_add_ce(p->in_hunk, p->last_in_hunk, ce);
768
769 p->free_hunks = ce->next;
770 ce->next = NULL;
771
772 return NGX_OK; 579 return NGX_OK;
773 } 580 }
774 581
775 for (ce = chain; ce; ce = ce->next) { 582 if (p->free) {
583 h = p->free->hunk;
584 p->free = p->free->next;
585
586 } else {
776 ngx_test_null(h, ngx_alloc_hunk(p->pool), NGX_ERROR); 587 ngx_test_null(h, ngx_alloc_hunk(p->pool), NGX_ERROR);
777 ngx_memcpy(h, ce->hunk, sizeof(ngx_hunk_t)); 588 }
778 h->shadow = ce->hunk; 589
779 h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED; 590 ngx_memcpy(h, hunk, sizeof(ngx_hunk_t));
780 ce->hunk->shadow = h; 591 h->shadow = hunk;
781 592 h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED;
782 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ERROR); 593 hunk->shadow = h;
783 594
784 ngx_chain_add_ce(p->in_hunk, p->last_in_hunk, te); 595 ngx_alloc_ce_and_set_hunk(ce, h, p->pool, NGX_ERROR);
785 } 596 ngx_chain_add_ce(p->in_hunk, p->last_in_hunk, ce);
786 597
787 return NGX_OK; 598 return NGX_OK;
788 } 599 }
789 600
790 #endif 601
602 ngx_inline static void ngx_remove_shadow_links(ngx_chain_t *ce)
603 {
604 for (
605 }
606
607
608 ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free;
609 ngx_hunk_t *h);
610 {
611 ngx_hunk_t *s;
612
613 if (h->shadow == NULL) {
614 return;
615 }
616
617 for (s = h->shadow; !(s->type & NGX_HUNK_LAST_SHADOW); s = s->shadow) {
618 /* void */
619 }
620
621 le = free;
622
623 for (ce = *free ; ce; ce = ce->next) {
624 if (ce->hunk == s) {
625 *le = ce->next;
626 break;
627 }
628
629 if (ce->hunk->shadow) {
630 break;
631 }
632
633 le = &ce->next;
634 }
635 }
636
637
638 ngx_inline static void ngx_add_after_partially_filled_hunk(ngx_chain_t *chain,
639 ngx_chain_t *ce)
640 {
641 if (chain->hunk->pos != chain->hunk->last) {
642 ce->next = chain->next;
643 chain->next = ce;
644
645 } else {
646 ce->next = chain;
647 chain = ce;
648 }
649 }