Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

URGENT Fix error in batch/transaction handling #2177

Merged
merged 8 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Pending

- URGENT Fix: [#2167](https://github.com/StackExchange/StackExchange.Redis/issues/2167), [#2176](https://github.com/StackExchange/StackExchange.Redis/issues/2176): fix error in batch/transaction handling that can result in out-of-order instructions ([#2177 by MarcGravell](https://github.com/StackExchange/StackExchange.Redis/pull/2177))
- Fix: [#2164](https://github.com/StackExchange/StackExchange.Redis/issues/2164): fix `LuaScript.Prepare` for scripts that don't have parameters ([#2166 by MarcGravell](https://github.com/StackExchange/StackExchange.Redis/pull/2166))

## 2.6.45
Expand Down
23 changes: 23 additions & 0 deletions src/StackExchange.Redis/RedisBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,29 @@ public void Execute()
}
}

internal override Task<T> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, T defaultValue, ServerEndPoint? server = null)
{
if (message == null) return CompletedTask<T>.FromDefault(defaultValue, asyncState);
multiplexer.CheckMessage(message);

// prepare the inner command as a task
Task<T> task;
if (message.IsFireAndForget)
{
task = CompletedTask<T>.FromDefault(defaultValue, null); // F+F explicitly does not get async-state
}
else
{
var source = TaskResultBox<T>.Create(out var tcs, asyncState);
task = tcs.Task;
message.SetSource(source, processor);
}

// store it
(pending ??= new List<Message>()).Add(message);
return task!;
}

internal override Task<T?> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, ServerEndPoint? server = null) where T : default
{
if (message == null) return CompletedTask<T>.Default(asyncState);
Expand Down
32 changes: 31 additions & 1 deletion src/StackExchange.Redis/RedisTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,30 @@ public Task<bool> ExecuteAsync(CommandFlags flags)
return base.ExecuteAsync(msg, proc); // need base to avoid our local wrapping override
}

internal override Task<T> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, T defaultValue, ServerEndPoint? server = null)
{
if (message == null) return CompletedTask<T>.FromDefault(defaultValue, asyncState);
multiplexer.CheckMessage(message);

multiplexer.Trace("Wrapping " + message.Command, "Transaction");
// prepare the inner command as a task
Task<T> task;
if (message.IsFireAndForget)
{
task = CompletedTask<T>.FromDefault(defaultValue, null); // F+F explicitly does not get async-state
}
else
{
var source = TaskResultBox<T>.Create(out var tcs, asyncState);
message.SetSource(source, processor);
task = tcs.Task;
}

QueueMessage(message);

return task;
}

internal override Task<T?> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, ServerEndPoint? server = null) where T : default
{
if (message == null) return CompletedTask<T>.Default(asyncState);
Expand All @@ -75,6 +99,13 @@ public Task<bool> ExecuteAsync(CommandFlags flags)
task = tcs.Task;
}

QueueMessage(message);

return task;
}

private void QueueMessage(Message message)
{
// prepare an outer-command that decorates that, but expects QUEUED
var queued = new QueuedMessage(message);
var wasQueued = SimpleResultBox<bool>.Create();
Expand Down Expand Up @@ -102,7 +133,6 @@ public Task<bool> ExecuteAsync(CommandFlags flags)
break;
}
}
return task;
}

internal override T? ExecuteSync<T>(Message? message, ResultProcessor<T>? processor, ServerEndPoint? server = null, T? defaultValue = default) where T : default
Expand Down
83 changes: 83 additions & 0 deletions tests/StackExchange.Redis.Tests/Issues/Issue2176.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace StackExchange.Redis.Tests.Issues
{
public class Issue2176 : TestBase
{
public Issue2176(ITestOutputHelper output) : base(output) { }

[Fact]
public void Execute_Batch()
{
using var conn = Create();
var db = conn.GetDatabase();

var me = Me();
var key = me + ":1";
var key2 = me + ":2";
var keyIntersect = me + ":result";

db.KeyDelete(key);
db.KeyDelete(key2);
db.KeyDelete(keyIntersect);
db.SortedSetAdd(key, "a", 1345);

var tasks = new List<Task>();
var batch = db.CreateBatch();
tasks.Add(batch.SortedSetAddAsync(key2, "a", 4567));
tasks.Add(batch.SortedSetCombineAndStoreAsync(SetOperation.Intersect,
keyIntersect, new RedisKey[] { key, key2 }));
var rangeByRankTask = batch.SortedSetRangeByRankAsync(keyIntersect);
tasks.Add(rangeByRankTask);
batch.Execute();

Task.WhenAll(tasks.ToArray());

var rangeByRankSortedSetValues = rangeByRankTask.Result;

int size = rangeByRankSortedSetValues.Length;
Assert.Equal(1, size);
string firstRedisValue = rangeByRankSortedSetValues.FirstOrDefault().ToString();
Assert.Equal("a", firstRedisValue);
}

[Fact]
public void Execute_Transaction()
{
using var conn = Create();
var db = conn.GetDatabase();

var me = Me();
var key = me + ":1";
var key2 = me + ":2";
var keyIntersect = me + ":result";

db.KeyDelete(key);
db.KeyDelete(key2);
db.KeyDelete(keyIntersect);
db.SortedSetAdd(key, "a", 1345);

var tasks = new List<Task>();
var batch = db.CreateTransaction();
tasks.Add(batch.SortedSetAddAsync(key2, "a", 4567));
tasks.Add(batch.SortedSetCombineAndStoreAsync(SetOperation.Intersect,
keyIntersect, new RedisKey[] { key, key2 }));
var rangeByRankTask = batch.SortedSetRangeByRankAsync(keyIntersect);
tasks.Add(rangeByRankTask);
batch.Execute();

Task.WhenAll(tasks.ToArray());

var rangeByRankSortedSetValues = rangeByRankTask.Result;

int size = rangeByRankSortedSetValues.Length;
Assert.Equal(1, size);
string firstRedisValue = rangeByRankSortedSetValues.FirstOrDefault().ToString();
Assert.Equal("a", firstRedisValue);
}
}
}