Skip to content

Commit

Permalink
Merge pull request #395 from Project-MONAI/AC-1298
Browse files Browse the repository at this point in the history
remove the need for a double copy to minio
  • Loading branch information
woodheadio authored Jun 8, 2023
2 parents 70bd3da + 556d2c3 commit b44839a
Show file tree
Hide file tree
Showing 20 changed files with 68 additions and 183 deletions.
2 changes: 2 additions & 0 deletions src/Api/Storage/FileStorageMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,7 @@ public virtual void SetFailed()
{
File.SetFailed();
}

public string? PayloadId { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/Configuration/ConfigurationValidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private bool IsValidDirectory(string source, string directory)
private bool IsValidBucketName(string source, string bucketName)
{
var valid = IsNotNullOrWhiteSpace(source, bucketName);
var regex = new Regex("(?=^.{3,63}$)(^[a-z0-9]+[a-z0-9\\-]+[a-z0-9]+$)");
var regex = new Regex("(?=^.{3,63}$)(^[a-z0-9]+[a-z0-9\\-]+[a-z0-9]+$)", new RegexOptions(), TimeSpan.FromSeconds(5));
if (!regex.IsMatch(bucketName))
{
valid = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public async Task GivenAAETitleName_WhenFindByAETAsyncIsCalled_ExpectItToReturnM
Assert.Equal("AET1", actual.FirstOrDefault()!.Name);

actual = await store.FindByAETAsync("AET6").ConfigureAwait(false);
Assert.NotNull(actual);
Assert.Empty(actual);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public async Task GivenAETitle_WhenFindByAETitleAsyncIsCalled_ExpectItToReturnMa
Assert.Equal("AET1", actual.FirstOrDefault()!.Name);

actual = await store.FindByAETAsync("AET6").ConfigureAwait(false);
Assert.NotNull(actual);
Assert.Empty(actual);
}

Expand Down
10 changes: 5 additions & 5 deletions src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ public static partial class Log
[LoggerMessage(EventId = 4001, Level = LogLevel.Debug, Message = "Upload statistics: {threads} threads, {seconds} seconds.")]
public static partial void UploadStats(this ILogger logger, int threads, double seconds);

[LoggerMessage(EventId = 4002, Level = LogLevel.Debug, Message = "Uploading file to temporary store at {filePath}.")]
public static partial void UploadingFileToTemporaryStore(this ILogger logger, string filePath);
[LoggerMessage(EventId = 4002, Level = LogLevel.Debug, Message = "Uploading file to storeage at {filePath}.")]
public static partial void UploadingFileToStoreage(this ILogger logger, string filePath);

[LoggerMessage(EventId = 4003, Level = LogLevel.Information, Message = "Instance queued for upload {identifier}. Items in queue {count} using memory {memoryUsageKb}KB.")]
public static partial void InstanceAddedToUploadQueue(this ILogger logger, string identifier, int count, double memoryUsageKb);

[LoggerMessage(EventId = 4004, Level = LogLevel.Debug, Message = "Error removing objects that are pending upload during startup.")]
public static partial void ErrorRemovingPendingUploadObjects(this ILogger logger, Exception ex);

[LoggerMessage(EventId = 4005, Level = LogLevel.Error, Message = "Error uploading temporary store. Waiting {timeSpan} before next retry. Retry attempt {retryCount}.")]
[LoggerMessage(EventId = 4005, Level = LogLevel.Error, Message = "Error uploading storeage. Waiting {timeSpan} before next retry. Retry attempt {retryCount}.")]
public static partial void ErrorUploadingFileToTemporaryStore(this ILogger logger, TimeSpan timespan, int retryCount, Exception ex);

[LoggerMessage(EventId = 4006, Level = LogLevel.Information, Message = "File uploaded to temporary store at {filePath}.")]
public static partial void UploadedFileToTemporaryStore(this ILogger logger, string filePath);
[LoggerMessage(EventId = 4006, Level = LogLevel.Information, Message = "File uploaded to storeage at {filePath}.")]
public static partial void UploadedFileToStoreage(this ILogger logger, string filePath);

[LoggerMessage(EventId = 4007, Level = LogLevel.Debug, Message = "Items in queue {count}.")]
public static partial void InstanceInUploadQueue(this ILogger logger, int count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,11 @@ private async Task NotifyNewInstance(InferenceRequest inferenceRequest, Dictiona
{
retrievedFiles[key].SetWorkflows(inferenceRequest.Application.Id);
}
var FileMeta = retrievedFiles[key];

var payloadId = await _payloadAssembler.Queue(inferenceRequest.TransactionId, retrievedFiles[key]).ConfigureAwait(false);
retrievedFiles[key].PayloadId = payloadId.ToString();
_uploadQueue.Queue(retrievedFiles[key]);
await _payloadAssembler.Queue(inferenceRequest.TransactionId, retrievedFiles[key]).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

using System;
using System.Threading;
using System.Threading.Tasks;
using Monai.Deploy.InformaticsGateway.Api.Storage;
Expand All @@ -30,15 +31,15 @@ internal interface IPayloadAssembler
/// </summary>
/// <param name="bucket">The bucket group the file belongs to.</param>
/// <param name="file">Path to the file to be added to the payload bucket.</param>
Task Queue(string bucket, FileStorageMetadata file);
Task<Guid> Queue(string bucket, FileStorageMetadata file);

/// <summary>
/// Queue a new file for the spcified payload bucket.
/// </summary>
/// <param name="bucket">The bucket group the file belongs to.</param>
/// <param name="file">Path to the file to be added to the payload bucket.</param>
/// <param name="timeout">Number of seconds to wait for additional files.</param>
Task Queue(string bucket, FileStorageMetadata file, uint timeout);
Task<Guid> Queue(string bucket, FileStorageMetadata file, uint timeout);

/// <summary>
/// Dequeue a payload from the queue for the message broker to notify subscribers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ private async Task RemovePendingPayloads()
/// </summary>
/// <param name="bucket">Name of the bucket where the file would be added to</param>
/// <param name="file">Instance to be queued</param>
public async Task Queue(string bucket, FileStorageMetadata file) => await Queue(bucket, file, DEFAULT_TIMEOUT).ConfigureAwait(false);
public async Task<Guid> Queue(string bucket, FileStorageMetadata file) => await Queue(bucket, file, DEFAULT_TIMEOUT).ConfigureAwait(false);

/// <summary>
/// Queues a new instance of <see cref="FileStorageMetadata"/>.
/// </summary>
/// <param name="bucket">Name of the bucket where the file would be added to</param>
/// <param name="file">Instance to be queued</param>
/// <param name="timeout">Number of seconds the bucket shall wait before sending the payload to be processed. Note: timeout cannot be modified once the bucket is created.</param>
public async Task Queue(string bucket, FileStorageMetadata file, uint timeout)
public async Task<Guid> Queue(string bucket, FileStorageMetadata file, uint timeout)
{
Guard.Against.Null(file);

Expand All @@ -106,6 +106,7 @@ public async Task Queue(string bucket, FileStorageMetadata file, uint timeout)
var payload = await CreateOrGetPayload(bucket, file.CorrelationId, timeout).ConfigureAwait(false);
payload.Add(file);
_logger.FileAddedToBucket(payload.Key, payload.Count);
return payload.PayloadId;
}

/// <summary>
Expand Down
144 changes: 0 additions & 144 deletions src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public async Task MoveFilesAsync(Payload payload, ActionBlock<Payload> moveQueue
var stopwatch = Stopwatch.StartNew();
try
{
await Move(payload, cancellationToken).ConfigureAwait(false);
await NotifyIfCompleted(payload, notificationQueue, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
Expand Down Expand Up @@ -127,149 +126,6 @@ private async Task NotifyIfCompleted(Payload payload, ActionBlock<Payload> notif
}
}

private async Task Move(Payload payload, CancellationToken cancellationToken)
{
Guard.Against.Null(payload);

_logger.MovingFIlesInPayload(payload.PayloadId, _options.Value.Storage.StorageServiceBucketName);

var options = new ParallelOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = _options.Value.Storage.ConcurrentUploads
};

var exceptions = new List<Exception>();
await Parallel.ForEachAsync(payload.Files, options, async (file, cancellationToke) =>
{
try
{
switch (file)
{
case DicomFileStorageMetadata dicom:
if (!string.IsNullOrWhiteSpace(dicom.JsonFile.TemporaryPath))
{
await MoveFile(payload.PayloadId, dicom.Id, dicom.JsonFile, cancellationToken).ConfigureAwait(false);
}
break;
}

await MoveFile(payload.PayloadId, file.Id, file.File, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
exceptions.Add(ex);
}
}).ConfigureAwait(false);

if (exceptions.Any())
{
throw new AggregateException(exceptions);
}
}

private async Task MoveFile(Guid payloadId, string identity, StorageObjectMetadata file, CancellationToken cancellationToken)
{
Guard.Against.NullOrWhiteSpace(identity);
Guard.Against.Null(file);

if (file.IsMoveCompleted)
{
_logger.AlreadyMoved(payloadId, file.UploadPath);
return;
}

_logger.MovingFileToPayloadDirectory(payloadId, file.UploadPath);

try
{
await _storageService.CopyObjectAsync(
file.TemporaryBucketName,
file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath),
_options.Value.Storage.StorageServiceBucketName,
file.GetPayloadPath(payloadId),
cancellationToken).ConfigureAwait(false);

await VerifyFileExists(payloadId, file, cancellationToken).ConfigureAwait(false);
}
catch (StorageObjectNotFoundException ex) when (ex.Message.Contains("Not found", StringComparison.OrdinalIgnoreCase)) // TODO: StorageLib shall not throw any errors from MINIO
{
// when file cannot be found on the Storage Service, we assume file has been moved previously by verifying the file exists on destination.
_logger.FileMissingInPayload(payloadId, file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), ex);
await VerifyFileExists(payloadId, file, cancellationToken).ConfigureAwait(false);
}
catch (StorageConnectionException ex)
{
_logger.StorageServiceConnectionError(ex);
throw new PayloadNotifyException(PayloadNotifyException.FailureReason.ServiceUnavailable);
}
catch (Exception ex)
{
_logger.PayloadMoveException(ex);
await LogFilesInMinIo(file.TemporaryBucketName, cancellationToken).ConfigureAwait(false);
throw new FileMoveException(file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), file.UploadPath, ex);
}

try
{
_logger.DeletingFileFromTemporaryBbucket(file.TemporaryBucketName, identity, file.TemporaryPath);
await _storageService.RemoveObjectAsync(file.TemporaryBucketName, file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
_logger.ErrorDeletingFileAfterMoveComplete(file.TemporaryBucketName, identity, file.TemporaryPath);
}
finally
{
file.SetMoved(_options.Value.Storage.StorageServiceBucketName);
}
}

private async Task VerifyFileExists(Guid payloadId, StorageObjectMetadata file, CancellationToken cancellationToken)
{
await Policy
.Handle<VerifyObjectsException>()
.WaitAndRetryAsync(
_options.Value.Storage.Retries.RetryDelays,
(exception, timeSpan, retryCount, context) =>
{
_logger.ErrorUploadingFileToTemporaryStore(timeSpan, retryCount, exception);
})
.ExecuteAsync(async () =>
{
var internalCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
internalCancellationTokenSource.CancelAfter(_options.Value.Storage.StorageServiceListTimeout);
var exists = await _storageService.VerifyObjectExistsAsync(_options.Value.Storage.StorageServiceBucketName, file.GetPayloadPath(payloadId), cancellationToken).ConfigureAwait(false);
if (!exists)
{
_logger.FileMovedVerificationFailure(payloadId, file.UploadPath);
throw new PayloadNotifyException(PayloadNotifyException.FailureReason.MoveFailure, false);
}
})
.ConfigureAwait(false);
}

private async Task LogFilesInMinIo(string bucketName, CancellationToken cancellationToken)
{
try
{
var internalCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
internalCancellationTokenSource.CancelAfter(_options.Value.Storage.StorageServiceListTimeout);
var listingResults = await _storageService.ListObjectsAsync(bucketName, recursive: true, cancellationToken: internalCancellationTokenSource.Token).ConfigureAwait(false);
_logger.FilesFounddOnStorageService(bucketName, listingResults.Count);
var files = new List<string>();
foreach (var item in listingResults)
{
files.Add(item.FilePath);
}
_logger.FileFounddOnStorageService(bucketName, string.Join(Environment.NewLine, files));
}
catch (Exception ex)
{
_logger.ErrorListingFilesOnStorageService(ex);
}
}

private async Task<PayloadAction> UpdatePayloadState(Payload payload, Exception ex, CancellationToken cancellationToken = default)
{
Guard.Against.Null(payload);
Expand Down
6 changes: 4 additions & 2 deletions src/InformaticsGateway/Services/DicomWeb/IStreamsWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,14 @@ private async Task SaveInstance(Stream stream, string studyInstanceUid, string w
{
dicomInfo.SetWorkflows(workflowName);
}
// for DICOMweb, use correlation ID as the grouping key
var payloadId = await _payloadAssembler.Queue(correlationId, dicomInfo, _configuration.Value.DicomWeb.Timeout).ConfigureAwait(false);
dicomInfo.PayloadId = payloadId.ToString();

await dicomInfo.SetDataStreams(dicomFile, dicomFile.ToJson(_configuration.Value.Dicom.WriteDicomJson, _configuration.Value.Dicom.ValidateDicomOnSerialization), _configuration.Value.Storage.TemporaryDataStorage, _fileSystem, _configuration.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false);
_uploadQueue.Queue(dicomInfo);

// for DICOMweb, use correlation ID as the grouping key
await _payloadAssembler.Queue(correlationId, dicomInfo, _configuration.Value.DicomWeb.Timeout).ConfigureAwait(false);

_logger.QueuedStowInstance();

AddSuccess(null, uids);
Expand Down
3 changes: 2 additions & 1 deletion src/InformaticsGateway/Services/Fhir/FhirService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ public async Task<FhirStoreResult> StoreAsync(HttpRequest request, string correl
throw new FhirStoreException(correlationId, $"Provided resource is of type '{content.InternalResourceType}' but request targeted type '{resourceType}'.", IssueType.Invalid);
}

var payloadId = await _payloadAssembler.Queue(correlationId, content.Metadata, Resources.PayloadAssemblerTimeout).ConfigureAwait(false);
content.Metadata.PayloadId = payloadId.ToString();
_uploadQueue.Queue(content.Metadata);
await _payloadAssembler.Queue(correlationId, content.Metadata, Resources.PayloadAssemblerTimeout).ConfigureAwait(false);
_logger.QueuedStowInstance();

content.StatusCode = StatusCodes.Status201Created;
Expand Down
3 changes: 2 additions & 1 deletion src/InformaticsGateway/Services/HealthLevel7/MllpService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ private async Task OnDisconnect(IMllpClient client, MllpClientResult result)
{
var hl7Fileetadata = new Hl7FileStorageMetadata(client.ClientId.ToString());
await hl7Fileetadata.SetDataStream(message.HL7Message, _configuration.Value.Storage.TemporaryDataStorage, _fileSystem, _configuration.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false);
var payloadId = await _payloadAssembler.Queue(client.ClientId.ToString(), hl7Fileetadata).ConfigureAwait(false);
hl7Fileetadata.PayloadId = payloadId.ToString();
_uploadQueue.Queue(hl7Fileetadata);
await _payloadAssembler.Queue(client.ClientId.ToString(), hl7Fileetadata).ConfigureAwait(false);
}
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ public async Task HandleInstanceAsync(DicomCStoreRequest request, string calledA
}

await dicomInfo.SetDataStreams(request.File, request.File.ToJson(_dicomJsonOptions, _validateDicomValueOnJsonSerialization), _options.Value.Storage.TemporaryDataStorage, _fileSystem, _options.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false);
_uploadQueue.Queue(dicomInfo);

var dicomTag = FellowOakDicom.DicomTag.Parse(_configuration.Grouping);
_logger.QueueInstanceUsingDicomTag(dicomTag);
var key = request.Dataset.GetSingleValue<string>(dicomTag);
await _payloadAssembler.Queue(key, dicomInfo, _configuration.Timeout).ConfigureAwait(false);

var payloadid = await _payloadAssembler.Queue(key, dicomInfo, _configuration.Timeout).ConfigureAwait(false);
dicomInfo.PayloadId = payloadid.ToString();
_uploadQueue.Queue(dicomInfo);
}

private bool AcceptsSopClass(string sopClassUid)
Expand Down
Loading

0 comments on commit b44839a

Please sign in to comment.