# HG changeset patch # User Maxim Dounin # Date 1358528429 -14400 # Node ID c388d7c3a666e4712f7dcd7904a6290b57abf05c # Parent bf6b05f034581c8bf7fca64683aa54e167714951 Tests: tests for upcoming websocket proxy support. diff --git a/proxy_websocket.t b/proxy_websocket.t new file mode 100644 --- /dev/null +++ b/proxy_websocket.t @@ -0,0 +1,298 @@ +#!/usr/bin/perl + +# (C) Maxim Dounin + +# Tests for http proxy websockets support. + +############################################################################### + +use warnings; +use strict; + +use Test::More; + +use IO::Poll; +use IO::Select; +use IO::Socket::INET; +use Socket qw/ CRLF /; + +BEGIN { use FindBin; chdir($FindBin::Bin); } + +use lib 'lib'; +use Test::Nginx; + +############################################################################### + +select STDERR; $| = 1; +select STDOUT; $| = 1; + +eval { + require Protocol::WebSocket::Handshake::Client; + require Protocol::WebSocket::Handshake::Server; + require Protocol::WebSocket::Frame; +}; + +plan(skip_all => 'Protocol::WebSocket not installed') if $@; + +my $t = Test::Nginx->new()->has(qw/http proxy/) + ->write_file_expand('nginx.conf', <<'EOF')->plan(26); + +%%TEST_GLOBALS%% + +daemon off; + +events { +} + +http { + %%TEST_GLOBALS_HTTP%% + + server { + listen 127.0.0.1:8080; + server_name localhost; + + location / { + proxy_pass http://127.0.0.1:8081; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + proxy_read_timeout 2s; + send_timeout 2s; + } + } +} + +EOF + +$t->run_daemon(\&websocket_fake_daemon); +$t->run(); + +############################################################################### + +TODO: { +local $TODO = 'not yet'; + +# establish websocket connection + +my $s = websocket_connect(); +ok($s, "websocket handshake"); + +SKIP: { + skip "handshake failed", 22 unless $s; + + # send a frame + + websocket_write($s, 'foo'); + is(websocket_read($s), 'bar', "websocket response"); + + # send some big frame + + websocket_write($s, 'foo' x 16384); + like(websocket_read($s), qr/^(bar){16384}$/, "websocket big response"); + + # send multiple frames + + for my $i (1 .. 10) { + websocket_write($s, ('foo' x 16384) . $i); + websocket_write($s, 'bazz' . $i); + } + + for my $i (1 .. 10) { + like(websocket_read($s), qr/^(bar){16384}\d+$/, "websocket $i"); + is(websocket_read($s), 'bazz' . $i, "websocket small $i"); + } +} + +# establish websocket connection with some pipelined data +# and make sure they are correctly passed upstream + +undef $s; +$s = websocket_connect("foo"); +ok($s, "handshake pipelined"); + +SKIP: { + skip "handshake failed", 2 unless $s; + + is(websocket_read($s), "bar", "response pipelined"); + + websocket_write($s, "foo"); + is(websocket_read($s), "bar", "next to pipelined"); +} + +} + +############################################################################### + +sub websocket_connect { + my ($message) = @_; + + my $s = IO::Socket::INET->new( + Proto => 'tcp', + PeerAddr => '127.0.0.1:8080' + ) + or die "Can't connect to nginx: $!\n"; + + my $h = Protocol::WebSocket::Handshake::Client->new( + url => 'ws://localhost'); + + # send request, $h->to_string + + my $buf = $h->to_string; + $buf .= Protocol::WebSocket::Frame->new($message)->to_bytes + if $message; + + local $SIG{PIPE} = 'IGNORE'; + + log_out($buf); + $s->syswrite($buf); + + # read response + + my $got = ''; + $buf = ''; + + $s->blocking(0); + while (IO::Select->new($s)->can_read(1.5)) { + my $n = $s->sysread($buf, 1024); + last unless $n; + log_in($buf); + $got .= $buf; + last if $got =~ /\x0d?\x0a\x0d?\x0a$/; + } + + # parse server response + + $h->parse($got); + return $s if $h->is_done; +} + +sub websocket_write { + my ($s, $message) = @_; + my $frame = Protocol::WebSocket::Frame->new($message); + + local $SIG{PIPE} = 'IGNORE'; + $s->blocking(1); + + log_out($frame->to_bytes); + $s->syswrite($frame->to_bytes); +} + +sub websocket_read { + my ($s) = @_; + my ($buf, $got); + + # store frame object in socket itself to simplify things + # this works as $s is IO::Handle, see man IO::Handle + + ${*$s}->{_websocket_frame} ||= Protocol::WebSocket::Frame->new(); + my $frame = ${*$s}->{_websocket_frame}; + + $s->blocking(0); + $got = $frame->next(); + return $got if defined $got; + + while (IO::Select->new($s)->can_read(1.5)) { + my $n = $s->sysread($buf, 65536); + return $got unless $n; + log_in($buf); + $frame->append($buf); + $got = $frame->next(); + return $got if defined $got; + } +} + +############################################################################### + +sub websocket_fake_daemon { + my $server = IO::Socket::INET->new( + Proto => 'tcp', + LocalAddr => '127.0.0.1:8081', + Listen => 5, + Reuse => 1 + ) + or die "Can't create listening socket: $!\n"; + + while (my $client = $server->accept()) { + websocket_handle_client($client); + } +} + +sub websocket_handle_client { + my ($client) = @_; + + $client->autoflush(1); + $client->blocking(0); + + my $poll = IO::Poll->new; + + my $hs = Protocol::WebSocket::Handshake::Server->new; + my $frame = Protocol::WebSocket::Frame->new; + my $buffer = ''; + my $closed; + my $n; + + log2c("(new connection $client)"); + + while (1) { + $poll->mask($client => ($buffer ? POLLIN|POLLOUT : POLLIN)); + my $p = $poll->poll(0.5); + log2c("(poll $p)"); + + foreach my $reader ($poll->handles(POLLIN)) { + $n = $client->sysread(my $chunk, 65536); + return unless $n; + + log2i($chunk); + + if (!$hs->is_done) { + unless (defined $hs->parse($chunk)) { + log2c("(error: " . $hs->error . ")"); + return; + } + + if ($hs->is_done) { + $buffer = $hs->to_string; + log2o($buffer); + } + + log2c("(parse: $chunk)"); + } + + $frame->append($chunk); + + while (defined(my $message = $frame->next)) { + my $f; + + if ($frame->is_close) { + log2c("(close frame)"); + $closed = 1; + $f = $frame->new(type => 'close') + ->to_bytes; + } else { + $message =~ s/foo/bar/g; + $f = $frame->new($message)->to_bytes; + } + + log2o($f); + $buffer .= $f; + } + } + + foreach my $writer ($poll->handles(POLLOUT)) { + next unless length $buffer; + $n = $writer->syswrite($buffer); + substr $buffer, 0, $n, ''; + } + + if ($closed && length $buffer == 0) { + log2c("(closed)"); + return; + } + } +} + +sub log2i { Test::Nginx::log_core('|| <<', @_); } +sub log2o { Test::Nginx::log_core('|| >>', @_); } +sub log2c { Test::Nginx::log_core('||', @_); } + +###############################################################################