From 539374717956982927d4d8b0d4a6aeccae14ee09 Mon Sep 17 00:00:00 2001 From: uhm0311 Date: Wed, 29 Dec 2021 13:39:11 +0900 Subject: [PATCH] ENHANCE: Enhance error handling where uses memcached_vdo() #167 --- libmemcached/auto.cc | 22 ++ libmemcached/collection.cc | 412 +++++++++++++++++++++++++++++++++++++ libmemcached/constants.h | 2 + libmemcached/delete.cc | 75 +++++++ libmemcached/exist.cc | 33 +++ libmemcached/get.cc | 6 + libmemcached/stats.cc | 9 + libmemcached/storage.cc | 54 +++++ libmemcached/version.cc | 8 + 9 files changed, 621 insertions(+) diff --git a/libmemcached/auto.cc b/libmemcached/auto.cc index e8ceca19..6d399705 100644 --- a/libmemcached/auto.cc +++ b/libmemcached/auto.cc @@ -113,9 +113,21 @@ static memcached_return_t text_incr_decr(memcached_st *ptr, #endif /* Send command header */ memcached_return_t rc= memcached_vdo(instance, vector, 7, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (ptr->flags.no_reply) + return MEMCACHED_SUCCESS; +#else if (ptr->flags.no_reply or memcached_failed(rc)) { return rc; +#endif } char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; @@ -214,12 +226,22 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd, #ifdef ENABLE_REPLICATION do_action: #endif +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_return_t rc= memcached_vdo(instance, vector, 3, true); + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } +#else memcached_return_t rc; if (memcached_failed(rc= memcached_vdo(instance, vector, 3, true))) { memcached_io_reset(instance); return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc; } +#endif if (no_reply) return MEMCACHED_SUCCESS; diff --git a/libmemcached/collection.cc b/libmemcached/collection.cc index e8bd370c..6ada8749 100644 --- a/libmemcached/collection.cc +++ b/libmemcached/collection.cc @@ -734,6 +734,35 @@ memcached_return_t memcached_set_attrs(memcached_st *ptr, WATCHPOINT_IFERROR(rc); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply) + return MEMCACHED_SUCCESS; + + // expecting OK (MEMCACHED_SUCCESS) + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -765,6 +794,7 @@ memcached_return_t memcached_set_attrs(memcached_st *ptr, if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#endif return rc; } @@ -797,8 +827,20 @@ memcached_return_t memcached_get_attrs(memcached_st *ptr, /* Request */ rc= memcached_vdo(instance, vector, 3, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (ptr->flags.no_reply) + return MEMCACHED_SUCCESS; +#else if (rc != MEMCACHED_SUCCESS) return rc; +#endif /* Response */ char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; // Uninitialized... valgrind would warn about this, but that would be okay. @@ -1009,6 +1051,38 @@ static memcached_return_t do_coll_create(memcached_st *ptr, #endif rc= memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply or ptr->flags.piped) + return MEMCACHED_SUCCESS; + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + memcached_set_last_response_code(ptr, rc); + + if (rc == MEMCACHED_CREATED) + rc= MEMCACHED_SUCCESS; +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -1047,6 +1121,7 @@ static memcached_return_t do_coll_create(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -1184,11 +1259,19 @@ static memcached_return_t internal_coll_piped_insert(memcached_st *ptr, bool to_write= true; /* do not buffer requests internally. */ rc= memcached_vdo(instance, vector, 7, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) +#else if (rc != MEMCACHED_SUCCESS) +#endif { if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + return memcached_set_error(*instance, rc, MEMCACHED_AT); +#else return rc; +#endif } ptr->pipe_buffer_pos = 0; /* reset pipe_buffer_pos */ @@ -1259,11 +1342,19 @@ static memcached_return_t internal_coll_piped_exist(memcached_st *ptr, bool to_write= true; /* do not buffer requests internally. */ rc= memcached_vdo(instance, vector, 6, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) +#else if (rc != MEMCACHED_SUCCESS) +#endif { if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + return memcached_set_error(*instance, rc, MEMCACHED_AT); +#else return rc; +#endif } ptr->pipe_buffer_pos = 0; /* reset pipe_buffer_pos */ @@ -1387,6 +1478,38 @@ static memcached_return_t do_coll_insert(memcached_st *ptr, /* Send command header */ rc= memcached_vdo(instance, vector, 6, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply or ptr->flags.piped) + return MEMCACHED_SUCCESS; + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + memcached_set_last_response_code(ptr, rc); + + if ((rc == MEMCACHED_STORED or rc == MEMCACHED_CREATED_STORED) or (rc == MEMCACHED_REPLACED && verb == BOP_UPSERT_OP)) + rc= MEMCACHED_SUCCESS; /* bop upsert returns REPLACED if the same bkey element is replaced. */ +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -1428,6 +1551,7 @@ static memcached_return_t do_coll_insert(memcached_st *ptr, if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#endif return rc; } @@ -1621,6 +1745,38 @@ static memcached_return_t do_coll_delete(memcached_st *ptr, #endif rc= memcached_vdo(instance, vector, veclen, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply) + return MEMCACHED_SUCCESS; + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + memcached_set_last_response_code(ptr, rc); + + if (rc == MEMCACHED_DELETED or rc == MEMCACHED_DELETED_DROPPED) + rc= MEMCACHED_SUCCESS; +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -1660,6 +1816,7 @@ static memcached_return_t do_coll_delete(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -1917,6 +2074,54 @@ static memcached_return_t do_coll_get(memcached_st *ptr, #endif rc= memcached_vdo(instance, vector, veclen, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (mkey_buffer) + { + libmemcached_free(ptr, mkey_buffer); + mkey_buffer= NULL; + } + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply or ptr->flags.piped) + return MEMCACHED_SUCCESS; + + /* Fetch results */ + result = memcached_coll_fetch_result(ptr, result, &rc); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + /* Search for END or something */ + if (result) + { + memcached_coll_result_reset(&ptr->collection_result); + memcached_coll_fetch_result(ptr, &ptr->collection_result, &rc); + } + memcached_set_last_response_code(ptr, rc); + + if (rc == MEMCACHED_END or + rc == MEMCACHED_TRIMMED or + rc == MEMCACHED_DELETED or + rc == MEMCACHED_DELETED_DROPPED ) + { + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -1970,6 +2175,7 @@ static memcached_return_t do_coll_get(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -2314,6 +2520,31 @@ static memcached_return_t do_bop_find_position(memcached_st *ptr, rc = memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply or ptr->flags.piped) + return MEMCACHED_SUCCESS; + + char response[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, response, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->collection_result); + + if (rc == MEMCACHED_POSITION) + { + *position= ptr->collection_result.btree_position; + /* reset btree_position because it is intended for use in bop pwg */ + ptr->collection_result.btree_position= 0; + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -2343,6 +2574,7 @@ static memcached_return_t do_bop_find_position(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -2413,6 +2645,33 @@ static memcached_return_t do_bop_get_by_position(memcached_st *ptr, rc = memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply or ptr->flags.piped) + return MEMCACHED_SUCCESS; + + /* Fetch results */ + result = memcached_coll_fetch_result(ptr, result, &rc); + + /* Search for END or something */ + if (result) + { + memcached_coll_result_reset(&ptr->collection_result); + memcached_coll_fetch_result(ptr, &ptr->collection_result, &rc); + } + + if (rc == MEMCACHED_END) + rc= MEMCACHED_SUCCESS; +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -2446,6 +2705,7 @@ static memcached_return_t do_bop_get_by_position(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -2526,6 +2786,35 @@ static memcached_return_t do_bop_find_position_with_get(memcached_st *ptr, rc = memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply or ptr->flags.piped) + return MEMCACHED_SUCCESS; + + /* Fetch results */ + result = memcached_coll_fetch_result(ptr, result, &rc); + + /* Search for END or something */ + if (result) + { + memcached_coll_result_reset(&ptr->collection_result); + memcached_coll_fetch_result(ptr, &ptr->collection_result, &rc); + } + + if (rc == MEMCACHED_END) + { + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -2559,6 +2848,7 @@ static memcached_return_t do_bop_find_position_with_get(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -2904,6 +3194,27 @@ static memcached_return_t do_coll_exist(memcached_st *ptr, rc= memcached_vdo(instance, vector, 5, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply) + return MEMCACHED_SUCCESS; + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + memcached_set_last_response_code(ptr, rc); + + if (rc == MEMCACHED_EXIST) + rc= MEMCACHED_SUCCESS; +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -2931,6 +3242,7 @@ static memcached_return_t do_coll_exist(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -3472,6 +3784,38 @@ static memcached_return_t do_coll_update(memcached_st *ptr, #endif rc= memcached_vdo(instance, vector, veclen, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply or ptr->flags.piped) + return MEMCACHED_SUCCESS; + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + memcached_set_last_response_code(ptr, rc); + + if (rc == MEMCACHED_UPDATED) + rc= MEMCACHED_SUCCESS; +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -3510,6 +3854,7 @@ static memcached_return_t do_coll_update(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -3618,6 +3963,46 @@ static memcached_return_t do_coll_arithmetic(memcached_st *ptr, #endif rc= memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply or ptr->flags.piped) + return MEMCACHED_SUCCESS; + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + if (rc == MEMCACHED_NOTFOUND + || rc == MEMCACHED_NOTFOUND_ELEMENT + || rc == MEMCACHED_CLIENT_ERROR + || rc == MEMCACHED_TYPE_MISMATCH + || rc == MEMCACHED_BKEY_MISMATCH + || rc == MEMCACHED_SERVER_ERROR) + { + *value= 0; + return rc; + } + + *value= strtoull(result, (char **)NULL, 10); +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -3659,6 +4044,7 @@ static memcached_return_t do_coll_arithmetic(memcached_st *ptr, } } } +#endif return rc; } @@ -3766,6 +4152,31 @@ static memcached_return_t do_coll_count(memcached_st *ptr, rc= memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply or ptr->flags.piped) + return MEMCACHED_SUCCESS; + + char response[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, response, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->collection_result); + + if (rc == MEMCACHED_COUNT) + { + *count= ptr->collection_result.collection_count; + /* reset collection because it is used in memcached_coll_result_reset(collection_result.cc)*/ + ptr->collection_result.collection_count= 0; + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -3795,6 +4206,7 @@ static memcached_return_t do_coll_count(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } diff --git a/libmemcached/constants.h b/libmemcached/constants.h index 72e58223..6909c4c9 100644 --- a/libmemcached/constants.h +++ b/libmemcached/constants.h @@ -66,6 +66,8 @@ #define POOL_UPDATE_SERVERLIST 1 #define POOL_MORE_CONCURRENCY 1 +#define MEMCACHED_VDO_ERROR_HANDLING 1 + /* Public defines */ #define MEMCACHED_DEFAULT_PORT 11211 #define MEMCACHED_MAX_KEY 4001 /* (4000 + 1) We add one to have it null terminated */ diff --git a/libmemcached/delete.cc b/libmemcached/delete.cc index 8f1de223..7c3ac7f6 100644 --- a/libmemcached/delete.cc +++ b/libmemcached/delete.cc @@ -101,6 +101,37 @@ static inline memcached_return_t ascii_delete(memcached_st *ptr, /* Send command header */ memcached_return_t rc= memcached_vdo(instance, vector, 5, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (no_reply) + return MEMCACHED_SUCCESS; + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + + if (rc == MEMCACHED_DELETED) + return MEMCACHED_SUCCESS; +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -134,6 +165,7 @@ static inline memcached_return_t ascii_delete(memcached_st *ptr, memcached_io_reset(instance); return rc; } +#endif return rc; } @@ -184,12 +216,22 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, memcached_io_write(instance, NULL, 0, true); } +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_return_t rc= memcached_vdo(instance, vector, 3, to_write); + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } +#else memcached_return_t rc= MEMCACHED_SUCCESS; if ((rc= memcached_vdo(instance, vector, 3, to_write)) != MEMCACHED_SUCCESS) { memcached_io_reset(instance); return rc; } +#endif unlikely (ptr->number_of_replicas > 0) { @@ -205,10 +247,19 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, replica= memcached_server_instance_fetch(ptr, server_key); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + rc= memcached_vdo(replica, vector, 3, to_write); + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(replica); + } +#else if (memcached_vdo(replica, vector, 3, to_write) != MEMCACHED_SUCCESS) { memcached_io_reset(replica); } +#endif else { memcached_server_response_decrement(replica); @@ -216,6 +267,29 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, } } +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (no_reply) + return MEMCACHED_SUCCESS + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + if (rc == MEMCACHED_DELETED) + return MEMCACHED_SUCCESS; +#ifdef ENABLE_REPLICATION + else if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif +#else if (to_write == false) { rc= MEMCACHED_BUFFERED; @@ -240,6 +314,7 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, } #endif } +#endif return rc; } diff --git a/libmemcached/exist.cc b/libmemcached/exist.cc index 5cdeb6ed..369d9257 100644 --- a/libmemcached/exist.cc +++ b/libmemcached/exist.cc @@ -75,6 +75,23 @@ static memcached_return_t ascii_exist(memcached_st *memc, /* Send command header */ memcached_return_t rc= memcached_vdo(instance, vector, 8, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + + if (rc == MEMCACHED_NOTSTORED) + return MEMCACHED_SUCCESS; + + if (rc == MEMCACHED_STORED) + rc= MEMCACHED_NOTFOUND; +#else if (rc == MEMCACHED_SUCCESS) { char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; @@ -89,6 +106,7 @@ static memcached_return_t ascii_exist(memcached_st *memc, if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#endif return rc; } @@ -125,17 +143,32 @@ static memcached_return_t binary_exist(memcached_st *memc, memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, server_key); /* write the header */ +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_return_t rc= memcached_vdo(instance, vector, 3, true); + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } +#else memcached_return_t rc; if ((rc= memcached_vdo(instance, vector, 3, true)) != MEMCACHED_SUCCESS) { memcached_io_reset(instance); return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc; } +#endif rc= memcached_response(instance, NULL, 0, NULL); + if (rc == MEMCACHED_SUCCESS) +#ifdef MEMCACHED_VDO_ERROR_HANDLING + return MEMCACHED_NOTFOUND; +#else rc= MEMCACHED_NOTFOUND; +#endif if (rc == MEMCACHED_DATA_EXISTS) rc= MEMCACHED_SUCCESS; diff --git a/libmemcached/get.cc b/libmemcached/get.cc index aa0a263d..a6c3f15d 100644 --- a/libmemcached/get.cc +++ b/libmemcached/get.cc @@ -201,8 +201,14 @@ static memcached_return_t ascii_get_by_key(memcached_st *ptr, rc= memcached_vdo(instance, vector, 4, true); if (memcached_failed(rc)) { +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + memcached_set_error(*instance, rc, MEMCACHED_AT); +#else memcached_io_reset(instance); memcached_set_error(*ptr, rc, MEMCACHED_AT); +#endif } return rc; } diff --git a/libmemcached/stats.cc b/libmemcached/stats.cc index fbc3b95f..40d94203 100644 --- a/libmemcached/stats.cc +++ b/libmemcached/stats.cc @@ -352,11 +352,20 @@ static memcached_return_t binary_stats_fetch(memcached_stat_st *memc_stat, { len, args } }; +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } +#else if (memcached_vdo(instance, vector, 2, true) != MEMCACHED_SUCCESS) { memcached_io_reset(instance); return MEMCACHED_WRITE_FAILURE; } +#endif } else { diff --git a/libmemcached/storage.cc b/libmemcached/storage.cc index aae5fd39..5ee112ce 100644 --- a/libmemcached/storage.cc +++ b/libmemcached/storage.cc @@ -196,12 +196,22 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, } /* write the header */ +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_return_t rc= memcached_vdo(server, vector, 4, flush); + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(server); + return memcached_set_error(*server, rc, MEMCACHED_AT); + } +#else memcached_return_t rc; if ((rc= memcached_vdo(server, vector, 4, flush)) != MEMCACHED_SUCCESS) { memcached_io_reset(server); return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc; } +#endif if (verb == SET_OP && ptr->number_of_replicas > 0) { @@ -218,10 +228,19 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, instance= memcached_server_instance_fetch(ptr, server_key); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + rc= memcached_vdo(instance, vector, 4, false); + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + } +#else if (memcached_vdo(instance, vector, 4, false) != MEMCACHED_SUCCESS) { memcached_io_reset(instance); } +#endif else { memcached_server_response_decrement(instance); @@ -354,6 +373,40 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, /* Send command header */ memcached_return_t rc= memcached_vdo(instance, vector, 11, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) + { + if (rc == MEMCACHED_WRITE_FAILURE) + memcached_io_reset(instance); + return memcached_set_error(*instance, rc, MEMCACHED_AT); + } + + if (to_write == false) + return MEMCACHED_BUFFERED; + + if (ptr->flags.no_reply) + return MEMCACHED_SUCCESS; + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + + if (rc == MEMCACHED_STORED) + { + rc= MEMCACHED_SUCCESS; + return rc; + } +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif +#else if (rc == MEMCACHED_SUCCESS) { if (ptr->flags.no_reply) @@ -389,6 +442,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#endif return rc; } diff --git a/libmemcached/version.cc b/libmemcached/version.cc index d632be41..9efad754 100644 --- a/libmemcached/version.cc +++ b/libmemcached/version.cc @@ -174,7 +174,11 @@ static inline memcached_return_t version_ascii_instance(memcached_server_st *ins uint32_t before_active= instance->cursor_active; rc= memcached_vdo(instance, vector, 1, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) +#else if (rc == MEMCACHED_WRITE_FAILURE) +#endif { (void)memcached_set_error(*instance, rc, MEMCACHED_AT); memcached_io_reset(instance); @@ -212,7 +216,11 @@ static inline memcached_return_t version_binary_instance(memcached_server_st *in uint32_t before_active= instance->cursor_active; rc= memcached_vdo(instance, vector, 1, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_failed(rc)) +#else if (rc == MEMCACHED_WRITE_FAILURE) +#endif { (void)memcached_set_error(*instance, rc, MEMCACHED_AT); memcached_io_reset(instance);