From d822906cab03f39141ea24d2398732c991fed604 Mon Sep 17 00:00:00 2001 From: uhm0311 Date: Tue, 19 Apr 2022 16:32:58 +0900 Subject: [PATCH] INTERNAL: Move io reset from where calls memcached_vdo() to where fails inside of memcached_vdo() #167 --- libmemcached/auto.cc | 11 ++ libmemcached/collection.cc | 364 +++++++++++++++++++++++++++++++++++++ libmemcached/constants.h | 1 + libmemcached/delete.cc | 68 +++++++ libmemcached/do.cc | 6 + libmemcached/exist.cc | 28 +++ libmemcached/flush.cc | 8 + libmemcached/get.cc | 4 + libmemcached/stats.cc | 14 ++ libmemcached/storage.cc | 37 ++++ libmemcached/version.cc | 28 +++ 11 files changed, 569 insertions(+) diff --git a/libmemcached/auto.cc b/libmemcached/auto.cc index e8ceca19..312dbddc 100644 --- a/libmemcached/auto.cc +++ b/libmemcached/auto.cc @@ -113,7 +113,11 @@ static memcached_return_t text_incr_decr(memcached_st *ptr, #endif /* Send command header */ memcached_return_t rc= memcached_vdo(instance, vector, 7, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (ptr->flags.no_reply or rc != MEMCACHED_SUCCESS) +#else if (ptr->flags.no_reply or memcached_failed(rc)) +#endif { return rc; } @@ -214,6 +218,12 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd, #ifdef ENABLE_REPLICATION do_action: #endif +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_return_t rc= memcached_vdo(instance, vector, 3, true); + if (no_reply or rc != MEMCACHED_SUCCESS) { + return rc; + } +#else memcached_return_t rc; if (memcached_failed(rc= memcached_vdo(instance, vector, 3, true))) { @@ -223,6 +233,7 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd, if (no_reply) return MEMCACHED_SUCCESS; +#endif rc= memcached_response(instance, (char*)value, sizeof(*value), NULL); #ifdef ENABLE_REPLICATION diff --git a/libmemcached/collection.cc b/libmemcached/collection.cc index e9f05024..b1aab2a6 100644 --- a/libmemcached/collection.cc +++ b/libmemcached/collection.cc @@ -733,7 +733,32 @@ memcached_return_t memcached_set_attrs(memcached_st *ptr, rc= memcached_vdo(instance, vector, 4, to_write); WATCHPOINT_IFERROR(rc); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply) { + return MEMCACHED_SUCCESS; + } + // expecting OK (MEMCACHED_SUCCESS) + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -765,6 +790,7 @@ memcached_return_t memcached_set_attrs(memcached_st *ptr, if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#endif return rc; } @@ -1008,7 +1034,36 @@ static memcached_return_t do_coll_create(memcached_st *ptr, do_action: #endif rc= memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply || ptr->flags.piped) { + return MEMCACHED_SUCCESS; + } + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + memcached_set_last_response_code(ptr, rc); + if (rc == MEMCACHED_CREATED) { + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -1047,6 +1102,7 @@ static memcached_return_t do_coll_create(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -1184,10 +1240,14 @@ static memcached_return_t internal_coll_piped_insert(memcached_st *ptr, bool to_write= true; /* do not buffer requests internally. */ rc= memcached_vdo(instance, vector, 7, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { +#else if (rc != MEMCACHED_SUCCESS) { if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#endif return rc; } @@ -1259,10 +1319,14 @@ static memcached_return_t internal_coll_piped_exist(memcached_st *ptr, bool to_write= true; /* do not buffer requests internally. */ rc= memcached_vdo(instance, vector, 6, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { +#else if (rc != MEMCACHED_SUCCESS) { if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#endif return rc; } @@ -1386,7 +1450,42 @@ static memcached_return_t do_coll_insert(memcached_st *ptr, #endif /* Send command header */ rc= memcached_vdo(instance, vector, 6, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply or ptr->flags.piped) { + return MEMCACHED_SUCCESS; + } + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + memcached_set_last_response_code(ptr, rc); + if (rc == MEMCACHED_STORED || rc == MEMCACHED_CREATED_STORED) + { + rc= MEMCACHED_SUCCESS; + } + else if (rc == MEMCACHED_REPLACED && verb == BOP_UPSERT_OP) + { + /* bop upsert returns REPLACED if the same bkey element is replaced. */ + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -1428,6 +1527,7 @@ static memcached_return_t do_coll_insert(memcached_st *ptr, if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#endif return rc; } @@ -1620,7 +1720,38 @@ static memcached_return_t do_coll_delete(memcached_st *ptr, do_action: #endif rc= memcached_vdo(instance, vector, veclen, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply) { + return MEMCACHED_SUCCESS; + } + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + memcached_set_last_response_code(ptr, rc); + if (rc == MEMCACHED_DELETED or + rc == MEMCACHED_DELETED_DROPPED) + { + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -1660,6 +1791,7 @@ static memcached_return_t do_coll_delete(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -1916,7 +2048,48 @@ static memcached_return_t do_coll_get(memcached_st *ptr, do_action: #endif rc= memcached_vdo(instance, vector, veclen, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + goto RETURN; + } + if (to_write == false) { + rc= MEMCACHED_BUFFERED; + goto RETURN; + } + if (ptr->flags.no_reply || ptr->flags.piped) { + rc= MEMCACHED_SUCCESS; + goto RETURN; + } + + /* Fetch results */ + result= memcached_coll_fetch_result(ptr, result, &rc); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + /* Search for END or something */ + if (result) + { + memcached_coll_result_reset(&ptr->collection_result); + memcached_coll_fetch_result(ptr, &ptr->collection_result, &rc); + } + memcached_set_last_response_code(ptr, rc); + if (rc == MEMCACHED_END or + rc == MEMCACHED_TRIMMED or + rc == MEMCACHED_DELETED or + rc == MEMCACHED_DELETED_DROPPED ) + { + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -1959,17 +2132,24 @@ static memcached_return_t do_coll_get(memcached_st *ptr, } } } +#endif +#ifdef MEMCACHED_VDO_ERROR_HANDLING +RETURN: +#endif if (mkey_buffer) { libmemcached_free(ptr, mkey_buffer); mkey_buffer= NULL; } +#ifdef MEMCACHED_VDO_ERROR_HANDLING +#else if (rc == MEMCACHED_WRITE_FAILURE) { memcached_io_reset(instance); } +#endif return rc; } @@ -2313,7 +2493,28 @@ static memcached_return_t do_bop_find_position(memcached_st *ptr, memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); rc = memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply || ptr->flags.piped) { + return MEMCACHED_SUCCESS; + } + char response[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, response, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->collection_result); + + if (rc == MEMCACHED_POSITION) + { + *position= ptr->collection_result.btree_position; + /* reset btree_position because it is intended for use in bop pwg */ + ptr->collection_result.btree_position= 0; + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -2343,6 +2544,7 @@ static memcached_return_t do_bop_find_position(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -2412,7 +2614,32 @@ static memcached_return_t do_bop_get_by_position(memcached_st *ptr, memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); rc = memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply || ptr->flags.piped) { + return MEMCACHED_SUCCESS; + } + + /* Fetch results */ + result= memcached_coll_fetch_result(ptr, result, &rc); + + /* Search for END or something */ + if (result) + { + memcached_coll_result_reset(&ptr->collection_result); + memcached_coll_fetch_result(ptr, &ptr->collection_result, &rc); + } + if (rc == MEMCACHED_END) + { + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -2446,6 +2673,7 @@ static memcached_return_t do_bop_get_by_position(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -2525,7 +2753,32 @@ static memcached_return_t do_bop_find_position_with_get(memcached_st *ptr, memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); rc = memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply || ptr->flags.piped) { + return MEMCACHED_SUCCESS; + } + /* Fetch results */ + result= memcached_coll_fetch_result(ptr, result, &rc); + + /* Search for END or something */ + if (result) + { + memcached_coll_result_reset(&ptr->collection_result); + memcached_coll_fetch_result(ptr, &ptr->collection_result, &rc); + } + + if (rc == MEMCACHED_END) + { + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -2559,6 +2812,7 @@ static memcached_return_t do_bop_find_position_with_get(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -2903,7 +3157,26 @@ static memcached_return_t do_coll_exist(memcached_st *ptr, memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); rc= memcached_vdo(instance, vector, 5, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply) { + return MEMCACHED_SUCCESS; + } + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + memcached_set_last_response_code(ptr, rc); + + if (rc == MEMCACHED_EXIST) + { + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -2931,6 +3204,7 @@ static memcached_return_t do_coll_exist(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -3471,7 +3745,37 @@ static memcached_return_t do_coll_update(memcached_st *ptr, do_action: #endif rc= memcached_vdo(instance, vector, veclen, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply || ptr->flags.piped) { + return MEMCACHED_SUCCESS; + } + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + memcached_set_last_response_code(ptr, rc); + if (rc == MEMCACHED_UPDATED) + { + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -3510,6 +3814,7 @@ static memcached_return_t do_coll_update(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } @@ -3617,7 +3922,43 @@ static memcached_return_t do_coll_arithmetic(memcached_st *ptr, do_action: #endif rc= memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply || ptr->flags.piped) { + return MEMCACHED_SUCCESS; + } + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif + if (rc == MEMCACHED_NOTFOUND + || rc == MEMCACHED_NOTFOUND_ELEMENT + || rc == MEMCACHED_CLIENT_ERROR + || rc == MEMCACHED_TYPE_MISMATCH + || rc == MEMCACHED_BKEY_MISMATCH + || rc == MEMCACHED_SERVER_ERROR) + { + *value= 0; + return rc; + } + *value= strtoull(result, (char **)NULL, 10); +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -3659,6 +4000,7 @@ static memcached_return_t do_coll_arithmetic(memcached_st *ptr, } } } +#endif return rc; } @@ -3765,7 +4107,28 @@ static memcached_return_t do_coll_count(memcached_st *ptr, memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); rc= memcached_vdo(instance, vector, 4, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply || ptr->flags.piped) { + return MEMCACHED_SUCCESS; + } + char response[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_coll_response(instance, response, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->collection_result); + + if (rc == MEMCACHED_COUNT) + { + *count= ptr->collection_result.collection_count; + /* reset collection because it is used in memcached_coll_result_reset(collection_result.cc)*/ + ptr->collection_result.collection_count= 0; + rc= MEMCACHED_SUCCESS; + } +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -3795,6 +4158,7 @@ static memcached_return_t do_coll_count(memcached_st *ptr, { memcached_io_reset(instance); } +#endif return rc; } diff --git a/libmemcached/constants.h b/libmemcached/constants.h index 18686c64..74751b5d 100644 --- a/libmemcached/constants.h +++ b/libmemcached/constants.h @@ -66,6 +66,7 @@ #define POOL_UPDATE_SERVERLIST 1 #define POOL_MORE_CONCURRENCY 1 #define KETAMA_HASH_COLLSION 1 +#define MEMCACHED_VDO_ERROR_HANDLING 1 /* Public defines */ #define MEMCACHED_DEFAULT_PORT 11211 diff --git a/libmemcached/delete.cc b/libmemcached/delete.cc index 8f1de223..cf575c34 100644 --- a/libmemcached/delete.cc +++ b/libmemcached/delete.cc @@ -100,7 +100,36 @@ static inline memcached_return_t ascii_delete(memcached_st *ptr, /* Send command header */ memcached_return_t rc= memcached_vdo(instance, vector, 5, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (no_reply) { + return MEMCACHED_SUCCESS; + } + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + + if (rc == MEMCACHED_DELETED) + { + rc= MEMCACHED_SUCCESS; + } +#ifdef ENABLE_REPLICATION + else if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif +#else if (rc == MEMCACHED_SUCCESS) { if (to_write == false) @@ -134,6 +163,7 @@ static inline memcached_return_t ascii_delete(memcached_st *ptr, memcached_io_reset(instance); return rc; } +#endif return rc; } @@ -184,10 +214,15 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, memcached_io_write(instance, NULL, 0, true); } +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_return_t rc= memcached_vdo(instance, vector, 3, to_write); + if (rc != MEMCACHED_SUCCESS) { +#else memcached_return_t rc= MEMCACHED_SUCCESS; if ((rc= memcached_vdo(instance, vector, 3, to_write)) != MEMCACHED_SUCCESS) { memcached_io_reset(instance); +#endif return rc; } @@ -205,6 +240,12 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, replica= memcached_server_instance_fetch(ptr, server_key); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_vdo(replica, vector, 3, to_write) == MEMCACHED_SUCCESS) + { + memcached_server_response_decrement(replica); + } +#else if (memcached_vdo(replica, vector, 3, to_write) != MEMCACHED_SUCCESS) { memcached_io_reset(replica); @@ -213,9 +254,35 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, { memcached_server_response_decrement(replica); } +#endif } } +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (no_reply) { + return MEMCACHED_SUCCESS; + } + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + if (rc == MEMCACHED_DELETED) + { + return MEMCACHED_SUCCESS; + } +#ifdef ENABLE_REPLICATION + if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif +#else if (to_write == false) { rc= MEMCACHED_BUFFERED; @@ -240,6 +307,7 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, } #endif } +#endif return rc; } diff --git a/libmemcached/do.cc b/libmemcached/do.cc index 14b65a8d..64dd9328 100644 --- a/libmemcached/do.cc +++ b/libmemcached/do.cc @@ -61,6 +61,9 @@ memcached_return_t memcached_do(memcached_server_write_instance_st ptr, if (sent_length == -1 || (size_t)sent_length != command_length) { rc= MEMCACHED_WRITE_FAILURE; +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_io_reset(ptr); +#endif return rc; } if ((ptr->root->flags.no_reply) == 0) @@ -114,6 +117,9 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr, rc= MEMCACHED_WRITE_FAILURE; WATCHPOINT_ERROR(rc); WATCHPOINT_ERRNO(errno); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_io_reset(ptr); +#endif return rc; } if ((ptr->root->flags.no_reply) == 0 and (ptr->root->flags.piped == false)) diff --git a/libmemcached/exist.cc b/libmemcached/exist.cc index e8db2e46..76659d8b 100644 --- a/libmemcached/exist.cc +++ b/libmemcached/exist.cc @@ -92,6 +92,26 @@ static memcached_return_t ascii_exist(memcached_st *memc, #else memcached_return_t rc= memcached_vdo(instance, vector, 8, true); #endif +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; +#ifdef LIBMEMCACHED_WITH_ZK_INTEGRATION + while ((rc= memcached_coll_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL)) == MEMCACHED_ATTR); + if (rc == MEMCACHED_END) + rc= MEMCACHED_SUCCESS; +#else + rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + + if (rc == MEMCACHED_NOTSTORED) + rc= MEMCACHED_SUCCESS; + + if (rc == MEMCACHED_STORED) + rc= MEMCACHED_NOTFOUND; +#endif +#else if (rc == MEMCACHED_SUCCESS) { char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; @@ -112,6 +132,7 @@ static memcached_return_t ascii_exist(memcached_st *memc, if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#endif return rc; } @@ -148,12 +169,19 @@ static memcached_return_t binary_exist(memcached_st *memc, memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, server_key); /* write the header */ +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_return_t rc= memcached_vdo(instance, vector, 3, true); + if (rc != MEMCACHED_SUCCESS) { + return rc; + } +#else memcached_return_t rc; if ((rc= memcached_vdo(instance, vector, 3, true)) != MEMCACHED_SUCCESS) { memcached_io_reset(instance); return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc; } +#endif rc= memcached_response(instance, NULL, 0, NULL); diff --git a/libmemcached/flush.cc b/libmemcached/flush.cc index 20061317..c1c06115 100644 --- a/libmemcached/flush.cc +++ b/libmemcached/flush.cc @@ -219,11 +219,19 @@ static memcached_return_t memcached_flush_binary(memcached_st *ptr, request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH; } +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_return_t rc = memcached_do(instance, request.bytes, sizeof(request.bytes), true); + if (rc != MEMCACHED_SUCCESS) + { + return rc; + } +#else if (memcached_do(instance, request.bytes, sizeof(request.bytes), true) != MEMCACHED_SUCCESS) { memcached_io_reset(instance); return MEMCACHED_WRITE_FAILURE; } +#endif } for (uint32_t x= 0; x < memcached_server_count(ptr); x++) diff --git a/libmemcached/get.cc b/libmemcached/get.cc index 8edbedf8..33d3d361 100644 --- a/libmemcached/get.cc +++ b/libmemcached/get.cc @@ -199,9 +199,13 @@ static memcached_return_t ascii_get_by_key(memcached_st *ptr, }; rc= memcached_vdo(instance, vector, 4, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { +#else if (memcached_failed(rc)) { memcached_io_reset(instance); +#endif memcached_set_error(*ptr, rc, MEMCACHED_AT); } return rc; diff --git a/libmemcached/stats.cc b/libmemcached/stats.cc index fbc3b95f..d777e71f 100644 --- a/libmemcached/stats.cc +++ b/libmemcached/stats.cc @@ -352,20 +352,34 @@ static memcached_return_t binary_stats_fetch(memcached_stat_st *memc_stat, { len, args } }; +#ifdef MEMCACHED_VDO_ERROR_HANDLING + rc = memcached_vdo(instance, vector, 2, true); + if (rc != MEMCACHED_SUCCESS) { + return rc; + } +#else if (memcached_vdo(instance, vector, 2, true) != MEMCACHED_SUCCESS) { memcached_io_reset(instance); return MEMCACHED_WRITE_FAILURE; } +#endif } else { +#ifdef MEMCACHED_VDO_ERROR_HANDLING + memcached_return_t rc = memcached_do(instance, request.bytes, sizeof(request.bytes), true); + if (rc != MEMCACHED_SUCCESS) { + return rc; + } +#else if (memcached_do(instance, request.bytes, sizeof(request.bytes), true) != MEMCACHED_SUCCESS) { memcached_io_reset(instance); return MEMCACHED_WRITE_FAILURE; } +#endif } memcached_server_response_decrement(instance); diff --git a/libmemcached/storage.cc b/libmemcached/storage.cc index aae5fd39..541f76de 100644 --- a/libmemcached/storage.cc +++ b/libmemcached/storage.cc @@ -218,6 +218,12 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, instance= memcached_server_instance_fetch(ptr, server_key); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (memcached_vdo(instance, vector, 4, false) == MEMCACHED_SUCCESS) + { + memcached_server_response_decrement(instance); + } +#else if (memcached_vdo(instance, vector, 4, false) != MEMCACHED_SUCCESS) { memcached_io_reset(instance); @@ -226,6 +232,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, { memcached_server_response_decrement(instance); } +#endif } } @@ -353,7 +360,36 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, /* Send command header */ memcached_return_t rc= memcached_vdo(instance, vector, 11, to_write); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) { + return rc; + } + if (to_write == false) { + return MEMCACHED_BUFFERED; + } + if (ptr->flags.no_reply) { + return MEMCACHED_SUCCESS; + } + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + rc= memcached_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + if (rc == MEMCACHED_STORED) + { + rc= MEMCACHED_SUCCESS; + } +#ifdef ENABLE_REPLICATION + else if (rc == MEMCACHED_SWITCHOVER or rc == MEMCACHED_REPL_SLAVE) + { + ZOO_LOG_INFO(("Switchover: hostname=%s port=%d error=%s", + instance->hostname, instance->port, memcached_strerror(ptr, rc))); + if (memcached_rgroup_switchover(ptr, instance) == true) { + instance= memcached_server_instance_fetch(ptr, server_key); + goto do_action; + } + } +#endif +#else if (rc == MEMCACHED_SUCCESS) { if (ptr->flags.no_reply) @@ -389,6 +425,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, if (rc == MEMCACHED_WRITE_FAILURE) memcached_io_reset(instance); +#endif return rc; } diff --git a/libmemcached/version.cc b/libmemcached/version.cc index d632be41..3b2165d9 100644 --- a/libmemcached/version.cc +++ b/libmemcached/version.cc @@ -79,11 +79,18 @@ static inline memcached_return_t memcached_version_textual(memcached_st *ptr) } memcached_return_t rrc= memcached_vdo(instance, vector, 1, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rrc != MEMCACHED_SUCCESS) +#else if (memcached_failed(rrc)) +#endif { errors_happened= true; (void)memcached_set_error(*instance, rrc, MEMCACHED_AT); +#ifdef MEMCACHED_VDO_ERROR_HANDLING +#else memcached_io_reset(instance); +#endif continue; } success++; @@ -133,11 +140,18 @@ static inline memcached_return_t memcached_version_binary(memcached_st *ptr) } memcached_return_t rrc= memcached_vdo(instance, vector, 1, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rrc != MEMCACHED_SUCCESS) +#else if (memcached_failed(rrc)) +#endif { errors_happened= true; (void)memcached_set_error(*instance, rrc, MEMCACHED_AT); +#ifdef MEMCACHED_VDO_ERROR_HANDLING +#else memcached_io_reset(instance); +#endif continue; } success++; @@ -174,10 +188,17 @@ static inline memcached_return_t version_ascii_instance(memcached_server_st *ins uint32_t before_active= instance->cursor_active; rc= memcached_vdo(instance, vector, 1, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) +#else if (rc == MEMCACHED_WRITE_FAILURE) +#endif { (void)memcached_set_error(*instance, rc, MEMCACHED_AT); +#ifdef MEMCACHED_VDO_ERROR_HANDLING +#else memcached_io_reset(instance); +#endif return rc; } /* If no_reply or piped is set, cursor_active is not increased in memcached_vdo(). @@ -212,10 +233,17 @@ static inline memcached_return_t version_binary_instance(memcached_server_st *in uint32_t before_active= instance->cursor_active; rc= memcached_vdo(instance, vector, 1, true); +#ifdef MEMCACHED_VDO_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) +#else if (rc == MEMCACHED_WRITE_FAILURE) +#endif { (void)memcached_set_error(*instance, rc, MEMCACHED_AT); +#ifdef MEMCACHED_VDO_ERROR_HANDLING +#else memcached_io_reset(instance); +#endif return rc; } /* If no_reply or piped is set, cursor_active is not increased in memcached_vdo().