diff --git a/src/replication.c b/src/replication.c index f7eb0475ae..9d9a5987db 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2644,7 +2644,6 @@ static int dualChannelReplHandleReplconfReply(connection *conn, sds *err) { return C_OK; } -#define C_RETRY -2 static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) { uint64_t rdb_client_id; *err = receiveSynchronousResponse(conn); @@ -2716,12 +2715,12 @@ static void dualChannelFullSyncWithPrimary(connection *conn) { switch (server.repl_rdb_channel_state) { case REPL_DUAL_CHANNEL_SEND_HANDSHAKE: ret = dualChannelReplHandleHandshake(conn, &err); - server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY; + if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY; break; case REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY: if (server.primary_auth) { ret = dualChannelReplHandleAuthReply(conn, &err); - server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY; + if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY; /* Wait for next bulk before trying to read replconf reply. */ break; } @@ -2729,11 +2728,11 @@ static void dualChannelFullSyncWithPrimary(connection *conn) { /* fall through */ case REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY: ret = dualChannelReplHandleReplconfReply(conn, &err); - server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_ENDOFF; + if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_ENDOFF; break; case REPL_DUAL_CHANNEL_RECEIVE_ENDOFF: ret = dualChannelReplHandleEndOffsetResponse(conn, &err); - if (ret != C_RETRY) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RDB_LOAD; + if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RDB_LOAD; break; default: break; } @@ -2742,8 +2741,10 @@ static void dualChannelFullSyncWithPrimary(connection *conn) { return; error: - connClose(conn); - server.repl_transfer_s = NULL; + if (server.repl_transfer_s) { + connClose(server.repl_transfer_s); + server.repl_transfer_s = NULL; + } if (server.repl_rdb_transfer_s) { connClose(server.repl_rdb_transfer_s); server.repl_rdb_transfer_s = NULL; @@ -3263,27 +3264,29 @@ void dualChannelSetupMainConnForPsync(connection *conn) { char *err = NULL; int ret; + REPL_STATE_RECEIVE_PING_REPLY && server.repl_state <= REPL_STATE_RECEIVE_PSYNC_REPLY; + switch (server.repl_state) { case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplMainConnSendHandshake(conn, &err); - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + if (ret == C_OK) server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; break; case REPL_STATE_RECEIVE_CAPA_REPLY: ret = dualChannelReplMainConnRecvCapaReply(conn, &err); if (ret == C_ERR) { break; } - server.repl_state = REPL_STATE_SEND_PSYNC; + if (ret == C_OK) server.repl_state = REPL_STATE_SEND_PSYNC; sdsfree(err); err = NULL; /* fall through */ case REPL_STATE_SEND_PSYNC: ret = dualChannelReplMainConnSendPsync(conn, &err); - server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; + if (ret == C_OK) server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; break; case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChannelReplMainConnRecvPsyncReply(conn, &err); - if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) server.repl_state = REPL_STATE_TRANSFER; + if (ret == C_OK && server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) server.repl_state = REPL_STATE_TRANSFER; /* In case the RDB is already loaded, the repl_state will be set during establishPrimaryConnection. */ break; default: diff --git a/src/server.h b/src/server.h index fbe57917c8..ea4693a9d7 100644 --- a/src/server.h +++ b/src/server.h @@ -110,6 +110,7 @@ struct hdr_histogram; /* Error codes */ #define C_OK 0 #define C_ERR -1 +#define C_RETRY -2 /* Static server configuration */ #define CONFIG_DEFAULT_HZ 10 /* Time interrupt calls/sec. */