comparison stream_js_fetch.t @ 1785:2c08bfffe112

Tests: added stream js test for s.send() in async context.
author Dmitry Volyntsev <xeioex@nginx.com>
date Tue, 23 Aug 2022 19:40:07 -0700
parents 898598722ad4
children cadf15e2e2b3
comparison
legend: equal | deleted | inserted | replaced
left column: 1784:898598722ad4 | right column: 1785:2c08bfffe112
64 64
65 server { 65 server {
66 listen 127.0.0.1:8081; 66 listen 127.0.0.1:8081;
67 js_preread test.preread_verify; 67 js_preread test.preread_verify;
68 proxy_pass 127.0.0.1:8090; 68 proxy_pass 127.0.0.1:8090;
69 }
70
71 server {
72 listen 127.0.0.1:8082;
73 js_filter test.filter_verify;
74 proxy_pass 127.0.0.1:8091;
69 } 75 }
70 } 76 }
71 77
72 EOF 78 EOF
73 79
101 s.deny(); 107 s.deny();
102 } 108 }
103 }); 109 });
104 } 110 }
105 111
106 export default {njs: test_njs, validate, preread_verify} 112 function filter_verify(s) {
113 var collect = Buffer.from([]);
114
115 s.on('upstream', async function (data, flags) {
116 collect = Buffer.concat([collect, data]);
117
118 if (collect.length >= 4 && collect.readUInt16BE(0) == 0xabcd) {
119 s.off('upstream');
120
121 let reply = await ngx.fetch('http://127.0.0.1:$p/validate',
122 {body: collect.slice(2,4),
123 headers: {Host:'aaa'}});
124
125 if (reply.status == 200) {
126 s.send(collect.slice(4), flags);
127
128 } else {
129 s.send("__CLOSE__", flags);
130 }
131 }
132 });
133 }
134
135 export default {njs: test_njs, validate, preread_verify, filter_verify};
107 EOF 136 EOF
108 137
109 $t->try_run('no stream njs available')->plan(4); 138 $t->try_run('no stream njs available')->plan(7);
110 139
111 $t->run_daemon(\&stream_daemon, port(8090)); 140 $t->run_daemon(\&stream_daemon, port(8090), port(8091));
112 $t->waitforsocket('127.0.0.1:' . port(8090)); 141 $t->waitforsocket('127.0.0.1:' . port(8090));
142 $t->waitforsocket('127.0.0.1:' . port(8091));
113 143
114 ############################################################################### 144 ###############################################################################
115 145
116 local $TODO = 'not yet' 146 local $TODO = 'not yet'
117 unless http_get('/njs') =~ /^([.0-9]+)$/m && $1 ge '0.5.1'; 147 unless http_get('/njs') =~ /^([.0-9]+)$/m && $1 ge '0.5.1';
122 is(stream('127.0.0.1:' . port(8081))->io("\xAC\xCDQZ##"), '', 152 is(stream('127.0.0.1:' . port(8081))->io("\xAC\xCDQZ##"), '',
123 'preread invalid magic'); 153 'preread invalid magic');
124 is(stream('127.0.0.1:' . port(8081))->io("\xAB\xCDQQ##"), '', 154 is(stream('127.0.0.1:' . port(8081))->io("\xAB\xCDQQ##"), '',
125 'preread validation failed'); 155 'preread validation failed');
126 156
157 is(stream('127.0.0.1:' . port(8082))->io("\xAB\xCDQQ##"), '',
158 'filter validation failed');
159
160 my $s = stream('127.0.0.1:' . port(8082));
161
162 TODO: {
163 local $TODO = 'not yet'
164 unless http_get('/njs') =~ /^([.0-9]+)$/m && $1 ge '0.7.7';
165
166 is($s->io("\xAB\xCDQZ##", read => 1), '##', 'filter validated');
167
168 }
169
170 is($s->io("@@", read => 1), '@@', 'filter off');
171
127 ############################################################################### 172 ###############################################################################
128 173
129 sub stream_daemon { 174 sub stream_daemon {
130 my $server = IO::Socket::INET->new( 175 my (@ports) = @_;
131 Proto => 'tcp', 176 my (@socks, @clients);
132 LocalAddr => '127.0.0.1:' . port(8090), 177
133 Listen => 5, 178 for my $port (@ports) {
134 Reuse => 1 179 my $server = IO::Socket::INET->new(
135 ) 180 Proto => 'tcp',
136 or die "Can't create listening socket: $!\n"; 181 LocalAddr => "127.0.0.1:$port",
182 Listen => 5,
183 Reuse => 1
184 )
185 or die "Can't create listening socket: $!\n";
186 push @socks, $server;
187 }
188
189 my $sel = IO::Select->new(@socks);
137 190
138 local $SIG{PIPE} = 'IGNORE'; 191 local $SIG{PIPE} = 'IGNORE';
139 192
140 while (my $client = $server->accept()) { 193 while (my @ready = $sel->can_read) {
141 $client->autoflush(1); 194 foreach my $fh (@ready) {
142 195 if (grep $_ == $fh, @socks) {
143 log2c("(new connection $client)"); 196 my $new = $fh->accept;
144 197 $new->autoflush(1);
145 $client->sysread(my $buffer, 65536) or next; 198 $sel->add($new);
146 199
147 log2i("$client $buffer"); 200 } elsif (stream_handle_client($fh)
148 201 || $fh->sockport() == port(8090))
149 log2o("$client $buffer"); 202 {
150 203 $sel->remove($fh);
151 $client->syswrite($buffer); 204 $fh->close;
152 205 }
153 close $client; 206 }
154 } 207 }
208 }
209
210 sub stream_handle_client {
211 my ($client) = @_;
212
213 log2c("(new connection $client)");
214
215 $client->sysread(my $buffer, 65536) or return 1;
216
217 log2i("$client $buffer");
218
219 if ($buffer eq "__CLOSE__") {
220 return 1;
221 }
222
223 log2o("$client $buffer");
224
225 $client->syswrite($buffer);
226
227 return 0;
155 } 228 }
156 229
157 sub log2i { Test::Nginx::log_core('|| <<', @_); } 230 sub log2i { Test::Nginx::log_core('|| <<', @_); }
158 sub log2o { Test::Nginx::log_core('|| >>', @_); } 231 sub log2o { Test::Nginx::log_core('|| >>', @_); }
159 sub log2c { Test::Nginx::log_core('||', @_); } 232 sub log2c { Test::Nginx::log_core('||', @_); }