Skip to content

Commit

Permalink
Merge pull request #237 from uhm0311/uhm0311/hash_collision
Browse files Browse the repository at this point in the history
ENHANCE: Select smallest host as owner when ketama hash collided #113
  • Loading branch information
jhpark816 committed Mar 8, 2022
2 parents feb9418 + 49dcab9 commit 83073f1
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 0 deletions.
7 changes: 7 additions & 0 deletions libmemcached/connect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,15 @@ static memcached_return_t set_hostinfo(memcached_server_st *server)
server->address_info_next= NULL;
}

#ifdef KETAMA_HASH_COLLSION
#else
char str_port[NI_MAXSERV];
int length= snprintf(str_port, NI_MAXSERV, "%u", (uint32_t)server->port);
if (length >= NI_MAXSERV or length < 0)
{
return MEMCACHED_FAILURE;
}
#endif

struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
Expand All @@ -157,7 +160,11 @@ static memcached_return_t set_hostinfo(memcached_server_st *server)

server->address_info= NULL;
int errcode;
#ifdef KETAMA_HASH_COLLSION
switch(errcode= getaddrinfo(server->hostname, server->str_port, &hints, &server->address_info))
#else
switch(errcode= getaddrinfo(server->hostname, str_port, &hints, &server->address_info))
#endif
{
case 0:
break;
Expand Down
1 change: 1 addition & 0 deletions libmemcached/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#define BOP_ARITHMETIC_INITIAL 1
#define POOL_UPDATE_SERVERLIST 1
#define POOL_MORE_CONCURRENCY 1
#define KETAMA_HASH_COLLSION 1

/* Public defines */
#define MEMCACHED_DEFAULT_PORT 11211
Expand Down
4 changes: 4 additions & 0 deletions libmemcached/hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ static uint32_t dispatch_host(const memcached_st *ptr, uint32_t hash)
}
if (right == end)
right= begin;
#ifdef KETAMA_HASH_COLLSION
while (right > begin && (right-1)->value == hash)
right= right-1;
#endif
return right->index;
}
case MEMCACHED_DISTRIBUTION_MODULA:
Expand Down
34 changes: 34 additions & 0 deletions libmemcached/hosts.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ static memcached_return_t update_continuum(memcached_st *ptr);
static memcached_return_t update_continuum_based_on_rgroups(memcached_st *ptr);
#endif

#ifdef KETAMA_HASH_COLLSION
#ifdef ENABLE_REPLICATION
memcached_rgroup_st *cmp_rgroups= NULL;
#endif
memcached_server_st *cmp_servers= NULL;
#endif

static int compare_servers(const void *p1, const void *p2)
{
int return_value;
Expand Down Expand Up @@ -198,7 +205,22 @@ static int continuum_item_cmp(const void *t1, const void *t2)
/* Why 153? Hmmm... */
WATCHPOINT_ASSERT(ct1->value != 153);
if (ct1->value == ct2->value)
{
#ifdef KETAMA_HASH_COLLSION
#ifdef ENABLE_REPLICATION
if (cmp_rgroups != NULL) {
return strcmp(cmp_rgroups[ct1->index].groupname, cmp_rgroups[ct2->index].groupname);
}
#endif
if (cmp_servers != NULL) {
int return_value= strcmp(cmp_servers[ct1->index].hostname, cmp_servers[ct2->index].hostname);
if (return_value == 0)
return_value= strcmp(cmp_servers[ct1->index].str_port, cmp_servers[ct2->index].str_port);
return return_value;
}
#endif
return 0;
}
if (ct1->value > ct2->value)
return 1;
else
Expand Down Expand Up @@ -446,7 +468,13 @@ static memcached_return_t update_continuum(memcached_st *ptr)
WATCHPOINT_ASSERT(new_continuum);
WATCHPOINT_ASSERT(memcached_server_count(ptr) * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE);

#ifdef KETAMA_HASH_COLLSION
cmp_servers= ptr->servers;
#endif
qsort(new_continuum, pointer_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp);
#ifdef KETAMA_HASH_COLLSION
cmp_servers= NULL;
#endif

new_ketama_info->continuum= new_continuum;
new_ketama_info->continuum_points_counter= pointer_counter;
Expand Down Expand Up @@ -634,7 +662,13 @@ static memcached_return_t update_continuum_based_on_rgroups(memcached_st *ptr)
WATCHPOINT_ASSERT(new_continuum);
WATCHPOINT_ASSERT(memcached_server_count(ptr) * MEMCACHED_POINTS_PER_SERVER <= MEMCACHED_CONTINUUM_SIZE);

#ifdef KETAMA_HASH_COLLSION
cmp_rgroups= ptr->rgroups;
#endif
qsort(new_continuum, pointer_counter, sizeof(memcached_continuum_item_st), continuum_item_cmp);
#ifdef KETAMA_HASH_COLLSION
cmp_rgroups= NULL;
#endif

new_ketama_info->continuum= new_continuum;
new_ketama_info->continuum_points_counter= pointer_counter;
Expand Down
3 changes: 3 additions & 0 deletions libmemcached/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ static inline void _server_init(memcached_server_st *self, memcached_st *root,
self->limit_maxbytes= 0;
memcpy(self->hostname, hostname.c_str, hostname.size);
self->hostname[hostname.size]= 0;
#ifdef KETAMA_HASH_COLLSION
snprintf(self->str_port, MEMCACHED_NI_MAXSERV, "%u", (uint32_t)port);
#endif

#ifdef ENABLE_REPLICATION
/* Needed for replication */
Expand Down
10 changes: 10 additions & 0 deletions libmemcached/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@
#else
#define MEMCACHED_NI_MAXHOST 1025
#endif
#ifdef KETAMA_HASH_COLLSION
#ifdef NI_MAXSERV
#define MEMCACHED_NI_MAXSERV NI_MAXSERV
#else
#define MEMCACHED_NI_MAXSERV 32
#endif
#endif /* KETAMA_HASH_COLLSION */

enum memcached_server_state_t {
MEMCACHED_SERVER_STATE_NEW, // fd == -1, no address lookup has been done
Expand Down Expand Up @@ -106,6 +113,9 @@ struct memcached_server_st {
char read_buffer[MEMCACHED_MAX_BUFFER];
char write_buffer[MEMCACHED_MAX_BUFFER];
char hostname[MEMCACHED_NI_MAXHOST];
#ifdef KETAMA_HASH_COLLSION
char str_port[MEMCACHED_NI_MAXSERV];
#endif
};


Expand Down

0 comments on commit 83073f1

Please sign in to comment.