Skip to content

Commit

Permalink
fix: Replication checkpoint recovery fix (#227)
Browse files Browse the repository at this point in the history
* add replication recovery state fix

* add metrics to update primary recovery time

* fix whitespace formatting error

* Address comments - change DateTime to timestamp and formatting fixes
  • Loading branch information
msft-paddy14 authored Apr 4, 2024
1 parent b3d9a20 commit cff0b57
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 2 deletions.
1 change: 1 addition & 0 deletions libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public void TryResetReplica()
if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
break;
}
replicationManager.Reset();
FlushConfig();
}

Expand Down
3 changes: 2 additions & 1 deletion libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public MetricsItem[] GetReplicationInfo()
int replicaCount = clusterEnabled ? replicaInfo.Count : 0;
var role = clusterEnabled ? config.GetLocalNodeRole() : NodeRole.PRIMARY;
int commonInfoCount = 11;
int replicaInfoCount = 9;
int replicaInfoCount = 10;
int replicationInfoCount = commonInfoCount + replicaCount;
replicationInfoCount += role == NodeRole.REPLICA ? replicaInfoCount : 0;

Expand Down Expand Up @@ -228,6 +228,7 @@ public MetricsItem[] GetReplicationInfo()
replicationInfo[commonInfoCount + 6] = new("slave_priority", "100");
replicationInfo[commonInfoCount + 7] = new("slave_read_only", "1");
replicationInfo[commonInfoCount + 8] = new("replica_announced", "1");
replicationInfo[commonInfoCount + 9] = new("master_sync_last_io_seconds_ago", replicationManager.LastPrimarySyncSeconds.ToString());
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void CloseDevice()
/// <param name="segmentId"></param>
public void ProcessFileSegments(int segmentId, Guid token, CheckpointFileType type, long startAddress, Span<byte> data)
{
clusterProvider.replicationManager.UpdateLastPrimarySyncTime();
if (writeIntoCkptDevice == null)
{
Debug.Assert(writeIntoCkptDevice == null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ private async Task<string> InitiateReplicaSync()
/// <exception cref="Exception">Throws invalid type checkpoint metadata.</exception>
public void ProcessCheckpointMetadata(Guid fileToken, CheckpointFileType fileType, byte[] checkpointMetadata)
{
UpdateLastPrimarySyncTime();
ReplicationLogCheckpointManager ckptManager = fileType switch
{
CheckpointFileType.STORE_SNAPSHOT or
Expand Down Expand Up @@ -290,6 +291,7 @@ public long BeginReplicaRecover(
long beginAddress,
long recoveredReplicationOffset)
{
UpdateLastPrimarySyncTime();
storeWrapper.RecoverCheckpoint(recoverMainStoreFromToken, recoverObjectStoreFromToken,
remoteCheckpoint.storeIndexToken, remoteCheckpoint.storeHlogToken, remoteCheckpoint.objectStoreIndexToken, remoteCheckpoint.objectStoreHlogToken);

Expand Down
14 changes: 13 additions & 1 deletion libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@ internal sealed partial class ReplicationManager : IDisposable

readonly ILogger logger;
bool _disposed;

private long primary_sync_last_time;

/// <summary>
/// Whole seconds elapsed since <see cref="UpdateLastPrimarySyncTime"/> last ran,
/// reported only while <c>recovering</c> is set.
/// </summary>
/// <remarks>
/// Returns 0 when the node is not recovering, and also when no timestamp has been
/// recorded yet; otherwise the zero default would be read as a tick count and
/// produce an elapsed time measured from 0001-01-01.
/// </remarks>
internal long LastPrimarySyncSeconds
{
    get
    {
        if (!recovering)
            return 0;

        // Snapshot the field once so the check and the subtraction use the same value.
        var lastSyncTicks = primary_sync_last_time;
        if (lastSyncTicks == 0)
            return 0;

        // UtcNow is wall-clock time and may step backwards; never report a negative age.
        var elapsedSeconds = (DateTime.UtcNow.Ticks - lastSyncTicks) / TimeSpan.TicksPerSecond;
        return Math.Max(0L, elapsedSeconds);
    }
}

/// <summary>
/// Records the current UTC time, in ticks, into <c>primary_sync_last_time</c>.
/// </summary>
internal void UpdateLastPrimarySyncTime()
{
    var nowTicks = DateTime.UtcNow.Ticks;
    primary_sync_last_time = nowTicks;
}


public bool recovering;
private long replicationOffset;

public long ReplicationOffset
{
get
Expand Down Expand Up @@ -126,6 +133,11 @@ void CheckpointVersionShift(bool isMainStore, long oldVersion, long newVersion)
storeWrapper.EnqueueCommit(isMainStore, newVersion);
}

/// <summary>
/// Clears the <c>recovering</c> flag.
/// </summary>
public void Reset() => recovering = false;

public void Dispose()
{
_disposed = true;
Expand Down

0 comments on commit cff0b57

Please sign in to comment.