Skip to content

Commit

Permalink
Add unit test to reproduce the issue FoundatioFx#64 (FoundatioFx#65)
Browse files Browse the repository at this point in the history
fixed issue FoundatioFx#64 - DequeueIdAsync might leave queue item in "phantom" state
  • Loading branch information
Seadoo2017 authored Jul 30, 2020
1 parent 2239399 commit 82859e3
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/Foundatio.Redis/Cache/RedisCacheClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public sealed class RedisCacheClient : ICacheClient, IHaveSerializer {
private LoadedLuaScript _removeIfEqual;
private LoadedLuaScript _replaceIfEqual;
private LoadedLuaScript _setIfHigher;
private LoadedLuaScript _setIfLower;
private LoadedLuaScript _setIfLower;

public RedisCacheClient(RedisCacheClientOptions options) {
_options = options;
Expand Down
55 changes: 43 additions & 12 deletions src/Foundatio.Redis/Queues/RedisQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Foundatio.Utility;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using Foundatio.Redis.Utility;
#pragma warning disable 4014

namespace Foundatio.Queues {
Expand All @@ -30,6 +31,9 @@ public class RedisQueue<T> : QueueBase<T, RedisQueueOptions<T>> where T : class
private Task _maintenanceTask;
private bool _isSubscribed;
private readonly TimeSpan _payloadTimeToLive;
private bool _scriptsLoaded;

private LoadedLuaScript _dequeueId;

public RedisQueue(RedisQueueOptions<T> options) : base(options) {
if (options.ConnectionMultiplexer == null)
Expand Down Expand Up @@ -275,14 +279,7 @@ protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken

if (value.IsNullOrEmpty)
return null;

var wiTimeoutTtl = GetWorkItemTimeoutTimeTtl();
long now = SystemClock.UtcNow.Ticks;
await Run.WithRetriesAsync(() => Task.WhenAll(
_cache.SetAsync(GetDequeuedTimeKey(value), now, wiTimeoutTtl),
_cache.SetAsync(GetRenewedTimeKey(value), now, wiTimeoutTtl)
), logger: _logger).AnyContext();


try {
var entry = await GetQueueEntryAsync(value).AnyContext();
if (entry == null)
Expand Down Expand Up @@ -323,13 +320,44 @@ private async Task<QueueEntry<T>> GetQueueEntryAsync(string workId) {
private async Task<RedisValue> DequeueIdAsync(CancellationToken linkedCancellationToken) {
try {
return await Run.WithRetriesAsync(async () => {
return await Database.ListRightPopLeftPushAsync(_queueListName, _workListName).AnyContext();
}, 3, TimeSpan.FromMilliseconds(100), linkedCancellationToken, _logger).AnyContext();
} catch (Exception) {
var wiTimeoutTtl = GetWorkItemTimeoutTimeTtl();
long now = SystemClock.UtcNow.Ticks;

// we must move the item between the in and work queues and set the dequeued & renewed keys in transaction to avoid situations where
// we have item in the work queue without the keys which will prevent maintainance from handling it
// we can't use transaction as we need to use the result of ListRightPopLeftPushAsync to generate the keys
await LoadScriptsAsync().AnyContext();
var result = await Database.ScriptEvaluateAsync(_dequeueId, new { queueListName = _queueListName, workListName = _workListName, queueName = _options.Name, now, wiTimeoutTtl = wiTimeoutTtl.Ticks }).AnyContext();
return result.ToString();
} , 3, TimeSpan.FromMilliseconds(100), linkedCancellationToken, _logger).AnyContext();
} catch (Exception ex) {
if (_logger.IsEnabled(LogLevel.Error)) _logger.LogError("Queue {Name} dequeue id async error: {Error}", _options.Name, ex);
return RedisValue.Null;
}
}

private async Task LoadScriptsAsync() {
if (_scriptsLoaded)
return;

using (await _lock.LockAsync().AnyContext()) {
if (_scriptsLoaded)
return;

var dequeueId = LuaScript.Prepare(DequeueIdScript);

foreach (var endpoint in _options.ConnectionMultiplexer.GetEndPoints()) {
var server = _options.ConnectionMultiplexer.GetServer(endpoint);
if (server.IsReplica)
continue;

_dequeueId = await dequeueId.LoadAsync(server).AnyContext();
}

_scriptsLoaded = true;
}
}

public override async Task CompleteAsync(IQueueEntry<T> entry) {
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Queue {Name} complete item: {EntryId}", _options.Name, entry.Id);
if (entry.IsAbandoned || entry.IsCompleted)
Expand Down Expand Up @@ -506,8 +534,9 @@ private void OnTopicMessage(RedisChannel redisChannel, RedisValue redisValue) {
_autoResetEvent.Set();
}

private void ConnectionMultiplexerOnConnectionRestored(object sender, ConnectionFailedEventArgs connectionFailedEventArgs) {
private void ConnectionMultiplexerOnConnectionRestored(object sender, ConnectionFailedEventArgs connectionFailedEventArgs) {
if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("Redis connection restored.");
_scriptsLoaded = false;
_autoResetEvent.Set();
}

Expand Down Expand Up @@ -614,5 +643,7 @@ public override void Dispose() {

_cache.Dispose();
}

private static readonly string DequeueIdScript = EmbeddedResourceLoader.GetEmbeddedResource("Foundatio.Redis.Scripts.DequeueId.lua");
}
}
10 changes: 10 additions & 0 deletions src/Foundatio.Redis/Scripts/DequeueId.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
local item = redis.call('RPOPLPUSH', @queueListName, @workListName);
if item then
local dequeuedTimeKey = 'q:' .. @queueName .. ':' .. item .. ':dequeued';
local renewedTimeKey = 'q:' .. @queueName .. ':' .. item .. ':renewed';
redis.call('SET', dequeuedTimeKey, @now, 'EX', @wiTimeoutTtl);
redis.call('SET', renewedTimeKey, @now, 'EX', @wiTimeoutTtl);
return item;
else
return nil;
end
79 changes: 79 additions & 0 deletions tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
using Xunit;
using Xunit.Abstractions;
using Foundatio.Xunit;
using Foundatio.Tests.Utility;
using StackExchange.Redis;
using System.Diagnostics;

#pragma warning disable 4014

Expand Down Expand Up @@ -420,6 +423,82 @@ public async Task VerifyFirstDequeueTimeout() {
}
}

// test to reproduce issue #64 - https://github.com/FoundatioFx/Foundatio.Redis/issues/64
// should be skipped by default and run only by demand or 5 sec run is acceptable?
//[Fact(Skip ="This test needs to simulate database timeout which makes the runtime ~5 sec which might be too big to be run automatically")]
[Fact]
public async Task DatabaseTimeoutDuringDequeueHandledCorectly_Issue64() {
// not using GetQueue() here because I need to change the ops timeout in the redis connection string
const int OPS_TIMEOUT_MS = 100;
string connectionString = Configuration.GetConnectionString("RedisConnectionString") + $",syncTimeout={OPS_TIMEOUT_MS},asyncTimeout={OPS_TIMEOUT_MS}"; ;
var muxer = await ConnectionMultiplexer.ConnectAsync(connectionString);

const string QUEUE_NAME = "Test";
var queue = new RedisQueue<SimpleWorkItem>(o => o
.ConnectionMultiplexer(muxer)
.LoggerFactory(Log)
.Name(QUEUE_NAME)
.RunMaintenanceTasks(false)
);

// enqueue item to queue, no reader yet
await queue.EnqueueAsync(new SimpleWorkItem());

// create database, we want to cause delay in redis to reproduce the issue
var database = muxer.GetDatabase();

// sync / async ops timeout is not working as described: https://stackexchange.github.io/StackExchange.Redis/Configuration
// it should have timed out after 100 ms but it actually takes a lot more time to time out so we have to use longer delay until this issue is resolved
// value can be up to 1,000,000 - 1
//const int DELAY_TIME_USEC = 200000; // 200 msec
//string databaseDelayScript = $"local usecnow = tonumber(redis.call(\"time\")[2]); while ((((tonumber(redis.call(\"time\")[2]) - usecnow) + 1000000) % 1000000) < {DELAY_TIME_USEC}) do end";

const int DELAY_TIME_SEC = 5;
string databaseDelayScript = $@"
local now = tonumber(redis.call(""time"")[1]);
while ((((tonumber(redis.call(""time"")[1]) - now))) < {DELAY_TIME_SEC}) " +
"do end";

// db will be busy for DELAY_TIME_USEC which will cause timeout on the dequeue to follow
database.ScriptEvaluateAsync(databaseDelayScript);

var completion = new TaskCompletionSource<bool>();
await queue.StartWorkingAsync(async (item) => {
await item.CompleteAsync();
completion.SetResult(true);
});

// wait for the databaseDelayScript to finish
await Task.Delay(DELAY_TIME_SEC * 1000);

// item should've either time out at some iterations and after databaseDelayScript is done be received
// or it might have moved to work, in this case we want to make sure the correct keys were created
Stopwatch stopwatch = Stopwatch.StartNew();
bool success = false;
while (stopwatch.Elapsed.TotalSeconds < 10) {

string workListName = $"q:{QUEUE_NAME}:work";
long workListLen = await database.ListLengthAsync(new RedisKey(workListName));
var item = await database.ListLeftPopAsync(workListName);
string dequeuedItemKey = String.Concat("q:", QUEUE_NAME, ":", item, ":dequeued");
bool dequeuedItemKeyExists = await database.KeyExistsAsync(new RedisKey(dequeuedItemKey));
if (workListLen == 1) {
Assert.True(dequeuedItemKeyExists);
success = true;
break;
}

var timeoutCancellationTokenSource = new CancellationTokenSource();
var completedTask = await Task.WhenAny(completion.Task, Task.Delay(TimeSpan.FromMilliseconds(100), timeoutCancellationTokenSource.Token));
if (completion.Task == completedTask) {
success = true;
break;
}
}

Assert.True(success);
}

// TODO: Need to write tests that verify the cache data is correct after each operation.

[Fact(Skip = "Performance Test")]
Expand Down

0 comments on commit 82859e3

Please sign in to comment.