Skip to content

Commit

Permalink
ENHANCE: Enhance error handling when serverlist update failed naver#219
Browse files Browse the repository at this point in the history
  • Loading branch information
uhm0311 committed Mar 14, 2022
1 parent 83073f1 commit f773f94
Show file tree
Hide file tree
Showing 24 changed files with 928 additions and 0 deletions.
74 changes: 74 additions & 0 deletions libmemcached/arcus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ static inline void do_arcus_zk_watcher_cachelist(zhandle_t *zh, int type, int st
* ARCUS_ZK_OPERATIONS
*/
static inline void do_arcus_zk_update_cachelist(memcached_st *mc, const struct String_vector *strings);
#ifdef CACHELIST_ERROR_HANDLING
static inline memcached_return_t do_arcus_zk_update_cachelist_by_string(memcached_st *mc, char *serverlist, const size_t size);
#else
static inline void do_arcus_zk_update_cachelist_by_string(memcached_st *mc, char *serverlist, const size_t size);
#endif
static inline void do_arcus_zk_watch_and_update_cachelist(memcached_st *mc, bool *retry);

/**
Expand Down Expand Up @@ -418,7 +422,13 @@ arcus_return_t arcus_proxy_connect(memcached_st *mc,

arcus_set_log_stream(mc, proxy->logfile);
rc= do_arcus_proxy_connect(mc, proxy);
#ifdef CACHELIST_ERROR_HANDLING
if (rc != ARCUS_SUCCESS or arcus_server_check_for_update(mc) != MEMCACHED_SUCCESS) {
return ARCUS_ERROR;
}
#else
arcus_server_check_for_update(mc);
#endif

return rc;
}
Expand Down Expand Up @@ -725,15 +735,26 @@ static inline arcus_return_t do_arcus_zk_close(memcached_st *mc)
* PUBLIC API
* Check for cache server updates.
*/
#ifdef CACHELIST_ERROR_HANDLING
memcached_return_t arcus_server_check_for_update(memcached_st *ptr)
#else
void arcus_server_check_for_update(memcached_st *ptr)
#endif
{
#ifdef CACHELIST_ERROR_HANDLING
memcached_return_t rc= MEMCACHED_SUCCESS;
#endif
arcus_st *arcus;
size_t size;
uint32_t version;

arcus= static_cast<arcus_st *>(memcached_get_server_manager(ptr));
if (not arcus) {
#ifdef CACHELIST_ERROR_HANDLING
return rc;
#else
return;
#endif
}

if (arcus->proxy.data && arcus->proxy.data->version != arcus->proxy.current_version)
Expand All @@ -751,10 +772,18 @@ void arcus_server_check_for_update(memcached_st *ptr)
if (master && master->configure.version == ptr->configure.version)
#endif
{
#ifdef CACHELIST_ERROR_HANDLING
rc= do_arcus_zk_update_cachelist_by_string(master, arcus->proxy.data->serverlist, size);
#else
do_arcus_zk_update_cachelist_by_string(master, arcus->proxy.data->serverlist, size);
#endif
}
} else {
#ifdef CACHELIST_ERROR_HANDLING
rc= do_arcus_zk_update_cachelist_by_string(ptr, arcus->proxy.data->serverlist, size);
#else
do_arcus_zk_update_cachelist_by_string(ptr, arcus->proxy.data->serverlist, size);
#endif
}
arcus->proxy.current_version= version;
}
Expand All @@ -772,10 +801,17 @@ void arcus_server_check_for_update(memcached_st *ptr)
{
/* master's cache list was changed, update member's cache list */
pthread_mutex_lock(&lock_arcus);
#ifdef CACHELIST_ERROR_HANDLING
rc= memcached_pool_update_member(arcus->pool, ptr);
#else
(void)memcached_pool_update_member(arcus->pool, ptr);
#endif
pthread_mutex_unlock(&lock_arcus);
}
}
#ifdef CACHELIST_ERROR_HANDLING
return rc;
#endif
}

/**
Expand Down Expand Up @@ -854,9 +890,15 @@ static inline int do_add_server_to_cachelist(struct arcus_zk_st *zkinfo, char *n
return 0;
}

#ifdef CACHELIST_ERROR_HANDLING
static inline memcached_return_t do_arcus_update_cachelist(memcached_st *mc,
memcached_server_info_st *serverinfo,
uint32_t servercount)
#else
static inline void do_arcus_update_cachelist(memcached_st *mc,
memcached_server_info_st *serverinfo,
uint32_t servercount)
#endif
{
arcus_st *arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
memcached_return_t error= MEMCACHED_SUCCESS;
Expand Down Expand Up @@ -907,25 +949,46 @@ static inline void do_arcus_update_cachelist(memcached_st *mc,
ZOO_LOG_WARN(("CACHE_LIST=UPDATED, to %s, cache_servers=%d in %d ms",
arcus->zk.ensemble_list, mc->number_of_hosts, msec));
} else {
#ifdef CACHELIST_ERROR_HANDLING
ZOO_LOG_WARN(("CACHE_LIST=UPDATE_FAIL in %d ms. Reason=%s(%d)", msec, memcached_strerror(NULL, error), error));
#else
ZOO_LOG_WARN(("CACHE_LIST=UPDATE_FAIL in %d ms", msec));
#endif
}
#ifdef CACHELIST_ERROR_HANDLING
return error;
#endif
}

/**
* Rebuild the memcached server list by string
*/
#ifdef CACHELIST_ERROR_HANDLING
static inline memcached_return_t do_arcus_zk_update_cachelist_by_string(memcached_st *mc,
char *serverlist,
const size_t size)

#else
static inline void do_arcus_zk_update_cachelist_by_string(memcached_st *mc,
char *serverlist,
const size_t size)
#endif
{
#ifdef CACHELIST_ERROR_HANDLING
memcached_return_t rc= MEMCACHED_SUCCESS;
#endif
arcus_st *arcus = static_cast<arcus_st *>(memcached_get_server_manager(mc));
uint32_t servercount= 0;
memcached_server_info_st *serverinfo;
char buffer[ARCUS_MAX_PROXY_FILE_LENGTH];

serverinfo = static_cast<memcached_server_info_st *>(libmemcached_malloc(mc, sizeof(memcached_server_info_st)*(size+1)));
if (not serverinfo) {
#ifdef CACHELIST_ERROR_HANDLING
return rc;
#else
return;
#endif
}

strncpy(buffer, serverlist, ARCUS_MAX_PROXY_FILE_LENGTH);
Expand All @@ -948,10 +1011,17 @@ static inline void do_arcus_zk_update_cachelist_by_string(memcached_st *mc,
}

pthread_mutex_lock(&lock_arcus);
#ifdef CACHELIST_ERROR_HANDLING
rc= do_arcus_update_cachelist(mc, serverinfo, servercount);
#else
do_arcus_update_cachelist(mc, serverinfo, servercount);
#endif
pthread_mutex_unlock(&lock_arcus);

libmemcached_free(mc, serverinfo);
#ifdef CACHELIST_ERROR_HANDLING
return rc;
#endif
}

static inline int do_arcus_zk_process_reconnect(memcached_st *mc, bool *retry)
Expand Down Expand Up @@ -1058,7 +1128,11 @@ static inline void do_arcus_zk_update_cachelist(memcached_st *mc,
servercount++; /* valid znode name */
}
}
#ifdef CACHELIST_ERROR_HANDLING
(void)do_arcus_update_cachelist(mc, serverinfo, servercount);
#else
do_arcus_update_cachelist(mc, serverinfo, servercount);
#endif
libmemcached_free(mc, serverinfo);
}
} while(0);
Expand Down
11 changes: 11 additions & 0 deletions libmemcached/arcus_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,25 @@ struct memcached_server_info_st
};

LIBMEMCACHED_API
#ifdef CACHELIST_ERROR_HANDLING
memcached_return_t arcus_server_check_for_update(memcached_st *ptr);
#else
void arcus_server_check_for_update(memcached_st *ptr);
#endif

#else /* LIBMEMCACHED_WITH_ZK_INTEGRATION */

#ifdef CACHELIST_ERROR_HANDLING
static inline memcached_return_t arcus_server_check_for_update(memcached_st *)
{
return MEMCACHED_SUCCESS;
}
#else
static inline void arcus_server_check_for_update(memcached_st *)
{
/* Nothing */
}
#endif
#endif /* LIBMEMCACHED_WITH_ZK_INTEGRATION */

#endif /* __LIBMEMCACHED_ARCUS_PRIV_H__ */
22 changes: 22 additions & 0 deletions libmemcached/auto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ static memcached_return_t text_incr_decr(memcached_st *ptr,
const time_t expiration,
uint64_t *value)
{
#ifdef CACHELIST_ERROR_HANDLING
if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) {
return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT);
}
#else
arcus_server_check_for_update(ptr);
#endif

if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1)))
{
Expand Down Expand Up @@ -106,6 +112,11 @@ static memcached_return_t text_incr_decr(memcached_st *ptr,
};

uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
#ifdef CACHELIST_ERROR_HANDLING
if (server_key == UINT32_MAX) {
return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT);
}
#endif
memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);

#ifdef ENABLE_REPLICATION
Expand Down Expand Up @@ -176,7 +187,13 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd,
{
bool no_reply= ptr->flags.no_reply;

#ifdef CACHELIST_ERROR_HANDLING
if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) {
return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT);
}
#else
arcus_server_check_for_update(ptr);
#endif

if (memcached_server_count(ptr) == 0)
return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
Expand Down Expand Up @@ -209,6 +226,11 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd,
};

uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
#ifdef CACHELIST_ERROR_HANDLING
if (server_key == UINT32_MAX) {
return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT);
}
#endif
memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key);

#ifdef ENABLE_REPLICATION
Expand Down
12 changes: 12 additions & 0 deletions libmemcached/behavior.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ memcached_return_t memcached_behavior_set(memcached_st *ptr,
case MEMCACHED_BEHAVIOR_SORT_HOSTS:
{
ptr->flags.use_sort_hosts= bool(data);
#ifdef CACHELIST_ERROR_HANDLING
return run_distribution(ptr);
#else
run_distribution(ptr);
#endif

break;
}
Expand Down Expand Up @@ -487,9 +491,13 @@ memcached_return_t memcached_behavior_set_distribution(memcached_st *ptr, memcac
}

ptr->distribution= type;
#ifdef CACHELIST_ERROR_HANDLING
return run_distribution(ptr);
#else
run_distribution(ptr);

return MEMCACHED_SUCCESS;
#endif
}

return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
Expand Down Expand Up @@ -616,7 +624,11 @@ memcached_return_t memcached_bucket_set(memcached_st *self,

if (memcached_failed(rc= memcached_virtual_bucket_create(self, host_map, forward_map, buckets, replicas)))
{
#ifdef CACHELIST_ERROR_HANDLING
return memcached_behavior_set_distribution(self, old);
#else
memcached_behavior_set_distribution(self, old);
#endif
}

return rc;
Expand Down
Loading

0 comments on commit f773f94

Please sign in to comment.