Skip to content

Commit

Permalink
Merge pull request #111 from TurnerSoftware/fix-redis-ci-issues
Browse files Browse the repository at this point in the history
Fix issues with Redis CI
  • Loading branch information
Turnerj authored Nov 13, 2020
2 parents 60d473d + 51d5424 commit f475292
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 84 deletions.
3 changes: 3 additions & 0 deletions src/CacheTower.Extensions.Redis/AssemblyInternals.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("CacheTower.Tests")]
2 changes: 1 addition & 1 deletion src/CacheTower.Extensions.Redis/RedisLockExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class RedisLockExtension : IRefreshWrapperExtension

private ICacheStack RegisteredStack { get; set; }

private ConcurrentDictionary<string, IEnumerable<TaskCompletionSource<bool>>> LockedOnKeyRefresh { get; }
internal ConcurrentDictionary<string, IEnumerable<TaskCompletionSource<bool>>> LockedOnKeyRefresh { get; }

public RedisLockExtension(ConnectionMultiplexer connection, int databaseIndex = -1, string channelPrefix = "CacheTower", TimeSpan? lockTimeout = default)
{
Expand Down
25 changes: 13 additions & 12 deletions src/CacheTower.Extensions.Redis/RedisRemoteEvictionExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,21 @@ public void Register(ICacheStack cacheStack)
}
IsRegistered = true;

Subscriber.Subscribe(RedisChannel, async (channel, value) =>
{
string cacheKey = value;
var shouldEvictLocally = false;
lock (FlaggedRefreshesLockObj)
Subscriber.Subscribe(RedisChannel, CommandFlags.FireAndForget)
.OnMessage(async (channelMessage) =>
{
shouldEvictLocally = FlaggedRefreshes.Remove(cacheKey) == false;
}
string cacheKey = channelMessage.Message;
var shouldEvictLocally = false;
lock (FlaggedRefreshesLockObj)
{
shouldEvictLocally = FlaggedRefreshes.Remove(cacheKey) == false;
}

if (shouldEvictLocally)
{
await cacheStack.EvictAsync(cacheKey);
}
}, CommandFlags.FireAndForget);
if (shouldEvictLocally)
{
await cacheStack.EvictAsync(cacheKey);
}
});
}
}
}
123 changes: 60 additions & 63 deletions tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using CacheTower.Tests.Utils;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using StackExchange.Redis;

namespace CacheTower.Tests.Extensions.Redis
{
Expand Down Expand Up @@ -44,6 +45,8 @@ public void ThrowForRegisteringTwoCacheStacks()
[TestMethod]
public async Task CustomLockTimeout()
{
RedisHelper.FlushDatabase();

var extension = new RedisLockExtension(RedisHelper.GetConnection(), lockTimeout: TimeSpan.FromDays(1));
var refreshWaiterTask = new TaskCompletionSource<bool>();
var lockWaiterTask = new TaskCompletionSource<bool>();
Expand Down Expand Up @@ -77,127 +80,121 @@ public async Task CustomLockTimeout()
[TestMethod]
public async Task RefreshValueNotifiesChannelSubscribers()
{
RedisHelper.FlushDatabase();

var connection = RedisHelper.GetConnection();
var taskCompletionSource = new TaskCompletionSource<bool>();

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisLockExtension(connection);
extension.Register(cacheStackMock.Object);

var completionSource = new TaskCompletionSource<bool>();

await connection.GetSubscriber().SubscribeAsync("CacheTower.CacheLock", (channel, value) =>
{
if (value == "TestKey")
{
taskCompletionSource.SetResult(true);
completionSource.SetResult(true);
}
else
{
completionSource.SetResult(false);
}

Assert.Fail($"Unexpected value '{value}'");
taskCompletionSource.SetResult(false);
});

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisLockExtension(connection);
extension.Register(cacheStackMock.Object);

var cacheEntry = new CacheEntry<int>(13, TimeSpan.FromDays(1));

await extension.RefreshValueAsync("TestKey",
await extension.RefreshValueAsync("TestKey",
() => new ValueTask<CacheEntry<int>>(cacheEntry), new CacheSettings(TimeSpan.FromDays(1)));

var waitTask = taskCompletionSource.Task;
await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(10)));

if (!waitTask.IsCompleted)
{
Assert.Fail("Subscriber response took too long");
}

Assert.IsTrue(waitTask.Result, "Subscribers were not notified about the refreshed value");
var succeedingTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(completionSource.Task, succeedingTask, "Subscriber response took too long");
Assert.IsTrue(completionSource.Task.Result, "Subscribers were not notified about the refreshed value");
}


[TestMethod]
public async Task WaitingTaskInSameInstanceUnlocksAndCompletes()
public async Task ObservedLockSingle()
{
RedisHelper.FlushDatabase();

var connection = RedisHelper.GetConnection();

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisLockExtension(connection);
extension.Register(cacheStackMock.Object);

var cacheEntry = new CacheEntry<int>(13, TimeSpan.FromDays(1));
var secondaryTaskKickoff = new TaskCompletionSource<bool>();

var primaryTask = extension.RefreshValueAsync("TestKey",
async () =>
{
secondaryTaskKickoff.SetResult(true);
await Task.Delay(3000);
return cacheEntry;
},
new CacheSettings(TimeSpan.FromDays(1))
).AsTask();
//Establish lock
await connection.GetDatabase().StringSetAsync("TestKey", RedisValue.EmptyString);

await secondaryTaskKickoff.Task;

var secondaryTask = extension.RefreshValueAsync("TestKey",
var refreshTask = extension.RefreshValueAsync("TestKey",
() =>
{
return new ValueTask<CacheEntry<int>>(cacheEntry);
},
new CacheSettings(TimeSpan.FromDays(1))
).AsTask();

var succeedingTask = await Task.WhenAny(primaryTask, secondaryTask);
Assert.AreEqual(await primaryTask, await succeedingTask, "Processing task call didn't complete first - something has gone very wrong.");
//Delay to allow for Redis check and self-entry into lock
await Task.Delay(TimeSpan.FromSeconds(1));

Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established");

//Let the secondary task finish before we verify ICacheStack method calls
await secondaryTask;
//Trigger the end of the lock
await connection.GetSubscriber().PublishAsync("CacheTower.CacheLock", "TestKey");

cacheStackMock.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(2), "Missed checks whether waiting was required or retrieving the updated value");
var succeedingTask = await Task.WhenAny(refreshTask, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(refreshTask, succeedingTask, "Refresh has timed out - something has gone very wrong");
cacheStackMock.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(2), "Two checks to the cache stack are expected");
}


[TestMethod]
public async Task WaitingTaskInDifferentInstanceUnlocksAndCompletes()
public async Task ObservedLockMultiple()
{
var connection = RedisHelper.GetConnection();
RedisHelper.FlushDatabase();

var cacheStackMockOne = new Mock<ICacheStack>();
var extensionOne = new RedisLockExtension(connection);
extensionOne.Register(cacheStackMockOne.Object);
var connection = RedisHelper.GetConnection();

var cacheStackMockTwo = new Mock<ICacheStack>();
var extensionTwo = new RedisLockExtension(connection);
extensionTwo.Register(cacheStackMockTwo.Object);
var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisLockExtension(connection);
extension.Register(cacheStackMock.Object);

var cacheEntry = new CacheEntry<int>(13, TimeSpan.FromDays(1));
var secondaryTaskKickoff = new TaskCompletionSource<bool>();

var primaryTask = extensionOne.RefreshValueAsync("TestKey",
async () =>
//Establish lock
await connection.GetDatabase().StringSetAsync("TestKey", RedisValue.EmptyString);

var refreshTask1 = extension.RefreshValueAsync("TestKey",
() =>
{
secondaryTaskKickoff.SetResult(true);
await Task.Delay(3000);
return cacheEntry;
return new ValueTask<CacheEntry<int>>(cacheEntry);
},
new CacheSettings(TimeSpan.FromDays(1))
).AsTask();

await secondaryTaskKickoff.Task;

var secondaryTask = extensionTwo.RefreshValueAsync("TestKey",
var refreshTask2 = extension.RefreshValueAsync("TestKey",
() =>
{
return new ValueTask<CacheEntry<int>>(cacheEntry);
},
new CacheSettings(TimeSpan.FromDays(1))
).AsTask();

var succeedingTask = await Task.WhenAny(primaryTask, secondaryTask);
Assert.AreEqual(await primaryTask, await succeedingTask, "Processing task call didn't complete first - something has gone very wrong.");
//Delay to allow for Redis check and self-entry into lock
await Task.Delay(TimeSpan.FromSeconds(2));

Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established");

//Let the secondary task finish before we verify ICacheStack method calls
await secondaryTask;
//Trigger the end of the lock
await connection.GetSubscriber().PublishAsync("CacheTower.CacheLock", "TestKey");

cacheStackMockOne.Verify(c => c.GetAsync<int>("TestKey"), Times.Never, "Processing task shouldn't be querying existing values");
cacheStackMockTwo.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(2), "Missed GetAsync for retrieving the updated value - this means the registered stack returned the updated value early");
var whenAllRefreshesTask = Task.WhenAll(refreshTask1, refreshTask2);
var succeedingTask = await Task.WhenAny(whenAllRefreshesTask, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(whenAllRefreshesTask, succeedingTask, "Refresh has timed out - something has gone very wrong");
cacheStackMock.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(4), "Two checks to the cache stack are expected");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ public void ThrowForRegisteringMoreThanOneCacheStack()
[TestMethod]
public async Task EvictsFromChannelButNotFromRegisteredCacheStack()
{
RedisHelper.FlushDatabase();

var connection = RedisHelper.GetConnection();

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisRemoteEvictionExtension(connection);
extension.Register(cacheStackMock.Object);

var completionSource = new TaskCompletionSource<bool>();

await connection.GetSubscriber().SubscribeAsync("CacheTower.RemoteEviction", (channel, value) =>
Expand All @@ -46,18 +53,18 @@ await connection.GetSubscriber().SubscribeAsync("CacheTower.RemoteEviction", (ch
{
completionSource.SetResult(true);
}
else
{
completionSource.SetResult(false);
}
});

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisRemoteEvictionExtension(connection);
extension.Register(cacheStackMock.Object);

await extension.OnValueRefreshAsync("TestKey", TimeSpan.FromDays(1));

var completedTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(30)));

Assert.AreEqual(completionSource.Task, completedTask, "Subscribers were not notified about the refreshed value within the time limit");
var succeedingTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(completionSource.Task, succeedingTask, "Subscriber response took too long");
Assert.IsTrue(completionSource.Task.Result, "Subscribers were not notified about the refreshed value");
cacheStackMock.Verify(c => c.EvictAsync("TestKey"), Times.Never, "The CacheStack that published the refresh was told to evict its own cache");
}
}
}
}

0 comments on commit f475292

Please sign in to comment.