comparison stream_js_send.t @ 1793:261967d85363

Tests: added js tests for s.send() in async context.
author Dmitry Volyntsev <xeioex@nginx.com>
date Mon, 26 Sep 2022 17:48:03 -0700
parents
children 520fb74cce4c
comparison
equal deleted inserted replaced
1792:d16310f0ada7 1793:261967d85363
1 #!/usr/bin/perl
2
3 # (C) Dmitry Volyntsev
4 # (C) Nginx, Inc.
5
6 # Tests for s.send() in stream njs module.
7
8 ###############################################################################
9
10 use warnings;
11 use strict;
12
13 use Test::More;
14
15 BEGIN { use FindBin; chdir($FindBin::Bin); }
16
17 use lib 'lib';
18 use Test::Nginx;
19 use Test::Nginx::Stream qw/ stream /;
20
21 ###############################################################################
22
# Turn on autoflush ($| = 1) for STDERR and then for STDOUT.  $| applies to
# the currently selected output handle, so each handle is selected first;
# STDOUT is the one left selected afterwards.
23 select STDERR; $| = 1;
24 select STDOUT; $| = 1;
25
# Create the test object (requires the http and stream features) and write
# nginx.conf.  The heredoc is single-quoted, so Perl interpolates nothing.
# The %%TEST_GLOBALS*%% markers are presumably substituted by
# write_file_expand() - confirm in Test::Nginx.
#
# Configuration layout:
#   http   127.0.0.1:8080  /njs is served by test.njs
#   stream 127.0.0.1:8081  js_filter test.filter,        proxied to 127.0.0.1:8090
#   stream 127.0.0.1:8082  js_filter test.filter_direct, proxied to 127.0.0.1:8090
26 my $t = Test::Nginx->new()->has(qw/http stream/)
27 ->write_file_expand('nginx.conf', <<'EOF');
28
29 %%TEST_GLOBALS%%
30
31 daemon off;
32
33 events {
34 }
35
36 http {
37 %%TEST_GLOBALS_HTTP%%
38
39 js_import test.js;
40
41 server {
42 listen 127.0.0.1:8080;
43 server_name localhost;
44
45 location /njs {
46 js_content test.njs;
47 }
48 }
49 }
50
51 stream {
52 %%TEST_GLOBALS_STREAM%%
53
54 js_import test.js;
55
56 server {
57 listen 127.0.0.1:8081;
58 js_filter test.filter;
59 proxy_pass 127.0.0.1:8090;
60 }
61
62 server {
63 listen 127.0.0.1:8082;
64 js_filter test.filter_direct;
65 proxy_pass 127.0.0.1:8090;
66 }
67 }
68
69 EOF
70
# Write the njs module imported by both the http and stream blocks.
#
# The heredoc is unquoted (interpolating), which is why the JavaScript
# template-literal placeholders are written as \${...}: the backslash keeps
# Perl from treating them as variables.
#
# Exports:
#   njs (test_njs)  - replies 200 with njs.version.
#   filter          - on the first "upload" callback calls
#                     s.send("__HANDSHAKE__"), unregisters the upload handler,
#                     awaits the first "download" chunk, throws unless it
#                     equals '__HANDSHAKE_RESPONSE__', then calls s.send()
#                     with the original upload data.
#   filter_direct   - same flow using s.sendUpstream() for the handshake and
#                     the data, plus s.sendDownstream('xxx') before the data
#                     is sent upstream.
71 $t->write_file('test.js', <<EOF);
72 function test_njs(r) {
73 r.return(200, njs.version);
74 }
75
76 function filter(s) {
77 s.on("upload", async (data, flags) => {
78 s.send("__HANDSHAKE__", flags);
79
80 const p = new Promise((resolve, reject) => {
81 s.on("download", (data, flags) => {
82 s.off("download");
83 resolve(data);
84 });
85 });
86
87 s.off("upload");
88
89 const handshakeResponse = await p;
90 if (handshakeResponse != '__HANDSHAKE_RESPONSE__') {
91 throw `Handshake failed: \${handshakeResponse}`;
92 }
93
94 s.send(data, flags);
95 });
96 }
97
98 function filter_direct(s) {
99 s.on("upload", async (data, flags) => {
100 s.sendUpstream("__HANDSHAKE__", flags);
101
102 const p = new Promise((resolve, reject) => {
103 s.on("download", (data, flags) => {
104 s.off("download");
105 resolve(data);
106 });
107 });
108
109 s.off("upload");
110
111 const handshakeResponse = await p;
112 if (handshakeResponse != '__HANDSHAKE_RESPONSE__') {
113 throw `Handshake failed: \${handshakeResponse}`;
114 }
115
116 s.sendDownstream('xxx', flags);
117 s.sendUpstream(data, flags);
118 });
119 }
120
121 export default {njs:test_njs, filter, filter_direct};
122
123 EOF
124
# Launch the backend stub (stream_daemon) for port(8090), then start nginx
# and declare a plan of 2 tests; 'no stream njs available' is the message
# handed to try_run() (presumably the skip reason when nginx cannot start
# with this configuration - confirm in Test::Nginx).  Finally wait until the
# backend socket on port(8090) is reachable before any test traffic is sent.
125 $t->run_daemon(\&stream_daemon, port(8090));
126 $t->try_run('no stream njs available')->plan(2);
127 $t->waitforsocket('127.0.0.1:' . port(8090));
128
129 ###############################################################################
130
TODO: {
    # Both checks are marked TODO unless the njs version reported by /njs is
    # at least 0.7.8.  The version is compared numerically, component by
    # component: a plain string comparison ("ge") would order '0.7.10'
    # before '0.7.8' and wrongly flag newer 0.7.x releases as TODO.
    my ($njs_version) = http_get('/njs') =~ /^([.0-9]+)$/m;
    my $njs_is_recent = 0;

    if (defined $njs_version) {
        my @have = split /\./, $njs_version;
        my @need = (0, 7, 8);

        # Identical versions satisfy the requirement.
        $njs_is_recent = 1;

        for my $i (0 .. $#need) {
            # Missing or empty components count as 0.
            my $order = ($have[$i] || 0) <=> $need[$i];
            next if $order == 0;

            $njs_is_recent = $order > 0;
            last;
        }
    }

    local $TODO = 'not yet' unless $njs_is_recent;

    # Port 8081 runs test.filter: the reply is the uppercased request.
    is(stream('127.0.0.1:' . port(8081))->io('abc'), 'ABC',
        'async filter');

    # Port 8082 runs test.filter_direct: 'xxx' is sent downstream ahead of
    # the uppercased reply.
    is(stream('127.0.0.1:' . port(8082))->io('abc'), 'xxxABC',
        'async filter direct');

}

$t->stop();
143
144 ###############################################################################
145
# Backend stub that the stream servers proxy to.
#
# Listens on 127.0.0.1:port(8090) and serves connections one at a time:
#   1. reads the first chunk; anything other than "__HANDSHAKE__" is answered
#      with "__HANDSHAKE_INVALID__" and the connection is closed;
#   2. answers a valid handshake with "__HANDSHAKE_RESPONSE__";
#   3. reads the next chunk and replies with it uppercased, then closes.
#
# Dies if the listening socket cannot be created.  Takes no arguments and
# only returns once accept() fails.
sub stream_daemon {
    my $server = IO::Socket::INET->new(
        Proto => 'tcp',
        LocalAddr => '127.0.0.1:' . port(8090),
        Listen => 5,
        Reuse => 1
    )
        or die "Can't create listening socket: $!\n";

    # A peer closing early must not kill the daemon on write.
    local $SIG{PIPE} = 'IGNORE';

    while (my $client = $server->accept()) {
        $client->autoflush(1);

        log2c("(new connection $client)");

        # Nothing read (EOF or error): move on to the next connection.
        $client->sysread(my $buffer, 65536) or next;

        log2i("$client $buffer");

        if ($buffer ne "__HANDSHAKE__") {
            $buffer = "__HANDSHAKE_INVALID__";
            log2o("$client $buffer");
            $client->syswrite($buffer);

            close $client;

            # The connection is finished; without this the code below would
            # write to and read from the handle that was just closed.
            next;
        }

        $buffer = "__HANDSHAKE_RESPONSE__";
        log2o("$client $buffer");
        $client->syswrite($buffer);

        $client->sysread($buffer, 65536) or next;

        $buffer = uc($buffer);
        log2o("$client $buffer");
        $client->syswrite($buffer);

        close $client;
    }
}
187
# Logging shortcuts used by stream_daemon.  Each one forwards its arguments
# to Test::Nginx::log_core() behind a fixed marker:
#   log2i - '|| <<'  (data read from a client)
#   log2o - '|| >>'  (data written to a client)
#   log2c - '||'     (connection events)
sub log2i {
    return Test::Nginx::log_core('|| <<', @_);
}

sub log2o {
    return Test::Nginx::log_core('|| >>', @_);
}

sub log2c {
    return Test::Nginx::log_core('||', @_);
}
191
192 ###############################################################################