Skip to content

Commit

Permalink
[Storage][DataMovement] Fixed ContinueOnFailure not being respected (
Browse files Browse the repository at this point in the history
  • Loading branch information
jalauzon-msft authored May 9, 2023
1 parent dc1faaf commit 0744465
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 44 deletions.
1 change: 1 addition & 0 deletions sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

### Bugs Fixed
- Fix to prevent empty strings or null to be passed as paths for `LocalFileStorageResource` and `LocalDirectoryStorageResourceContainer`.
- Fixed `ErrorHandlingOptions.ContinueOnFailure` not be respected.

### Other Changes

Expand Down
11 changes: 8 additions & 3 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,16 @@ await QueueChunk(
/// <returns>The task to wait until the cancellation has been triggered.</returns>
internal async Task TriggerCancellationAsync()
{
if (!_cancellationToken.IsCancellationRequested)
// If stop on failure specified, cancel entire job.
if (_errorHandling == ErrorHandlingOptions.StopOnAllFailures)
{
_dataTransfer._state.TriggerCancellation();
if (!_cancellationToken.IsCancellationRequested)
{
_dataTransfer._state.TriggerCancellation();
}
_dataTransfer._state.ResetTransferredBytes();
}

// Set the status to Pause/CancellationInProgress
if (StorageTransferStatus.PauseInProgress == _dataTransfer.TransferStatus)
{
Expand All @@ -270,7 +276,6 @@ internal async Task TriggerCancellationAsync()
// It's a cancellation if a pause wasn't called.
await OnTransferStatusChanged(StorageTransferStatus.CancellationInProgress).ConfigureAwait(false);
}
_dataTransfer._state.ResetTransferredBytes();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,47 +362,6 @@ await UploadBlobDirectoryAndVerify(
destinationPrefix: dirName,
waitTimeInSec: 10);
}

[Test]
[LiveOnly] // https://github.com/Azure/azure-sdk-for-net/issues/33082
public async Task DirectoryUpload_Root()
{
// Arrange
using DisposingLocalDirectory source = DisposingLocalDirectory.GetTestDirectory();
await using DisposingBlobContainer destination = await GetTestContainerAsync();

string file1 = await CreateRandomFileAsync(source.DirectoryPath, size:10);

string dir1 = CreateRandomDirectory(source.DirectoryPath);
string file2 = await CreateRandomFileAsync(dir1, size: 10);
string file3 = await CreateRandomFileAsync(dir1, size: 10);
string file4 = await CreateRandomFileAsync(dir1, size: 10);

string dir2 = CreateRandomDirectory(source.DirectoryPath);
string file5 = await CreateRandomFileAsync(dir2, size: 10);

string[] files = {file1, file2, file3, file4, file5};

TransferManager transferManager = new TransferManager();

StorageResourceContainer sourceResource =
new LocalDirectoryStorageResourceContainer(source.DirectoryPath);
StorageResourceContainer destinationResource =
new BlobStorageResourceContainer(destination.Container);

DataTransfer transfer = await transferManager.StartTransferAsync(sourceResource, destinationResource);

CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await transfer.AwaitCompletion(tokenSource.Token);

IEnumerable<string> destinationFiles =
(await destination.Container.GetBlobsAsync().ToEnumerableAsync()).Select(b => b.Name);

Assert.IsTrue(files
.Select(f => f.Substring(source.DirectoryPath.Length + 1).Replace("\\", "/"))
.OrderBy(f => f)
.SequenceEqual(destinationFiles.OrderBy(f => f)));
}
#endregion

#region DirectoryUploadTests
Expand Down Expand Up @@ -537,6 +496,107 @@ public async Task DirectoryUpload_BlobType(BlobType blobType)
}
#endregion DirectoryUploadTests

#region DirectoryUploadTests_Root

private async Task<string[]> PopulateLocalTestDirectory(string path)
{
string file1 = await CreateRandomFileAsync(path, size: 10);

string dir1 = CreateRandomDirectory(path);
string file2 = await CreateRandomFileAsync(dir1, size: 10);
string file3 = await CreateRandomFileAsync(dir1, size: 10);
string file4 = await CreateRandomFileAsync(dir1, size: 10);

string dir2 = CreateRandomDirectory(path);
string file5 = await CreateRandomFileAsync(dir2, size: 10);

string[] files = { file1, file2, file3, file4, file5 };
return files;
}

[Test]
[LiveOnly] // https://github.com/Azure/azure-sdk-for-net/issues/33082
public async Task DirectoryUpload_Root()
{
// Arrange
using DisposingLocalDirectory source = DisposingLocalDirectory.GetTestDirectory();
await using DisposingBlobContainer destination = await GetTestContainerAsync();

string[] files = await PopulateLocalTestDirectory(source.DirectoryPath);

TransferManager transferManager = new TransferManager();

StorageResourceContainer sourceResource =
new LocalDirectoryStorageResourceContainer(source.DirectoryPath);
StorageResourceContainer destinationResource =
new BlobStorageResourceContainer(destination.Container);

DataTransfer transfer = await transferManager.StartTransferAsync(sourceResource, destinationResource);

CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await transfer.AwaitCompletion(tokenSource.Token);

IEnumerable<string> destinationFiles =
(await destination.Container.GetBlobsAsync().ToEnumerableAsync()).Select(b => b.Name);

Assert.IsTrue(files
.Select(f => f.Substring(source.DirectoryPath.Length + 1).Replace("\\", "/"))
.OrderBy(f => f)
.SequenceEqual(destinationFiles.OrderBy(f => f)));
}

[Test]
[LiveOnly] // https://github.com/Azure/azure-sdk-for-net/issues/33082
public async Task DirectoryUpload_ContinueOnFailure()
{
// Arrange
using DisposingLocalDirectory source = DisposingLocalDirectory.GetTestDirectory();
await using DisposingBlobContainer destination = await GetTestContainerAsync();

string[] files = await PopulateLocalTestDirectory(source.DirectoryPath);

// Create conflict
await destination.Container.UploadBlobAsync(
files[0].Substring(source.DirectoryPath.Length + 1), BinaryData.FromString("Hello world"));

TransferManager transferManager = new TransferManager(new TransferManagerOptions()
{
ErrorHandling = ErrorHandlingOptions.ContinueOnFailure
});

StorageResourceContainer sourceResource =
new LocalDirectoryStorageResourceContainer(source.DirectoryPath);
StorageResourceContainer destinationResource =
new BlobStorageResourceContainer(destination.Container);

// Conflict should cause failure
TransferOptions options = new TransferOptions()
{
CreateMode = StorageResourceCreateMode.Fail
};
TestEventsRaised testEventsRaised = new TestEventsRaised(options);

// Act
DataTransfer transfer = await transferManager.StartTransferAsync(sourceResource, destinationResource, options);

CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await transfer.AwaitCompletion(tokenSource.Token);

// Assert
IEnumerable<string> destinationFiles =
(await destination.Container.GetBlobsAsync().ToEnumerableAsync()).Select(b => b.Name);

Assert.AreEqual(1, testEventsRaised.FailedEvents.Count);

// Verify all files exist, meaning files without conflict were transferred
Assert.IsTrue(files
.Select(f => f.Substring(source.DirectoryPath.Length + 1).Replace("\\", "/"))
.OrderBy(f => f)
.SequenceEqual(destinationFiles.OrderBy(f => f)));
}

#endregion

#region Single Concurrency
private async Task CreateTempDirectoryStructure(
string sourceFolderPath,
Expand Down

0 comments on commit 0744465

Please sign in to comment.