Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Diskless Replication #997

Draft
wants to merge 32 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e92d8c1
expose diskless replication parameters
vazois Jan 25, 2025
1bd4927
refactor/cleanup legacy ReplicaSyncSession
vazois Jan 25, 2025
1131c57
add interface to support diskless replication session and aof tasks
vazois Jan 25, 2025
ea8f8bb
core diskless replication implementation
vazois Jan 25, 2025
e3f35ed
expose diskless replication API
vazois Jan 25, 2025
ea1dfdd
adding test for diskless replication
vazois Jan 25, 2025
dbfa67b
update gcs extension to clearly mark logging progress
vazois Jan 27, 2025
112a197
fix gcs dispose on diskless attach, call dispose of replicationSyncMa…
vazois Jan 27, 2025
a20ac46
complete first diskless replication test
vazois Jan 27, 2025
471c73c
fix iterator check for null when empty store
vazois Jan 27, 2025
9ecb023
fix iterator for object store cluster sync
vazois Jan 28, 2025
2e59fb2
add simple diskless sync test
vazois Jan 28, 2025
a6e6036
cleanup code
vazois Jan 28, 2025
288de60
replica fall behind test
vazois Jan 29, 2025
99f9ce6
wip
vazois Jan 29, 2025
2dadafe
register cts at wait for sync completion
vazois Jan 29, 2025
670b995
add db version alignment test
vazois Jan 29, 2025
51d2168
avoid using close lock for leader based syncing
vazois Jan 30, 2025
8df1dd2
truncate AOF after streaming checkpoint is taken
vazois Jan 31, 2025
e3ef1b1
add tests for failover with diskless replication
vazois Jan 31, 2025
2d91745
fix formatting and conversion to IPEndpoint
vazois Feb 4, 2025
456e455
fix RepCommandsTests
vazois Feb 5, 2025
a0807a8
dispose aofSyncTask if failed to add to AofSyncTaskStore
vazois Feb 5, 2025
8575064
overload dispose ReplicaSyncSession
vazois Feb 5, 2025
7195c2d
explicitly dispose gcs used for full sync at replicaSyncSession sync
vazois Feb 5, 2025
bdee03c
dispose gcs once on return
vazois Feb 5, 2025
5df44b7
code cleanup
vazois Feb 6, 2025
e2be1b7
update tests to provide more context logging
vazois Feb 6, 2025
0a366fd
add more comprehensive logging of syncMetadata
vazois Feb 6, 2025
dcf9138
add timeout for streaming checkpoint
vazois Feb 6, 2025
393d8fa
add clusterTimeout for diskless repl tests
vazois Feb 6, 2025
b802961
some more logging
vazois Feb 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions libs/client/ClientSession/GarnetClientSessionIncremental.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@

namespace Garnet.client
{
enum IncrementalSendType : byte
{
MIGRATE,
SYNC
}

public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer
{
IncrementalSendType ist;
bool isMainStore;
byte* curr, head;
int keyValuePairCount;
Expand Down Expand Up @@ -183,9 +190,9 @@ private void TrackIterationProgress(int keyCount, int size, bool completed = fal
var duration = TimeSpan.FromTicks(Stopwatch.GetTimestamp() - lastLog);
if (completed || lastLog == 0 || duration >= iterationProgressFreq)
{
logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)",
completed ? "COMPLETED" : "MIGRATING",
isMainStore,
logger?.LogTrace("[{op}]: store:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)",
completed ? "COMPLETED" : ist,
isMainStore ? "MAIN STORE" : "OBJECT STORE",
totalKeyCount.ToString("N0"),
((long)((double)totalPayloadSize / 1024)).ToString("N0"));
lastLog = Stopwatch.GetTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void SetClusterMigrateHeader(string sourceNodeId, bool replace, bool isMa
tcsQueue.Enqueue(currTcsIterationTask);
curr = offset;
this.isMainStore = isMainStore;
this.ist = IncrementalSendType.MIGRATE;
var storeType = isMainStore ? MAIN_STORE : OBJECT_STORE;
var replaceOption = replace ? T : F;

Expand Down
115 changes: 115 additions & 0 deletions libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
static ReadOnlySpan<byte> send_ckpt_metadata => "SEND_CKPT_METADATA"u8;
static ReadOnlySpan<byte> send_ckpt_file_segment => "SEND_CKPT_FILE_SEGMENT"u8;
static ReadOnlySpan<byte> begin_replica_recover => "BEGIN_REPLICA_RECOVER"u8;
static ReadOnlySpan<byte> attach_sync => "ATTACH_SYNC"u8;
static ReadOnlySpan<byte> sync => "SYNC"u8;

/// <summary>
/// Initiate checkpoint retrieval from replica by sending replica checkpoint information and AOF address range
Expand Down Expand Up @@ -352,5 +354,118 @@ public Task<string> ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool se
Interlocked.Increment(ref numCommands);
return tcs.Task;
}

/// <summary>
/// Initiate attach from replica
/// </summary>
/// <param name="syncMetadata"></param>
/// <returns></returns>
public Task<string> ExecuteAttachSync(byte[] syncMetadata)
{
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(tcs);
byte* curr = offset;
int arraySize = 3;

while (!RespWriteUtils.TryWriteArrayLength(arraySize, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//1
while (!RespWriteUtils.TryWriteDirect(CLUSTER, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//2
while (!RespWriteUtils.TryWriteBulkString(attach_sync, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//3
while (!RespWriteUtils.TryWriteBulkString(syncMetadata, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

Flush();
Interlocked.Increment(ref numCommands);
return tcs.Task;
}

/// <summary>
/// Set CLUSTER SYNC header info
/// </summary>
/// <param name="sourceNodeId"></param>
/// <param name="isMainStore"></param>
public void SetClusterSyncHeader(string sourceNodeId, bool isMainStore)
{
currTcsIterationTask = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(currTcsIterationTask);
curr = offset;
this.isMainStore = isMainStore;
this.ist = IncrementalSendType.SYNC;
var storeType = isMainStore ? MAIN_STORE : OBJECT_STORE;

var arraySize = 5;
while (!RespWriteUtils.TryWriteArrayLength(arraySize, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 1
while (!RespWriteUtils.TryWriteDirect(CLUSTER, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 2
while (!RespWriteUtils.TryWriteBulkString(sync, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 3
while (!RespWriteUtils.TryWriteAsciiBulkString(sourceNodeId, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 4
while (!RespWriteUtils.TryWriteBulkString(storeType, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 5
// Reserve space for the bulk string header + final newline
while (ExtraSpace + 2 > (int)(end - curr))
{
Flush();
curr = offset;
}
head = curr;
curr += ExtraSpace;
}
}
}
1 change: 1 addition & 0 deletions libs/cluster/Server/Replication/CheckpointEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public byte[] ToByteArray()

public static CheckpointEntry FromByteArray(byte[] serialized)
{
if (serialized.Length == 0) return null;
var ms = new MemoryStream(serialized);
var reader = new BinaryReader(ms);
var cEntry = new CheckpointEntry
Expand Down
21 changes: 1 addition & 20 deletions libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Threading;
using System.Threading.Tasks;
using Garnet.client;
using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

Expand All @@ -24,11 +23,6 @@ internal sealed class AofSyncTaskInfo : IBulkLogEntryConsumer, IDisposable
readonly long startAddress;
public long previousAddress;

/// <summary>
/// Used to mark if syncing is in progress
/// </summary>
SingleWriterMultiReaderLock aofSyncInProgress;

/// <summary>
/// Check if client connection is healthy
/// </summary>
Expand Down Expand Up @@ -69,11 +63,6 @@ public void Dispose()

// Finally, dispose the cts
cts?.Dispose();

// Dispose only if AOF sync has not started
// otherwise sync task will dispose the client
if (aofSyncInProgress.TryWriteLock())
garnetClient?.Dispose();
}

public unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddress, long nextAddress, bool isProtected)
Expand Down Expand Up @@ -108,16 +97,8 @@ public async Task ReplicaSyncTask()
{
logger?.LogInformation("Starting ReplicationManager.ReplicaSyncTask for remote node {remoteNodeId} starting from address {address}", remoteNodeId, startAddress);

var failedToStart = false;
try
{
if (!aofSyncInProgress.TryWriteLock())
{
logger?.LogWarning("{method} AOF sync for {remoteNodeId} failed to start", nameof(ReplicaSyncTask), remoteNodeId);
failedToStart = true;
return;
}

if (!IsConnected) garnetClient.Connect();

iter = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle(startAddress, long.MaxValue, scanUncommitted: true, recover: false, logger: logger);
Expand All @@ -134,7 +115,7 @@ public async Task ReplicaSyncTask()
}
finally
{
if (!failedToStart) garnetClient.Dispose();
garnetClient.Dispose();
var (address, port) = clusterProvider.clusterManager.CurrentConfig.GetWorkerAddressFromNodeId(remoteNodeId);
logger?.LogWarning("AofSync task terminated; client disposed {remoteNodeId} {address} {port} {currentAddress}", remoteNodeId, address, port, previousAddress);

Expand Down
108 changes: 108 additions & 0 deletions libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,114 @@ public bool TryAddReplicationTask(string remoteNodeId, long startAddress, out Ao
return success;
}

public bool TryAddReplicationTasks(ReplicaSyncSession[] replicaSyncSessions, long startAddress)
{
var current = clusterProvider.clusterManager.CurrentConfig;
var success = true;
if (startAddress == 0) startAddress = ReplicationManager.kFirstValidAofAddress;

// First iterate through all sync sessions and add an AOF sync task
// All tasks will be
foreach (var rss in replicaSyncSessions)
{
if (rss == null) continue;
var replicaNodeId = rss.replicaSyncMetadata.originNodeId;
var (address, port) = current.GetWorkerAddressFromNodeId(replicaNodeId);

try
{
rss.AddAofSyncTask(new AofSyncTaskInfo(
clusterProvider,
this,
current.LocalNodeId,
replicaNodeId,
new GarnetClientSession(
new IPEndPoint(IPAddress.Parse(address), port),
clusterProvider.replicationManager.GetAofSyncNetworkBufferSettings,
clusterProvider.replicationManager.GetNetworkPool,
tlsOptions: clusterProvider.serverOptions.TlsOptions?.TlsClientOptions,
authUsername: clusterProvider.ClusterUsername,
authPassword: clusterProvider.ClusterPassword,
logger: logger),
startAddress,
logger));
}
catch (Exception ex)
{
logger?.LogWarning(ex, "{method} creating AOF sync task for {replicaNodeId} failed", nameof(TryAddReplicationTasks), replicaNodeId);
return false;
}
}

_lock.WriteLock();
try
{
if (_disposed) return false;

// Fail adding the task if truncation has happened
if (startAddress < TruncatedUntil)
{
logger?.LogWarning("{method} failed to add tasks for AOF sync {startAddress} {truncatedUntil}", nameof(TryAddReplicationTasks), startAddress, TruncatedUntil);
return false;
}

foreach (var rss in replicaSyncSessions)
{
if (rss == null) continue;

var added = false;
// Find if AOF sync task already exists
for (var i = 0; i < numTasks; i++)
{
var t = tasks[i];
Debug.Assert(t != null);
if (t.remoteNodeId == rss.replicaNodeId)
{
tasks[i] = rss.AofSyncTask;
t.Dispose();
added = true;
break;
}
}

if (added) continue;

// If AOF sync task did not exist and was not added we added below
// Check if array can hold a new AOF sync task
if (numTasks == tasks.Length)
{
var old_tasks = tasks;
var _tasks = new AofSyncTaskInfo[tasks.Length * 2];
Array.Copy(tasks, _tasks, tasks.Length);
tasks = _tasks;
Array.Clear(old_tasks);
}
// Add new AOF sync task
tasks[numTasks++] = rss.AofSyncTask;
}

success = true;
}
finally
{
_lock.WriteUnlock();

if (!success)
{
foreach (var rss in replicaSyncSessions)
{
if (rss == null) continue;
if (rss.AofSyncTask != null)
{
rss.AofSyncTask.Dispose();
}
}
}
}

return true;
}

public bool TryRemove(AofSyncTaskInfo aofSyncTask)
{
// Lock addition of new tasks
Expand Down
Loading