comparison src/stream/ngx_stream_proxy_module.c @ 6201:24488e6db782

Stream: upstream and downstream limit rates.
author Roman Arutyunyan <arut@nginx.com>
date Tue, 23 Jun 2015 20:17:48 +0300
parents abee77018d3a
children 6345822f0abb
comparison
equal deleted inserted replaced
6200:abee77018d3a 6201:24488e6db782
16 typedef struct { 16 typedef struct {
17 ngx_msec_t connect_timeout; 17 ngx_msec_t connect_timeout;
18 ngx_msec_t timeout; 18 ngx_msec_t timeout;
19 ngx_msec_t next_upstream_timeout; 19 ngx_msec_t next_upstream_timeout;
20 size_t downstream_buf_size; 20 size_t downstream_buf_size;
21 size_t downstream_limit_rate;
21 size_t upstream_buf_size; 22 size_t upstream_buf_size;
23 size_t upstream_limit_rate;
22 ngx_uint_t next_upstream_tries; 24 ngx_uint_t next_upstream_tries;
23 ngx_flag_t next_upstream; 25 ngx_flag_t next_upstream;
24 ngx_flag_t proxy_protocol; 26 ngx_flag_t proxy_protocol;
25 ngx_addr_t *local; 27 ngx_addr_t *local;
26 28
130 ngx_conf_set_size_slot, 132 ngx_conf_set_size_slot,
131 NGX_STREAM_SRV_CONF_OFFSET, 133 NGX_STREAM_SRV_CONF_OFFSET,
132 offsetof(ngx_stream_proxy_srv_conf_t, downstream_buf_size), 134 offsetof(ngx_stream_proxy_srv_conf_t, downstream_buf_size),
133 NULL }, 135 NULL },
134 136
137 { ngx_string("proxy_downstream_limit_rate"),
138 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
139 ngx_conf_set_size_slot,
140 NGX_STREAM_SRV_CONF_OFFSET,
141 offsetof(ngx_stream_proxy_srv_conf_t, downstream_limit_rate),
142 NULL },
143
135 { ngx_string("proxy_upstream_buffer"), 144 { ngx_string("proxy_upstream_buffer"),
136 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, 145 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
137 ngx_conf_set_size_slot, 146 ngx_conf_set_size_slot,
138 NGX_STREAM_SRV_CONF_OFFSET, 147 NGX_STREAM_SRV_CONF_OFFSET,
139 offsetof(ngx_stream_proxy_srv_conf_t, upstream_buf_size), 148 offsetof(ngx_stream_proxy_srv_conf_t, upstream_buf_size),
149 NULL },
150
151 { ngx_string("proxy_upstream_limit_rate"),
152 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
153 ngx_conf_set_size_slot,
154 NGX_STREAM_SRV_CONF_OFFSET,
155 offsetof(ngx_stream_proxy_srv_conf_t, upstream_limit_rate),
140 NULL }, 156 NULL },
141 157
142 { ngx_string("proxy_next_upstream"), 158 { ngx_string("proxy_next_upstream"),
143 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, 159 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
144 ngx_conf_set_flag_slot, 160 ngx_conf_set_flag_slot,
338 { 354 {
339 u->peer.tries = pscf->next_upstream_tries; 355 u->peer.tries = pscf->next_upstream_tries;
340 } 356 }
341 357
342 u->proxy_protocol = pscf->proxy_protocol; 358 u->proxy_protocol = pscf->proxy_protocol;
359 u->start_sec = ngx_time();
343 360
344 p = ngx_pnalloc(c->pool, pscf->downstream_buf_size); 361 p = ngx_pnalloc(c->pool, pscf->downstream_buf_size);
345 if (p == NULL) { 362 if (p == NULL) {
346 ngx_stream_proxy_finalize(s, NGX_ERROR); 363 ngx_stream_proxy_finalize(s, NGX_ERROR);
347 return; 364 return;
829 846
830 847
831 static void 848 static void
832 ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream) 849 ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream)
833 { 850 {
834 ngx_connection_t *c; 851 ngx_connection_t *c, *pc;
835 ngx_stream_session_t *s; 852 ngx_stream_session_t *s;
836 ngx_stream_upstream_t *u; 853 ngx_stream_upstream_t *u;
854 ngx_stream_proxy_srv_conf_t *pscf;
837 855
838 c = ev->data; 856 c = ev->data;
839 s = c->data; 857 s = c->data;
840 u = s->upstream; 858 u = s->upstream;
841 859
842 if (ev->timedout) { 860 if (ev->timedout) {
843 ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); 861
844 ngx_stream_proxy_finalize(s, NGX_DECLINED); 862 if (ev->delayed) {
863
864 ev->timedout = 0;
865 ev->delayed = 0;
866
867 if (!ev->ready) {
868 if (ngx_handle_read_event(ev, 0) != NGX_OK) {
869 ngx_stream_proxy_finalize(s, NGX_ERROR);
870 return;
871 }
872
873 if (u->upstream_buf.start) {
874 pc = u->peer.connection;
875
876 if (!c->read->delayed && !pc->read->delayed) {
877 pscf = ngx_stream_get_module_srv_conf(s,
878 ngx_stream_proxy_module);
879 ngx_add_timer(c->write, pscf->timeout);
880 }
881 }
882
883 return;
884 }
885
886 } else {
887 ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
888 ngx_stream_proxy_finalize(s, NGX_DECLINED);
889 return;
890 }
891
892 } else if (ev->delayed) {
893
894 ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
895 "stream connection delayed");
896
897 if (ngx_handle_read_event(ev, 0) != NGX_OK) {
898 ngx_stream_proxy_finalize(s, NGX_ERROR);
899 }
900
845 return; 901 return;
846 } 902 }
847 903
848 if (from_upstream && u->upstream_buf.start == NULL) { 904 if (from_upstream && u->upstream_buf.start == NULL) {
849 return; 905 return;
928 984
929 static ngx_int_t 985 static ngx_int_t
930 ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream, 986 ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
931 ngx_uint_t do_write) 987 ngx_uint_t do_write)
932 { 988 {
933 size_t size; 989 off_t *received, limit;
990 size_t size, limit_rate;
934 ssize_t n; 991 ssize_t n;
935 ngx_buf_t *b; 992 ngx_buf_t *b;
936 ngx_uint_t flags; 993 ngx_uint_t flags;
994 ngx_msec_t delay;
937 ngx_connection_t *c, *pc, *src, *dst; 995 ngx_connection_t *c, *pc, *src, *dst;
938 ngx_log_handler_pt handler; 996 ngx_log_handler_pt handler;
939 ngx_stream_upstream_t *u; 997 ngx_stream_upstream_t *u;
940 ngx_stream_proxy_srv_conf_t *pscf; 998 ngx_stream_proxy_srv_conf_t *pscf;
941 999
942 u = s->upstream; 1000 u = s->upstream;
943 1001
944 c = s->connection; 1002 c = s->connection;
945 pc = u->upstream_buf.start ? u->peer.connection : NULL; 1003 pc = u->upstream_buf.start ? u->peer.connection : NULL;
946 1004
1005 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
1006
947 if (from_upstream) { 1007 if (from_upstream) {
948 src = pc; 1008 src = pc;
949 dst = c; 1009 dst = c;
950 b = &u->upstream_buf; 1010 b = &u->upstream_buf;
1011 limit_rate = pscf->upstream_limit_rate;
1012 received = &u->received;
951 1013
952 } else { 1014 } else {
953 src = c; 1015 src = c;
954 dst = pc; 1016 dst = pc;
955 b = &u->downstream_buf; 1017 b = &u->downstream_buf;
1018 limit_rate = pscf->downstream_limit_rate;
1019 received = &s->received;
956 } 1020 }
957 1021
958 for ( ;; ) { 1022 for ( ;; ) {
959 1023
960 if (do_write) { 1024 if (do_write) {
981 } 1045 }
982 } 1046 }
983 1047
984 size = b->end - b->last; 1048 size = b->end - b->last;
985 1049
986 if (size && src->read->ready) { 1050 if (size && src->read->ready && !src->read->delayed) {
1051
1052 if (limit_rate) {
1053 limit = (off_t) limit_rate * (ngx_time() - u->start_sec + 1)
1054 - *received;
1055
1056 if (limit <= 0) {
1057 src->read->delayed = 1;
1058 delay = (ngx_msec_t) (- limit * 1000 / limit_rate + 1);
1059 ngx_add_timer(src->read, delay);
1060 break;
1061 }
1062
1063 if (size > (size_t) limit) {
1064 size = limit;
1065 }
1066 }
987 1067
988 n = src->recv(src, b->last, size); 1068 n = src->recv(src, b->last, size);
989 1069
990 if (n == NGX_AGAIN || n == 0) { 1070 if (n == NGX_AGAIN || n == 0) {
991 break; 1071 break;
992 } 1072 }
993 1073
994 if (n > 0) { 1074 if (n > 0) {
995 if (from_upstream) { 1075 if (limit_rate) {
996 u->received += n; 1076 delay = (ngx_msec_t) (n * 1000 / limit_rate);
997 1077
998 } else { 1078 if (delay > 0) {
999 s->received += n; 1079 src->read->delayed = 1;
1080 ngx_add_timer(src->read, delay);
1081 }
1000 } 1082 }
1001 1083
1084 *received += n;
1085 b->last += n;
1002 do_write = 1; 1086 do_write = 1;
1003 b->last += n; 1087
1004 continue; 1088 continue;
1005 } 1089 }
1006 1090
1007 if (n == NGX_ERROR) { 1091 if (n == NGX_ERROR) {
1008 src->read->eof = 1; 1092 src->read->eof = 1;
1009 } 1093 }
1010 } 1094 }
1011 1095
1012 break; 1096 break;
1013 } 1097 }
1014
1015 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
1016 1098
1017 if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) { 1099 if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) {
1018 handler = c->log->handler; 1100 handler = c->log->handler;
1019 c->log->handler = NULL; 1101 c->log->handler = NULL;
1020 1102
1042 if (ngx_handle_write_event(dst->write, 0) != NGX_OK) { 1124 if (ngx_handle_write_event(dst->write, 0) != NGX_OK) {
1043 ngx_stream_proxy_finalize(s, NGX_ERROR); 1125 ngx_stream_proxy_finalize(s, NGX_ERROR);
1044 return NGX_ERROR; 1126 return NGX_ERROR;
1045 } 1127 }
1046 1128
1047 ngx_add_timer(c->read, pscf->timeout); 1129 if (!c->read->delayed && !pc->read->delayed) {
1130 ngx_add_timer(c->write, pscf->timeout);
1131
1132 } else if (c->write->timer_set) {
1133 ngx_del_timer(c->write);
1134 }
1048 } 1135 }
1049 1136
1050 return NGX_OK; 1137 return NGX_OK;
1051 } 1138 }
1052 1139
1205 1292
1206 conf->connect_timeout = NGX_CONF_UNSET_MSEC; 1293 conf->connect_timeout = NGX_CONF_UNSET_MSEC;
1207 conf->timeout = NGX_CONF_UNSET_MSEC; 1294 conf->timeout = NGX_CONF_UNSET_MSEC;
1208 conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC; 1295 conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC;
1209 conf->downstream_buf_size = NGX_CONF_UNSET_SIZE; 1296 conf->downstream_buf_size = NGX_CONF_UNSET_SIZE;
1297 conf->downstream_limit_rate = NGX_CONF_UNSET_SIZE;
1210 conf->upstream_buf_size = NGX_CONF_UNSET_SIZE; 1298 conf->upstream_buf_size = NGX_CONF_UNSET_SIZE;
1299 conf->upstream_limit_rate = NGX_CONF_UNSET_SIZE;
1211 conf->next_upstream_tries = NGX_CONF_UNSET_UINT; 1300 conf->next_upstream_tries = NGX_CONF_UNSET_UINT;
1212 conf->next_upstream = NGX_CONF_UNSET; 1301 conf->next_upstream = NGX_CONF_UNSET;
1213 conf->proxy_protocol = NGX_CONF_UNSET; 1302 conf->proxy_protocol = NGX_CONF_UNSET;
1214 conf->local = NGX_CONF_UNSET_PTR; 1303 conf->local = NGX_CONF_UNSET_PTR;
1215 1304
1242 prev->next_upstream_timeout, 0); 1331 prev->next_upstream_timeout, 0);
1243 1332
1244 ngx_conf_merge_size_value(conf->downstream_buf_size, 1333 ngx_conf_merge_size_value(conf->downstream_buf_size,
1245 prev->downstream_buf_size, 16384); 1334 prev->downstream_buf_size, 16384);
1246 1335
1336 ngx_conf_merge_size_value(conf->downstream_limit_rate,
1337 prev->downstream_limit_rate, 0);
1338
1247 ngx_conf_merge_size_value(conf->upstream_buf_size, 1339 ngx_conf_merge_size_value(conf->upstream_buf_size,
1248 prev->upstream_buf_size, 16384); 1340 prev->upstream_buf_size, 16384);
1341
1342 ngx_conf_merge_size_value(conf->upstream_limit_rate,
1343 prev->upstream_limit_rate, 0);
1249 1344
1250 ngx_conf_merge_uint_value(conf->next_upstream_tries, 1345 ngx_conf_merge_uint_value(conf->next_upstream_tries,
1251 prev->next_upstream_tries, 0); 1346 prev->next_upstream_tries, 0);
1252 1347
1253 ngx_conf_merge_value(conf->next_upstream, prev->next_upstream, 1); 1348 ngx_conf_merge_value(conf->next_upstream, prev->next_upstream, 1);