comparison stream_upstream_least_conn.t @ 557:05cbe9e2def8

Tests: basic stream tests for upstream least_conn.
author Sergey Kandaurov <pluknet@nginx.com>
date Thu, 23 Apr 2015 14:01:13 +0300
parents
children ff49e1c00b35
comparison
equal deleted inserted replaced
556:97d89d9ab4ed 557:05cbe9e2def8
1 #!/usr/bin/perl
2
3 # (C) Sergey Kandaurov
4 # (C) Nginx, Inc.
5
6 # Stream tests for upstream least_conn balancer module.
7
8 ###############################################################################
9
10 use warnings;
11 use strict;
12
13 use Test::More;
14
15 use IO::Select;
16
17 BEGIN { use FindBin; chdir($FindBin::Bin); }
18
19 use lib 'lib';
20 use Test::Nginx;
21
22 ###############################################################################
23
# Turn on autoflush ($| = 1) for STDERR and then STDOUT; STDOUT is left as
# the currently selected output handle.
24 select STDERR; $| = 1;
25 select STDOUT; $| = 1;
26
# Build the test object and write nginx.conf from the heredoc below.
# plan(2) matches the two is() checks made later in this file.
# NOTE(review): has() presumably skips the test when nginx lacks the listed
# modules, and write_file_expand() presumably substitutes %%TEST_GLOBALS%%;
# both live in Test::Nginx -- confirm there.
# The heredoc is single-quoted, so Perl does not interpolate it.  It defines
# upstream "u" using least_conn over 127.0.0.1:8081 and 127.0.0.1:8082, and a
# stream server on 127.0.0.1:8080 that proxies to that upstream.
27 my $t = Test::Nginx->new()->has(qw/stream stream_upstream_least_conn/)->plan(2)
28 ->write_file_expand('nginx.conf', <<'EOF');
29
30 %%TEST_GLOBALS%%
31
32 daemon off;
33
34 events {
35 }
36
37 stream {
38 upstream u {
39 least_conn;
40 server 127.0.0.1:8081;
41 server 127.0.0.1:8082;
42 }
43
44 server {
45 listen 127.0.0.1:8080;
46 proxy_pass u;
47 }
48 }
49
50 EOF
51
# Ports of the two backend daemons listed in upstream "u".
my @backend_ports = (8081, 8082);

# Start one stream_daemon per backend port, then start nginx itself.
foreach my $backend_port (@backend_ports) {
	$t->run_daemon(\&stream_daemon, $backend_port);
}
$t->run();

# Wait for each backend's listening socket before sending any traffic.
foreach my $backend_port (@backend_ports) {
	$t->waitforsocket("127.0.0.1:$backend_port");
}

###############################################################################

# Ten sequential requests are expected to split evenly between the backends.
is(many('.', 10), '8081: 5, 8082: 5', 'balanced');

# With 'w' payloads the 8081 backend stalls for 2.5s before replying, so of
# ten overlapping connections only one is expected to land on it.
is(parallel('w', 10), '8081: 1, 8082: 9', 'least_conn');
63
64 ###############################################################################
65
# Issue $count sequential requests and tally the ports found in the replies.
#
#   $data  - payload sent on every connection
#   $count - number of connections to make, one after another
#   %opts  - optional settings; only "peer" (address to connect to) is read
#
# Returns a string of the form "8081: 5, 8082: 5", with ports sorted as
# strings.  Replies that contain no digits are not counted.
sub many {
	my ($data, $count, %opts) = @_;

	my $peer = $opts{peer};
	my %hits;

	foreach my $attempt (1 .. $count) {
		next unless stream_get($data, $peer) =~ /(\d+)/;
		$hits{$1}++;
	}

	return join ', ', map { sprintf('%s: %s', $_, $hits{$_}) } sort keys %hits;
}
81
# Open $count connections that stay open simultaneously, send $data on each,
# then read the replies and tally the ports found in them.
#
#   $data  - payload sent on every connection
#   $count - number of connections to open
#   %opts  - optional settings; only "peer" is read (a false value is
#            treated as "not given")
#
# Returns a string of the form "8081: 1, 8082: 9", with ports sorted as
# strings.  Replies that contain no digits are not counted.
sub parallel {
	my ($data, $count, %opts) = @_;

	my $peer = $opts{peer} || undef;
	my (@pending, %hits);

	# Connect and write one socket at a time, pausing 0.2s after each write.
	foreach my $attempt (1 .. $count) {
		my $sock = stream_connect($peer);
		push @pending, $sock;
		stream_write($sock, $data);
		select undef, undef, undef, 0.2;
	}

	# Drain the replies starting with the most recently opened connection.
	while (my $sock = pop @pending) {
		$hits{$1}++ if stream_read($sock) =~ /(\d+)/;
		close $sock;
	}

	return join ', ', map { sprintf('%s: %s', $_, $hits{$_}) } sort keys %hits;
}
106
# Perform one request/response exchange on a fresh connection.
#
#   $data - payload to send
#   $peer - address to connect to; passed through to stream_connect()
#
# Returns whatever stream_read() returned for the connection.
sub stream_get {
	my ($data, $peer) = @_;

	my $sock = stream_connect($peer);
	stream_write($sock, $data);

	my $reply = stream_read($sock);
	$sock->close;

	return $reply;
}
117
# Open a TCP connection to $peer, or to 127.0.0.1:8080 when $peer is false.
#
# Returns the connected IO::Socket::INET object; dies if the connection
# cannot be established.
sub stream_connect {
	my $peer = shift;

	my %args = (
		Proto => 'tcp',
		PeerAddr => $peer || '127.0.0.1:8080',
	);

	my $s = IO::Socket::INET->new(%args);
	die "Can't connect to nginx: $!\n" unless $s;

	return $s;
}
128
# Write $message to socket $s in non-blocking mode.
#
# Each write waits up to 1.5s for the socket to become writable.  The loop
# ends when the whole message has been sent, when a write returns nothing,
# or when the wait times out.  If any part of the message is still unsent
# at that point, the socket is closed.  SIGPIPE is ignored for the duration
# of the call.
sub stream_write {
	my ($s, $message) = @_;

	local $SIG{PIPE} = 'IGNORE';

	$s->blocking(0);

	my $writable = IO::Select->new($s);

	while ($writable->can_write(1.5)) {
		my $sent = $s->syswrite($message);
		last unless $sent;

		# Drop the bytes that were just written.
		substr($message, 0, $sent, '');
		last if length($message) == 0;
	}

	$s->close() if length $message;
}
146
# Read one reply (at most 1024 bytes) from socket $s in non-blocking mode.
#
# Waits up to 3 seconds for the socket to become readable.  The data read
# is passed to log_in() and returned.
#
# Always returns a defined string: the empty string is returned when the
# wait times out, when the read fails, or at end of file.  Callers apply a
# regex to the result, and a defined value keeps that (and the logging call)
# free of "uninitialized value" warnings.
sub stream_read {
	my ($s) = @_;

	my $buf = '';

	$s->blocking(0);
	if (IO::Select->new($s)->can_read(3)) {
		# sysread returns undef on error; report that as "no data".
		$buf = '' unless defined $s->sysread($buf, 1024);
	}

	log_in($buf);
	return $buf;
}
159
160 ###############################################################################
161
# Backend daemon: listen on 127.0.0.1:$port and serve clients until the
# process is terminated.
#
# A single select loop handles both the listening socket and the accepted
# connections.  Readable client sockets are handed to
# stream_handle_client(); when that returns true the client is removed from
# the select set and closed.  SIGPIPE is ignored while the loop runs.
#
# Dies if the listening socket cannot be created.
sub stream_daemon {
	my ($port) = @_;

	my $server = IO::Socket::INET->new(
		Proto => 'tcp',
		LocalAddr => '127.0.0.1',
		LocalPort => $port,
		Listen => 5,
		Reuse => 1
	)
		or die "Can't create listening socket: $!\n";

	my $sel = IO::Select->new($server);

	local $SIG{PIPE} = 'IGNORE';

	while (my @ready = $sel->can_read) {
		foreach my $fh (@ready) {
			if ($server == $fh) {
				# accept returns undef on failure; skip this handle
				# rather than die on a method call on undef and take
				# the whole backend down.
				my $new = $fh->accept or next;
				$new->autoflush(1);
				$sel->add($new);

			} elsif (stream_handle_client($fh)) {
				$sel->remove($fh);
				$fh->close;
			}
		}
	}
}
192
# Serve one readable client socket of the backend daemon.
#
# Reads up to 65536 bytes from $client and replies with the local port
# number of the connection.  When the request contains "w" and the local
# port is 8081, the reply is delayed by 2.5 seconds.
#
# Always returns 1 (also when nothing could be read), which tells the
# caller to close the connection.
sub stream_handle_client {
	my ($client) = @_;

	log2c("(new connection $client)");

	my $request;
	return 1 unless $client->sysread($request, 65536);

	log2i("$client $request");

	my $port = $client->sockport();

	# Only the 8081 backend stalls, and only for "w" requests.
	if ($port == 8081 && $request =~ /w/) {
		Test::Nginx::log_core('||', "$port: sleep(2.5)");
		select undef, undef, undef, 2.5;
	}

	my $reply = $port;

	log2o("$client $reply");

	$client->syswrite($reply);

	return 1;
}
217
# Logging helpers for the backend daemon.  Each forwards its arguments to
# Test::Nginx::log_core() with a fixed prefix: '|| <<' for data received,
# '|| >>' for data sent, '||' for connection events.
sub log2i {
	return Test::Nginx::log_core('|| <<', @_);
}

sub log2o {
	return Test::Nginx::log_core('|| >>', @_);
}

sub log2c {
	return Test::Nginx::log_core('||', @_);
}
221
222 ###############################################################################