Skip to content

Commit

Permalink
Fix payload assembler not respecting user configured timeout window (#…
Browse files Browse the repository at this point in the history
…330)

* Fix payload assembler not respecting user configured timeout window

Signed-off-by: Victor Chang <vicchang@nvidia.com>

* Add test case for grouping over multiple associations

Signed-off-by: Victor Chang <vicchang@nvidia.com>

* Adjust pulse time to catch the previous regression

Signed-off-by: Victor Chang <vicchang@nvidia.com>

---------

Signed-off-by: Victor Chang <vicchang@nvidia.com>
  • Loading branch information
mocsharp authored Feb 9, 2023
1 parent 7ad457a commit e3a463b
Show file tree
Hide file tree
Showing 14 changed files with 159 additions and 72 deletions.
23 changes: 12 additions & 11 deletions src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,21 @@ private async void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e)
var payload = await _payloads[key].Task.ConfigureAwait(false);
using var loggerScope = _logger.BeginScope(new LoggingDataDictionary<string, object> { { "CorrelationId", payload.CorrelationId } });

if (payload.IsUploadCompleted())
// Wait for timer window closes before sending payload for processing
if (payload.HasTimedOut)
{
if (_payloads.TryRemove(key, out _))
if (payload.IsUploadCompleted())
{
await QueueBucketForNotification(key, payload).ConfigureAwait(false);
if (_payloads.TryRemove(key, out _))
{
await QueueBucketForNotification(key, payload).ConfigureAwait(false);
}
else
{
_logger.BucketRemoveError(key);
}
}
else
{
_logger.BucketRemoveError(key);
}
}
else if (payload.HasTimedOut)
{
if (payload.AnyUploadFailures())
else if (payload.AnyUploadFailures())
{
_payloads.TryRemove(key, out _);
_logger.PayloadRemovedWithFailureUploads(key);
Expand Down
10 changes: 9 additions & 1 deletion tests/Integration.Test/Common/Assertions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,15 @@ internal void ShouldHaveCorrectNumberOfWorkflowRequestMessages(DataProvider data
message.ApplicationId.Should().Be(MessageBrokerConfiguration.InformaticsGatewayApplicationId);
var request = message.ConvertTo<WorkflowRequestEvent>();
request.Should().NotBeNull();
request.FileCount.Should().Be((dataProvider.DicomSpecs.NumberOfExpectedFiles(dataProvider.StudyGrouping)));

if (dataProvider.ClientSendOverAssociations == 1 || messages.Count == 1)
{
request.FileCount.Should().Be((dataProvider.DicomSpecs.NumberOfExpectedFiles(dataProvider.StudyGrouping)));
}
else
{
request.FileCount.Should().Be(dataProvider.DicomSpecs.FileCount / dataProvider.ClientSendOverAssociations);
}

if (dataProvider.Workflows is not null)
{
Expand Down
3 changes: 3 additions & 0 deletions tests/Integration.Test/Common/DataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ internal class DataProvider
public DicomStatus DimseRsponse { get; internal set; }
public string StudyGrouping { get; internal set; }
public string[] Workflows { get; internal set; } = null;
public int ClientTimeout { get; internal set; }
public int ClientAssociationPulseTime { get; internal set; } = 0;
public int ClientSendOverAssociations { get; internal set; } = 1;

public DataProvider(Configurations configurations, ISpecFlowOutputHelper outputHelper)
{
Expand Down
73 changes: 51 additions & 22 deletions tests/Integration.Test/Common/DicomCStoreDataClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System.Diagnostics;
using Ardalis.GuardClauses;
using FellowOakDicom;
using FellowOakDicom.Network;
using FellowOakDicom.Network.Client;
using Monai.Deploy.InformaticsGateway.Configuration;
Expand Down Expand Up @@ -48,15 +49,59 @@ public async Task SendAsync(DataProvider dataProvider, params object[] args)
var host = args[1].ToString();
var port = (int)args[2];
var calledAeTitle = args[3].ToString();
var timeout = (TimeSpan)args[4];
var timeout = TimeSpan.FromSeconds(dataProvider.ClientTimeout);
var associations = dataProvider.ClientSendOverAssociations;
var pauseTime = TimeSpan.FromSeconds(dataProvider.ClientAssociationPulseTime);

_outputHelper.WriteLine($"C-STORE: {callingAeTitle} => {host}:{port}@{calledAeTitle}");
var stopwatch = new Stopwatch();
stopwatch.Start();
var dicomClient = DicomClientFactory.Create(host, port, false, callingAeTitle, calledAeTitle);
var countdownEvent = new CountdownEvent(dataProvider.DicomSpecs.Files.Count);

var filesPerAssociations = dataProvider.DicomSpecs.Files.Count / associations;

var failureStatus = new List<DicomStatus>();
foreach (var file in dataProvider.DicomSpecs.Files)
for (int i = 0; i < associations; i++)
{
var files = dataProvider.DicomSpecs.Files.Skip(i * filesPerAssociations).Take(filesPerAssociations).ToList();
if (i + 1 == associations && dataProvider.DicomSpecs.Files.Count > (i + 1) * filesPerAssociations)
{
files.AddRange(dataProvider.DicomSpecs.Files.Skip(i * filesPerAssociations));
}

try
{
await SendBatchAsync(
files,
callingAeTitle,
host,
port,
calledAeTitle,
timeout,
stopwatch,
failureStatus);
await Task.Delay(pauseTime);
}
catch (DicomAssociationRejectedException ex)
{
_outputHelper.WriteLine($"Association Rejected: {ex.Message}");
dataProvider.DimseRsponse = DicomStatus.Cancel;
}
}

stopwatch.Stop();
lock (SyncRoot)
{
TotalTime += (int)stopwatch.Elapsed.TotalMilliseconds;
}
_outputHelper.WriteLine($"DICOMsend:{stopwatch.Elapsed.TotalSeconds}s");
dataProvider.DimseRsponse = (failureStatus.Count == 0) ? DicomStatus.Success : failureStatus.First();
}

private async Task SendBatchAsync(List<DicomFile> files, string callingAeTitle, string host, int port, string calledAeTitle, TimeSpan timeout, Stopwatch stopwatch, List<DicomStatus> failureStatus)
{
var dicomClient = DicomClientFactory.Create(host, port, false, callingAeTitle, calledAeTitle);
var countdownEvent = new CountdownEvent(files.Count);
foreach (var file in files)
{
var cStoreRequest = new DicomCStoreRequest(file);
cStoreRequest.OnResponseReceived += (DicomCStoreRequest request, DicomCStoreResponse response) =>
Expand All @@ -67,24 +112,8 @@ public async Task SendAsync(DataProvider dataProvider, params object[] args)
await dicomClient.AddRequestAsync(cStoreRequest);
}

try
{
await dicomClient.SendAsync();
countdownEvent.Wait(timeout);
stopwatch.Stop();
lock (SyncRoot)
{
TotalTime += (int)stopwatch.Elapsed.TotalMilliseconds;
}
_outputHelper.WriteLine($"DICOMsend:{stopwatch.Elapsed.TotalSeconds}s");
}
catch (DicomAssociationRejectedException ex)
{
_outputHelper.WriteLine($"Association Rejected: {ex.Message}");
dataProvider.DimseRsponse = DicomStatus.Cancel;
}

dataProvider.DimseRsponse = (failureStatus.Count == 0) ? DicomStatus.Success : failureStatus.First();
await dicomClient.SendAsync();
countdownEvent.Wait(timeout);
}
}
}
21 changes: 16 additions & 5 deletions tests/Integration.Test/Drivers/RabbitMqConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

using System.Collections.Concurrent;
using System.Diagnostics;
using Monai.Deploy.Messaging.Messages;
using Monai.Deploy.Messaging.RabbitMQ;
using TechTalk.SpecFlow.Infrastructure;
Expand All @@ -32,7 +33,6 @@ internal class RabbitMqConsumer : IDisposable

public IReadOnlyList<Message> Messages
{ get { return _messages.ToList(); } }
public CountdownEvent MessageWaitHandle { get; private set; }

public RabbitMqConsumer(RabbitMQMessageSubscriberService subscriberService, string queueName, ISpecFlowOutputHelper outputHelper)
{
Expand All @@ -54,15 +54,13 @@ public RabbitMqConsumer(RabbitMQMessageSubscriberService subscriberService, stri
_messages.Add(eventArgs.Message);
subscriberService.Acknowledge(eventArgs.Message);
_outputHelper.WriteLine($"{DateTime.UtcNow} - {queueName} message received with correlation ID={eventArgs.Message.CorrelationId}, delivery tag={eventArgs.Message.DeliveryTag}");
MessageWaitHandle?.Signal();
});
}

public void SetupMessageHandle(int count)
public void ClearMessages()
{
_outputHelper.WriteLine($"Expecting {count} {_queueName} messages from RabbitMQ");
_outputHelper.WriteLine($"Clearing messages received from RabbitMQ");
_messages.Clear();
MessageWaitHandle = new CountdownEvent(count);
}

protected virtual void Dispose(bool disposing)
Expand All @@ -84,5 +82,18 @@ public void Dispose()
Dispose(disposing: true);
GC.SuppressFinalize(this);
}

internal async Task<bool> WaitforAsync(int messageCount, TimeSpan messageWaitTimeSpan)
{
var stopwatch = new Stopwatch();
stopwatch.Start();

while (messageCount > _messages.Count && stopwatch.Elapsed < messageWaitTimeSpan)
{
await Task.Delay(100);
}

return messageCount >= _messages.Count;
}
}
}
2 changes: 1 addition & 1 deletion tests/Integration.Test/Features/AcrApi.feature
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Feature: ACR API
Given a DICOM study on a remote DICOMweb service
And an ACR API request to query & retrieve by <requestType>
When the ACR API request is sent
Then a workflow requests sent to the message broker
Then a single workflow request is sent to the message broker
And a study is uploaded to the storage service

Examples:
Expand Down
27 changes: 24 additions & 3 deletions tests/Integration.Test/Features/DicomDimseScp.feature
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ Feature: DICOM DIMSE SCP Services

Scenario: Response to C-ECHO-RQ
Given a called AE Title named 'C-ECHO-TEST' that groups by '0020,000D' for 5 seconds
When a C-ECHO-RQ is sent to 'C-ECHO-TEST' from 'TEST-RUNNER' with timeout of 30 seconds
And a DICOM client configured with 30 seconds timeout
When a C-ECHO-RQ is sent to 'C-ECHO-TEST' from 'TEST-RUNNER'
Then a successful response should be received

@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Study Instance UID
Given a called AE Title named 'C-STORE-STUDY' that groups by '0020,000D' for 3 seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over 1 associations and wait 0 between each association
And <count> <modality> studies
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER' with timeout of 300 seconds
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER'
Then a successful response should be received
And <count> workflow requests sent to message broker
And studies are uploaded to storage service
Expand All @@ -53,8 +56,10 @@ Feature: DICOM DIMSE SCP Services
@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Series Instance UID
Given a called AE Title named 'C-STORE-SERIES' that groups by '0020,000E' for 3 seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over 1 associations and wait 0 between each association
And <study_count> <modality> studies with <series_count> series per study
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-SERIES' from 'TEST-RUNNER' with timeout of 300 seconds
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-SERIES' from 'TEST-RUNNER'
Then a successful response should be received
And <series_count> workflow requests sent to message broker
And studies are uploaded to storage service
Expand All @@ -65,3 +70,19 @@ Feature: DICOM DIMSE SCP Services
| CT | 1 | 2 |
| MG | 1 | 3 |
| US | 1 | 2 |

@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Study Instance UID over multiple associations
Given a called AE Title named 'C-STORE-STUDY' that groups by '0020,000D' for 5 seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over <series_count> associations and wait <seconds> between each association
And <study_count> <modality> studies with <series_count> series per study
When C-STORE-RQ are sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER'
Then a successful response should be received
And <workflow_requests> workflow requests sent to message broker
And studies are uploaded to storage service

Examples:
| modality | study_count | series_count | seconds | workflow_requests |
| MG | 1 | 3 | 3 | 1 |
| MG | 1 | 3 | 6 | 3 |
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task GivenADICOMStudySentToAETFromWithTimeoutOfSeconds()
{
var modality = "US";
_dataProvider.GenerateDicomData(modality, WorkflowStudyCount);
_receivedMessages.SetupMessageHandle(WorkflowStudyCount);
_receivedMessages.ClearMessages();

var storeScu = _objectContainer.Resolve<IDataClient>("StoreSCU");
await storeScu.SendAsync(_dataProvider, "TEST-RUNNER", _configurations.OrthancOptions.Host, _configurations.OrthancOptions.DimsePort, "ORTHANC", TimeSpan.FromSeconds(300));
Expand All @@ -80,17 +80,16 @@ public async Task WhenTheACRAPIRequestIsSentTo()
await _informaticsGatewayClient.Inference.NewInferenceRequest(_dataProvider.AcrRequest, CancellationToken.None);
}

[Then(@"a workflow requests sent to the message broker")]
public void ThenAWorkflowRequestsSentToTheMessageBroker()
[Then(@"a single workflow request is sent to the message broker")]
public async Task ThenAWorkflowRequestsSentToTheMessageBroker()
{
_receivedMessages.MessageWaitHandle.Wait(MessageWaitTimeSpan).Should().BeTrue();
(await _receivedMessages.WaitforAsync(1, MessageWaitTimeSpan)).Should().BeTrue();
_assertions.ShouldHaveCorrectNumberOfWorkflowRequestMessagesAndAcrRequest(_dataProvider, _receivedMessages.Messages, WorkflowStudyCount);
}

[Then(@"a study is uploaded to the storage service")]
public async Task ThenAStudyIsUploadedToTheStorageService()
{
_receivedMessages.MessageWaitHandle.Wait(MessageWaitTimeSpan).Should().BeTrue();
_receivedMessages.Messages.Should().NotBeNullOrEmpty();
await _assertions.ShouldHaveUploadedDicomDataToMinio(_receivedMessages.Messages, _dataProvider.DicomSpecs.FileHashes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public void GivenXStudiesWithYSeriesPerStudy(int studyCount, string modality, in
Guard.Against.NegativeOrZero(seriesPerStudy);

_dataProvider.GenerateDicomData(modality, studyCount, seriesPerStudy);
_receivedMessages.SetupMessageHandle(_dataProvider.DicomSpecs.NumberOfExpectedRequests(_dataProvider.StudyGrouping));

_receivedMessages.ClearMessages();
}

[Given(@"a called AE Title named '([^']*)' that groups by '([^']*)' for (.*) seconds")]
Expand Down Expand Up @@ -125,12 +126,28 @@ await _informaticsGatewayClient.MonaiScpAeTitle.Create(new MonaiApplicationEntit
}
}

[When(@"a C-ECHO-RQ is sent to '([^']*)' from '([^']*)' with timeout of (.*) seconds")]
public async Task WhenAC_ECHO_RQIsSentToFromWithTimeoutOfSeconds(string calledAeTitle, string callingAeTitle, int clientTimeoutSeconds)
[Given(@"a DICOM client configured with (.*) seconds timeout")]
public void GivenADICOMClientConfiguredWithSecondsTimeout(int timeout)
{
Guard.Against.NegativeOrZero(timeout);
_dataProvider.ClientTimeout = timeout;
}

[Given(@"a DICOM client configured to send data over (.*) associations and wait (.*) between each association")]
public void GivenADICOMClientConfiguredToSendDataOverAssociationsAndWaitSecondsBetweenEachAssociation(int associations, int pulseTime)
{
Guard.Against.NegativeOrZero(associations);
Guard.Against.Negative(pulseTime);

_dataProvider.ClientSendOverAssociations = associations;
_dataProvider.ClientAssociationPulseTime = pulseTime;
}

[When(@"a C-ECHO-RQ is sent to '([^']*)' from '([^']*)'")]
public async Task WhenAC_ECHO_RQIsSentToFromWithTimeoutOfSeconds(string calledAeTitle, string callingAeTitle)
{
Guard.Against.NullOrWhiteSpace(calledAeTitle);
Guard.Against.NullOrWhiteSpace(callingAeTitle);
Guard.Against.NegativeOrZero(clientTimeoutSeconds);

var echoScu = _objectContainer.Resolve<IDataClient>("EchoSCU");
await echoScu.SendAsync(
Expand All @@ -139,7 +156,7 @@ await echoScu.SendAsync(
_configuration.InformaticsGatewayOptions.Host,
_informaticsGatewayConfiguration.Dicom.Scp.Port,
calledAeTitle,
TimeSpan.FromSeconds(clientTimeoutSeconds));
TimeSpan.FromSeconds(_dataProvider.ClientTimeout));
}

[Then(@"a successful response should be received")]
Expand All @@ -148,13 +165,13 @@ public void ThenASuccessfulResponseShouldBeReceived()
_dataProvider.DimseRsponse.Should().Be(DicomStatus.Success);
}

[When(@"a C-STORE-RQ is sent to '([^']*)' with AET '([^']*)' from '([^']*)' with timeout of (.*) seconds")]
public async Task WhenAC_STORE_RQIsSentToWithAETFromWithTimeoutOfSeconds(string application, string calledAeTitle, string callingAeTitle, int clientTimeoutSeconds)
[When(@"a C-STORE-RQ is sent to '([^']*)' with AET '([^']*)' from '([^']*)'")]
[When(@"C-STORE-RQ are sent to '([^']*)' with AET '([^']*)' from '([^']*)'")]
public async Task WhenAC_STORE_RQIsSentToWithAETFromWithTimeoutOfSeconds(string application, string calledAeTitle, string callingAeTitle)
{
Guard.Against.NullOrWhiteSpace(application);
Guard.Against.NullOrWhiteSpace(calledAeTitle);
Guard.Against.NullOrWhiteSpace(callingAeTitle);
Guard.Against.NegativeOrZero(clientTimeoutSeconds);

var storeScu = _objectContainer.Resolve<IDataClient>("StoreSCU");

Expand All @@ -168,8 +185,7 @@ await storeScu.SendAsync(
callingAeTitle,
host,
port,
calledAeTitle,
TimeSpan.FromSeconds(clientTimeoutSeconds));
calledAeTitle);

_dataProvider.ReplaceGeneratedDicomDataWithHashes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void GivenNStudies(int studyCount, string modality, string grouping)

_dataProvider.GenerateDicomData(modality, studyCount);
_dataProvider.StudyGrouping = grouping;
_receivedMessages.SetupMessageHandle(_dataProvider.DicomSpecs.NumberOfExpectedRequests(grouping));
_receivedMessages.ClearMessages();
}

[Given(@"a workflow named '(.*)'")]
Expand Down
Loading

0 comments on commit e3a463b

Please sign in to comment.