Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpointer testing cont. #47125

Merged
merged 6 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<Compile Include="$(AzureStorageDataMovementTestSharedSources)TestEventsRaised.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)TestTransferWithTimeout.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)DisposingLocalDirectory.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)MemoryTransferCheckpointer.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)MockQueueInternalTasks.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)MockResourceCheckpointData.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)StartTransferUploadTestBase.cs" LinkBase="Shared\DataMovement" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ public partial class TransferItemCompletedEventArgs : Azure.Storage.DataMovement
}
public partial class TransferItemFailedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
{
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResourceItem sourceResource, Azure.Storage.DataMovement.StorageResourceItem destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
public Azure.Storage.DataMovement.StorageResourceItem DestinationResource { get { throw null; } }
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
public Azure.Storage.DataMovement.StorageResource DestinationResource { get { throw null; } }
public System.Exception Exception { get { throw null; } }
public Azure.Storage.DataMovement.StorageResourceItem SourceResource { get { throw null; } }
public Azure.Storage.DataMovement.StorageResource SourceResource { get { throw null; } }
}
public partial class TransferItemSkippedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ public partial class TransferItemCompletedEventArgs : Azure.Storage.DataMovement
}
public partial class TransferItemFailedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
{
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResourceItem sourceResource, Azure.Storage.DataMovement.StorageResourceItem destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
public Azure.Storage.DataMovement.StorageResourceItem DestinationResource { get { throw null; } }
public TransferItemFailedEventArgs(string transferId, Azure.Storage.DataMovement.StorageResource sourceResource, Azure.Storage.DataMovement.StorageResource destinationResource, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(string), default(bool), default(System.Threading.CancellationToken)) { }
public Azure.Storage.DataMovement.StorageResource DestinationResource { get { throw null; } }
public System.Exception Exception { get { throw null; } }
public Azure.Storage.DataMovement.StorageResourceItem SourceResource { get { throw null; } }
public Azure.Storage.DataMovement.StorageResource SourceResource { get { throw null; } }
}
public partial class TransferItemSkippedEventArgs : Azure.Storage.DataMovement.DataTransferEventArgs
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public Task AddNewJobAsync(string transferId, StorageResource source, StorageRes
return Task.CompletedTask;
}

public Task AddNewJobPartAsync(string transferId, int partNumber, Stream headerStream, CancellationToken cancellationToken = default)
public Task AddNewJobPartAsync(string transferId, int partNumber, JobPartPlanHeader header, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Task AddNewJobAsync(
Task AddNewJobPartAsync(
string transferId,
int partNumber,
Stream headerStream,
JobPartPlanHeader header,
CancellationToken cancellationToken = default);

Task<bool> IsEnumerationCompleteAsync(
Expand Down
15 changes: 5 additions & 10 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -473,16 +473,11 @@ public async virtual Task CleanupAbortedJobPartAsync()
/// </summary>
public async virtual Task AddJobPartToCheckpointerAsync()
{
JobPartPlanHeader header = this.ToJobPartPlanHeader();
using (Stream stream = new MemoryStream())
{
header.Serialize(stream);
await _checkpointer.AddNewJobPartAsync(
transferId: _dataTransfer.Id,
partNumber: PartNumber,
headerStream: stream,
cancellationToken: _cancellationToken).ConfigureAwait(false);
}
await _checkpointer.AddNewJobPartAsync(
transferId: _dataTransfer.Id,
partNumber: PartNumber,
header: this.ToJobPartPlanHeader(),
cancellationToken: _cancellationToken).ConfigureAwait(false);
}

internal async virtual Task SetCheckpointerStatusAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
string checkpointerPath,
string id,
int jobPart,
Stream headerStream,
JobPartPlanHeader header,
CancellationToken cancellationToken = default)
{
Argument.AssertNotNullOrEmpty(checkpointerPath, nameof(checkpointerPath));
Argument.AssertNotNullOrEmpty(id, nameof(id));
Argument.AssertNotNull(jobPart, nameof(jobPart));
Argument.AssertNotNull(headerStream, nameof(headerStream));
Argument.AssertNotNull(header, nameof(header));

JobPartPlanFileName fileName = new JobPartPlanFileName(checkpointerPath: checkpointerPath, id: id, jobPartNumber: jobPart);
return await CreateJobPartPlanFileAsync(fileName, headerStream, cancellationToken).ConfigureAwait(false);
return await CreateJobPartPlanFileAsync(fileName, header, cancellationToken).ConfigureAwait(false);
}

public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
JobPartPlanFileName fileName,
Stream headerStream,
JobPartPlanHeader header,
CancellationToken cancellationToken = default)
{
JobPartPlanFile result = new JobPartPlanFile()
Expand All @@ -63,8 +63,11 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
try
{
using (FileStream fileStream = File.Create(result.FileName.ToString()))
using (MemoryStream ms = new())
{
await headerStream.CopyToAsync(
header.Serialize(ms);
ms.Position = 0;
await ms.CopyToAsync(
fileStream,
DataMovementConstants.DefaultStreamCopyBufferSize,
cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,13 @@ public override async Task AddNewJobAsync(
public override async Task AddNewJobPartAsync(
string transferId,
int partNumber,
Stream headerStream,
JobPartPlanHeader header,
CancellationToken cancellationToken = default)
{
Argument.AssertNotNullOrEmpty(transferId, nameof(transferId));
Argument.AssertNotNull(partNumber, nameof(partNumber));
Argument.AssertNotNull(headerStream, nameof(headerStream));
Argument.AssertNotNull(header, nameof(header));
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
headerStream.Position = 0;

if (!_transferStates.ContainsKey(transferId))
{
Expand All @@ -124,7 +123,7 @@ public override async Task AddNewJobPartAsync(
_pathToCheckpointer,
transferId,
partNumber,
headerStream,
header,
cancellationToken).ConfigureAwait(false);

// Add the job part into the current state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ public abstract Task AddNewJobAsync(
/// </summary>
/// <param name="transferId">The transfer ID.</param>
/// <param name="partNumber">The job part number.</param>
/// <param name="headerStream">A <see cref="Stream"/> to the job part plan header.</param>
/// <param name="header">A <see cref="Stream"/> to the job part plan header.</param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/> to propagate
/// notifications that the operation should be canceled.
/// </param>
public abstract Task AddNewJobPartAsync(
string transferId,
int partNumber,
Stream headerStream,
JobPartPlanHeader header,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ namespace Azure.Storage.DataMovement
public class TransferItemFailedEventArgs : DataTransferEventArgs
{
/// <summary>
/// Gets the <see cref="StorageResourceItem"/> that was the source resource for the transfer.
/// Gets the <see cref="StorageResource"/> that was the source resource for the transfer.
/// </summary>
public StorageResourceItem SourceResource { get; }
public StorageResource SourceResource { get; }

/// <summary>
/// Gets the <see cref="StorageResourceItem"/> that was the destination resource for the transfer.
/// Gets the <see cref="StorageResource"/> that was the destination resource for the transfer.
/// </summary>
public StorageResourceItem DestinationResource { get; }
public StorageResource DestinationResource { get; }

/// <summary>
/// Gets the <see cref="Exception"/> that was thrown during the job.
Expand Down Expand Up @@ -53,8 +53,8 @@ public class TransferItemFailedEventArgs : DataTransferEventArgs
/// </exception>
public TransferItemFailedEventArgs(
string transferId,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
StorageResource sourceResource,
StorageResource destinationResource,
Exception exception,
bool isRunningSynchronously,
CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,8 @@ ex is not TaskCanceledException &&
await TransferFailedEventHandler.RaiseAsync(
new TransferItemFailedEventArgs(
_dataTransfer.Id,
_sourceResource,
_destinationResource,
(StorageResource)_sourceResource ?? _sourceResourceContainer,
(StorageResource)_destinationResource ?? _destinationResourceContainer,
ex,
false,
_cancellationToken),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,20 @@ public async Task CreateJobPartPlanFileAsync_Base()
checkpointerPath: test.DirectoryPath,
id: transferId,
jobPart: jobPart,
headerStream: stream);
header: new(
DataMovementConstants.JobPartPlanFile.SchemaVersion_b3,
transferId,
jobPart,
System.DateTimeOffset.Now,
"mock",
"mock",
"mock",
"mock",
default,
default,
default,
default,
new()));
}

JobPartPlanFileName fileName = new JobPartPlanFileName(
Expand Down Expand Up @@ -63,7 +76,20 @@ public async Task CreateJobPartPlanFileAsync_FileName()
{
file = await JobPartPlanFile.CreateJobPartPlanFileAsync(
fileName: fileName,
headerStream: stream);
header: new(
DataMovementConstants.JobPartPlanFile.SchemaVersion_b3,
transferId,
jobPart,
System.DateTimeOffset.Now,
"mock",
"mock",
"mock",
"mock",
default,
default,
default,
default,
new()));
}

Assert.NotNull(file);
Expand Down
Loading