comparison src/stream/ngx_stream_write_filter_module.c @ 6692:56fc55e32f23

Stream: filters.
author Roman Arutyunyan <arut@nginx.com>
date Thu, 15 Sep 2016 14:55:46 +0300
parents
children a7ff19afbb14
comparison
equal deleted inserted replaced
6691:4bce3edfac2c 6692:56fc55e32f23
1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Nginx, Inc.
5 */
6
7
8 #include <ngx_config.h>
9 #include <ngx_core.h>
10 #include <ngx_stream.h>
11
12
13 typedef struct {
14 ngx_chain_t *from_upstream;
15 ngx_chain_t *from_downstream;
16 } ngx_stream_write_filter_ctx_t;
17
18
19 static ngx_int_t ngx_stream_write_filter(ngx_stream_session_t *s,
20 ngx_chain_t *in, ngx_uint_t from_upstream);
21 static ngx_int_t ngx_stream_write_filter_init(ngx_conf_t *cf);
22
23
24 static ngx_stream_module_t ngx_stream_write_filter_module_ctx = {
25 NULL, /* preconfiguration */
26 ngx_stream_write_filter_init, /* postconfiguration */
27
28 NULL, /* create main configuration */
29 NULL, /* init main configuration */
30
31 NULL, /* create server configuration */
32 NULL /* merge server configuration */
33 };
34
35
36 ngx_module_t ngx_stream_write_filter_module = {
37 NGX_MODULE_V1,
38 &ngx_stream_write_filter_module_ctx, /* module context */
39 NULL, /* module directives */
40 NGX_STREAM_MODULE, /* module type */
41 NULL, /* init master */
42 NULL, /* init module */
43 NULL, /* init process */
44 NULL, /* init thread */
45 NULL, /* exit thread */
46 NULL, /* exit process */
47 NULL, /* exit master */
48 NGX_MODULE_V1_PADDING
49 };
50
51
52 static ngx_int_t
53 ngx_stream_write_filter(ngx_stream_session_t *s, ngx_chain_t *in,
54 ngx_uint_t from_upstream)
55 {
56 off_t size;
57 ngx_uint_t last, flush, sync;
58 ngx_chain_t *cl, *ln, **ll, **out, *chain;
59 ngx_connection_t *c;
60 ngx_stream_write_filter_ctx_t *ctx;
61
62 ctx = ngx_stream_get_module_ctx(s, ngx_stream_write_filter_module);
63
64 if (ctx == NULL) {
65 ctx = ngx_pcalloc(s->connection->pool,
66 sizeof(ngx_stream_write_filter_ctx_t));
67 if (ctx == NULL) {
68 return NGX_ERROR;
69 }
70
71 ngx_stream_set_ctx(s, ctx, ngx_stream_write_filter_module);
72 }
73
74 if (from_upstream) {
75 c = s->connection;
76 out = &ctx->from_upstream;
77
78 } else {
79 c = s->upstream->peer.connection;
80 out = &ctx->from_downstream;
81 }
82
83 if (c->error) {
84 return NGX_ERROR;
85 }
86
87 size = 0;
88 flush = 0;
89 sync = 0;
90 last = 0;
91 ll = out;
92
93 /* find the size, the flush point and the last link of the saved chain */
94
95 for (cl = *out; cl; cl = cl->next) {
96 ll = &cl->next;
97
98 ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
99 "write old buf t:%d f:%d %p, pos %p, size: %z "
100 "file: %O, size: %O",
101 cl->buf->temporary, cl->buf->in_file,
102 cl->buf->start, cl->buf->pos,
103 cl->buf->last - cl->buf->pos,
104 cl->buf->file_pos,
105 cl->buf->file_last - cl->buf->file_pos);
106
107 #if 1
108 if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
109 ngx_log_error(NGX_LOG_ALERT, c->log, 0,
110 "zero size buf in writer "
111 "t:%d r:%d f:%d %p %p-%p %p %O-%O",
112 cl->buf->temporary,
113 cl->buf->recycled,
114 cl->buf->in_file,
115 cl->buf->start,
116 cl->buf->pos,
117 cl->buf->last,
118 cl->buf->file,
119 cl->buf->file_pos,
120 cl->buf->file_last);
121
122 ngx_debug_point();
123 return NGX_ERROR;
124 }
125 #endif
126
127 size += ngx_buf_size(cl->buf);
128
129 if (cl->buf->flush || cl->buf->recycled) {
130 flush = 1;
131 }
132
133 if (cl->buf->sync) {
134 sync = 1;
135 }
136
137 if (cl->buf->last_buf) {
138 last = 1;
139 }
140 }
141
142 /* add the new chain to the existent one */
143
144 for (ln = in; ln; ln = ln->next) {
145 cl = ngx_alloc_chain_link(c->pool);
146 if (cl == NULL) {
147 return NGX_ERROR;
148 }
149
150 cl->buf = ln->buf;
151 *ll = cl;
152 ll = &cl->next;
153
154 ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
155 "write new buf t:%d f:%d %p, pos %p, size: %z "
156 "file: %O, size: %O",
157 cl->buf->temporary, cl->buf->in_file,
158 cl->buf->start, cl->buf->pos,
159 cl->buf->last - cl->buf->pos,
160 cl->buf->file_pos,
161 cl->buf->file_last - cl->buf->file_pos);
162
163 #if 1
164 if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
165 ngx_log_error(NGX_LOG_ALERT, c->log, 0,
166 "zero size buf in writer "
167 "t:%d r:%d f:%d %p %p-%p %p %O-%O",
168 cl->buf->temporary,
169 cl->buf->recycled,
170 cl->buf->in_file,
171 cl->buf->start,
172 cl->buf->pos,
173 cl->buf->last,
174 cl->buf->file,
175 cl->buf->file_pos,
176 cl->buf->file_last);
177
178 ngx_debug_point();
179 return NGX_ERROR;
180 }
181 #endif
182
183 size += ngx_buf_size(cl->buf);
184
185 if (cl->buf->flush || cl->buf->recycled) {
186 flush = 1;
187 }
188
189 if (cl->buf->sync) {
190 sync = 1;
191 }
192
193 if (cl->buf->last_buf) {
194 last = 1;
195 }
196 }
197
198 *ll = NULL;
199
200 ngx_log_debug3(NGX_LOG_DEBUG_STREAM, c->log, 0,
201 "stream write filter: l:%ui f:%ui s:%O", last, flush, size);
202
203 if (size == 0
204 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)
205 && !(last && c->need_last_buf))
206 {
207 if (last || flush || sync) {
208 for (cl = *out; cl; /* void */) {
209 ln = cl;
210 cl = cl->next;
211 ngx_free_chain(c->pool, ln);
212 }
213
214 *out = NULL;
215 c->buffered &= ~NGX_STREAM_WRITE_BUFFERED;
216
217 return NGX_OK;
218 }
219
220 ngx_log_error(NGX_LOG_ALERT, c->log, 0,
221 "the stream output chain is empty");
222
223 ngx_debug_point();
224
225 return NGX_ERROR;
226 }
227
228 chain = c->send_chain(c, *out, 0);
229
230 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
231 "stream write filter %p", chain);
232
233 if (chain == NGX_CHAIN_ERROR) {
234 c->error = 1;
235 return NGX_ERROR;
236 }
237
238 for (cl = *out; cl && cl != chain; /* void */) {
239 ln = cl;
240 cl = cl->next;
241 ngx_free_chain(c->pool, ln);
242 }
243
244 *out = chain;
245
246 if (chain) {
247 if (c->shared) {
248 ngx_log_error(NGX_LOG_ALERT, c->log, 0,
249 "shared connection is busy");
250 return NGX_ERROR;
251 }
252
253 c->buffered |= NGX_STREAM_WRITE_BUFFERED;
254 return NGX_AGAIN;
255 }
256
257 c->buffered &= ~NGX_STREAM_WRITE_BUFFERED;
258
259 if (c->buffered & NGX_LOWLEVEL_BUFFERED) {
260 return NGX_AGAIN;
261 }
262
263 return NGX_OK;
264 }
265
266
267 static ngx_int_t
268 ngx_stream_write_filter_init(ngx_conf_t *cf)
269 {
270 ngx_stream_top_filter = ngx_stream_write_filter;
271
272 return NGX_OK;
273 }