Skip to content

Commit

Permalink
Merge pull request #147 from TurnerSoftware/value-refresh-updates
Browse files Browse the repository at this point in the history
Changing when cache refreshes happen
  • Loading branch information
Turnerj authored Mar 2, 2021
2 parents 0d96f98 + fe16ffe commit bc7d2c7
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 81 deletions.
42 changes: 19 additions & 23 deletions src/CacheTower/CacheStack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,28 +192,15 @@ public async ValueTask<T> GetOrSetAsync<T>(string cacheKey, Func<T, Task<T>> get
throw new ArgumentNullException(nameof(getter));
}

var currentTime = DateTime.UtcNow;
var cacheEntryPoint = await GetWithLayerIndexAsync<T>(cacheKey);
if (cacheEntryPoint != default)
if (cacheEntryPoint != default && cacheEntryPoint.CacheEntry.Expiry > currentTime)
{
var cacheEntry = cacheEntryPoint.CacheEntry;
var currentTime = DateTime.UtcNow;
if (cacheEntry.GetStaleDate(settings) < currentTime)
{
if (cacheEntry.Expiry < currentTime)
{
//Refresh the value in the current thread though short circuit if we're unable to establish a lock
//If the lock isn't established, it will instead use the stale cache entry (even if past the allowed stale period)
var refreshedCacheEntry = await RefreshValueAsync(cacheKey, getter, settings, waitForRefresh: false);
if (refreshedCacheEntry != default)
{
cacheEntry = refreshedCacheEntry;
}
}
else
{
//Refresh the value in the background
_ = RefreshValueAsync(cacheKey, getter, settings, waitForRefresh: false);
}
//Refresh the value in the background
_ = RefreshValueAsync(cacheKey, getter, settings, noExistingValueAvailable: false);
}
else if (cacheEntryPoint.LayerIndex > 0)
{
Expand All @@ -226,7 +213,7 @@ public async ValueTask<T> GetOrSetAsync<T>(string cacheKey, Func<T, Task<T>> get
else
{
//Refresh the value in the current thread though because we have no old cache value, we have to lock and wait
return (await RefreshValueAsync(cacheKey, getter, settings, waitForRefresh: true)).Value;
return (await RefreshValueAsync(cacheKey, getter, settings, noExistingValueAvailable: true)).Value;
}
}

Expand Down Expand Up @@ -268,7 +255,7 @@ private async ValueTask BackPopulateCacheAsync<T>(int fromIndexExclusive, string
}
}

private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func<T, Task<T>> getter, CacheSettings settings, bool waitForRefresh)
private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func<T, Task<T>> getter, CacheSettings settings, bool noExistingValueAvailable)
{
ThrowIfDisposed();

Expand All @@ -290,12 +277,21 @@ private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Fun
{
try
{
return await Extensions.WithRefreshAsync(cacheKey, async () =>
var previousEntry = await GetAsync<T>(cacheKey);
if (previousEntry != default && noExistingValueAvailable && previousEntry.Expiry < DateTime.UtcNow)
{
var previousEntry = await GetAsync<T>(cacheKey);
//The Cache Stack will always return an unexpired value if one exists.
//If we are told to refresh because one doesn't and we find one, we return the existing value, ignoring the refresh.
//This can happen due to the race condition of getting the values out of the cache.
//We can only do any of this because we have the local lock.
UnlockWaitingTasks(cacheKey, previousEntry);
return previousEntry;
}

return await Extensions.WithRefreshAsync(cacheKey, async () =>
{
var oldValue = default(T);
if (previousEntry != null)
if (previousEntry != default)
{
oldValue = previousEntry.Value;
}
Expand All @@ -314,7 +310,7 @@ private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Fun
throw;
}
}
else if (waitForRefresh)
else if (noExistingValueAvailable)
{
var delayedResultSource = new TaskCompletionSource<object>();

Expand Down
85 changes: 32 additions & 53 deletions tests/CacheTower.Tests/CacheStackTests.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
using CacheTower.Providers.Memory;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using System.Threading;

namespace CacheTower.Tests
{
Expand Down Expand Up @@ -240,26 +239,25 @@ public async Task GetOrSet_CacheHit()
Assert.AreEqual(17, result);
}
[TestMethod]
public async Task GetOrSet_CacheHitBackgroundRefresh()
public async Task GetOrSet_StaleCacheHit()
{
await using var cacheStack = new CacheStack(new[] { new MemoryCacheLayer() }, Array.Empty<ICacheExtension>());
var cacheEntry = new CacheEntry<int>(17, DateTime.UtcNow.AddDays(1));
await cacheStack.SetAsync("GetOrSet_CacheHitBackgroundRefresh", cacheEntry);
var cacheEntry = new CacheEntry<int>(17, DateTime.UtcNow.AddDays(2));
await cacheStack.SetAsync("GetOrSet_StaleCacheHit", cacheEntry);

var waitingOnBackgroundTask = new TaskCompletionSource<int>();
var refreshWaitSource = new TaskCompletionSource<bool>();

var result = await cacheStack.GetOrSetAsync<int>("GetOrSet_CacheHitBackgroundRefresh", (oldValue) =>
var result = await cacheStack.GetOrSetAsync<int>("GetOrSet_StaleCacheHit", (oldValue) =>
{
waitingOnBackgroundTask.TrySetResult(27);
Assert.AreEqual(17, oldValue);
refreshWaitSource.TrySetResult(true);
return Task.FromResult(27);
}, new CacheSettings(TimeSpan.FromDays(2), TimeSpan.Zero));
Assert.AreEqual(17, result);

await waitingOnBackgroundTask.Task;
//Give 400ms to return the value and set it to the MemoryCacheLayer
await Task.Delay(400);
await Task.WhenAny(refreshWaitSource.Task, Task.Delay(TimeSpan.FromSeconds(5)));

var refetchedResult = await cacheStack.GetAsync<int>("GetOrSet_CacheHitBackgroundRefresh");
var refetchedResult = await cacheStack.GetAsync<int>("GetOrSet_StaleCacheHit");
Assert.AreEqual(27, refetchedResult.Value);
}
[TestMethod]
Expand Down Expand Up @@ -287,52 +285,33 @@ public async Task GetOrSet_BackPropagatesToEarlierCacheLayers()
Assert.IsNull(await layer3.GetAsync<int>("GetOrSet_BackPropagatesToEarlierCacheLayers"));
}
[TestMethod]
public async Task GetOrSet_CacheHitButAllowedStalePoint()
public async Task GetOrSet_ConcurrentStaleCacheHits_OnlyOneRefresh()
{
await using var cacheStack = new CacheStack(new[] { new MemoryCacheLayer() }, Array.Empty<ICacheExtension>());
var cacheEntry = new CacheEntry<int>(17, DateTime.UtcNow.AddDays(-1));
await cacheStack.SetAsync("GetOrSet_CacheHitButAllowedStalePoint", cacheEntry);
var cacheEntry = new CacheEntry<int>(23, DateTime.UtcNow.AddDays(2));
await cacheStack.SetAsync("GetOrSet_ConcurrentStaleCacheHits_OnlyOneRefresh", cacheEntry);

var result = await cacheStack.GetOrSetAsync<int>("GetOrSet_CacheHitButAllowedStalePoint", (oldValue) =>
{
return Task.FromResult(27);
}, new CacheSettings(TimeSpan.FromDays(1), TimeSpan.Zero));
Assert.AreEqual(27, result);
}
[TestMethod]
public async Task GetOrSet_ConcurrentStaleCacheHits()
{
await using var cacheStack = new CacheStack(new[] { new MemoryCacheLayer() }, Array.Empty<ICacheExtension>());
var cacheEntry = new CacheEntry<int>(23, DateTime.UtcNow.AddDays(-2));
await cacheStack.SetAsync("GetOrSet_ConcurrentStaleCacheHits", cacheEntry);

var request1LockSource = new TaskCompletionSource<bool>();
var request2StartLockSource = new TaskCompletionSource<bool>();
var refreshWaitSource = new TaskCompletionSource<bool>();
var getterCallCount = 0;

//Request 1 gets the lock on the refresh and ends up being tied up due to the TaskCompletionSource
var request1Task = cacheStack.GetOrSetAsync<int>("GetOrSet_ConcurrentStaleCacheHits", async (oldValue) =>
Parallel.For(0, 100, async v =>
{
request2StartLockSource.SetResult(true);
await request1LockSource.Task;
return 99;
}, new CacheSettings(TimeSpan.FromDays(2), TimeSpan.Zero));

await request2StartLockSource.Task;

//Request 2 sees there is a lock already and because we still at least have old data, rather than wait
//it is given the old cache data even though we are past the point where even stale data should be removed
var request2Result = await cacheStack.GetOrSetAsync<int>("GetOrSet_ConcurrentStaleCacheHits", (oldValue) =>
{
return Task.FromResult(99);
}, new CacheSettings(TimeSpan.FromDays(2), TimeSpan.Zero));

//Unlock Request 1 to to continue
request1LockSource.SetResult(true);
//Wait for Request 1 to complete so we get the new data
var request1Result = await request1Task;

Assert.AreEqual(99, request1Result);
Assert.AreEqual(23, request2Result);
await cacheStack.GetOrSetAsync<int>(
"GetOrSet_ConcurrentStaleCacheHits_OnlyOneRefresh",
async _ =>
{
await Task.Delay(250);
Interlocked.Increment(ref getterCallCount);
refreshWaitSource.TrySetResult(true);
return 27;
},
new CacheSettings(TimeSpan.FromDays(2), TimeSpan.Zero)
);
});

await Task.WhenAny(refreshWaitSource.Task, Task.Delay(TimeSpan.FromSeconds(5)));

Assert.AreEqual(1, getterCallCount);
}
[TestMethod, ExpectedException(typeof(ObjectDisposedException))]
public async Task GetOrSet_ThrowsOnUseAfterDisposal()
Expand Down
21 changes: 18 additions & 3 deletions tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@ await extension.WithRefreshAsync("TestKey",
() => new ValueTask<CacheEntry<int>>(cacheEntry), new CacheSettings(TimeSpan.FromDays(1)));

var succeedingTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(completionSource.Task, succeedingTask, "Subscriber response took too long");
if (!succeedingTask.Equals(completionSource.Task))
{
RedisHelper.DebugInfo(connection);
Assert.Fail("Subscriber response took too long");
}

Assert.IsTrue(completionSource.Task.Result, "Subscribers were not notified about the refreshed value");
}

Expand Down Expand Up @@ -142,7 +147,12 @@ public async Task ObservedLockSingle()
await connection.GetSubscriber().PublishAsync("CacheTower.CacheLock", "TestKey");

var succeedingTask = await Task.WhenAny(refreshTask, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(refreshTask, succeedingTask, "Refresh has timed out - something has gone very wrong");
if (!succeedingTask.Equals(refreshTask))
{
RedisHelper.DebugInfo(connection);
Assert.Fail("Refresh has timed out - something has gone very wrong");
}

cacheStackMock.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(2), "Two checks to the cache stack are expected");
}

Expand Down Expand Up @@ -189,7 +199,12 @@ public async Task ObservedLockMultiple()

var whenAllRefreshesTask = Task.WhenAll(refreshTask1, refreshTask2);
var succeedingTask = await Task.WhenAny(whenAllRefreshesTask, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(whenAllRefreshesTask, succeedingTask, "Refresh has timed out - something has gone very wrong");
if (!succeedingTask.Equals(whenAllRefreshesTask))
{
RedisHelper.DebugInfo(connection);
Assert.Fail("Refresh has timed out - something has gone very wrong");
}

cacheStackMock.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(4), "Two checks to the cache stack are expected");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ public async Task RemoteEvictionOccursOnLocalEviction()
await extensionOne.OnCacheEvictionAsync("TestKey");

var succeedingTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(completionSource.Task, succeedingTask, "Subscriber response took too long");
if (!succeedingTask.Equals(completionSource.Task))
{
RedisHelper.DebugInfo(connection);
Assert.Fail("Subscriber response took too long");
}

Assert.IsTrue(completionSource.Task.Result, "Subscribers were not notified about the refreshed value");

await Task.Delay(500);
Expand Down Expand Up @@ -159,7 +164,12 @@ public async Task RemoteFlush()
await extensionOne.OnCacheFlushAsync();

var succeedingTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(10)));
Assert.AreEqual(completionSource.Task, succeedingTask, "Subscriber response took too long");
if (!succeedingTask.Equals(completionSource.Task))
{
RedisHelper.DebugInfo(connection);
Assert.Fail("Subscriber response took too long");
}

Assert.IsTrue(completionSource.Task.Result, "Subscribers were not notified about the flush");

await Task.Delay(500);
Expand Down
7 changes: 7 additions & 0 deletions tests/CacheTower.Tests/Utils/RedisHelper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using StackExchange.Redis;
Expand All @@ -24,5 +25,11 @@ public static void FlushDatabase()
{
GetConnection().GetServer(Endpoint).FlushDatabase();
}

public static void DebugInfo(IConnectionMultiplexer connection)
{
Debug.WriteLine(connection.GetStatus());
Debug.WriteLine(connection.GetCounters().ToString());
}
}
}

0 comments on commit bc7d2c7

Please sign in to comment.