Skip to content

Commit

Permalink
Merge pull request #1422 from Particular/disable-delayed-infra
Browse files Browse the repository at this point in the history
Add a constructor parameter to not create the delayed delivery infrastructure
  • Loading branch information
WilliamBZA authored Aug 6, 2024
2 parents 29ec7a8 + 1a98486 commit ff6fce3
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace NServiceBus
public class RabbitMQTransport : NServiceBus.Transport.TransportDefinition
{
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { }
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { }
public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; }
public System.TimeSpan HeartbeatInterval { get; set; }
public System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> MessageIdStrategy { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public TransportDefinition CreateTransportDefinition()
{
var connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost";

var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Classic), connectionString);
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Classic), connectionString, false);

return transport;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace NServiceBus.Transport.RabbitMQ.TransportTests
{
using System;
using System.Threading.Tasks;
using DelayedDelivery;
using NServiceBus.TransportTests;
using NUnit.Framework;

public class When_delayed_delivery_is_disabled : NServiceBusTransportTest
{
[TestCase(TransportTransactionMode.ReceiveOnly)]
public async Task Should_throw_when_sending(TransportTransactionMode transactionMode)
{
await StartPump(
(context, _) => Task.FromResult(0),
(_, __) => Task.FromResult(ErrorHandleResult.RetryRequired),
transactionMode);

Assert.ThrowsAsync<Exception>(async () =>
{
var dispatchProperties = new DispatchProperties { DelayDeliveryWith = new DelayDeliveryWith(TimeSpan.FromHours(1)) };
await SendMessage(InputQueueName, [], null, dispatchProperties);
});
}
}
}
21 changes: 20 additions & 1 deletion src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,25 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin
ConnectionConfiguration = ConnectionConfiguration.Create(connectionString);
}

/// <summary>
/// Creates a new instance of the RabbitMQ transport.
/// </summary>
/// <param name="routingTopology">The routing topology to use.</param>
/// <param name="connectionString">The connection string to use when connecting to the broker.</param>
/// <param name="enableDelayedDelivery">Should the delayed delivery infrastructure be created by the endpoint</param>
public RabbitMQTransport(RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery)
: base(TransportTransactionMode.ReceiveOnly,
supportsDelayedDelivery: enableDelayedDelivery,
supportsPublishSubscribe: true,
supportsTTBR: true)
{
ArgumentNullException.ThrowIfNull(routingTopology);
ArgumentNullException.ThrowIfNull(connectionString);

RoutingTopology = routingTopology.Create();
ConnectionConfiguration = ConnectionConfiguration.Create(connectionString);
}

internal ConnectionConfiguration ConnectionConfiguration { get; set; }

internal IRoutingTopology RoutingTopology { get; set; }
Expand Down Expand Up @@ -174,7 +193,7 @@ public override Task<TransportInfrastructure> Initialize(HostSettings hostSettin

var infra = new RabbitMQTransportInfrastructure(hostSettings, receivers, connectionFactory,
RoutingTopology, channelProvider, converter, TimeToWaitBeforeTriggeringCircuitBreaker,
PrefetchCountCalculation, NetworkRecoveryInterval);
PrefetchCountCalculation, NetworkRecoveryInterval, SupportsDelayedDelivery);

if (hostSettings.SetupInfrastructure)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@ sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
readonly ChannelProvider channelProvider;
readonly IRoutingTopology routingTopology;
readonly TimeSpan networkRecoveryInterval;
readonly bool supportsDelayedDelivery;

public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings, ConnectionFactory connectionFactory, IRoutingTopology routingTopology,
public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings,
ConnectionFactory connectionFactory, IRoutingTopology routingTopology,
ChannelProvider channelProvider, MessageConverter messageConverter,
TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation, TimeSpan networkRecoveryInterval)
TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation,
TimeSpan networkRecoveryInterval, bool supportsDelayedDelivery)
{
this.connectionFactory = connectionFactory;
this.routingTopology = routingTopology;
this.channelProvider = channelProvider;
this.networkRecoveryInterval = networkRecoveryInterval;
this.supportsDelayedDelivery = supportsDelayedDelivery;

Dispatcher = new MessageDispatcher(channelProvider);
Dispatcher = new MessageDispatcher(channelProvider, supportsDelayedDelivery);
Receivers = receiverSettings.Select(x => CreateMessagePump(hostSettings, x, messageConverter, timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation))
.ToDictionary(x => x.Id, x => x);
}
Expand All @@ -42,15 +46,22 @@ internal void SetupInfrastructure(string[] sendingQueues)

using var channel = connection.CreateModel();

DelayInfrastructure.Build(channel);
if (supportsDelayedDelivery)
{
DelayInfrastructure.Build(channel);
}

var receivingQueues = Receivers.Select(r => r.Value.ReceiveAddress).ToArray();

routingTopology.Initialize(channel, receivingQueues, sendingQueues);

foreach (string receivingAddress in receivingQueues)
if (supportsDelayedDelivery)
{
routingTopology.BindToDelayInfrastructure(channel, receivingAddress, DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(receivingAddress));
foreach (string receivingAddress in receivingQueues)
{
routingTopology.BindToDelayInfrastructure(channel, receivingAddress,
DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(receivingAddress));
}
}
}

Expand Down
18 changes: 17 additions & 1 deletion src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
namespace NServiceBus.Transport.RabbitMQ
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class MessageDispatcher : IMessageDispatcher
{
readonly ChannelProvider channelProvider;
readonly bool supportsDelayedDelivery;

public MessageDispatcher(ChannelProvider channelProvider)
public MessageDispatcher(ChannelProvider channelProvider, bool supportsDelayedDelivery)
{
this.channelProvider = channelProvider;
this.supportsDelayedDelivery = supportsDelayedDelivery;
}

public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -50,6 +53,8 @@ public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction

Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChannel channel, CancellationToken cancellationToken)
{
ThrowIfDelayedDeliveryIsDisabledAndMessageIsDelayed(transportOperation);

var message = transportOperation.Message;

var properties = channel.CreateBasicProperties();
Expand All @@ -60,6 +65,8 @@ Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChan

Task PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwareChannel channel, CancellationToken cancellationToken)
{
ThrowIfDelayedDeliveryIsDisabledAndMessageIsDelayed(transportOperation);

var message = transportOperation.Message;

var properties = channel.CreateBasicProperties();
Expand All @@ -68,5 +75,14 @@ Task PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwar
return channel.PublishMessage(transportOperation.MessageType, message, properties, cancellationToken);
}

void ThrowIfDelayedDeliveryIsDisabledAndMessageIsDelayed(IOutgoingTransportOperation transportOperation)
{
if (!supportsDelayedDelivery &&
(transportOperation.Properties.DelayDeliveryWith != null ||
transportOperation.Properties.DoNotDeliverBefore != null))
{
throw new Exception("Delayed delivery has been disabled in the transport settings.");
}
}
}
}

0 comments on commit ff6fce3

Please sign in to comment.