changeset 248:c388d7c3a666

Tests: tests for upcoming websocket proxy support.
author Maxim Dounin <mdounin@mdounin.ru>
date Fri, 18 Jan 2013 21:00:29 +0400
parents bf6b05f03458
children 6a0d934950bc
files proxy_websocket.t
diffstat 1 files changed, 298 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
new file mode 100644
--- /dev/null
+++ b/proxy_websocket.t
@@ -0,0 +1,298 @@
+#!/usr/bin/perl
+
+# (C) Maxim Dounin
+
+# Tests for http proxy websockets support.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use Test::More;
+
+use IO::Poll;
+use IO::Select;
+use IO::Socket::INET;
+use Socket qw/ CRLF /;
+
+BEGIN { use FindBin; chdir($FindBin::Bin); }
+
+use lib 'lib';
+use Test::Nginx;
+
+###############################################################################
+
+select STDERR; $| = 1;
+select STDOUT; $| = 1;
+
+eval {
+	require Protocol::WebSocket::Handshake::Client;
+	require Protocol::WebSocket::Handshake::Server;
+	require Protocol::WebSocket::Frame;
+};
+
+plan(skip_all => 'Protocol::WebSocket not installed') if $@;
+
+my $t = Test::Nginx->new()->has(qw/http proxy/)
+	->write_file_expand('nginx.conf', <<'EOF')->plan(26);
+
+%%TEST_GLOBALS%%
+
+daemon         off;
+
+events {
+}
+
+http {
+    %%TEST_GLOBALS_HTTP%%
+
+    server {
+        listen       127.0.0.1:8080;
+        server_name  localhost;
+
+        location / {
+            proxy_pass    http://127.0.0.1:8081;
+            proxy_http_version 1.1;
+            proxy_set_header Upgrade $http_upgrade;
+            proxy_set_header Connection "Upgrade";
+            proxy_read_timeout 2s;
+            send_timeout 2s;
+        }
+    }
+}
+
+EOF
+
+$t->run_daemon(\&websocket_fake_daemon);
+$t->run();
+
+###############################################################################
+
+TODO: {
+local $TODO = 'not yet';
+
+# establish websocket connection
+
+my $s = websocket_connect();
+ok($s, "websocket handshake");
+
+SKIP: {
+	skip "handshake failed", 22 unless $s;
+
+	# send a frame
+
+	websocket_write($s, 'foo');
+	is(websocket_read($s), 'bar', "websocket response");
+
+	# send some big frame
+
+	websocket_write($s, 'foo' x 16384);
+	like(websocket_read($s), qr/^(bar){16384}$/, "websocket big response");
+
+	# send multiple frames
+
+	for my $i (1 .. 10) {
+		websocket_write($s, ('foo' x 16384) . $i);
+		websocket_write($s, 'bazz' . $i);
+	}
+
+	for my $i (1 .. 10) {
+		like(websocket_read($s), qr/^(bar){16384}\d+$/, "websocket $i");
+		is(websocket_read($s), 'bazz' . $i, "websocket small $i");
+	}
+}
+
+# establish websocket connection with some pipelined data
+# and make sure they are correctly passed upstream
+
+undef $s;
+$s = websocket_connect("foo");
+ok($s, "handshake pipelined");
+
+SKIP: {
+	skip "handshake failed", 2 unless $s;
+
+	is(websocket_read($s), "bar", "response pipelined");
+
+	websocket_write($s, "foo");
+	is(websocket_read($s), "bar", "next to pipelined");
+}
+
+}
+
+###############################################################################
+
+sub websocket_connect {
+	my ($message) = @_;
+
+	my $s = IO::Socket::INET->new(
+		Proto => 'tcp',
+		PeerAddr => '127.0.0.1:8080'
+	)
+		or die "Can't connect to nginx: $!\n";
+
+	my $h = Protocol::WebSocket::Handshake::Client->new(
+		url => 'ws://localhost');
+
+	# send request, $h->to_string
+
+	my $buf = $h->to_string;
+	$buf .= Protocol::WebSocket::Frame->new($message)->to_bytes
+		if $message;
+
+	local $SIG{PIPE} = 'IGNORE';
+
+	log_out($buf);
+	$s->syswrite($buf);
+
+	# read response
+
+	my $got = '';
+	$buf = '';
+
+	$s->blocking(0);
+	while (IO::Select->new($s)->can_read(1.5)) {
+		my $n = $s->sysread($buf, 1024);
+		last unless $n;
+		log_in($buf);
+		$got .= $buf;
+		last if $got =~ /\x0d?\x0a\x0d?\x0a$/;
+	}
+
+	# parse server response
+
+	$h->parse($got);
+	return $s if $h->is_done;
+}
+
+sub websocket_write {
+	my ($s, $message) = @_;
+	my $frame = Protocol::WebSocket::Frame->new($message);
+
+	local $SIG{PIPE} = 'IGNORE';
+	$s->blocking(1);
+
+	log_out($frame->to_bytes);
+	$s->syswrite($frame->to_bytes);
+}
+
+sub websocket_read {
+	my ($s) = @_;
+	my ($buf, $got);
+
+	# store frame object in socket itself to simplify things
+	# this works as $s is IO::Handle, see man IO::Handle
+
+	${*$s}->{_websocket_frame} ||= Protocol::WebSocket::Frame->new();
+	my $frame = ${*$s}->{_websocket_frame};
+
+	$s->blocking(0);
+	$got = $frame->next();
+	return $got if defined $got;
+
+	while (IO::Select->new($s)->can_read(1.5)) {
+		my $n = $s->sysread($buf, 65536);
+		return $got unless $n;
+		log_in($buf);
+		$frame->append($buf);
+		$got = $frame->next();
+		return $got if defined $got;
+	}
+}
+
+###############################################################################
+
+sub websocket_fake_daemon {
+	my $server = IO::Socket::INET->new(
+		Proto => 'tcp',
+		LocalAddr => '127.0.0.1:8081',
+		Listen => 5,
+		Reuse => 1
+	)
+		or die "Can't create listening socket: $!\n";
+
+	while (my $client = $server->accept()) {
+		websocket_handle_client($client);
+        }
+}
+
+sub websocket_handle_client {
+	my ($client) = @_;
+
+	$client->autoflush(1);
+	$client->blocking(0);
+
+	my $poll = IO::Poll->new;
+
+	my $hs = Protocol::WebSocket::Handshake::Server->new;
+	my $frame = Protocol::WebSocket::Frame->new;
+	my $buffer = '';
+	my $closed;
+	my $n;
+
+	log2c("(new connection $client)");
+
+	while (1) {
+        	$poll->mask($client => ($buffer ? POLLIN|POLLOUT : POLLIN));
+		my $p = $poll->poll(0.5);
+		log2c("(poll $p)");
+
+		foreach my $reader ($poll->handles(POLLIN)) {
+			$n = $client->sysread(my $chunk, 65536);
+			return unless $n;
+
+			log2i($chunk);
+
+			if (!$hs->is_done) {
+				unless (defined $hs->parse($chunk)) {
+					log2c("(error: " . $hs->error . ")");
+					return;
+				}
+
+				if ($hs->is_done) {
+					$buffer = $hs->to_string;
+					log2o($buffer);
+				}
+
+				log2c("(parse: $chunk)");
+			}
+
+			$frame->append($chunk);
+
+			while (defined(my $message = $frame->next)) {
+				my $f;
+
+				if ($frame->is_close) {
+					log2c("(close frame)");
+					$closed = 1;
+					$f = $frame->new(type => 'close')
+						->to_bytes;
+				} else {
+					$message =~ s/foo/bar/g;
+					$f = $frame->new($message)->to_bytes;
+				}
+
+				log2o($f);
+				$buffer .= $f;
+			}
+		}
+
+		foreach my $writer ($poll->handles(POLLOUT)) {
+			next unless length $buffer;
+			$n = $writer->syswrite($buffer);
+			substr $buffer, 0, $n, '';
+		}
+
+		if ($closed && length $buffer == 0) {
+			log2c("(closed)");
+			return;
+		}
+	}
+}
+
+sub log2i { Test::Nginx::log_core('|| <<', @_); }
+sub log2o { Test::Nginx::log_core('|| >>', @_); }
+sub log2c { Test::Nginx::log_core('||', @_); }
+
+###############################################################################