diff --git a/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs b/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs index a539f5f23..38aed2197 100644 --- a/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs +++ b/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs @@ -135,20 +135,21 @@ private async void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e) var payload = await _payloads[key].Task.ConfigureAwait(false); using var loggerScope = _logger.BeginScope(new LoggingDataDictionary { { "CorrelationId", payload.CorrelationId } }); - if (payload.IsUploadCompleted()) + // Wait for timer window closes before sending payload for processing + if (payload.HasTimedOut) { - if (_payloads.TryRemove(key, out _)) + if (payload.IsUploadCompleted()) { - await QueueBucketForNotification(key, payload).ConfigureAwait(false); + if (_payloads.TryRemove(key, out _)) + { + await QueueBucketForNotification(key, payload).ConfigureAwait(false); + } + else + { + _logger.BucketRemoveError(key); + } } - else - { - _logger.BucketRemoveError(key); - } - } - else if (payload.HasTimedOut) - { - if (payload.AnyUploadFailures()) + else if (payload.AnyUploadFailures()) { _payloads.TryRemove(key, out _); _logger.PayloadRemovedWithFailureUploads(key); diff --git a/tests/Integration.Test/Common/Assertions.cs b/tests/Integration.Test/Common/Assertions.cs index 0aef382db..33787d549 100644 --- a/tests/Integration.Test/Common/Assertions.cs +++ b/tests/Integration.Test/Common/Assertions.cs @@ -199,7 +199,15 @@ internal void ShouldHaveCorrectNumberOfWorkflowRequestMessages(DataProvider data message.ApplicationId.Should().Be(MessageBrokerConfiguration.InformaticsGatewayApplicationId); var request = message.ConvertTo(); request.Should().NotBeNull(); - request.FileCount.Should().Be((dataProvider.DicomSpecs.NumberOfExpectedFiles(dataProvider.StudyGrouping))); + + if (dataProvider.ClientSendOverAssociations == 1 || messages.Count == 1) + { + request.FileCount.Should().Be((dataProvider.DicomSpecs.NumberOfExpectedFiles(dataProvider.StudyGrouping))); + } + else + { + request.FileCount.Should().Be(dataProvider.DicomSpecs.FileCount / dataProvider.ClientSendOverAssociations); + } if (dataProvider.Workflows is not null) { diff --git a/tests/Integration.Test/Common/DataProvider.cs b/tests/Integration.Test/Common/DataProvider.cs index cdab85f37..069282473 100644 --- a/tests/Integration.Test/Common/DataProvider.cs +++ b/tests/Integration.Test/Common/DataProvider.cs @@ -38,6 +38,9 @@ internal class DataProvider public DicomStatus DimseRsponse { get; internal set; } public string StudyGrouping { get; internal set; } public string[] Workflows { get; internal set; } = null; + public int ClientTimeout { get; internal set; } + public int ClientAssociationPulseTime { get; internal set; } = 0; + public int ClientSendOverAssociations { get; internal set; } = 1; public DataProvider(Configurations configurations, ISpecFlowOutputHelper outputHelper) { diff --git a/tests/Integration.Test/Common/DicomCStoreDataClient.cs b/tests/Integration.Test/Common/DicomCStoreDataClient.cs index da8c357b0..848590694 100644 --- a/tests/Integration.Test/Common/DicomCStoreDataClient.cs +++ b/tests/Integration.Test/Common/DicomCStoreDataClient.cs @@ -16,6 +16,7 @@ using System.Diagnostics; using Ardalis.GuardClauses; +using FellowOakDicom; using FellowOakDicom.Network; using FellowOakDicom.Network.Client; using Monai.Deploy.InformaticsGateway.Configuration; @@ -48,15 +49,59 @@ public async Task SendAsync(DataProvider dataProvider, params object[] args) var host = args[1].ToString(); var port = (int)args[2]; var calledAeTitle = args[3].ToString(); - var timeout = (TimeSpan)args[4]; + var timeout = TimeSpan.FromSeconds(dataProvider.ClientTimeout); + var associations = dataProvider.ClientSendOverAssociations; + var pauseTime = TimeSpan.FromSeconds(dataProvider.ClientAssociationPulseTime); _outputHelper.WriteLine($"C-STORE: {callingAeTitle} => {host}:{port}@{calledAeTitle}"); var stopwatch = new Stopwatch(); stopwatch.Start(); - var dicomClient = DicomClientFactory.Create(host, port, false, callingAeTitle, calledAeTitle); - var countdownEvent = new CountdownEvent(dataProvider.DicomSpecs.Files.Count); + + var filesPerAssociations = dataProvider.DicomSpecs.Files.Count / associations; + var failureStatus = new List(); - foreach (var file in dataProvider.DicomSpecs.Files) + for (int i = 0; i < associations; i++) + { + var files = dataProvider.DicomSpecs.Files.Skip(i * filesPerAssociations).Take(filesPerAssociations).ToList(); + if (i + 1 == associations && dataProvider.DicomSpecs.Files.Count > (i + 1) * filesPerAssociations) + { + files.AddRange(dataProvider.DicomSpecs.Files.Skip(i * filesPerAssociations)); + } + + try + { + await SendBatchAsync( + files, + callingAeTitle, + host, + port, + calledAeTitle, + timeout, + stopwatch, + failureStatus); + await Task.Delay(pauseTime); + } + catch (DicomAssociationRejectedException ex) + { + _outputHelper.WriteLine($"Association Rejected: {ex.Message}"); + dataProvider.DimseRsponse = DicomStatus.Cancel; + } + } + + stopwatch.Stop(); + lock (SyncRoot) + { + TotalTime += (int)stopwatch.Elapsed.TotalMilliseconds; + } + _outputHelper.WriteLine($"DICOMsend:{stopwatch.Elapsed.TotalSeconds}s"); + dataProvider.DimseRsponse = (failureStatus.Count == 0) ? DicomStatus.Success : failureStatus.First(); + } + + private async Task SendBatchAsync(List files, string callingAeTitle, string host, int port, string calledAeTitle, TimeSpan timeout, Stopwatch stopwatch, List failureStatus) + { + var dicomClient = DicomClientFactory.Create(host, port, false, callingAeTitle, calledAeTitle); + var countdownEvent = new CountdownEvent(files.Count); + foreach (var file in files) { var cStoreRequest = new DicomCStoreRequest(file); cStoreRequest.OnResponseReceived += (DicomCStoreRequest request, DicomCStoreResponse response) => @@ -67,24 +112,8 @@ public async Task SendAsync(DataProvider dataProvider, params object[] args) await dicomClient.AddRequestAsync(cStoreRequest); } - try - { - await dicomClient.SendAsync(); - countdownEvent.Wait(timeout); - stopwatch.Stop(); - lock (SyncRoot) - { - TotalTime += (int)stopwatch.Elapsed.TotalMilliseconds; - } - _outputHelper.WriteLine($"DICOMsend:{stopwatch.Elapsed.TotalSeconds}s"); - } - catch (DicomAssociationRejectedException ex) - { - _outputHelper.WriteLine($"Association Rejected: {ex.Message}"); - dataProvider.DimseRsponse = DicomStatus.Cancel; - } - - dataProvider.DimseRsponse = (failureStatus.Count == 0) ? DicomStatus.Success : failureStatus.First(); + await dicomClient.SendAsync(); + countdownEvent.Wait(timeout); } } } diff --git a/tests/Integration.Test/Drivers/RabbitMqConsumer.cs b/tests/Integration.Test/Drivers/RabbitMqConsumer.cs index 06f17f66a..38f07d716 100644 --- a/tests/Integration.Test/Drivers/RabbitMqConsumer.cs +++ b/tests/Integration.Test/Drivers/RabbitMqConsumer.cs @@ -16,6 +16,7 @@ */ using System.Collections.Concurrent; +using System.Diagnostics; using Monai.Deploy.Messaging.Messages; using Monai.Deploy.Messaging.RabbitMQ; using TechTalk.SpecFlow.Infrastructure; @@ -32,7 +33,6 @@ internal class RabbitMqConsumer : IDisposable public IReadOnlyList Messages { get { return _messages.ToList(); } } - public CountdownEvent MessageWaitHandle { get; private set; } public RabbitMqConsumer(RabbitMQMessageSubscriberService subscriberService, string queueName, ISpecFlowOutputHelper outputHelper) { @@ -54,15 +54,13 @@ public RabbitMqConsumer(RabbitMQMessageSubscriberService subscriberService, stri _messages.Add(eventArgs.Message); subscriberService.Acknowledge(eventArgs.Message); _outputHelper.WriteLine($"{DateTime.UtcNow} - {queueName} message received with correlation ID={eventArgs.Message.CorrelationId}, delivery tag={eventArgs.Message.DeliveryTag}"); - MessageWaitHandle?.Signal(); }); } - public void SetupMessageHandle(int count) + public void ClearMessages() { - _outputHelper.WriteLine($"Expecting {count} {_queueName} messages from RabbitMQ"); + _outputHelper.WriteLine($"Clearing messages received from RabbitMQ"); _messages.Clear(); - MessageWaitHandle = new CountdownEvent(count); } protected virtual void Dispose(bool disposing) @@ -84,5 +82,18 @@ public void Dispose() Dispose(disposing: true); GC.SuppressFinalize(this); } + + internal async Task WaitforAsync(int messageCount, TimeSpan messageWaitTimeSpan) + { + var stopwatch = new Stopwatch(); + stopwatch.Start(); + + while (messageCount > _messages.Count && stopwatch.Elapsed < messageWaitTimeSpan) + { + await Task.Delay(100); + } + + return messageCount >= _messages.Count; + } } } diff --git a/tests/Integration.Test/Features/AcrApi.feature b/tests/Integration.Test/Features/AcrApi.feature index 9c0bbec5c..9ad899b79 100644 --- a/tests/Integration.Test/Features/AcrApi.feature +++ b/tests/Integration.Test/Features/AcrApi.feature @@ -28,7 +28,7 @@ Feature: ACR API Given a DICOM study on a remote DICOMweb service And an ACR API request to query & retrieve by When the ACR API request is sent - Then a workflow requests sent to the message broker + Then a single workflow request is sent to the message broker And a study is uploaded to the storage service Examples: diff --git a/tests/Integration.Test/Features/DicomDimseScp.feature b/tests/Integration.Test/Features/DicomDimseScp.feature index fcaea8823..b14b93d4f 100644 --- a/tests/Integration.Test/Features/DicomDimseScp.feature +++ b/tests/Integration.Test/Features/DicomDimseScp.feature @@ -31,14 +31,17 @@ Feature: DICOM DIMSE SCP Services Scenario: Response to C-ECHO-RQ Given a called AE Title named 'C-ECHO-TEST' that groups by '0020,000D' for 5 seconds - When a C-ECHO-RQ is sent to 'C-ECHO-TEST' from 'TEST-RUNNER' with timeout of 30 seconds + And a DICOM client configured with 30 seconds timeout + When a C-ECHO-RQ is sent to 'C-ECHO-TEST' from 'TEST-RUNNER' Then a successful response should be received @messaging_workflow_request @messaging Scenario Outline: Respond to C-STORE-RQ and group data by Study Instance UID Given a called AE Title named 'C-STORE-STUDY' that groups by '0020,000D' for 3 seconds + And a DICOM client configured with 300 seconds timeout + And a DICOM client configured to send data over 1 associations and wait 0 between each association And studies - When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER' with timeout of 300 seconds + When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER' Then a successful response should be received And workflow requests sent to message broker And studies are uploaded to storage service @@ -53,8 +56,10 @@ Feature: DICOM DIMSE SCP Services @messaging_workflow_request @messaging Scenario Outline: Respond to C-STORE-RQ and group data by Series Instance UID Given a called AE Title named 'C-STORE-SERIES' that groups by '0020,000E' for 3 seconds + And a DICOM client configured with 300 seconds timeout + And a DICOM client configured to send data over 1 associations and wait 0 between each association And studies with series per study - When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-SERIES' from 'TEST-RUNNER' with timeout of 300 seconds + When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-SERIES' from 'TEST-RUNNER' Then a successful response should be received And workflow requests sent to message broker And studies are uploaded to storage service @@ -65,3 +70,19 @@ Feature: DICOM DIMSE SCP Services | CT | 1 | 2 | | MG | 1 | 3 | | US | 1 | 2 | + + @messaging_workflow_request @messaging + Scenario Outline: Respond to C-STORE-RQ and group data by Study Instance UID over multiple associations + Given a called AE Title named 'C-STORE-STUDY' that groups by '0020,000D' for 5 seconds + And a DICOM client configured with 300 seconds timeout + And a DICOM client configured to send data over associations and wait between each association + And studies with series per study + When C-STORE-RQ are sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER' + Then a successful response should be received + And workflow requests sent to message broker + And studies are uploaded to storage service + + Examples: + | modality | study_count | series_count | seconds | workflow_requests | + | MG | 1 | 3 | 3 | 1 | + | MG | 1 | 3 | 6 | 3 | diff --git a/tests/Integration.Test/StepDefinitions/AcrApiStepDefinitions.cs b/tests/Integration.Test/StepDefinitions/AcrApiStepDefinitions.cs index 56845731c..80c8aea65 100644 --- a/tests/Integration.Test/StepDefinitions/AcrApiStepDefinitions.cs +++ b/tests/Integration.Test/StepDefinitions/AcrApiStepDefinitions.cs @@ -59,7 +59,7 @@ public async Task GivenADICOMStudySentToAETFromWithTimeoutOfSeconds() { var modality = "US"; _dataProvider.GenerateDicomData(modality, WorkflowStudyCount); - _receivedMessages.SetupMessageHandle(WorkflowStudyCount); + _receivedMessages.ClearMessages(); var storeScu = _objectContainer.Resolve("StoreSCU"); await storeScu.SendAsync(_dataProvider, "TEST-RUNNER", _configurations.OrthancOptions.Host, _configurations.OrthancOptions.DimsePort, "ORTHANC", TimeSpan.FromSeconds(300)); @@ -80,17 +80,16 @@ public async Task WhenTheACRAPIRequestIsSentTo() await _informaticsGatewayClient.Inference.NewInferenceRequest(_dataProvider.AcrRequest, CancellationToken.None); } - [Then(@"a workflow requests sent to the message broker")] - public void ThenAWorkflowRequestsSentToTheMessageBroker() + [Then(@"a single workflow request is sent to the message broker")] + public async Task ThenAWorkflowRequestsSentToTheMessageBroker() { - _receivedMessages.MessageWaitHandle.Wait(MessageWaitTimeSpan).Should().BeTrue(); + (await _receivedMessages.WaitforAsync(1, MessageWaitTimeSpan)).Should().BeTrue(); _assertions.ShouldHaveCorrectNumberOfWorkflowRequestMessagesAndAcrRequest(_dataProvider, _receivedMessages.Messages, WorkflowStudyCount); } [Then(@"a study is uploaded to the storage service")] public async Task ThenAStudyIsUploadedToTheStorageService() { - _receivedMessages.MessageWaitHandle.Wait(MessageWaitTimeSpan).Should().BeTrue(); _receivedMessages.Messages.Should().NotBeNullOrEmpty(); await _assertions.ShouldHaveUploadedDicomDataToMinio(_receivedMessages.Messages, _dataProvider.DicomSpecs.FileHashes); } diff --git a/tests/Integration.Test/StepDefinitions/DicomDimseScpServicesStepDefinitions.cs b/tests/Integration.Test/StepDefinitions/DicomDimseScpServicesStepDefinitions.cs index 82b1d6bc4..e88e2ef40 100644 --- a/tests/Integration.Test/StepDefinitions/DicomDimseScpServicesStepDefinitions.cs +++ b/tests/Integration.Test/StepDefinitions/DicomDimseScpServicesStepDefinitions.cs @@ -88,7 +88,8 @@ public void GivenXStudiesWithYSeriesPerStudy(int studyCount, string modality, in Guard.Against.NegativeOrZero(seriesPerStudy); _dataProvider.GenerateDicomData(modality, studyCount, seriesPerStudy); - _receivedMessages.SetupMessageHandle(_dataProvider.DicomSpecs.NumberOfExpectedRequests(_dataProvider.StudyGrouping)); + + _receivedMessages.ClearMessages(); } [Given(@"a called AE Title named '([^']*)' that groups by '([^']*)' for (.*) seconds")] @@ -125,12 +126,28 @@ await _informaticsGatewayClient.MonaiScpAeTitle.Create(new MonaiApplicationEntit } } - [When(@"a C-ECHO-RQ is sent to '([^']*)' from '([^']*)' with timeout of (.*) seconds")] - public async Task WhenAC_ECHO_RQIsSentToFromWithTimeoutOfSeconds(string calledAeTitle, string callingAeTitle, int clientTimeoutSeconds) + [Given(@"a DICOM client configured with (.*) seconds timeout")] + public void GivenADICOMClientConfiguredWithSecondsTimeout(int timeout) + { + Guard.Against.NegativeOrZero(timeout); + _dataProvider.ClientTimeout = timeout; + } + + [Given(@"a DICOM client configured to send data over (.*) associations and wait (.*) between each association")] + public void GivenADICOMClientConfiguredToSendDataOverAssociationsAndWaitSecondsBetweenEachAssociation(int associations, int pulseTime) + { + Guard.Against.NegativeOrZero(associations); + Guard.Against.Negative(pulseTime); + + _dataProvider.ClientSendOverAssociations = associations; + _dataProvider.ClientAssociationPulseTime = pulseTime; + } + + [When(@"a C-ECHO-RQ is sent to '([^']*)' from '([^']*)'")] + public async Task WhenAC_ECHO_RQIsSentToFromWithTimeoutOfSeconds(string calledAeTitle, string callingAeTitle) { Guard.Against.NullOrWhiteSpace(calledAeTitle); Guard.Against.NullOrWhiteSpace(callingAeTitle); - Guard.Against.NegativeOrZero(clientTimeoutSeconds); var echoScu = _objectContainer.Resolve("EchoSCU"); await echoScu.SendAsync( @@ -139,7 +156,7 @@ await echoScu.SendAsync( _configuration.InformaticsGatewayOptions.Host, _informaticsGatewayConfiguration.Dicom.Scp.Port, calledAeTitle, - TimeSpan.FromSeconds(clientTimeoutSeconds)); + TimeSpan.FromSeconds(_dataProvider.ClientTimeout)); } [Then(@"a successful response should be received")] @@ -148,13 +165,13 @@ public void ThenASuccessfulResponseShouldBeReceived() _dataProvider.DimseRsponse.Should().Be(DicomStatus.Success); } - [When(@"a C-STORE-RQ is sent to '([^']*)' with AET '([^']*)' from '([^']*)' with timeout of (.*) seconds")] - public async Task WhenAC_STORE_RQIsSentToWithAETFromWithTimeoutOfSeconds(string application, string calledAeTitle, string callingAeTitle, int clientTimeoutSeconds) + [When(@"a C-STORE-RQ is sent to '([^']*)' with AET '([^']*)' from '([^']*)'")] + [When(@"C-STORE-RQ are sent to '([^']*)' with AET '([^']*)' from '([^']*)'")] + public async Task WhenAC_STORE_RQIsSentToWithAETFromWithTimeoutOfSeconds(string application, string calledAeTitle, string callingAeTitle) { Guard.Against.NullOrWhiteSpace(application); Guard.Against.NullOrWhiteSpace(calledAeTitle); Guard.Against.NullOrWhiteSpace(callingAeTitle); - Guard.Against.NegativeOrZero(clientTimeoutSeconds); var storeScu = _objectContainer.Resolve("StoreSCU"); @@ -168,8 +185,7 @@ await storeScu.SendAsync( callingAeTitle, host, port, - calledAeTitle, - TimeSpan.FromSeconds(clientTimeoutSeconds)); + calledAeTitle); _dataProvider.ReplaceGeneratedDicomDataWithHashes(); } diff --git a/tests/Integration.Test/StepDefinitions/DicomWebStowServiceStepDefinitions.cs b/tests/Integration.Test/StepDefinitions/DicomWebStowServiceStepDefinitions.cs index bf383b356..63df359e7 100644 --- a/tests/Integration.Test/StepDefinitions/DicomWebStowServiceStepDefinitions.cs +++ b/tests/Integration.Test/StepDefinitions/DicomWebStowServiceStepDefinitions.cs @@ -51,7 +51,7 @@ public void GivenNStudies(int studyCount, string modality, string grouping) _dataProvider.GenerateDicomData(modality, studyCount); _dataProvider.StudyGrouping = grouping; - _receivedMessages.SetupMessageHandle(_dataProvider.DicomSpecs.NumberOfExpectedRequests(grouping)); + _receivedMessages.ClearMessages(); } [Given(@"a workflow named '(.*)'")] diff --git a/tests/Integration.Test/StepDefinitions/ExportServicesStepDefinitions.cs b/tests/Integration.Test/StepDefinitions/ExportServicesStepDefinitions.cs index b78d655e9..6d62a6723 100644 --- a/tests/Integration.Test/StepDefinitions/ExportServicesStepDefinitions.cs +++ b/tests/Integration.Test/StepDefinitions/ExportServicesStepDefinitions.cs @@ -132,14 +132,14 @@ public void WhenAExportRequestIsReceivedDesignatedFor(string routingKey) exportRequestEvent.CorrelationId, string.Empty); - _receivedMessages.SetupMessageHandle(1); + _receivedMessages.ClearMessages(); _messagePublisher.Publish(routingKey, message.ToMessage()); } [Then(@"Informatics Gateway exports the studies to the DICOM SCP")] public async Task ThenExportTheInstancesToTheDicomScp() { - _receivedMessages.MessageWaitHandle.Wait(DicomScpWaitTimeSpan).Should().BeTrue(); + (await _receivedMessages.WaitforAsync(1, DicomScpWaitTimeSpan)).Should().BeTrue(); foreach (var key in _dataProvider.DicomSpecs.FileHashes.Keys) { @@ -151,7 +151,7 @@ public async Task ThenExportTheInstancesToTheDicomScp() [Then(@"Informatics Gateway exports the studies to Orthanc")] public async Task ThenExportTheInstancesToOrthanc() { - _receivedMessages.MessageWaitHandle.Wait(DicomScpWaitTimeSpan).Should().BeTrue(); + (await _receivedMessages.WaitforAsync(1, DicomScpWaitTimeSpan)).Should().BeTrue(); var httpClient = new HttpClient(); var dicomWebClient = new DicomWebClient(httpClient, null); dicomWebClient.ConfigureServiceUris(new Uri(_configuration.OrthancOptions.DicomWebRoot)); diff --git a/tests/Integration.Test/StepDefinitions/FhirDefinitions.cs b/tests/Integration.Test/StepDefinitions/FhirDefinitions.cs index ab38d8c6b..d1122a070 100644 --- a/tests/Integration.Test/StepDefinitions/FhirDefinitions.cs +++ b/tests/Integration.Test/StepDefinitions/FhirDefinitions.cs @@ -57,7 +57,7 @@ public async Task GivenHl7MessagesInVersionX(string version, string format) Guard.Against.NullOrWhiteSpace(format); await _dataProvider.GenerateFhirMessages(version, format); - _receivedMessages.SetupMessageHandle(_dataProvider.FhirSpecs.Files.Count); + _receivedMessages.ClearMessages(); } [When(@"the FHIR messages are sent to Informatics Gateway")] @@ -67,9 +67,9 @@ public async Task WhenTheMessagesAreSentToInformaticsGateway() } [Then(@"workflow requests are sent to message broker")] - public void ThenWorkflowRequestAreSentToMessageBroker() + public async Task ThenWorkflowRequestAreSentToMessageBrokerAsync() { - _receivedMessages.MessageWaitHandle.Wait(WaitTimeSpan).Should().BeTrue(); + (await _receivedMessages.WaitforAsync(_dataProvider.FhirSpecs.Files.Count, WaitTimeSpan)).Should().BeTrue(); } [Then(@"FHIR resources are uploaded to storage service")] diff --git a/tests/Integration.Test/StepDefinitions/HealthLevel7Definitions.cs b/tests/Integration.Test/StepDefinitions/HealthLevel7Definitions.cs index 53436dfd5..22ccde5f3 100644 --- a/tests/Integration.Test/StepDefinitions/HealthLevel7Definitions.cs +++ b/tests/Integration.Test/StepDefinitions/HealthLevel7Definitions.cs @@ -49,7 +49,7 @@ public async Task GivenHl7MessagesInVersionX(string version) { Guard.Against.NullOrWhiteSpace(version); await _dataProvider.GenerateHl7Messages(version); - _receivedMessages.SetupMessageHandle(1); + _receivedMessages.ClearMessages(); } [When(@"the message are sent to Informatics Gateway")] @@ -71,9 +71,9 @@ public void ThenAcknowledgementAreReceived() } [Then(@"a workflow requests sent to message broker")] - public void ThenAWorkflowRequestIsSentToMessageBroker() + public async Task ThenAWorkflowRequestIsSentToMessageBrokerAsync() { - _receivedMessages.MessageWaitHandle.Wait(WaitTimeSpan).Should().BeTrue(); + (await _receivedMessages.WaitforAsync(_dataProvider.HL7Specs.Files.Count, WaitTimeSpan)).Should().BeTrue(); } [Then(@"messages are uploaded to storage service")] diff --git a/tests/Integration.Test/StepDefinitions/SharedDefinitions.cs b/tests/Integration.Test/StepDefinitions/SharedDefinitions.cs index 7c362328a..896923162 100644 --- a/tests/Integration.Test/StepDefinitions/SharedDefinitions.cs +++ b/tests/Integration.Test/StepDefinitions/SharedDefinitions.cs @@ -53,22 +53,21 @@ public void GivenNStudies(int studyCount, string modality) _dataProvider.GenerateDicomData(modality, studyCount); - _receivedMessages.SetupMessageHandle(_dataProvider.DicomSpecs.NumberOfExpectedRequests(_dataProvider.StudyGrouping)); + _receivedMessages.ClearMessages(); } [Then(@"(.*) workflow requests sent to message broker")] - public void ThenWorkflowRequestSentToMessageBroker(int workflowCount) + public async Task ThenWorkflowRequestSentToMessageBrokerAsync(int workflowCount) { Guard.Against.NegativeOrZero(workflowCount); - _receivedMessages.MessageWaitHandle.Wait(MessageWaitTimeSpan).Should().BeTrue(); + (await _receivedMessages.WaitforAsync(workflowCount, MessageWaitTimeSpan)).Should().BeTrue(); _assertions.ShouldHaveCorrectNumberOfWorkflowRequestMessages(_dataProvider, _receivedMessages.Messages, workflowCount); } [Then(@"studies are uploaded to storage service")] public async Task ThenXXFilesUploadedToStorageService() { - _receivedMessages.MessageWaitHandle.Wait(MessageWaitTimeSpan).Should().BeTrue(); await _assertions.ShouldHaveUploadedDicomDataToMinio(_receivedMessages.Messages, _dataProvider.DicomSpecs.FileHashes); } }