Skip to content

Commit

Permalink
Replication Checkpoint Fix (#248)
Browse files Browse the repository at this point in the history
* code cleanup

* separate recovered repl_id from current_replid

* renaming recovered and current replication-id

* update replication-id on failover for ReplicationLogCheckpointManager

* move more inline errors to CmdStrings

---------

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
vazois and badrishc authored Apr 9, 2024
1 parent 765e614 commit 5d394d3
Show file tree
Hide file tree
Showing 24 changed files with 543 additions and 502 deletions.
11 changes: 9 additions & 2 deletions libs/cluster/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,19 @@ static class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CLUSTER => "ERR This instance has cluster support disabled"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SLOT_OUT_OFF_RANGE => "ERR Slot out of range"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CONFIG_UPDATE => "ERR Updating the config epoch"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CONFIG_EPOCH_ASSIGNMENT => "ERR The user can assign a config epoch only when the node does not know any other node."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_REPLICATION_AOF_TURNEDOFF => "ERR Replica AOF is switched off. Replication unavailable. Please restart replica with --aof option."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CONFIG_EPOCH_ASSIGNMENT => "ERR The user can assign a config epoch only when the node does not know any other node"u8;
/// <summary>
/// Error message stating that replication is unavailable because AOF is switched off
/// and that the replica should be restarted with the <c>--aof</c> option.
/// </summary>
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_REPLICATION_AOF_TURNEDOFF => "ERR Replication unavailable because AOF is switched off, please restart replica with --aof option"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SLOTSTATE_TRANSITION => "ERR Slot already in that state"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_FORGET_MYSELF => "ERR I tried hard but I can't forget myself"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_FORGET_MY_PRIMARY => "ERR Can't forget my primary"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_FAILOVER_FROM_NON_MASTER => "ERR Cannot failover a non-master node"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_UNKNOWN_ENDPOINT => "ERR Unknown endpoint"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_MAKE_REPLICA_WITH_ASSIGNED_SLOTS => "ERR Primary has been assigned slots and cannot be a replica"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_REPLICATE_SELF => "ERR Can't replicate myself"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_NOT_ASSIGNED_PRIMARY_ERROR => "ERR Don't have primary"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_WORKERS_NOT_INITIALIZED => "ERR workers not initialized"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CONFIG_EPOCH_NOT_SET => "ERR Node config epoch was not set due to invalid epoch specified"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_NOT_IN_IMPORTING_STATE => "ERR Node not in IMPORTING state"u8;

/// <summary>
/// Generic error response strings for <c>MIGRATE</c> command
Expand All @@ -131,5 +135,8 @@ static class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_CROSSLOT => "CROSSSLOT Keys in request do not hash to the same slot"u8;
public static ReadOnlySpan<byte> RESP_ERR_CLUSTERDOWN => "CLUSTERDOWN Hash slot not served"u8;
public static ReadOnlySpan<byte> RESP_ERR_MIGRATING => "MIGRATING"u8;
public static ReadOnlySpan<byte> RESP_ERR_CREATE_SYNC_SESSION_ERROR => "PRIMARY-ERR Failed creating replica sync session task"u8;
public static ReadOnlySpan<byte> RESP_ERR_RETRIEVE_SYNC_SESSION_ERROR => "PRIMARY-ERR Failed retrieving replica sync session"u8;
public static ReadOnlySpan<byte> RESP_ERR_IOERR => "IOERR Migrate keys failed"u8;
}
}
20 changes: 5 additions & 15 deletions libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Garnet.common;
using Garnet.server;
Expand All @@ -21,12 +19,6 @@ internal sealed partial class ClusterManager : IDisposable
ClusterConfig currentConfig;
readonly IDevice clusterConfigDevice;
readonly SectorAlignedBufferPool pool;

/// <summary>
/// Replication manager - needs to be set after instantiation, hence made public
/// </summary>
public ReplicationManager replicationManager;

readonly ILogger logger;

/// <summary>
Expand All @@ -47,9 +39,7 @@ internal sealed partial class ClusterManager : IDisposable
/// <summary>
/// Constructor
/// </summary>
public unsafe ClusterManager(
ClusterProvider clusterProvider,
ILoggerFactory loggerFactory = null)
public unsafe ClusterManager(ClusterProvider clusterProvider, ILogger logger = null)
{
this.clusterProvider = clusterProvider;
var opts = clusterProvider.serverOptions;
Expand All @@ -61,7 +51,7 @@ public unsafe ClusterManager(
pool = new(1, (int)clusterConfigDevice.SectorSize);

var address = opts.Address ?? StoreWrapper.GetIp();
logger = loggerFactory?.CreateLogger($"ClusterManager-{address}:{opts.Port}");
this.logger = logger;
var recoverConfig = clusterConfigDevice.GetFileSize(0) > 0 && !opts.CleanClusterConfig;

tlsOptions = opts.TlsOptions;
Expand Down Expand Up @@ -231,14 +221,14 @@ public bool TrySetLocalConfigEpoch(long configEpoch, out ReadOnlySpan<byte> erro
var current = currentConfig;
if (current.NumWorkers == 0)
{
errorMessage = "ERR workers not initialized."u8;
errorMessage = CmdStrings.RESP_ERR_GENERIC_WORKERS_NOT_INITIALIZED;
return false;
}

var newConfig = currentConfig.SetLocalWorkerConfigEpoch(configEpoch);
if (newConfig == null)
{
errorMessage = "ERR Node config epoch was not set due to invalid epoch specified."u8;
errorMessage = CmdStrings.RESP_ERR_GENERIC_CONFIG_EPOCH_NOT_SET;
return false;
}

Expand Down Expand Up @@ -294,7 +284,7 @@ public void TryResetReplica()
if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
break;
}
replicationManager.Reset();
clusterProvider.replicationManager.Reset();
FlushConfig();
}

Expand Down
121 changes: 41 additions & 80 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using Garnet.common;
Expand Down Expand Up @@ -31,7 +32,6 @@ public class ClusterProvider : IClusterProvider
internal long GarnetCurrentEpoch = 1;
ClusterAuthContainer authContainer;


/// <summary>
/// Get cluster username
/// </summary>
Expand All @@ -57,18 +57,16 @@ public ClusterProvider(StoreWrapper storeWrapper)
ClusterPassword = serverOptions.ClusterPassword
};

if (serverOptions.GossipSamplePercent > 100 || serverOptions.GossipSamplePercent < 0)
if (serverOptions.GossipSamplePercent is > 100 or < 0)
{
throw new Exception("Gossip sample fraction should be in range [0,100]");
}

this.clusterManager = NewClusterManagerInstance(serverOptions, loggerFactory);
this.replicationManager = NewReplicationManagerInstance(serverOptions, this, loggerFactory);
// Now set replication manager field in cluster manager, to break circular dependency
if (clusterManager != null) clusterManager.replicationManager = replicationManager;
this.clusterManager = new ClusterManager(this, logger: loggerFactory?.CreateLogger("ClusterManager"));
this.replicationManager = new ReplicationManager(this, logger: loggerFactory?.CreateLogger("ReplicationManager"));

this.failoverManager = NewFailoverManagerInstance(serverOptions, this, loggerFactory);
this.migrationManager = NewMigrationManagerInstance(this, loggerFactory);
this.failoverManager = new FailoverManager(this, logger: loggerFactory?.CreateLogger("FailoverManager"));
this.migrationManager = new MigrationManager(this, logger: loggerFactory?.CreateLogger("MigrationManager"));
}

/// <inheritdoc />
Expand Down Expand Up @@ -134,9 +132,9 @@ public void FlushDB(bool unsafeTruncateLog = false)
/// <inheritdoc />
public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCoveredAofAddress, Guid storeCheckpointToken, Guid objectStoreCheckpointToken)
{
CheckpointEntry entry = new CheckpointEntry();
var entry = new CheckpointEntry();

if (storeType == StoreType.Main || storeType == StoreType.All)
if (storeType is StoreType.Main or StoreType.All)
{
entry.storeVersion = storeWrapper.store.CurrentVersion;
entry.storeHlogToken = storeCheckpointToken;
Expand All @@ -145,7 +143,7 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover
entry.storePrimaryReplId = replicationManager.PrimaryReplId;
}

if (storeType == StoreType.Object || storeType == StoreType.All)
if (storeType is StoreType.Object or StoreType.All)
{
entry.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion;
entry.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
Expand All @@ -159,7 +157,7 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover
replicationManager.AddCheckpointEntry(entry, storeType, full);

if (clusterManager.CurrentConfig.GetLocalNodeRole() == NodeRole.PRIMARY)
replicationManager.SafeTruncateAof(CheckpointCoveredAofAddress);
_ = replicationManager.SafeTruncateAof(CheckpointCoveredAofAddress);
else
{
if (serverOptions.MainMemoryReplication)
Expand All @@ -182,63 +180,59 @@ public void OnCheckpointInitiated(out long CheckpointCoveredAofAddress)
CheckpointCoveredAofAddress = storeWrapper.appendOnlyFile.TailAddress;

replicationManager?.UpdateCommitSafeAofAddress(CheckpointCoveredAofAddress);
replicationManager?.SetPrimaryReplicationId();
}

/// <inheritdoc />
public MetricsItem[] GetReplicationInfo()
{
bool clusterEnabled = serverOptions.EnableCluster;
ClusterConfig config = clusterEnabled ? clusterManager.CurrentConfig : null;
var clusterEnabled = serverOptions.EnableCluster;
var config = clusterEnabled ? clusterManager.CurrentConfig : null;
var replicaInfo = clusterEnabled ? replicationManager.GetReplicaInfo() : null;
int replicaCount = clusterEnabled ? replicaInfo.Count : 0;
var role = clusterEnabled ? config.GetLocalNodeRole() : NodeRole.PRIMARY;
int commonInfoCount = 11;
int replicaInfoCount = 10;
int replicationInfoCount = commonInfoCount + replicaCount;
replicationInfoCount += role == NodeRole.REPLICA ? replicaInfoCount : 0;
var replication_offset = !clusterEnabled ? "N/A" : replicationManager.ReplicationOffset.ToString();
var replication_offset2 = !clusterEnabled ? "N/A" : replicationManager.ReplicationOffset2.ToString();

var replicationInfo = new MetricsItem[replicationInfoCount];
replicationInfo[0] = (new("role", NodeRole.PRIMARY == role ? "master" : "slave"));
replicationInfo[1] = (new("connected_slaves", !clusterEnabled ? "0" : replicationManager.ConnectedReplicasCount.ToString()));
replicationInfo[2] = (new("master_failover_state", !clusterEnabled ? FailoverUtils.GetFailoverStatus(FailoverStatus.NO_FAILOVER) : failoverManager.GetFailoverStatus()));
var replicationInfo = new List<MetricsItem>()
{
new("role", NodeRole.PRIMARY == role ? "master" : "slave"),
new("connected_slaves", !clusterEnabled ? "0" : replicationManager.ConnectedReplicasCount.ToString()),
new("master_failover_state", !clusterEnabled ? FailoverUtils.GetFailoverStatus(FailoverStatus.NO_FAILOVER) : failoverManager.GetFailoverStatus()),
new("master_replid", clusterEnabled ? replicationManager.PrimaryReplId : Generator.DefaultHexId()),
new("master_replid2", clusterEnabled ? replicationManager.PrimaryReplId2 : Generator.DefaultHexId()),
new("master_repl_offset", replication_offset),
new("second_repl_offset", replication_offset2),
new("store_current_safe_aof_address", clusterEnabled ? replicationManager.StoreCurrentSafeAofAddress.ToString() : "N/A"),
new("store_recovered_safe_aof_address", clusterEnabled ? replicationManager.StoreRecoveredSafeAofTailAddress.ToString() : "N/A"),
new("object_store_current_safe_aof_address", clusterEnabled && !serverOptions.DisableObjects ? replicationManager.ObjectStoreCurrentSafeAofAddress.ToString() : "N/A"),
new("object_store_recovered_safe_aof_address", clusterEnabled && !serverOptions.DisableObjects ? replicationManager.ObjectStoreRecoveredSafeAofTailAddress.ToString() : "N/A")

var replication_offset = !clusterEnabled ? "N/A" : replicationManager.ReplicationOffset.ToString();
replicationInfo[3] = (new("master_replid", clusterEnabled ? replicationManager.PrimaryReplId : Generator.DefaultHexId()));
replicationInfo[4] = (new("master_replid2", clusterEnabled ? replicationManager.PrimaryReplId2 : Generator.DefaultHexId()));
replicationInfo[5] = (new("master_repl_offset", replication_offset));
replicationInfo[6] = (new("second_repl_offset", replication_offset));
replicationInfo[7] = (new("store_current_safe_aof_address", clusterEnabled ? replicationManager.StoreCurrentSafeAofAddress.ToString() : "N/A"));
replicationInfo[8] = (new("store_recovered_safe_aof_address", clusterEnabled ? replicationManager.StoreRecoveredSafeAofTailAddress.ToString() : "N/A"));
replicationInfo[9] = (new("object_store_current_safe_aof_address", clusterEnabled && !serverOptions.DisableObjects ? replicationManager.ObjectStoreCurrentSafeAofAddress.ToString() : "N/A"));
replicationInfo[10] = (new("object_store_recovered_safe_aof_address", clusterEnabled && !serverOptions.DisableObjects ? replicationManager.ObjectStoreRecoveredSafeAofTailAddress.ToString() : "N/A"));
};

if (clusterEnabled)
{
if (role == NodeRole.REPLICA)
{
var (address, port) = config.GetLocalNodePrimaryAddress();
var primaryLinkStatus = clusterManager.GetPrimaryLinkStatus(config);
replicationInfo[commonInfoCount + 0] = new("master_host", address);
replicationInfo[commonInfoCount + 1] = new("master_port", port.ToString());
replicationInfo[commonInfoCount + 2] = primaryLinkStatus[0];
replicationInfo[commonInfoCount + 3] = primaryLinkStatus[1];
replicationInfo[commonInfoCount + 4] = new("master_sync_in_progress", replicationManager.recovering.ToString());
replicationInfo[commonInfoCount + 5] = new("slave_read_repl_offset", replication_offset);
replicationInfo[commonInfoCount + 6] = new("slave_priority", "100");
replicationInfo[commonInfoCount + 7] = new("slave_read_only", "1");
replicationInfo[commonInfoCount + 8] = new("replica_announced", "1");
replicationInfo[commonInfoCount + 9] = new("master_sync_last_io_seconds_ago", replicationManager.LastPrimarySyncSeconds.ToString());
replicationInfo.Add(new("master_host", address));
replicationInfo.Add(new("master_port", port.ToString()));
replicationInfo.Add(primaryLinkStatus[0]);
replicationInfo.Add(primaryLinkStatus[1]);
replicationInfo.Add(new("master_sync_in_progress", replicationManager.recovering.ToString()));
replicationInfo.Add(new("slave_read_repl_offset", replication_offset));
replicationInfo.Add(new("slave_priority", "100"));
replicationInfo.Add(new("slave_read_only", "1"));
replicationInfo.Add(new("replica_announced", "1"));
replicationInfo.Add(new("master_sync_last_io_seconds_ago", replicationManager.LastPrimarySyncSeconds.ToString()));
}
else
{
//replica0: ip=127.0.0.1,port=7001,state=online,offset=56,lag=0
int i = commonInfoCount;
// replica0: ip=127.0.0.1,port=7001,state=online,offset=56,lag=0
foreach (var ri in replicaInfo)
replicationInfo[i++] = new(ri.Item1, ri.Item2);
replicationInfo.Add(new(ri.Item1, ri.Item2));
}
}
return replicationInfo;
return [.. replicationInfo];
}

/// <inheritdoc />
Expand All @@ -260,10 +254,6 @@ public MetricsItem[] GetGossipStats(bool metricsDisabled)
];
}

/// <inheritdoc />
public DeviceLogCommitCheckpointManager CreateCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool isMainStore)
=> new(deviceFactory, checkpointNamingScheme, isMainStore);

internal ReplicationLogCheckpointManager GetReplicationLogCheckpointManager(StoreType storeType)
{
Debug.Assert(serverOptions.EnableCluster);
Expand Down Expand Up @@ -304,34 +294,5 @@ internal bool WaitForConfigTransition()
}
return true;
}

ClusterManager NewClusterManagerInstance(GarnetServerOptions serverOptions, ILoggerFactory loggerFactory)
{
if (!serverOptions.EnableCluster)
return null;
return new ClusterManager(this, loggerFactory);
}

ReplicationManager NewReplicationManagerInstance(GarnetServerOptions serverOptions, ClusterProvider clusterProvider, ILoggerFactory loggerFactory)
{
if (!serverOptions.EnableCluster)
return null;
return new ReplicationManager(clusterProvider, opts: serverOptions, logger: loggerFactory?.CreateLogger("StoreWrapper"));
}

FailoverManager NewFailoverManagerInstance(GarnetServerOptions serverOptions, ClusterProvider clusterProvider, ILoggerFactory loggerFactory)
{
if (!serverOptions.EnableCluster)
return null;
var clusterTimeout = serverOptions.ClusterTimeout <= 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(serverOptions.ClusterTimeout);
return new FailoverManager(clusterProvider, serverOptions, clusterTimeout, loggerFactory);
}

MigrationManager NewMigrationManagerInstance(ClusterProvider clusterProvider, ILoggerFactory loggerFactory)
{
if (!serverOptions.EnableCluster)
return null;
return new MigrationManager(clusterProvider, logger: loggerFactory?.CreateLogger("MigrationManager"));
}
}
}
Loading

0 comments on commit 5d394d3

Please sign in to comment.