diff --git a/src/CacheTower.Extensions.Redis/AssemblyInternals.cs b/src/CacheTower.Extensions.Redis/AssemblyInternals.cs new file mode 100644 index 00000000..292deb68 --- /dev/null +++ b/src/CacheTower.Extensions.Redis/AssemblyInternals.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("CacheTower.Tests")] \ No newline at end of file diff --git a/src/CacheTower.Extensions.Redis/RedisLockExtension.cs b/src/CacheTower.Extensions.Redis/RedisLockExtension.cs index 75405713..58c147d0 100644 --- a/src/CacheTower.Extensions.Redis/RedisLockExtension.cs +++ b/src/CacheTower.Extensions.Redis/RedisLockExtension.cs @@ -21,7 +21,7 @@ public class RedisLockExtension : IRefreshWrapperExtension private ICacheStack RegisteredStack { get; set; } - private ConcurrentDictionary>> LockedOnKeyRefresh { get; } + internal ConcurrentDictionary>> LockedOnKeyRefresh { get; } public RedisLockExtension(ConnectionMultiplexer connection, int databaseIndex = -1, string channelPrefix = "CacheTower", TimeSpan? lockTimeout = default) { diff --git a/src/CacheTower.Extensions.Redis/RedisRemoteEvictionExtension.cs b/src/CacheTower.Extensions.Redis/RedisRemoteEvictionExtension.cs index cd03e211..5f7991c0 100644 --- a/src/CacheTower.Extensions.Redis/RedisRemoteEvictionExtension.cs +++ b/src/CacheTower.Extensions.Redis/RedisRemoteEvictionExtension.cs @@ -52,20 +52,21 @@ public void Register(ICacheStack cacheStack) } IsRegistered = true; - Subscriber.Subscribe(RedisChannel, async (channel, value) => - { - string cacheKey = value; - var shouldEvictLocally = false; - lock (FlaggedRefreshesLockObj) + Subscriber.Subscribe(RedisChannel, CommandFlags.FireAndForget) + .OnMessage(async (channelMessage) => { - shouldEvictLocally = FlaggedRefreshes.Remove(cacheKey) == false; - } + string cacheKey = channelMessage.Message; + var shouldEvictLocally = false; + lock (FlaggedRefreshesLockObj) + { + shouldEvictLocally = FlaggedRefreshes.Remove(cacheKey) == false; + } - if (shouldEvictLocally) - { - await cacheStack.EvictAsync(cacheKey); - } - }, CommandFlags.FireAndForget); + if (shouldEvictLocally) + { + await cacheStack.EvictAsync(cacheKey); + } + }); } } } diff --git a/tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs b/tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs index 46400c54..153fcb44 100644 --- a/tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs +++ b/tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs @@ -8,6 +8,7 @@ using CacheTower.Tests.Utils; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; +using StackExchange.Redis; namespace CacheTower.Tests.Extensions.Redis { @@ -44,6 +45,8 @@ public void ThrowForRegisteringTwoCacheStacks() [TestMethod] public async Task CustomLockTimeout() { + RedisHelper.FlushDatabase(); + var extension = new RedisLockExtension(RedisHelper.GetConnection(), lockTimeout: TimeSpan.FromDays(1)); var refreshWaiterTask = new TaskCompletionSource(); var lockWaiterTask = new TaskCompletionSource(); @@ -77,66 +80,56 @@ public async Task CustomLockTimeout() [TestMethod] public async Task RefreshValueNotifiesChannelSubscribers() { + RedisHelper.FlushDatabase(); + var connection = RedisHelper.GetConnection(); - var taskCompletionSource = new TaskCompletionSource(); + + var cacheStackMock = new Mock(); + var extension = new RedisLockExtension(connection); + extension.Register(cacheStackMock.Object); + + var completionSource = new TaskCompletionSource(); await connection.GetSubscriber().SubscribeAsync("CacheTower.CacheLock", (channel, value) => { if (value == "TestKey") { - taskCompletionSource.SetResult(true); + completionSource.SetResult(true); + } + else + { + completionSource.SetResult(false); } - - Assert.Fail($"Unexpected value '{value}'"); - taskCompletionSource.SetResult(false); }); - var cacheStackMock = new Mock(); - var extension = new RedisLockExtension(connection); - extension.Register(cacheStackMock.Object); - var cacheEntry = new CacheEntry(13, TimeSpan.FromDays(1)); - await extension.RefreshValueAsync("TestKey", + await extension.RefreshValueAsync("TestKey", () => new ValueTask>(cacheEntry), new CacheSettings(TimeSpan.FromDays(1))); - var waitTask = taskCompletionSource.Task; - await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(10))); - - if (!waitTask.IsCompleted) - { - Assert.Fail("Subscriber response took too long"); - } - - Assert.IsTrue(waitTask.Result, "Subscribers were not notified about the refreshed value"); + var succeedingTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(10))); + Assert.AreEqual(completionSource.Task, succeedingTask, "Subscriber response took too long"); + Assert.IsTrue(completionSource.Task.Result, "Subscribers were not notified about the refreshed value"); } [TestMethod] - public async Task WaitingTaskInSameInstanceUnlocksAndCompletes() + public async Task ObservedLockSingle() { + RedisHelper.FlushDatabase(); + var connection = RedisHelper.GetConnection(); - + var cacheStackMock = new Mock(); var extension = new RedisLockExtension(connection); extension.Register(cacheStackMock.Object); var cacheEntry = new CacheEntry(13, TimeSpan.FromDays(1)); - var secondaryTaskKickoff = new TaskCompletionSource(); - var primaryTask = extension.RefreshValueAsync("TestKey", - async () => - { - secondaryTaskKickoff.SetResult(true); - await Task.Delay(3000); - return cacheEntry; - }, - new CacheSettings(TimeSpan.FromDays(1)) - ).AsTask(); + //Establish lock + await connection.GetDatabase().StringSetAsync("TestKey", RedisValue.EmptyString); - await secondaryTaskKickoff.Task; - - var secondaryTask = extension.RefreshValueAsync("TestKey", + var refreshTask = extension.RefreshValueAsync("TestKey", () => { return new ValueTask>(cacheEntry); @@ -144,45 +137,45 @@ public async Task WaitingTaskInSameInstanceUnlocksAndCompletes() new CacheSettings(TimeSpan.FromDays(1)) ).AsTask(); - var succeedingTask = await Task.WhenAny(primaryTask, secondaryTask); - Assert.AreEqual(await primaryTask, await succeedingTask, "Processing task call didn't complete first - something has gone very wrong."); + //Delay to allow for Redis check and self-entry into lock + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established"); - //Let the secondary task finish before we verify ICacheStack method calls - await secondaryTask; + //Trigger the end of the lock + await connection.GetSubscriber().PublishAsync("CacheTower.CacheLock", "TestKey"); - cacheStackMock.Verify(c => c.GetAsync("TestKey"), Times.Exactly(2), "Missed checks whether waiting was required or retrieving the updated value"); + var succeedingTask = await Task.WhenAny(refreshTask, Task.Delay(TimeSpan.FromSeconds(10))); + Assert.AreEqual(refreshTask, succeedingTask, "Refresh has timed out - something has gone very wrong"); + cacheStackMock.Verify(c => c.GetAsync("TestKey"), Times.Exactly(2), "Two checks to the cache stack are expected"); } [TestMethod] - public async Task WaitingTaskInDifferentInstanceUnlocksAndCompletes() + public async Task ObservedLockMultiple() { - var connection = RedisHelper.GetConnection(); + RedisHelper.FlushDatabase(); - var cacheStackMockOne = new Mock(); - var extensionOne = new RedisLockExtension(connection); - extensionOne.Register(cacheStackMockOne.Object); + var connection = RedisHelper.GetConnection(); - var cacheStackMockTwo = new Mock(); - var extensionTwo = new RedisLockExtension(connection); - extensionTwo.Register(cacheStackMockTwo.Object); + var cacheStackMock = new Mock(); + var extension = new RedisLockExtension(connection); + extension.Register(cacheStackMock.Object); var cacheEntry = new CacheEntry(13, TimeSpan.FromDays(1)); - var secondaryTaskKickoff = new TaskCompletionSource(); - var primaryTask = extensionOne.RefreshValueAsync("TestKey", - async () => + //Establish lock + await connection.GetDatabase().StringSetAsync("TestKey", RedisValue.EmptyString); + + var refreshTask1 = extension.RefreshValueAsync("TestKey", + () => { - secondaryTaskKickoff.SetResult(true); - await Task.Delay(3000); - return cacheEntry; + return new ValueTask>(cacheEntry); }, new CacheSettings(TimeSpan.FromDays(1)) ).AsTask(); - await secondaryTaskKickoff.Task; - - var secondaryTask = extensionTwo.RefreshValueAsync("TestKey", + var refreshTask2 = extension.RefreshValueAsync("TestKey", () => { return new ValueTask>(cacheEntry); @@ -190,14 +183,18 @@ public async Task WaitingTaskInDifferentInstanceUnlocksAndCompletes() new CacheSettings(TimeSpan.FromDays(1)) ).AsTask(); - var succeedingTask = await Task.WhenAny(primaryTask, secondaryTask); - Assert.AreEqual(await primaryTask, await succeedingTask, "Processing task call didn't complete first - something has gone very wrong."); + //Delay to allow for Redis check and self-entry into lock + await Task.Delay(TimeSpan.FromSeconds(2)); + + Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established"); - //Let the secondary task finish before we verify ICacheStack method calls - await secondaryTask; + //Trigger the end of the lock + await connection.GetSubscriber().PublishAsync("CacheTower.CacheLock", "TestKey"); - cacheStackMockOne.Verify(c => c.GetAsync("TestKey"), Times.Never, "Processing task shouldn't be querying existing values"); - cacheStackMockTwo.Verify(c => c.GetAsync("TestKey"), Times.Exactly(2), "Missed GetAsync for retrieving the updated value - this means the registered stack returned the updated value early"); + var whenAllRefreshesTask = Task.WhenAll(refreshTask1, refreshTask2); + var succeedingTask = await Task.WhenAny(whenAllRefreshesTask, Task.Delay(TimeSpan.FromSeconds(10))); + Assert.AreEqual(whenAllRefreshesTask, succeedingTask, "Refresh has timed out - something has gone very wrong"); + cacheStackMock.Verify(c => c.GetAsync("TestKey"), Times.Exactly(4), "Two checks to the cache stack are expected"); } } -} +} \ No newline at end of file diff --git a/tests/CacheTower.Tests/Extensions/Redis/RedisRemoteEvictionExtensionTests.cs b/tests/CacheTower.Tests/Extensions/Redis/RedisRemoteEvictionExtensionTests.cs index a02f1e85..37bfc582 100644 --- a/tests/CacheTower.Tests/Extensions/Redis/RedisRemoteEvictionExtensionTests.cs +++ b/tests/CacheTower.Tests/Extensions/Redis/RedisRemoteEvictionExtensionTests.cs @@ -37,7 +37,14 @@ public void ThrowForRegisteringMoreThanOneCacheStack() [TestMethod] public async Task EvictsFromChannelButNotFromRegisteredCacheStack() { + RedisHelper.FlushDatabase(); + var connection = RedisHelper.GetConnection(); + + var cacheStackMock = new Mock(); + var extension = new RedisRemoteEvictionExtension(connection); + extension.Register(cacheStackMock.Object); + var completionSource = new TaskCompletionSource(); await connection.GetSubscriber().SubscribeAsync("CacheTower.RemoteEviction", (channel, value) => @@ -46,18 +53,18 @@ await connection.GetSubscriber().SubscribeAsync("CacheTower.RemoteEviction", (ch { completionSource.SetResult(true); } + else + { + completionSource.SetResult(false); + } }); - var cacheStackMock = new Mock(); - var extension = new RedisRemoteEvictionExtension(connection); - extension.Register(cacheStackMock.Object); - await extension.OnValueRefreshAsync("TestKey", TimeSpan.FromDays(1)); - var completedTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(30))); - - Assert.AreEqual(completionSource.Task, completedTask, "Subscribers were not notified about the refreshed value within the time limit"); + var succeedingTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(10))); + Assert.AreEqual(completionSource.Task, succeedingTask, "Subscriber response took too long"); + Assert.IsTrue(completionSource.Task.Result, "Subscribers were not notified about the refreshed value"); cacheStackMock.Verify(c => c.EvictAsync("TestKey"), Times.Never, "The CacheStack that published the refresh was told to evict its own cache"); } } -} +} \ No newline at end of file