From f365c944f5bac20912627c8be807bd3010542fe4 Mon Sep 17 00:00:00 2001 From: Vasileios Zois <96085550+vazois@users.noreply.github.com> Date: Wed, 29 Jan 2025 16:15:28 -0800 Subject: [PATCH] Code Refactor for Replication/Migration (#962) * refactor/cleanup replication tests * refactor/cleanup migration iterative network send * add OneWriteLock * fix migrate progress flag * dispose gcs when background sync has not started * make test fixture nonparallelizable * use try write lock for dipose of GCS in AofSyncTask --- .../GarnetClientSessionIncremental.cs | 195 +++++++++++++++++ .../GarnetClientSessionMigrationExtensions.cs | 198 +----------------- .../Server/Migration/MigrateSessionKeys.cs | 9 +- .../Server/Migration/MigrateSessionSend.cs | 12 +- .../Replication/PrimaryOps/AofSyncTaskInfo.cs | 35 +++- .../Session/RespClusterMigrateCommands.cs | 2 +- libs/common/SingleWriterMultiReaderLock.cs | 19 ++ playground/TstRunner/Program.cs | 2 +- .../ClusterReplicationAsyncReplayTests.cs | 10 - .../ClusterReplicationTLSTests.cs | 72 ------- test/Garnet.test.cluster/ClusterTestUtils.cs | 2 +- .../ClusterReplicationAsyncReplay.cs | 24 +++ .../ClusterReplicationBaseTests.cs} | 13 +- .../ReplicationTests/ClusterReplicationTLS.cs | 24 +++ 14 files changed, 323 insertions(+), 294 deletions(-) create mode 100644 libs/client/ClientSession/GarnetClientSessionIncremental.cs delete mode 100644 test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs delete mode 100644 test/Garnet.test.cluster/ClusterReplicationTLSTests.cs create mode 100644 test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs rename test/Garnet.test.cluster/{ClusterReplicationTests.cs => ReplicationTests/ClusterReplicationBaseTests.cs} (99%) create mode 100644 test/Garnet.test.cluster/ReplicationTests/ClusterReplicationTLS.cs diff --git a/libs/client/ClientSession/GarnetClientSessionIncremental.cs b/libs/client/ClientSession/GarnetClientSessionIncremental.cs new file mode 100644 index 0000000000..35b91ad1fe --- /dev/null +++ b/libs/client/ClientSession/GarnetClientSessionIncremental.cs @@ -0,0 +1,195 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Garnet.common; +using Garnet.networking; +using Microsoft.Extensions.Logging; +using Tsavorite.core; + +namespace Garnet.client +{ + public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer + { + bool isMainStore; + byte* curr, head; + int keyValuePairCount; + TaskCompletionSource currTcsIterationTask = null; + + /// + /// Getter to compute how much space to leave at the front of the buffer + /// in order to write the maximum possible RESP length header (of length bufferSize) + /// + int ExtraSpace => + 1 // $ + + bufferSizeDigits // Number of digits in maximum possible length (will be written with zero padding) + + 2 // \r\n + + 4; // We write a 4-byte int keyCount at the start of the payload + + /// + /// Check if header for batch is initialized + /// + public bool NeedsInitialization => curr == null; + + /// + /// Flush and initialize buffers/parameters used for migrate command + /// + /// + public void InitializeIterationBuffer(TimeSpan iterationProgressFreq = default) + { + Flush(); + currTcsIterationTask = null; + curr = head = null; + keyValuePairCount = 0; + this.iterationProgressFreq = default ? TimeSpan.FromSeconds(5) : iterationProgressFreq; + } + + /// + /// Send key value pair and reset migrate buffers + /// + public Task SendAndResetIterationBuffer() + { + if (keyValuePairCount == 0) return null; + + Debug.Assert(end - curr >= 2); + *curr++ = (byte)'\r'; + *curr++ = (byte)'\n'; + + // Payload format = [$length\r\n][number of keys (4 bytes)][raw key value pairs]\r\n + var size = (int)(curr - 2 - head - (ExtraSpace - 4)); + TrackIterationProgress(keyValuePairCount, size); + var success = RespWriteUtils.TryWritePaddedBulkStringLength(size, ExtraSpace - 4, ref head, end); + Debug.Assert(success); + + // Number of key value pairs in payload + *(int*)head = keyValuePairCount; + + // Reset offset and flush buffer + offset = curr; + Flush(); + Interlocked.Increment(ref numCommands); + + // Return outstanding task and reset current tcs + var task = currTcsIterationTask.Task; + currTcsIterationTask = null; + curr = head = null; + keyValuePairCount = 0; + return task; + } + + /// + /// Try write key value pair for main store directly to the client buffer + /// + /// + /// + /// + /// + public bool TryWriteKeyValueSpanByte(ref SpanByte key, ref SpanByte value, out Task task) + { + task = null; + // Try write key value pair directly to client buffer + if (!WriteSerializedSpanByte(ref key, ref value)) + { + // If failed to write because no space left send outstanding data and retrieve task + // Caller is responsible for retrying + task = SendAndResetIterationBuffer(); + return false; + } + + keyValuePairCount++; + return true; + + bool WriteSerializedSpanByte(ref SpanByte key, ref SpanByte value) + { + var totalLen = key.TotalSize + value.TotalSize + 2 + 2; + if (totalLen > (int)(end - curr)) + return false; + + key.CopyTo(curr); + curr += key.TotalSize; + value.CopyTo(curr); + curr += value.TotalSize; + return true; + } + } + + /// + /// Try write key value pair for object store directly to the client buffer + /// + /// + /// + /// + /// + /// + public bool TryWriteKeyValueByteArray(byte[] key, byte[] value, long expiration, out Task task) + { + task = null; + // Try write key value pair directly to client buffer + if (!WriteSerializedKeyValueByteArray(key, value, expiration)) + { + // If failed to write because no space left send outstanding data and retrieve task + // Caller is responsible for retrying + task = SendAndResetIterationBuffer(); + return false; + } + + keyValuePairCount++; + return true; + + bool WriteSerializedKeyValueByteArray(byte[] key, byte[] value, long expiration) + { + // We include space for newline at the end, to be added before sending + int totalLen = 4 + key.Length + 4 + value.Length + 8 + 2; + if (totalLen > (int)(end - curr)) + return false; + + *(int*)curr = key.Length; + curr += 4; + fixed (byte* keyPtr = key) + Buffer.MemoryCopy(keyPtr, curr, key.Length, key.Length); + curr += key.Length; + + *(int*)curr = value.Length; + curr += 4; + fixed (byte* valPtr = value) + Buffer.MemoryCopy(valPtr, curr, value.Length, value.Length); + curr += value.Length; + + *(long*)curr = expiration; + curr += 8; + + return true; + } + } + + long lastLog = 0; + long totalKeyCount = 0; + long totalPayloadSize = 0; + TimeSpan iterationProgressFreq; + + /// + /// Logging of migrate session status + /// + /// + /// + /// + private void TrackIterationProgress(int keyCount, int size, bool completed = false) + { + totalKeyCount += keyCount; + totalPayloadSize += size; + var duration = TimeSpan.FromTicks(Stopwatch.GetTimestamp() - lastLog); + if (completed || lastLog == 0 || duration >= iterationProgressFreq) + { + logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)", + completed ? "COMPLETED" : "MIGRATING", + isMainStore, + totalKeyCount.ToString("N0"), + ((long)((double)totalPayloadSize / 1024)).ToString("N0")); + lastLog = Stopwatch.GetTimestamp(); + } + } + } +} \ No newline at end of file diff --git a/libs/client/ClientSession/GarnetClientSessionMigrationExtensions.cs b/libs/client/ClientSession/GarnetClientSessionMigrationExtensions.cs index d508fe25f9..b23f56d34a 100644 --- a/libs/client/ClientSession/GarnetClientSessionMigrationExtensions.cs +++ b/libs/client/ClientSession/GarnetClientSessionMigrationExtensions.cs @@ -8,8 +8,6 @@ using System.Threading.Tasks; using Garnet.common; using Garnet.networking; -using Microsoft.Extensions.Logging; -using Tsavorite.core; namespace Garnet.client { @@ -166,49 +164,16 @@ public Task SetSlotRange(Memory state, string nodeid, List<(int, i return tcs.Task; } - /// - /// Check if migrate command parameters need to be initialized - /// - public bool InitMigrateCommand => curr == null; - - /// - /// Getter to compute how much space to leave at the front of the buffer - /// in order to write the maximum possible RESP length header (of length bufferSize) - /// - int ExtraSpace => - 1 // $ - + bufferSizeDigits // Number of digits in maximum possible length (will be written with zero padding) - + 2 // \r\n - + 4; // We write a 4-byte int keyCount at the start of the payload - - bool isMainStore; - byte* curr, head; - int keyCount; - TaskCompletionSource currTcsMigrate = null; - - /// - /// Flush and initialize buffers/parameters used for migrate command - /// - /// - public void InitMigrateBuffer(TimeSpan migrateProgressFreq = default) - { - Flush(); - currTcsMigrate = null; - curr = head = null; - keyCount = 0; - this.migrateProgressFreq = default ? TimeSpan.FromSeconds(5) : migrateProgressFreq; - } - /// /// Write parameters of CLUSTER MIGRATE directly to the client buffer /// /// /// /// - public void SetClusterMigrate(string sourceNodeId, bool replace, bool isMainStore) + public void SetClusterMigrateHeader(string sourceNodeId, bool replace, bool isMainStore) { - currTcsMigrate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - tcsQueue.Enqueue(currTcsMigrate); + currTcsIterationTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + tcsQueue.Enqueue(currTcsIterationTask); curr = offset; this.isMainStore = isMainStore; var storeType = isMainStore ? MAIN_STORE : OBJECT_STORE; @@ -274,39 +239,6 @@ public void SetClusterMigrate(string sourceNodeId, bool replace, bool isMainStor curr += ExtraSpace; } - /// - /// Send key value pair and reset migrate buffers - /// - public Task SendAndResetMigrate() - { - if (keyCount == 0) return null; - - Debug.Assert(end - curr >= 2); - *curr++ = (byte)'\r'; - *curr++ = (byte)'\n'; - - // Payload format = [$length\r\n][number of keys (4 bytes)][raw key value pairs]\r\n - var size = (int)(curr - 2 - head - (ExtraSpace - 4)); - TrackMigrateProgress(keyCount, size); - var success = RespWriteUtils.TryWritePaddedBulkStringLength(size, ExtraSpace - 4, ref head, end); - Debug.Assert(success); - - // Number of key value pairs in payload - *(int*)head = keyCount; - - // Reset offset and flush buffer - offset = curr; - Flush(); - Interlocked.Increment(ref numCommands); - - // Return outstanding task and reset current tcs - var task = currTcsMigrate.Task; - currTcsMigrate = null; - curr = head = null; - keyCount = 0; - return task; - } - /// /// Signal completion of migration by sending an empty payload /// @@ -316,7 +248,7 @@ public Task SendAndResetMigrate() /// public Task CompleteMigrate(string sourceNodeId, bool replace, bool isMainStore) { - SetClusterMigrate(sourceNodeId, replace, isMainStore); + SetClusterMigrateHeader(sourceNodeId, replace, isMainStore); Debug.Assert(end - curr >= 2); *curr++ = (byte)'\r'; @@ -324,12 +256,12 @@ public Task CompleteMigrate(string sourceNodeId, bool replace, bool isMa // Payload format = [$length\r\n][number of keys (4 bytes)][raw key value pairs]\r\n var size = (int)(curr - 2 - head - (ExtraSpace - 4)); - TrackMigrateProgress(keyCount, size, completed: true); + TrackIterationProgress(keyValuePairCount, size, completed: true); var success = RespWriteUtils.TryWritePaddedBulkStringLength(size, ExtraSpace - 4, ref head, end); Debug.Assert(success); // Number of key value pairs in payload - *(int*)head = keyCount; + *(int*)head = keyValuePairCount; // Reset offset and flush buffer offset = curr; @@ -337,123 +269,11 @@ public Task CompleteMigrate(string sourceNodeId, bool replace, bool isMa Interlocked.Increment(ref numCommands); // Return outstanding task and reset current tcs - var task = currTcsMigrate.Task; - currTcsMigrate = null; + var task = currTcsIterationTask.Task; + currTcsIterationTask = null; curr = head = null; - keyCount = 0; + keyValuePairCount = 0; return task; } - - /// - /// Try write key value pair for main store directly to the client buffer - /// - /// - /// - /// - /// - public bool TryWriteKeyValueSpanByte(ref SpanByte key, ref SpanByte value, out Task migrateTask) - { - migrateTask = null; - // Try write key value pair directly to client buffer - if (!WriteSerializedSpanByte(ref key, ref value)) - { - // If failed to write because no space left send outstanding data and retrieve task - // Caller is responsible for retrying - migrateTask = SendAndResetMigrate(); - return false; - } - - keyCount++; - return true; - } - - /// - /// Try write key value pair for object store directly to the client buffer - /// - /// - /// - /// - /// - /// - public bool TryWriteKeyValueByteArray(byte[] key, byte[] value, long expiration, out Task migrateTask) - { - migrateTask = null; - // Try write key value pair directly to client buffer - if (!WriteSerializedKeyValueByteArray(key, value, expiration)) - { - // If failed to write because no space left send outstanding data and retrieve task - // Caller is responsible for retrying - migrateTask = SendAndResetMigrate(); - return false; - } - - keyCount++; - return true; - } - - private bool WriteSerializedSpanByte(ref SpanByte key, ref SpanByte value) - { - var totalLen = key.TotalSize + value.TotalSize + 2 + 2; - if (totalLen > (int)(end - curr)) - return false; - - key.CopyTo(curr); - curr += key.TotalSize; - value.CopyTo(curr); - curr += value.TotalSize; - return true; - } - - private bool WriteSerializedKeyValueByteArray(byte[] key, byte[] value, long expiration) - { - // We include space for newline at the end, to be added before sending - int totalLen = 4 + key.Length + 4 + value.Length + 8 + 2; - if (totalLen > (int)(end - curr)) - return false; - - *(int*)curr = key.Length; - curr += 4; - fixed (byte* keyPtr = key) - Buffer.MemoryCopy(keyPtr, curr, key.Length, key.Length); - curr += key.Length; - - *(int*)curr = value.Length; - curr += 4; - fixed (byte* valPtr = value) - Buffer.MemoryCopy(valPtr, curr, value.Length, value.Length); - curr += value.Length; - - *(long*)curr = expiration; - curr += 8; - - return true; - } - - long lastLog = 0; - long totalKeyCount = 0; - long totalPayloadSize = 0; - TimeSpan migrateProgressFreq; - - /// - /// Logging of migrate session status - /// - /// - /// - /// - private void TrackMigrateProgress(int keyCount, int size, bool completed = false) - { - totalKeyCount += keyCount; - totalPayloadSize += size; - var duration = TimeSpan.FromTicks(Stopwatch.GetTimestamp() - lastLog); - if (completed || lastLog == 0 || duration >= migrateProgressFreq) - { - logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)", - completed ? "COMPLETED" : "MIGRATING", - isMainStore, - totalKeyCount.ToString("N0"), - ((long)((double)totalPayloadSize / 1024)).ToString("N0")); - lastLog = Stopwatch.GetTimestamp(); - } - } } } \ No newline at end of file diff --git a/libs/cluster/Server/Migration/MigrateSessionKeys.cs b/libs/cluster/Server/Migration/MigrateSessionKeys.cs index 61942ec69c..abc5028eba 100644 --- a/libs/cluster/Server/Migration/MigrateSessionKeys.cs +++ b/libs/cluster/Server/Migration/MigrateSessionKeys.cs @@ -2,7 +2,6 @@ // Licensed under the MIT license. using System; -using System.Runtime.CompilerServices; using Garnet.server; using Microsoft.Extensions.Logging; using Tsavorite.core; @@ -74,7 +73,7 @@ private bool MigrateKeysFromMainStore() } // Flush data in client buffer - if (!HandleMigrateTaskResponse(_gcs.SendAndResetMigrate())) + if (!HandleMigrateTaskResponse(_gcs.SendAndResetIterationBuffer())) return false; DeleteKeys(); @@ -130,7 +129,7 @@ private bool MigrateKeysFromObjectStore() } // Flush data in client buffer - if (!HandleMigrateTaskResponse(_gcs.SendAndResetMigrate())) + if (!HandleMigrateTaskResponse(_gcs.SendAndResetIterationBuffer())) return false; } finally @@ -183,14 +182,14 @@ public bool MigrateKeys() return false; // Migrate main store keys - _gcs.InitMigrateBuffer(clusterProvider.storeWrapper.loggingFrequncy); + _gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequncy); if (!MigrateKeysFromMainStore()) return false; // Migrate object store keys if (!clusterProvider.serverOptions.DisableObjects) { - _gcs.InitMigrateBuffer(clusterProvider.storeWrapper.loggingFrequncy); + _gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequncy); if (!MigrateKeysFromObjectStore()) return false; } diff --git a/libs/cluster/Server/Migration/MigrateSessionSend.cs b/libs/cluster/Server/Migration/MigrateSessionSend.cs index 6beaeb2b63..9b9a1ad802 100644 --- a/libs/cluster/Server/Migration/MigrateSessionSend.cs +++ b/libs/cluster/Server/Migration/MigrateSessionSend.cs @@ -19,8 +19,8 @@ internal sealed unsafe partial class MigrateSession : IDisposable private bool WriteOrSendMainStoreKeyValuePair(ref SpanByte key, ref SpanByte value) { // Check if we need to initialize cluster migrate command arguments - if (_gcs.InitMigrateCommand) - _gcs.SetClusterMigrate(_sourceNodeId, _replaceOption, isMainStore: true); + if (_gcs.NeedsInitialization) + _gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: true); // Try write serialized key value to client buffer while (!_gcs.TryWriteKeyValueSpanByte(ref key, ref value, out var task)) @@ -30,7 +30,7 @@ private bool WriteOrSendMainStoreKeyValuePair(ref SpanByte key, ref SpanByte val return false; // re-initialize cluster migrate command parameters - _gcs.SetClusterMigrate(_sourceNodeId, _replaceOption, isMainStore: true); + _gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: true); } return true; } @@ -45,15 +45,15 @@ private bool WriteOrSendMainStoreKeyValuePair(ref SpanByte key, ref SpanByte val private bool WriteOrSendObjectStoreKeyValuePair(byte[] key, byte[] value, long expiration) { // Check if we need to initialize cluster migrate command arguments - if (_gcs.InitMigrateCommand) - _gcs.SetClusterMigrate(_sourceNodeId, _replaceOption, isMainStore: false); + if (_gcs.NeedsInitialization) + _gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: false); while (!_gcs.TryWriteKeyValueByteArray(key, value, expiration, out var task)) { // Flush key value pairs in the buffer if (!HandleMigrateTaskResponse(task)) return false; - _gcs.SetClusterMigrate(_sourceNodeId, _replaceOption, isMainStore: false); + _gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: false); } return true; } diff --git a/libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs b/libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs index 5464171dad..7065ece0e4 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using Garnet.client; +using Garnet.common; using Microsoft.Extensions.Logging; using Tsavorite.core; @@ -23,6 +24,21 @@ internal sealed class AofSyncTaskInfo : IBulkLogEntryConsumer, IDisposable readonly long startAddress; public long previousAddress; + /// + /// Used to mark if syncing is in progress + /// + SingleWriterMultiReaderLock aofSyncInProgress; + + /// + /// Check if client connection is healthy + /// + public bool IsConnected => garnetClient != null && garnetClient.IsConnected; + + /// + /// Return start address for this AOF iterator + /// + public long StartAddress => startAddress; + public AofSyncTaskInfo( ClusterProvider clusterProvider, AofTaskStore aofTaskStore, @@ -40,7 +56,7 @@ public AofSyncTaskInfo( this.garnetClient = garnetClient; this.startAddress = startAddress; previousAddress = startAddress; - this.cts = new CancellationTokenSource(); + cts = new CancellationTokenSource(); } public void Dispose() @@ -53,6 +69,11 @@ public void Dispose() // Finally, dispose the cts cts?.Dispose(); + + // Dispose only if AOF sync has not started + // otherwise sync task will dispose the client + if (aofSyncInProgress.TryWriteLock()) + garnetClient?.Dispose(); } public unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddress, long nextAddress, bool isProtected) @@ -87,9 +108,17 @@ public async Task ReplicaSyncTask() { logger?.LogInformation("Starting ReplicationManager.ReplicaSyncTask for remote node {remoteNodeId} starting from address {address}", remoteNodeId, startAddress); + var failedToStart = false; try { - garnetClient.Connect(); + if (!aofSyncInProgress.TryWriteLock()) + { + logger?.LogWarning("{method} AOF sync for {remoteNodeId} failed to start", nameof(ReplicaSyncTask), remoteNodeId); + failedToStart = true; + return; + } + + if (!IsConnected) garnetClient.Connect(); iter = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle(startAddress, long.MaxValue, scanUncommitted: true, recover: false, logger: logger); @@ -105,7 +134,7 @@ public async Task ReplicaSyncTask() } finally { - garnetClient.Dispose(); + if (!failedToStart) garnetClient.Dispose(); var (address, port) = clusterProvider.clusterManager.CurrentConfig.GetWorkerAddressFromNodeId(remoteNodeId); logger?.LogWarning("AofSync task terminated; client disposed {remoteNodeId} {address} {port} {currentAddress}", remoteNodeId, address, port, previousAddress); diff --git a/libs/cluster/Session/RespClusterMigrateCommands.cs b/libs/cluster/Session/RespClusterMigrateCommands.cs index b50a23cb29..26f6664b4c 100644 --- a/libs/cluster/Session/RespClusterMigrateCommands.cs +++ b/libs/cluster/Session/RespClusterMigrateCommands.cs @@ -103,7 +103,7 @@ private bool NetworkClusterMigrate(out bool invalidParameters) var keyCount = *(int*)payloadPtr; payloadPtr += 4; var i = 0; - TrackImportProgress(keyCount, isMainStore: true, keyCount == 0); + TrackImportProgress(keyCount, isMainStore: false, keyCount == 0); while (i < keyCount) { if (!RespReadUtils.TryReadSerializedData(out var key, out var data, out var expiration, ref payloadPtr, payloadEndPtr)) diff --git a/libs/common/SingleWriterMultiReaderLock.cs b/libs/common/SingleWriterMultiReaderLock.cs index 6e6b12ec2d..6c448399f5 100644 --- a/libs/common/SingleWriterMultiReaderLock.cs +++ b/libs/common/SingleWriterMultiReaderLock.cs @@ -87,6 +87,25 @@ public void ReadUnlock() Interlocked.Decrement(ref _lock); } + /// + /// Continuously attempt to acquire write lock until lock is acquired or it is write locked + /// + /// Return true if current thread is the one that acquired write lock + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool OneWriteLock() + { + while (true) + { + var isWriteLocked = IsWriteLocked; + var acquiredWriteLock = TryWriteLock(); + if (isWriteLocked || acquiredWriteLock) + { + return acquiredWriteLock; + } + Thread.Yield(); + } + } + /// public override string ToString() => _lock.ToString(); diff --git a/playground/TstRunner/Program.cs b/playground/TstRunner/Program.cs index ad8d1b36a2..515fb2288a 100644 --- a/playground/TstRunner/Program.cs +++ b/playground/TstRunner/Program.cs @@ -21,7 +21,7 @@ static void Main() while (true) { var clusterMigrateTests = new ClusterMigrateTests(false); - var clusterReplicationTests = new ClusterReplicationTests(false); + var clusterReplicationTests = new ClusterReplicationBaseTests(); Console.WriteLine($">>>>>>>>>> run: {i} StartedOn: {DateTime.Now}"); swatch.Start(); diff --git a/test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs b/test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs deleted file mode 100644 index df98c227eb..0000000000 --- a/test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -using NUnit.Framework; - -namespace Garnet.test.cluster -{ - [TestFixture(false, true), NonParallelizable] - public unsafe class ClusterReplicationAsyncReplayTests(bool UseTLS = false, bool asyncReplay = false) : ClusterReplicationTests(UseTLS, asyncReplay) { } -} \ No newline at end of file diff --git a/test/Garnet.test.cluster/ClusterReplicationTLSTests.cs b/test/Garnet.test.cluster/ClusterReplicationTLSTests.cs deleted file mode 100644 index 766819b0ec..0000000000 --- a/test/Garnet.test.cluster/ClusterReplicationTLSTests.cs +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -using NUnit.Framework; - -namespace Garnet.test.cluster -{ - [TestFixture, NonParallelizable] - public unsafe class ClusterTLSRT - { - ClusterReplicationTests tests; - - [SetUp] - public void Setup() - { - tests = new ClusterReplicationTests(UseTLS: true); - tests.Setup(); - } - - [TearDown] - public void TearDown() - { - tests.TearDown(); - tests = null; - } - - [Test, Order(1)] - [Category("REPLICATION")] - public void ClusterTLSR([Values] bool disableObjects) - => tests.ClusterSRTest(disableObjects); - - [Test, Order(2)] - [Category("REPLICATION")] - public void ClusterTLSRCheckpointRestartSecondary([Values] bool performRMW, [Values] bool disableObjects) - => tests.ClusterSRNoCheckpointRestartSecondary(performRMW, disableObjects); - - [Test, Order(3)] - [Category("REPLICATION")] - public void ClusterTLSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool disableObjects) - => tests.ClusterSRPrimaryCheckpoint(performRMW, disableObjects); - - [Test, Order(4)] - [Category("REPLICATION")] - public void ClusterTLSRPrimaryCheckpointRetrieve([Values] bool performRMW, [Values] bool disableObjects, [Values] bool lowMemory, [Values] bool manySegments) - => tests.ClusterSRPrimaryCheckpointRetrieve(performRMW, disableObjects, lowMemory, manySegments); - - [Test, Order(5)] - [Category("REPLICATION")] - public void ClusterTLSCheckpointRetrieveDisableStorageTier([Values] bool performRMW, [Values] bool disableObjects) - => tests.ClusterCheckpointRetrieveDisableStorageTier(performRMW, disableObjects); - - [Test, Order(6)] - [Category("REPLICATION")] - public void ClusterTLSRAddReplicaAfterPrimaryCheckpoint([Values] bool performRMW, [Values] bool disableObjects, [Values] bool lowMemory) - => tests.ClusterSRAddReplicaAfterPrimaryCheckpoint(performRMW, disableObjects, lowMemory); - - [Test, Order(7)] - [Category("REPLICATION")] - public void ClusterTLSRPrimaryRestart([Values] bool performRMW, [Values] bool disableObjects) - => tests.ClusterSRPrimaryRestart(performRMW, disableObjects); - - [Test, Order(8)] - [Category("REPLICATION")] - public void ClusterTLSRRedirectWrites() - => tests.ClusterSRRedirectWrites(); - - [Test, Order(9)] - [Category("REPLICATION")] - public void ClusterTLSRReplicaOfTest([Values] bool performRMW) - => tests.ClusterSRReplicaOfTest(performRMW); - } -} \ No newline at end of file diff --git a/test/Garnet.test.cluster/ClusterTestUtils.cs b/test/Garnet.test.cluster/ClusterTestUtils.cs index fdfdd27f01..367cc52f67 100644 --- a/test/Garnet.test.cluster/ClusterTestUtils.cs +++ b/test/Garnet.test.cluster/ClusterTestUtils.cs @@ -1864,7 +1864,7 @@ public string ClusterReplicate(IPEndPoint endPoint, string primaryNodeId, bool a try { var server = redis.GetServer(endPoint); - var args = async ? new List() { "replicate", primaryNodeId, "async" } : new List() { "replicate", primaryNodeId }; + List args = async ? ["replicate", primaryNodeId, "async"] : ["replicate", primaryNodeId]; var result = (string)server.Execute("cluster", args); ClassicAssert.AreEqual("OK", result); return result; diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs new file mode 100644 index 0000000000..e910f9f377 --- /dev/null +++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using NUnit.Framework; + +namespace Garnet.test.cluster +{ + [NonParallelizable] + public class ClusterReplicationAsyncReplay : ClusterReplicationBaseTests + { + [SetUp] + public override void Setup() + { + asyncReplay = true; + base.Setup(); + } + + [TearDown] + public override void TearDown() + { + base.TearDown(); + } + } +} \ No newline at end of file diff --git a/test/Garnet.test.cluster/ClusterReplicationTests.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs similarity index 99% rename from test/Garnet.test.cluster/ClusterReplicationTests.cs rename to test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs index cdccac8a3c..14d6f93ed3 100644 --- a/test/Garnet.test.cluster/ClusterReplicationTests.cs +++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. // Licensed under the MIT license. using System; @@ -16,8 +16,8 @@ namespace Garnet.test.cluster { - [TestFixture(false, false), NonParallelizable] - public class ClusterReplicationTests(bool UseTLS = false, bool asyncReplay = false) + [NonParallelizable] + public class ClusterReplicationBaseTests { public (Action, string)[] GetUnitTests() { @@ -81,7 +81,8 @@ public class ClusterReplicationTests(bool UseTLS = false, bool asyncReplay = fal ClusterTestContext context; public void SetLogTextWriter(TextWriter logTextWriter) => context.logTextWriter = logTextWriter; - readonly bool useTLS = UseTLS; + protected bool useTLS = false; + protected bool asyncReplay = false; readonly int timeout = 60; readonly int keyCount = 256; @@ -91,14 +92,14 @@ public class ClusterReplicationTests(bool UseTLS = false, bool asyncReplay = fal }; [SetUp] - public void Setup() + public virtual void Setup() { context = new ClusterTestContext(); context.Setup(monitorTests); } [TearDown] - public void TearDown() + public virtual void TearDown() { context?.TearDown(); } diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationTLS.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationTLS.cs new file mode 100644 index 0000000000..76e0b11e45 --- /dev/null +++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationTLS.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using NUnit.Framework; + +namespace Garnet.test.cluster +{ + [NonParallelizable] + public class ClusterReplicationTLS : ClusterReplicationBaseTests + { + [SetUp] + public override void Setup() + { + useTLS = true; + base.Setup(); + } + + [TearDown] + public override void TearDown() + { + base.TearDown(); + } + } +} \ No newline at end of file