diff --git a/.github/.gitversion.yml b/.github/.gitversion.yml index 094a12111..d5b19d6c3 100644 --- a/.github/.gitversion.yml +++ b/.github/.gitversion.yml @@ -29,4 +29,4 @@ branches: ignore: sha: [] merge-message-formats: {} -next-version: 0.3.15 +next-version: 0.3.16 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 59167bb4f..9127e26aa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -425,7 +425,7 @@ jobs: - name: Anchore container scan id: anchore-scan - uses: anchore/scan-action@v3.3.4 + uses: anchore/scan-action@v3.3.5 if: ${{ (matrix.os == 'ubuntu-latest') }} with: image: ${{ fromJSON(steps.meta.outputs.json).tags[0] }} diff --git a/src/Api/Storage/Payload.cs b/src/Api/Storage/Payload.cs index 14e9f58db..cc305a67d 100644 --- a/src/Api/Storage/Payload.cs +++ b/src/Api/Storage/Payload.cs @@ -76,6 +76,7 @@ public TimeSpan Elapsed public string? CalledAeTitle { get => Files.OfType().Select(p => p.CalledAeTitle).FirstOrDefault(); } public int FilesUploaded { get => Files.Count(p => p.IsUploaded); } + public int FilesFailedToUpload { get => Files.Count(p => p.IsUploadFailed); } public Payload(string key, string correlationId, uint timeout) diff --git a/src/Common/ExtensionMethods.cs b/src/Common/ExtensionMethods.cs index 997311217..664f9abeb 100644 --- a/src/Common/ExtensionMethods.cs +++ b/src/Common/ExtensionMethods.cs @@ -76,5 +76,15 @@ public static async Task Post(this ActionBlock actionBlock await Task.Delay(delay).ConfigureAwait(false); return actionBlock.Post(input); } + + /// + /// Checks if a given task is faulted or cancelled. + /// + /// The task object + /// True if canceled or faulted. False otherwise. + public static bool IsCanceledOrFaulted(this Task task) + { + return task.IsCanceled || task.IsFaulted; + } } } diff --git a/src/Configuration/DicomWebConfiguration.cs b/src/Configuration/DicomWebConfiguration.cs index 4e5eed446..62cdbcf34 100644 --- a/src/Configuration/DicomWebConfiguration.cs +++ b/src/Configuration/DicomWebConfiguration.cs @@ -39,7 +39,7 @@ public class DicomWebConfiguration /// Gets or sets the maximum number of simultaneous DICOMweb connections. /// [ConfigurationKeyName("maximumNumberOfConnections")] - public int MaximumNumberOfConnection { get; set; } = 2; + public ushort MaximumNumberOfConnection { get; set; } = 2; /// /// Gets or set the maximum allowed file size in bytes with default to 2GiB. diff --git a/src/Configuration/ScuConfiguration.cs b/src/Configuration/ScuConfiguration.cs index 39a9e5c94..a2ae5ab23 100644 --- a/src/Configuration/ScuConfiguration.cs +++ b/src/Configuration/ScuConfiguration.cs @@ -54,7 +54,7 @@ public class ScuConfiguration /// Gets or sets the maximum number of simultaneous DICOM associations for the SCU service. /// [ConfigurationKeyName("maximumNumberOfAssociations")] - public int MaximumNumberOfAssociations { get; set; } = 8; + public ushort MaximumNumberOfAssociations { get; set; } = 8; public ScuConfiguration() { diff --git a/src/InformaticsGateway/Common/PostPayloadException.cs b/src/InformaticsGateway/Common/PostPayloadException.cs new file mode 100644 index 000000000..34487130f --- /dev/null +++ b/src/InformaticsGateway/Common/PostPayloadException.cs @@ -0,0 +1,50 @@ +/* + * Copyright 2023 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Runtime.Serialization; +using Monai.Deploy.InformaticsGateway.Api.Storage; + +namespace Monai.Deploy.InformaticsGateway.Common +{ + internal class PostPayloadException : Exception + { + public Payload.PayloadState TargetQueue { get; } + public Payload Payload { get; } + + public PostPayloadException() + { + } + + public PostPayloadException(Api.Storage.Payload.PayloadState targetState, Payload payload) + { + TargetQueue = targetState; + Payload = payload; + } + + public PostPayloadException(string message) : base(message) + { + } + + public PostPayloadException(string message, Exception innerException) : base(message, innerException) + { + } + + protected PostPayloadException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } + } +} diff --git a/src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs b/src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs index 4a0aff069..6cc4ce556 100644 --- a/src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs +++ b/src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs @@ -1,5 +1,5 @@ /* - * Copyright 2022 MONAI Consortium + * Copyright 2022-2023 MONAI Consortium * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,8 +33,8 @@ public static partial class Log [LoggerMessage(EventId = 3004, Level = LogLevel.Trace, Message = "Number of incomplete payloads waiting for processing: {count}.")] public static partial void BucketsActive(this ILogger logger, int count); - [LoggerMessage(EventId = 3005, Level = LogLevel.Trace, Message = "Checking elapsed time for bucket: {key} with timeout set to {timeout}s. Elapsed {elapsed}s with {failedFiles} failures out of {totalNumberOfFiles}.")] - public static partial void BucketElapsedTime(this ILogger logger, string key, uint timeout, double elapsed, int totalNumberOfFiles, int failedFiles); + [LoggerMessage(EventId = 3005, Level = LogLevel.Trace, Message = "Checking elapsed time for bucket: {key} with timeout set to {timeout}s. Elapsed {elapsed}s with {succeededFiles} uplaoded and {failedFiles} failures out of {totalNumberOfFiles}.")] + public static partial void BucketElapsedTime(this ILogger logger, string key, uint timeout, double elapsed, int totalNumberOfFiles, int succeededFiles, int failedFiles); [LoggerMessage(EventId = 3007, Level = LogLevel.Information, Message = "Bucket {key} sent to processing queue with {count} files.")] public static partial void BucketReady(this ILogger logger, string key, int count); diff --git a/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs b/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs index 0938d5f30..e6e9957c4 100644 --- a/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs +++ b/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs @@ -54,7 +54,7 @@ public static partial class Log [LoggerMessage(EventId = 4010, Level = LogLevel.Debug, Message = "File {path} exists={exists}.")] public static partial void VerifyFileExists(this ILogger logger, string path, bool exists); - [LoggerMessage(EventId = 4011, Level = LogLevel.Information, Message = "Initializing Object Uploader service with {threads} workers.")] + [LoggerMessage(EventId = 4011, Level = LogLevel.Information, Message = "Initializing Object Uploser service with {threads} workers.")] public static partial void InitializeThreads(this ILogger logger, int threads); } } diff --git a/src/InformaticsGateway/Logging/Log.500.ExportService.cs b/src/InformaticsGateway/Logging/Log.500.ExportService.cs index 94f3c6808..71804d54f 100644 --- a/src/InformaticsGateway/Logging/Log.500.ExportService.cs +++ b/src/InformaticsGateway/Logging/Log.500.ExportService.cs @@ -123,5 +123,11 @@ public static partial class Log [LoggerMessage(EventId = 533, Level = LogLevel.Error, Message = "Recovering messaging service connection due to {reason}.")] public static partial void MessagingServiceErrorRecover(this ILogger logger, string reason); + + [LoggerMessage(EventId = 534, Level = LogLevel.Error, Message = "Error posting export job for processing correlation ID {correlationId}, export task ID {exportTaskId}.")] + public static partial void ErrorPostingExportJobToQueue(this ILogger logger, string correlationId, string exportTaskId); + + [LoggerMessage(EventId = 535, Level = LogLevel.Warning, Message = "Exceeded maximum number of worker in {serviceName}: {count}.")] + public static partial void ExceededMaxmimumNumberOfWorkers(this ILogger logger, string serviceName, ulong count); } } diff --git a/src/InformaticsGateway/Logging/Log.700.PayloadService.cs b/src/InformaticsGateway/Logging/Log.700.PayloadService.cs index a44b2e4a2..992f98815 100644 --- a/src/InformaticsGateway/Logging/Log.700.PayloadService.cs +++ b/src/InformaticsGateway/Logging/Log.700.PayloadService.cs @@ -136,5 +136,17 @@ public static partial class Log [LoggerMessage(EventId = 743, Level = LogLevel.Error, Message = "Exception moving payload.")] public static partial void PayloadMoveException(this ILogger logger, Exception ex); + + [LoggerMessage(EventId = 744, Level = LogLevel.Warning, Message = "PayloadNotification move payload queue: faulted: {isFauled}, cancelled: {isCancelled}.")] + public static partial void MoveQueueFaulted(this ILogger logger, bool isFauled, bool isCancelled); + + [LoggerMessage(EventId = 745, Level = LogLevel.Warning, Message = "PayloadNotification publishing payload queue: faulted: {isFauled}, cancelled: {isCancelled}.")] + public static partial void PublishQueueFaulted(this ILogger logger, bool isFauled, bool isCancelled); + + [LoggerMessage(EventId = 746, Level = LogLevel.Error, Message = "Error posting payload to move queue.")] + public static partial void ErrorPostingJobToMovePayloadsQueue(this ILogger logger); + + [LoggerMessage(EventId = 747, Level = LogLevel.Error, Message = "Error posting payload to publish queue.")] + public static partial void ErrorPostingJobToPublishPayloadsQueue(this ILogger logger); } } diff --git a/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs b/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs index e3d0b0153..265e4dbd0 100644 --- a/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs +++ b/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs @@ -135,7 +135,7 @@ private async void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e) var payload = await _payloads[key].Task.ConfigureAwait(false); using var loggerScope = _logger.BeginScope(new LoggingDataDictionary { { "CorrelationId", payload.CorrelationId } }); - _logger.BucketElapsedTime(key, payload.Timeout, payload.ElapsedTime().TotalSeconds, payload.Files.Count, payload.FilesFailedToUpload); + _logger.BucketElapsedTime(key, payload.Timeout, payload.ElapsedTime().TotalSeconds, payload.Files.Count, payload.FilesUploaded, payload.FilesFailedToUpload); // Wait for timer window closes before sending payload for processing if (payload.HasTimedOut) { diff --git a/src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs b/src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs index 6d72334b2..bfc65d558 100644 --- a/src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs +++ b/src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs @@ -86,7 +86,10 @@ public async Task MoveFilesAsync(Payload payload, ActionBlock moveQueue var action = await UpdatePayloadState(payload, ex, cancellationToken).ConfigureAwait(false); if (action == PayloadAction.Updated) { - await moveQueue.Post(payload, _options.Value.Storage.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false); + if (!await moveQueue.Post(payload, _options.Value.Storage.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false)) + { + throw new PostPayloadException(Payload.PayloadState.Move, payload); + } } } finally @@ -111,7 +114,11 @@ private async Task NotifyIfCompleted(Payload payload, ActionBlock notif await repository.UpdateAsync(payload, cancellationToken).ConfigureAwait(false); _logger.PayloadSaved(payload.PayloadId); - notificationQueue.Post(payload); + if (!notificationQueue.Post(payload)) + { + throw new PostPayloadException(Payload.PayloadState.Notify, payload); + } + _logger.PayloadReadyToBePublished(payload.PayloadId); } else // we should never hit this else block. @@ -185,7 +192,7 @@ await _storageService.CopyObjectAsync( await VerifyFileExists(payloadId, file, cancellationToken).ConfigureAwait(false); } - catch (StorageServiceException ex) when (ex.Message.Contains("Not found", StringComparison.OrdinalIgnoreCase)) // TODO: StorageLib shall not throw any errors from MINIO + catch (StorageObjectNotFoundException ex) when (ex.Message.Contains("Not found", StringComparison.OrdinalIgnoreCase)) // TODO: StorageLib shall not throw any errors from MINIO { // when file cannot be found on the Storage Service, we assume file has been moved previously by verifying the file exists on destination. _logger.FileMissingInPayload(payloadId, file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), ex); diff --git a/src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs b/src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs index f50d5dc19..cc685665f 100644 --- a/src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs +++ b/src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs @@ -80,8 +80,11 @@ public async Task NotifyAsync(Payload payload, ActionBlock notification var action = await UpdatePayloadState(payload, cancellationToken).ConfigureAwait(false); if (action == PayloadAction.Updated) { - await notificationQueue.Post(payload, _options.Value.Messaging.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false); _logger.FailedToPublishWorkflowRequest(payload.PayloadId, ex); + if (!await notificationQueue.Post(payload, _options.Value.Messaging.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false)) + { + throw new PostPayloadException(Payload.PayloadState.Notify, payload); + } } } } diff --git a/src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs b/src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs index ffedb0784..1b429f9be 100644 --- a/src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs +++ b/src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs @@ -88,16 +88,38 @@ public PayloadNotificationService(IServiceScopeFactory serviceScopeFactory, _cancellationTokenSource = new CancellationTokenSource(); } - public async Task StartAsync(CancellationToken cancellationToken) + public Task StartAsync(CancellationToken cancellationToken) { - _moveFileQueue = new ActionBlock( - MoveActionHandler, - new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = _options.Value.Storage.PayloadProcessThreads, - MaxMessagesPerTask = 1, - CancellationToken = cancellationToken - }); + SetupQueues(cancellationToken); + + var task = Task.Run(async () => + { + await RestoreFromDatabaseAsync(cancellationToken).ConfigureAwait(false); + BackgroundProcessing(cancellationToken); + }, CancellationToken.None); + + Status = ServiceStatus.Running; + _logger.ServiceStarted(ServiceName); + + if (task.IsCompleted) + return task; + + return Task.CompletedTask; + } + + private void SetupQueues(CancellationToken cancellationToken) + { + ResetMoveQueue(cancellationToken); + ResetPublishQueue(cancellationToken); + } + + private void ResetPublishQueue(CancellationToken cancellationToken) + { + if (_publishQueue is not null) + { + _logger.PublishQueueFaulted(_publishQueue.Completion.IsFaulted, _publishQueue.Completion.IsCanceled); + _publishQueue.Complete(); + } _publishQueue = new ActionBlock( NotificationHandler, @@ -107,21 +129,24 @@ public async Task StartAsync(CancellationToken cancellationToken) MaxMessagesPerTask = 1, CancellationToken = cancellationToken }); + } - await RestoreFromDatabaseAsync(cancellationToken).ConfigureAwait(false); - - var task = Task.Run(() => + private void ResetMoveQueue(CancellationToken cancellationToken) + { + if (_moveFileQueue is not null) { - BackgroundProcessing(cancellationToken); - }, CancellationToken.None); - - Status = ServiceStatus.Running; - _logger.ServiceStarted(ServiceName); - - if (task.IsCompleted) - await task.ConfigureAwait(false); + _logger.MoveQueueFaulted(_moveFileQueue.Completion.IsFaulted, _moveFileQueue.Completion.IsCanceled); + _moveFileQueue.Complete(); + } - await Task.CompletedTask.ConfigureAwait(false); + _moveFileQueue = new ActionBlock( + MoveActionHandler, + new ExecutionDataflowBlockOptions + { + MaxDegreeOfParallelism = _options.Value.Storage.PayloadProcessThreads, + MaxMessagesPerTask = 1, + CancellationToken = cancellationToken + }); } private async Task NotificationHandler(Payload payload) @@ -134,6 +159,10 @@ private async Task NotificationHandler(Payload payload) { await _payloadNotificationActionHandler.NotifyAsync(payload, _publishQueue, _cancellationTokenSource.Token).ConfigureAwait(false); } + catch (PostPayloadException ex) + { + HandlePostPayloadException(ex); + } catch (Exception ex) { if (ex is PayloadNotifyException payloadMoveException && @@ -158,6 +187,10 @@ private async Task MoveActionHandler(Payload payload) { await _payloadMoveActionHandler.MoveFilesAsync(payload, _moveFileQueue, _publishQueue, _cancellationTokenSource.Token).ConfigureAwait(false); } + catch (PostPayloadException ex) + { + HandlePostPayloadException(ex); + } catch (Exception ex) { if (ex is PayloadNotifyException payloadMoveException && @@ -172,17 +205,45 @@ private async Task MoveActionHandler(Payload payload) } } + private void HandlePostPayloadException(PostPayloadException ex) + { + Guard.Against.Null(ex); + + if (ex.TargetQueue == Payload.PayloadState.Move) + { + ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, CancellationToken.None); + if (!_moveFileQueue.Post(ex.Payload)) + { + _logger.ErrorPostingJobToMovePayloadsQueue(); + } + } + else if (ex.TargetQueue == Payload.PayloadState.Notify) + { + ResetIfFaultedOrCancelled(_publishQueue, ResetPublishQueue, CancellationToken.None); + if (!_publishQueue.Post(ex.Payload)) + { + _logger.ErrorPostingJobToPublishPayloadsQueue(); + } + } + } + private void BackgroundProcessing(CancellationToken cancellationToken) { _logger.ServiceRunning(ServiceName); while (!cancellationToken.IsCancellationRequested) { + ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, cancellationToken); + ResetIfFaultedOrCancelled(_publishQueue, ResetPublishQueue, cancellationToken); + Payload payload = null; try { payload = _payloadAssembler.Dequeue(cancellationToken); - _moveFileQueue.Post(payload); + while (!_moveFileQueue.Post(payload)) + { + ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, cancellationToken); + } _logger.PayloadQueuedForProcessing(payload.PayloadId, ServiceName); } catch (OperationCanceledException ex) @@ -202,6 +263,18 @@ private void BackgroundProcessing(CancellationToken cancellationToken) _logger.ServiceCancelled(ServiceName); } + private static void ResetIfFaultedOrCancelled(ActionBlock queue, Action resetFunction, CancellationToken cancellationToken) + { + Guard.Against.Null(queue); + Guard.Against.Null(resetFunction); + + if (queue.Completion.IsCanceledOrFaulted()) + { + resetFunction(cancellationToken); + } + } + + private async Task RestoreFromDatabaseAsync(CancellationToken cancellationToken) { _logger.StartupRestoreFromDatabase(); @@ -214,11 +287,17 @@ private async Task RestoreFromDatabaseAsync(CancellationToken cancellationToken) { if (payload.State == Payload.PayloadState.Move) { - _moveFileQueue.Post(payload); + if (!_moveFileQueue.Post(payload)) + { + _logger.ErrorPostingJobToMovePayloadsQueue(); + } } else if (payload.State == Payload.PayloadState.Notify) { - _publishQueue.Post(payload); + if (!_publishQueue.Post(payload)) + { + _logger.ErrorPostingJobToPublishPayloadsQueue(); + } } } _logger.RestoredFromDatabase(payloads.Count); diff --git a/src/InformaticsGateway/Services/Export/DicomWebExportService.cs b/src/InformaticsGateway/Services/Export/DicomWebExportService.cs index 8226b16a8..08139b9d6 100644 --- a/src/InformaticsGateway/Services/Export/DicomWebExportService.cs +++ b/src/InformaticsGateway/Services/Export/DicomWebExportService.cs @@ -49,7 +49,7 @@ internal class DicomWebExportService : ExportServiceBase private readonly IOptions _configuration; private readonly IDicomToolkit _dicomToolkit; - protected override int Concurrency { get; } + protected override ushort Concurrency { get; } public override string RoutingKey { get; } public override string ServiceName => "DICOMweb Export Service"; diff --git a/src/InformaticsGateway/Services/Export/ExportServiceBase.cs b/src/InformaticsGateway/Services/Export/ExportServiceBase.cs index a81bf9bbe..55f3ee3f6 100644 --- a/src/InformaticsGateway/Services/Export/ExportServiceBase.cs +++ b/src/InformaticsGateway/Services/Export/ExportServiceBase.cs @@ -58,9 +58,10 @@ public abstract class ExportServiceBase : IHostedService, IMonaiService, IDispos private readonly Dictionary _exportRequests; private readonly IStorageInfoProvider _storageInfoProvider; private bool _disposedValue; + private ulong _activeWorkers = 0; public abstract string RoutingKey { get; } - protected abstract int Concurrency { get; } + protected abstract ushort Concurrency { get; } public ServiceStatus Status { get; set; } = ServiceStatus.Unknown; public abstract string ServiceName { get; } @@ -122,11 +123,11 @@ public Task StopAsync(CancellationToken cancellationToken) private void SetupPolling() { - _messageSubscriber.Subscribe(RoutingKey, RoutingKey, OnMessageReceivedCallback); + _messageSubscriber.SubscribeAsync(RoutingKey, RoutingKey, OnMessageReceivedCallback, prefetchCount: Concurrency); _logger.ExportEventSubscription(ServiceName, RoutingKey); } - private void OnMessageReceivedCallback(MessageReceivedEventArgs eventArgs) + private async Task OnMessageReceivedCallback(MessageReceivedEventArgs eventArgs) { if (!_storageInfoProvider.HasSpaceAvailableForExport) { @@ -135,6 +136,14 @@ private void OnMessageReceivedCallback(MessageReceivedEventArgs eventArgs) return; } + if (Interlocked.Read(ref _activeWorkers) >= Concurrency) + { + _logger.ExceededMaxmimumNumberOfWorkers(ServiceName, _activeWorkers); + _messageSubscriber.Reject(eventArgs.Message); + return; + } + + Interlocked.Increment(ref _activeWorkers); try { var executionOptions = new ExecutionDataflowBlockOptions @@ -178,12 +187,19 @@ private void OnMessageReceivedCallback(MessageReceivedEventArgs eventArgs) var exportRequestWithDetails = new ExportRequestEventDetails(exportRequest); _exportRequests.Add(exportRequest.ExportTaskId, exportRequestWithDetails); - exportFlow.Post(exportRequestWithDetails); - _logger.ExportRequestQueuedForProcessing(exportRequest.CorrelationId, exportRequest.ExportTaskId); + if (!exportFlow.Post(exportRequestWithDetails)) + { + _logger.ErrorPostingExportJobToQueue(exportRequest.CorrelationId, exportRequest.ExportTaskId); + _messageSubscriber.Reject(eventArgs.Message); + } + else + { + _logger.ExportRequestQueuedForProcessing(exportRequest.CorrelationId, exportRequest.ExportTaskId); + } } exportFlow.Complete(); - reportingActionBlock.Completion.Wait(_cancellationTokenSource.Token); + await reportingActionBlock.Completion.ConfigureAwait(false); } catch (AggregateException ex) { @@ -196,6 +212,10 @@ private void OnMessageReceivedCallback(MessageReceivedEventArgs eventArgs) { _logger.ErrorProcessingExportTask(ex); } + finally + { + Interlocked.Decrement(ref _activeWorkers); + } } // TPL doesn't yet support IAsyncEnumerable @@ -229,7 +249,7 @@ private IEnumerable DownloadPayloadActionCallback(Expo _logger.FileReadyForExport(file); }); - task.Wait(); + task.Wait(cancellationToken); } catch (Exception ex) { diff --git a/src/InformaticsGateway/Services/Export/ScuExportService.cs b/src/InformaticsGateway/Services/Export/ScuExportService.cs index e1bdd4ac7..231687e49 100644 --- a/src/InformaticsGateway/Services/Export/ScuExportService.cs +++ b/src/InformaticsGateway/Services/Export/ScuExportService.cs @@ -43,7 +43,7 @@ internal class ScuExportService : ExportServiceBase private readonly IOptions _configuration; private readonly IDicomToolkit _dicomToolkit; - protected override int Concurrency { get; } + protected override ushort Concurrency { get; } public override string RoutingKey { get; } public override string ServiceName => "DICOM Export Service"; @@ -69,7 +69,7 @@ protected override async Task ExportDataBlockCallback( foreach (var destinationName in exportRequestData.Destinations) { - await HandleDesination(exportRequestData, destinationName, cancellationToken); + await HandleDesination(exportRequestData, destinationName, cancellationToken).ConfigureAwait(false); } return exportRequestData; @@ -118,7 +118,7 @@ await Policy client.ServiceOptions.LogDimseDatasets = _configuration.Value.Dicom.Scu.LogDimseDatasets; client.NegotiateAsyncOps(); - if (await GenerateRequestsAsync(exportRequestData, client, manualResetEvent)) + if (await GenerateRequestsAsync(exportRequestData, client, manualResetEvent).ConfigureAwait(false)) { _logger.DimseExporting(destination.AeTitle, destination.HostIp, destination.Port); await client.SendAsync(cancellationToken).ConfigureAwait(false); diff --git a/src/InformaticsGateway/Test/Services/Export/DicomWebExportServiceTest.cs b/src/InformaticsGateway/Test/Services/Export/DicomWebExportServiceTest.cs index 75708dde6..36b2a83f6 100644 --- a/src/InformaticsGateway/Test/Services/Export/DicomWebExportServiceTest.cs +++ b/src/InformaticsGateway/Test/Services/Export/DicomWebExportServiceTest.cs @@ -127,13 +127,13 @@ public async Task ExportDataBlockCallback_ReturnsNullIfInferenceRequestCannotBeF _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs(transactionId)); + await messageReceivedCallback(CreateMessageReceivedEventArgs(transactionId)); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -164,9 +164,9 @@ public async Task ExportDataBlockCallback_ReturnsNullIfInferenceRequestCannotBeF It.Is(match => CheckMessage(match, ExportStatus.Failure, FileExportStatus.ConfigurationError))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLogging($"The specified inference request '{transactionId}' cannot be found and will not be exported.", LogLevel.Error, Times.Once()); @@ -182,13 +182,13 @@ public async Task ExportDataBlockCallback_ReturnsNullIfInferenceRequestContainsN _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs(transactionId)); + await messageReceivedCallback(CreateMessageReceivedEventArgs(transactionId)); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -219,9 +219,9 @@ public async Task ExportDataBlockCallback_ReturnsNullIfInferenceRequestContainsN It.Is(match => CheckMessage(match, ExportStatus.Failure, FileExportStatus.ConfigurationError))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLogging($"The inference request contains no `outputResources` nor any DICOMweb export destinations.", LogLevel.Error, Times.Once()); @@ -248,13 +248,13 @@ public async Task ExportDataBlockCallback_RecordsStowFailuresAndReportFailure() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs(transactionId)); + await messageReceivedCallback(CreateMessageReceivedEventArgs(transactionId)); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -290,7 +290,7 @@ public async Task ExportDataBlockCallback_RecordsStowFailuresAndReportFailure() }; await service.StartAsync(_cancellationTokenSource.Token); - Assert.True(dataflowCompleted.WaitOne(3000)); + Assert.True(dataflowCompleted.WaitOne(5000)); await StopAndVerify(service); _messagePublisherService.Verify( @@ -298,9 +298,9 @@ public async Task ExportDataBlockCallback_RecordsStowFailuresAndReportFailure() It.Is(match => CheckMessage(match, ExportStatus.Failure, FileExportStatus.ServiceError))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLogging($"Exporting data to {inferenceRequest.OutputResources.First().ConnectionDetails.Uri}.", LogLevel.Debug, Times.Once()); @@ -331,13 +331,13 @@ public async Task CompletesDataflow(HttpStatusCode httpStatusCode) _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs(transactionId)); + await messageReceivedCallback(CreateMessageReceivedEventArgs(transactionId)); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -378,7 +378,7 @@ public async Task CompletesDataflow(HttpStatusCode httpStatusCode) }; await service.StartAsync(_cancellationTokenSource.Token); - Assert.True(dataflowCompleted.WaitOne(3000)); + Assert.True(dataflowCompleted.WaitOne(5000)); await StopAndVerify(service); _messagePublisherService.Verify( @@ -386,9 +386,9 @@ public async Task CompletesDataflow(HttpStatusCode httpStatusCode) It.Is(match => CheckMessage(match, (httpStatusCode == HttpStatusCode.OK ? ExportStatus.Success : ExportStatus.Failure), (httpStatusCode == HttpStatusCode.OK ? FileExportStatus.Success : FileExportStatus.ServiceError)))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLogging($"Exporting data to {inferenceRequest.OutputResources.First().ConnectionDetails.Uri}.", LogLevel.Debug, Times.AtLeastOnce()); diff --git a/src/InformaticsGateway/Test/Services/Export/ExportServiceBaseTest.cs b/src/InformaticsGateway/Test/Services/Export/ExportServiceBaseTest.cs index 799e08122..4328fdab9 100644 --- a/src/InformaticsGateway/Test/Services/Export/ExportServiceBaseTest.cs +++ b/src/InformaticsGateway/Test/Services/Export/ExportServiceBaseTest.cs @@ -44,7 +44,7 @@ public class TestExportService : ExportServiceBase public event EventHandler ExportDataBlockCalled; public bool ExportShallFail = false; - protected override int Concurrency => 1; + protected override ushort Concurrency => 1; public override string RoutingKey => AgentName; public override string ServiceName { get => "Test Export Service"; } @@ -132,11 +132,11 @@ public async Task DataflowTest_RejectOnInsufficientStorageSpace() _messageSubscriberService.Setup(p => p.Reject(It.IsAny(), It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback((string topic, string queue, Action messageReceivedCallback, ushort prefetchCount) => + .Callback((string topic, string queue, Func messageReceivedCallback, ushort prefetchCount) => { messageReceivedCallback(CreateMessageReceivedEventArgs()); }); @@ -146,9 +146,9 @@ public async Task DataflowTest_RejectOnInsufficientStorageSpace() await StopAndVerify(service); _messageSubscriberService.Verify(p => p.Reject(It.IsAny(), It.IsAny()), Times.Once()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); } @@ -161,13 +161,13 @@ public async Task DataflowTest_PayloadDownlaodFailure() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs()); + await messageReceivedCallback(CreateMessageReceivedEventArgs()); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -188,9 +188,9 @@ public async Task DataflowTest_PayloadDownlaodFailure() It.Is(match => (match.ConvertTo()).Status == ExportStatus.Failure)), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); } @@ -204,15 +204,15 @@ public async Task DataflowTest_EndToEnd_WithPartialFailure() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { while (messageCount-- > 0) { - messageReceivedCallback(CreateMessageReceivedEventArgs()); + await messageReceivedCallback(CreateMessageReceivedEventArgs()); } }); @@ -242,9 +242,9 @@ public async Task DataflowTest_EndToEnd_WithPartialFailure() It.Is(match => (match.ConvertTo()).Status == ExportStatus.PartialFailure)), Times.Exactly(5)); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Exactly(5)); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); } @@ -258,15 +258,15 @@ public async Task DataflowTest_EndToEnd() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { while (messageCount-- > 0) { - messageReceivedCallback(CreateMessageReceivedEventArgs()); + await messageReceivedCallback(CreateMessageReceivedEventArgs()); } }); @@ -294,9 +294,9 @@ public async Task DataflowTest_EndToEnd() It.Is(match => (match.ConvertTo()).Status == ExportStatus.Success)), Times.Exactly(5)); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Exactly(5)); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); } diff --git a/src/InformaticsGateway/Test/Services/Export/ScuExportServiceTest.cs b/src/InformaticsGateway/Test/Services/Export/ScuExportServiceTest.cs index 79f8cb10b..920d3253d 100644 --- a/src/InformaticsGateway/Test/Services/Export/ScuExportServiceTest.cs +++ b/src/InformaticsGateway/Test/Services/Export/ScuExportServiceTest.cs @@ -122,13 +122,13 @@ public async Task ShallFailWhenNoDestinationIsDefined() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs(string.Empty)); + await messageReceivedCallback(CreateMessageReceivedEventArgs(string.Empty)); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -151,9 +151,9 @@ public async Task ShallFailWhenNoDestinationIsDefined() It.Is(match => CheckMessage(match, ExportStatus.Failure, FileExportStatus.ConfigurationError))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLogging("SCU Export configuration error: Export task does not have destination set.", LogLevel.Error, Times.Once()); } @@ -165,13 +165,13 @@ public async Task ShallFailWhenDestinationIsNotConfigured() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); + await messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -196,9 +196,9 @@ public async Task ShallFailWhenDestinationIsNotConfigured() It.Is(match => CheckMessage(match, ExportStatus.Failure, FileExportStatus.ConfigurationError))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLogging($"SCU Export configuration error: Specified destination 'pacs' does not exist.", LogLevel.Error, Times.Once()); @@ -214,13 +214,13 @@ public async Task AssociationRejected() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); + await messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -246,9 +246,9 @@ public async Task AssociationRejected() It.Is(match => CheckMessage(match, ExportStatus.Failure, FileExportStatus.ServiceError))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLogging($"Association rejected.", LogLevel.Warning, Times.AtLeastOnce()); @@ -267,13 +267,13 @@ public async Task SimulateAbort() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); + await messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -298,9 +298,9 @@ public async Task SimulateAbort() It.Is(match => CheckMessage(match, ExportStatus.Failure, FileExportStatus.ServiceError))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLoggingMessageBeginsWith($"Association aborted with reason", LogLevel.Error, Times.Once()); @@ -318,13 +318,13 @@ public async Task CStoreFailure() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); + await messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -349,9 +349,9 @@ public async Task CStoreFailure() It.Is(match => CheckMessage(match, ExportStatus.Failure, FileExportStatus.ServiceError))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLogging("Association accepted.", LogLevel.Information, Times.Once()); _logger.VerifyLogging($"Failed to export with error {DicomStatus.ResourceLimitation}.", LogLevel.Error, Times.Once()); @@ -369,13 +369,13 @@ public async Task ErrorLoadingDicomContent() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>(async (topic, queue, messageReceivedCallback, prefetchCount) => { - messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); + await messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); }); _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) @@ -400,9 +400,9 @@ public async Task ErrorLoadingDicomContent() It.Is(match => CheckMessage(match, ExportStatus.Failure, FileExportStatus.UnsupportedDataType))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLoggingMessageBeginsWith("Error reading DICOM file: error", LogLevel.Error, Times.Once()); @@ -420,11 +420,11 @@ public async Task UnreachableServer() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => { messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); }); @@ -451,9 +451,9 @@ public async Task UnreachableServer() It.Is(match => CheckMessage(match, ExportStatus.Failure, FileExportStatus.ServiceError))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLoggingMessageBeginsWith("Association aborted with error", LogLevel.Error, Times.Once()); } @@ -470,11 +470,11 @@ public async Task ExportCompletes() _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); _messageSubscriberService.Setup(p => p.RequeueWithDelay(It.IsAny())); _messageSubscriberService.Setup( - p => p.Subscribe(It.IsAny(), + p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny())) - .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => + .Callback, ushort>((topic, queue, messageReceivedCallback, prefetchCount) => { messageReceivedCallback(CreateMessageReceivedEventArgs("pacs")); }); @@ -501,9 +501,9 @@ public async Task ExportCompletes() It.Is(match => CheckMessage(match, ExportStatus.Success, FileExportStatus.Success))), Times.Once()); _messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); _messageSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Never()); - _messageSubscriberService.Verify(p => p.Subscribe(It.IsAny(), + _messageSubscriberService.Verify(p => p.SubscribeAsync(It.IsAny(), It.IsAny(), - It.IsAny>(), + It.IsAny>(), It.IsAny()), Times.Once()); _logger.VerifyLogging("Association accepted.", LogLevel.Information, Times.Once()); _logger.VerifyLogging($"Instance sent successfully.", LogLevel.Information, Times.Once());