comparison src/http/modules/ngx_http_upstream_hash_module.c @ 5717:efc84a5723b3

Upstream: generic hash module.
author Roman Arutyunyan <arut@nginx.com>
date Mon, 02 Jun 2014 16:16:22 +0400
parents
children 435ee290c2e1
comparison
equal deleted inserted replaced
5716:34d460c5d186 5717:efc84a5723b3
1
2 /*
3 * Copyright (C) Roman Arutyunyan
4 * Copyright (C) Nginx, Inc.
5 */
6
7
8 #include <ngx_config.h>
9 #include <ngx_core.h>
10 #include <ngx_http.h>
11
12
13 typedef struct {
14 uint32_t hash;
15 ngx_str_t *server;
16 } ngx_http_upstream_chash_point_t;
17
18
19 typedef struct {
20 ngx_uint_t number;
21 ngx_http_upstream_chash_point_t point[1];
22 } ngx_http_upstream_chash_points_t;
23
24
25 typedef struct {
26 ngx_http_complex_value_t key;
27 ngx_http_upstream_chash_points_t *points;
28 } ngx_http_upstream_hash_srv_conf_t;
29
30
31 typedef struct {
32 /* the round robin data must be first */
33 ngx_http_upstream_rr_peer_data_t rrp;
34 ngx_http_upstream_hash_srv_conf_t *conf;
35 ngx_str_t key;
36 ngx_uint_t tries;
37 ngx_uint_t rehash;
38 uint32_t hash;
39 ngx_event_get_peer_pt get_rr_peer;
40 } ngx_http_upstream_hash_peer_data_t;
41
42
43 static ngx_int_t ngx_http_upstream_init_hash(ngx_conf_t *cf,
44 ngx_http_upstream_srv_conf_t *us);
45 static ngx_int_t ngx_http_upstream_init_hash_peer(ngx_http_request_t *r,
46 ngx_http_upstream_srv_conf_t *us);
47 static ngx_int_t ngx_http_upstream_get_hash_peer(ngx_peer_connection_t *pc,
48 void *data);
49
50 static ngx_int_t ngx_http_upstream_init_chash(ngx_conf_t *cf,
51 ngx_http_upstream_srv_conf_t *us);
52 static void ngx_http_upstream_add_chash_point(
53 ngx_http_upstream_chash_points_t *points, uint32_t hash, ngx_str_t *server);
54 static ngx_uint_t ngx_http_upstream_find_chash_point(
55 ngx_http_upstream_chash_points_t *points, uint32_t hash);
56 static ngx_int_t ngx_http_upstream_init_chash_peer(ngx_http_request_t *r,
57 ngx_http_upstream_srv_conf_t *us);
58 static ngx_int_t ngx_http_upstream_get_chash_peer(ngx_peer_connection_t *pc,
59 void *data);
60
61 static void *ngx_http_upstream_hash_create_conf(ngx_conf_t *cf);
62 static char *ngx_http_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd,
63 void *conf);
64
65
66 static ngx_command_t ngx_http_upstream_hash_commands[] = {
67
68 { ngx_string("hash"),
69 NGX_HTTP_UPS_CONF|NGX_CONF_TAKE12,
70 ngx_http_upstream_hash,
71 NGX_HTTP_SRV_CONF_OFFSET,
72 0,
73 NULL },
74
75 ngx_null_command
76 };
77
78
79 static ngx_http_module_t ngx_http_upstream_hash_module_ctx = {
80 NULL, /* preconfiguration */
81 NULL, /* postconfiguration */
82
83 NULL, /* create main configuration */
84 NULL, /* init main configuration */
85
86 ngx_http_upstream_hash_create_conf, /* create server configuration */
87 NULL, /* merge server configuration */
88
89 NULL, /* create location configuration */
90 NULL /* merge location configuration */
91 };
92
93
94 ngx_module_t ngx_http_upstream_hash_module = {
95 NGX_MODULE_V1,
96 &ngx_http_upstream_hash_module_ctx, /* module context */
97 ngx_http_upstream_hash_commands, /* module directives */
98 NGX_HTTP_MODULE, /* module type */
99 NULL, /* init master */
100 NULL, /* init module */
101 NULL, /* init process */
102 NULL, /* init thread */
103 NULL, /* exit thread */
104 NULL, /* exit process */
105 NULL, /* exit master */
106 NGX_MODULE_V1_PADDING
107 };
108
109
110 static ngx_int_t
111 ngx_http_upstream_init_hash(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us)
112 {
113 if (ngx_http_upstream_init_round_robin(cf, us) != NGX_OK) {
114 return NGX_ERROR;
115 }
116
117 us->peer.init = ngx_http_upstream_init_hash_peer;
118
119 return NGX_OK;
120 }
121
122
123 static ngx_int_t
124 ngx_http_upstream_init_hash_peer(ngx_http_request_t *r,
125 ngx_http_upstream_srv_conf_t *us)
126 {
127 ngx_http_upstream_hash_srv_conf_t *hcf;
128 ngx_http_upstream_hash_peer_data_t *hp;
129
130 hp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_hash_peer_data_t));
131 if (hp == NULL) {
132 return NGX_ERROR;
133 }
134
135 r->upstream->peer.data = &hp->rrp;
136
137 if (ngx_http_upstream_init_round_robin_peer(r, us) != NGX_OK) {
138 return NGX_ERROR;
139 }
140
141 r->upstream->peer.get = ngx_http_upstream_get_hash_peer;
142
143 hcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_hash_module);
144
145 if (ngx_http_complex_value(r, &hcf->key, &hp->key) != NGX_OK) {
146 return NGX_ERROR;
147 }
148
149 ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
150 "upstream hash key:\"%V\"", &hp->key);
151
152 hp->conf = hcf;
153 hp->tries = 0;
154 hp->rehash = 0;
155 hp->hash = 0;
156 hp->get_rr_peer = ngx_http_upstream_get_round_robin_peer;
157
158 return NGX_OK;
159 }
160
161
162 static ngx_int_t
163 ngx_http_upstream_get_hash_peer(ngx_peer_connection_t *pc, void *data)
164 {
165 ngx_http_upstream_hash_peer_data_t *hp = data;
166
167 time_t now;
168 u_char buf[NGX_INT_T_LEN];
169 size_t size;
170 uint32_t hash;
171 ngx_int_t w;
172 uintptr_t m;
173 ngx_uint_t i, n, p;
174 ngx_http_upstream_rr_peer_t *peer;
175
176 ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
177 "get hash peer, try: %ui", pc->tries);
178
179 if (hp->tries > 20 || hp->rrp.peers->single) {
180 return hp->get_rr_peer(pc, &hp->rrp);
181 }
182
183 now = ngx_time();
184
185 pc->cached = 0;
186 pc->connection = NULL;
187
188 for ( ;; ) {
189
190 /*
191 * Hash expression is compatible with Cache::Memcached:
192 * ((crc32([REHASH] KEY) >> 16) & 0x7fff) + PREV_HASH
193 * with REHASH omitted at the first iteration.
194 */
195
196 ngx_crc32_init(hash);
197
198 if (hp->rehash > 0) {
199 size = ngx_sprintf(buf, "%ui", hp->rehash) - buf;
200 ngx_crc32_update(&hash, buf, size);
201 }
202
203 ngx_crc32_update(&hash, hp->key.data, hp->key.len);
204 ngx_crc32_final(hash);
205
206 hash = (hash >> 16) & 0x7fff;
207
208 hp->hash += hash;
209 hp->rehash++;
210
211 if (!hp->rrp.peers->weighted) {
212 p = hp->hash % hp->rrp.peers->number;
213
214 } else {
215 w = hp->hash % hp->rrp.peers->total_weight;
216
217 for (i = 0; i < hp->rrp.peers->number; i++) {
218 w -= hp->rrp.peers->peer[i].weight;
219 if (w < 0) {
220 break;
221 }
222 }
223
224 p = i;
225 }
226
227 n = p / (8 * sizeof(uintptr_t));
228 m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
229
230 if (hp->rrp.tried[n] & m) {
231 goto next;
232 }
233
234 ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
235 "get hash peer, value:%uD, peer:%ui", hp->hash, p);
236
237 peer = &hp->rrp.peers->peer[p];
238
239 if (peer->down) {
240 goto next;
241 }
242
243 if (peer->max_fails
244 && peer->fails >= peer->max_fails
245 && now - peer->checked <= peer->fail_timeout)
246 {
247 goto next;
248 }
249
250 break;
251
252 next:
253
254 if (++hp->tries > 20) {
255 return hp->get_rr_peer(pc, &hp->rrp);
256 }
257 }
258
259 hp->rrp.current = p;
260
261 pc->sockaddr = peer->sockaddr;
262 pc->socklen = peer->socklen;
263 pc->name = &peer->name;
264
265 if (now - peer->checked > peer->fail_timeout) {
266 peer->checked = now;
267 }
268
269 hp->rrp.tried[n] |= m;
270
271 return NGX_OK;
272 }
273
274
275 static ngx_int_t
276 ngx_http_upstream_init_chash(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us)
277 {
278 u_char *host, *port, c;
279 size_t host_len, port_len, size;
280 uint32_t hash, base_hash, prev_hash;
281 ngx_str_t *server;
282 ngx_uint_t npoints, i, j;
283 ngx_http_upstream_rr_peer_t *peer;
284 ngx_http_upstream_rr_peers_t *peers;
285 ngx_http_upstream_chash_points_t *points;
286 ngx_http_upstream_hash_srv_conf_t *hcf;
287
288 if (ngx_http_upstream_init_round_robin(cf, us) != NGX_OK) {
289 return NGX_ERROR;
290 }
291
292 us->peer.init = ngx_http_upstream_init_chash_peer;
293
294 peers = us->peer.data;
295 npoints = peers->total_weight * 160;
296
297 size = sizeof(ngx_http_upstream_chash_points_t)
298 + sizeof(ngx_http_upstream_chash_point_t) * (npoints - 1);
299
300 points = ngx_palloc(cf->pool, size);
301 if (points == NULL) {
302 return NGX_ERROR;
303 }
304
305 points->number = 0;
306
307 for (i = 0; i < peers->number; i++) {
308 peer = &peers->peer[i];
309 server = &peer->server;
310
311 /*
312 * Hash expression is compatible with Cache::Memcached::Fast:
313 * crc32(HOST \0 PORT PREV_HASH).
314 */
315
316 if (server->len >= 5
317 && ngx_strncasecmp(server->data, (u_char *) "unix:", 5) == 0)
318 {
319 host = server->data + 5;
320 host_len = server->len - 5;
321 port = NULL;
322 port_len = 0;
323 goto done;
324 }
325
326 for (j = 0; j < server->len; j++) {
327 c = server->data[server->len - j - 1];
328
329 if (c == ':') {
330 host = server->data;
331 host_len = server->len - j - 1;
332 port = server->data + server->len - j;
333 port_len = j;
334 goto done;
335 }
336
337 if (c < '0' || c > '9') {
338 break;
339 }
340 }
341
342 host = server->data;
343 host_len = server->len;
344 port = NULL;
345 port_len = 0;
346
347 done:
348
349 ngx_crc32_init(base_hash);
350 ngx_crc32_update(&base_hash, host, host_len);
351 ngx_crc32_update(&base_hash, (u_char *) "", 1);
352 ngx_crc32_update(&base_hash, port, port_len);
353
354 prev_hash = 0;
355 npoints = peer->weight * 160;
356
357 for (j = 0; j < npoints; j++) {
358 hash = base_hash;
359
360 ngx_crc32_update(&hash, (u_char *) &prev_hash, sizeof(uint32_t));
361 ngx_crc32_final(hash);
362
363 ngx_http_upstream_add_chash_point(points, hash, &peer->server);
364
365 prev_hash = hash;
366 }
367 }
368
369 hcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_hash_module);
370 hcf->points = points;
371
372 return NGX_OK;
373 }
374
375
376 static void
377 ngx_http_upstream_add_chash_point(ngx_http_upstream_chash_points_t *points,
378 uint32_t hash, ngx_str_t *server)
379 {
380 size_t size;
381 ngx_uint_t i;
382 ngx_http_upstream_chash_point_t *point;
383
384 i = ngx_http_upstream_find_chash_point(points, hash);
385 point = &points->point[i];
386
387 if (point->hash == hash) {
388 return;
389 }
390
391 size = (points->number - i) * sizeof(ngx_http_upstream_chash_point_t);
392
393 ngx_memmove(point + 1, point, size);
394
395 points->number++;
396 point->hash = hash;
397 point->server = server;
398 }
399
400
401 static ngx_uint_t
402 ngx_http_upstream_find_chash_point(ngx_http_upstream_chash_points_t *points,
403 uint32_t hash)
404 {
405 ngx_uint_t i, j, k;
406 ngx_http_upstream_chash_point_t *point;
407
408 /* find first point >= hash */
409
410 point = &points->point[0];
411
412 i = 0;
413 j = points->number;
414
415 while (i < j) {
416 k = (i + j) / 2;
417
418 if (hash > point[k].hash) {
419 i = k + 1;
420
421 } else if (hash < point[k].hash) {
422 j = k;
423
424 } else {
425 return k;
426 }
427 }
428
429 return i;
430 }
431
432
433 static ngx_int_t
434 ngx_http_upstream_init_chash_peer(ngx_http_request_t *r,
435 ngx_http_upstream_srv_conf_t *us)
436 {
437 uint32_t hash;
438 ngx_http_upstream_hash_srv_conf_t *hcf;
439 ngx_http_upstream_hash_peer_data_t *hp;
440
441 if (ngx_http_upstream_init_hash_peer(r, us) != NGX_OK) {
442 return NGX_ERROR;
443 }
444
445 r->upstream->peer.get = ngx_http_upstream_get_chash_peer;
446
447 hp = r->upstream->peer.data;
448 hcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_hash_module);
449
450 hash = ngx_crc32_long(hp->key.data, hp->key.len);
451 hp->hash = ngx_http_upstream_find_chash_point(hcf->points, hash);
452
453 return NGX_OK;
454 }
455
456
457 static ngx_int_t
458 ngx_http_upstream_get_chash_peer(ngx_peer_connection_t *pc, void *data)
459 {
460 ngx_http_upstream_hash_peer_data_t *hp = data;
461
462 time_t now;
463 intptr_t m;
464 ngx_str_t *server;
465 ngx_int_t total;
466 ngx_uint_t i, n;
467 ngx_http_upstream_rr_peer_t *peer, *best;
468 ngx_http_upstream_chash_point_t *point;
469 ngx_http_upstream_chash_points_t *points;
470 ngx_http_upstream_hash_srv_conf_t *hcf;
471
472 ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
473 "get consistent hash peer, try: %ui", pc->tries);
474
475 pc->cached = 0;
476 pc->connection = NULL;
477
478 now = ngx_time();
479 hcf = hp->conf;
480
481 points = hcf->points;
482 point = &points->point[0];
483
484 for ( ;; ) {
485 server = point[hp->hash % points->number].server;
486
487 ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
488 "consistent hash peer:%uD, server:\"%V\"",
489 hp->hash, server);
490
491 best = NULL;
492 total = 0;
493
494 for (i = 0; i < hp->rrp.peers->number; i++) {
495
496 n = i / (8 * sizeof(uintptr_t));
497 m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
498
499 if (hp->rrp.tried[n] & m) {
500 continue;
501 }
502
503 peer = &hp->rrp.peers->peer[i];
504
505 if (peer->down) {
506 continue;
507 }
508
509 if (peer->server.len != server->len
510 || ngx_strncmp(peer->server.data, server->data, server->len)
511 != 0)
512 {
513 continue;
514 }
515
516 if (peer->max_fails
517 && peer->fails >= peer->max_fails
518 && now - peer->checked <= peer->fail_timeout)
519 {
520 continue;
521 }
522
523 peer->current_weight += peer->effective_weight;
524 total += peer->effective_weight;
525
526 if (peer->effective_weight < peer->weight) {
527 peer->effective_weight++;
528 }
529
530 if (best == NULL || peer->current_weight > best->current_weight) {
531 best = peer;
532 }
533 }
534
535 if (best) {
536 best->current_weight -= total;
537
538 i = best - &hp->rrp.peers->peer[0];
539
540 hp->rrp.current = i;
541
542 n = i / (8 * sizeof(uintptr_t));
543 m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
544
545 hp->rrp.tried[n] |= m;
546
547 if (now - best->checked > best->fail_timeout) {
548 best->checked = now;
549 }
550
551 pc->sockaddr = best->sockaddr;
552 pc->socklen = best->socklen;
553 pc->name = &best->name;
554
555 return NGX_OK;
556 }
557
558 hp->hash++;
559 hp->tries++;
560
561 if (hp->tries >= points->number) {
562 return NGX_BUSY;
563 }
564 }
565 }
566
567
568 static void *
569 ngx_http_upstream_hash_create_conf(ngx_conf_t *cf)
570 {
571 ngx_http_upstream_hash_srv_conf_t *conf;
572
573 conf = ngx_palloc(cf->pool, sizeof(ngx_http_upstream_hash_srv_conf_t));
574 if (conf == NULL) {
575 return NULL;
576 }
577
578 conf->points = NULL;
579
580 return conf;
581 }
582
583
584 static char *
585 ngx_http_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
586 {
587 ngx_http_upstream_hash_srv_conf_t *hcf = conf;
588
589 ngx_str_t *value;
590 ngx_http_upstream_srv_conf_t *uscf;
591 ngx_http_compile_complex_value_t ccv;
592
593 value = cf->args->elts;
594
595 ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
596
597 ccv.cf = cf;
598 ccv.value = &value[1];
599 ccv.complex_value = &hcf->key;
600
601 if (ngx_http_compile_complex_value(&ccv) != NGX_OK) {
602 return NGX_CONF_ERROR;
603 }
604
605 uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);
606
607 if (uscf->peer.init_upstream) {
608 ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
609 "load balancing method redefined");
610 }
611
612 uscf->flags = NGX_HTTP_UPSTREAM_CREATE
613 |NGX_HTTP_UPSTREAM_WEIGHT
614 |NGX_HTTP_UPSTREAM_MAX_FAILS
615 |NGX_HTTP_UPSTREAM_FAIL_TIMEOUT
616 |NGX_HTTP_UPSTREAM_DOWN;
617
618 if (cf->args->nelts == 2) {
619 uscf->peer.init_upstream = ngx_http_upstream_init_hash;
620
621 } else if (ngx_strcmp(value[2].data, "consistent") == 0) {
622 uscf->peer.init_upstream = ngx_http_upstream_init_chash;
623
624 } else {
625 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
626 "invalid parameter \"%V\"", &value[2]);
627 return NGX_CONF_ERROR;
628 }
629
630 return NGX_CONF_OK;
631 }