Skip to content

Commit

Permalink
Subscription Refinements (#1547)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib authored Mar 12, 2020
1 parent 69dad6b commit 6a994a6
Show file tree
Hide file tree
Showing 16 changed files with 288 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public static IServiceCollection AddInMemorySubscriptions(
}

services.AddSingleton<InMemoryPubSub>();
services.AddSingleton<IEventDispatcher>(sp =>
services.AddSingleton<ITopicEventSender>(sp =>
sp.GetRequiredService<InMemoryPubSub>());
services.AddSingleton<IEventTopicObserver>(sp =>
services.AddSingleton<ITopicEventReceiver>(sp =>
sp.GetRequiredService<InMemoryPubSub>());
return services;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Core/Subscriptions.InMemory/InMemoryPubSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
namespace HotChocolate.Subscriptions.InMemory
{
public class InMemoryPubSub
: IEventTopicObserver
, IEventDispatcher
: ITopicEventReceiver
, ITopicEventSender
{
private readonly ConcurrentDictionary<object, IEventTopic> _topics =
new ConcurrentDictionary<object, IEventTopic>();
Expand Down
4 changes: 2 additions & 2 deletions src/Core/Subscriptions.Redis.Tests/RedisIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class RedisIntegrationTests
: IClassFixture<RedisResource>
{
private readonly ConnectionMultiplexer _connection;
private readonly IEventDispatcher _sender;
private readonly ITopicEventSender _sender;

public RedisIntegrationTests(RedisResource redisResource)
{
Expand Down Expand Up @@ -47,7 +47,7 @@ public Task Subscribe()
.Field(name)
.Resolver("baz")
.Subscribe(async ctx =>
await ctx.Service<IEventTopicObserver>()
await ctx.Service<ITopicEventReceiver>()
.SubscribeAsync<string, string>("foo", ctx.RequestAborted)))
.Create()
.MakeExecutable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ private static IServiceCollection AddServices(
{
services
.AddSingleton<RedisPubSub>()
.AddSingleton<IEventDispatcher>(sp =>
.AddSingleton<ITopicEventSender>(sp =>
sp.GetRequiredService<RedisPubSub>())
.AddSingleton<IEventTopicObserver>(sp =>
.AddSingleton<ITopicEventReceiver>(sp =>
sp.GetRequiredService<RedisPubSub>());
return services;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Core/Subscriptions.Redis/RedisPubSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
namespace HotChocolate.Subscriptions.Redis
{
public class RedisPubSub
: IEventTopicObserver
, IEventDispatcher
: ITopicEventReceiver
, ITopicEventSender
{
private readonly IConnectionMultiplexer _connection;
public const string Completed = "{completed}";
Expand Down
10 changes: 5 additions & 5 deletions src/Core/Subscriptions.Tests/InMemorySubscriptionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public class MutationRoot
{
public async Task<string> FooAsync(
string a,
[Service]IEventDispatcher eventDispatcher,
[Service]ITopicEventSender eventDispatcher,
CancellationToken cancellationToken)
{
await eventDispatcher.SendAsync("foo", a);
Expand All @@ -160,19 +160,19 @@ public async Task<string> FooAsync(

public class SubscriptionRoot
{
[Subscribe]
[SubscribeAndResolve]
public async Task<IAsyncEnumerable<string>> OnFooAsync(
[Service]IEventTopicObserver eventTopicObserver,
[Service]ITopicEventReceiver eventTopicObserver,
CancellationToken cancellationToken)
{
return await eventTopicObserver.SubscribeAsync<string, string>(
"foo", cancellationToken);
}

[Subscribe]
[SubscribeAndResolve]
public async Task<IAsyncEnumerable<string>> OnBarAsync(
string topic,
[Service]IEventTopicObserver eventTopicObserver,
[Service]ITopicEventReceiver eventTopicObserver,
CancellationToken cancellationToken)
{
return await eventTopicObserver.SubscribeAsync<string, string>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
namespace HotChocolate.Subscriptions
{
/// <summary>
/// The <see cref="IEventTopicObserver" /> creates subscription to
/// a specific event topic and returns an <see cref="IEventStream{TMessage}" />
/// The <see cref="ITopicEventReceiver" /> creates subscriptions to
/// specific event topics and returns an <see cref="IEventStream{TMessage}" />
/// which represents a stream of event message for the specified topic.
/// </summary>
public interface IEventTopicObserver
public interface ITopicEventReceiver
{
/// <summary>
/// Subscribes to the specified event <paramref name="topic" />.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
namespace HotChocolate.Subscriptions
{
/// <summary>
/// The event dispatcher sends event messages to the pub/sub-system.
/// The topic event sender sends event messages to the pub/sub-system.
/// Typically a mutation would use the event dispatcher to raise events
/// after some changes were committed to the backend system.
/// </summary>
public interface IEventDispatcher
public interface ITopicEventSender
{
/// <summary>
/// Sends an event message to the pub/sub-system.
Expand Down
26 changes: 26 additions & 0 deletions src/Core/Types.Tests/Types/ObjectTypeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,18 @@ public void Infer_Argument_Default_Values()
schema.ToString().MatchSnapshot();
}

[Fact]
public void Combine_Subscribe_And_Resolve_Into_One_Field()
{
ISchema schema = SchemaBuilder.New()
.AddQueryType(c => c.Name("Query"))
.AddSubscriptionType<SubscriptionWithSubscribe>()
.Use(next => context => Task.CompletedTask)
.Create();

schema.ToString().MatchSnapshot();
}

public class GenericFoo<T>
{
public T Value { get; }
Expand Down Expand Up @@ -1666,5 +1678,19 @@ public string Field2(
[DefaultValue(null)]string a,
[DefaultValue("abc")]string b) => null;
}

public class SubscriptionWithSubscribe
{
[Subscribe(nameof(OnFoo))]
public IAsyncEnumerable<string> SubscribeToFoo()
{
throw new NotImplementedException();
}

public string OnFoo()
{
throw new NotImplementedException();
}
}
}
}
Loading

0 comments on commit 6a994a6

Please sign in to comment.