comparison src/stream/ngx_stream_upstream_hash_module.c @ 6115:61d7ae76647d

Stream: port from NGINX+.
author Ruslan Ermilov <ru@nginx.com>
date Mon, 20 Apr 2015 13:05:11 +0300
parents
children b6047abf5f30
comparison
equal deleted inserted replaced
6114:4a640716f4e2 6115:61d7ae76647d
1
2 /*
3 * Copyright (C) Roman Arutyunyan
4 * Copyright (C) Nginx, Inc.
5 */
6
7
8 #include <ngx_config.h>
9 #include <ngx_core.h>
10 #include <ngx_stream.h>
11
12
/*
 * A single point of the consistent hash ring built by
 * ngx_stream_upstream_init_chash().
 */
13 typedef struct {
14 uint32_t hash; /* CRC32 value that positions the point on the ring */
15 ngx_str_t *server; /* points at the "server" string of the owning peer */
16 } ngx_stream_upstream_chash_point_t;
17
18
/*
 * The consistent hash ring.  The array is declared with a single element;
 * ngx_stream_upstream_init_chash() allocates the structure with extra room
 * behind it so that more points fit.
 */
19 typedef struct {
20 ngx_uint_t number; /* count of valid entries in point[] */
21 ngx_stream_upstream_chash_point_t point[1]; /* sorted by hash, duplicates removed, once built */
22 } ngx_stream_upstream_chash_points_t;
23
24
/*
 * Per-upstream configuration of the module.
 */
25 typedef struct {
26 ngx_stream_upstream_chash_points_t *points; /* ring for "consistent" mode; NULL until ngx_stream_upstream_init_chash() sets it */
27 } ngx_stream_upstream_hash_srv_conf_t;
28
29
/*
 * Per-session peer selection state, allocated by
 * ngx_stream_upstream_init_hash_peer().
 */
30 typedef struct {
31 /* the round robin data must be first */
32 ngx_stream_upstream_rr_peer_data_t rrp;
33 ngx_stream_upstream_hash_srv_conf_t *conf; /* module configuration of the upstream */
34 ngx_str_t key; /* value being hashed (the client address text) */
35 ngx_uint_t tries; /* number of candidates rejected so far */
36 ngx_uint_t rehash; /* plain mode: counter prepended to the key on every iteration but the first */
37 uint32_t hash; /* plain mode: accumulated hash; consistent mode: current ring index */
38 ngx_event_get_peer_pt get_rr_peer; /* round-robin selector used as a fallback */
39 } ngx_stream_upstream_hash_peer_data_t;
40
41
/* plain "hash" method: upstream init, per-session init, peer selection */
42 static ngx_int_t ngx_stream_upstream_init_hash(ngx_conf_t *cf,
43 ngx_stream_upstream_srv_conf_t *us);
44 static ngx_int_t ngx_stream_upstream_init_hash_peer(ngx_stream_session_t *s,
45 ngx_stream_upstream_srv_conf_t *us);
46 static ngx_int_t ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t *pc,
47 void *data);
48
/* "hash ... consistent" method: ring construction, lookup and selection */
49 static ngx_int_t ngx_stream_upstream_init_chash(ngx_conf_t *cf,
50 ngx_stream_upstream_srv_conf_t *us);
51 static int ngx_libc_cdecl
52 ngx_stream_upstream_chash_cmp_points(const void *one, const void *two);
53 static ngx_uint_t ngx_stream_upstream_find_chash_point(
54 ngx_stream_upstream_chash_points_t *points, uint32_t hash);
55 static ngx_int_t ngx_stream_upstream_init_chash_peer(ngx_stream_session_t *s,
56 ngx_stream_upstream_srv_conf_t *us);
57 static ngx_int_t ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t *pc,
58 void *data);
59
/* configuration: server conf allocation and the "hash" directive handler */
60 static void *ngx_stream_upstream_hash_create_conf(ngx_conf_t *cf);
61 static char *ngx_stream_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd,
62 void *conf);
63
64
/*
 * Directives provided by the module.  Only "hash" is defined; it is parsed
 * by ngx_stream_upstream_hash(), which reads one mandatory argument (the
 * key) and one optional argument ("consistent").
 */
65 static ngx_command_t ngx_stream_upstream_hash_commands[] = {
66
67 { ngx_string("hash"),
68 NGX_STREAM_UPS_CONF|NGX_CONF_TAKE12,
69 ngx_stream_upstream_hash,
70 NGX_STREAM_SRV_CONF_OFFSET,
71 0,
72 NULL },
73
74 ngx_null_command
75 };
76
77
/*
 * Stream module context: the module only supplies a server configuration
 * constructor; every other hook is left unset.
 */
78 static ngx_stream_module_t ngx_stream_upstream_hash_module_ctx = {
79 NULL, /* create main configuration */
80 NULL, /* init main configuration */
81
82 ngx_stream_upstream_hash_create_conf, /* create server configuration */
83 NULL, /* merge server configuration */
84 };
85
86
/*
 * Module definition: ties the context and the directive table together.
 * No process or thread lifecycle hooks are installed.
 */
87 ngx_module_t ngx_stream_upstream_hash_module = {
88 NGX_MODULE_V1,
89 &ngx_stream_upstream_hash_module_ctx, /* module context */
90 ngx_stream_upstream_hash_commands, /* module directives */
91 NGX_STREAM_MODULE, /* module type */
92 NULL, /* init master */
93 NULL, /* init module */
94 NULL, /* init process */
95 NULL, /* init thread */
96 NULL, /* exit thread */
97 NULL, /* exit process */
98 NULL, /* exit master */
99 NGX_MODULE_V1_PADDING
100 };
101
102
103 static ngx_int_t
104 ngx_stream_upstream_init_hash(ngx_conf_t *cf,
105 ngx_stream_upstream_srv_conf_t *us)
106 {
107 if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
108 return NGX_ERROR;
109 }
110
111 us->peer.init = ngx_stream_upstream_init_hash_peer;
112
113 return NGX_OK;
114 }
115
116
117 static ngx_int_t
118 ngx_stream_upstream_init_hash_peer(ngx_stream_session_t *s,
119 ngx_stream_upstream_srv_conf_t *us)
120 {
121 ngx_stream_upstream_hash_srv_conf_t *hcf;
122 ngx_stream_upstream_hash_peer_data_t *hp;
123
124 hp = ngx_palloc(s->connection->pool,
125 sizeof(ngx_stream_upstream_hash_peer_data_t));
126 if (hp == NULL) {
127 return NGX_ERROR;
128 }
129
130 s->upstream->peer.data = &hp->rrp;
131
132 if (ngx_stream_upstream_init_round_robin_peer(s, us) != NGX_OK) {
133 return NGX_ERROR;
134 }
135
136 s->upstream->peer.get = ngx_stream_upstream_get_hash_peer;
137
138 hcf = ngx_stream_conf_upstream_srv_conf(us,
139 ngx_stream_upstream_hash_module);
140
141 hp->key = s->connection->addr_text;
142
143 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
144 "upstream hash key:\"%V\"", &hp->key);
145
146 hp->conf = hcf;
147 hp->tries = 0;
148 hp->rehash = 0;
149 hp->hash = 0;
150 hp->get_rr_peer = ngx_stream_upstream_get_round_robin_peer;
151
152 return NGX_OK;
153 }
154
155
156 static ngx_int_t
157 ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t *pc, void *data)
158 {
159 ngx_stream_upstream_hash_peer_data_t *hp = data;
160
161 time_t now;
162 u_char buf[NGX_INT_T_LEN];
163 size_t size;
164 uint32_t hash;
165 ngx_int_t w;
166 uintptr_t m;
167 ngx_uint_t i, n, p;
168 ngx_stream_upstream_rr_peer_t *peer;
169
170 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
171 "get hash peer, try: %ui", pc->tries);
172
173 ngx_stream_upstream_rr_peers_wlock(hp->rrp.peers);
174
175 if (hp->tries > 20 || hp->rrp.peers->single) {
176 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
177 return hp->get_rr_peer(pc, &hp->rrp);
178 }
179
180 now = ngx_time();
181
182 pc->connection = NULL;
183
184 for ( ;; ) {
185
186 /*
187 * Hash expression is compatible with Cache::Memcached:
188 * ((crc32([REHASH] KEY) >> 16) & 0x7fff) + PREV_HASH
189 * with REHASH omitted at the first iteration.
190 */
191
192 ngx_crc32_init(hash);
193
194 if (hp->rehash > 0) {
195 size = ngx_sprintf(buf, "%ui", hp->rehash) - buf;
196 ngx_crc32_update(&hash, buf, size);
197 }
198
199 ngx_crc32_update(&hash, hp->key.data, hp->key.len);
200 ngx_crc32_final(hash);
201
202 hash = (hash >> 16) & 0x7fff;
203
204 hp->hash += hash;
205 hp->rehash++;
206
207 if (!hp->rrp.peers->weighted) {
208 p = hp->hash % hp->rrp.peers->number;
209
210 peer = hp->rrp.peers->peer;
211 for (i = 0; i < p; i++) {
212 peer = peer->next;
213 }
214
215 } else {
216 w = hp->hash % hp->rrp.peers->total_weight;
217
218 for (peer = hp->rrp.peers->peer, i = 0;
219 peer;
220 peer = peer->next, i++)
221 {
222 w -= peer->weight;
223 if (w < 0) {
224 break;
225 }
226 }
227
228 p = i;
229 }
230
231 n = p / (8 * sizeof(uintptr_t));
232 m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
233
234 if (hp->rrp.tried[n] & m) {
235 goto next;
236 }
237
238 ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
239 "get hash peer, value:%uD, peer:%ui", hp->hash, p);
240
241 if (peer->down) {
242 goto next;
243 }
244
245 if (peer->max_fails
246 && peer->fails >= peer->max_fails
247 && now - peer->checked <= peer->fail_timeout)
248 {
249 goto next;
250 }
251
252 break;
253
254 next:
255
256 if (++hp->tries > 20) {
257 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
258 return hp->get_rr_peer(pc, &hp->rrp);
259 }
260 }
261
262 hp->rrp.current = peer;
263
264 pc->sockaddr = peer->sockaddr;
265 pc->socklen = peer->socklen;
266 pc->name = &peer->name;
267
268 peer->conns++;
269
270 if (now - peer->checked > peer->fail_timeout) {
271 peer->checked = now;
272 }
273
274 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
275
276 hp->rrp.tried[n] |= m;
277
278 return NGX_OK;
279 }
280
281
282 static ngx_int_t
283 ngx_stream_upstream_init_chash(ngx_conf_t *cf,
284 ngx_stream_upstream_srv_conf_t *us)
285 {
286 u_char *host, *port, c;
287 size_t host_len, port_len, size;
288 uint32_t hash, base_hash, prev_hash;
289 ngx_str_t *server;
290 ngx_uint_t npoints, i, j;
291 ngx_stream_upstream_rr_peer_t *peer;
292 ngx_stream_upstream_rr_peers_t *peers;
293 ngx_stream_upstream_chash_points_t *points;
294 ngx_stream_upstream_hash_srv_conf_t *hcf;
295
296 if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
297 return NGX_ERROR;
298 }
299
300 us->peer.init = ngx_stream_upstream_init_chash_peer;
301
302 peers = us->peer.data;
303 npoints = peers->total_weight * 160;
304
305 size = sizeof(ngx_stream_upstream_chash_points_t)
306 + sizeof(ngx_stream_upstream_chash_point_t) * (npoints - 1);
307
308 points = ngx_palloc(cf->pool, size);
309 if (points == NULL) {
310 return NGX_ERROR;
311 }
312
313 points->number = 0;
314
315 for (peer = peers->peer; peer; peer = peer->next) {
316 server = &peer->server;
317
318 /*
319 * Hash expression is compatible with Cache::Memcached::Fast:
320 * crc32(HOST \0 PORT PREV_HASH).
321 */
322
323 if (server->len >= 5
324 && ngx_strncasecmp(server->data, (u_char *) "unix:", 5) == 0)
325 {
326 host = server->data + 5;
327 host_len = server->len - 5;
328 port = NULL;
329 port_len = 0;
330 goto done;
331 }
332
333 for (j = 0; j < server->len; j++) {
334 c = server->data[server->len - j - 1];
335
336 if (c == ':') {
337 host = server->data;
338 host_len = server->len - j - 1;
339 port = server->data + server->len - j;
340 port_len = j;
341 goto done;
342 }
343
344 if (c < '0' || c > '9') {
345 break;
346 }
347 }
348
349 host = server->data;
350 host_len = server->len;
351 port = NULL;
352 port_len = 0;
353
354 done:
355
356 ngx_crc32_init(base_hash);
357 ngx_crc32_update(&base_hash, host, host_len);
358 ngx_crc32_update(&base_hash, (u_char *) "", 1);
359 ngx_crc32_update(&base_hash, port, port_len);
360
361 prev_hash = 0;
362 npoints = peer->weight * 160;
363
364 for (j = 0; j < npoints; j++) {
365 hash = base_hash;
366
367 ngx_crc32_update(&hash, (u_char *) &prev_hash, sizeof(uint32_t));
368 ngx_crc32_final(hash);
369
370 points->point[points->number].hash = hash;
371 points->point[points->number].server = server;
372 points->number++;
373
374 prev_hash = hash;
375 }
376 }
377
378 ngx_qsort(points->point,
379 points->number,
380 sizeof(ngx_stream_upstream_chash_point_t),
381 ngx_stream_upstream_chash_cmp_points);
382
383 for (i = 0, j = 1; j < points->number; j++) {
384 if (points->point[i].hash != points->point[j].hash) {
385 points->point[++i] = points->point[j];
386 }
387 }
388
389 points->number = i + 1;
390
391 hcf = ngx_stream_conf_upstream_srv_conf(us,
392 ngx_stream_upstream_hash_module);
393 hcf->points = points;
394
395 return NGX_OK;
396 }
397
398
399 static int ngx_libc_cdecl
400 ngx_stream_upstream_chash_cmp_points(const void *one, const void *two)
401 {
402 ngx_stream_upstream_chash_point_t *first =
403 (ngx_stream_upstream_chash_point_t *) one;
404 ngx_stream_upstream_chash_point_t *second =
405 (ngx_stream_upstream_chash_point_t *) two;
406
407 if (first->hash < second->hash) {
408 return -1;
409
410 } else if (first->hash > second->hash) {
411 return 1;
412
413 } else {
414 return 0;
415 }
416 }
417
418
419 static ngx_uint_t
420 ngx_stream_upstream_find_chash_point(ngx_stream_upstream_chash_points_t *points,
421 uint32_t hash)
422 {
423 ngx_uint_t i, j, k;
424 ngx_stream_upstream_chash_point_t *point;
425
426 /* find first point >= hash */
427
428 point = &points->point[0];
429
430 i = 0;
431 j = points->number;
432
433 while (i < j) {
434 k = (i + j) / 2;
435
436 if (hash > point[k].hash) {
437 i = k + 1;
438
439 } else if (hash < point[k].hash) {
440 j = k;
441
442 } else {
443 return k;
444 }
445 }
446
447 return i;
448 }
449
450
451 static ngx_int_t
452 ngx_stream_upstream_init_chash_peer(ngx_stream_session_t *s,
453 ngx_stream_upstream_srv_conf_t *us)
454 {
455 uint32_t hash;
456 ngx_stream_upstream_hash_srv_conf_t *hcf;
457 ngx_stream_upstream_hash_peer_data_t *hp;
458
459 if (ngx_stream_upstream_init_hash_peer(s, us) != NGX_OK) {
460 return NGX_ERROR;
461 }
462
463 s->upstream->peer.get = ngx_stream_upstream_get_chash_peer;
464
465 hp = s->upstream->peer.data;
466 hcf = ngx_stream_conf_upstream_srv_conf(us,
467 ngx_stream_upstream_hash_module);
468
469 hash = ngx_crc32_long(hp->key.data, hp->key.len);
470
471 ngx_stream_upstream_rr_peers_rlock(hp->rrp.peers);
472
473 hp->hash = ngx_stream_upstream_find_chash_point(hcf->points, hash);
474
475 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
476
477 return NGX_OK;
478 }
479
480
481 static ngx_int_t
482 ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t *pc, void *data)
483 {
484 ngx_stream_upstream_hash_peer_data_t *hp = data;
485
486 time_t now;
487 intptr_t m;
488 ngx_str_t *server;
489 ngx_int_t total;
490 ngx_uint_t i, n, best_i;
491 ngx_stream_upstream_rr_peer_t *peer, *best;
492 ngx_stream_upstream_chash_point_t *point;
493 ngx_stream_upstream_chash_points_t *points;
494 ngx_stream_upstream_hash_srv_conf_t *hcf;
495
496 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
497 "get consistent hash peer, try: %ui", pc->tries);
498
499 ngx_stream_upstream_rr_peers_wlock(hp->rrp.peers);
500
501 pc->connection = NULL;
502
503 now = ngx_time();
504 hcf = hp->conf;
505
506 points = hcf->points;
507 point = &points->point[0];
508
509 for ( ;; ) {
510 server = point[hp->hash % points->number].server;
511
512 ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
513 "consistent hash peer:%uD, server:\"%V\"",
514 hp->hash, server);
515
516 best = NULL;
517 best_i = 0;
518 total = 0;
519
520 for (peer = hp->rrp.peers->peer, i = 0;
521 peer;
522 peer = peer->next, i++)
523 {
524
525 n = i / (8 * sizeof(uintptr_t));
526 m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
527
528 if (hp->rrp.tried[n] & m) {
529 continue;
530 }
531
532 if (peer->down) {
533 continue;
534 }
535
536 if (peer->server.len != server->len
537 || ngx_strncmp(peer->server.data, server->data, server->len)
538 != 0)
539 {
540 continue;
541 }
542
543 if (peer->max_fails
544 && peer->fails >= peer->max_fails
545 && now - peer->checked <= peer->fail_timeout)
546 {
547 continue;
548 }
549
550 peer->current_weight += peer->effective_weight;
551 total += peer->effective_weight;
552
553 if (peer->effective_weight < peer->weight) {
554 peer->effective_weight++;
555 }
556
557 if (best == NULL || peer->current_weight > best->current_weight) {
558 best = peer;
559 best_i = i;
560 }
561 }
562
563 if (best) {
564 best->current_weight -= total;
565 break;
566 }
567
568 hp->hash++;
569 hp->tries++;
570
571 if (hp->tries >= points->number) {
572 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
573 return NGX_BUSY;
574 }
575 }
576
577 hp->rrp.current = best;
578
579 pc->sockaddr = best->sockaddr;
580 pc->socklen = best->socklen;
581 pc->name = &best->name;
582
583 best->conns++;
584
585 if (now - best->checked > best->fail_timeout) {
586 best->checked = now;
587 }
588
589 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
590
591 n = best_i / (8 * sizeof(uintptr_t));
592 m = (uintptr_t) 1 << best_i % (8 * sizeof(uintptr_t));
593
594 hp->rrp.tried[n] |= m;
595
596 return NGX_OK;
597 }
598
599
600 static void *
601 ngx_stream_upstream_hash_create_conf(ngx_conf_t *cf)
602 {
603 ngx_stream_upstream_hash_srv_conf_t *conf;
604
605 conf = ngx_palloc(cf->pool, sizeof(ngx_stream_upstream_hash_srv_conf_t));
606 if (conf == NULL) {
607 return NULL;
608 }
609
610 conf->points = NULL;
611
612 return conf;
613 }
614
615
616 static char *
617 ngx_stream_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
618 {
619 ngx_str_t *value;
620 ngx_stream_upstream_srv_conf_t *uscf;
621
622 value = cf->args->elts;
623
624 if (ngx_strcmp(value[1].data, "$remote_addr")) {
625 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
626 "unsupported hash key \"%V\", use $remote_addr",
627 &value[1]);
628 return NGX_CONF_ERROR;
629 }
630
631 uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module);
632
633 if (uscf->peer.init_upstream) {
634 ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
635 "load balancing method redefined");
636 }
637
638 uscf->flags = NGX_STREAM_UPSTREAM_CREATE
639 |NGX_STREAM_UPSTREAM_WEIGHT
640 |NGX_STREAM_UPSTREAM_MAX_FAILS
641 |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
642 |NGX_STREAM_UPSTREAM_DOWN;
643
644 if (cf->args->nelts == 2) {
645 uscf->peer.init_upstream = ngx_stream_upstream_init_hash;
646
647 } else if (ngx_strcmp(value[2].data, "consistent") == 0) {
648 uscf->peer.init_upstream = ngx_stream_upstream_init_chash;
649
650 } else {
651 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
652 "invalid parameter \"%V\"", &value[2]);
653 return NGX_CONF_ERROR;
654 }
655
656 return NGX_CONF_OK;
657 }