Skip to content

Commit

Permalink
Do something horribly inefficient with even uglier hacks to get thing…
Browse files Browse the repository at this point in the history
…s going
  • Loading branch information
danielmarbach committed Dec 13, 2024
1 parent 8bbf5aa commit 1aa7b78
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 25 deletions.
36 changes: 24 additions & 12 deletions src/Transport/Administration/NamespacePermissions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,33 @@ sealed class NamespacePermissions
{
readonly ServiceBusAdministrationClient adminClient;

public NamespacePermissions(TokenCredential? tokenCredential, string fullyQualifiedNamespace, string connectionString)
=> adminClient = tokenCredential != null ? new ServiceBusAdministrationClient(fullyQualifiedNamespace, tokenCredential) : new ServiceBusAdministrationClient(connectionString);
readonly Lazy<Task<ServiceBusAdministrationClient>> manageClient;

public async ValueTask<ServiceBusAdministrationClient> CanManage(CancellationToken cancellationToken = default)
public NamespacePermissions(TokenCredential? tokenCredential, string fullyQualifiedNamespace, string connectionString)
{
try
{
await adminClient.QueueExistsAsync("$nservicebus-verification-queue", cancellationToken)
.ConfigureAwait(false);
return adminClient;
}
catch (UnauthorizedAccessException e)
adminClient = tokenCredential != null
? new ServiceBusAdministrationClient(fullyQualifiedNamespace, tokenCredential)
: new ServiceBusAdministrationClient(connectionString);

manageClient = new Lazy<Task<ServiceBusAdministrationClient>>(async () =>
{
throw new Exception("Management rights are required to run this endpoint. Verify that the SAS policy has the Manage claim.", e);
}
try
{
await adminClient.QueueExistsAsync("$nservicebus-verification-queue")
.ConfigureAwait(false);
return adminClient;
}
catch (UnauthorizedAccessException e)
{
throw new Exception("Management rights are required to run this endpoint. Verify that the SAS policy has the Manage claim.", e);
}
}, LazyThreadSafetyMode.ExecutionAndPublication);
}

public Task<ServiceBusAdministrationClient> CanManage(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
return manageClient.Value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ class TopicPerEventTypeTopologySubscriptionManager : ISubscriptionManager
{
static readonly ILog Logger = LogManager.GetLogger<TopicPerEventTypeTopologySubscriptionManager>();

readonly ServiceBusAdministrationClient client;
readonly NamespacePermissions namespacePermissions;
readonly string subscribingQueue;
readonly string subscriptionName;

public TopicPerEventTypeTopologySubscriptionManager(
string subscribingQueue,
AzureServiceBusTransport transportSettings,
ServiceBusAdministrationClient client)
NamespacePermissions namespacePermissions)
{
this.subscribingQueue = subscribingQueue;
this.client = client;
this.namespacePermissions = namespacePermissions;

subscriptionName = transportSettings.SubscriptionNamingConvention(subscribingQueue);
}
Expand All @@ -36,24 +36,39 @@ public async Task SubscribeAll(MessageMetadata[] eventTypes, ContextBag context,
return;
}

ServiceBusAdministrationClient client;
try
{
client = await namespacePermissions.CanManage(cancellationToken).ConfigureAwait(false);
}
catch (Exception e) when (!e.IsCausedBy(cancellationToken))
{
return;
}
catch (Exception e) when (e.InnerException is UnauthorizedAccessException unauthorizedAccessException)
{
Logger.InfoFormat("Subscription {0} could not be created. Reason: {1}", subscriptionName, unauthorizedAccessException.Message);
return;
}

if (eventTypes.Length == 1)
{
await SubscribeEvent(eventTypes[0], cancellationToken)
await SubscribeEvent(client, eventTypes[0], cancellationToken)
.ConfigureAwait(false);
}
else
{
var subscribeTasks = new List<Task>(eventTypes.Length);
foreach (var eventType in eventTypes)
{
subscribeTasks.Add(SubscribeEvent(eventType, cancellationToken));
subscribeTasks.Add(SubscribeEvent(client, eventType, cancellationToken));
}
await Task.WhenAll(subscribeTasks)
.ConfigureAwait(false);
}
}

async Task SubscribeEvent(MessageMetadata eventType, CancellationToken cancellationToken)
async Task SubscribeEvent(ServiceBusAdministrationClient client, MessageMetadata eventType, CancellationToken cancellationToken)
{
// TODO: There is no convention nor mapping here currently.
// TODO: Is it a good idea to use the subscriptionName as the endpoint name?
Expand Down Expand Up @@ -87,6 +102,21 @@ async Task SubscribeEvent(MessageMetadata eventType, CancellationToken cancellat

public async Task Unsubscribe(MessageMetadata eventType, ContextBag context, CancellationToken cancellationToken = default)
{
ServiceBusAdministrationClient client;
try
{
client = await namespacePermissions.CanManage(cancellationToken).ConfigureAwait(false);
}
catch (Exception e) when (!e.IsCausedBy(cancellationToken))
{
return;
}
catch (Exception e) when (e.InnerException is UnauthorizedAccessException unauthorizedAccessException)
{
Logger.InfoFormat("Subscription {0} could not be created. Reason: {1}", subscriptionName, unauthorizedAccessException.Message);
return;
}

try
{
// TODO: There is no convention nor mapping here currently.
Expand All @@ -96,6 +126,10 @@ public async Task Unsubscribe(MessageMetadata eventType, ContextBag context, Can
catch (ServiceBusException sbe) when (sbe.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
{
}
catch (UnauthorizedAccessException unauthorizedAccessException)
{
Logger.InfoFormat("Subscription {0} could not be deleted. Reason: {1}", subscriptionName, unauthorizedAccessException.Message);
}
}
}
}
5 changes: 3 additions & 2 deletions src/Transport/AzureServiceBusTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,12 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
? new ServiceBusClient(FullyQualifiedNamespace, TokenCredential, defaultClientOptions)
: new ServiceBusClient(ConnectionString, defaultClientOptions);

var infrastructure = new AzureServiceBusTransportInfrastructure(this, hostSettings, receiveSettingsAndClientPairs, defaultClient);
var namespacePermissions = new NamespacePermissions(TokenCredential, FullyQualifiedNamespace, ConnectionString);

var infrastructure = new AzureServiceBusTransportInfrastructure(this, hostSettings, receiveSettingsAndClientPairs, defaultClient, namespacePermissions);

if (hostSettings.SetupInfrastructure)
{
var namespacePermissions = new NamespacePermissions(TokenCredential, FullyQualifiedNamespace, ConnectionString);
var adminClient = await namespacePermissions.CanManage(cancellationToken)
.ConfigureAwait(false);

Expand Down
9 changes: 7 additions & 2 deletions src/Transport/AzureServiceBusTransportInfrastructure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@ sealed class AzureServiceBusTransportInfrastructure : TransportInfrastructure
readonly MessageSenderRegistry messageSenderRegistry;
readonly HostSettings hostSettings;
readonly ServiceBusClient defaultClient;
readonly NamespacePermissions namespacePermissions;
readonly (ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs;

public AzureServiceBusTransportInfrastructure(
AzureServiceBusTransport transportSettings,
HostSettings hostSettings,
(ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs,
ServiceBusClient defaultClient
ServiceBusClient defaultClient,
NamespacePermissions namespacePermissions
)
{
this.transportSettings = transportSettings;

this.hostSettings = hostSettings;
this.defaultClient = defaultClient;
this.namespacePermissions = namespacePermissions;
this.receiveSettingsAndClientPairs = receiveSettingsAndClientPairs;

messageSenderRegistry = new MessageSenderRegistry(defaultClient);
Expand Down Expand Up @@ -79,7 +82,9 @@ IMessageReceiver CreateMessagePump(ReceiveSettings receiveSettings, ServiceBusCl
receiveSettings,
hostSettings.CriticalErrorAction,
receiveSettings.UsePublishSubscribe
? new ForwardingTopologySubscriptionManager(receiveAddress, transportSettings, defaultClient)
? transportSettings.Topology.TopicToSubscribeOn is not null ?
new ForwardingTopologySubscriptionManager(receiveAddress, transportSettings, defaultClient) :
new TopicPerEventTypeTopologySubscriptionManager(receiveAddress, transportSettings, namespacePermissions)
: null,
subQueue
);
Expand Down
8 changes: 5 additions & 3 deletions src/Transport/Sending/OutgoingTransportOperationExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#nullable enable

namespace NServiceBus.Transport.AzureServiceBus
{
using System;
Expand Down Expand Up @@ -26,12 +28,12 @@ public static void ApplyCustomizationToOutgoingNativeMessage(this IOutgoingTrans
action(message);
}

public static string ExtractDestination(this IOutgoingTransportOperation outgoingTransportOperation, string defaultMulticastRoute)
public static string ExtractDestination(this IOutgoingTransportOperation outgoingTransportOperation, string? defaultMulticastRoute)
{
switch (outgoingTransportOperation)
{
case MulticastTransportOperation:
return defaultMulticastRoute;
case MulticastTransportOperation multicastTransportOperation:
return (defaultMulticastRoute ?? multicastTransportOperation.MessageType.FullName) ?? throw new InvalidOperationException("Multicast route is not defined.");
case UnicastTransportOperation unicastTransportOperation:
var destination = unicastTransportOperation.Destination;

Expand Down
5 changes: 5 additions & 0 deletions src/Transport/TopicTopology.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public static TopicTopology Hierarchy(string topicToPublishTo, string topicToSub
return hierarchy;
}

/// <summary>
///
/// </summary>
public static TopicTopology TopicPerEventType => new();

/// <inheritdoc />
public bool Equals(TopicTopology other) => string.Equals(TopicToPublishTo, other.TopicToPublishTo, StringComparison.OrdinalIgnoreCase) && string.Equals(TopicToSubscribeOn, other.TopicToSubscribeOn, StringComparison.OrdinalIgnoreCase);

Expand Down

0 comments on commit 1aa7b78

Please sign in to comment.