Garnet Epoch Protection Fix & Misc (#361)
* fix wait for config transition

* code cleanup

* add max retry on replica attach to avoid getting stuck in prod environment

* update waitcheckpoint util to reflect correct check against lastSaveTime

* default AOF start to 64
vazois authored May 6, 2024
1 parent b8a2674 commit a6218f9
Showing 6 changed files with 69 additions and 25 deletions.
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterProvider.cs
@@ -287,7 +287,7 @@ internal bool WaitForConfigTransition()
foreach (var s in sessions)
{
var entryEpoch = s.LocalCurrentEpoch;
if (entryEpoch != 0 && entryEpoch >= currentEpoch)
if (entryEpoch != 0 && entryEpoch < currentEpoch)
goto retry;
}
break;
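The hunk above is the epoch-protection fix from the first commit note. WaitForConfigTransition is meant to block until every active session has re-entered under the new configuration epoch, so the loop must keep retrying while any session still holds a non-zero epoch older than currentEpoch; the previous condition (entryEpoch >= currentEpoch) instead retried on sessions that were already caught up. A minimal sketch of the corrected wait, assuming sessions and currentEpoch are supplied by the surrounding method and using a flag in place of the original goto retry:

// Sketch only: `sessions` is a snapshot of active sessions and `currentEpoch`
// is read after the new configuration is installed, as in WaitForConfigTransition.
while (true)
{
    var stale = false;
    foreach (var s in sessions)
    {
        var entryEpoch = s.LocalCurrentEpoch;
        // A non-zero epoch below currentEpoch means this session has not yet
        // observed the new configuration, so keep waiting.
        if (entryEpoch != 0 && entryEpoch < currentEpoch)
        {
            stale = true;
            break;
        }
    }
    if (!stale) break;
    _ = Thread.Yield();
}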
2 changes: 1 addition & 1 deletion libs/cluster/Server/Replication/CheckpointEntry.cs
@@ -43,7 +43,7 @@ public CheckpointEntry()
}

public long GetMinAofCoveredAddress()
=> Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress);
=> Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64);

/// <summary>
/// Indicate addition of new reader by trying to increment reader counter
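This pairs with the "default AOF start to 64" commit note: the minimum AOF address covered by the main-store and object-store checkpoints is now clamped to at least 64, which appears to be the first valid AOF address (the same value referenced as ReplicationManager.kFirstValidAofAddress further down in this commit), so a checkpoint entry never reports a sync starting point below the start of the log. A small sketch under that assumption:

// Sketch; treating 64 as kFirstValidAofAddress is an assumption based on the
// constant used elsewhere in this commit.
const long kFirstValidAofAddress = 64;

static long GetMinAofCoveredAddress(long storeCovered, long objectCovered)
    => Math.Max(Math.Min(storeCovered, objectCovered), kFirstValidAofAddress);

// GetMinAofCoveredAddress(0, 0) == 64, GetMinAofCoveredAddress(100, 200) == 100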
@@ -47,6 +47,7 @@ public void Dispose()
public async Task<bool> SendCheckpoint()
{
errorMsg = default;
var retryCount = 0;
var storeCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main);
var objectStoreCkptManager = clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object);
var current = clusterProvider.clusterManager.CurrentConfig;
@@ -96,6 +97,8 @@ public async Task<bool> SendCheckpoint()
{
localEntry.RemoveReader();
_ = Thread.Yield();
if (retryCount++ > 10)
throw new GarnetException("Attaching replica maximum retry count reached!");
goto retry;
}
}
@@ -110,6 +113,8 @@ public async Task<bool> SendCheckpoint()
{
localEntry.RemoveReader();
_ = Thread.Yield();
if (retryCount++ > 10)
throw new GarnetException("Attaching replica maximum retry count reached!");
goto retry;
}
}
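Both hunks above add the same guard to SendCheckpoint: when the checkpoint entry acquired for the attach turns out to be unusable, the primary releases its reader, yields, and retries, but now gives up after roughly ten attempts instead of spinning forever, which is the "getting stuck in prod environment" case from the commit message. A minimal sketch of the pattern, with hypothetical helpers standing in for the real acquisition and validity checks:

// Hypothetical helpers: GetLatestCheckpointEntry and EntryIsUsable stand in for
// the real add-reader and validation steps inside SendCheckpoint.
var retryCount = 0;
while (true)
{
    var localEntry = GetLatestCheckpointEntry();
    if (EntryIsUsable(localEntry))
        break;

    // Release the reader taken on this entry, yield, and retry; fail the
    // attach outright once the retry budget is exhausted.
    localEntry.RemoveReader();
    _ = Thread.Yield();
    if (retryCount++ > 10)
        throw new GarnetException("Attaching replica maximum retry count reached!");
}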
@@ -187,7 +192,7 @@ public async Task<bool> SendCheckpoint()
var beginAddress = RecoveredReplicationOffset;
if (!recoverFromRemote)
{
//If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset
// If replica is ahead of this primary it will force itself to forget and start syncing from RecoveredReplicationOffset
if (replicaAofBeginAddress > ReplicationManager.kFirstValidAofAddress && replicaAofBeginAddress > RecoveredReplicationOffset)
{
logger?.LogInformation(
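In the last hunk, when the primary did not recover from the remote checkpoint it decides where the replica should resume the AOF: if the replica's AOF already begins past both the first valid AOF address and the primary's RecoveredReplicationOffset, the replica is ahead of this primary and is forced to forget its log and resync from RecoveredReplicationOffset. A worked example with illustrative numbers (not taken from the commit):

// Illustrative values only; kFirstValidAofAddress mirrors the constant used above.
const long kFirstValidAofAddress = 64;
long replicaAofBeginAddress = 4096;      // where the replica's AOF starts
long recoveredReplicationOffset = 1024;  // offset this primary recovered to

bool replicaIsAhead =
    replicaAofBeginAddress > kFirstValidAofAddress &&
    replicaAofBeginAddress > recoveredReplicationOffset;  // true: resync from 1024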
19 changes: 10 additions & 9 deletions libs/server/StoreWrapper.cs
@@ -220,7 +220,8 @@ public void RecoverCheckpoint(bool recoverMainStoreFromToken = false, bool recov
{
storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(storeIndexToken, storeHlogToken);
if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(objectStoreIndexToken, objectStoreHlogToken);
lastSaveTime = DateTimeOffset.UtcNow;
if (storeVersion > 0 || objectStoreVersion > 0)
lastSaveTime = DateTimeOffset.UtcNow;
}
catch (Exception ex)
{
@@ -269,8 +270,8 @@ public long ReplayAOF(long untilAddress = -1)
long replicationOffset = 0;
try
{
//When replaying AOF we do not want to write record again to AOF.
//So initialize local AofProcessor with recordToAof: false.
// When replaying AOF we do not want to write record again to AOF.
// So initialize local AofProcessor with recordToAof: false.
var aofProcessor = new AofProcessor(this, recordToAof: false, logger);
aofProcessor.Recover(untilAddress);
aofProcessor.Dispose();
@@ -578,22 +579,22 @@ void CompleteCheckpoint()
/// <summary>
/// Take a checkpoint if no checkpoint was taken after the provided time offset
/// </summary>
/// <param name="afterTime"></param>
/// <param name="entryTime"></param>
/// <returns></returns>
public async Task TakeOnDemandCheckpoint(DateTimeOffset afterTime)
public async Task TakeOnDemandCheckpoint(DateTimeOffset entryTime)
{
//Take lock to ensure not other task will be taking a checkpoint
// Take lock to ensure no other task will be taking a checkpoint
while (!StartCheckpoint())
await Task.Yield();

//If an external task has taken a checkpoint after the provided afterTime return
if (this.lastSaveTime > afterTime)
// If an external task has taken a checkpoint beyond the provided entryTime return
if (this.lastSaveTime > entryTime)
{
CompleteCheckpoint();
return;
}

//If no newer checkpoint was taken compared to the provided afterTime take a checkpoint
// Necessary to take a checkpoint because the latest checkpoint is before entryTime
await CheckpointTask(StoreType.All, logger: logger);
}

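Together the StoreWrapper changes make lastSaveTime a trustworthy marker for "a checkpoint at least this new exists": RecoverCheckpoint only refreshes it when recovery actually loaded a store version, and TakeOnDemandCheckpoint (with its parameter renamed to entryTime) returns early when lastSaveTime already exceeds the caller's entry time. A hypothetical caller sketch:

// Hypothetical usage: capture the entry time first, then request an on-demand
// checkpoint; if another task checkpointed after entryTime, the call completes
// without taking a new one.
var entryTime = DateTimeOffset.UtcNow;
await storeWrapper.TakeOnDemandCheckpoint(entryTime);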
33 changes: 24 additions & 9 deletions test/Garnet.test.cluster/ClusterReplicationTests.cs
@@ -236,15 +236,18 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d
context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, 0);
else
context.PopulatePrimaryRMW(ref context.kvPairs, keyLength, kvpairCount, 0, addCount);

var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
context.clusterTestUtils.Checkpoint(0, logger: context.logger);

// Populate Primary
context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, 0);
context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, 1);

context.clusterTestUtils.WaitForReplicaAofSync(0, 1, context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(0, context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(1, context.logger);
context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger);

// Shutdown secondary
context.nodes[1].Store.CommitAOF(true);
@@ -599,9 +602,11 @@ public void ClusterReplicationSimpleFailover([Values] bool performRMW, [Values]

if (checkpoint)
{
var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
context.clusterTestUtils.Checkpoint(0);
context.clusterTestUtils.WaitFirstCheckpoint(0, logger: context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(1, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger);
}

#region InitiateFailover
@@ -667,8 +672,9 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo

if (takePrimaryCheckpoint)
{
var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
context.clusterTestUtils.Checkpoint(0, logger: context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(0, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
}

// Wait for replication offsets to synchronize
@@ -692,8 +698,9 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo

if (takeNewPrimaryCheckpoint)
{
var newPrimaryLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
context.clusterTestUtils.Checkpoint(1, logger: context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(1, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(1, newPrimaryLastSaveTime, logger: context.logger);
}
context.clusterTestUtils.WaitForReplicaAofSync(1, 2, context.logger);

@@ -912,11 +919,19 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp
}
else context.PopulatePrimaryWithObjects(ref context.kvPairsObj, keyLength, kvpairCount, primaryIndex: oldPrimaryIndex, set: set);

if (ckptBeforeDivergence) context.clusterTestUtils.Checkpoint(oldPrimaryIndex, logger: context.logger);
if (ckptBeforeDivergence)
{
var oldPrimaryLastSaveTime = context.clusterTestUtils.LastSave(oldPrimaryIndex, logger: context.logger);
var newPrimaryLastSaveTime = context.clusterTestUtils.LastSave(newPrimaryIndex, logger: context.logger);
var replicaLastSaveTime = context.clusterTestUtils.LastSave(replicaIndex, logger: context.logger);
context.clusterTestUtils.Checkpoint(oldPrimaryIndex, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(oldPrimaryIndex, oldPrimaryLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(newPrimaryIndex, newPrimaryLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(replicaIndex, replicaLastSaveTime, logger: context.logger);
}

context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, newPrimaryIndex, context.logger);
context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, replicaIndex, context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(newPrimaryIndex, logger: context.logger);
context.clusterTestUtils.WaitFirstCheckpoint(replicaIndex, logger: context.logger);

// Make this replica of no-one
_ = context.clusterTestUtils.ReplicaOf(1, logger: context.logger);
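The test updates replace WaitFirstCheckpoint, which only waited for some checkpoint to exist, with a baseline-based wait: record each node's LastSave before triggering the checkpoint, then wait until LastSave advances past that baseline. This stays correct even when a node already had an older checkpoint on disk. The recurring pattern, as used in the hunks above:

// Capture baselines, checkpoint the primary, then wait for LastSave to move
// past the baseline on both primary (node 0) and replica (node 1).
var primaryLastSaveTime = context.clusterTestUtils.LastSave(0, logger: context.logger);
var replicaLastSaveTime = context.clusterTestUtils.LastSave(1, logger: context.logger);
context.clusterTestUtils.Checkpoint(0, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(0, primaryLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(1, replicaLastSaveTime, logger: context.logger);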
31 changes: 27 additions & 4 deletions test/Garnet.test.cluster/ClusterTestUtils.cs
@@ -2647,16 +2647,39 @@ public void Checkpoint(IPEndPoint endPoint, ILogger logger = null)
}
}

public void WaitFirstCheckpoint(int nodeIndex, ILogger logger = null)
=> WaitCheckpoint((IPEndPoint)endpoints[nodeIndex], logger: logger);
public DateTime LastSave(int nodeIndex, ILogger logger = null)
=> LastSave((IPEndPoint)endpoints[nodeIndex], logger: logger);

public void WaitCheckpoint(IPEndPoint endPoint, ILogger logger = null)
public DateTime LastSave(IPEndPoint endPoint, ILogger logger = null)
{
try
{
var server = redis.GetServer(endPoint);
while (server.LastSave().Ticks == DateTimeOffset.FromUnixTimeSeconds(0).Ticks)
return server.LastSave();
}
catch (Exception ex)
{
logger?.LogError(ex, "An error has occurred; WaitCheckpoint");
Assert.Fail();
}
return default;
}

public void WaitCheckpoint(int nodeIndex, DateTime time, ILogger logger = null)
=> WaitCheckpoint((IPEndPoint)endpoints[nodeIndex], time: time, logger: logger);

public void WaitCheckpoint(IPEndPoint endPoint, DateTime time, ILogger logger = null)
{
try
{
var server = redis.GetServer(endPoint);
while (true)
{
var lastSaveTime = server.LastSave();
if (lastSaveTime >= time)
break;
BackOff();
}
}
catch (Exception ex)
{
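The utility diff is cut off inside the new WaitCheckpoint's catch block. A completed sketch of the two helpers follows; the catch bodies and the condensed wait condition are assumptions modeled on the error handling visible above, not the exact committed code:

public DateTime LastSave(IPEndPoint endPoint, ILogger logger = null)
{
    try
    {
        var server = redis.GetServer(endPoint);
        return server.LastSave();
    }
    catch (Exception ex)
    {
        logger?.LogError(ex, "An error has occurred; LastSave");
        Assert.Fail();
    }
    return default;
}

public void WaitCheckpoint(IPEndPoint endPoint, DateTime time, ILogger logger = null)
{
    try
    {
        var server = redis.GetServer(endPoint);
        // Back off until the server reports a checkpoint at or after `time`.
        while (server.LastSave() < time)
            BackOff();
    }
    catch (Exception ex)
    {
        logger?.LogError(ex, "An error has occurred; WaitCheckpoint");
        Assert.Fail();
    }
}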
