Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: Move io reset from where calls memcached_vdo() to where fails insied of memcached_vdo() #167 #250

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions libmemcached/auto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ static memcached_return_t text_incr_decr(memcached_st *ptr,
#endif
/* Send command header */
memcached_return_t rc= memcached_vdo(instance, vector, 7, true);
#ifdef MEMCACHED_VDO_ERROR_HANDLING
if (ptr->flags.no_reply or rc != MEMCACHED_SUCCESS)
#else
if (ptr->flags.no_reply or memcached_failed(rc))
#endif
{
return rc;
}
Expand Down Expand Up @@ -214,6 +218,12 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd,
#ifdef ENABLE_REPLICATION
do_action:
#endif
#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_return_t rc= memcached_vdo(instance, vector, 3, true);
if (no_reply or rc != MEMCACHED_SUCCESS) {
return rc;
}
#else
memcached_return_t rc;
if (memcached_failed(rc= memcached_vdo(instance, vector, 3, true)))
{
Expand All @@ -223,6 +233,7 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd,

if (no_reply)
return MEMCACHED_SUCCESS;
#endif

rc= memcached_response(instance, (char*)value, sizeof(*value), NULL);
#ifdef ENABLE_REPLICATION
Expand Down
39 changes: 39 additions & 0 deletions libmemcached/collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,11 @@ memcached_return_t memcached_set_attrs(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
memcached_io_reset(instance);
#endif

return rc;
}
Expand Down Expand Up @@ -1043,10 +1046,13 @@ static memcached_return_t do_coll_create(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -1186,8 +1192,11 @@ static memcached_return_t internal_coll_piped_insert(memcached_st *ptr,
rc= memcached_vdo(instance, vector, 7, to_write);
if (rc != MEMCACHED_SUCCESS)
{
#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
memcached_io_reset(instance);
#endif
return rc;
}

Expand Down Expand Up @@ -1261,8 +1270,11 @@ static memcached_return_t internal_coll_piped_exist(memcached_st *ptr,
rc= memcached_vdo(instance, vector, 6, to_write);
if (rc != MEMCACHED_SUCCESS)
{
#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
memcached_io_reset(instance);
#endif
return rc;
}

Expand Down Expand Up @@ -1426,8 +1438,11 @@ static memcached_return_t do_coll_insert(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
memcached_io_reset(instance);
#endif

return rc;
}
Expand Down Expand Up @@ -1656,10 +1671,13 @@ static memcached_return_t do_coll_delete(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -1966,10 +1984,13 @@ static memcached_return_t do_coll_get(memcached_st *ptr,
mkey_buffer= NULL;
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -2339,10 +2360,13 @@ static memcached_return_t do_bop_find_position(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -2442,10 +2466,13 @@ static memcached_return_t do_bop_get_by_position(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -2555,10 +2582,13 @@ static memcached_return_t do_bop_find_position_with_get(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -2927,10 +2957,13 @@ static memcached_return_t do_coll_exist(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -3506,10 +3539,13 @@ static memcached_return_t do_coll_update(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down Expand Up @@ -3791,10 +3827,13 @@ static memcached_return_t do_coll_count(memcached_st *ptr,
}
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
{
memcached_io_reset(instance);
}
#endif

return rc;
}
Expand Down
1 change: 1 addition & 0 deletions libmemcached/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#define POOL_UPDATE_SERVERLIST 1
#define POOL_MORE_CONCURRENCY 1
#define KETAMA_HASH_COLLSION 1
#define MEMCACHED_VDO_ERROR_HANDLING 1

/* Public defines */
#define MEMCACHED_DEFAULT_PORT 11211
Expand Down
14 changes: 14 additions & 0 deletions libmemcached/delete.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,14 @@ static inline memcached_return_t ascii_delete(memcached_st *ptr,
#endif
}
}
#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
else
{
memcached_io_reset(instance);
return rc;
}
#endif

return rc;
}
Expand Down Expand Up @@ -184,10 +187,15 @@ static inline memcached_return_t binary_delete(memcached_st *ptr,
memcached_io_write(instance, NULL, 0, true);
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_return_t rc= memcached_vdo(instance, vector, 3, to_write);
if (rc != MEMCACHED_SUCCESS) {
#else
memcached_return_t rc= MEMCACHED_SUCCESS;
if ((rc= memcached_vdo(instance, vector, 3, to_write)) != MEMCACHED_SUCCESS)
{
memcached_io_reset(instance);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

하나만 더 질문할게요.

기존 코드가 잘못되어 있긴 하지만, memcached_vdo() 수행이 실패하면, 무조건 io_reset 하는 코드도 있습니다.
connect() 하는 과정에서 실패하면 해당 socket은 close 상태로 설정하는 것 같지만,
해당 instance의 state 등(그 외 어떤 필드가 변경되는 지는 잘 모름)은 변경된 상태로 그대로 둡니다.
이러한 정보를 다시 초기화하기 위해서 io_reset 수행이 필요하지 않는가요 ?
초기화하지 않아도 문제가 없는 지를 확인해 주면 좋겠습니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

확인해보니 connect 실패 시에 io reset 하지 않는 게 connect 로직에 맞습니다.
io reset 하지 않아야 connect 과정에서 사용하는 필드의 변경 사항이 그대로 남고, 그 변경된 필드 값으로 connect 로직(실제 연결 시도 전 retry time 조건에 따라 MEMCACHED_SERVER_TEMPORARILY_DISABLED 에러 발생 등)을 수행합니다.

#endif
return rc;
}

Expand All @@ -205,6 +213,11 @@ static inline memcached_return_t binary_delete(memcached_st *ptr,

replica= memcached_server_instance_fetch(ptr, server_key);

#ifdef MEMCACHED_VDO_ERROR_HANDLING
if (memcached_vdo(replica, vector, 3, to_write) == MEMCACHED_SUCCESS) {
memcached_server_response_decrement(replica);
}
#else
if (memcached_vdo(replica, vector, 3, to_write) != MEMCACHED_SUCCESS)
{
memcached_io_reset(replica);
Expand All @@ -213,6 +226,7 @@ static inline memcached_return_t binary_delete(memcached_st *ptr,
{
memcached_server_response_decrement(replica);
}
#endif
}
}

Expand Down
6 changes: 6 additions & 0 deletions libmemcached/do.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ memcached_return_t memcached_do(memcached_server_write_instance_st ptr,
if (sent_length == -1 || (size_t)sent_length != command_length)
{
rc= MEMCACHED_WRITE_FAILURE;
#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_io_reset(ptr);
#endif
return rc;
}
if ((ptr->root->flags.no_reply) == 0)
Expand Down Expand Up @@ -114,6 +117,9 @@ memcached_return_t memcached_vdo(memcached_server_write_instance_st ptr,
rc= MEMCACHED_WRITE_FAILURE;
WATCHPOINT_ERROR(rc);
WATCHPOINT_ERRNO(errno);
#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_io_reset(ptr);
#endif
return rc;
}
if ((ptr->root->flags.no_reply) == 0 and (ptr->root->flags.piped == false))
Expand Down
10 changes: 10 additions & 0 deletions libmemcached/exist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ static memcached_return_t ascii_exist(memcached_st *memc,
#endif
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
#else
if (rc == MEMCACHED_WRITE_FAILURE)
memcached_io_reset(instance);
#endif

return rc;
}
Expand Down Expand Up @@ -148,12 +151,19 @@ static memcached_return_t binary_exist(memcached_st *memc,
memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, server_key);

/* write the header */
#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_return_t rc= memcached_vdo(instance, vector, 3, true);
if (rc != MEMCACHED_SUCCESS) {
return rc;
}
#else
memcached_return_t rc;
if ((rc= memcached_vdo(instance, vector, 3, true)) != MEMCACHED_SUCCESS)
{
memcached_io_reset(instance);
return (rc == MEMCACHED_SUCCESS) ? MEMCACHED_WRITE_FAILURE : rc;
}
#endif

rc= memcached_response(instance, NULL, 0, NULL);

Expand Down
7 changes: 7 additions & 0 deletions libmemcached/flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,18 @@ static memcached_return_t memcached_flush_binary(memcached_st *ptr,
request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
}

#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_return_t rc = memcached_do(instance, request.bytes, sizeof(request.bytes), true);
if (rc != MEMCACHED_SUCCESS) {
return rc;
}
#else
if (memcached_do(instance, request.bytes, sizeof(request.bytes), true) != MEMCACHED_SUCCESS)
{
memcached_io_reset(instance);
return MEMCACHED_WRITE_FAILURE;
}
#endif
}

for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
Expand Down
4 changes: 4 additions & 0 deletions libmemcached/get.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,13 @@ static memcached_return_t ascii_get_by_key(memcached_st *ptr,
};

rc= memcached_vdo(instance, vector, 4, true);
#ifdef MEMCACHED_VDO_ERROR_HANDLING
if (rc != MEMCACHED_SUCCESS) {
#else
if (memcached_failed(rc))
{
memcached_io_reset(instance);
#endif
memcached_set_error(*ptr, rc, MEMCACHED_AT);
}
return rc;
Expand Down
18 changes: 18 additions & 0 deletions libmemcached/stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,20 +352,34 @@ static memcached_return_t binary_stats_fetch(memcached_stat_st *memc_stat,
{ len, args }
};

#ifdef MEMCACHED_VDO_ERROR_HANDLING
rc = memcached_vdo(instance, vector, 2, true);
if (rc != MEMCACHED_SUCCESS) {
return rc;
}
#else
if (memcached_vdo(instance, vector, 2, true) != MEMCACHED_SUCCESS)
{
memcached_io_reset(instance);
return MEMCACHED_WRITE_FAILURE;
}
#endif
}
else
{
#ifdef MEMCACHED_VDO_ERROR_HANDLING
memcached_return_t rc = memcached_do(instance, request.bytes, sizeof(request.bytes), true);
if (rc != MEMCACHED_SUCCESS) {
return rc;
}
#else
if (memcached_do(instance, request.bytes,
sizeof(request.bytes), true) != MEMCACHED_SUCCESS)
{
memcached_io_reset(instance);
return MEMCACHED_WRITE_FAILURE;
}
#endif
}

memcached_server_response_decrement(instance);
Expand Down Expand Up @@ -434,7 +448,11 @@ static memcached_return_t ascii_stats_fetch(memcached_stat_st *memc_stat,
}

memcached_return_t rc= memcached_do(instance, buffer, (size_t)write_length, true);
#ifdef MEMCACHED_VDO_ERROR_HANDLING
if (rc == MEMCACHED_SUCCESS)
#else
if (memcached_success(rc))
#endif
{
char result[MEMCACHED_DEFAULT_COMMAND_SIZE];
while ((rc= memcached_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL)) == MEMCACHED_STAT)
Expand Down
Loading