From 2a5e715887a213c1e7b2e7a8169c7a6604465eb4 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Fri, 3 Nov 2023 16:11:40 +0100 Subject: [PATCH] Make sure transport operations do not share mutable state between multiple routing strategies (#6905) (#6909) * Add routing to dispatch connector tests to reproduce the shared state problem * Fix the state sharing problem in ToTransportOperation * Cross check headers and properties * Test for the header modifications in the routing strategy * add acceptance test * better naming and comments * block scoped namespace * Remove left overs * Move test * Primary ctor * Behavior --------- Co-authored-by: Tim Bussmann --- .../Outbox/AcceptanceTestingOutboxStorage.cs | 3 +- .../Outbox/When_publishing_with_outbox.cs | 168 ++++++++++++++++++ .../RoutingToDispatchConnectorTests.cs | 119 ++++++++++++- .../Outgoing/RoutingContextExtensions.cs | 16 +- .../Outgoing/RoutingToDispatchConnector.cs | 4 +- .../SatelliteRecoverabilityExecutor.cs | 4 +- 6 files changed, 301 insertions(+), 13 deletions(-) create mode 100644 src/NServiceBus.AcceptanceTests/Outbox/When_publishing_with_outbox.cs diff --git a/src/NServiceBus.AcceptanceTesting/AcceptanceTestingPersistence/Outbox/AcceptanceTestingOutboxStorage.cs b/src/NServiceBus.AcceptanceTesting/AcceptanceTestingPersistence/Outbox/AcceptanceTestingOutboxStorage.cs index 8884ea17db..e1bb282140 100644 --- a/src/NServiceBus.AcceptanceTesting/AcceptanceTestingPersistence/Outbox/AcceptanceTestingOutboxStorage.cs +++ b/src/NServiceBus.AcceptanceTesting/AcceptanceTestingPersistence/Outbox/AcceptanceTestingOutboxStorage.cs @@ -2,6 +2,7 @@ { using System; using System.Collections.Concurrent; + using System.Linq; using System.Threading; using System.Threading.Tasks; using Extensibility; @@ -29,7 +30,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context var tx = (AcceptanceTestingOutboxTransaction)transaction; tx.Enlist(() => { - if (!storage.TryAdd(message.MessageId, new StoredMessage(message.MessageId, message.TransportOperations))) + if (!storage.TryAdd(message.MessageId, new StoredMessage(message.MessageId, message.TransportOperations.Select(o => o.DeepCopy()).ToArray()))) { throw new Exception($"Outbox message with id '{message.MessageId}' is already present in storage."); } diff --git a/src/NServiceBus.AcceptanceTests/Outbox/When_publishing_with_outbox.cs b/src/NServiceBus.AcceptanceTests/Outbox/When_publishing_with_outbox.cs new file mode 100644 index 0000000000..3bf7af55f7 --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Outbox/When_publishing_with_outbox.cs @@ -0,0 +1,168 @@ +namespace NServiceBus.AcceptanceTests.Outbox +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using EndpointTemplates; + using Features; + using NServiceBus.Pipeline; + using NUnit.Framework; + + public class When_publishing_with_outbox : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_be_delivered_to_all_subscribers() + { + Requires.OutboxPersistence(); + + Context context = await Scenario.Define() + .WithEndpoint(b => + b.When(c => c.Subscriber1Subscribed && c.Subscriber2Subscribed, (session, c) => + { + // Send a trigger message that will invoke the handler method that publishes the event + c.AddTrace("Both subscribers are subscribed, going to send TriggerMessage"); + return session.SendLocal(new TriggerMessage()); + }) + ) + .WithEndpoint(b => b.When(async (session, ctx) => + { + await session.Subscribe(); + if (ctx.HasNativePubSubSupport) + { + ctx.Subscriber1Subscribed = true; + ctx.AddTrace("Subscriber1 is now subscribed (at least we have asked the broker to be subscribed)"); + } + else + { + ctx.AddTrace("Subscriber1 has now asked to be subscribed to MyEvent"); + } + })) + .WithEndpoint(b => b.When(async (session, ctx) => + { + await session.Subscribe(); + if (ctx.HasNativePubSubSupport) + { + ctx.Subscriber2Subscribed = true; + ctx.AddTrace("Subscriber2 is now subscribed (at least we have asked the broker to be subscribed)"); + } + else + { + ctx.AddTrace("Subscriber2 has now asked to be subscribed to MyEvent"); + } + })) + .Done(c => c.Subscriber1GotTheEvent && c.Subscriber2GotTheEvent) + .Run(TimeSpan.FromSeconds(10)); + + Assert.True(context.Subscriber1GotTheEvent); + Assert.True(context.Subscriber2GotTheEvent); + } + + public class Context : ScenarioContext + { + public bool Subscriber1GotTheEvent { get; set; } + public bool Subscriber2GotTheEvent { get; set; } + public bool Subscriber1Subscribed { get; set; } + public bool Subscriber2Subscribed { get; set; } + } + + public class Publisher : EndpointConfigurationBuilder + { + public Publisher() => + EndpointSetup(b => + { + b.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + b.EnableOutbox(); + // Test the outbox behavior in situations where messages are deserialized and dispatched from the outbox storage by injecting an exception into the dispatch pipeline + b.Pipeline.Register(new BlowUpAfterDispatchBehavior(), "ensure outbox dispatch fails"); + b.Recoverability().Immediate(i => i.NumberOfRetries(1)); + b.OnEndpointSubscribed((s, context) => + { + var subscriber1 = Conventions.EndpointNamingConvention(typeof(Subscriber1)); + if (s.SubscriberEndpoint.Contains(subscriber1)) + { + context.Subscriber1Subscribed = true; + context.AddTrace($"{subscriber1} is now subscribed"); + } + var subscriber2 = Conventions.EndpointNamingConvention(typeof(Subscriber2)); + if (s.SubscriberEndpoint.Contains(subscriber2)) + { + context.AddTrace($"{subscriber2} is now subscribed"); + context.Subscriber2Subscribed = true; + } + }); + b.DisableFeature(); + }); + + public class TriggerHandler : IHandleMessages + { + public Task Handle(TriggerMessage message, IMessageHandlerContext context) + => context.Publish(new MyEvent()); + } + + class BlowUpAfterDispatchBehavior : IBehavior + { + public async Task Invoke(IBatchDispatchContext context, Func next) + { + if (Interlocked.Increment(ref invocationCounter) == 1) + { + throw new SimulatedException(); + } + + await next(context).ConfigureAwait(false); + } + + int invocationCounter; + } + } + + public class Subscriber1 : EndpointConfigurationBuilder + { + public Subscriber1() => + EndpointSetup(c => c.DisableFeature(), + metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + + public class MyHandler : IHandleMessages + { + public MyHandler(Context testContext) => this.testContext = testContext; + + public Task Handle(MyEvent message, IMessageHandlerContext context) + { + testContext.Subscriber1GotTheEvent = true; + return Task.CompletedTask; + } + + readonly Context testContext; + } + } + + public class Subscriber2 : EndpointConfigurationBuilder + { + public Subscriber2() => + EndpointSetup(c => c.DisableFeature(), + metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + + public class MyHandler : IHandleMessages + { + public MyHandler(Context testContext) => this.testContext = testContext; + + public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context) + { + testContext.Subscriber2GotTheEvent = true; + return Task.CompletedTask; + } + + readonly Context testContext; + } + } + + public class MyEvent : IEvent + { + } + + public class TriggerMessage : ICommand + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs b/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs index 6b7aa41440..6cd74a25a9 100644 --- a/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs +++ b/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs @@ -14,12 +14,107 @@ [TestFixture] public class RoutingToDispatchConnectorTests { + [Test] + public async Task Should_preserve_message_state_for_one_routing_strategy_for_allocation_reasons() + { + var behavior = new RoutingToDispatchConnector(); + IEnumerable operations = null; + var testableRoutingContext = new TestableRoutingContext + { + RoutingStrategies = new List + { + new DestinationRoutingStrategy("destination1", "HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1") + } + }; + var originalDispatchProperties = new DispatchProperties + { + { "SomeKey", "SomeValue" } + }; + testableRoutingContext.Extensions.Set(originalDispatchProperties); + var originalHeaders = new Dictionary { { "SomeHeaderKey", "SomeHeaderValue" } }; + testableRoutingContext.Message = new OutgoingMessage("ID", originalHeaders, Array.Empty()); + await behavior.Invoke(testableRoutingContext, context => + { + operations = context.Operations; + return Task.CompletedTask; + }); + + Assert.That(operations, Has.Length.EqualTo(1)); + + TransportOperation destination1Operation = operations.ElementAt(0); + Assert.That(destination1Operation.Message.MessageId, Is.EqualTo("ID")); + Assert.That((destination1Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination1")); + Dictionary destination1Headers = destination1Operation.Message.Headers; + Assert.That(destination1Headers, Contains.Item(new KeyValuePair("SomeHeaderKey", "SomeHeaderValue"))); + Assert.That(destination1Headers, Contains.Item(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"))); + Assert.That(destination1Headers, Is.SameAs(originalHeaders)); + DispatchProperties destination1DispatchProperties = destination1Operation.Properties; + Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair("SomeKey", "SomeValue"))); + Assert.That(destination1DispatchProperties, Is.SameAs(originalDispatchProperties)); + } + + [Test] + public async Task Should_copy_message_state_for_multiple_routing_strategies() + { + var behavior = new RoutingToDispatchConnector(); + IEnumerable operations = null; + var testableRoutingContext = new TestableRoutingContext + { + RoutingStrategies = new List + { + new DestinationRoutingStrategy("destination1", "HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"), + new DestinationRoutingStrategy("destination2", "HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2") + } + }; + var originalDispatchProperties = new DispatchProperties + { + { "SomeKey", "SomeValue" } + }; + testableRoutingContext.Extensions.Set(originalDispatchProperties); + var originalHeaders = new Dictionary { { "SomeHeaderKey", "SomeHeaderValue" } }; + testableRoutingContext.Message = new OutgoingMessage("ID", originalHeaders, Array.Empty()); + await behavior.Invoke(testableRoutingContext, context => + { + operations = context.Operations; + return Task.CompletedTask; + }); + + Assert.That(operations, Has.Length.EqualTo(2)); + + TransportOperation destination1Operation = operations.ElementAt(0); + Assert.That(destination1Operation.Message.MessageId, Is.EqualTo("ID")); + Assert.That((destination1Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination1")); + Dictionary destination1Headers = destination1Operation.Message.Headers; + Assert.That(destination1Headers, Contains.Item(new KeyValuePair("SomeHeaderKey", "SomeHeaderValue"))); + Assert.That(destination1Headers, Contains.Item(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"))); + Assert.That(destination1Headers, Does.Not.Contain(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2"))); + Assert.That(destination1Headers, Is.Not.SameAs(originalHeaders)); + DispatchProperties destination1DispatchProperties = destination1Operation.Properties; + Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair("SomeKey", "SomeValue"))); + Assert.That(destination1DispatchProperties, Is.Not.SameAs(originalDispatchProperties)); + + TransportOperation destination2Operation = operations.ElementAt(1); + Assert.That(destination2Operation.Message.MessageId, Is.EqualTo("ID")); + Assert.That((destination2Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination2")); + Dictionary destination2Headers = destination2Operation.Message.Headers; + Assert.That(destination2Headers, Contains.Item(new KeyValuePair("SomeHeaderKey", "SomeHeaderValue"))); + Assert.That(destination2Headers, Contains.Item(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2"))); + Assert.That(destination2Headers, Does.Not.Contain(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"))); + Assert.That(destination2Headers, Is.Not.SameAs(originalHeaders)); + DispatchProperties destination2DispatchProperties = destination2Operation.Properties; + Assert.That(destination2DispatchProperties, Is.Not.SameAs(originalDispatchProperties)); + Assert.That(destination2DispatchProperties, Contains.Item(new KeyValuePair("SomeKey", "SomeValue"))); + + Assert.That(destination1Headers, Is.Not.SameAs(destination2Headers)); + Assert.That(destination1DispatchProperties, Is.Not.SameAs(destination2DispatchProperties)); + } + [Test] public async Task Should_preserve_headers_generated_by_custom_routing_strategy() { var behavior = new RoutingToDispatchConnector(); Dictionary headers = null; - await behavior.Invoke(new TestableRoutingContext { RoutingStrategies = new List { new CustomRoutingStrategy() } }, context => + await behavior.Invoke(new TestableRoutingContext { RoutingStrategies = new List { new HeaderModifyingRoutingStrategy() } }, context => { headers = context.Operations.First().Message.Headers; return Task.CompletedTask; @@ -113,7 +208,7 @@ static IOutgoingSendContext CreateContext(SendOptions options, bool fromHandler) return context; } - class CustomRoutingStrategy : RoutingStrategy + class HeaderModifyingRoutingStrategy : RoutingStrategy { public override AddressTag Apply(Dictionary headers) { @@ -122,6 +217,26 @@ public override AddressTag Apply(Dictionary headers) } } + class DestinationRoutingStrategy : RoutingStrategy + { + public DestinationRoutingStrategy(string destination, string headerKey, string headerValue) + { + this.destination = destination; + this.headerKey = headerKey; + this.headerValue = headerValue; + } + + public override AddressTag Apply(Dictionary headers) + { + headers[headerKey] = headerValue; + return new UnicastAddressTag(destination); + } + + readonly string destination; + readonly string headerKey; + readonly string headerValue; + } + class MyMessage : IMessage { } diff --git a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingContextExtensions.cs b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingContextExtensions.cs index 3fcb435da1..c15d6caa64 100644 --- a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingContextExtensions.cs +++ b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingContextExtensions.cs @@ -1,20 +1,20 @@ namespace NServiceBus { + using System.Collections.Generic; using Pipeline; using Routing; using Transport; static class RoutingContextExtensions { - public static TransportOperation ToTransportOperation(this IRoutingContext context, RoutingStrategy strategy, DispatchConsistency dispatchConsistency) + public static TransportOperation ToTransportOperation(this IRoutingContext context, RoutingStrategy strategy, DispatchConsistency dispatchConsistency, bool copySharedMutableMessageState) { - var addressLabel = strategy.Apply(context.Message.Headers); - var message = new OutgoingMessage(context.Message.MessageId, context.Message.Headers, context.Message.Body); - - if (!context.Extensions.TryGet(out DispatchProperties dispatchProperties)) - { - dispatchProperties = new DispatchProperties(); - } + var headers = copySharedMutableMessageState ? new Dictionary(context.Message.Headers) : context.Message.Headers; + var dispatchProperties = context.Extensions.TryGet(out DispatchProperties properties) + ? copySharedMutableMessageState ? new DispatchProperties(properties) : properties + : new DispatchProperties(); + var addressLabel = strategy.Apply(headers); + var message = new OutgoingMessage(context.Message.MessageId, headers, context.Message.Body); var transportOperation = new TransportOperation(message, addressLabel, dispatchProperties, dispatchConsistency); return transportOperation; diff --git a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs index 055c725469..e1966936ae 100644 --- a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs +++ b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs @@ -26,9 +26,11 @@ public override Task Invoke(IRoutingContext context, Func 1; foreach (var strategy in context.RoutingStrategies) { - operations[index] = context.ToTransportOperation(strategy, dispatchConsistency); + operations[index] = context.ToTransportOperation(strategy, dispatchConsistency, copySharedMutableMessageState); index++; } diff --git a/src/NServiceBus.Core/Recoverability/SatelliteRecoverabilityExecutor.cs b/src/NServiceBus.Core/Recoverability/SatelliteRecoverabilityExecutor.cs index bfefca4a62..52e50b94d7 100644 --- a/src/NServiceBus.Core/Recoverability/SatelliteRecoverabilityExecutor.cs +++ b/src/NServiceBus.Core/Recoverability/SatelliteRecoverabilityExecutor.cs @@ -44,9 +44,11 @@ public async Task Invoke( // using the count here is not entirely accurate because of the way we duplicate based on the strategies // but in many cases it is a good approximation. transportOperations ??= new List(routingContexts.Count); + // when there are more than one routing strategy we want to make sure each transport operation is independent + var copySharedMutableMessageState = routingContext.RoutingStrategies.Count > 1; foreach (var strategy in routingContext.RoutingStrategies) { - var transportOperation = routingContext.ToTransportOperation(strategy, DispatchConsistency.Default); + var transportOperation = routingContext.ToTransportOperation(strategy, DispatchConsistency.Default, copySharedMutableMessageState); transportOperations.Add(transportOperation); } }