Skip to content

Commit

Permalink
messaging host builder with type dependent pipeline (#240)
Browse files Browse the repository at this point in the history
Co-authored-by: Radu Popovici <rpopovici@totalsoft.ro>
  • Loading branch information
oncicaradupopovici and Radu Popovici authored Aug 23, 2022
1 parent 502a689 commit 92e36ae
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,15 @@ public IMessagingHostPipelineBuilder WithOptions(Action<SubscriberOptionsBuilder
return this;
}

public void UsePipeline(Action<IPipelineBuilder<MessagingContext>> configurePipeline)
public void UsePipeline(Action<Type, IPipelineBuilder<MessagingContext>> configurePipeline)
{
var builder = new PipelineBuilder<MessagingContext>();
configurePipeline?.Invoke(builder);

foreach (var subscriber in _currentSubscriberGroup)
{
var messageType = subscriber.MessageType;
var builder = new PipelineBuilder<MessagingContext>();
configurePipeline?.Invoke(messageType, builder);
subscriber.Pipeline = builder.Pipeline;
}


_currentSubscriberGroup = null;
}

Expand Down Expand Up @@ -153,6 +151,9 @@ public interface IMessagingHostPipelineBuilder : IMessagingHostConfigurationBuil
/// </summary>
/// <param name="configurePipeline">The pipeline configurator is used to add the middleware to the pipeline.</param>
/// <returns>The messaging host subscriberBuilder to further subscriberBuilder the messaging host. It is used in the fluent API</returns>
void UsePipeline(Action<IPipelineBuilder<MessagingContext>> configurePipeline);
void UsePipeline(Action<Type, IPipelineBuilder<MessagingContext>> configurePipeline);

public void UsePipeline(Action<IPipelineBuilder<MessagingContext>> configurePipeline)
=> UsePipeline((_, b) => configurePipeline(b));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ public class Subscriber
public PipelineDelegate<MessagingContext> Pipeline { get; set; }
}
}
}
}
22 changes: 22 additions & 0 deletions src/Messaging/NBB.Messaging.Host/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,28 @@ services.AddMessagingHost(
);
}
);
```

#### Register a type dependent pipeline

In case you need to register a type dependent pipeline (like conditional middleware) for more subscribers, you can build something like:

```csharp

services.AddMessagingHost(
Configuration,
hostBuilder =>
{
hostBuilder.Configure(configBuilder => configBuilder
.AddSubscriberServices(...)
.WithOptions(...)
.UsePipeline((t, p) => p
.Use(...)
.When(t == typeof(MyCommand), p => p.Use(...)) //<-- conditional middleware
.Use(...)
);
}
);
```

#### Advanced scenarios
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,36 @@ public void Should_register_same_pipeline_for_more_subscriber_groups()

}

[Fact]
public void Should_register_a_type_dependent_pipeline()
{
//Arrange
var services = Mock.Of<IServiceCollection>();
var provider = Mock.Of<IServiceProvider>();
PipelineDelegate<MessagingContext> mockMiddlewareFunc = (ctx, ct) => Task.CompletedTask;
Func<PipelineDelegate<MessagingContext>, PipelineDelegate<MessagingContext>> mockMiddleware = next => mockMiddlewareFunc;

//Act
var builder = new MessagingHostConfigurationBuilder(provider, services);
builder
.AddSubscriberServices(cfg => cfg
.AddType<CommandMessage>()
.AddType<EventMessage>()
.FromTopic("OtherTopicName"))
.WithDefaultOptions()
.UsePipeline((t, p) => p
.When(t == typeof(CommandMessage), p => p.Use(mockMiddleware)));
var config = builder.Build();

//Assert
config.Subscribers.Should().HaveCount(3);
config.Subscribers[0].MessageType.Should().Be(typeof(CommandMessage));
config.Subscribers[0].Pipeline.Should().Be(mockMiddlewareFunc);

config.Subscribers[1].Pipeline.Should().NotBe(mockMiddlewareFunc);
config.Subscribers[2].Pipeline.Should().NotBe(mockMiddlewareFunc);
}

public record CommandMessage : IRequest;

public record EventMessage : INotification;
Expand Down

0 comments on commit 92e36ae

Please sign in to comment.