Skip to content

Commit

Permalink
expose IAsyncEnumerable on ChannelMessageQueue (#2402)
Browse files Browse the repository at this point in the history
* expose IAsyncEnumerable on ChannelMessageQueue
fix #2400

* PR number

* move ChannelMessageQueue.GetAsyncEnumerator to shipped
  • Loading branch information
mgravell authored Mar 15, 2023
1 parent 867b04d commit 9698aaa
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 7 deletions.
4 changes: 2 additions & 2 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ Current package versions:

## Unreleased

No pending changes.
- Fix [#2400](https://github.com/StackExchange/StackExchange.Redis/issues/2400): Expose `ChannelMessageQueue` as `IAsyncEnumerable<ChannelMessage>` ([#2402 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2402))

## 2.6.96

- Fix [#2350](https://github.com/StackExchange/StackExchange.Redis/issues/2350): Properly parse lua script paramters in all cultures ([#2351 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2351))
- Fix [#2350](https://github.com/StackExchange/StackExchange.Redis/issues/2350): Properly parse lua script parameters in all cultures ([#2351 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2351))
- Fix [#2362](https://github.com/StackExchange/StackExchange.Redis/issues/2362): Set `RedisConnectionException.FailureType` to `AuthenticationFailure` on all authentication scenarios for better handling ([#2367 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2367))
- Fix [#2368](https://github.com/StackExchange/StackExchange.Redis/issues/2368): Support `RedisValue.Length()` for all storage types ([#2370 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2370))
- Fix [#2376](https://github.com/StackExchange/StackExchange.Redis/issues/2376): Avoid a (rare) deadlock scenario ([#2378 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2378))
Expand Down
26 changes: 21 additions & 5 deletions src/StackExchange.Redis/ChannelMessageQueue.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -66,7 +68,7 @@ internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in R
/// To create a ChannelMessageQueue, use <see cref="ISubscriber.Subscribe(RedisChannel, CommandFlags)"/>
/// or <see cref="ISubscriber.SubscribeAsync(RedisChannel, CommandFlags)"/>.
/// </remarks>
public sealed class ChannelMessageQueue
public sealed class ChannelMessageQueue : IAsyncEnumerable<ChannelMessage>
{
private readonly Channel<ChannelMessage> _queue;
/// <summary>
Expand Down Expand Up @@ -319,10 +321,7 @@ internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = Comm
{
var parent = _parent;
_parent = null;
if (parent != null)
{
parent.UnsubscribeAsync(Channel, null, this, flags);
}
parent?.UnsubscribeAsync(Channel, null, this, flags);
_queue.Writer.TryComplete(error);
}

Expand All @@ -348,5 +347,22 @@ internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags f
/// </summary>
/// <param name="flags">The flags to use when unsubscribing.</param>
public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags);

/// <inheritdoc cref="IAsyncEnumerable{ChannelMessage}.GetAsyncEnumerator(CancellationToken)"/>
#if NETCOREAPP3_0_OR_GREATER
public IAsyncEnumerator<ChannelMessage> GetAsyncEnumerator(CancellationToken cancellationToken = default)
=> _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
#else
public async IAsyncEnumerator<ChannelMessage> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (_queue.Reader.TryRead(out var item))
{
yield return item;
}
}
}
#endif
}
}
1 change: 1 addition & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ StackExchange.Redis.ChannelMessage.SubscriptionChannel.get -> StackExchange.Redi
StackExchange.Redis.ChannelMessageQueue
StackExchange.Redis.ChannelMessageQueue.Channel.get -> StackExchange.Redis.RedisChannel
StackExchange.Redis.ChannelMessageQueue.Completion.get -> System.Threading.Tasks.Task!
StackExchange.Redis.ChannelMessageQueue.GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Collections.Generic.IAsyncEnumerator<StackExchange.Redis.ChannelMessage>!
StackExchange.Redis.ChannelMessageQueue.OnMessage(System.Action<StackExchange.Redis.ChannelMessage>! handler) -> void
StackExchange.Redis.ChannelMessageQueue.OnMessage(System.Func<StackExchange.Redis.ChannelMessage, System.Threading.Tasks.Task!>! handler) -> void
StackExchange.Redis.ChannelMessageQueue.ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<StackExchange.Redis.ChannelMessage>
Expand Down
34 changes: 34 additions & 0 deletions tests/StackExchange.Redis.Tests/PubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,40 @@ private void TestMassivePublish(ISubscriber sub, string channel, string caption)
Assert.True(withFAF.ElapsedMilliseconds < withAsync.ElapsedMilliseconds + 3000, caption);
}

[Fact]
public async Task SubscribeAsyncEnumerable()
{
using var conn = Create(syncTimeout: 20000, shared: false, log: Writer);

var sub = conn.GetSubscriber();
RedisChannel channel = Me();

const int TO_SEND = 5;
var gotall = new TaskCompletionSource<int>();

var source = await sub.SubscribeAsync(channel);
var op = Task.Run(async () => {
int count = 0;
await foreach (var item in source)
{
count++;
if (count == TO_SEND) gotall.TrySetResult(count);
}
return count;
});

for (int i = 0; i < TO_SEND; i++)
{
await sub.PublishAsync(channel, i);
}
await gotall.Task.WithTimeout(5000);

// check the enumerator exits cleanly
sub.Unsubscribe(channel);
var count = await op.WithTimeout(1000);
Assert.Equal(5, count);
}

[Fact]
public async Task PubSubGetAllAnyOrder()
{
Expand Down

0 comments on commit 9698aaa

Please sign in to comment.