Mercurial > hg > nginx
comparison src/stream/ngx_stream_proxy_module.c @ 6436:8f038068f4bc
Stream: UDP proxy.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Wed, 20 Jan 2016 19:52:12 +0300 |
parents | d1c791479bbb |
children | a01e315b3a78 |
comparison
equal
deleted
inserted
replaced
6435:d1c791479bbb | 6436:8f038068f4bc |
---|---|
15 ngx_msec_t timeout; | 15 ngx_msec_t timeout; |
16 ngx_msec_t next_upstream_timeout; | 16 ngx_msec_t next_upstream_timeout; |
17 size_t buffer_size; | 17 size_t buffer_size; |
18 size_t upload_rate; | 18 size_t upload_rate; |
19 size_t download_rate; | 19 size_t download_rate; |
| 20 ngx_uint_t responses; |
20 ngx_uint_t next_upstream_tries; | 21 ngx_uint_t next_upstream_tries; |
21 ngx_flag_t next_upstream; | 22 ngx_flag_t next_upstream; |
22 ngx_flag_t proxy_protocol; | 23 ngx_flag_t proxy_protocol; |
23 ngx_addr_t *local; | 24 ngx_addr_t *local; |
24 | 25 |
165 ngx_conf_set_size_slot, | 166 ngx_conf_set_size_slot, |
166 NGX_STREAM_SRV_CONF_OFFSET, | 167 NGX_STREAM_SRV_CONF_OFFSET, |
167 offsetof(ngx_stream_proxy_srv_conf_t, download_rate), | 168 offsetof(ngx_stream_proxy_srv_conf_t, download_rate), |
168 NULL }, | 169 NULL }, |
169 | 170 |
| 171 { ngx_string("proxy_responses"), |
| 172 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, |
| 173 ngx_conf_set_num_slot, |
| 174 NGX_STREAM_SRV_CONF_OFFSET, |
| 175 offsetof(ngx_stream_proxy_srv_conf_t, responses), |
| 176 NULL }, |
| 177 |
170 { ngx_string("proxy_next_upstream"), | 178 { ngx_string("proxy_next_upstream"), |
171 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, | 179 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, |
172 ngx_conf_set_flag_slot, | 180 ngx_conf_set_flag_slot, |
173 NGX_STREAM_SRV_CONF_OFFSET, | 181 NGX_STREAM_SRV_CONF_OFFSET, |
174 offsetof(ngx_stream_proxy_srv_conf_t, next_upstream), | 182 offsetof(ngx_stream_proxy_srv_conf_t, next_upstream), |
349 | 357 |
350 u->peer.log = c->log; | 358 u->peer.log = c->log; |
351 u->peer.log_error = NGX_ERROR_ERR; | 359 u->peer.log_error = NGX_ERROR_ERR; |
352 | 360 |
353 u->peer.local = pscf->local; | 361 u->peer.local = pscf->local; |
| 362 u->peer.type = c->type; |
354 | 363 |
355 uscf = pscf->upstream; | 364 uscf = pscf->upstream; |
356 | 365 |
357 if (uscf->peer.init(s, uscf) != NGX_OK) { | 366 if (uscf->peer.init(s, uscf) != NGX_OK) { |
358 ngx_stream_proxy_finalize(s, NGX_ERROR); | 367 ngx_stream_proxy_finalize(s, NGX_ERROR); |
368 } | 377 } |
369 | 378 |
370 u->proxy_protocol = pscf->proxy_protocol; | 379 u->proxy_protocol = pscf->proxy_protocol; |
371 u->start_sec = ngx_time(); | 380 u->start_sec = ngx_time(); |
372 | 381 |
| 382 c->write->handler = ngx_stream_proxy_downstream_handler; |
| 383 c->read->handler = ngx_stream_proxy_downstream_handler; |
| 384 |
| 385 if (c->type == SOCK_DGRAM) { |
| 386 ngx_stream_proxy_connect(s); |
| 387 return; |
| 388 } |
| 389 |
373 p = ngx_pnalloc(c->pool, pscf->buffer_size); | 390 p = ngx_pnalloc(c->pool, pscf->buffer_size); |
374 if (p == NULL) { | 391 if (p == NULL) { |
375 ngx_stream_proxy_finalize(s, NGX_ERROR); | 392 ngx_stream_proxy_finalize(s, NGX_ERROR); |
376 return; | 393 return; |
377 } | 394 } |
378 | 395 |
379 u->downstream_buf.start = p; | 396 u->downstream_buf.start = p; |
380 u->downstream_buf.end = p + pscf->buffer_size; | 397 u->downstream_buf.end = p + pscf->buffer_size; |
381 u->downstream_buf.pos = p; | 398 u->downstream_buf.pos = p; |
382 u->downstream_buf.last = p; | 399 u->downstream_buf.last = p; |
383 | |
384 c->write->handler = ngx_stream_proxy_downstream_handler; | |
385 c->read->handler = ngx_stream_proxy_downstream_handler; | |
386 | 400 |
387 if (u->proxy_protocol | 401 if (u->proxy_protocol |
388 #if (NGX_STREAM_SSL) | 402 #if (NGX_STREAM_SSL) |
389 && pscf->ssl == NULL | 403 && pscf->ssl == NULL |
390 #endif | 404 #endif |
486 u = s->upstream; | 500 u = s->upstream; |
487 pc = u->peer.connection; | 501 pc = u->peer.connection; |
488 | 502 |
489 cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); | 503 cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); |
490 | 504 |
491 if (cscf->tcp_nodelay && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { | 505 if (pc->type == SOCK_STREAM |
| 506 && cscf->tcp_nodelay |
| 507 && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) |
| 508 { |
492 ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay"); | 509 ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay"); |
493 | 510 |
494 tcp_nodelay = 1; | 511 tcp_nodelay = 1; |
495 | 512 |
496 if (setsockopt(pc->fd, IPPROTO_TCP, TCP_NODELAY, | 513 if (setsockopt(pc->fd, IPPROTO_TCP, TCP_NODELAY, |
514 } | 531 } |
515 | 532 |
516 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); | 533 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); |
517 | 534 |
518 #if (NGX_STREAM_SSL) | 535 #if (NGX_STREAM_SSL) |
519 if (pscf->ssl && pc->ssl == NULL) { | 536 if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) { |
520 ngx_stream_proxy_ssl_init_connection(s); | 537 ngx_stream_proxy_ssl_init_connection(s); |
521 return; | 538 return; |
522 } | 539 } |
523 #endif | 540 #endif |
524 | 541 |
542 } | 559 } |
543 } | 560 } |
544 | 561 |
545 c->log->action = "proxying connection"; | 562 c->log->action = "proxying connection"; |
546 | 563 |
547 p = ngx_pnalloc(c->pool, pscf->buffer_size); | 564 if (u->upstream_buf.start == NULL) { |
548 if (p == NULL) { | 565 p = ngx_pnalloc(c->pool, pscf->buffer_size); |
549 ngx_stream_proxy_finalize(s, NGX_ERROR); | 566 if (p == NULL) { |
550 return; | 567 ngx_stream_proxy_finalize(s, NGX_ERROR); |
551 } | 568 return; |
552 | 569 } |
553 u->upstream_buf.start = p; | 570 |
554 u->upstream_buf.end = p + pscf->buffer_size; | 571 u->upstream_buf.start = p; |
555 u->upstream_buf.pos = p; | 572 u->upstream_buf.end = p + pscf->buffer_size; |
556 u->upstream_buf.last = p; | 573 u->upstream_buf.pos = p; |
| 574 u->upstream_buf.last = p; |
| 575 } |
| 576 |
| 577 if (c->type == SOCK_DGRAM) { |
| 578 s->received = c->buffer->last - c->buffer->pos; |
| 579 u->downstream_buf = *c->buffer; |
| 580 |
| 581 if (pscf->responses == 0) { |
| 582 pc->read->ready = 0; |
| 583 pc->read->eof = 1; |
| 584 } |
| 585 } |
557 | 586 |
558 u->connected = 1; | 587 u->connected = 1; |
559 | 588 |
560 pc->read->handler = ngx_stream_proxy_upstream_handler; | 589 pc->read->handler = ngx_stream_proxy_upstream_handler; |
561 pc->write->handler = ngx_stream_proxy_upstream_handler; | 590 pc->write->handler = ngx_stream_proxy_upstream_handler; |
562 | 591 |
563 if (pc->read->ready) { | 592 if (pc->read->ready || pc->read->eof) { |
564 ngx_post_event(pc->read, &ngx_posted_events); | 593 ngx_post_event(pc->read, &ngx_posted_events); |
565 } | 594 } |
566 | 595 |
567 ngx_stream_proxy_process(s, 0, 1); | 596 ngx_stream_proxy_process(s, 0, 1); |
568 } | 597 } |
892 | 921 |
893 c = ev->data; | 922 c = ev->data; |
894 s = c->data; | 923 s = c->data; |
895 u = s->upstream; | 924 u = s->upstream; |
896 | 925 |
| 926 c = s->connection; |
| 927 pc = u->peer.connection; |
| 928 |
| 929 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); |
| 930 |
897 if (ev->timedout) { | 931 if (ev->timedout) { |
| 932 ev->timedout = 0; |
898 | 933 |
899 if (ev->delayed) { | 934 if (ev->delayed) { |
900 | |
901 ev->timedout = 0; | |
902 ev->delayed = 0; | 935 ev->delayed = 0; |
903 | 936 |
904 if (!ev->ready) { | 937 if (!ev->ready) { |
905 if (ngx_handle_read_event(ev, 0) != NGX_OK) { | 938 if (ngx_handle_read_event(ev, 0) != NGX_OK) { |
906 ngx_stream_proxy_finalize(s, NGX_ERROR); | 939 ngx_stream_proxy_finalize(s, NGX_ERROR); |
907 return; | 940 return; |
908 } | 941 } |
909 | 942 |
910 if (u->connected) { | 943 if (u->connected && !c->read->delayed && !pc->read->delayed) { |
911 pc = u->peer.connection; | 944 ngx_add_timer(c->write, pscf->timeout); |
912 | |
913 if (!c->read->delayed && !pc->read->delayed) { | |
914 pscf = ngx_stream_get_module_srv_conf(s, | |
915 ngx_stream_proxy_module); | |
916 ngx_add_timer(c->write, pscf->timeout); | |
917 } | |
918 } | 945 } |
919 | 946 |
920 return; | 947 return; |
921 } | 948 } |
922 | 949 |
923 } else { | 950 } else { |
| 951 if (s->connection->type == SOCK_DGRAM) { |
| 952 if (pscf->responses == NGX_MAX_INT32_VALUE) { |
| 953 |
| 954 /* |
| 955 * successfully terminate timed out UDP session |
| 956 * with unspecified number of responses |
| 957 */ |
| 958 |
| 959 pc->read->ready = 0; |
| 960 pc->read->eof = 1; |
| 961 |
| 962 ngx_stream_proxy_process(s, 1, 0); |
| 963 return; |
| 964 } |
| 965 |
| 966 if (u->received == 0) { |
| 967 ngx_stream_proxy_next_upstream(s); |
| 968 return; |
| 969 } |
| 970 } |
| 971 |
924 ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); | 972 ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); |
925 ngx_stream_proxy_finalize(s, NGX_DECLINED); | 973 ngx_stream_proxy_finalize(s, NGX_DECLINED); |
926 return; | 974 return; |
927 } | 975 } |
928 | 976 |
1037 u = s->upstream; | 1085 u = s->upstream; |
1038 | 1086 |
1039 c = s->connection; | 1087 c = s->connection; |
1040 pc = u->connected ? u->peer.connection : NULL; | 1088 pc = u->connected ? u->peer.connection : NULL; |
1041 | 1089 |
| 1090 if (c->type == SOCK_DGRAM && (ngx_terminate || ngx_exiting)) { |
| 1091 |
| 1092 /* socket is already closed on worker shutdown */ |
| 1093 |
| 1094 handler = c->log->handler; |
| 1095 c->log->handler = NULL; |
| 1096 |
| 1097 ngx_log_error(NGX_LOG_INFO, c->log, 0, "disconnected on shutdown"); |
| 1098 |
| 1099 c->log->handler = handler; |
| 1100 |
| 1101 ngx_stream_proxy_finalize(s, NGX_OK); |
| 1102 return; |
| 1103 } |
| 1104 |
1042 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); | 1105 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); |
1043 | 1106 |
1044 if (from_upstream) { | 1107 if (from_upstream) { |
1045 src = pc; | 1108 src = pc; |
1046 dst = c; | 1109 dst = c; |
1064 | 1127 |
1065 if (size && dst && dst->write->ready) { | 1128 if (size && dst && dst->write->ready) { |
1066 | 1129 |
1067 n = dst->send(dst, b->pos, size); | 1130 n = dst->send(dst, b->pos, size); |
1068 | 1131 |
| 1132 if (n == NGX_AGAIN && dst->shared) { |
| 1133 /* cannot wait on a shared socket */ |
| 1134 n = NGX_ERROR; |
| 1135 } |
| 1136 |
1069 if (n == NGX_ERROR) { | 1137 if (n == NGX_ERROR) { |
| 1138 if (c->type == SOCK_DGRAM && !from_upstream) { |
| 1139 ngx_stream_proxy_next_upstream(s); |
| 1140 return; |
| 1141 } |
| 1142 |
1070 ngx_stream_proxy_finalize(s, NGX_DECLINED); | 1143 ngx_stream_proxy_finalize(s, NGX_DECLINED); |
1071 return; | 1144 return; |
1072 } | 1145 } |
1073 | 1146 |
1074 if (n > 0) { | 1147 if (n > 0) { |
1116 src->read->delayed = 1; | 1189 src->read->delayed = 1; |
1117 ngx_add_timer(src->read, delay); | 1190 ngx_add_timer(src->read, delay); |
1118 } | 1191 } |
1119 } | 1192 } |
1120 | 1193 |
| 1194 if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses) |
| 1195 { |
| 1196 src->read->ready = 0; |
| 1197 src->read->eof = 1; |
| 1198 } |
| 1199 |
1121 *received += n; | 1200 *received += n; |
1122 b->last += n; | 1201 b->last += n; |
1123 do_write = 1; | 1202 do_write = 1; |
1124 | 1203 |
1125 continue; | 1204 continue; |
1126 } | 1205 } |
1127 | 1206 |
1128 if (n == NGX_ERROR) { | 1207 if (n == NGX_ERROR) { |
| 1208 if (c->type == SOCK_DGRAM && u->received == 0) { |
| 1209 ngx_stream_proxy_next_upstream(s); |
| 1210 return; |
| 1211 } |
| 1212 |
1129 src->read->eof = 1; | 1213 src->read->eof = 1; |
1130 } | 1214 } |
1131 } | 1215 } |
1132 | 1216 |
1133 break; | 1217 break; |
1150 return; | 1234 return; |
1151 } | 1235 } |
1152 | 1236 |
1153 flags = src->read->eof ? NGX_CLOSE_EVENT : 0; | 1237 flags = src->read->eof ? NGX_CLOSE_EVENT : 0; |
1154 | 1238 |
1155 if (ngx_handle_read_event(src->read, flags) != NGX_OK) { | 1239 if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) { |
1156 ngx_stream_proxy_finalize(s, NGX_ERROR); | 1240 ngx_stream_proxy_finalize(s, NGX_ERROR); |
1157 return; | 1241 return; |
1158 } | 1242 } |
1159 | 1243 |
1160 if (dst) { | 1244 if (dst) { |
1161 if (ngx_handle_write_event(dst->write, 0) != NGX_OK) { | 1245 if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) { |
1162 ngx_stream_proxy_finalize(s, NGX_ERROR); | 1246 ngx_stream_proxy_finalize(s, NGX_ERROR); |
1163 return; | 1247 return; |
1164 } | 1248 } |
1165 | 1249 |
1166 if (!c->read->delayed && !pc->read->delayed) { | 1250 if (!c->read->delayed && !pc->read->delayed) { |
1329 conf->timeout = NGX_CONF_UNSET_MSEC; | 1413 conf->timeout = NGX_CONF_UNSET_MSEC; |
1330 conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC; | 1414 conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC; |
1331 conf->buffer_size = NGX_CONF_UNSET_SIZE; | 1415 conf->buffer_size = NGX_CONF_UNSET_SIZE; |
1332 conf->upload_rate = NGX_CONF_UNSET_SIZE; | 1416 conf->upload_rate = NGX_CONF_UNSET_SIZE; |
1333 conf->download_rate = NGX_CONF_UNSET_SIZE; | 1417 conf->download_rate = NGX_CONF_UNSET_SIZE; |
| 1418 conf->responses = NGX_CONF_UNSET_UINT; |
1334 conf->next_upstream_tries = NGX_CONF_UNSET_UINT; | 1419 conf->next_upstream_tries = NGX_CONF_UNSET_UINT; |
1335 conf->next_upstream = NGX_CONF_UNSET; | 1420 conf->next_upstream = NGX_CONF_UNSET; |
1336 conf->proxy_protocol = NGX_CONF_UNSET; | 1421 conf->proxy_protocol = NGX_CONF_UNSET; |
1337 conf->local = NGX_CONF_UNSET_PTR; | 1422 conf->local = NGX_CONF_UNSET_PTR; |
1338 | 1423 |
1371 prev->upload_rate, 0); | 1456 prev->upload_rate, 0); |
1372 | 1457 |
1373 ngx_conf_merge_size_value(conf->download_rate, | 1458 ngx_conf_merge_size_value(conf->download_rate, |
1374 prev->download_rate, 0); | 1459 prev->download_rate, 0); |
1375 | 1460 |
| 1461 ngx_conf_merge_uint_value(conf->responses, |
| 1462 prev->responses, NGX_MAX_INT32_VALUE); |
| 1463 |
1376 ngx_conf_merge_uint_value(conf->next_upstream_tries, | 1464 ngx_conf_merge_uint_value(conf->next_upstream_tries, |
1377 prev->next_upstream_tries, 0); | 1465 prev->next_upstream_tries, 0); |
1378 | 1466 |
1379 ngx_conf_merge_value(conf->next_upstream, prev->next_upstream, 1); | 1467 ngx_conf_merge_value(conf->next_upstream, prev->next_upstream, 1); |
1380 | 1468 |