Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with Redis CI #111

Merged
merged 2 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/CacheTower.Extensions.Redis/AssemblyInternals.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("CacheTower.Tests")]
2 changes: 1 addition & 1 deletion src/CacheTower.Extensions.Redis/RedisLockExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class RedisLockExtension : IRefreshWrapperExtension

private ICacheStack RegisteredStack { get; set; }

private ConcurrentDictionary<string, IEnumerable<TaskCompletionSource<bool>>> LockedOnKeyRefresh { get; }
internal ConcurrentDictionary<string, IEnumerable<TaskCompletionSource<bool>>> LockedOnKeyRefresh { get; }

public RedisLockExtension(ConnectionMultiplexer connection, int databaseIndex = -1, string channelPrefix = "CacheTower", TimeSpan? lockTimeout = default)
{
Expand Down
25 changes: 13 additions & 12 deletions src/CacheTower.Extensions.Redis/RedisRemoteEvictionExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,21 @@ public void Register(ICacheStack cacheStack)
}
IsRegistered = true;

Subscriber.Subscribe(RedisChannel, async (channel, value) =>
{
string cacheKey = value;
var shouldEvictLocally = false;
lock (FlaggedRefreshesLockObj)
Subscriber.Subscribe(RedisChannel, CommandFlags.FireAndForget)
.OnMessage(async (channelMessage) =>
{
shouldEvictLocally = FlaggedRefreshes.Remove(cacheKey) == false;
}
string cacheKey = channelMessage.Message;
var shouldEvictLocally = false;
lock (FlaggedRefreshesLockObj)
{
shouldEvictLocally = FlaggedRefreshes.Remove(cacheKey) == false;
}

if (shouldEvictLocally)
{
await cacheStack.EvictAsync(cacheKey);
}
}, CommandFlags.FireAndForget);
if (shouldEvictLocally)
{
await cacheStack.EvictAsync(cacheKey);
}
});
}
}
}
123 changes: 60 additions & 63 deletions tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using CacheTower.Tests.Utils;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using StackExchange.Redis;

namespace CacheTower.Tests.Extensions.Redis
{
Expand Down Expand Up @@ -44,6 +45,8 @@ public void ThrowForRegisteringTwoCacheStacks()
[TestMethod]
public async Task CustomLockTimeout()
{
RedisHelper.FlushDatabase();

var extension = new RedisLockExtension(RedisHelper.GetConnection(), lockTimeout: TimeSpan.FromDays(1));
var refreshWaiterTask = new TaskCompletionSource<bool>();
var lockWaiterTask = new TaskCompletionSource<bool>();
Expand Down Expand Up @@ -77,127 +80,121 @@ public async Task CustomLockTimeout()
[TestMethod]
public async Task RefreshValueNotifiesChannelSubscribers()
{
RedisHelper.FlushDatabase();

var connection = RedisHelper.GetConnection();
var taskCompletionSource = new TaskCompletionSource<bool>();

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisLockExtension(connection);
extension.Register(cacheStackMock.Object);

var completionSource = new TaskCompletionSource<bool>();

await connection.GetSubscriber().SubscribeAsync("CacheTower.CacheLock", (channel, value) =>
{
if (value == "TestKey")
{
taskCompletionSource.SetResult(true);
completionSource.SetResult(true);
}
else
{
completionSource.SetResult(false);
}

Assert.Fail($"Unexpected value '{value}'");
taskCompletionSource.SetResult(false);
});

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisLockExtension(connection);
extension.Register(cacheStackMock.Object);

var cacheEntry = new CacheEntry<int>(13, TimeSpan.FromDays(1));

await extension.RefreshValueAsync("TestKey",
await extension.RefreshValueAsync("TestKey",
() => new ValueTask<CacheEntry<int>>(cacheEntry), new CacheSettings(TimeSpan.FromDays(1)));

var waitTask = taskCompletionSource.Task;
await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(10)));

if (!waitTask.IsCompleted)
{
Assert.Fail("Subscriber response took too long");
}

Assert.IsTrue(waitTask.Result, "Subscribers were not notified about the refreshed value");
var succeedingTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(completionSource.Task, succeedingTask, "Subscriber response took too long");
Assert.IsTrue(completionSource.Task.Result, "Subscribers were not notified about the refreshed value");
}


[TestMethod]
public async Task WaitingTaskInSameInstanceUnlocksAndCompletes()
public async Task ObservedLockSingle()
{
RedisHelper.FlushDatabase();

var connection = RedisHelper.GetConnection();

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisLockExtension(connection);
extension.Register(cacheStackMock.Object);

var cacheEntry = new CacheEntry<int>(13, TimeSpan.FromDays(1));
var secondaryTaskKickoff = new TaskCompletionSource<bool>();

var primaryTask = extension.RefreshValueAsync("TestKey",
async () =>
{
secondaryTaskKickoff.SetResult(true);
await Task.Delay(3000);
return cacheEntry;
},
new CacheSettings(TimeSpan.FromDays(1))
).AsTask();
//Establish lock
await connection.GetDatabase().StringSetAsync("TestKey", RedisValue.EmptyString);

await secondaryTaskKickoff.Task;

var secondaryTask = extension.RefreshValueAsync("TestKey",
var refreshTask = extension.RefreshValueAsync("TestKey",
() =>
{
return new ValueTask<CacheEntry<int>>(cacheEntry);
},
new CacheSettings(TimeSpan.FromDays(1))
).AsTask();

var succeedingTask = await Task.WhenAny(primaryTask, secondaryTask);
Assert.AreEqual(await primaryTask, await succeedingTask, "Processing task call didn't complete first - something has gone very wrong.");
//Delay to allow for Redis check and self-entry into lock
await Task.Delay(TimeSpan.FromSeconds(1));

Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established");

//Let the secondary task finish before we verify ICacheStack method calls
await secondaryTask;
//Trigger the end of the lock
await connection.GetSubscriber().PublishAsync("CacheTower.CacheLock", "TestKey");

cacheStackMock.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(2), "Missed checks whether waiting was required or retrieving the updated value");
var succeedingTask = await Task.WhenAny(refreshTask, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(refreshTask, succeedingTask, "Refresh has timed out - something has gone very wrong");
cacheStackMock.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(2), "Two checks to the cache stack are expected");
}


[TestMethod]
public async Task WaitingTaskInDifferentInstanceUnlocksAndCompletes()
public async Task ObservedLockMultiple()
{
var connection = RedisHelper.GetConnection();
RedisHelper.FlushDatabase();

var cacheStackMockOne = new Mock<ICacheStack>();
var extensionOne = new RedisLockExtension(connection);
extensionOne.Register(cacheStackMockOne.Object);
var connection = RedisHelper.GetConnection();

var cacheStackMockTwo = new Mock<ICacheStack>();
var extensionTwo = new RedisLockExtension(connection);
extensionTwo.Register(cacheStackMockTwo.Object);
var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisLockExtension(connection);
extension.Register(cacheStackMock.Object);

var cacheEntry = new CacheEntry<int>(13, TimeSpan.FromDays(1));
var secondaryTaskKickoff = new TaskCompletionSource<bool>();

var primaryTask = extensionOne.RefreshValueAsync("TestKey",
async () =>
//Establish lock
await connection.GetDatabase().StringSetAsync("TestKey", RedisValue.EmptyString);

var refreshTask1 = extension.RefreshValueAsync("TestKey",
() =>
{
secondaryTaskKickoff.SetResult(true);
await Task.Delay(3000);
return cacheEntry;
return new ValueTask<CacheEntry<int>>(cacheEntry);
},
new CacheSettings(TimeSpan.FromDays(1))
).AsTask();

await secondaryTaskKickoff.Task;

var secondaryTask = extensionTwo.RefreshValueAsync("TestKey",
var refreshTask2 = extension.RefreshValueAsync("TestKey",
() =>
{
return new ValueTask<CacheEntry<int>>(cacheEntry);
},
new CacheSettings(TimeSpan.FromDays(1))
).AsTask();

var succeedingTask = await Task.WhenAny(primaryTask, secondaryTask);
Assert.AreEqual(await primaryTask, await succeedingTask, "Processing task call didn't complete first - something has gone very wrong.");
//Delay to allow for Redis check and self-entry into lock
await Task.Delay(TimeSpan.FromSeconds(2));

Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established");

//Let the secondary task finish before we verify ICacheStack method calls
await secondaryTask;
//Trigger the end of the lock
await connection.GetSubscriber().PublishAsync("CacheTower.CacheLock", "TestKey");

cacheStackMockOne.Verify(c => c.GetAsync<int>("TestKey"), Times.Never, "Processing task shouldn't be querying existing values");
cacheStackMockTwo.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(2), "Missed GetAsync for retrieving the updated value - this means the registered stack returned the updated value early");
var whenAllRefreshesTask = Task.WhenAll(refreshTask1, refreshTask2);
var succeedingTask = await Task.WhenAny(whenAllRefreshesTask, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(whenAllRefreshesTask, succeedingTask, "Refresh has timed out - something has gone very wrong");
cacheStackMock.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(4), "Two checks to the cache stack are expected");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ public void ThrowForRegisteringMoreThanOneCacheStack()
[TestMethod]
public async Task EvictsFromChannelButNotFromRegisteredCacheStack()
{
RedisHelper.FlushDatabase();

var connection = RedisHelper.GetConnection();

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisRemoteEvictionExtension(connection);
extension.Register(cacheStackMock.Object);

var completionSource = new TaskCompletionSource<bool>();

await connection.GetSubscriber().SubscribeAsync("CacheTower.RemoteEviction", (channel, value) =>
Expand All @@ -46,18 +53,18 @@ await connection.GetSubscriber().SubscribeAsync("CacheTower.RemoteEviction", (ch
{
completionSource.SetResult(true);
}
else
{
completionSource.SetResult(false);
}
});

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisRemoteEvictionExtension(connection);
extension.Register(cacheStackMock.Object);

await extension.OnValueRefreshAsync("TestKey", TimeSpan.FromDays(1));

var completedTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(30)));

Assert.AreEqual(completionSource.Task, completedTask, "Subscribers were not notified about the refreshed value within the time limit");
var succeedingTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(completionSource.Task, succeedingTask, "Subscriber response took too long");
Assert.IsTrue(completionSource.Task.Result, "Subscribers were not notified about the refreshed value");
cacheStackMock.Verify(c => c.EvictAsync("TestKey"), Times.Never, "The CacheStack that published the refresh was told to evict its own cache");
}
}
}
}