diff --git a/.editorconfig b/.editorconfig index 9d3abe5..50615ba 100644 --- a/.editorconfig +++ b/.editorconfig @@ -42,10 +42,10 @@ csharp_indent_labels = one_less_than_current csharp_preferred_modifier_order = public,private,protected,internal,file,static,extern,new,virtual,abstract,sealed,override,readonly,unsafe,required,volatile,async:suggestion # avoid this. unless absolutely necessary -dotnet_style_qualification_for_field = false:suggestion -dotnet_style_qualification_for_property = false:suggestion -dotnet_style_qualification_for_method = false:suggestion -dotnet_style_qualification_for_event = false:suggestion +dotnet_style_qualification_for_field = false:error +dotnet_style_qualification_for_property = false:error +dotnet_style_qualification_for_method = false:error +dotnet_style_qualification_for_event = false:error # Types: use keywords instead of BCL types, and permit var only when the type is clear csharp_style_var_for_built_in_types = false:suggestion @@ -154,6 +154,10 @@ csharp_space_between_method_declaration_parameter_list_parentheses = false csharp_space_between_parentheses = false csharp_space_between_square_brackets = false +# Custom +csharp_style_namespace_declarations = file_scoped:error +dotnet_diagnostic.IDE0005.severity = error # Using directive is unnecessary. + # C++ Files [*.{cpp,h,in}] curly_bracket_next_line = true diff --git a/src/Foundatio.AzureServiceBus/Extensions/QueueEntryExtensions.cs b/src/Foundatio.AzureServiceBus/Extensions/QueueEntryExtensions.cs index 5e18296..6172ef1 100644 --- a/src/Foundatio.AzureServiceBus/Extensions/QueueEntryExtensions.cs +++ b/src/Foundatio.AzureServiceBus/Extensions/QueueEntryExtensions.cs @@ -1,17 +1,16 @@ using Foundatio.Queues; using Microsoft.Azure.ServiceBus; -namespace Foundatio.Extensions +namespace Foundatio.Extensions; + +internal static class QueueEntryExtensions { - internal static class QueueEntryExtensions - { - public static string LockToken(this IQueueEntry entry) => entry.Properties["LockToken"]; + public static string LockToken(this IQueueEntry entry) => entry.Properties["LockToken"]; - public static void SetLockToken(this IQueueEntryMetadata entry, Message message) - { - if (message.SystemProperties.IsReceived) - entry.Properties.Add("LockToken", message.SystemProperties.LockToken); - } + public static void SetLockToken(this IQueueEntryMetadata entry, Message message) + { + if (message.SystemProperties.IsReceived) + entry.Properties.Add("LockToken", message.SystemProperties.LockToken); } } diff --git a/src/Foundatio.AzureServiceBus/Extensions/TaskExtensions.cs b/src/Foundatio.AzureServiceBus/Extensions/TaskExtensions.cs index f8fa69d..f0fd963 100644 --- a/src/Foundatio.AzureServiceBus/Extensions/TaskExtensions.cs +++ b/src/Foundatio.AzureServiceBus/Extensions/TaskExtensions.cs @@ -4,26 +4,25 @@ using System.Threading.Tasks; using Foundatio.AsyncEx; -namespace Foundatio.Extensions +namespace Foundatio.Extensions; + +internal static class TaskExtensions { - internal static class TaskExtensions + [DebuggerStepThrough] + public static ConfiguredTaskAwaitable AnyContext(this Task task) { - [DebuggerStepThrough] - public static ConfiguredTaskAwaitable AnyContext(this Task task) - { - return task.ConfigureAwait(continueOnCapturedContext: false); - } + return task.ConfigureAwait(continueOnCapturedContext: false); + } - [DebuggerStepThrough] - public static ConfiguredTaskAwaitable AnyContext(this Task task) - { - return task.ConfigureAwait(continueOnCapturedContext: false); - } + [DebuggerStepThrough] + public static ConfiguredTaskAwaitable AnyContext(this Task task) + { + return task.ConfigureAwait(continueOnCapturedContext: false); + } - [DebuggerStepThrough] - public static ConfiguredTaskAwaitable AnyContext(this AwaitableDisposable task) where TResult : IDisposable - { - return task.ConfigureAwait(continueOnCapturedContext: false); - } + [DebuggerStepThrough] + public static ConfiguredTaskAwaitable AnyContext(this AwaitableDisposable task) where TResult : IDisposable + { + return task.ConfigureAwait(continueOnCapturedContext: false); } } diff --git a/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBus.cs b/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBus.cs index aa697b8..99fc109 100644 --- a/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBus.cs +++ b/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBus.cs @@ -10,255 +10,254 @@ using Microsoft.Azure.ServiceBus.Management; using Microsoft.Extensions.Logging; -namespace Foundatio.Messaging +namespace Foundatio.Messaging; + +public class AzureServiceBusMessageBus : MessageBusBase { - public class AzureServiceBusMessageBus : MessageBusBase + private readonly AsyncLock _lock = new(); + private readonly ManagementClient _managementClient; + private TopicClient _topicClient; + private SubscriptionClient _subscriptionClient; + private readonly string _subscriptionName; + + public AzureServiceBusMessageBus(AzureServiceBusMessageBusOptions options) : base(options) { - private readonly AsyncLock _lock = new(); - private readonly ManagementClient _managementClient; - private TopicClient _topicClient; - private SubscriptionClient _subscriptionClient; - private readonly string _subscriptionName; + if (String.IsNullOrEmpty(options.ConnectionString)) + throw new ArgumentException("ConnectionString is required."); - public AzureServiceBusMessageBus(AzureServiceBusMessageBusOptions options) : base(options) - { - if (String.IsNullOrEmpty(options.ConnectionString)) - throw new ArgumentException("ConnectionString is required."); + _managementClient = new ManagementClient(options.ConnectionString); + _subscriptionName = _options.SubscriptionName ?? MessageBusId; + } - _managementClient = new ManagementClient(options.ConnectionString); - _subscriptionName = _options.SubscriptionName ?? MessageBusId; - } + public AzureServiceBusMessageBus(Builder config) + : this(config(new AzureServiceBusMessageBusOptionsBuilder()).Build()) { } + + protected override async Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) + { + if (_subscriptionClient != null) + return; - public AzureServiceBusMessageBus(Builder config) - : this(config(new AzureServiceBusMessageBusOptionsBuilder()).Build()) { } + if (!TopicIsCreated) + await EnsureTopicCreatedAsync(cancellationToken).AnyContext(); - protected override async Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) + using (await _lock.LockAsync().AnyContext()) { if (_subscriptionClient != null) return; - if (!TopicIsCreated) - await EnsureTopicCreatedAsync(cancellationToken).AnyContext(); - - using (await _lock.LockAsync().AnyContext()) + var sw = Stopwatch.StartNew(); + try { - if (_subscriptionClient != null) - return; - - var sw = Stopwatch.StartNew(); - try - { - await _managementClient.CreateSubscriptionAsync(CreateSubscriptionDescription(), cancellationToken).AnyContext(); - } - catch (MessagingEntityAlreadyExistsException) { } - - // Look into message factory with multiple receivers so more than one connection is made and managed.... - _subscriptionClient = new SubscriptionClient(_options.ConnectionString, _options.Topic, _subscriptionName, _options.SubscriptionReceiveMode, _options.SubscriptionRetryPolicy); - _subscriptionClient.RegisterMessageHandler(OnMessageAsync, new MessageHandlerOptions(MessageHandlerException) - { - /* AutoComplete = true, // Don't run with receive and delete */ - MaxConcurrentCalls = 6 /* calculate this based on the the thread count. */ - }); - if (_options.PrefetchCount.HasValue) - _subscriptionClient.PrefetchCount = _options.PrefetchCount.Value; - sw.Stop(); - _logger.LogTrace("Ensure topic subscription exists took {0}ms.", sw.ElapsedMilliseconds); + await _managementClient.CreateSubscriptionAsync(CreateSubscriptionDescription(), cancellationToken).AnyContext(); } + catch (MessagingEntityAlreadyExistsException) { } + + // Look into message factory with multiple receivers so more than one connection is made and managed.... + _subscriptionClient = new SubscriptionClient(_options.ConnectionString, _options.Topic, _subscriptionName, _options.SubscriptionReceiveMode, _options.SubscriptionRetryPolicy); + _subscriptionClient.RegisterMessageHandler(OnMessageAsync, new MessageHandlerOptions(MessageHandlerException) + { + /* AutoComplete = true, // Don't run with receive and delete */ + MaxConcurrentCalls = 6 /* calculate this based on the the thread count. */ + }); + if (_options.PrefetchCount.HasValue) + _subscriptionClient.PrefetchCount = _options.PrefetchCount.Value; + sw.Stop(); + _logger.LogTrace("Ensure topic subscription exists took {0}ms.", sw.ElapsedMilliseconds); } + } - private Task OnMessageAsync(Microsoft.Azure.ServiceBus.Message brokeredMessage, CancellationToken cancellationToken) + private Task OnMessageAsync(Microsoft.Azure.ServiceBus.Message brokeredMessage, CancellationToken cancellationToken) + { + if (_subscribers.IsEmpty) + return Task.CompletedTask; + + _logger.LogTrace("OnMessageAsync({messageId})", brokeredMessage.MessageId); + var message = new Message(brokeredMessage.Body, DeserializeMessageBody) { - if (_subscribers.IsEmpty) - return Task.CompletedTask; + Type = brokeredMessage.ContentType, + ClrType = GetMappedMessageType(brokeredMessage.ContentType), + CorrelationId = brokeredMessage.CorrelationId, + UniqueId = brokeredMessage.MessageId + }; - _logger.LogTrace("OnMessageAsync({messageId})", brokeredMessage.MessageId); - var message = new Message(brokeredMessage.Body, DeserializeMessageBody) - { - Type = brokeredMessage.ContentType, - ClrType = GetMappedMessageType(brokeredMessage.ContentType), - CorrelationId = brokeredMessage.CorrelationId, - UniqueId = brokeredMessage.MessageId - }; + foreach (var property in brokeredMessage.UserProperties) + message.Properties[property.Key] = property.Value.ToString(); - foreach (var property in brokeredMessage.UserProperties) - message.Properties[property.Key] = property.Value.ToString(); + return SendMessageToSubscribersAsync(message); + } - return SendMessageToSubscribersAsync(message); - } + private Task MessageHandlerException(ExceptionReceivedEventArgs e) + { + _logger.LogWarning("Exception: \"{0}\" {0}", e.Exception.Message, e.ExceptionReceivedContext.EntityPath); + return Task.CompletedTask; + } - private Task MessageHandlerException(ExceptionReceivedEventArgs e) - { - _logger.LogWarning("Exception: \"{0}\" {0}", e.Exception.Message, e.ExceptionReceivedContext.EntityPath); - return Task.CompletedTask; - } + private bool TopicIsCreated => _topicClient != null; + protected override async Task EnsureTopicCreatedAsync(CancellationToken cancellationToken) + { + if (TopicIsCreated) + return; - private bool TopicIsCreated => _topicClient != null; - protected override async Task EnsureTopicCreatedAsync(CancellationToken cancellationToken) + using (await _lock.LockAsync().AnyContext()) { if (TopicIsCreated) return; - using (await _lock.LockAsync().AnyContext()) + var sw = Stopwatch.StartNew(); + try { - if (TopicIsCreated) - return; - - var sw = Stopwatch.StartNew(); - try - { - await _managementClient.CreateTopicAsync(CreateTopicDescription()).AnyContext(); - } - catch (MessagingEntityAlreadyExistsException) { } - - _topicClient = new TopicClient(_options.ConnectionString, _options.Topic); - sw.Stop(); - _logger.LogTrace("Ensure topic exists took {0}ms.", sw.ElapsedMilliseconds); + await _managementClient.CreateTopicAsync(CreateTopicDescription()).AnyContext(); } + catch (MessagingEntityAlreadyExistsException) { } + + _topicClient = new TopicClient(_options.ConnectionString, _options.Topic); + sw.Stop(); + _logger.LogTrace("Ensure topic exists took {0}ms.", sw.ElapsedMilliseconds); } + } - protected override Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken) + protected override Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken) + { + var brokeredMessage = new Microsoft.Azure.ServiceBus.Message(_serializer.SerializeToBytes(message)) { - var brokeredMessage = new Microsoft.Azure.ServiceBus.Message(_serializer.SerializeToBytes(message)) - { - CorrelationId = options.CorrelationId, - ContentType = messageType - }; + CorrelationId = options.CorrelationId, + ContentType = messageType + }; - if (!String.IsNullOrEmpty(options.UniqueId)) - brokeredMessage.MessageId = options.UniqueId; + if (!String.IsNullOrEmpty(options.UniqueId)) + brokeredMessage.MessageId = options.UniqueId; - foreach (var property in options.Properties) - brokeredMessage.UserProperties[property.Key] = property.Value; - - if (options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero) - { - _logger.LogTrace("Schedule delayed message: {messageType} ({delay}ms)", messageType, options.DeliveryDelay.Value.TotalMilliseconds); - brokeredMessage.ScheduledEnqueueTimeUtc = SystemClock.UtcNow.Add(options.DeliveryDelay.Value); - } - else - { - _logger.LogTrace("Message Publish: {messageType}", messageType); - } + foreach (var property in options.Properties) + brokeredMessage.UserProperties[property.Key] = property.Value; - return _topicClient.SendAsync(brokeredMessage); + if (options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero) + { + _logger.LogTrace("Schedule delayed message: {messageType} ({delay}ms)", messageType, options.DeliveryDelay.Value.TotalMilliseconds); + brokeredMessage.ScheduledEnqueueTimeUtc = SystemClock.UtcNow.Add(options.DeliveryDelay.Value); } - - private TopicDescription CreateTopicDescription() + else { - var td = new TopicDescription(_options.Topic); + _logger.LogTrace("Message Publish: {messageType}", messageType); + } - if (_options.TopicAutoDeleteOnIdle.HasValue) - td.AutoDeleteOnIdle = _options.TopicAutoDeleteOnIdle.Value; + return _topicClient.SendAsync(brokeredMessage); + } - if (_options.TopicDefaultMessageTimeToLive.HasValue) - td.DefaultMessageTimeToLive = _options.TopicDefaultMessageTimeToLive.Value; + private TopicDescription CreateTopicDescription() + { + var td = new TopicDescription(_options.Topic); - if (_options.TopicMaxSizeInMegabytes.HasValue) - td.MaxSizeInMB = _options.TopicMaxSizeInMegabytes.Value; + if (_options.TopicAutoDeleteOnIdle.HasValue) + td.AutoDeleteOnIdle = _options.TopicAutoDeleteOnIdle.Value; - if (_options.TopicRequiresDuplicateDetection.HasValue) - td.RequiresDuplicateDetection = _options.TopicRequiresDuplicateDetection.Value; + if (_options.TopicDefaultMessageTimeToLive.HasValue) + td.DefaultMessageTimeToLive = _options.TopicDefaultMessageTimeToLive.Value; - if (_options.TopicDuplicateDetectionHistoryTimeWindow.HasValue) - td.DuplicateDetectionHistoryTimeWindow = _options.TopicDuplicateDetectionHistoryTimeWindow.Value; + if (_options.TopicMaxSizeInMegabytes.HasValue) + td.MaxSizeInMB = _options.TopicMaxSizeInMegabytes.Value; - if (_options.TopicEnableBatchedOperations.HasValue) - td.EnableBatchedOperations = _options.TopicEnableBatchedOperations.Value; + if (_options.TopicRequiresDuplicateDetection.HasValue) + td.RequiresDuplicateDetection = _options.TopicRequiresDuplicateDetection.Value; - if (_options.TopicStatus.HasValue) - td.Status = _options.TopicStatus.Value; + if (_options.TopicDuplicateDetectionHistoryTimeWindow.HasValue) + td.DuplicateDetectionHistoryTimeWindow = _options.TopicDuplicateDetectionHistoryTimeWindow.Value; - if (_options.TopicSupportOrdering.HasValue) - td.SupportOrdering = _options.TopicSupportOrdering.Value; + if (_options.TopicEnableBatchedOperations.HasValue) + td.EnableBatchedOperations = _options.TopicEnableBatchedOperations.Value; - if (_options.TopicEnablePartitioning.HasValue) - td.EnablePartitioning = _options.TopicEnablePartitioning.Value; + if (_options.TopicStatus.HasValue) + td.Status = _options.TopicStatus.Value; - if (!String.IsNullOrEmpty(_options.TopicUserMetadata)) - td.UserMetadata = _options.TopicUserMetadata; + if (_options.TopicSupportOrdering.HasValue) + td.SupportOrdering = _options.TopicSupportOrdering.Value; - return td; - } + if (_options.TopicEnablePartitioning.HasValue) + td.EnablePartitioning = _options.TopicEnablePartitioning.Value; - private SubscriptionDescription CreateSubscriptionDescription() - { - var sd = new SubscriptionDescription(_options.Topic, _subscriptionName); + if (!String.IsNullOrEmpty(_options.TopicUserMetadata)) + td.UserMetadata = _options.TopicUserMetadata; - if (_options.SubscriptionAutoDeleteOnIdle.HasValue) - sd.AutoDeleteOnIdle = _options.SubscriptionAutoDeleteOnIdle.Value; + return td; + } - if (_options.SubscriptionDefaultMessageTimeToLive.HasValue) - sd.DefaultMessageTimeToLive = _options.SubscriptionDefaultMessageTimeToLive.Value; + private SubscriptionDescription CreateSubscriptionDescription() + { + var sd = new SubscriptionDescription(_options.Topic, _subscriptionName); - if (_options.SubscriptionWorkItemTimeout.HasValue) - sd.LockDuration = _options.SubscriptionWorkItemTimeout.Value; + if (_options.SubscriptionAutoDeleteOnIdle.HasValue) + sd.AutoDeleteOnIdle = _options.SubscriptionAutoDeleteOnIdle.Value; - if (_options.SubscriptionRequiresSession.HasValue) - sd.RequiresSession = _options.SubscriptionRequiresSession.Value; + if (_options.SubscriptionDefaultMessageTimeToLive.HasValue) + sd.DefaultMessageTimeToLive = _options.SubscriptionDefaultMessageTimeToLive.Value; - if (_options.SubscriptionEnableDeadLetteringOnMessageExpiration.HasValue) - sd.EnableDeadLetteringOnMessageExpiration = _options.SubscriptionEnableDeadLetteringOnMessageExpiration.Value; + if (_options.SubscriptionWorkItemTimeout.HasValue) + sd.LockDuration = _options.SubscriptionWorkItemTimeout.Value; - if (_options.SubscriptionEnableDeadLetteringOnFilterEvaluationExceptions.HasValue) - sd.EnableDeadLetteringOnFilterEvaluationExceptions = _options.SubscriptionEnableDeadLetteringOnFilterEvaluationExceptions.Value; + if (_options.SubscriptionRequiresSession.HasValue) + sd.RequiresSession = _options.SubscriptionRequiresSession.Value; - if (_options.SubscriptionMaxDeliveryCount.HasValue) - sd.MaxDeliveryCount = _options.SubscriptionMaxDeliveryCount.Value; + if (_options.SubscriptionEnableDeadLetteringOnMessageExpiration.HasValue) + sd.EnableDeadLetteringOnMessageExpiration = _options.SubscriptionEnableDeadLetteringOnMessageExpiration.Value; - if (_options.SubscriptionEnableBatchedOperations.HasValue) - sd.EnableBatchedOperations = _options.SubscriptionEnableBatchedOperations.Value; + if (_options.SubscriptionEnableDeadLetteringOnFilterEvaluationExceptions.HasValue) + sd.EnableDeadLetteringOnFilterEvaluationExceptions = _options.SubscriptionEnableDeadLetteringOnFilterEvaluationExceptions.Value; - if (_options.SubscriptionStatus.HasValue) - sd.Status = _options.SubscriptionStatus.Value; + if (_options.SubscriptionMaxDeliveryCount.HasValue) + sd.MaxDeliveryCount = _options.SubscriptionMaxDeliveryCount.Value; - if (!String.IsNullOrEmpty(_options.SubscriptionForwardTo)) - sd.ForwardTo = _options.SubscriptionForwardTo; + if (_options.SubscriptionEnableBatchedOperations.HasValue) + sd.EnableBatchedOperations = _options.SubscriptionEnableBatchedOperations.Value; - if (!String.IsNullOrEmpty(_options.SubscriptionForwardDeadLetteredMessagesTo)) - sd.ForwardDeadLetteredMessagesTo = _options.SubscriptionForwardDeadLetteredMessagesTo; + if (_options.SubscriptionStatus.HasValue) + sd.Status = _options.SubscriptionStatus.Value; - if (!String.IsNullOrEmpty(_options.SubscriptionUserMetadata)) - sd.UserMetadata = _options.SubscriptionUserMetadata; + if (!String.IsNullOrEmpty(_options.SubscriptionForwardTo)) + sd.ForwardTo = _options.SubscriptionForwardTo; - return sd; - } + if (!String.IsNullOrEmpty(_options.SubscriptionForwardDeadLetteredMessagesTo)) + sd.ForwardDeadLetteredMessagesTo = _options.SubscriptionForwardDeadLetteredMessagesTo; - public override void Dispose() - { - base.Dispose(); - CloseTopicClient(); - CloseSubscriptionClient(); - _managementClient?.CloseAsync(); - } + if (!String.IsNullOrEmpty(_options.SubscriptionUserMetadata)) + sd.UserMetadata = _options.SubscriptionUserMetadata; + + return sd; + } + + public override void Dispose() + { + base.Dispose(); + CloseTopicClient(); + CloseSubscriptionClient(); + _managementClient?.CloseAsync(); + } - private void CloseTopicClient() + private void CloseTopicClient() + { + if (_topicClient == null) + return; + + using (_lock.Lock()) { if (_topicClient == null) return; - using (_lock.Lock()) - { - if (_topicClient == null) - return; - - _topicClient?.CloseAsync(); - _topicClient = null; - } + _topicClient?.CloseAsync(); + _topicClient = null; } + } + + private void CloseSubscriptionClient() + { + if (_subscriptionClient == null) + return; - private void CloseSubscriptionClient() + using (_lock.Lock()) { if (_subscriptionClient == null) return; - using (_lock.Lock()) - { - if (_subscriptionClient == null) - return; - - _subscriptionClient?.CloseAsync(); - _subscriptionClient = null; - } + _subscriptionClient?.CloseAsync(); + _subscriptionClient = null; } } } diff --git a/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBusOptions.cs b/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBusOptions.cs index 91f53b4..33e6785 100644 --- a/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBusOptions.cs +++ b/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBusOptions.cs @@ -2,335 +2,334 @@ using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Management; -namespace Foundatio.Messaging +namespace Foundatio.Messaging; + +public class AzureServiceBusMessageBusOptions : SharedMessageBusOptions +{ + public string ConnectionString { get; set; } + + /// + /// Prefetching enables the queue or subscription client to load additional messages from the service when it performs a receive operation. + /// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements + /// + public int? PrefetchCount { get; set; } + + /// + /// The idle interval after which the topic is automatically deleted. The minimum duration is 5 minutes. + /// + public TimeSpan? TopicAutoDeleteOnIdle { get; set; } + + /// + /// The default message time to live value for a topic + /// + public TimeSpan? TopicDefaultMessageTimeToLive { get; set; } + + /// + /// The maximum size of the topic in megabytes. + /// + public long? TopicMaxSizeInMegabytes { get; set; } + + /// + /// Set to true if topic requires duplicate detection. + /// + public bool? TopicRequiresDuplicateDetection { get; set; } + + /// + /// The duration of the duplicate detection history. + /// + public TimeSpan? TopicDuplicateDetectionHistoryTimeWindow { get; set; } + + /// + /// Returns true if server-side batched operations are enabled. + /// + public bool? TopicEnableBatchedOperations { get; set; } + + /// + /// Controls whether messages should be filtered before publishing. + /// + public bool? TopicEnableFilteringMessagesBeforePublishing { get; set; } + + /// + /// Returns true if the message is anonymous accessible. + /// + public bool? TopicIsAnonymousAccessible { get; set; } + + /// + /// Returns the status of the topic (enabled or disabled). When an entity is disabled, that entity cannot send or receive messages. + /// + public EntityStatus? TopicStatus { get; set; } + + /// + /// Returns true if the queue supports ordering. + /// + public bool? TopicSupportOrdering { get; set; } + + /// + /// Returns true if the topic is to be partitioned across multiple message brokers. + /// + public bool? TopicEnablePartitioning { get; set; } + + /// + /// Returns true if the queue holds a message in memory temporarily before writing it to persistent storage. + /// + public bool? TopicEnableExpress { get; set; } + + /// + /// Returns user metadata. + /// + public string TopicUserMetadata { get; set; } + + /// + /// If no subscription name is specified, then a fanout type message bus will be created. + /// + public string SubscriptionName { get; set; } + + /// + /// The idle interval after which the subscription is automatically deleted. The minimum duration is 5 minutes. + /// + public TimeSpan? SubscriptionAutoDeleteOnIdle { get; set; } + + /// + /// The default message time to live. + /// + public TimeSpan? SubscriptionDefaultMessageTimeToLive { get; set; } + + /// + /// The lock duration time span for the subscription. + /// + public TimeSpan? SubscriptionWorkItemTimeout { get; set; } + + /// + /// the value indicating if a subscription supports the concept of session. + /// + public bool? SubscriptionRequiresSession { get; set; } + + /// + /// Returns true if the subscription has dead letter support when a message expires. + /// + public bool? SubscriptionEnableDeadLetteringOnMessageExpiration { get; set; } + + /// + /// Returns true if the subscription has dead letter support on filter evaluation exceptions. + /// + public bool? SubscriptionEnableDeadLetteringOnFilterEvaluationExceptions { get; set; } + + /// + /// The number of maximum deliveries. + /// + public int? SubscriptionMaxDeliveryCount { get; set; } + + /// + /// Returns true if server-side batched operations are enabled. + /// + public bool? SubscriptionEnableBatchedOperations { get; set; } + + /// + /// Returns the status of the subcription (enabled or disabled). When an entity is disabled, that entity cannot send or receive messages. + /// + public EntityStatus? SubscriptionStatus { get; set; } + + /// + /// Returns the path to the recipient to which the message is forwarded. + /// + public string SubscriptionForwardTo { get; set; } + + /// + /// Returns the path to the recipient to which the dead lettered message is forwarded. + /// + public string SubscriptionForwardDeadLetteredMessagesTo { get; set; } + + /// + /// Returns user metadata. + /// + public string SubscriptionUserMetadata { get; set; } + + public RetryPolicy SubscriptionRetryPolicy { get; set; } + + public ReceiveMode SubscriptionReceiveMode { get; set; } = ReceiveMode.ReceiveAndDelete; +} + +public class AzureServiceBusMessageBusOptionsBuilder : SharedMessageBusOptionsBuilder< + AzureServiceBusMessageBusOptions, AzureServiceBusMessageBusOptionsBuilder> { - public class AzureServiceBusMessageBusOptions : SharedMessageBusOptions + + public AzureServiceBusMessageBusOptionsBuilder ConnectionString(string connectionString) + { + Target.ConnectionString = connectionString; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder PrefetchCount(int prefetchCount) + { + Target.PrefetchCount = prefetchCount; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicAutoDeleteOnIdle(TimeSpan topicAutoDeleteOnIdle) + { + Target.TopicAutoDeleteOnIdle = topicAutoDeleteOnIdle; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicDefaultMessageTimeToLive(TimeSpan topicDefaultMessageTimeToLive) + { + Target.TopicDefaultMessageTimeToLive = topicDefaultMessageTimeToLive; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicMaxSizeInMegabytes(long topicMaxSizeInMegabytes) + { + Target.TopicMaxSizeInMegabytes = topicMaxSizeInMegabytes; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicRequiresDuplicateDetection(bool topicRequiresDuplicateDetection) + { + Target.TopicRequiresDuplicateDetection = topicRequiresDuplicateDetection; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicDuplicateDetectionHistoryTimeWindow(TimeSpan topicDuplicateDetectionHistoryTimeWindow) + { + Target.TopicDuplicateDetectionHistoryTimeWindow = topicDuplicateDetectionHistoryTimeWindow; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicEnableBatchedOperations(bool topicEnableBatchedOperations) + { + Target.TopicEnableBatchedOperations = topicEnableBatchedOperations; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicEnableFilteringMessagesBeforePublishing(bool topicEnableFilteringMessagesBeforePublishing) + { + Target.TopicEnableFilteringMessagesBeforePublishing = topicEnableFilteringMessagesBeforePublishing; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicIsAnonymousAccessible(bool topicIsAnonymousAccessible) + { + Target.TopicIsAnonymousAccessible = topicIsAnonymousAccessible; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicStatus(EntityStatus topicStatus) + { + Target.TopicStatus = topicStatus; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicSupportOrdering(bool topicSupportOrdering) + { + Target.TopicSupportOrdering = topicSupportOrdering; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicEnablePartitioning(bool topicEnablePartitioning) + { + Target.TopicEnablePartitioning = topicEnablePartitioning; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder TopicEnableExpress(bool topicEnableExpress) { - public string ConnectionString { get; set; } - - /// - /// Prefetching enables the queue or subscription client to load additional messages from the service when it performs a receive operation. - /// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements - /// - public int? PrefetchCount { get; set; } - - /// - /// The idle interval after which the topic is automatically deleted. The minimum duration is 5 minutes. - /// - public TimeSpan? TopicAutoDeleteOnIdle { get; set; } - - /// - /// The default message time to live value for a topic - /// - public TimeSpan? TopicDefaultMessageTimeToLive { get; set; } - - /// - /// The maximum size of the topic in megabytes. - /// - public long? TopicMaxSizeInMegabytes { get; set; } - - /// - /// Set to true if topic requires duplicate detection. - /// - public bool? TopicRequiresDuplicateDetection { get; set; } - - /// - /// The duration of the duplicate detection history. - /// - public TimeSpan? TopicDuplicateDetectionHistoryTimeWindow { get; set; } - - /// - /// Returns true if server-side batched operations are enabled. - /// - public bool? TopicEnableBatchedOperations { get; set; } - - /// - /// Controls whether messages should be filtered before publishing. - /// - public bool? TopicEnableFilteringMessagesBeforePublishing { get; set; } - - /// - /// Returns true if the message is anonymous accessible. - /// - public bool? TopicIsAnonymousAccessible { get; set; } - - /// - /// Returns the status of the topic (enabled or disabled). When an entity is disabled, that entity cannot send or receive messages. - /// - public EntityStatus? TopicStatus { get; set; } - - /// - /// Returns true if the queue supports ordering. - /// - public bool? TopicSupportOrdering { get; set; } - - /// - /// Returns true if the topic is to be partitioned across multiple message brokers. - /// - public bool? TopicEnablePartitioning { get; set; } - - /// - /// Returns true if the queue holds a message in memory temporarily before writing it to persistent storage. - /// - public bool? TopicEnableExpress { get; set; } - - /// - /// Returns user metadata. - /// - public string TopicUserMetadata { get; set; } - - /// - /// If no subscription name is specified, then a fanout type message bus will be created. - /// - public string SubscriptionName { get; set; } - - /// - /// The idle interval after which the subscription is automatically deleted. The minimum duration is 5 minutes. - /// - public TimeSpan? SubscriptionAutoDeleteOnIdle { get; set; } - - /// - /// The default message time to live. - /// - public TimeSpan? SubscriptionDefaultMessageTimeToLive { get; set; } - - /// - /// The lock duration time span for the subscription. - /// - public TimeSpan? SubscriptionWorkItemTimeout { get; set; } - - /// - /// the value indicating if a subscription supports the concept of session. - /// - public bool? SubscriptionRequiresSession { get; set; } - - /// - /// Returns true if the subscription has dead letter support when a message expires. - /// - public bool? SubscriptionEnableDeadLetteringOnMessageExpiration { get; set; } - - /// - /// Returns true if the subscription has dead letter support on filter evaluation exceptions. - /// - public bool? SubscriptionEnableDeadLetteringOnFilterEvaluationExceptions { get; set; } - - /// - /// The number of maximum deliveries. - /// - public int? SubscriptionMaxDeliveryCount { get; set; } - - /// - /// Returns true if server-side batched operations are enabled. - /// - public bool? SubscriptionEnableBatchedOperations { get; set; } - - /// - /// Returns the status of the subcription (enabled or disabled). When an entity is disabled, that entity cannot send or receive messages. - /// - public EntityStatus? SubscriptionStatus { get; set; } - - /// - /// Returns the path to the recipient to which the message is forwarded. - /// - public string SubscriptionForwardTo { get; set; } - - /// - /// Returns the path to the recipient to which the dead lettered message is forwarded. - /// - public string SubscriptionForwardDeadLetteredMessagesTo { get; set; } - - /// - /// Returns user metadata. - /// - public string SubscriptionUserMetadata { get; set; } - - public RetryPolicy SubscriptionRetryPolicy { get; set; } - - public ReceiveMode SubscriptionReceiveMode { get; set; } = ReceiveMode.ReceiveAndDelete; + Target.TopicEnableExpress = topicEnableExpress; + return this; } - public class AzureServiceBusMessageBusOptionsBuilder : SharedMessageBusOptionsBuilder< - AzureServiceBusMessageBusOptions, AzureServiceBusMessageBusOptionsBuilder> + public AzureServiceBusMessageBusOptionsBuilder TopicUserMetadata(string topicUserMetadata) { + Target.TopicUserMetadata = topicUserMetadata ?? throw new ArgumentNullException(nameof(topicUserMetadata)); + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionName(string subscriptionName) + { + Target.SubscriptionName = subscriptionName ?? throw new ArgumentNullException(nameof(subscriptionName)); + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionAutoDeleteOnIdle(TimeSpan subscriptionAutoDeleteOnIdle) + { + Target.SubscriptionAutoDeleteOnIdle = subscriptionAutoDeleteOnIdle; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionDefaultMessageTimeToLive(TimeSpan subscriptionDefaultMessageTimeToLive) + { + Target.SubscriptionDefaultMessageTimeToLive = subscriptionDefaultMessageTimeToLive; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionWorkItemTimeout(TimeSpan subscriptionWorkItemTimeout) + { + Target.SubscriptionWorkItemTimeout = subscriptionWorkItemTimeout; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionRequiresSession(bool subscriptionRequiresSession) + { + Target.SubscriptionRequiresSession = subscriptionRequiresSession; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionEnableDeadLetteringOnMessageExpiration(bool subscriptionEnableDeadLetteringOnMessageExpiration) + { + Target.SubscriptionEnableDeadLetteringOnMessageExpiration = subscriptionEnableDeadLetteringOnMessageExpiration; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionEnableDeadLetteringOnFilterEvaluationExceptions(bool subscriptionEnableDeadLetteringOnFilterEvaluationExceptions) + { + Target.SubscriptionEnableDeadLetteringOnFilterEvaluationExceptions = subscriptionEnableDeadLetteringOnFilterEvaluationExceptions; + return this; + } - public AzureServiceBusMessageBusOptionsBuilder ConnectionString(string connectionString) - { - Target.ConnectionString = connectionString; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder PrefetchCount(int prefetchCount) - { - Target.PrefetchCount = prefetchCount; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicAutoDeleteOnIdle(TimeSpan topicAutoDeleteOnIdle) - { - Target.TopicAutoDeleteOnIdle = topicAutoDeleteOnIdle; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicDefaultMessageTimeToLive(TimeSpan topicDefaultMessageTimeToLive) - { - Target.TopicDefaultMessageTimeToLive = topicDefaultMessageTimeToLive; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicMaxSizeInMegabytes(long topicMaxSizeInMegabytes) - { - Target.TopicMaxSizeInMegabytes = topicMaxSizeInMegabytes; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicRequiresDuplicateDetection(bool topicRequiresDuplicateDetection) - { - Target.TopicRequiresDuplicateDetection = topicRequiresDuplicateDetection; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicDuplicateDetectionHistoryTimeWindow(TimeSpan topicDuplicateDetectionHistoryTimeWindow) - { - Target.TopicDuplicateDetectionHistoryTimeWindow = topicDuplicateDetectionHistoryTimeWindow; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicEnableBatchedOperations(bool topicEnableBatchedOperations) - { - Target.TopicEnableBatchedOperations = topicEnableBatchedOperations; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicEnableFilteringMessagesBeforePublishing(bool topicEnableFilteringMessagesBeforePublishing) - { - Target.TopicEnableFilteringMessagesBeforePublishing = topicEnableFilteringMessagesBeforePublishing; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicIsAnonymousAccessible(bool topicIsAnonymousAccessible) - { - Target.TopicIsAnonymousAccessible = topicIsAnonymousAccessible; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicStatus(EntityStatus topicStatus) - { - Target.TopicStatus = topicStatus; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicSupportOrdering(bool topicSupportOrdering) - { - Target.TopicSupportOrdering = topicSupportOrdering; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicEnablePartitioning(bool topicEnablePartitioning) - { - Target.TopicEnablePartitioning = topicEnablePartitioning; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicEnableExpress(bool topicEnableExpress) - { - Target.TopicEnableExpress = topicEnableExpress; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder TopicUserMetadata(string topicUserMetadata) - { - Target.TopicUserMetadata = topicUserMetadata ?? throw new ArgumentNullException(nameof(topicUserMetadata)); - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionName(string subscriptionName) - { - Target.SubscriptionName = subscriptionName ?? throw new ArgumentNullException(nameof(subscriptionName)); - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionAutoDeleteOnIdle(TimeSpan subscriptionAutoDeleteOnIdle) - { - Target.SubscriptionAutoDeleteOnIdle = subscriptionAutoDeleteOnIdle; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionDefaultMessageTimeToLive(TimeSpan subscriptionDefaultMessageTimeToLive) - { - Target.SubscriptionDefaultMessageTimeToLive = subscriptionDefaultMessageTimeToLive; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionWorkItemTimeout(TimeSpan subscriptionWorkItemTimeout) - { - Target.SubscriptionWorkItemTimeout = subscriptionWorkItemTimeout; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionRequiresSession(bool subscriptionRequiresSession) - { - Target.SubscriptionRequiresSession = subscriptionRequiresSession; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionEnableDeadLetteringOnMessageExpiration(bool subscriptionEnableDeadLetteringOnMessageExpiration) - { - Target.SubscriptionEnableDeadLetteringOnMessageExpiration = subscriptionEnableDeadLetteringOnMessageExpiration; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionEnableDeadLetteringOnFilterEvaluationExceptions(bool subscriptionEnableDeadLetteringOnFilterEvaluationExceptions) - { - Target.SubscriptionEnableDeadLetteringOnFilterEvaluationExceptions = subscriptionEnableDeadLetteringOnFilterEvaluationExceptions; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionMaxDeliveryCount(int subscriptionMaxDeliveryCount) - { - Target.SubscriptionMaxDeliveryCount = subscriptionMaxDeliveryCount; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionEnableBatchedOperations(bool subscriptionEnableBatchedOperations) - { - Target.SubscriptionEnableBatchedOperations = subscriptionEnableBatchedOperations; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionStatus(EntityStatus subscriptionStatus) - { - Target.SubscriptionStatus = subscriptionStatus; - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionForwardTo(string subscriptionForwardTo) - { - Target.SubscriptionForwardTo = subscriptionForwardTo ?? throw new ArgumentNullException(nameof(subscriptionForwardTo)); - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionForwardDeadLetteredMessagesTo(string subscriptionForwardDeadLetteredMessagesTo) - { - Target.SubscriptionForwardDeadLetteredMessagesTo = subscriptionForwardDeadLetteredMessagesTo ?? throw new ArgumentNullException(nameof(subscriptionForwardDeadLetteredMessagesTo)); - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionUserMetadata(string subscriptionUserMetadata) - { - Target.SubscriptionUserMetadata = subscriptionUserMetadata ?? throw new ArgumentNullException(nameof(subscriptionUserMetadata)); - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionRetryPolicy(RetryPolicy subscriptionRetryPolicy) - { - Target.SubscriptionRetryPolicy = subscriptionRetryPolicy ?? throw new ArgumentNullException(nameof(subscriptionRetryPolicy)); - return this; - } - - public AzureServiceBusMessageBusOptionsBuilder SubscriptionReceiveMode(ReceiveMode receiveMode) - { - Target.SubscriptionReceiveMode = receiveMode; - return this; - } + public AzureServiceBusMessageBusOptionsBuilder SubscriptionMaxDeliveryCount(int subscriptionMaxDeliveryCount) + { + Target.SubscriptionMaxDeliveryCount = subscriptionMaxDeliveryCount; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionEnableBatchedOperations(bool subscriptionEnableBatchedOperations) + { + Target.SubscriptionEnableBatchedOperations = subscriptionEnableBatchedOperations; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionStatus(EntityStatus subscriptionStatus) + { + Target.SubscriptionStatus = subscriptionStatus; + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionForwardTo(string subscriptionForwardTo) + { + Target.SubscriptionForwardTo = subscriptionForwardTo ?? throw new ArgumentNullException(nameof(subscriptionForwardTo)); + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionForwardDeadLetteredMessagesTo(string subscriptionForwardDeadLetteredMessagesTo) + { + Target.SubscriptionForwardDeadLetteredMessagesTo = subscriptionForwardDeadLetteredMessagesTo ?? throw new ArgumentNullException(nameof(subscriptionForwardDeadLetteredMessagesTo)); + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionUserMetadata(string subscriptionUserMetadata) + { + Target.SubscriptionUserMetadata = subscriptionUserMetadata ?? throw new ArgumentNullException(nameof(subscriptionUserMetadata)); + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionRetryPolicy(RetryPolicy subscriptionRetryPolicy) + { + Target.SubscriptionRetryPolicy = subscriptionRetryPolicy ?? throw new ArgumentNullException(nameof(subscriptionRetryPolicy)); + return this; + } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionReceiveMode(ReceiveMode receiveMode) + { + Target.SubscriptionReceiveMode = receiveMode; + return this; } } diff --git a/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueue.cs b/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueue.cs index 25438bf..810a49e 100644 --- a/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueue.cs +++ b/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueue.cs @@ -7,7 +7,6 @@ using Foundatio.AsyncEx; using Foundatio.AzureServiceBus.Queues; using Foundatio.Extensions; -using Foundatio.Messaging; using Foundatio.Serializer; using Foundatio.Utility; using Microsoft.Azure.ServiceBus; @@ -16,340 +15,339 @@ using Microsoft.Extensions.Logging; using Message = Microsoft.Azure.ServiceBus.Message; -namespace Foundatio.Queues +namespace Foundatio.Queues; + +public class AzureServiceBusQueue : QueueBase> where T : class { - public class AzureServiceBusQueue : QueueBase> where T : class + private readonly AsyncLock _lock = new(); + private readonly ManagementClient _managementClient; + private MessageSender _queueSender; + private MessageReceiver _queueReceiver; + private long _enqueuedCount; + private long _dequeuedCount; + private long _completedCount; + private long _abandonedCount; + private long _workerErrorCount; + + public AzureServiceBusQueue(AzureServiceBusQueueOptions options) : base(options) { - private readonly AsyncLock _lock = new(); - private readonly ManagementClient _managementClient; - private MessageSender _queueSender; - private MessageReceiver _queueReceiver; - private long _enqueuedCount; - private long _dequeuedCount; - private long _completedCount; - private long _abandonedCount; - private long _workerErrorCount; - - public AzureServiceBusQueue(AzureServiceBusQueueOptions options) : base(options) - { - if (String.IsNullOrEmpty(options.ConnectionString)) - throw new ArgumentException("ConnectionString is required."); + if (String.IsNullOrEmpty(options.ConnectionString)) + throw new ArgumentException("ConnectionString is required."); - if (options.Name.Length > 260) - throw new ArgumentException("Queue name must be set and be less than 260 characters."); + if (options.Name.Length > 260) + throw new ArgumentException("Queue name must be set and be less than 260 characters."); - if (options.WorkItemTimeout > TimeSpan.FromMinutes(5)) - throw new ArgumentException("The maximum WorkItemTimeout value for is 5 minutes; the default value is 1 minute."); + if (options.WorkItemTimeout > TimeSpan.FromMinutes(5)) + throw new ArgumentException("The maximum WorkItemTimeout value for is 5 minutes; the default value is 1 minute."); - if (options.AutoDeleteOnIdle.HasValue && options.AutoDeleteOnIdle < TimeSpan.FromMinutes(5)) - throw new ArgumentException("The minimum AutoDeleteOnIdle duration is 5 minutes."); + if (options.AutoDeleteOnIdle.HasValue && options.AutoDeleteOnIdle < TimeSpan.FromMinutes(5)) + throw new ArgumentException("The minimum AutoDeleteOnIdle duration is 5 minutes."); - if (options.DuplicateDetectionHistoryTimeWindow.HasValue && (options.DuplicateDetectionHistoryTimeWindow < TimeSpan.FromSeconds(20.0) || options.DuplicateDetectionHistoryTimeWindow > TimeSpan.FromDays(7.0))) - throw new ArgumentException("The minimum DuplicateDetectionHistoryTimeWindow duration is 20 seconds and maximum is 7 days."); + if (options.DuplicateDetectionHistoryTimeWindow.HasValue && (options.DuplicateDetectionHistoryTimeWindow < TimeSpan.FromSeconds(20.0) || options.DuplicateDetectionHistoryTimeWindow > TimeSpan.FromDays(7.0))) + throw new ArgumentException("The minimum DuplicateDetectionHistoryTimeWindow duration is 20 seconds and maximum is 7 days."); - if (options.UserMetadata != null && options.UserMetadata.Length > 260) - throw new ArgumentException("Queue UserMetadata must be less than 1024 characters."); + if (options.UserMetadata != null && options.UserMetadata.Length > 260) + throw new ArgumentException("Queue UserMetadata must be less than 1024 characters."); - _managementClient = new ManagementClient(options.ConnectionString); - } + _managementClient = new ManagementClient(options.ConnectionString); + } + + public AzureServiceBusQueue(Builder, AzureServiceBusQueueOptions> config) + : this(config(new AzureServiceBusQueueOptionsBuilder()).Build()) { } - public AzureServiceBusQueue(Builder, AzureServiceBusQueueOptions> config) - : this(config(new AzureServiceBusQueueOptionsBuilder()).Build()) { } + public ManagementClient ManagementClient => _managementClient; + public MessageReceiver MessageReceiver => _queueReceiver; + public MessageSender MessageSender => _queueSender; - public ManagementClient ManagementClient => _managementClient; - public MessageReceiver MessageReceiver => _queueReceiver; - public MessageSender MessageSender => _queueSender; + private bool QueueIsCreated => _queueReceiver != null && _queueSender != null; + protected override async Task EnsureQueueCreatedAsync(CancellationToken cancellationToken = new CancellationToken()) + { + if (QueueIsCreated) + return; - private bool QueueIsCreated => _queueReceiver != null && _queueSender != null; - protected override async Task EnsureQueueCreatedAsync(CancellationToken cancellationToken = new CancellationToken()) + using (await _lock.LockAsync().AnyContext()) { if (QueueIsCreated) return; - using (await _lock.LockAsync().AnyContext()) + var sw = Stopwatch.StartNew(); + try { - if (QueueIsCreated) - return; - - var sw = Stopwatch.StartNew(); - try - { - await _managementClient.CreateQueueAsync(CreateQueueDescription()).AnyContext(); - } - catch (MessagingEntityAlreadyExistsException) { } - - _queueSender = new MessageSender(_options.ConnectionString, _options.Name, _options.RetryPolicy); - _queueReceiver = new MessageReceiver(_options.ConnectionString, _options.Name, ReceiveMode.PeekLock, _options.RetryPolicy); - sw.Stop(); - _logger.LogTrace("Ensure queue exists took {0}ms.", sw.ElapsedMilliseconds); + await _managementClient.CreateQueueAsync(CreateQueueDescription()).AnyContext(); } + catch (MessagingEntityAlreadyExistsException) { } + + _queueSender = new MessageSender(_options.ConnectionString, _options.Name, _options.RetryPolicy); + _queueReceiver = new MessageReceiver(_options.ConnectionString, _options.Name, ReceiveMode.PeekLock, _options.RetryPolicy); + sw.Stop(); + _logger.LogTrace("Ensure queue exists took {0}ms.", sw.ElapsedMilliseconds); } + } - public override async Task DeleteQueueAsync() - { - if (await _managementClient.QueueExistsAsync(_options.Name).AnyContext()) - await _managementClient.DeleteQueueAsync(_options.Name).AnyContext(); + public override async Task DeleteQueueAsync() + { + if (await _managementClient.QueueExistsAsync(_options.Name).AnyContext()) + await _managementClient.DeleteQueueAsync(_options.Name).AnyContext(); + + _queueSender = null; + _queueReceiver = null; + _enqueuedCount = 0; + _dequeuedCount = 0; + _completedCount = 0; + _abandonedCount = 0; + _workerErrorCount = 0; + } - _queueSender = null; - _queueReceiver = null; - _enqueuedCount = 0; - _dequeuedCount = 0; - _completedCount = 0; - _abandonedCount = 0; - _workerErrorCount = 0; - } + protected override async Task GetQueueStatsImplAsync() + { + if (!QueueIsCreated) + return new QueueStats(); - protected override async Task GetQueueStatsImplAsync() + var q = await _managementClient.GetQueueRuntimeInfoAsync(_options.Name).AnyContext(); + return new QueueStats { - if (!QueueIsCreated) - return new QueueStats(); + Queued = q.MessageCount, + Working = 0, + Deadletter = q.MessageCountDetails.DeadLetterMessageCount, + Enqueued = _enqueuedCount, + Dequeued = _dequeuedCount, + Completed = _completedCount, + Abandoned = _abandonedCount, + Errors = _workerErrorCount, + Timeouts = 0 + }; + } - var q = await _managementClient.GetQueueRuntimeInfoAsync(_options.Name).AnyContext(); - return new QueueStats - { - Queued = q.MessageCount, - Working = 0, - Deadletter = q.MessageCountDetails.DeadLetterMessageCount, - Enqueued = _enqueuedCount, - Dequeued = _dequeuedCount, - Completed = _completedCount, - Abandoned = _abandonedCount, - Errors = _workerErrorCount, - Timeouts = 0 - }; - } + protected override Task> GetDeadletterItemsImplAsync(CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } - protected override Task> GetDeadletterItemsImplAsync(CancellationToken cancellationToken) - { - throw new NotImplementedException(); - } + protected override async Task EnqueueImplAsync(T data, QueueEntryOptions options) + { + if (!await OnEnqueuingAsync(data, options).AnyContext()) + return null; - protected override async Task EnqueueImplAsync(T data, QueueEntryOptions options) - { - if (!await OnEnqueuingAsync(data, options).AnyContext()) - return null; + Interlocked.Increment(ref _enqueuedCount); + var stream = new MemoryStream(); + _serializer.Serialize(data, stream); + var brokeredMessage = new Message(stream.ToArray()); + if (!String.IsNullOrEmpty(options.UniqueId)) + brokeredMessage.MessageId = options.UniqueId; + brokeredMessage.CorrelationId = options.CorrelationId; - Interlocked.Increment(ref _enqueuedCount); - var stream = new MemoryStream(); - _serializer.Serialize(data, stream); - var brokeredMessage = new Message(stream.ToArray()); - if (!String.IsNullOrEmpty(options.UniqueId)) - brokeredMessage.MessageId = options.UniqueId; - brokeredMessage.CorrelationId = options.CorrelationId; + if (options is AzureServiceBusQueueEntryOptions asbOptions) + brokeredMessage.SessionId = asbOptions.SessionId; - if (options is AzureServiceBusQueueEntryOptions asbOptions) - brokeredMessage.SessionId = asbOptions.SessionId; + if (options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero) + brokeredMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.Add(options.DeliveryDelay.Value); - if (options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero) - brokeredMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.Add(options.DeliveryDelay.Value); + foreach (var property in options.Properties) + brokeredMessage.UserProperties[property.Key] = property.Value; - foreach (var property in options.Properties) - brokeredMessage.UserProperties[property.Key] = property.Value; + await _queueSender.SendAsync(brokeredMessage).AnyContext(); - await _queueSender.SendAsync(brokeredMessage).AnyContext(); + var entry = new QueueEntry(brokeredMessage.MessageId, brokeredMessage.CorrelationId, data, this, SystemClock.UtcNow, 0); + entry.SetLockToken(brokeredMessage); + foreach (var property in brokeredMessage.UserProperties) + entry.Properties.Add(property.Key, property.Value.ToString()); - var entry = new QueueEntry(brokeredMessage.MessageId, brokeredMessage.CorrelationId, data, this, SystemClock.UtcNow, 0); - entry.SetLockToken(brokeredMessage); - foreach (var property in brokeredMessage.UserProperties) - entry.Properties.Add(property.Key, property.Value.ToString()); + await OnEnqueuedAsync(entry).AnyContext(); - await OnEnqueuedAsync(entry).AnyContext(); + return brokeredMessage.MessageId; + } - return brokeredMessage.MessageId; - } + protected override void StartWorkingImpl(Func, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken) + { + if (handler == null) + throw new ArgumentNullException(nameof(handler)); - protected override void StartWorkingImpl(Func, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken) + _logger.LogTrace("WorkerLoop Start {QueueName}", _options.Name); + _queueReceiver.RegisterMessageHandler(async (msg, cancellationToken1) => { - if (handler == null) - throw new ArgumentNullException(nameof(handler)); + _logger.LogTrace("WorkerLoop Signaled {QueueName}", _options.Name); + var queueEntry = await HandleDequeueAsync(msg).AnyContext(); - _logger.LogTrace("WorkerLoop Start {QueueName}", _options.Name); - _queueReceiver.RegisterMessageHandler(async (msg, cancellationToken1) => + try { - _logger.LogTrace("WorkerLoop Signaled {QueueName}", _options.Name); - var queueEntry = await HandleDequeueAsync(msg).AnyContext(); - - try + using (var linkedCancellationToken = GetLinkedDisposableCancellationTokenSource(cancellationToken)) { - using (var linkedCancellationToken = GetLinkedDisposableCancellationTokenSource(cancellationToken)) - { - await handler(queueEntry, linkedCancellationToken.Token).AnyContext(); - } - - if (autoComplete && !queueEntry.IsAbandoned && !queueEntry.IsCompleted) - await queueEntry.CompleteAsync().AnyContext(); + await handler(queueEntry, linkedCancellationToken.Token).AnyContext(); } - catch (Exception ex) - { - Interlocked.Increment(ref _workerErrorCount); - _logger.LogWarning(ex, "Error sending work item to worker: {0}", ex.Message); - if (!queueEntry.IsAbandoned && !queueEntry.IsCompleted) - await queueEntry.AbandonAsync().AnyContext(); - } - }, new MessageHandlerOptions(LogMessageHandlerException)); - } + if (autoComplete && !queueEntry.IsAbandoned && !queueEntry.IsCompleted) + await queueEntry.CompleteAsync().AnyContext(); + } + catch (Exception ex) + { + Interlocked.Increment(ref _workerErrorCount); + _logger.LogWarning(ex, "Error sending work item to worker: {0}", ex.Message); - private Task LogMessageHandlerException(ExceptionReceivedEventArgs e) - { - _logger.LogWarning("Exception: \"{0}\" {0}", e.Exception.Message, e.ExceptionReceivedContext.EntityPath); - return Task.CompletedTask; - } + if (!queueEntry.IsAbandoned && !queueEntry.IsCompleted) + await queueEntry.AbandonAsync().AnyContext(); + } + }, new MessageHandlerOptions(LogMessageHandlerException)); + } - public override async Task> DequeueAsync(TimeSpan? timeout = null) - { - if (!QueueIsCreated) - await EnsureQueueCreatedAsync().AnyContext(); + private Task LogMessageHandlerException(ExceptionReceivedEventArgs e) + { + _logger.LogWarning("Exception: \"{0}\" {0}", e.Exception.Message, e.ExceptionReceivedContext.EntityPath); + return Task.CompletedTask; + } - var msg = await _queueReceiver.ReceiveAsync((timeout == null || timeout.Value.Ticks == 0) ? TimeSpan.FromSeconds(5) : timeout.Value).AnyContext(); - return await HandleDequeueAsync(msg).AnyContext(); - } + public override async Task> DequeueAsync(TimeSpan? timeout = null) + { + if (!QueueIsCreated) + await EnsureQueueCreatedAsync().AnyContext(); - protected override Task> DequeueImplAsync(CancellationToken cancellationToken) - { - _logger.LogWarning("Azure Service Bus does not support CancellationTokens - use TimeSpan overload instead. Using default 30 second timeout."); - return DequeueAsync(); - } + var msg = await _queueReceiver.ReceiveAsync((timeout == null || timeout.Value.Ticks == 0) ? TimeSpan.FromSeconds(5) : timeout.Value).AnyContext(); + return await HandleDequeueAsync(msg).AnyContext(); + } - public override async Task RenewLockAsync(IQueueEntry entry) - { - _logger.LogDebug("Queue {0} renew lock item: {1}", _options.Name, entry.Id); - await _queueReceiver.RenewLockAsync(entry.LockToken()).AnyContext(); - await OnLockRenewedAsync(entry).AnyContext(); - _logger.LogTrace("Renew lock done: {0}", entry.Id); - } + protected override Task> DequeueImplAsync(CancellationToken cancellationToken) + { + _logger.LogWarning("Azure Service Bus does not support CancellationTokens - use TimeSpan overload instead. Using default 30 second timeout."); + return DequeueAsync(); + } - public override async Task CompleteAsync(IQueueEntry entry) - { - _logger.LogDebug("Queue {0} complete item: {1}", _options.Name, entry.Id); - if (entry.IsAbandoned || entry.IsCompleted) - throw new InvalidOperationException("Queue entry has already been completed or abandoned."); - - await _queueReceiver.CompleteAsync(entry.LockToken()).AnyContext(); - Interlocked.Increment(ref _completedCount); - entry.MarkCompleted(); - await OnCompletedAsync(entry).AnyContext(); - _logger.LogTrace("Complete done: {0}", entry.Id); - } + public override async Task RenewLockAsync(IQueueEntry entry) + { + _logger.LogDebug("Queue {0} renew lock item: {1}", _options.Name, entry.Id); + await _queueReceiver.RenewLockAsync(entry.LockToken()).AnyContext(); + await OnLockRenewedAsync(entry).AnyContext(); + _logger.LogTrace("Renew lock done: {0}", entry.Id); + } - public override async Task AbandonAsync(IQueueEntry entry) - { - _logger.LogDebug("Queue {QueueName}:{QueueId} abandon item: {EntryId}", _options.Name, QueueId, entry.Id); - if (entry.IsAbandoned || entry.IsCompleted) - throw new InvalidOperationException("Queue entry has already been completed or abandoned."); - - await _queueReceiver.AbandonAsync(entry.LockToken()).AnyContext(); - Interlocked.Increment(ref _abandonedCount); - entry.MarkAbandoned(); - await OnAbandonedAsync(entry).AnyContext(); - _logger.LogTrace("Abandon complete: {EntryId}", entry.Id); - } + public override async Task CompleteAsync(IQueueEntry entry) + { + _logger.LogDebug("Queue {0} complete item: {1}", _options.Name, entry.Id); + if (entry.IsAbandoned || entry.IsCompleted) + throw new InvalidOperationException("Queue entry has already been completed or abandoned."); + + await _queueReceiver.CompleteAsync(entry.LockToken()).AnyContext(); + Interlocked.Increment(ref _completedCount); + entry.MarkCompleted(); + await OnCompletedAsync(entry).AnyContext(); + _logger.LogTrace("Complete done: {0}", entry.Id); + } - private async Task> HandleDequeueAsync(Message brokeredMessage) - { - if (brokeredMessage == null) - return null; - - var message = _serializer.Deserialize(brokeredMessage.Body); - Interlocked.Increment(ref _dequeuedCount); - var entry = new QueueEntry(brokeredMessage.MessageId, brokeredMessage.CorrelationId, message, this, brokeredMessage.SystemProperties.EnqueuedTimeUtc, brokeredMessage.SystemProperties.DeliveryCount); - entry.SetLockToken(brokeredMessage); - foreach (var property in brokeredMessage.UserProperties) - entry.Properties.Add(property.Key, property.Value.ToString()); - - await OnDequeuedAsync(entry).AnyContext(); - return entry; - } + public override async Task AbandonAsync(IQueueEntry entry) + { + _logger.LogDebug("Queue {QueueName}:{QueueId} abandon item: {EntryId}", _options.Name, QueueId, entry.Id); + if (entry.IsAbandoned || entry.IsCompleted) + throw new InvalidOperationException("Queue entry has already been completed or abandoned."); + + await _queueReceiver.AbandonAsync(entry.LockToken()).AnyContext(); + Interlocked.Increment(ref _abandonedCount); + entry.MarkAbandoned(); + await OnAbandonedAsync(entry).AnyContext(); + _logger.LogTrace("Abandon complete: {EntryId}", entry.Id); + } - private QueueDescription CreateQueueDescription() + private async Task> HandleDequeueAsync(Message brokeredMessage) + { + if (brokeredMessage == null) + return null; + + var message = _serializer.Deserialize(brokeredMessage.Body); + Interlocked.Increment(ref _dequeuedCount); + var entry = new QueueEntry(brokeredMessage.MessageId, brokeredMessage.CorrelationId, message, this, brokeredMessage.SystemProperties.EnqueuedTimeUtc, brokeredMessage.SystemProperties.DeliveryCount); + entry.SetLockToken(brokeredMessage); + foreach (var property in brokeredMessage.UserProperties) + entry.Properties.Add(property.Key, property.Value.ToString()); + + await OnDequeuedAsync(entry).AnyContext(); + return entry; + } + + private QueueDescription CreateQueueDescription() + { + var qd = new QueueDescription(_options.Name) { - var qd = new QueueDescription(_options.Name) - { - LockDuration = _options.WorkItemTimeout, - MaxDeliveryCount = _options.Retries + 1 - }; + LockDuration = _options.WorkItemTimeout, + MaxDeliveryCount = _options.Retries + 1 + }; - if (_options.AutoDeleteOnIdle.HasValue) - qd.AutoDeleteOnIdle = _options.AutoDeleteOnIdle.Value; + if (_options.AutoDeleteOnIdle.HasValue) + qd.AutoDeleteOnIdle = _options.AutoDeleteOnIdle.Value; - if (_options.DefaultMessageTimeToLive.HasValue) - qd.DefaultMessageTimeToLive = _options.DefaultMessageTimeToLive.Value; + if (_options.DefaultMessageTimeToLive.HasValue) + qd.DefaultMessageTimeToLive = _options.DefaultMessageTimeToLive.Value; - if (_options.DuplicateDetectionHistoryTimeWindow.HasValue) - qd.DuplicateDetectionHistoryTimeWindow = _options.DuplicateDetectionHistoryTimeWindow.Value; + if (_options.DuplicateDetectionHistoryTimeWindow.HasValue) + qd.DuplicateDetectionHistoryTimeWindow = _options.DuplicateDetectionHistoryTimeWindow.Value; - if (_options.EnableBatchedOperations.HasValue) - qd.EnableBatchedOperations = _options.EnableBatchedOperations.Value; + if (_options.EnableBatchedOperations.HasValue) + qd.EnableBatchedOperations = _options.EnableBatchedOperations.Value; - if (_options.EnableDeadLetteringOnMessageExpiration.HasValue) - qd.EnableDeadLetteringOnMessageExpiration = _options.EnableDeadLetteringOnMessageExpiration.Value; + if (_options.EnableDeadLetteringOnMessageExpiration.HasValue) + qd.EnableDeadLetteringOnMessageExpiration = _options.EnableDeadLetteringOnMessageExpiration.Value; - if (_options.EnablePartitioning.HasValue) - qd.EnablePartitioning = _options.EnablePartitioning.Value; + if (_options.EnablePartitioning.HasValue) + qd.EnablePartitioning = _options.EnablePartitioning.Value; - if (!String.IsNullOrEmpty(_options.ForwardDeadLetteredMessagesTo)) - qd.ForwardDeadLetteredMessagesTo = _options.ForwardDeadLetteredMessagesTo; + if (!String.IsNullOrEmpty(_options.ForwardDeadLetteredMessagesTo)) + qd.ForwardDeadLetteredMessagesTo = _options.ForwardDeadLetteredMessagesTo; - if (!String.IsNullOrEmpty(_options.ForwardTo)) - qd.ForwardTo = _options.ForwardTo; + if (!String.IsNullOrEmpty(_options.ForwardTo)) + qd.ForwardTo = _options.ForwardTo; - if (_options.MaxSizeInMegabytes.HasValue) - qd.MaxSizeInMB = _options.MaxSizeInMegabytes.Value; + if (_options.MaxSizeInMegabytes.HasValue) + qd.MaxSizeInMB = _options.MaxSizeInMegabytes.Value; - if (_options.RequiresDuplicateDetection.HasValue) - qd.RequiresDuplicateDetection = _options.RequiresDuplicateDetection.Value; + if (_options.RequiresDuplicateDetection.HasValue) + qd.RequiresDuplicateDetection = _options.RequiresDuplicateDetection.Value; - if (_options.RequiresSession.HasValue) - qd.RequiresSession = _options.RequiresSession.Value; + if (_options.RequiresSession.HasValue) + qd.RequiresSession = _options.RequiresSession.Value; - if (_options.Status.HasValue) - qd.Status = _options.Status.Value; + if (_options.Status.HasValue) + qd.Status = _options.Status.Value; - if (!String.IsNullOrEmpty(_options.UserMetadata)) - qd.UserMetadata = _options.UserMetadata; + if (!String.IsNullOrEmpty(_options.UserMetadata)) + qd.UserMetadata = _options.UserMetadata; - return qd; - } + return qd; + } - public override void Dispose() - { - base.Dispose(); - CloseSender(); - CloseReceiver(); - _managementClient.CloseAsync(); - } + public override void Dispose() + { + base.Dispose(); + CloseSender(); + CloseReceiver(); + _managementClient.CloseAsync(); + } + + private void CloseSender() + { + if (_queueSender == null) + return; - private void CloseSender() + using (_lock.Lock()) { if (_queueSender == null) return; - using (_lock.Lock()) - { - if (_queueSender == null) - return; - - _queueSender?.CloseAsync(); - _queueSender = null; - } + _queueSender?.CloseAsync(); + _queueSender = null; } + } - private void CloseReceiver() + private void CloseReceiver() + { + if (_queueReceiver == null) + return; + + using (_lock.Lock()) { if (_queueReceiver == null) return; - using (_lock.Lock()) - { - if (_queueReceiver == null) - return; - - _queueReceiver?.CloseAsync(); - _queueReceiver = null; - } + _queueReceiver?.CloseAsync(); + _queueReceiver = null; } } } diff --git a/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueueEntryOptions.cs b/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueueEntryOptions.cs index 3c422cb..281fb53 100644 --- a/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueueEntryOptions.cs +++ b/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueueEntryOptions.cs @@ -1,9 +1,8 @@ using Foundatio.Queues; -namespace Foundatio.AzureServiceBus.Queues +namespace Foundatio.AzureServiceBus.Queues; + +public class AzureServiceBusQueueEntryOptions : QueueEntryOptions { - public class AzureServiceBusQueueEntryOptions : QueueEntryOptions - { - public string SessionId { get; set; } - } + public string SessionId { get; set; } } diff --git a/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueueOptions.cs b/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueueOptions.cs index e35dd41..ab1bfdd 100644 --- a/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueueOptions.cs +++ b/src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueueOptions.cs @@ -2,206 +2,205 @@ using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Management; -namespace Foundatio.Queues +namespace Foundatio.Queues; + +public class AzureServiceBusQueueOptions : SharedQueueOptions where T : class +{ + public string ConnectionString { get; set; } + + /// + /// The queue idle interval after which the queue is automatically deleted. + /// + public TimeSpan? AutoDeleteOnIdle { get; set; } + + /// + /// The maximum size of the queue in megabytes. + /// + public long? MaxSizeInMegabytes { get; set; } + + /// + /// Set to true if queue requires duplicate detection. + /// + public bool? RequiresDuplicateDetection { get; set; } + + /// + /// Set to true if queue supports the concept of sessions. + /// + public bool? RequiresSession { get; set; } + + /// + /// The default message time to live. + /// + public TimeSpan? DefaultMessageTimeToLive { get; set; } + + /// + /// Returns true if the queue has dead letter support when a message expires. + /// + public bool? EnableDeadLetteringOnMessageExpiration { get; set; } + + /// + /// The duration of the duplicate detection history. + /// + public TimeSpan? DuplicateDetectionHistoryTimeWindow { get; set; } + + /// + /// Returns true if server-side batched operations are enabled. + /// + public bool? EnableBatchedOperations { get; set; } + + /// + /// Returns true if the message is anonymous accessible. + /// + public bool? IsAnonymousAccessible { get; set; } + + /// + /// Returns true if the queue supports ordering. + /// + public bool? SupportOrdering { get; set; } + + /// + /// Returns the status of the queue (enabled or disabled). When an entity is disabled, that entity cannot send or receive messages. + /// + public EntityStatus? Status { get; set; } + + /// + /// Returns the path to the recipient to which the message is forwarded. + /// + public string ForwardTo { get; set; } + + /// + /// Returns the path to the recipient to which the dead lettered message is forwarded. + /// + public string ForwardDeadLetteredMessagesTo { get; set; } + + /// + /// Returns true if the queue is to be partitioned across multiple message brokers. + /// + public bool? EnablePartitioning { get; set; } + + /// + /// Returns user metadata. + /// + public string UserMetadata { get; set; } + + /// + /// Returns true if the queue holds a message in memory temporarily before writing it to persistent storage. + /// + public bool? EnableExpress { get; set; } + + /// + /// Returns the retry policy; + /// + public RetryPolicy RetryPolicy { get; set; } +} + +public class AzureServiceBusQueueOptionsBuilder : SharedQueueOptionsBuilder, AzureServiceBusQueueOptionsBuilder> where T : class { - public class AzureServiceBusQueueOptions : SharedQueueOptions where T : class - { - public string ConnectionString { get; set; } - - /// - /// The queue idle interval after which the queue is automatically deleted. - /// - public TimeSpan? AutoDeleteOnIdle { get; set; } - - /// - /// The maximum size of the queue in megabytes. - /// - public long? MaxSizeInMegabytes { get; set; } - - /// - /// Set to true if queue requires duplicate detection. - /// - public bool? RequiresDuplicateDetection { get; set; } - - /// - /// Set to true if queue supports the concept of sessions. - /// - public bool? RequiresSession { get; set; } - - /// - /// The default message time to live. - /// - public TimeSpan? DefaultMessageTimeToLive { get; set; } - - /// - /// Returns true if the queue has dead letter support when a message expires. - /// - public bool? EnableDeadLetteringOnMessageExpiration { get; set; } - - /// - /// The duration of the duplicate detection history. - /// - public TimeSpan? DuplicateDetectionHistoryTimeWindow { get; set; } - - /// - /// Returns true if server-side batched operations are enabled. - /// - public bool? EnableBatchedOperations { get; set; } - - /// - /// Returns true if the message is anonymous accessible. - /// - public bool? IsAnonymousAccessible { get; set; } - - /// - /// Returns true if the queue supports ordering. - /// - public bool? SupportOrdering { get; set; } - - /// - /// Returns the status of the queue (enabled or disabled). When an entity is disabled, that entity cannot send or receive messages. - /// - public EntityStatus? Status { get; set; } - - /// - /// Returns the path to the recipient to which the message is forwarded. - /// - public string ForwardTo { get; set; } - - /// - /// Returns the path to the recipient to which the dead lettered message is forwarded. - /// - public string ForwardDeadLetteredMessagesTo { get; set; } - - /// - /// Returns true if the queue is to be partitioned across multiple message brokers. - /// - public bool? EnablePartitioning { get; set; } - - /// - /// Returns user metadata. - /// - public string UserMetadata { get; set; } - - /// - /// Returns true if the queue holds a message in memory temporarily before writing it to persistent storage. - /// - public bool? EnableExpress { get; set; } - - /// - /// Returns the retry policy; - /// - public RetryPolicy RetryPolicy { get; set; } - } - - public class AzureServiceBusQueueOptionsBuilder : SharedQueueOptionsBuilder, AzureServiceBusQueueOptionsBuilder> where T : class - { - public AzureServiceBusQueueOptionsBuilder ConnectionString(string connectionString) - { - Target.ConnectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString)); - return this; - } - - public AzureServiceBusQueueOptionsBuilder AutoDeleteOnIdle(TimeSpan autoDeleteOnIdle) - { - Target.AutoDeleteOnIdle = autoDeleteOnIdle; - return this; - } - - public AzureServiceBusQueueOptionsBuilder MaxSizeInMegabytes(long maxSizeInMegabytes) - { - Target.MaxSizeInMegabytes = maxSizeInMegabytes; - return this; - } - - public AzureServiceBusQueueOptionsBuilder RequiresDuplicateDetection(bool requiresDuplicateDetection) - { - Target.RequiresDuplicateDetection = requiresDuplicateDetection; - return this; - } - - public AzureServiceBusQueueOptionsBuilder RequiresSession(bool requiresSession) - { - Target.RequiresSession = requiresSession; - return this; - } - - public AzureServiceBusQueueOptionsBuilder DefaultMessageTimeToLive(TimeSpan defaultMessageTimeToLive) - { - Target.DefaultMessageTimeToLive = defaultMessageTimeToLive; - return this; - } - - public AzureServiceBusQueueOptionsBuilder EnableDeadLetteringOnMessageExpiration(bool enableDeadLetteringOnMessageExpiration) - { - Target.EnableDeadLetteringOnMessageExpiration = enableDeadLetteringOnMessageExpiration; - return this; - } - - public AzureServiceBusQueueOptionsBuilder DuplicateDetectionHistoryTimeWindow(TimeSpan duplicateDetectionHistoryTimeWindow) - { - Target.DuplicateDetectionHistoryTimeWindow = duplicateDetectionHistoryTimeWindow; - return this; - } - - public AzureServiceBusQueueOptionsBuilder EnableBatchedOperations(bool enableBatchedOperations) - { - Target.EnableBatchedOperations = enableBatchedOperations; - return this; - } - - public AzureServiceBusQueueOptionsBuilder IsAnonymousAccessible(bool isAnonymousAccessible) - { - Target.IsAnonymousAccessible = isAnonymousAccessible; - return this; - } - - public AzureServiceBusQueueOptionsBuilder SupportOrdering(bool supportOrdering) - { - Target.SupportOrdering = supportOrdering; - return this; - } - - public AzureServiceBusQueueOptionsBuilder Status(EntityStatus status) - { - Target.Status = status; - return this; - } - - public AzureServiceBusQueueOptionsBuilder ForwardTo(string forwardTo) - { - Target.ForwardTo = forwardTo ?? throw new ArgumentNullException(nameof(forwardTo)); - return this; - } - - public AzureServiceBusQueueOptionsBuilder ForwardDeadLetteredMessagesTo(string forwardDeadLetteredMessagesTo) - { - Target.ForwardDeadLetteredMessagesTo = forwardDeadLetteredMessagesTo ?? throw new ArgumentNullException(nameof(forwardDeadLetteredMessagesTo)); - return this; - } - - public AzureServiceBusQueueOptionsBuilder EnablePartitioning(bool enablePartitioning) - { - Target.EnablePartitioning = enablePartitioning; - return this; - } - - public AzureServiceBusQueueOptionsBuilder UserMetadata(string userMetadata) - { - Target.UserMetadata = userMetadata ?? throw new ArgumentNullException(nameof(userMetadata)); - return this; - } - - public AzureServiceBusQueueOptionsBuilder EnableExpress(bool enableExpress) - { - Target.EnableExpress = enableExpress; - return this; - } - - public AzureServiceBusQueueOptionsBuilder RetryPolicy(RetryPolicy retryPolicy) - { - Target.RetryPolicy = retryPolicy ?? throw new ArgumentNullException(nameof(retryPolicy)); - return this; - } + public AzureServiceBusQueueOptionsBuilder ConnectionString(string connectionString) + { + Target.ConnectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString)); + return this; + } + + public AzureServiceBusQueueOptionsBuilder AutoDeleteOnIdle(TimeSpan autoDeleteOnIdle) + { + Target.AutoDeleteOnIdle = autoDeleteOnIdle; + return this; + } + + public AzureServiceBusQueueOptionsBuilder MaxSizeInMegabytes(long maxSizeInMegabytes) + { + Target.MaxSizeInMegabytes = maxSizeInMegabytes; + return this; + } + + public AzureServiceBusQueueOptionsBuilder RequiresDuplicateDetection(bool requiresDuplicateDetection) + { + Target.RequiresDuplicateDetection = requiresDuplicateDetection; + return this; + } + + public AzureServiceBusQueueOptionsBuilder RequiresSession(bool requiresSession) + { + Target.RequiresSession = requiresSession; + return this; + } + + public AzureServiceBusQueueOptionsBuilder DefaultMessageTimeToLive(TimeSpan defaultMessageTimeToLive) + { + Target.DefaultMessageTimeToLive = defaultMessageTimeToLive; + return this; + } + + public AzureServiceBusQueueOptionsBuilder EnableDeadLetteringOnMessageExpiration(bool enableDeadLetteringOnMessageExpiration) + { + Target.EnableDeadLetteringOnMessageExpiration = enableDeadLetteringOnMessageExpiration; + return this; + } + + public AzureServiceBusQueueOptionsBuilder DuplicateDetectionHistoryTimeWindow(TimeSpan duplicateDetectionHistoryTimeWindow) + { + Target.DuplicateDetectionHistoryTimeWindow = duplicateDetectionHistoryTimeWindow; + return this; + } + + public AzureServiceBusQueueOptionsBuilder EnableBatchedOperations(bool enableBatchedOperations) + { + Target.EnableBatchedOperations = enableBatchedOperations; + return this; + } + + public AzureServiceBusQueueOptionsBuilder IsAnonymousAccessible(bool isAnonymousAccessible) + { + Target.IsAnonymousAccessible = isAnonymousAccessible; + return this; + } + + public AzureServiceBusQueueOptionsBuilder SupportOrdering(bool supportOrdering) + { + Target.SupportOrdering = supportOrdering; + return this; + } + + public AzureServiceBusQueueOptionsBuilder Status(EntityStatus status) + { + Target.Status = status; + return this; + } + + public AzureServiceBusQueueOptionsBuilder ForwardTo(string forwardTo) + { + Target.ForwardTo = forwardTo ?? throw new ArgumentNullException(nameof(forwardTo)); + return this; + } + + public AzureServiceBusQueueOptionsBuilder ForwardDeadLetteredMessagesTo(string forwardDeadLetteredMessagesTo) + { + Target.ForwardDeadLetteredMessagesTo = forwardDeadLetteredMessagesTo ?? throw new ArgumentNullException(nameof(forwardDeadLetteredMessagesTo)); + return this; + } + + public AzureServiceBusQueueOptionsBuilder EnablePartitioning(bool enablePartitioning) + { + Target.EnablePartitioning = enablePartitioning; + return this; + } + + public AzureServiceBusQueueOptionsBuilder UserMetadata(string userMetadata) + { + Target.UserMetadata = userMetadata ?? throw new ArgumentNullException(nameof(userMetadata)); + return this; + } + + public AzureServiceBusQueueOptionsBuilder EnableExpress(bool enableExpress) + { + Target.EnableExpress = enableExpress; + return this; + } + + public AzureServiceBusQueueOptionsBuilder RetryPolicy(RetryPolicy retryPolicy) + { + Target.RetryPolicy = retryPolicy ?? throw new ArgumentNullException(nameof(retryPolicy)); + return this; } } diff --git a/tests/Foundatio.AzureServiceBus.Tests/Messaging/AzureServiceBusMessageBusTests.cs b/tests/Foundatio.AzureServiceBus.Tests/Messaging/AzureServiceBusMessageBusTests.cs index b5ab6cd..23fadbd 100644 --- a/tests/Foundatio.AzureServiceBus.Tests/Messaging/AzureServiceBusMessageBusTests.cs +++ b/tests/Foundatio.AzureServiceBus.Tests/Messaging/AzureServiceBusMessageBusTests.cs @@ -7,134 +7,133 @@ using Xunit; using Xunit.Abstractions; -namespace Foundatio.AzureServiceBus.Tests.Messaging +namespace Foundatio.AzureServiceBus.Tests.Messaging; + +public class AzureServiceBusMessageBusTests : MessageBusTestBase { - public class AzureServiceBusMessageBusTests : MessageBusTestBase + public AzureServiceBusMessageBusTests(ITestOutputHelper output) : base(output) { } + + protected override IMessageBus GetMessageBus(Func config = null) { - public AzureServiceBusMessageBusTests(ITestOutputHelper output) : base(output) { } + string connectionString = Configuration.GetConnectionString("AzureServiceBusConnectionString"); + if (String.IsNullOrEmpty(connectionString)) + return null; + + return new AzureServiceBusMessageBus(o => + { + o.ConnectionString(connectionString); + o.Topic("test-messages"); + o.TopicEnableBatchedOperations(true); + o.TopicEnableExpress(true); + o.TopicEnablePartitioning(true); + o.TopicSupportOrdering(false); + o.TopicRequiresDuplicateDetection(false); + o.SubscriptionAutoDeleteOnIdle(TimeSpan.FromMinutes(5)); + o.SubscriptionEnableBatchedOperations(true); + o.SubscriptionMaxDeliveryCount(Int32.MaxValue); + o.PrefetchCount(500); + o.LoggerFactory(Log); + + config?.Invoke(o.Target); + + return o; + }); + } - protected override IMessageBus GetMessageBus(Func config = null) - { - string connectionString = Configuration.GetConnectionString("AzureServiceBusConnectionString"); - if (String.IsNullOrEmpty(connectionString)) - return null; - - return new AzureServiceBusMessageBus(o => - { - o.ConnectionString(connectionString); - o.Topic("test-messages"); - o.TopicEnableBatchedOperations(true); - o.TopicEnableExpress(true); - o.TopicEnablePartitioning(true); - o.TopicSupportOrdering(false); - o.TopicRequiresDuplicateDetection(false); - o.SubscriptionAutoDeleteOnIdle(TimeSpan.FromMinutes(5)); - o.SubscriptionEnableBatchedOperations(true); - o.SubscriptionMaxDeliveryCount(Int32.MaxValue); - o.PrefetchCount(500); - o.LoggerFactory(Log); - - config?.Invoke(o.Target); - - return o; - }); - } - - [Fact] - public override Task CanUseMessageOptionsAsync() - { - return base.CanUseMessageOptionsAsync(); - } + [Fact] + public override Task CanUseMessageOptionsAsync() + { + return base.CanUseMessageOptionsAsync(); + } - [Fact] - public override Task CanSendMessageAsync() - { - return base.CanSendMessageAsync(); - } + [Fact] + public override Task CanSendMessageAsync() + { + return base.CanSendMessageAsync(); + } - [Fact] - public override Task CanHandleNullMessageAsync() - { - return base.CanHandleNullMessageAsync(); - } + [Fact] + public override Task CanHandleNullMessageAsync() + { + return base.CanHandleNullMessageAsync(); + } - [Fact] - public override Task CanSendDerivedMessageAsync() - { - return base.CanSendDerivedMessageAsync(); - } + [Fact] + public override Task CanSendDerivedMessageAsync() + { + return base.CanSendDerivedMessageAsync(); + } - [Fact] - public override Task CanSendDelayedMessageAsync() - { - Log.SetLogLevel(LogLevel.Information); - return base.CanSendDelayedMessageAsync(); - } + [Fact] + public override Task CanSendDelayedMessageAsync() + { + Log.SetLogLevel(LogLevel.Information); + return base.CanSendDelayedMessageAsync(); + } - [Fact] - public override Task CanSubscribeConcurrentlyAsync() - { - return base.CanSubscribeConcurrentlyAsync(); - } + [Fact] + public override Task CanSubscribeConcurrentlyAsync() + { + return base.CanSubscribeConcurrentlyAsync(); + } - [Fact] - public override Task CanReceiveMessagesConcurrentlyAsync() - { - return base.CanReceiveMessagesConcurrentlyAsync(); - } + [Fact] + public override Task CanReceiveMessagesConcurrentlyAsync() + { + return base.CanReceiveMessagesConcurrentlyAsync(); + } - [Fact] - public override Task CanSendMessageToMultipleSubscribersAsync() - { - return base.CanSendMessageToMultipleSubscribersAsync(); - } + [Fact] + public override Task CanSendMessageToMultipleSubscribersAsync() + { + return base.CanSendMessageToMultipleSubscribersAsync(); + } - [Fact] - public override Task CanTolerateSubscriberFailureAsync() - { - return base.CanTolerateSubscriberFailureAsync(); - } + [Fact] + public override Task CanTolerateSubscriberFailureAsync() + { + return base.CanTolerateSubscriberFailureAsync(); + } - [Fact] - public override Task WillOnlyReceiveSubscribedMessageTypeAsync() - { - return base.WillOnlyReceiveSubscribedMessageTypeAsync(); - } + [Fact] + public override Task WillOnlyReceiveSubscribedMessageTypeAsync() + { + return base.WillOnlyReceiveSubscribedMessageTypeAsync(); + } - [Fact] - public override Task WillReceiveDerivedMessageTypesAsync() - { - return base.WillReceiveDerivedMessageTypesAsync(); - } + [Fact] + public override Task WillReceiveDerivedMessageTypesAsync() + { + return base.WillReceiveDerivedMessageTypesAsync(); + } - [Fact] - public override Task CanSubscribeToAllMessageTypesAsync() - { - return base.CanSubscribeToAllMessageTypesAsync(); - } + [Fact] + public override Task CanSubscribeToAllMessageTypesAsync() + { + return base.CanSubscribeToAllMessageTypesAsync(); + } - [Fact] - public override Task CanCancelSubscriptionAsync() - { - return base.CanCancelSubscriptionAsync(); - } + [Fact] + public override Task CanCancelSubscriptionAsync() + { + return base.CanCancelSubscriptionAsync(); + } - [Fact] - public override Task WontKeepMessagesWithNoSubscribersAsync() - { - return base.WontKeepMessagesWithNoSubscribersAsync(); - } + [Fact] + public override Task WontKeepMessagesWithNoSubscribersAsync() + { + return base.WontKeepMessagesWithNoSubscribersAsync(); + } - [Fact] - public override Task CanReceiveFromMultipleSubscribersAsync() - { - return base.CanReceiveFromMultipleSubscribersAsync(); - } + [Fact] + public override Task CanReceiveFromMultipleSubscribersAsync() + { + return base.CanReceiveFromMultipleSubscribersAsync(); + } - [Fact] - public override void CanDisposeWithNoSubscribersOrPublishers() - { - base.CanDisposeWithNoSubscribersOrPublishers(); - } + [Fact] + public override void CanDisposeWithNoSubscribersOrPublishers() + { + base.CanDisposeWithNoSubscribersOrPublishers(); } } diff --git a/tests/Foundatio.AzureServiceBus.Tests/Queues/AzureServiceBusQueueTests.cs b/tests/Foundatio.AzureServiceBus.Tests/Queues/AzureServiceBusQueueTests.cs index 5354018..2464d7c 100644 --- a/tests/Foundatio.AzureServiceBus.Tests/Queues/AzureServiceBusQueueTests.cs +++ b/tests/Foundatio.AzureServiceBus.Tests/Queues/AzureServiceBusQueueTests.cs @@ -8,190 +8,189 @@ using Xunit; using Xunit.Abstractions; -namespace Foundatio.AzureServiceBus.Tests.Queue +namespace Foundatio.AzureServiceBus.Tests.Queue; + +public class AzureServiceBusQueueTests : QueueTestBase { - public class AzureServiceBusQueueTests : QueueTestBase + private readonly string _queueName = "foundatio-" + Guid.NewGuid().ToString("N").Substring(10); + + public AzureServiceBusQueueTests(ITestOutputHelper output) : base(output) { - private readonly string _queueName = "foundatio-" + Guid.NewGuid().ToString("N").Substring(10); + Log.SetLogLevel>(LogLevel.Trace); + } - public AzureServiceBusQueueTests(ITestOutputHelper output) : base(output) - { - Log.SetLogLevel>(LogLevel.Trace); - } + protected override IQueue GetQueue(int retries = 1, TimeSpan? workItemTimeout = null, TimeSpan? retryDelay = null, int[] retryMultipliers = null, int deadLetterMaxItems = 100, bool runQueueMaintenance = true) + { + string connectionString = Configuration.GetConnectionString("AzureServiceBusConnectionString"); + if (String.IsNullOrEmpty(connectionString)) + return null; + + // TODO: Respect retryMultipliers + var retryPolicy = retryDelay.GetValueOrDefault() > TimeSpan.Zero + ? new RetryExponential(retryDelay.GetValueOrDefault(), retryDelay.GetValueOrDefault() + retryDelay.GetValueOrDefault(), retries + 1) + : RetryPolicy.NoRetry; + + _logger.LogDebug("Queue Id: {queueId}", _queueName); + return new AzureServiceBusQueue(new AzureServiceBusQueueOptions + { + ConnectionString = connectionString, + Name = _queueName, + AutoDeleteOnIdle = TimeSpan.FromMinutes(5), + EnableBatchedOperations = true, + EnableExpress = true, + EnablePartitioning = true, + SupportOrdering = false, + RequiresDuplicateDetection = false, + RequiresSession = false, + Retries = retries, + RetryPolicy = retryPolicy, + WorkItemTimeout = workItemTimeout.GetValueOrDefault(TimeSpan.FromMinutes(5)), + LoggerFactory = Log + }); + } - protected override IQueue GetQueue(int retries = 1, TimeSpan? workItemTimeout = null, TimeSpan? retryDelay = null, int[] retryMultipliers = null, int deadLetterMaxItems = 100, bool runQueueMaintenance = true) - { - string connectionString = Configuration.GetConnectionString("AzureServiceBusConnectionString"); - if (String.IsNullOrEmpty(connectionString)) - return null; - - // TODO: Respect retryMultipliers - var retryPolicy = retryDelay.GetValueOrDefault() > TimeSpan.Zero - ? new RetryExponential(retryDelay.GetValueOrDefault(), retryDelay.GetValueOrDefault() + retryDelay.GetValueOrDefault(), retries + 1) - : RetryPolicy.NoRetry; - - _logger.LogDebug("Queue Id: {queueId}", _queueName); - return new AzureServiceBusQueue(new AzureServiceBusQueueOptions - { - ConnectionString = connectionString, - Name = _queueName, - AutoDeleteOnIdle = TimeSpan.FromMinutes(5), - EnableBatchedOperations = true, - EnableExpress = true, - EnablePartitioning = true, - SupportOrdering = false, - RequiresDuplicateDetection = false, - RequiresSession = false, - Retries = retries, - RetryPolicy = retryPolicy, - WorkItemTimeout = workItemTimeout.GetValueOrDefault(TimeSpan.FromMinutes(5)), - LoggerFactory = Log - }); - } - - protected override Task CleanupQueueAsync(IQueue queue) - { - // Don't delete the queue, it's super expensive and will be cleaned up later. - queue?.Dispose(); - return Task.CompletedTask; - } + protected override Task CleanupQueueAsync(IQueue queue) + { + // Don't delete the queue, it's super expensive and will be cleaned up later. + queue?.Dispose(); + return Task.CompletedTask; + } - [Fact] - public override Task CanQueueAndDequeueWorkItemAsync() - { - return base.CanQueueAndDequeueWorkItemAsync(); - } + [Fact] + public override Task CanQueueAndDequeueWorkItemAsync() + { + return base.CanQueueAndDequeueWorkItemAsync(); + } - [Fact] - public override Task CanDequeueWithCancelledTokenAsync() - { - return base.CanDequeueWithCancelledTokenAsync(); - } + [Fact] + public override Task CanDequeueWithCancelledTokenAsync() + { + return base.CanDequeueWithCancelledTokenAsync(); + } - [Fact] - public override Task CanQueueAndDequeueMultipleWorkItemsAsync() - { - return base.CanQueueAndDequeueMultipleWorkItemsAsync(); - } + [Fact] + public override Task CanQueueAndDequeueMultipleWorkItemsAsync() + { + return base.CanQueueAndDequeueMultipleWorkItemsAsync(); + } - [Fact] - public override Task WillWaitForItemAsync() - { - return base.WillWaitForItemAsync(); - } + [Fact] + public override Task WillWaitForItemAsync() + { + return base.WillWaitForItemAsync(); + } - [Fact] - public override Task DequeueWaitWillGetSignaledAsync() - { - return base.DequeueWaitWillGetSignaledAsync(); - } + [Fact] + public override Task DequeueWaitWillGetSignaledAsync() + { + return base.DequeueWaitWillGetSignaledAsync(); + } - [Fact] - public override Task CanUseQueueWorkerAsync() - { - return base.CanUseQueueWorkerAsync(); - } + [Fact] + public override Task CanUseQueueWorkerAsync() + { + return base.CanUseQueueWorkerAsync(); + } - [Fact] - public override Task CanHandleErrorInWorkerAsync() - { - return base.CanHandleErrorInWorkerAsync(); - } + [Fact] + public override Task CanHandleErrorInWorkerAsync() + { + return base.CanHandleErrorInWorkerAsync(); + } - [Fact(Skip = "Dequeue Time takes forever")] - public override Task WorkItemsWillTimeoutAsync() - { - return base.WorkItemsWillTimeoutAsync(); - } + [Fact(Skip = "Dequeue Time takes forever")] + public override Task WorkItemsWillTimeoutAsync() + { + return base.WorkItemsWillTimeoutAsync(); + } - [Fact(Skip = "Dequeue Time takes forever")] - public override Task WillNotWaitForItemAsync() - { - return base.WillNotWaitForItemAsync(); - } + [Fact(Skip = "Dequeue Time takes forever")] + public override Task WillNotWaitForItemAsync() + { + return base.WillNotWaitForItemAsync(); + } - [Fact] - public override Task WorkItemsWillGetMovedToDeadletterAsync() - { - return base.WorkItemsWillGetMovedToDeadletterAsync(); - } + [Fact] + public override Task WorkItemsWillGetMovedToDeadletterAsync() + { + return base.WorkItemsWillGetMovedToDeadletterAsync(); + } - [Fact(Skip = "Dequeue Time takes forever")] - public override Task CanResumeDequeueEfficientlyAsync() - { - return base.CanResumeDequeueEfficientlyAsync(); - } + [Fact(Skip = "Dequeue Time takes forever")] + public override Task CanResumeDequeueEfficientlyAsync() + { + return base.CanResumeDequeueEfficientlyAsync(); + } - [Fact(Skip = "Dequeue Time takes forever")] - public override Task CanDequeueEfficientlyAsync() - { - return base.CanDequeueEfficientlyAsync(); - } + [Fact(Skip = "Dequeue Time takes forever")] + public override Task CanDequeueEfficientlyAsync() + { + return base.CanDequeueEfficientlyAsync(); + } - [Fact] - public override Task CanDequeueWithLockingAsync() - { - return base.CanDequeueWithLockingAsync(); - } + [Fact] + public override Task CanDequeueWithLockingAsync() + { + return base.CanDequeueWithLockingAsync(); + } - [Fact] - public override Task CanHaveMultipleQueueInstancesWithLockingAsync() - { - return base.CanHaveMultipleQueueInstancesWithLockingAsync(); - } + [Fact] + public override Task CanHaveMultipleQueueInstancesWithLockingAsync() + { + return base.CanHaveMultipleQueueInstancesWithLockingAsync(); + } - [Fact] - public override Task CanAutoCompleteWorkerAsync() - { - return base.CanAutoCompleteWorkerAsync(); - } + [Fact] + public override Task CanAutoCompleteWorkerAsync() + { + return base.CanAutoCompleteWorkerAsync(); + } - [Fact] - public override Task CanHaveMultipleQueueInstancesAsync() - { - return base.CanHaveMultipleQueueInstancesAsync(); - } + [Fact] + public override Task CanHaveMultipleQueueInstancesAsync() + { + return base.CanHaveMultipleQueueInstancesAsync(); + } - [Fact] - public override Task CanRunWorkItemWithMetricsAsync() - { - return base.CanRunWorkItemWithMetricsAsync(); - } + [Fact] + public override Task CanRunWorkItemWithMetricsAsync() + { + return base.CanRunWorkItemWithMetricsAsync(); + } - [Fact(Skip = "Dequeue Time takes forever")] - public override Task CanRenewLockAsync() - { - return base.CanRenewLockAsync(); - } + [Fact(Skip = "Dequeue Time takes forever")] + public override Task CanRenewLockAsync() + { + return base.CanRenewLockAsync(); + } - [Fact] - public override Task CanAbandonQueueEntryOnceAsync() - { - return base.CanAbandonQueueEntryOnceAsync(); - } + [Fact] + public override Task CanAbandonQueueEntryOnceAsync() + { + return base.CanAbandonQueueEntryOnceAsync(); + } - [Fact] - public override Task CanCompleteQueueEntryOnceAsync() - { - return base.CanCompleteQueueEntryOnceAsync(); - } + [Fact] + public override Task CanCompleteQueueEntryOnceAsync() + { + return base.CanCompleteQueueEntryOnceAsync(); + } - [Fact(Skip = "Not using this test because you can set specific delay times for servicebus")] - public override Task CanDelayRetryAsync() - { - return base.CanDelayRetryAsync(); - } + [Fact(Skip = "Not using this test because you can set specific delay times for servicebus")] + public override Task CanDelayRetryAsync() + { + return base.CanDelayRetryAsync(); + } - [Fact] - public override Task VerifyRetryAttemptsAsync() - { - return base.VerifyRetryAttemptsAsync(); - } + [Fact] + public override Task VerifyRetryAttemptsAsync() + { + return base.VerifyRetryAttemptsAsync(); + } - [Fact] - public override Task VerifyDelayedRetryAttemptsAsync() - { - return base.VerifyDelayedRetryAttemptsAsync(); - } + [Fact] + public override Task VerifyDelayedRetryAttemptsAsync() + { + return base.VerifyDelayedRetryAttemptsAsync(); } }