Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combining the outbox with a message-driven transport may lead to message loss when publishing to multiple subscribers #6909

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Extensibility;
Expand Down Expand Up @@ -29,7 +30,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context
var tx = (AcceptanceTestingOutboxTransaction)transaction;
tx.Enlist(() =>
{
if (!storage.TryAdd(message.MessageId, new StoredMessage(message.MessageId, message.TransportOperations)))
if (!storage.TryAdd(message.MessageId, new StoredMessage(message.MessageId, message.TransportOperations.Select(o => o.DeepCopy()).ToArray())))
{
throw new Exception($"Outbox message with id '{message.MessageId}' is already present in storage.");
}
Expand Down
168 changes: 168 additions & 0 deletions src/NServiceBus.AcceptanceTests/Outbox/When_publishing_with_outbox.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
namespace NServiceBus.AcceptanceTests.Outbox
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using Features;
using NServiceBus.Pipeline;
using NUnit.Framework;

public class When_publishing_with_outbox : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_be_delivered_to_all_subscribers()
{
Requires.OutboxPersistence();

Context context = await Scenario.Define<Context>()
.WithEndpoint<Publisher>(b =>
b.When(c => c.Subscriber1Subscribed && c.Subscriber2Subscribed, (session, c) =>
{
// Send a trigger message that will invoke the handler method that publishes the event
c.AddTrace("Both subscribers are subscribed, going to send TriggerMessage");
return session.SendLocal(new TriggerMessage());
})
)
.WithEndpoint<Subscriber1>(b => b.When(async (session, ctx) =>
{
await session.Subscribe<MyEvent>();
if (ctx.HasNativePubSubSupport)
{
ctx.Subscriber1Subscribed = true;
ctx.AddTrace("Subscriber1 is now subscribed (at least we have asked the broker to be subscribed)");
}
else
{
ctx.AddTrace("Subscriber1 has now asked to be subscribed to MyEvent");
}
}))
.WithEndpoint<Subscriber2>(b => b.When(async (session, ctx) =>
{
await session.Subscribe<MyEvent>();
if (ctx.HasNativePubSubSupport)
{
ctx.Subscriber2Subscribed = true;
ctx.AddTrace("Subscriber2 is now subscribed (at least we have asked the broker to be subscribed)");
}
else
{
ctx.AddTrace("Subscriber2 has now asked to be subscribed to MyEvent");
}
}))
.Done(c => c.Subscriber1GotTheEvent && c.Subscriber2GotTheEvent)
.Run(TimeSpan.FromSeconds(10));

Assert.True(context.Subscriber1GotTheEvent);
Assert.True(context.Subscriber2GotTheEvent);
}

public class Context : ScenarioContext
{
public bool Subscriber1GotTheEvent { get; set; }
public bool Subscriber2GotTheEvent { get; set; }
public bool Subscriber1Subscribed { get; set; }
public bool Subscriber2Subscribed { get; set; }
}

public class Publisher : EndpointConfigurationBuilder
{
public Publisher() =>
EndpointSetup<DefaultPublisher>(b =>
{
b.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
b.EnableOutbox();
// Test the outbox behavior in situations where messages are deserialized and dispatched from the outbox storage by injecting an exception into the dispatch pipeline
b.Pipeline.Register(new BlowUpAfterDispatchBehavior(), "ensure outbox dispatch fails");
b.Recoverability().Immediate(i => i.NumberOfRetries(1));
b.OnEndpointSubscribed<Context>((s, context) =>
{
var subscriber1 = Conventions.EndpointNamingConvention(typeof(Subscriber1));
if (s.SubscriberEndpoint.Contains(subscriber1))
{
context.Subscriber1Subscribed = true;
context.AddTrace($"{subscriber1} is now subscribed");
}
var subscriber2 = Conventions.EndpointNamingConvention(typeof(Subscriber2));
if (s.SubscriberEndpoint.Contains(subscriber2))
{
context.AddTrace($"{subscriber2} is now subscribed");
context.Subscriber2Subscribed = true;
}
});
b.DisableFeature<AutoSubscribe>();
});

public class TriggerHandler : IHandleMessages<TriggerMessage>
{
public Task Handle(TriggerMessage message, IMessageHandlerContext context)
=> context.Publish(new MyEvent());
}

class BlowUpAfterDispatchBehavior : IBehavior<IBatchDispatchContext, IBatchDispatchContext>
{
public async Task Invoke(IBatchDispatchContext context, Func<IBatchDispatchContext, Task> next)
{
if (Interlocked.Increment(ref invocationCounter) == 1)
{
throw new SimulatedException();
}

await next(context).ConfigureAwait(false);
}

int invocationCounter;
}
}

public class Subscriber1 : EndpointConfigurationBuilder
{
public Subscriber1() =>
EndpointSetup<DefaultServer>(c => c.DisableFeature<AutoSubscribe>(),
metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));

public class MyHandler : IHandleMessages<MyEvent>
{
public MyHandler(Context testContext) => this.testContext = testContext;

public Task Handle(MyEvent message, IMessageHandlerContext context)
{
testContext.Subscriber1GotTheEvent = true;
return Task.CompletedTask;
}

readonly Context testContext;
}
}

public class Subscriber2 : EndpointConfigurationBuilder
{
public Subscriber2() =>
EndpointSetup<DefaultServer>(c => c.DisableFeature<AutoSubscribe>(),
metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));

public class MyHandler : IHandleMessages<MyEvent>
{
public MyHandler(Context testContext) => this.testContext = testContext;

public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context)
{
testContext.Subscriber2GotTheEvent = true;
return Task.CompletedTask;
}

readonly Context testContext;
}
}

public class MyEvent : IEvent
{
}

public class TriggerMessage : ICommand
{
}
}
}
119 changes: 117 additions & 2 deletions src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,107 @@
[TestFixture]
public class RoutingToDispatchConnectorTests
{
[Test]
public async Task Should_preserve_message_state_for_one_routing_strategy_for_allocation_reasons()
{
var behavior = new RoutingToDispatchConnector();
IEnumerable<TransportOperation> operations = null;
var testableRoutingContext = new TestableRoutingContext
{
RoutingStrategies = new List<RoutingStrategy>
{
new DestinationRoutingStrategy("destination1", "HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")
}
};
var originalDispatchProperties = new DispatchProperties
{
{ "SomeKey", "SomeValue" }
};
testableRoutingContext.Extensions.Set(originalDispatchProperties);
var originalHeaders = new Dictionary<string, string> { { "SomeHeaderKey", "SomeHeaderValue" } };
testableRoutingContext.Message = new OutgoingMessage("ID", originalHeaders, Array.Empty<byte>());
await behavior.Invoke(testableRoutingContext, context =>
{
operations = context.Operations;
return Task.CompletedTask;
});

Assert.That(operations, Has.Length.EqualTo(1));

TransportOperation destination1Operation = operations.ElementAt(0);
Assert.That(destination1Operation.Message.MessageId, Is.EqualTo("ID"));
Assert.That((destination1Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination1"));
Dictionary<string, string> destination1Headers = destination1Operation.Message.Headers;
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
Assert.That(destination1Headers, Is.SameAs(originalHeaders));
DispatchProperties destination1DispatchProperties = destination1Operation.Properties;
Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));
Assert.That(destination1DispatchProperties, Is.SameAs(originalDispatchProperties));
}

[Test]
public async Task Should_copy_message_state_for_multiple_routing_strategies()
{
var behavior = new RoutingToDispatchConnector();
IEnumerable<TransportOperation> operations = null;
var testableRoutingContext = new TestableRoutingContext
{
RoutingStrategies = new List<RoutingStrategy>
{
new DestinationRoutingStrategy("destination1", "HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"),
new DestinationRoutingStrategy("destination2", "HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")
}
};
var originalDispatchProperties = new DispatchProperties
{
{ "SomeKey", "SomeValue" }
};
testableRoutingContext.Extensions.Set(originalDispatchProperties);
var originalHeaders = new Dictionary<string, string> { { "SomeHeaderKey", "SomeHeaderValue" } };
testableRoutingContext.Message = new OutgoingMessage("ID", originalHeaders, Array.Empty<byte>());
await behavior.Invoke(testableRoutingContext, context =>
{
operations = context.Operations;
return Task.CompletedTask;
});

Assert.That(operations, Has.Length.EqualTo(2));

TransportOperation destination1Operation = operations.ElementAt(0);
Assert.That(destination1Operation.Message.MessageId, Is.EqualTo("ID"));
Assert.That((destination1Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination1"));
Dictionary<string, string> destination1Headers = destination1Operation.Message.Headers;
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
Assert.That(destination1Headers, Does.Not.Contain(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")));
Assert.That(destination1Headers, Is.Not.SameAs(originalHeaders));
DispatchProperties destination1DispatchProperties = destination1Operation.Properties;
Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));
Assert.That(destination1DispatchProperties, Is.Not.SameAs(originalDispatchProperties));

TransportOperation destination2Operation = operations.ElementAt(1);
Assert.That(destination2Operation.Message.MessageId, Is.EqualTo("ID"));
Assert.That((destination2Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination2"));
Dictionary<string, string> destination2Headers = destination2Operation.Message.Headers;
Assert.That(destination2Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
Assert.That(destination2Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")));
Assert.That(destination2Headers, Does.Not.Contain(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
Assert.That(destination2Headers, Is.Not.SameAs(originalHeaders));
DispatchProperties destination2DispatchProperties = destination2Operation.Properties;
Assert.That(destination2DispatchProperties, Is.Not.SameAs(originalDispatchProperties));
Assert.That(destination2DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));

Assert.That(destination1Headers, Is.Not.SameAs(destination2Headers));
Assert.That(destination1DispatchProperties, Is.Not.SameAs(destination2DispatchProperties));
}

[Test]
public async Task Should_preserve_headers_generated_by_custom_routing_strategy()
{
var behavior = new RoutingToDispatchConnector();
Dictionary<string, string> headers = null;
await behavior.Invoke(new TestableRoutingContext { RoutingStrategies = new List<RoutingStrategy> { new CustomRoutingStrategy() } }, context =>
await behavior.Invoke(new TestableRoutingContext { RoutingStrategies = new List<RoutingStrategy> { new HeaderModifyingRoutingStrategy() } }, context =>
{
headers = context.Operations.First().Message.Headers;
return Task.CompletedTask;
Expand Down Expand Up @@ -113,7 +208,7 @@ static IOutgoingSendContext CreateContext(SendOptions options, bool fromHandler)
return context;
}

class CustomRoutingStrategy : RoutingStrategy
class HeaderModifyingRoutingStrategy : RoutingStrategy
{
public override AddressTag Apply(Dictionary<string, string> headers)
{
Expand All @@ -122,6 +217,26 @@ public override AddressTag Apply(Dictionary<string, string> headers)
}
}

class DestinationRoutingStrategy : RoutingStrategy
{
public DestinationRoutingStrategy(string destination, string headerKey, string headerValue)
{
this.destination = destination;
this.headerKey = headerKey;
this.headerValue = headerValue;
}

public override AddressTag Apply(Dictionary<string, string> headers)
{
headers[headerKey] = headerValue;
return new UnicastAddressTag(destination);
}

readonly string destination;
readonly string headerKey;
readonly string headerValue;
}

class MyMessage : IMessage
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
namespace NServiceBus
{
using System.Collections.Generic;
using Pipeline;
using Routing;
using Transport;

static class RoutingContextExtensions
{
public static TransportOperation ToTransportOperation(this IRoutingContext context, RoutingStrategy strategy, DispatchConsistency dispatchConsistency)
public static TransportOperation ToTransportOperation(this IRoutingContext context, RoutingStrategy strategy, DispatchConsistency dispatchConsistency, bool copySharedMutableMessageState)
{
var addressLabel = strategy.Apply(context.Message.Headers);
var message = new OutgoingMessage(context.Message.MessageId, context.Message.Headers, context.Message.Body);

if (!context.Extensions.TryGet(out DispatchProperties dispatchProperties))
{
dispatchProperties = new DispatchProperties();
}
var headers = copySharedMutableMessageState ? new Dictionary<string, string>(context.Message.Headers) : context.Message.Headers;
var dispatchProperties = context.Extensions.TryGet(out DispatchProperties properties)
? copySharedMutableMessageState ? new DispatchProperties(properties) : properties
: new DispatchProperties();
var addressLabel = strategy.Apply(headers);
var message = new OutgoingMessage(context.Message.MessageId, headers, context.Message.Body);

var transportOperation = new TransportOperation(message, addressLabel, dispatchProperties, dispatchConsistency);
return transportOperation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ public override Task Invoke(IRoutingContext context, Func<IDispatchContext, Task

var operations = new TransportOperation[context.RoutingStrategies.Count];
var index = 0;
// when there are more than one routing strategy we want to make sure each transport operation is independent
var copySharedMutableMessageState = context.RoutingStrategies.Count > 1;
foreach (var strategy in context.RoutingStrategies)
{
operations[index] = context.ToTransportOperation(strategy, dispatchConsistency);
operations[index] = context.ToTransportOperation(strategy, dispatchConsistency, copySharedMutableMessageState);
index++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ public async Task<ErrorHandleResult> Invoke(
// using the count here is not entirely accurate because of the way we duplicate based on the strategies
// but in many cases it is a good approximation.
transportOperations ??= new List<TransportOperation>(routingContexts.Count);
// when there are more than one routing strategy we want to make sure each transport operation is independent
var copySharedMutableMessageState = routingContext.RoutingStrategies.Count > 1;
foreach (var strategy in routingContext.RoutingStrategies)
{
var transportOperation = routingContext.ToTransportOperation(strategy, DispatchConsistency.Default);
var transportOperation = routingContext.ToTransportOperation(strategy, DispatchConsistency.Default, copySharedMutableMessageState);
transportOperations.Add(transportOperation);
}
}
Expand Down