Skip to content

Commit

Permalink
Dual channel replication (#60)
Browse files Browse the repository at this point in the history
In this PR we introduce the main benefit of dual channel replication by
continuously steaming the COB (client output buffers) in parallel to the
RDB and thus keeping the primary's side COB small AND accelerating the
overall sync process. By streaming the replication data to the replica
during the full sync, we reduce
1. Memory load from the primary's node.
2. CPU load from the primary's main process. [Latest performance
tests](#data)

## Motivation
* Reduce primary memory load. We do that by moving the COB tracking to
the replica side. This also decrease the chance for COB overruns. Note
that primary's input buffer limits at the replica side are less
restricted then primary's COB as the replica plays less critical part in
the replication group. While increasing the primary’s COB may end up
with primary reaching swap and clients suffering, at replica side we’re
more at ease with it. Larger COB means better chance to sync
successfully.
* Reduce primary main process CPU load. By opening a new, dedicated
connection for the RDB transfer, child processes can have direct access
to the new connection. Due to TLS connection restrictions, this was not
possible using one main connection. We eliminate the need for the child
process to use the primary's child-proc -> main-proc pipeline, thus
freeing up the main process to process clients queries.


 ## Dual Channel Replication high level interface design
- Dual channel replication begins when the replica sends a `REPLCONF
CAPA DUALCHANNEL` to the primary during initial
handshake. This is used to state that the replica is capable of dual
channel sync and that this is the replica's main channel, which is not
used for snapshot transfer.
- When replica lacks sufficient data for PSYNC, the primary will send
`-FULLSYNCNEEDED` response instead
of RDB data. As a next step, the replica creates a new connection
(rdb-channel) and configures it against
the primary with the appropriate capabilities and requirements. The
replica then requests a sync
     using the RDB channel. 
- Prior to forking, the primary sends the replica the snapshot's end
repl-offset, and attaches the replica
to the replication backlog to keep repl data until the replica requests
psync. The replica uses the main
     channel to request a PSYNC starting at the snapshot end offset. 
- The primary main threads sends incremental changes via the main
channel, while the bgsave process
sends the RDB directly to the replica via the rdb-channel. As for the
replica, the incremental
changes are stored on a local buffer, while the RDB is loaded into
memory.
- Once the replica completes loading the rdb, it drops the
rdb-connection and streams the accumulated incremental
     changes into memory. Repl steady state continues normally.

## New replica state machine


![image](https://github.com/user-attachments/assets/38fbfff0-60b9-4066-8b13-becdb87babc3)





## Data <a name="data"></a>

![image](https://github.com/user-attachments/assets/d73631a7-0a58-4958-a494-a7f4add9108f)


![image](https://github.com/user-attachments/assets/f44936ed-c59a-4223-905d-0fe48a6d31a6)


![image](https://github.com/user-attachments/assets/bd333ee2-3c47-47e5-b244-4ea75f77c836)

## Explanation 
These graphs demonstrate performance improvements during full sync
sessions using rdb-channel + streaming rdb directly from the background
process to the replica.

First graph- with at most 50 clients and light weight commands, we saw
5%-7.5% improvement in write latency during sync session.
Two graphs below- full sync was tested during heavy read commands from
the primary (such as sdiff, sunion on large sets). In that case, the
child process writes to the replica without sharing CPU with the loaded
main process. As a result, this not only improves client response time,
but may also shorten sync time by about 50%. The shorter sync time
results in less memory being used to store replication diffs (>60% in
some of the tested cases).

## Test setup 
Both primary and replica in the performance tests ran on the same
machine. RDB size in all tests is 3.7gb. I generated write load using
valkey-benchmark ` ./valkey-benchmark -r 100000 -n 6000000 lpush my_list
__rand_int__`.

---------

Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <58042354+naglera@users.noreply.github.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
  • Loading branch information
4 people authored Jul 17, 2024
1 parent 66d0f7d commit ff6b780
Show file tree
Hide file tree
Showing 21 changed files with 2,623 additions and 225 deletions.
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3074,6 +3074,7 @@ standardConfig static_configs[] = {
createBoolConfig("lazyfree-lazy-user-flush", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_flush, 0, NULL, NULL),
createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL),
createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 1, NULL, NULL),
createBoolConfig("dual-channel-replication-enabled", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG | PROTECTED_CONFIG, server.dual_channel_replication, 0, NULL, NULL),
createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.aof_rewrite_incremental_fsync, 1, NULL, NULL),
createBoolConfig("no-appendfsync-on-rewrite", NULL, MODIFIABLE_CONFIG, server.aof_no_fsync_on_rewrite, 0, NULL, NULL),
createBoolConfig("cluster-require-full-coverage", NULL, MODIFIABLE_CONFIG, server.cluster_require_full_coverage, 1, NULL, NULL),
Expand Down
15 changes: 15 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,10 @@ void debugCommand(client *c) {
" In case RESET is provided the peak reset time will be restored to the default value",
"REPLYBUFFER RESIZING <0|1>",
" Enable or disable the reply buffer resize cron job",
"SLEEP-AFTER-FORK-SECONDS <seconds>",
" Stop the server's main process for <seconds> after forking.",
"DELAY-RDB-CLIENT-FREE-SECOND <seconds>",
" Grace period in seconds for replica main channel to establish psync.",
"DICT-RESIZING <0|1>",
" Enable or disable the main dict and expire dict resizing.",
NULL};
Expand Down Expand Up @@ -991,6 +995,17 @@ void debugCommand(client *c) {
return;
}
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "sleep-after-fork-seconds") && c->argc == 3) {
double sleep_after_fork_seconds;
if (getDoubleFromObjectOrReply(c, c->argv[2], &sleep_after_fork_seconds, NULL) != C_OK) {
addReply(c, shared.err);
return;
}
server.debug_sleep_after_fork_us = (int)(sleep_after_fork_seconds * 1e6);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "delay-rdb-client-free-seconds") && c->argc == 3) {
server.wait_before_rdb_client_free = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "dict-resizing") && c->argc == 3) {
server.dict_resizing = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
Expand Down
50 changes: 44 additions & 6 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ int authRequired(client *c) {
return auth_required;
}

static inline int isReplicaReadyForReplData(client *replica) {
return (replica->repl_state == REPLICA_STATE_ONLINE || replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) &&
!(replica->flag.close_asap);
}

client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));

Expand Down Expand Up @@ -189,6 +194,8 @@ client *createClient(connection *conn) {
c->replica_version = 0;
c->replica_capa = REPLICA_CAPA_NONE;
c->replica_req = REPLICA_REQ_NONE;
c->associated_rdb_client_id = 0;
c->rdb_client_disconnect_time = 0;
c->reply = listCreate();
c->deferred_reply_errors = NULL;
c->reply_bytes = 0;
Expand Down Expand Up @@ -255,8 +262,8 @@ void putClientInPendingWriteQueue(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for replicas, if the replica can actually receive
* writes at this stage. */
if (!c->flag.pending_write && (c->repl_state == REPL_STATE_NONE ||
(c->repl_state == REPLICA_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack))) {
if (!c->flag.pending_write &&
(c->repl_state == REPL_STATE_NONE || (isReplicaReadyForReplData(c) && !c->repl_start_cmd_stream_on_ack))) {
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket. This way before re-entering the event
Expand Down Expand Up @@ -1598,7 +1605,7 @@ void freeClient(client *c) {

/* If a client is protected, yet we need to free it right now, make sure
* to at least use asynchronous freeing. */
if (c->flag.protected) {
if (c->flag.protected || c->flag.protected_rdb_channel) {
freeClientAsync(c);
return;
}
Expand Down Expand Up @@ -1644,7 +1651,10 @@ void freeClient(client *c) {

/* Log link disconnection with replica */
if (getClientType(c) == CLIENT_TYPE_REPLICA) {
serverLog(LL_NOTICE, "Connection with replica %s lost.", replicationGetReplicaName(c));
serverLog(LL_NOTICE,
c->flag.repl_rdb_channel ? "Replica %s rdb channel disconnected."
: "Connection with replica %s lost.",
replicationGetReplicaName(c));
}

/* Free the query buffer */
Expand Down Expand Up @@ -1874,6 +1884,26 @@ int freeClientsInAsyncFreeQueue(void) {
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);

if (c->flag.protected_rdb_channel) {
/* Check if it's safe to remove RDB connection protection during synchronization
* The primary gives a grace period before freeing this client because
* it serves as a reference to the first required replication data block for
* this replica */
if (!c->rdb_client_disconnect_time) {
c->rdb_client_disconnect_time = server.unixtime;
serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id,
replicationGetReplicaName(c), server.wait_before_rdb_client_free);
continue;
}
if (server.unixtime - c->rdb_client_disconnect_time > server.wait_before_rdb_client_free) {
serverLog(LL_NOTICE,
"Replica main channel failed to establish PSYNC within the grace period (%ld seconds). "
"Freeing RDB client %llu.",
(long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id);
c->flag.protected_rdb_channel = 0;
}
}

if (c->flag.protected) continue;

c->flag.close_asap = 0;
Expand Down Expand Up @@ -2984,6 +3014,10 @@ int processInputBuffer(client *c) {
void readToQueryBuf(client *c) {
int big_arg = 0;
size_t qblen, readlen;

/* If the replica RDB client is marked as closed ASAP, do not try to read from it */
if (c->flag.close_asap) return;

int is_primary = c->read_flags & READ_FLAGS_PRIMARY;

readlen = PROTO_IOBUF_LEN;
Expand Down Expand Up @@ -4289,9 +4323,13 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) {
serverAssert(c->reply_bytes < SIZE_MAX - (1024 * 64));
/* Note that c->reply_bytes is irrelevant for replica clients
* (they use the global repl buffers). */
if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_REPLICA) || c->flag.close_asap) return 0;
if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_REPLICA) ||
(c->flag.close_asap && !(c->flag.protected_rdb_channel)))
return 0;
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(), c);
/* Remove RDB connection protection on COB overrun */
c->flag.protected_rdb_channel = 0;

if (async) {
freeClientAsync(c);
Expand Down Expand Up @@ -4335,7 +4373,7 @@ void flushReplicasOutputBuffers(void) {
*
* 3. Obviously if the replica is not ONLINE.
*/
if (replica->repl_state == REPLICA_STATE_ONLINE && !(replica->flag.close_asap) && can_receive_writes &&
if (isReplicaReadyForReplData(replica) && !(replica->flag.close_asap) && can_receive_writes &&
!replica->repl_start_cmd_stream_on_ack && clientHasPendingReplies(replica)) {
writeToClient(replica);
}
Expand Down
127 changes: 86 additions & 41 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3451,12 +3451,15 @@ static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
serverLog(LL_NOTICE, "Background RDB transfer terminated with success");
} else if (!bysignal && exitcode != 0) {
serverLog(LL_WARNING, "Background transfer error");
server.lastbgsave_status = C_ERR;
} else {
serverLog(LL_WARNING, "Background transfer terminated by signal %d", bysignal);
}
if (server.rdb_child_exit_pipe != -1) close(server.rdb_child_exit_pipe);
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
close(server.rdb_pipe_read);
if (server.rdb_pipe_read > 0) {
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
close(server.rdb_pipe_read);
}
server.rdb_child_exit_pipe = -1;
server.rdb_pipe_read = -1;
zfree(server.rdb_pipe_conns);
Expand Down Expand Up @@ -3507,43 +3510,65 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
listIter li;
pid_t childpid;
int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;
int dual_channel = (req & REPLICA_REQ_RDB_CHANNEL);

if (hasActiveChildProcess()) return C_ERR;
serverAssert(server.rdb_pipe_read == -1 && server.rdb_child_exit_pipe == -1);

/* Even if the previous fork child exited, don't start a new one until we
* drained the pipe. */
if (server.rdb_pipe_conns) return C_ERR;

/* Before to fork, create a pipe that is used to transfer the rdb bytes to
* the parent, we can't let it write directly to the sockets, since in case
* of TLS we must let the parent handle a continuous TLS state when the
* child terminates and parent takes over. */
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
server.rdb_pipe_read = pipefds[0]; /* read end */
rdb_pipe_write = pipefds[1]; /* write end */

/* create another pipe that is used by the parent to signal to the child
* that it can exit. */
if (anetPipe(pipefds, 0, 0) == -1) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
return C_ERR;
if (!dual_channel) {
/* Before to fork, create a pipe that is used to transfer the rdb bytes to
* the parent, we can't let it write directly to the sockets, since in case
* of TLS we must let the parent handle a continuous TLS state when the
* child terminates and parent takes over. */
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
server.rdb_pipe_read = pipefds[0]; /* read end */
rdb_pipe_write = pipefds[1]; /* write end */

/* create another pipe that is used by the parent to signal to the child
* that it can exit. */
if (anetPipe(pipefds, 0, 0) == -1) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
return C_ERR;
}
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
}
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */

/* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
server.rdb_pipe_conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
int connsnum = 0;
connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
server.rdb_pipe_conns = NULL;
if (!dual_channel) {
server.rdb_pipe_conns = conns;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
}
/* Filter replica connections pending full sync (ie. in WAIT_BGSAVE_START state). */
listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
client *replica = ln->value;
if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) {
/* Check replica has the exact requirements */
if (replica->replica_req != req) continue;
server.rdb_pipe_conns[server.rdb_pipe_numconns++] = replica->conn;

conns[connsnum++] = replica->conn;
if (dual_channel) {
/* Put the socket in blocking mode to simplify RDB transfer. */
connBlock(replica->conn);
connSendTimeout(replica->conn, server.repl_timeout * 1000);
/* This replica uses diskless dual channel sync, hence we need
* to inform it with the save end offset.*/
sendCurrentOffsetToReplica(replica);
/* Make sure repl traffic is appended to the replication backlog */
addRdbReplicaToPsyncWait(replica);
} else {
server.rdb_pipe_numconns++;
}
replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
}
}
Expand All @@ -3553,12 +3578,15 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
/* Child */
int retval, dummy;
rio rdb;

rioInitWithFd(&rdb, rdb_pipe_write);
if (dual_channel) {
rioInitWithConnset(&rdb, conns, connsnum);
} else {
rioInitWithFd(&rdb, rdb_pipe_write);
}

/* Close the reading part, so that if the parent crashes, the child will
* get a write error and exit. */
close(server.rdb_pipe_read);
if (!dual_channel) close(server.rdb_pipe_read);
if (strstr(server.exec_argv[0], "redis-server") != NULL) {
serverSetProcTitle("redis-rdb-to-slaves");
} else {
Expand All @@ -3572,14 +3600,18 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}

rioFreeFd(&rdb);
if (dual_channel) {
rioFreeConnset(&rdb);
} else {
rioFreeFd(&rdb);
close(rdb_pipe_write);
}
zfree(conns);
/* wake up the reader, tell it we're done. */
close(rdb_pipe_write);
close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
/* hold exit until the parent tells us it's safe. we're not expecting
* to read anything, just get the error when the pipe is closed. */
dummy = read(safe_to_exit_pipe, pipefds, 1);
if (!dual_channel) dummy = read(safe_to_exit_pipe, pipefds, 1);
UNUSED(dummy);
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
Expand All @@ -3597,23 +3629,36 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
replica->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
}
}
close(rdb_pipe_write);
close(server.rdb_pipe_read);
if (!dual_channel) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
}
close(server.rdb_child_exit_pipe);
zfree(server.rdb_pipe_conns);
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
zfree(conns);
if (dual_channel) {
closeChildInfoPipe();
} else {
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
}
} else {
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld", (long)childpid);
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid,
dual_channel ? "direct socket to replica" : "pipe through parent process");
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
if (dual_channel) {
/* For dual channel sync, the main process no longer requires these RDB connections. */
zfree(conns);
} else {
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) ==
AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
}
close(safe_to_exit_pipe);
if (!dual_channel) close(safe_to_exit_pipe);
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
Expand Down
Loading

0 comments on commit ff6b780

Please sign in to comment.