Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customization of native outgoing messages #1468

Merged
merged 4 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
{
using System.Threading.Tasks;
using AcceptanceTesting;
using global::RabbitMQ.Client.Events;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

class When_customizing_outgoing_messages : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_set_value()
{
var scenario = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b => b.When((bus, c) => bus.SendLocal(new Message())))
.Done(c => c.MessageReceived)
.Run();

Assert.That(scenario.BasicDeliverEventArgs.BasicProperties.AppId, Is.EqualTo("MyValue"));
}

public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>(endpointConfiguration =>
{
endpointConfiguration.ConfigureRabbitMQTransport().OutgoingNativeMessageCustomization =
(operation, properties) =>
{
properties.AppId = "MyValue";
};
});
}

class MyEventHandler : IHandleMessages<Message>
{
Context testContext;

public MyEventHandler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(Message message, IMessageHandlerContext context)
{
testContext.BasicDeliverEventArgs = context.Extensions.Get<BasicDeliverEventArgs>();
testContext.MessageReceived = true;

return Task.CompletedTask;
}
}
}

public class Message : IMessage
{
}

class Context : ScenarioContext
{
public bool MessageReceived { get; set; }

public BasicDeliverEventArgs BasicDeliverEventArgs { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace NServiceBus
public System.TimeSpan HeartbeatInterval { get; set; }
public System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> MessageIdStrategy { get; set; }
public System.TimeSpan NetworkRecoveryInterval { get; set; }
public System.Action<NServiceBus.Transport.IOutgoingTransportOperation, RabbitMQ.Client.IBasicProperties> OutgoingNativeMessageCustomization { get; set; }
public NServiceBus.PrefetchCountCalculation PrefetchCountCalculation { get; set; }
public System.TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker { get; set; }
public bool UseExternalAuthMechanism { get; set; }
Expand Down
44 changes: 39 additions & 5 deletions src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Transport;
using Transport.RabbitMQ;
Expand Down Expand Up @@ -91,6 +92,21 @@ public TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker
}
}

/// <summary>
/// Gets or sets the action that allows customization of the native <see cref="BasicProperties"/>
/// just before it is dispatched to the rabbitmq client.
/// </summary>
/// <remarks>
/// <para>
/// This customization is applied after any configured transport customizations, meaning that
/// any changes made here may override or conflict with previous transport-level adjustments.
/// Exercise caution, as modifying the message at this stage can lead to unintended behavior
/// downstream if the message structure or properties are altered in ways that do not align
/// with expectations elsewhere in the system.
/// </para>
/// </remarks>
public Action<IOutgoingTransportOperation, IBasicProperties> OutgoingNativeMessageCustomization { get; set; }
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// The calculation method for the prefetch count. The default is 3 times the maximum concurrency value.
/// </summary>
Expand Down Expand Up @@ -183,17 +199,35 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
certCollection = new X509Certificate2Collection(ClientCertificate);
}

var connectionFactory = new ConnectionFactory(hostSettings.Name, ConnectionConfiguration, certCollection, !ValidateRemoteCertificate,
UseExternalAuthMechanism, HeartbeatInterval, NetworkRecoveryInterval, additionalClusterNodes);
var connectionFactory = new ConnectionFactory(
hostSettings.Name,
ConnectionConfiguration,
certCollection,
!ValidateRemoteCertificate,
UseExternalAuthMechanism,
HeartbeatInterval,
NetworkRecoveryInterval,
additionalClusterNodes
);

var channelProvider = new ChannelProvider(connectionFactory, NetworkRecoveryInterval, RoutingTopology);
await channelProvider.CreateConnection(cancellationToken).ConfigureAwait(false);

var converter = new MessageConverter(MessageIdStrategy);

var infra = new RabbitMQTransportInfrastructure(hostSettings, receivers, connectionFactory,
RoutingTopology, channelProvider, converter, TimeToWaitBeforeTriggeringCircuitBreaker,
PrefetchCountCalculation, NetworkRecoveryInterval, SupportsDelayedDelivery);
var infra = new RabbitMQTransportInfrastructure(
hostSettings,
receivers,
connectionFactory,
RoutingTopology,
channelProvider,
converter,
OutgoingNativeMessageCustomization,
TimeToWaitBeforeTriggeringCircuitBreaker,
PrefetchCountCalculation,
NetworkRecoveryInterval,
SupportsDelayedDelivery
);

if (hostSettings.SetupInfrastructure)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using global::RabbitMQ.Client;

sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
{
Expand All @@ -17,6 +18,7 @@ sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings,
ConnectionFactory connectionFactory, IRoutingTopology routingTopology,
ChannelProvider channelProvider, MessageConverter messageConverter,
Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization,
TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation,
TimeSpan networkRecoveryInterval, bool supportsDelayedDelivery)
{
Expand All @@ -26,7 +28,7 @@ public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSetting
this.networkRecoveryInterval = networkRecoveryInterval;
this.supportsDelayedDelivery = supportsDelayedDelivery;

Dispatcher = new MessageDispatcher(channelProvider, supportsDelayedDelivery);
Dispatcher = new MessageDispatcher(channelProvider, messageCustomization, supportsDelayedDelivery);
Receivers = receiverSettings.Select(x => CreateMessagePump(hostSettings, x, messageConverter, timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation))
.ToDictionary(x => x.Id, x => x);
}
Expand Down
10 changes: 9 additions & 1 deletion src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@
class MessageDispatcher : IMessageDispatcher
{
readonly ChannelProvider channelProvider;
readonly Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization;
readonly bool supportsDelayedDelivery;

public MessageDispatcher(ChannelProvider channelProvider, bool supportsDelayedDelivery)
public MessageDispatcher(
ChannelProvider channelProvider,
Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization,
bool supportsDelayedDelivery
)
{
this.channelProvider = channelProvider;
this.messageCustomization = messageCustomization ?? (static (_, _) => { });
this.supportsDelayedDelivery = supportsDelayedDelivery;
}

Expand Down Expand Up @@ -55,6 +61,7 @@ Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChan

var properties = new BasicProperties();
properties.Fill(message, transportOperation.Properties);
messageCustomization(transportOperation, properties);

return channel.SendMessage(transportOperation.Destination, message, properties, cancellationToken);
}
Expand All @@ -67,6 +74,7 @@ Task PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwar

var properties = new BasicProperties();
properties.Fill(message, transportOperation.Properties);
messageCustomization(transportOperation, properties);

return channel.PublishMessage(transportOperation.MessageType, message, properties, cancellationToken);
}
Expand Down