Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix payload assembler not respecting user configured timeout window #330

Merged
merged 3 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,21 @@ private async void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e)
var payload = await _payloads[key].Task.ConfigureAwait(false);
using var loggerScope = _logger.BeginScope(new LoggingDataDictionary<string, object> { { "CorrelationId", payload.CorrelationId } });

if (payload.IsUploadCompleted())
// Wait for timer window closes before sending payload for processing
if (payload.HasTimedOut)
{
if (_payloads.TryRemove(key, out _))
if (payload.IsUploadCompleted())
{
await QueueBucketForNotification(key, payload).ConfigureAwait(false);
if (_payloads.TryRemove(key, out _))
{
await QueueBucketForNotification(key, payload).ConfigureAwait(false);
}
else
{
_logger.BucketRemoveError(key);
}
}
else
{
_logger.BucketRemoveError(key);
}
}
else if (payload.HasTimedOut)
{
if (payload.AnyUploadFailures())
else if (payload.AnyUploadFailures())
{
_payloads.TryRemove(key, out _);
_logger.PayloadRemovedWithFailureUploads(key);
Expand Down
10 changes: 9 additions & 1 deletion tests/Integration.Test/Common/Assertions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,15 @@ internal void ShouldHaveCorrectNumberOfWorkflowRequestMessages(DataProvider data
message.ApplicationId.Should().Be(MessageBrokerConfiguration.InformaticsGatewayApplicationId);
var request = message.ConvertTo<WorkflowRequestEvent>();
request.Should().NotBeNull();
request.FileCount.Should().Be((dataProvider.DicomSpecs.NumberOfExpectedFiles(dataProvider.StudyGrouping)));

if (dataProvider.ClientSendOverAssociations == 1 || messages.Count == 1)
{
request.FileCount.Should().Be((dataProvider.DicomSpecs.NumberOfExpectedFiles(dataProvider.StudyGrouping)));
}
else
{
request.FileCount.Should().Be(dataProvider.DicomSpecs.FileCount / dataProvider.ClientSendOverAssociations);
}

if (dataProvider.Workflows is not null)
{
Expand Down
3 changes: 3 additions & 0 deletions tests/Integration.Test/Common/DataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ internal class DataProvider
public DicomStatus DimseRsponse { get; internal set; }
public string StudyGrouping { get; internal set; }
public string[] Workflows { get; internal set; } = null;
public int ClientTimeout { get; internal set; }
public int ClientAssociationPulseTime { get; internal set; } = 0;
public int ClientSendOverAssociations { get; internal set; } = 1;

public DataProvider(Configurations configurations, ISpecFlowOutputHelper outputHelper)
{
Expand Down
73 changes: 51 additions & 22 deletions tests/Integration.Test/Common/DicomCStoreDataClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System.Diagnostics;
using Ardalis.GuardClauses;
using FellowOakDicom;
using FellowOakDicom.Network;
using FellowOakDicom.Network.Client;
using Monai.Deploy.InformaticsGateway.Configuration;
Expand Down Expand Up @@ -48,15 +49,59 @@ public async Task SendAsync(DataProvider dataProvider, params object[] args)
var host = args[1].ToString();
var port = (int)args[2];
var calledAeTitle = args[3].ToString();
var timeout = (TimeSpan)args[4];
var timeout = TimeSpan.FromSeconds(dataProvider.ClientTimeout);
var associations = dataProvider.ClientSendOverAssociations;
var pauseTime = TimeSpan.FromSeconds(dataProvider.ClientAssociationPulseTime);

_outputHelper.WriteLine($"C-STORE: {callingAeTitle} => {host}:{port}@{calledAeTitle}");
var stopwatch = new Stopwatch();
stopwatch.Start();
var dicomClient = DicomClientFactory.Create(host, port, false, callingAeTitle, calledAeTitle);
var countdownEvent = new CountdownEvent(dataProvider.DicomSpecs.Files.Count);

var filesPerAssociations = dataProvider.DicomSpecs.Files.Count / associations;

var failureStatus = new List<DicomStatus>();
foreach (var file in dataProvider.DicomSpecs.Files)
for (int i = 0; i < associations; i++)
{
var files = dataProvider.DicomSpecs.Files.Skip(i * filesPerAssociations).Take(filesPerAssociations).ToList();
if (i + 1 == associations && dataProvider.DicomSpecs.Files.Count > (i + 1) * filesPerAssociations)
{
files.AddRange(dataProvider.DicomSpecs.Files.Skip(i * filesPerAssociations));
}

try
{
await SendBatchAsync(
files,
callingAeTitle,
host,
port,
calledAeTitle,
timeout,
stopwatch,
failureStatus);
await Task.Delay(pauseTime);
}
catch (DicomAssociationRejectedException ex)
{
_outputHelper.WriteLine($"Association Rejected: {ex.Message}");
dataProvider.DimseRsponse = DicomStatus.Cancel;
}
}

stopwatch.Stop();
lock (SyncRoot)
{
TotalTime += (int)stopwatch.Elapsed.TotalMilliseconds;
}
_outputHelper.WriteLine($"DICOMsend:{stopwatch.Elapsed.TotalSeconds}s");
dataProvider.DimseRsponse = (failureStatus.Count == 0) ? DicomStatus.Success : failureStatus.First();
}

private async Task SendBatchAsync(List<DicomFile> files, string callingAeTitle, string host, int port, string calledAeTitle, TimeSpan timeout, Stopwatch stopwatch, List<DicomStatus> failureStatus)
{
var dicomClient = DicomClientFactory.Create(host, port, false, callingAeTitle, calledAeTitle);
var countdownEvent = new CountdownEvent(files.Count);
foreach (var file in files)
{
var cStoreRequest = new DicomCStoreRequest(file);
cStoreRequest.OnResponseReceived += (DicomCStoreRequest request, DicomCStoreResponse response) =>
Expand All @@ -67,24 +112,8 @@ public async Task SendAsync(DataProvider dataProvider, params object[] args)
await dicomClient.AddRequestAsync(cStoreRequest);
}

try
{
await dicomClient.SendAsync();
countdownEvent.Wait(timeout);
stopwatch.Stop();
lock (SyncRoot)
{
TotalTime += (int)stopwatch.Elapsed.TotalMilliseconds;
}
_outputHelper.WriteLine($"DICOMsend:{stopwatch.Elapsed.TotalSeconds}s");
}
catch (DicomAssociationRejectedException ex)
{
_outputHelper.WriteLine($"Association Rejected: {ex.Message}");
dataProvider.DimseRsponse = DicomStatus.Cancel;
}

dataProvider.DimseRsponse = (failureStatus.Count == 0) ? DicomStatus.Success : failureStatus.First();
await dicomClient.SendAsync();
countdownEvent.Wait(timeout);
}
}
}
21 changes: 16 additions & 5 deletions tests/Integration.Test/Drivers/RabbitMqConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

using System.Collections.Concurrent;
using System.Diagnostics;
using Monai.Deploy.Messaging.Messages;
using Monai.Deploy.Messaging.RabbitMQ;
using TechTalk.SpecFlow.Infrastructure;
Expand All @@ -32,7 +33,6 @@ internal class RabbitMqConsumer : IDisposable

public IReadOnlyList<Message> Messages
{ get { return _messages.ToList(); } }
public CountdownEvent MessageWaitHandle { get; private set; }

public RabbitMqConsumer(RabbitMQMessageSubscriberService subscriberService, string queueName, ISpecFlowOutputHelper outputHelper)
{
Expand All @@ -54,15 +54,13 @@ public RabbitMqConsumer(RabbitMQMessageSubscriberService subscriberService, stri
_messages.Add(eventArgs.Message);
subscriberService.Acknowledge(eventArgs.Message);
_outputHelper.WriteLine($"{DateTime.UtcNow} - {queueName} message received with correlation ID={eventArgs.Message.CorrelationId}, delivery tag={eventArgs.Message.DeliveryTag}");
MessageWaitHandle?.Signal();
});
}

public void SetupMessageHandle(int count)
public void ClearMessages()
{
_outputHelper.WriteLine($"Expecting {count} {_queueName} messages from RabbitMQ");
_outputHelper.WriteLine($"Clearing messages received from RabbitMQ");
_messages.Clear();
MessageWaitHandle = new CountdownEvent(count);
}

protected virtual void Dispose(bool disposing)
Expand All @@ -84,5 +82,18 @@ public void Dispose()
Dispose(disposing: true);
GC.SuppressFinalize(this);
}

internal async Task<bool> WaitforAsync(int messageCount, TimeSpan messageWaitTimeSpan)
{
var stopwatch = new Stopwatch();
stopwatch.Start();

while (messageCount > _messages.Count && stopwatch.Elapsed < messageWaitTimeSpan)
{
await Task.Delay(100);
}

return messageCount >= _messages.Count;
}
}
}
2 changes: 1 addition & 1 deletion tests/Integration.Test/Features/AcrApi.feature
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Feature: ACR API
Given a DICOM study on a remote DICOMweb service
And an ACR API request to query & retrieve by <requestType>
When the ACR API request is sent
Then a workflow requests sent to the message broker
Then a single workflow request is sent to the message broker
And a study is uploaded to the storage service

Examples:
Expand Down
27 changes: 24 additions & 3 deletions tests/Integration.Test/Features/DicomDimseScp.feature
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ Feature: DICOM DIMSE SCP Services

Scenario: Response to C-ECHO-RQ
Given a called AE Title named 'C-ECHO-TEST' that groups by '0020,000D' for 5 seconds
When a C-ECHO-RQ is sent to 'C-ECHO-TEST' from 'TEST-RUNNER' with timeout of 30 seconds
And a DICOM client configured with 30 seconds timeout
When a C-ECHO-RQ is sent to 'C-ECHO-TEST' from 'TEST-RUNNER'
Then a successful response should be received

@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Study Instance UID
Given a called AE Title named 'C-STORE-STUDY' that groups by '0020,000D' for 3 seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over 1 associations and wait 0 between each association
And <count> <modality> studies
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER' with timeout of 300 seconds
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER'
Then a successful response should be received
And <count> workflow requests sent to message broker
And studies are uploaded to storage service
Expand All @@ -53,8 +56,10 @@ Feature: DICOM DIMSE SCP Services
@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Series Instance UID
Given a called AE Title named 'C-STORE-SERIES' that groups by '0020,000E' for 3 seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over 1 associations and wait 0 between each association
And <study_count> <modality> studies with <series_count> series per study
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-SERIES' from 'TEST-RUNNER' with timeout of 300 seconds
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-SERIES' from 'TEST-RUNNER'
Then a successful response should be received
And <series_count> workflow requests sent to message broker
And studies are uploaded to storage service
Expand All @@ -65,3 +70,19 @@ Feature: DICOM DIMSE SCP Services
| CT | 1 | 2 |
| MG | 1 | 3 |
| US | 1 | 2 |

@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Study Instance UID over multiple associations
Given a called AE Title named 'C-STORE-STUDY' that groups by '0020,000D' for 5 seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over <series_count> associations and wait <seconds> between each association
And <study_count> <modality> studies with <series_count> series per study
When C-STORE-RQ are sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER'
Then a successful response should be received
And <workflow_requests> workflow requests sent to message broker
And studies are uploaded to storage service

Examples:
| modality | study_count | series_count | seconds | workflow_requests |
| MG | 1 | 3 | 1 | 1 |
| MG | 1 | 3 | 6 | 3 |
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task GivenADICOMStudySentToAETFromWithTimeoutOfSeconds()
{
var modality = "US";
_dataProvider.GenerateDicomData(modality, WorkflowStudyCount);
_receivedMessages.SetupMessageHandle(WorkflowStudyCount);
_receivedMessages.ClearMessages();

var storeScu = _objectContainer.Resolve<IDataClient>("StoreSCU");
await storeScu.SendAsync(_dataProvider, "TEST-RUNNER", _configurations.OrthancOptions.Host, _configurations.OrthancOptions.DimsePort, "ORTHANC", TimeSpan.FromSeconds(300));
Expand All @@ -80,17 +80,16 @@ public async Task WhenTheACRAPIRequestIsSentTo()
await _informaticsGatewayClient.Inference.NewInferenceRequest(_dataProvider.AcrRequest, CancellationToken.None);
}

[Then(@"a workflow requests sent to the message broker")]
public void ThenAWorkflowRequestsSentToTheMessageBroker()
[Then(@"a single workflow request is sent to the message broker")]
public async Task ThenAWorkflowRequestsSentToTheMessageBroker()
{
_receivedMessages.MessageWaitHandle.Wait(MessageWaitTimeSpan).Should().BeTrue();
(await _receivedMessages.WaitforAsync(1, MessageWaitTimeSpan)).Should().BeTrue();
_assertions.ShouldHaveCorrectNumberOfWorkflowRequestMessagesAndAcrRequest(_dataProvider, _receivedMessages.Messages, WorkflowStudyCount);
}

[Then(@"a study is uploaded to the storage service")]
public async Task ThenAStudyIsUploadedToTheStorageService()
{
_receivedMessages.MessageWaitHandle.Wait(MessageWaitTimeSpan).Should().BeTrue();
_receivedMessages.Messages.Should().NotBeNullOrEmpty();
await _assertions.ShouldHaveUploadedDicomDataToMinio(_receivedMessages.Messages, _dataProvider.DicomSpecs.FileHashes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public void GivenXStudiesWithYSeriesPerStudy(int studyCount, string modality, in
Guard.Against.NegativeOrZero(seriesPerStudy);

_dataProvider.GenerateDicomData(modality, studyCount, seriesPerStudy);
_receivedMessages.SetupMessageHandle(_dataProvider.DicomSpecs.NumberOfExpectedRequests(_dataProvider.StudyGrouping));

_receivedMessages.ClearMessages();
}

[Given(@"a called AE Title named '([^']*)' that groups by '([^']*)' for (.*) seconds")]
Expand Down Expand Up @@ -125,12 +126,28 @@ await _informaticsGatewayClient.MonaiScpAeTitle.Create(new MonaiApplicationEntit
}
}

[When(@"a C-ECHO-RQ is sent to '([^']*)' from '([^']*)' with timeout of (.*) seconds")]
public async Task WhenAC_ECHO_RQIsSentToFromWithTimeoutOfSeconds(string calledAeTitle, string callingAeTitle, int clientTimeoutSeconds)
[Given(@"a DICOM client configured with (.*) seconds timeout")]
public void GivenADICOMClientConfiguredWithSecondsTimeout(int timeout)
{
Guard.Against.NegativeOrZero(timeout);
_dataProvider.ClientTimeout = timeout;
}

[Given(@"a DICOM client configured to send data over (.*) associations and wait (.*) between each association")]
public void GivenADICOMClientConfiguredToSendDataOverAssociationsAndWaitSecondsBetweenEachAssociation(int associations, int pulseTime)
{
Guard.Against.NegativeOrZero(associations);
Guard.Against.Negative(pulseTime);

_dataProvider.ClientSendOverAssociations = associations;
_dataProvider.ClientAssociationPulseTime = pulseTime;
}

[When(@"a C-ECHO-RQ is sent to '([^']*)' from '([^']*)'")]
public async Task WhenAC_ECHO_RQIsSentToFromWithTimeoutOfSeconds(string calledAeTitle, string callingAeTitle)
{
Guard.Against.NullOrWhiteSpace(calledAeTitle);
Guard.Against.NullOrWhiteSpace(callingAeTitle);
Guard.Against.NegativeOrZero(clientTimeoutSeconds);

var echoScu = _objectContainer.Resolve<IDataClient>("EchoSCU");
await echoScu.SendAsync(
Expand All @@ -139,7 +156,7 @@ await echoScu.SendAsync(
_configuration.InformaticsGatewayOptions.Host,
_informaticsGatewayConfiguration.Dicom.Scp.Port,
calledAeTitle,
TimeSpan.FromSeconds(clientTimeoutSeconds));
TimeSpan.FromSeconds(_dataProvider.ClientTimeout));
}

[Then(@"a successful response should be received")]
Expand All @@ -148,13 +165,13 @@ public void ThenASuccessfulResponseShouldBeReceived()
_dataProvider.DimseRsponse.Should().Be(DicomStatus.Success);
}

[When(@"a C-STORE-RQ is sent to '([^']*)' with AET '([^']*)' from '([^']*)' with timeout of (.*) seconds")]
public async Task WhenAC_STORE_RQIsSentToWithAETFromWithTimeoutOfSeconds(string application, string calledAeTitle, string callingAeTitle, int clientTimeoutSeconds)
[When(@"a C-STORE-RQ is sent to '([^']*)' with AET '([^']*)' from '([^']*)'")]
[When(@"C-STORE-RQ are sent to '([^']*)' with AET '([^']*)' from '([^']*)'")]
public async Task WhenAC_STORE_RQIsSentToWithAETFromWithTimeoutOfSeconds(string application, string calledAeTitle, string callingAeTitle)
{
Guard.Against.NullOrWhiteSpace(application);
Guard.Against.NullOrWhiteSpace(calledAeTitle);
Guard.Against.NullOrWhiteSpace(callingAeTitle);
Guard.Against.NegativeOrZero(clientTimeoutSeconds);

var storeScu = _objectContainer.Resolve<IDataClient>("StoreSCU");

Expand All @@ -168,8 +185,7 @@ await storeScu.SendAsync(
callingAeTitle,
host,
port,
calledAeTitle,
TimeSpan.FromSeconds(clientTimeoutSeconds));
calledAeTitle);

_dataProvider.ReplaceGeneratedDicomDataWithHashes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void GivenNStudies(int studyCount, string modality, string grouping)

_dataProvider.GenerateDicomData(modality, studyCount);
_dataProvider.StudyGrouping = grouping;
_receivedMessages.SetupMessageHandle(_dataProvider.DicomSpecs.NumberOfExpectedRequests(grouping));
_receivedMessages.ClearMessages();
}

[Given(@"a workflow named '(.*)'")]
Expand Down
Loading