Guard against multiple recovery tasks (#403)
* indirect transition to recovering state

* replace the recovering flag with recoverLock to prohibit parallel recovery tasks (a sketch of the locking idea follows the changed-files summary below)

* check that primaryId is not null when retrieving the list of primary slots

* added logging level to Garnet exception

* Ensure node is a replica and has an assigned primary when starting replica failover
Validate replica node configuration before transitioning to primary

* Handle config transition failure when taking over as new primary.
Acquire a copy of the config for the FailoverSession.

* minor typo

* added new test for multiple consecutive failovers

* fix typos

* pass clusterTimeout to GossipWithMeet

* remove unstable new test
vazois authored May 30, 2024
1 parent 12e9656 commit caf6f59
Showing 19 changed files with 196 additions and 105 deletions.
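The ReplicationManager file that actually implements the new guard is among the 19 changed files but is not expanded in this view, so the following is only a hedged sketch of the idea named in the commit message: a recoverLock that lets exactly one task transition into the recovering state, replacing the freely writable recovering boolean. The member names (StartRecovery, SuspendRecovery, Recovering) are mirrored from the call sites visible in the diffs below; the RecoveryGuard class and its Interlocked-based body are assumptions, not Garnet's actual code.

using System;
using System.Threading;

// Hedged sketch: a single-owner recovery lock with the same surface as the
// replicationManager call sites in this commit (StartRecovery / SuspendRecovery /
// Recovering). The Interlocked-based body is an assumption, not ReplicationManager itself.
public sealed class RecoveryGuard
{
    // 0 = idle, 1 = a recovery task holds the lock
    int recoverLock;

    // True while a recovery task is in progress.
    public bool Recovering => Volatile.Read(ref recoverLock) == 1;

    // Attempt the transition into the recovering state; only one caller can win.
    public bool StartRecovery()
        => Interlocked.CompareExchange(ref recoverLock, 1, 0) == 0;

    // Leave the recovering state so a later recovery or failover can start.
    public void SuspendRecovery()
        => Volatile.Write(ref recoverLock, 0);

    static void Main()
    {
        var guard = new RecoveryGuard();
        Console.WriteLine(guard.StartRecovery()); // True  - first task acquires the lock
        Console.WriteLine(guard.StartRecovery()); // False - parallel recovery is rejected
        guard.SuspendRecovery();
        Console.WriteLine(guard.StartRecovery()); // True  - lock is free again
    }
}

In the diffs below, TryAddReplica surfaces RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK when StartRecovery fails, and the replica failover path logs the same error and aborts the takeover.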
2 changes: 2 additions & 0 deletions libs/cluster/CmdStrings.cs
@@ -88,6 +88,8 @@ static class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_FAILOVER_FROM_NON_MASTER => "ERR Cannot failover a non-master node"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_UNKNOWN_ENDPOINT => "ERR Unknown endpoint"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_MAKE_REPLICA_WITH_ASSIGNED_SLOTS => "ERR Primary has been assigned slots and cannot be a replica"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK => "ERR Recovery in progress, could not acquire recoverLock"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_TAKEOVER_FROM_PRIMARY => "ERR Could not take over from primary"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_REPLICATE_SELF => "ERR Can't replicate myself"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_NOT_ASSIGNED_PRIMARY_ERROR => "ERR Don't have primary"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_WORKERS_NOT_INITIALIZED => "ERR workers not initialized"u8;
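The two new entries follow the existing CmdStrings pattern: UTF-8 string literals exposed as ReadOnlySpan<byte> properties so they can be copied straight into RESP error replies. As a standalone illustration of what such a reply looks like on the wire, here is a small sketch; the framing below is hand-rolled for the example and is not Garnet's own RESP writer.

using System;
using System.Text;

class RespErrorFrameDemo
{
    // Same pattern as CmdStrings: a UTF-8 literal surfaced as a ReadOnlySpan<byte>.
    static ReadOnlySpan<byte> RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK
        => "ERR Recovery in progress, could not acquire recoverLock"u8;

    static void Main()
    {
        // A RESP simple-error reply is '-' followed by the message and CRLF.
        var msg = RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK;
        var frame = new byte[1 + msg.Length + 2];
        frame[0] = (byte)'-';
        msg.CopyTo(frame.AsSpan(1));
        frame[^2] = (byte)'\r';
        frame[^1] = (byte)'\n';

        Console.Write(Encoding.ASCII.GetString(frame)); // -ERR Recovery in progress, ...
    }
}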
16 changes: 10 additions & 6 deletions libs/cluster/Server/ClusterConfig.cs
@@ -264,13 +264,17 @@ public bool IsKnown(string nodeid)
public List<int> GetLocalPrimarySlots()
{
var primaryId = LocalNodePrimaryId;
List<int> result = new();
for (int i = 0; i < MAX_HASH_SLOT_VALUE; i++)
List<int> slots = [];

if (primaryId != null)
{
if (workers[slotMap[i].workerId].Nodeid.Equals(primaryId, StringComparison.OrdinalIgnoreCase))
result.Add(i);
for (var i = 0; i < MAX_HASH_SLOT_VALUE; i++)
{
if (slotMap[i].workerId > 0 && workers[slotMap[i].workerId].Nodeid.Equals(primaryId, StringComparison.OrdinalIgnoreCase))
slots.Add(i);
}
}
return result;
return slots;
}

/// <summary>
@@ -1010,7 +1014,7 @@ public ClusterConfig TakeOverFromPrimary()
var slots = GetLocalPrimarySlots();
var newSlotMap = new HashSlot[MAX_HASH_SLOT_VALUE];
Array.Copy(slotMap, newSlotMap, slotMap.Length);
foreach (int slot in slots)
foreach (var slot in slots)
{
newSlotMap[slot]._workerId = 1;
newSlotMap[slot]._state = SlotState.STABLE;
11 changes: 7 additions & 4 deletions libs/cluster/Server/ClusterManager.cs
@@ -280,7 +280,6 @@ public void TryResetReplica()
if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
break;
}
clusterProvider.replicationManager.Reset();
FlushConfig();
}

@@ -303,17 +302,21 @@ public void TryStopWrites(string replicaId)
/// <summary>
/// Takeover as new primary but forcefully claim ownership of old primary's slots.
/// </summary>
public void TryTakeOverForPrimary()
public bool TryTakeOverForPrimary()
{
while (true)
{
var current = currentConfig;
var newConfig = current.TakeOverFromPrimary();
newConfig = newConfig.BumpLocalNodeConfigEpoch();

if (!current.IsReplica || current.LocalNodePrimaryId == null)
return false;

var newConfig = current.TakeOverFromPrimary().BumpLocalNodeConfigEpoch();
if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
break;
}
FlushConfig();
return true;
}
}
}
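TryTakeOverForPrimary now validates the precondition (the local node is a replica and has an assigned primary) against the same configuration snapshot that it then publishes with Interlocked.CompareExchange, and it reports failure to the caller instead of proceeding silently. A minimal, self-contained sketch of that copy-then-compare-and-swap publish pattern follows; the Config type is a stand-in for illustration, not Garnet's ClusterConfig.

using System;
using System.Threading;

// Stand-in config type; Garnet's ClusterConfig is far richer.
sealed class Config
{
    public readonly long Epoch;
    public readonly bool IsReplica;
    public Config(long epoch, bool isReplica) { Epoch = epoch; IsReplica = isReplica; }
}

static class ConfigPublisher
{
    static Config current = new(epoch: 1, isReplica: true);

    // Returns false when the precondition fails; retries only if another writer
    // swapped the config between our read and our CompareExchange.
    public static bool TryTakeOver()
    {
        while (true)
        {
            var snapshot = current;
            if (!snapshot.IsReplica)
                return false; // validated on the exact snapshot we attempt to replace

            var next = new Config(snapshot.Epoch + 1, isReplica: false);
            if (ReferenceEquals(Interlocked.CompareExchange(ref current, next, snapshot), snapshot))
                return true;
        }
    }

    static void Main()
    {
        Console.WriteLine(TryTakeOver()); // True  - the replica takes over
        Console.WriteLine(TryTakeOver()); // False - already a primary, precondition fails
    }
}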
16 changes: 11 additions & 5 deletions libs/cluster/Server/ClusterManagerWorkerState.cs
@@ -138,10 +138,9 @@ public ReadOnlySpan<byte> TryReset(bool soft, int expirySeconds = 60)
/// </summary>
/// <param name="nodeid"></param>
/// <param name="force">Check if node is clean (i.e. is PRIMARY without any assigned nodes)</param>
/// <param name="recovering"></param>
/// <param name="errorMessage">The ASCII encoded error response if the method returned <see langword="false"/>; otherwise <see langword="default"/></param>
/// <param name="logger"></param>
public bool TryAddReplica(string nodeid, bool force, ref bool recovering, out ReadOnlySpan<byte> errorMessage, ILogger logger = null)
public bool TryAddReplica(string nodeid, bool force, out ReadOnlySpan<byte> errorMessage, ILogger logger = null)
{
errorMessage = default;
while (true)
@@ -150,7 +149,7 @@ public bool TryAddReplica(string nodeid, bool force, ref bool recovering, out Re
if (current.LocalNodeId.Equals(nodeid, StringComparison.OrdinalIgnoreCase))
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_CANNOT_REPLICATE_SELF;
logger?.LogError(Encoding.ASCII.GetString(errorMessage));
logger?.LogError($"{nameof(TryAddReplica)}: {{logMessage}}", Encoding.ASCII.GetString(errorMessage));
return false;
}

@@ -164,7 +163,7 @@ public bool TryAddReplica(string nodeid, bool force, ref bool recovering, out Re
if (!force && current.HasAssignedSlots(1))
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_CANNOT_MAKE_REPLICA_WITH_ASSIGNED_SLOTS;
logger?.LogError(Encoding.ASCII.GetString(errorMessage));
logger?.LogError($"{nameof(TryAddReplica)}: {{logMessage}}", Encoding.ASCII.GetString(errorMessage));
return false;
}

@@ -183,7 +182,7 @@ public bool TryAddReplica(string nodeid, bool force, ref bool recovering, out Re
return false;
}

recovering = true;
// Transition to recovering state
if (!clusterProvider.replicationManager.StartRecovery())
{
logger?.LogError($"{nameof(TryAddReplica)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
errorMessage = CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK;
return false;
}

var newConfig = currentConfig.MakeReplicaOf(nodeid);
newConfig = newConfig.BumpLocalNodeConfigEpoch();
if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
4 changes: 2 additions & 2 deletions libs/cluster/Server/ClusterProvider.cs
@@ -112,7 +112,7 @@ public void Dispose()

/// <inheritdoc />
public bool IsReplica()
=> clusterManager?.CurrentConfig.LocalNodeRole == NodeRole.REPLICA || replicationManager?.recovering == true;
=> clusterManager?.CurrentConfig.LocalNodeRole == NodeRole.REPLICA || replicationManager?.Recovering == true;

/// <inheritdoc />
public void ResetGossipStats()
@@ -218,7 +218,7 @@ public MetricsItem[] GetReplicationInfo()
replicationInfo.Add(new("master_port", port.ToString()));
replicationInfo.Add(primaryLinkStatus[0]);
replicationInfo.Add(primaryLinkStatus[1]);
replicationInfo.Add(new("master_sync_in_progress", replicationManager.recovering.ToString()));
replicationInfo.Add(new("master_sync_in_progress", replicationManager.Recovering.ToString()));
replicationInfo.Add(new("slave_read_repl_offset", replication_offset));
replicationInfo.Add(new("slave_priority", "100"));
replicationInfo.Add(new("slave_read_only", "1"));
9 changes: 5 additions & 4 deletions libs/cluster/Server/Failover/FailoverSession.cs
@@ -25,7 +25,7 @@ internal sealed partial class FailoverSession : IDisposable

public bool FailoverTimeout => failoverDeadline < DateTime.UtcNow;

readonly ClusterConfig currentConfig;
readonly ClusterConfig oldConfig;

/// <summary>
/// FailoverSession constructor
@@ -52,15 +52,16 @@ public FailoverSession(
this.clusterTimeout = clusterTimeout;
this.option = option;
this.logger = logger;
currentConfig = clusterProvider.clusterManager.CurrentConfig;
oldConfig = clusterProvider.clusterManager.CurrentConfig.Copy();
cts = new();

// TODO: move connection initialization at start of async primary failover
// Initialize connections only when failover is initiated by the primary
if (!isReplicaSession)
{
var endpoints = hostPort == -1
? currentConfig.GetLocalNodePrimaryEndpoints(includeMyPrimaryFirst: true)
: hostPort == 0 ? currentConfig.GetLocalNodeReplicaEndpoints() : null;
? oldConfig.GetLocalNodePrimaryEndpoints(includeMyPrimaryFirst: true)
: hostPort == 0 ? oldConfig.GetLocalNodeReplicaEndpoints() : null;
clients = endpoints != null ? new GarnetClient[endpoints.Count] : new GarnetClient[1];

if (clients.Length > 1)
78 changes: 52 additions & 26 deletions libs/cluster/Server/Failover/ReplicaFailoverSession.cs
@@ -13,6 +13,11 @@ namespace Garnet.cluster
{
internal sealed partial class FailoverSession : IDisposable
{
/// <summary>
/// Connection to primary if reachable
/// </summary>
GarnetClient primaryClient = null;

/// <summary>
/// Set to true to re-use established gossip connections for failover.
/// Note connection might abruptly close due to timeout.
@@ -33,7 +38,7 @@ private GarnetClient GetOrAddConnection(string nodeId)
// If connection not available try to initialize it
if (gsn == null)
{
var (address, port) = currentConfig.GetEndpointFromNodeId(nodeId);
var (address, port) = oldConfig.GetEndpointFromNodeId(nodeId);
gsn = new GarnetServerNode(
clusterProvider,
address,
@@ -67,7 +72,7 @@ private GarnetClient GetOrAddConnection(string nodeId)
/// <returns></returns>
private GarnetClient CreateConnection(string nodeId)
{
var (address, port) = currentConfig.GetEndpointFromNodeId(nodeId);
var (address, port) = oldConfig.GetEndpointFromNodeId(nodeId);
var client = new GarnetClient(
address,
port,
@@ -107,7 +112,7 @@ private GarnetClient GetConnection(string nodeId)
/// <returns>True on success, false otherwise</returns>
private async Task<bool> PauseWritesAndWaitForSync()
{
var primaryId = currentConfig.LocalNodePrimaryId;
var primaryId = oldConfig.LocalNodePrimaryId;
var client = GetConnection(primaryId);
try
{
@@ -117,9 +122,12 @@ private async Task<bool> PauseWritesAndWaitForSync()
return false;
}

// Cache connection for use with next operations
primaryClient = client;

// Issue stop writes to the primary
status = FailoverStatus.ISSUING_PAUSE_WRITES;
var localIdBytes = Encoding.ASCII.GetBytes(currentConfig.LocalNodeId);
var localIdBytes = Encoding.ASCII.GetBytes(oldConfig.LocalNodeId);
var primaryReplicationOffset = await client.failstopwrites(localIdBytes).WaitAsync(failoverTimeout, cts.Token);

// Wait for replica to catch up
@@ -142,35 +150,47 @@ private async Task<bool> PauseWritesAndWaitForSync()
logger?.LogError(ex, "PauseWritesAndWaitForSync Error");
return false;
}
finally
{
if (!useGossipConnections)
client?.Dispose();
}
}

/// <summary>
/// Perform series of steps to update local config and take ownership of primary slots.
/// </summary>
private void TakeOverAsPrimary()
private bool TakeOverAsPrimary()
{
// Take over as primary and inform old primary
status = FailoverStatus.TAKING_OVER_AS_PRIMARY;

// Make replica syncing unavailable by setting recovery flag
clusterProvider.replicationManager.recovering = true;
if (!clusterProvider.replicationManager.StartRecovery())
{
logger?.LogError($"{nameof(TakeOverAsPrimary)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
return false;
}
_ = clusterProvider.WaitForConfigTransition();

// Update replicationIds and replicationOffset2
clusterProvider.replicationManager.TryUpdateForFailover();
try
{
// Take over slots from old primary
if (!clusterProvider.clusterManager.TryTakeOverForPrimary())
{
logger?.LogError($"{nameof(TakeOverAsPrimary)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_TAKEOVER_FROM_PRIMARY));
return false;
}

// Update replicationIds and replicationOffset2
clusterProvider.replicationManager.TryUpdateForFailover();

// Initialize checkpoint history
clusterProvider.replicationManager.InitializeCheckpointStore();
clusterProvider.clusterManager.TryTakeOverForPrimary();
_ = clusterProvider.WaitForConfigTransition();
// Initialize checkpoint history
clusterProvider.replicationManager.InitializeCheckpointStore();
_ = clusterProvider.WaitForConfigTransition();
}
finally
{
// Disable recovering as now this node has become a primary or failed in its attempt earlier
clusterProvider.replicationManager.SuspendRecovery();
}

// Disable recovering as now we have become a primary
clusterProvider.replicationManager.recovering = false;
return true;
}

/// <summary>
@@ -181,14 +201,15 @@ private void TakeOverAsPrimary()
/// <returns></returns>
private async Task BroadcastConfigAndRequestAttach(string replicaId, byte[] configByteArray)
{
var oldPrimaryId = oldConfig.LocalNodePrimaryId;
var newConfig = clusterProvider.clusterManager.CurrentConfig;
var client = GetConnection(replicaId);
var client = oldPrimaryId.Equals(replicaId) ? primaryClient : GetConnection(replicaId);

try
{
if (client == null)
{
logger?.LogError("Failed to initialize connection to replica {primaryId}", replicaId);
logger?.LogError("Failed to initialize connection to replica {replicaId}", replicaId);
return;
}

@@ -223,8 +244,8 @@ await client.Gossip(configByteArray).ContinueWith(t =>
}
}, TaskContinuationOptions.RunContinuationsAsynchronously).WaitAsync(failoverTimeout, cts.Token);

var localAddress = currentConfig.LocalNodeIp;
var localPort = currentConfig.LocalNodePort;
var localAddress = oldConfig.LocalNodeIp;
var localPort = oldConfig.LocalNodePort;

// Ask replica to attach and sync
var replicaOfResp = await client.ReplicaOf(localAddress, localPort).WaitAsync(failoverTimeout, cts.Token);
@@ -249,15 +270,14 @@ private async Task IssueAttachReplicas()
// Get information of local node from newConfig
var newConfig = clusterProvider.clusterManager.CurrentConfig;
// Get replica ids for old primary from old configuration
var oldPrimaryId = currentConfig.LocalNodePrimaryId;
var oldPrimaryId = oldConfig.LocalNodePrimaryId;
var replicaIds = newConfig.GetReplicaIds(oldPrimaryId);
var configByteArray = newConfig.ToByteArray();
var attachReplicaTasks = new List<Task>();

// If DEFAULT failover try to make old primary replica of this new primary
if (option is FailoverOption.DEFAULT)
{
// TODO: enable primary to replica failover
replicaIds.Add(oldPrimaryId);
}

@@ -312,7 +332,12 @@ public async Task<bool> BeginAsyncReplicaFailover()
}

// Transition to primary role
TakeOverAsPrimary();
if (!TakeOverAsPrimary())
{
// Request primary to be reset to original state only if DEFAULT option was used
_ = await primaryClient?.failstopwrites(Array.Empty<byte>()).WaitAsync(failoverTimeout, cts.Token);
return false;
}

// Attach to old replicas, and old primary if DEFAULT option
await IssueAttachReplicas();
@@ -326,6 +351,7 @@ public async Task<bool> BeginAsyncReplicaFailover()
}
finally
{
primaryClient?.Dispose();
status = FailoverStatus.NO_FAILOVER;
}
}
6 changes: 3 additions & 3 deletions libs/cluster/Server/GarnetClientExtensions.cs
@@ -23,7 +23,7 @@ internal static partial class GarnetClientExtensions
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task<MemoryResult<byte>> Gossip(this GarnetClient client, Memory<byte> data, CancellationToken cancellationToken = default)
=> client.ExecuteForMemoryResultWithCancellationAsync(GarnetClient.CLUSTER, new Memory<byte>[] { GOSSIP, data }, cancellationToken);
=> client.ExecuteForMemoryResultWithCancellationAsync(GarnetClient.CLUSTER, [GOSSIP, data], cancellationToken);

/// <summary>
/// Send config
@@ -33,7 +33,7 @@ public static Task<MemoryResult<byte>> Gossip(this GarnetClient client, Memory<b
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task<MemoryResult<byte>> GossipWithMeet(this GarnetClient client, Memory<byte> data, CancellationToken cancellationToken = default)
=> client.ExecuteForMemoryResultWithCancellationAsync(GarnetClient.CLUSTER, new Memory<byte>[] { GOSSIP, WITHMEET, data }, cancellationToken);
=> client.ExecuteForMemoryResultWithCancellationAsync(GarnetClient.CLUSTER, [GOSSIP, WITHMEET, data], cancellationToken);

/// <summary>
/// Send stop writes to primary
@@ -43,7 +43,7 @@ public static Task<MemoryResult<byte>> GossipWithMeet(this GarnetClient client,
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static async Task<long> failstopwrites(this GarnetClient client, Memory<byte> nodeid, CancellationToken cancellationToken = default)
=> await client.ExecuteForLongResultWithCancellationAsync(GarnetClient.CLUSTER, new Memory<byte>[] { CmdStrings.failstopwrites.ToArray(), nodeid }, cancellationToken).ConfigureAwait(false);
=> await client.ExecuteForLongResultWithCancellationAsync(GarnetClient.CLUSTER, [CmdStrings.failstopwrites.ToArray(), nodeid], cancellationToken).ConfigureAwait(false);

/// <summary>
/// Send request to await for replication offset sync with replica
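The only changes in this file are syntactic: the explicit new Memory<byte>[] { ... } argument arrays are rewritten as C# 12 collection expressions, which build the same Memory<byte>[]. A standalone illustration, with made-up payloads standing in for GOSSIP and the serialized config:

using System;

class CollectionExpressionDemo
{
    static void Main()
    {
        // Hypothetical payloads for the example only.
        Memory<byte> gossip = "GOSSIP"u8.ToArray();
        Memory<byte> data = new byte[] { 0x01, 0x02, 0x03 };

        // Pre-C# 12 spelling, as the code read before this commit.
        Memory<byte>[] before = new Memory<byte>[] { gossip, data };

        // C# 12 collection expression; same element type, order, and count.
        Memory<byte>[] after = [gossip, data];

        Console.WriteLine(before.Length == after.Length);               // True
        Console.WriteLine(before[0].Span.SequenceEqual(after[0].Span)); // True
    }
}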
2 changes: 1 addition & 1 deletion libs/cluster/Server/GarnetServerNode.cs
@@ -123,7 +123,7 @@ public MemoryResult<byte> TryMeet(byte[] configByteArray)
{
_ = meetLock.TryWriteLock();
UpdateGossipSend();
var resp = gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.gossipDelay, cts.Token).GetAwaiter().GetResult();
var resp = gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token).GetAwaiter().GetResult();
return resp;
}
finally
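TryMeet now bounds the GossipWithMeet call with the configured clusterTimeout rather than gossipDelay. The bounding itself is the standard Task.WaitAsync pattern with both a timeout and a cancellation token; the sketch below shows that pattern in isolation, with an assumed timeout value and a fake gossip task in place of the real remote call.

using System;
using System.Threading;
using System.Threading.Tasks;

class WaitAsyncTimeoutDemo
{
    static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        var clusterTimeout = TimeSpan.FromSeconds(1); // assumed value, for illustration only

        // Stand-in for the remote gossip-with-meet call; deliberately slower than the timeout.
        var gossip = SlowGossipAsync(TimeSpan.FromSeconds(5), cts.Token);

        try
        {
            // Same shape as TryMeet: the wait is bounded by clusterTimeout and the node's token.
            var resp = await gossip.WaitAsync(clusterTimeout, cts.Token);
            Console.WriteLine(resp);
        }
        catch (TimeoutException)
        {
            Console.WriteLine("gossip-with-meet did not complete within clusterTimeout");
        }
    }

    static async Task<string> SlowGossipAsync(TimeSpan latency, CancellationToken token)
    {
        await Task.Delay(latency, token);
        return "+OK";
    }
}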