comparison proxy_websocket.t @ 248:c388d7c3a666

Tests: tests for upcoming websocket proxy support.
author Maxim Dounin <mdounin@mdounin.ru>
date Fri, 18 Jan 2013 21:00:29 +0400
parents
children 6a0d934950bc
comparison
equal deleted inserted replaced
247:bf6b05f03458 248:c388d7c3a666
1 #!/usr/bin/perl
2
3 # (C) Maxim Dounin
4
5 # Tests for http proxy websockets support.
6
7 ###############################################################################
8
9 use warnings;
10 use strict;
11
12 use Test::More;
13
14 use IO::Poll;
15 use IO::Select;
16 use IO::Socket::INET;
17 use Socket qw/ CRLF /;
18
19 BEGIN { use FindBin; chdir($FindBin::Bin); }
20
21 use lib 'lib';
22 use Test::Nginx;
23
24 ###############################################################################
25
# unbuffer both standard streams so that test output and logged traffic
# are not reordered; STDOUT is the selected handle again afterwards

for my $stream (\*STDERR, \*STDOUT) {
	select $stream;
	$| = 1;
}

# Protocol::WebSocket is optional, skip the whole test if it is missing

eval {
	require Protocol::WebSocket::Handshake::Client;
	require Protocol::WebSocket::Handshake::Server;
	require Protocol::WebSocket::Frame;
};

if ($@) {
	plan(skip_all => 'Protocol::WebSocket not installed');
}

# nginx listens on 8080 and proxies everything to the fake websocket
# backend on 8081, passing Upgrade / Connection headers through

my $conf = <<'EOF';

%%TEST_GLOBALS%%

daemon off;

events {
}

http {
%%TEST_GLOBALS_HTTP%%

server {
listen 127.0.0.1:8080;
server_name localhost;

location / {
proxy_pass http://127.0.0.1:8081;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_read_timeout 2s;
send_timeout 2s;
}
}
}

EOF

# 26 tests: handshake, 2 single frames, 2 x 10 queued frames,
# pipelined handshake, 2 pipelined frames

my $t = Test::Nginx->new()->has(qw/http proxy/)
	->write_file_expand('nginx.conf', $conf)->plan(26);

# start the fake backend first, then nginx itself

$t->run_daemon(\&websocket_fake_daemon);
$t->run();
69
70 ###############################################################################
71
# All checks below are marked TODO ('not yet'), so failures are reported
# as expected failures instead of breaking the run.
#
# websocket_read() is always called through scalar(): in the argument
# list of is() / like() it would otherwise be evaluated in list context,
# and a helper returning an empty list would silently shift the expected
# value and the test name by one position.

TODO: {
local $TODO = 'not yet';

# establish websocket connection

my $s = websocket_connect();
ok($s, "websocket handshake");

SKIP: {
skip "handshake failed", 22 unless $s;

# send a frame

websocket_write($s, 'foo');
is(scalar(websocket_read($s)), 'bar', "websocket response");

# send some big frame

websocket_write($s, 'foo' x 16384);
like(scalar(websocket_read($s)), qr/^(bar){16384}$/,
	"websocket big response");

# send multiple frames

for my $i (1 .. 10) {
	websocket_write($s, ('foo' x 16384) . $i);
	websocket_write($s, 'bazz' . $i);
}

# responses are expected back in the order the frames were sent

for my $i (1 .. 10) {
	like(scalar(websocket_read($s)), qr/^(bar){16384}\d+$/,
		"websocket $i");
	is(scalar(websocket_read($s)), 'bazz' . $i, "websocket small $i");
}
}

# establish websocket connection with some pipelined data
# and make sure they are correctly passed upstream

undef $s;
$s = websocket_connect("foo");
ok($s, "handshake pipelined");

SKIP: {
skip "handshake failed", 2 unless $s;

is(scalar(websocket_read($s)), "bar", "response pipelined");

websocket_write($s, "foo");
is(scalar(websocket_read($s)), "bar", "next to pipelined");
}

}
123
124 ###############################################################################
125
# Connect to nginx on 127.0.0.1:8080 and perform a websocket client
# handshake.
#
# $message - optional payload; if given (and not empty) it is sent as a
#            frame in the same write as the handshake request, i.e.
#            pipelined behind it
#
# Returns the connected socket (left in non-blocking mode) once the
# handshake is done, nothing otherwise.  Dies if the TCP connection
# can't be established.

sub websocket_connect {
	my ($message) = @_;

	my $s = IO::Socket::INET->new(
		Proto => 'tcp',
		PeerAddr => '127.0.0.1:8080'
	)
		or die "Can't connect to nginx: $!\n";

	my $h = Protocol::WebSocket::Handshake::Client->new(
		url => 'ws://localhost');

	# send request, $h->to_string

	my $buf = $h->to_string;

	# test definedness and length rather than truth, so that
	# a message of "0" is pipelined as well

	$buf .= Protocol::WebSocket::Frame->new($message)->to_bytes
		if defined $message && length $message;

	local $SIG{PIPE} = 'IGNORE';

	log_out($buf);
	$s->syswrite($buf);

	# read response, stop at the end of headers, on eof / error,
	# or when nothing arrives for 1.5 seconds

	my $got = '';
	$buf = '';

	$s->blocking(0);

	my $select = IO::Select->new($s);

	while ($select->can_read(1.5)) {
		my $n = $s->sysread($buf, 1024);
		last unless $n;
		log_in($buf);
		$got .= $buf;
		last if $got =~ /\x0d?\x0a\x0d?\x0a$/;
	}

	# parse server response

	$h->parse($got);
	return $s if $h->is_done;

	# handshake failed or incomplete

	return;
}
168
# Send $message to socket $s as a single websocket frame.
#
# The socket is switched to blocking mode.  The frame is serialized
# once, so the logged bytes are exactly the bytes sent, and short writes
# are retried until the whole frame is out.
#
# Returns the number of bytes written, or the false value returned by
# syswrite() (undef on error) if the frame could not be fully sent.

sub websocket_write {
	my ($s, $message) = @_;
	my $bytes = Protocol::WebSocket::Frame->new($message)->to_bytes;

	local $SIG{PIPE} = 'IGNORE';
	$s->blocking(1);

	log_out($bytes);

	my $total = length $bytes;
	my $sent = 0;

	while ($sent < $total) {
		my $n = $s->syswrite($bytes, $total - $sent, $sent);

		# undef is an error; 0 would make no progress, give up too

		return $n unless $n;
		$sent += $n;
	}

	return $sent;
}
179
# Read the next websocket message from socket $s.
#
# Returns the message payload, or undef if the connection is closed,
# a read fails, or no complete frame arrives within 1.5 seconds of
# inactivity.  Exactly one value is returned in all cases, so the
# result may be used directly in an argument list.
#
# The socket is switched to non-blocking mode.

sub websocket_read {
	my ($s) = @_;
	my ($buf, $got);

	# store frame object in socket itself to simplify things
	# this works as $s is IO::Handle, see man IO::Handle
	# (the scalar slot of the glob is used as a hash reference)

	${*$s}->{_websocket_frame} ||= Protocol::WebSocket::Frame->new();
	my $frame = ${*$s}->{_websocket_frame};

	$s->blocking(0);

	# a complete frame may be already buffered by a previous read

	$got = $frame->next();
	return $got if defined $got;

	my $select = IO::Select->new($s);

	while ($select->can_read(1.5)) {
		my $n = $s->sysread($buf, 65536);
		return $got unless $n;
		log_in($buf);
		$frame->append($buf);
		$got = $frame->next();
		return $got if defined $got;
	}

	# timed out; return undef explicitly instead of falling off
	# the end of the loop, which has an unspecified value

	return $got;
}
203
204 ###############################################################################
205
# Fake websocket backend: listens on 127.0.0.1:8081 and serves accepted
# connections one at a time, each until websocket_handle_client() returns.
# Dies if the listening socket can't be created.

sub websocket_fake_daemon {
	my @listen = (
		Proto => 'tcp',
		LocalAddr => '127.0.0.1:8081',
		Listen => 5,
		Reuse => 1,
	);

	my $server = IO::Socket::INET->new(@listen);

	die "Can't create listening socket: $!\n"
		unless $server;

	# connections are handled sequentially, the loop ends
	# once accept() fails

	while (my $client = $server->accept()) {
		websocket_handle_client($client);
	}
}
219
# Serve a single backend connection: complete the server side of the
# websocket handshake, then answer every received message with the same
# message where each "foo" is replaced by "bar".  A close frame is
# answered with a close frame.
#
# $client - accepted socket; it is switched to non-blocking mode
#
# Returns when the peer closes the connection, a read or write fails,
# the handshake can't be parsed, or a close frame has been answered and
# all pending output is flushed.

sub websocket_handle_client {
	my ($client) = @_;

	$client->autoflush(1);
	$client->blocking(0);

	# writing to a connection already closed by the peer must not
	# kill the daemon process, as in the client side helpers

	local $SIG{PIPE} = 'IGNORE';

	my $poll = IO::Poll->new;

	my $hs = Protocol::WebSocket::Handshake::Server->new;
	my $frame = Protocol::WebSocket::Frame->new;
	my $buffer = '';
	my $closed;
	my $n;

	log2c("(new connection $client)");

	while (1) {

		# wait for output readiness only if there is something to send

		$poll->mask($client =>
			(length($buffer) ? POLLIN|POLLOUT : POLLIN));
		my $p = $poll->poll(0.5);
		log2c("(poll $p)");

		foreach my $reader ($poll->handles(POLLIN)) {
			$n = $reader->sysread(my $chunk, 65536);

			# eof or read error

			return unless $n;

			log2i($chunk);

			if (!$hs->is_done) {
				unless (defined $hs->parse($chunk)) {
					log2c("(error: " . $hs->error . ")");
					return;
				}

				if ($hs->is_done) {
					$buffer = $hs->to_string;
					log2o($buffer);
				}

				log2c("(parse: $chunk)");
			}

			# NOTE(review): this appears to rely on parse() above
			# consuming the handshake bytes from $chunk in place, so
			# that only data pipelined after the handshake is left
			# here - confirm against Protocol::WebSocket

			$frame->append($chunk);

			while (defined(my $message = $frame->next)) {
				my $f;

				if ($frame->is_close) {
					log2c("(close frame)");
					$closed = 1;
					$f = $frame->new(type => 'close')
						->to_bytes;
				} else {
					$message =~ s/foo/bar/g;
					$f = $frame->new($message)->to_bytes;
				}

				log2o($f);
				$buffer .= $f;
			}
		}

		foreach my $writer ($poll->handles(POLLOUT)) {
			next unless length $buffer;
			$n = $writer->syswrite($buffer);

			unless (defined $n) {

				# transient condition, retry after the next poll

				next if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};

				log2c("(write error: $!)");
				return;
			}

			# drop what was actually written, keep the rest

			substr $buffer, 0, $n, '';
		}

		if ($closed && length($buffer) == 0) {
			log2c("(closed)");
			return;
		}
	}
}
293
# Logging helpers for the fake backend.  All arguments are passed on
# to Test::Nginx::log_core() behind a '||' prefix, so that backend
# traffic can be told apart in the log:
#
#   log2i - data received by the backend ('|| <<')
#   log2o - data sent by the backend ('|| >>')
#   log2c - comments about the backend state ('||')

sub log2i {
	return Test::Nginx::log_core('|| <<', @_);
}

sub log2o {
	return Test::Nginx::log_core('|| >>', @_);
}

sub log2c {
	return Test::Nginx::log_core('||', @_);
}
297
298 ###############################################################################