From 083506cea21b3f9779c5a24e1606b970cb6352b7 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Wed, 3 Apr 2024 11:53:14 -0700 Subject: [PATCH 1/5] checkpoint --- cmd/credentialUtil.go | 6 +++--- ste/mgr-JobPartMgr.go | 13 +++++++------ ste/sourceInfoProvider-Blob.go | 20 ++++++++++++-------- ste/sourceInfoProvider-File.go | 7 +++++-- ste/sourceInfoProvider-GCP.go | 14 ++++++++++---- ste/sourceInfoProvider-S3.go | 4 +++- ste/xferStatsPolicy.go | 32 +++++++++++++++++++++----------- 7 files changed, 61 insertions(+), 35 deletions(-) diff --git a/cmd/credentialUtil.go b/cmd/credentialUtil.go index 13694f60c..353dcd7c0 100644 --- a/cmd/credentialUtil.go +++ b/cmd/credentialUtil.go @@ -376,7 +376,7 @@ func isPublic(ctx context.Context, blobResourceURL string, cpkOptions common.Cpk MaxRetryDelay: ste.UploadMaxRetryDelay, }, policy.TelemetryOptions{ ApplicationID: glcm.AddUserAgentPrefix(common.UserAgent), - }, nil, nil, ste.LogOptions{}, nil) + }, nil, ste.LogOptions{}, nil) blobClient, _ := blob.NewClientWithNoCredential(bURLParts.String(), &blob.ClientOptions{ClientOptions: clientOptions}) bURLParts.BlobName = "" @@ -409,7 +409,7 @@ func mdAccountNeedsOAuth(ctx context.Context, blobResourceURL string, cpkOptions MaxRetryDelay: ste.UploadMaxRetryDelay, }, policy.TelemetryOptions{ ApplicationID: glcm.AddUserAgentPrefix(common.UserAgent), - }, nil, nil, ste.LogOptions{}, nil) + }, nil, ste.LogOptions{}, nil) blobClient, _ := blob.NewClientWithNoCredential(blobResourceURL, &blob.ClientOptions{ClientOptions: clientOptions}) _, err := blobClient.GetProperties(ctx, &blob.GetPropertiesOptions{CPKInfo: cpkOptions.GetCPKInfo()}) @@ -603,7 +603,7 @@ func createClientOptions(logger common.ILoggerResetable, srcCred *common.ScopedC MaxRetryDelay: ste.UploadMaxRetryDelay, }, policy.TelemetryOptions{ ApplicationID: glcm.AddUserAgentPrefix(common.UserAgent), - }, ste.NewAzcopyHTTPClient(frontEndMaxIdleConnectionsPerHost), nil, logOptions, srcCred) + }, 
ste.NewAzcopyHTTPClient(frontEndMaxIdleConnectionsPerHost), logOptions, srcCred) } const frontEndMaxIdleConnectionsPerHost = http.DefaultMaxIdleConnsPerHost diff --git a/ste/mgr-JobPartMgr.go b/ste/mgr-JobPartMgr.go index 1aacff6d4..62bf9f686 100644 --- a/ste/mgr-JobPartMgr.go +++ b/ste/mgr-JobPartMgr.go @@ -129,12 +129,12 @@ func (d *dialRateLimiter) DialContext(ctx context.Context, network, address stri return d.dialer.DialContext(ctx, network, address) } -func NewClientOptions(retry policy.RetryOptions, telemetry policy.TelemetryOptions, transport policy.Transporter, statsAcc *PipelineNetworkStats, log LogOptions, srcCred *common.ScopedCredential) azcore.ClientOptions { +func NewClientOptions(retry policy.RetryOptions, telemetry policy.TelemetryOptions, transport policy.Transporter, log LogOptions, srcCred *common.ScopedCredential) azcore.ClientOptions { // Pipeline will look like // [includeResponsePolicy, newAPIVersionPolicy (ignored), NewTelemetryPolicy, perCall, NewRetryPolicy, perRetry, NewLogPolicy, httpHeaderPolicy, bodyDownloadPolicy] perCallPolicies := []policy.Policy{azruntime.NewRequestIDPolicy(), NewVersionPolicy(), newFileUploadRangeFromURLFixPolicy()} // TODO : Default logging policy is not equivalent to old one. 
tracing HTTP request - perRetryPolicies := []policy.Policy{newRetryNotificationPolicy(), newLogPolicy(log), newStatsPolicy(statsAcc)} + perRetryPolicies := []policy.Policy{newRetryNotificationPolicy(), newLogPolicy(log), newStatsPolicy()} if srcCred != nil { perRetryPolicies = append(perRetryPolicies, NewSourceAuthPolicy(srcCred)) } @@ -184,10 +184,9 @@ type jobPartMgr struct { srcServiceClient *common.ServiceClient dstServiceClient *common.ServiceClient - - credInfo common.CredentialInfo - srcIsOAuth bool // true if source is authenticated via oauth - credOption *common.CredentialOpOptions + credInfo common.CredentialInfo + srcIsOAuth bool // true if source is authenticated via oauth + credOption *common.CredentialOpOptions // When the part is schedule to run (inprogress), the below fields are used planMMF *JobPartPlanMMF // This Job part plan's MMF @@ -364,6 +363,8 @@ func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context) { // Each transfer gets its own context (so any chunk can cancel the whole transfer) based off the job's context transferCtx, transferCancel := context.WithCancel(jobCtx) + // Add the pipeline network stats to the context. This will be manually unset for all sourceInfoProvider contexts. 
+ transferCtx = withPipelineNetworkStats(transferCtx, jpm.jobMgr.PipelineNetworkStats()) // Initialize a job part transfer manager jptm := &jobPartTransferMgr{ jobPartMgr: jpm, diff --git a/ste/sourceInfoProvider-Blob.go b/ste/sourceInfoProvider-Blob.go index a0be6355f..73aaf66cd 100644 --- a/ste/sourceInfoProvider-Blob.go +++ b/ste/sourceInfoProvider-Blob.go @@ -21,6 +21,7 @@ package ste import ( + "context" "crypto/md5" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" @@ -33,6 +34,7 @@ import ( type blobSourceInfoProvider struct { defaultRemoteSourceInfoProvider source *blob.Client + ctx context.Context } func (p *blobSourceInfoProvider) IsDFSSource() bool { @@ -48,9 +50,7 @@ func (p *blobSourceInfoProvider) RawSource() string { } func (p *blobSourceInfoProvider) ReadLink() (string, error) { - ctx := p.jptm.Context() - - resp, err := p.source.DownloadStream(ctx, &blob.DownloadStreamOptions{ + resp, err := p.source.DownloadStream(p.ctx, &blob.DownloadStreamOptions{ CPKInfo: p.jptm.CpkInfo(), CPKScopeInfo: p.jptm.CpkScopeInfo(), }) @@ -58,7 +58,7 @@ func (p *blobSourceInfoProvider) ReadLink() (string, error) { return "", err } - symlinkBuf, err := io.ReadAll(resp.NewRetryReader(ctx, &blob.RetryReaderOptions{ + symlinkBuf, err := io.ReadAll(resp.NewRetryReader(p.ctx, &blob.RetryReaderOptions{ MaxRetries: 5, OnFailedRead: common.NewBlobReadLogFunc(p.jptm, p.jptm.Info().Source), })) @@ -125,6 +125,10 @@ func newBlobSourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, e ret.source = blobClient + ctx := jptm.Context() + ctx = withPipelineNetworkStats(ctx, nil) + ret.ctx = ctx + return ret, nil } @@ -136,7 +140,7 @@ func (p *blobSourceInfoProvider) AccessControl() (*string, error) { sourceDatalakeClient := dsc.NewFileSystemClient(p.jptm.Info().SrcContainer).NewFileClient(p.jptm.Info().SrcFilePath) - resp, err := sourceDatalakeClient.GetAccessControl(p.jptm.Context(), nil) + resp, err := 
sourceDatalakeClient.GetAccessControl(p.ctx, nil) if err != nil { return nil, err } @@ -156,7 +160,7 @@ func (p *blobSourceInfoProvider) BlobType() blob.BlobType { func (p *blobSourceInfoProvider) GetFreshFileLastModifiedTime() (time.Time, error) { // We can't set a custom LMT on HNS, so it doesn't make sense to swap here. - properties, err := p.source.GetProperties(p.jptm.Context(), &blob.GetPropertiesOptions{CPKInfo: p.jptm.CpkInfo()}) + properties, err := p.source.GetProperties(p.ctx, &blob.GetPropertiesOptions{CPKInfo: p.jptm.CpkInfo()}) if err != nil { return time.Time{}, err } @@ -168,7 +172,7 @@ func (p *blobSourceInfoProvider) GetMD5(offset, count int64) ([]byte, error) { if count <= common.MaxRangeGetSize { rangeGetContentMD5 = to.Ptr(true) } - response, err := p.source.DownloadStream(p.jptm.Context(), + response, err := p.source.DownloadStream(p.ctx, &blob.DownloadStreamOptions{ Range: blob.HTTPRange{Offset: offset, Count: count}, RangeGetContentMD5: rangeGetContentMD5, @@ -182,7 +186,7 @@ func (p *blobSourceInfoProvider) GetMD5(offset, count int64) ([]byte, error) { return response.ContentMD5, nil } else { // compute md5 - body := response.NewRetryReader(p.jptm.Context(), &blob.RetryReaderOptions{MaxRetries: MaxRetryPerDownloadBody}) + body := response.NewRetryReader(p.ctx, &blob.RetryReaderOptions{MaxRetries: MaxRetryPerDownloadBody}) defer body.Close() h := md5.New() if _, err = io.Copy(h, body); err != nil { diff --git a/ste/sourceInfoProvider-File.go b/ste/sourceInfoProvider-File.go index 973617a54..a87b1d1fd 100644 --- a/ste/sourceInfoProvider-File.go +++ b/ste/sourceInfoProvider-File.go @@ -191,9 +191,12 @@ func newFileSourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, e // due to the REST parity feature added in 2019-02-02, the File APIs are no longer backward compatible // so we must use the latest SDK version to stay safe //TODO: Should we do that? 
+ ctx := jptm.Context() + ctx = withPipelineNetworkStats(ctx, nil) + return &fileSourceInfoProvider{ defaultRemoteSourceInfoProvider: *base, - ctx: jptm.Context(), + ctx: ctx, cacheOnce: &sync.Once{}, srcShareClient: s.NewShareClient(jptm.Info().SrcContainer), sourceURL: source.URL()}, nil @@ -341,7 +344,7 @@ func (p *fileSourceInfoProvider) GetMD5(offset, count int64) ([]byte, error) { return response.ContentMD5, nil } else { // compute md5 - body := response.NewRetryReader(p.jptm.Context(), &file.RetryReaderOptions{MaxRetries: MaxRetryPerDownloadBody}) + body := response.NewRetryReader(p.ctx, &file.RetryReaderOptions{MaxRetries: MaxRetryPerDownloadBody}) defer body.Close() h := md5.New() if _, err = io.Copy(h, body); err != nil { diff --git a/ste/sourceInfoProvider-GCP.go b/ste/sourceInfoProvider-GCP.go index 611fb8ea7..9f49bed12 100644 --- a/ste/sourceInfoProvider-GCP.go +++ b/ste/sourceInfoProvider-GCP.go @@ -2,6 +2,7 @@ package ste import ( gcpUtils "cloud.google.com/go/storage" + "context" "crypto/md5" "fmt" "github.com/Azure/azure-storage-azcopy/v10/common" @@ -21,6 +22,7 @@ type gcpSourceInfoProvider struct { gcpClient *gcpUtils.Client gcpURLParts common.GCPURLParts + ctx context.Context } var gcpClientFactory = common.NewGCPClientFactory() @@ -39,8 +41,12 @@ func newGCPSourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, er return nil, err } + ctx := jptm.Context() + ctx = withPipelineNetworkStats(ctx, nil) + p.ctx = ctx + p.gcpClient, err = gcpClientFactory.GetGCPClient( - p.jptm.Context(), + p.ctx, common.CredentialInfo{ CredentialType: common.ECredentialType.GoogleAppCredentials(), GCPCredentialInfo: common.GCPCredentialInfo{}, @@ -88,7 +94,7 @@ func (p *gcpSourceInfoProvider) Properties() (*SrcProperties, error) { SrcMetadata: p.transferInfo.SrcMetadata, } if p.transferInfo.S2SGetPropertiesInBackend { - objectInfo, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).Attrs(p.jptm.Context()) + objectInfo, err 
:= p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).Attrs(p.ctx) if err != nil { return nil, err } @@ -151,7 +157,7 @@ func (p *gcpSourceInfoProvider) IsLocal() bool { } func (p *gcpSourceInfoProvider) GetFreshFileLastModifiedTime() (time.Time, error) { - objectInfo, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).Attrs(p.jptm.Context()) + objectInfo, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).Attrs(p.ctx) if err != nil { return time.Time{}, err } @@ -164,7 +170,7 @@ func (p *gcpSourceInfoProvider) EntityType() common.EntityType { func (p *gcpSourceInfoProvider) GetMD5(offset, count int64) ([]byte, error) { // gcp does not support getting range md5 - body, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).NewRangeReader(p.jptm.Context(), offset, count) + body, err := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey).NewRangeReader(p.ctx, offset, count) if err != nil { return nil, err } diff --git a/ste/sourceInfoProvider-S3.go b/ste/sourceInfoProvider-S3.go index cdd8a2673..29a3da316 100644 --- a/ste/sourceInfoProvider-S3.go +++ b/ste/sourceInfoProvider-S3.go @@ -69,7 +69,9 @@ func newS3SourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, err } else { p.credType = common.ECredentialType.S3AccessKey() } - p.s3Client, err = s3ClientFactory.GetS3Client(p.jptm.Context(), common.CredentialInfo{ + ctx := jptm.Context() + ctx = withPipelineNetworkStats(ctx, nil) + p.s3Client, err = s3ClientFactory.GetS3Client(ctx, common.CredentialInfo{ CredentialType: p.credType, S3CredentialInfo: common.S3CredentialInfo{ Endpoint: p.s3URLPart.Endpoint, diff --git a/ste/xferStatsPolicy.go b/ste/xferStatsPolicy.go index 5057ad115..a13dc73cc 100644 --- a/ste/xferStatsPolicy.go +++ b/ste/xferStatsPolicy.go @@ -22,6 +22,7 @@ package ste import ( "bytes" + "context" 
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-storage-azcopy/v10/common" "io" @@ -173,33 +174,42 @@ func transparentlyReadBody(r *http.Response) string { return string(buf) // copy to string } +var pipelineNetworkStatsContextKey = contextKey{"pipelineNetworkStats"} + +// withPipelineNetworkStats returns a context that carries the given pipeline network stats. The statsPolicy +// will then record into the pipeline network stats object when necessary +func withPipelineNetworkStats(ctx context.Context, stats *PipelineNetworkStats) context.Context { + return context.WithValue(ctx, pipelineNetworkStatsContextKey, stats) +} + type statsPolicy struct { - stats *PipelineNetworkStats } func (s statsPolicy) Do(req *policy.Request) (*http.Response, error) { start := time.Now() response, err := req.Next() - if s.stats != nil { - if s.stats.IsStarted() { - atomic.AddInt64(&s.stats.atomicOperationCount, 1) - atomic.AddInt64(&s.stats.atomicE2ETotalMilliseconds, int64(time.Since(start).Seconds()*1000)) + // Grab the pipeline network stats out of the context and, if it's there, record into it + stats, ok := req.Raw().Context().Value(pipelineNetworkStatsContextKey).(*PipelineNetworkStats) + if ok && stats != nil { + if stats.IsStarted() { + atomic.AddInt64(&stats.atomicOperationCount, 1) + atomic.AddInt64(&stats.atomicE2ETotalMilliseconds, int64(time.Since(start).Seconds()*1000)) if err != nil && !isContextCancelledError(err) { // no response from server - atomic.AddInt64(&s.stats.atomicNetworkErrorCount, 1) + atomic.AddInt64(&stats.atomicNetworkErrorCount, 1) } } // always look at retries, even if not started, because concurrency tuner needs to know about them // TODO should we also count status 500? 
It is mentioned here as timeout:https://docs.microsoft.com/en-us/azure/storage/common/storage-scalability-targets if response != nil && response.StatusCode == http.StatusServiceUnavailable { - s.stats.tunerInterface.recordRetry() // always tell the tuner - if s.stats.IsStarted() { // but only count it here, if we have started + stats.tunerInterface.recordRetry() // always tell the tuner + if stats.IsStarted() { // but only count it here, if we have started // To find out why the server was busy we need to look at the response responseBodyText := transparentlyReadBody(response) - s.stats.recordRetry(responseBodyText) + stats.recordRetry(responseBodyText) } } @@ -208,6 +218,6 @@ func (s statsPolicy) Do(req *policy.Request) (*http.Response, error) { return response, err } -func newStatsPolicy(accumulator *PipelineNetworkStats) policy.Policy { - return statsPolicy{stats: accumulator} +func newStatsPolicy() policy.Policy { + return statsPolicy{} } From 0146c60d3e689f3f51a2720735a7f850e944d092 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Tue, 23 Apr 2024 15:02:30 -0700 Subject: [PATCH 2/5] enabled s2s for basic test and also check for network stats --- e2etest/newe2e_task_resourcemanagement.go | 11 +++++++++++ e2etest/zt_newe2e_basic_functionality_test.go | 11 +++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/e2etest/newe2e_task_resourcemanagement.go b/e2etest/newe2e_task_resourcemanagement.go index ffc79320e..49eb8f70e 100644 --- a/e2etest/newe2e_task_resourcemanagement.go +++ b/e2etest/newe2e_task_resourcemanagement.go @@ -220,3 +220,14 @@ func ValidateErrorOutput(a Asserter, stdout AzCopyStdout, errorMsg string) { fmt.Println(stdout.String()) a.Error("expected error message not found in azcopy output") } + +func ValidateStatsReturned(a Asserter, stdout AzCopyStdout) { + if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { + return + } + csrStdout, ok := stdout.(*AzCopyParsedCopySyncRemoveStdout) + a.AssertNow("stdout must be 
AzCopyParsedCopySyncRemoveStdout", Equal{}, ok, true) + // Check for any of the stats. It's possible for average iops, server busy percentage, network error percentage to be 0, but average e2e milliseconds should never be 0. + statsFound := csrStdout.FinalStatus.AverageIOPS != 0 || csrStdout.FinalStatus.AverageE2EMilliseconds != 0 || csrStdout.FinalStatus.ServerBusyPercentage != 0 || csrStdout.FinalStatus.NetworkErrorPercentage != 0 + a.Assert("stats must be returned", Equal{}, statsFound, true) +} diff --git a/e2etest/zt_newe2e_basic_functionality_test.go b/e2etest/zt_newe2e_basic_functionality_test.go index eb130469f..06a4e9d30 100644 --- a/e2etest/zt_newe2e_basic_functionality_test.go +++ b/e2etest/zt_newe2e_basic_functionality_test.go @@ -11,7 +11,7 @@ func init() { type BasicFunctionalitySuite struct{} -func (s *BasicFunctionalitySuite) Scenario_SingleFileUploadDownload(svm *ScenarioVariationManager) { +func (s *BasicFunctionalitySuite) Scenario_SingleFile(svm *ScenarioVariationManager) { azCopyVerb := ResolveVariation(svm, []AzCopyVerb{AzCopyVerbCopy, AzCopyVerbSync}) // Calculate verb early to create the destination object early // Scale up from service to object dstObj := CreateResource[ContainerResourceManager](svm, GetRootResource(svm, ResolveVariation(svm, []common.Location{common.ELocation.Local(), common.ELocation.Blob()})), ResourceDefinitionContainer{}).GetObject(svm, "test", common.EEntityType.File()) @@ -32,15 +32,15 @@ func (s *BasicFunctionalitySuite) Scenario_SingleFileUploadDownload(svm *Scenari Body: body, }) - // no s2s, no local->local - if srcObj.Location().IsRemote() == dstObj.Location().IsRemote() { + // no local->local + if srcObj.Location().IsLocal() && dstObj.Location().IsLocal() { svm.InvalidateScenario() return } sasOpts := GenericAccountSignatureValues{} - RunAzCopy( + stdout, _ := RunAzCopy( svm, AzCopyCommand{ // Sync is not included at this moment, because sync requires @@ -63,4 +63,7 @@ func (s *BasicFunctionalitySuite) 
Scenario_SingleFileUploadDownload(svm *Scenari ValidateResource[ObjectResourceManager](svm, dstObj, ResourceDefinitionObject{ Body: body, }, true) + + // Validate that the network stats were updated + ValidateStatsReturned(svm, stdout) } From 5bfbd0a3da61e8c6d8c8eafe7010b1dfdb5d1528 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Tue, 23 Apr 2024 17:00:15 -0700 Subject: [PATCH 3/5] fix build issue with test --- ste/testJobPartTransferManager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ste/testJobPartTransferManager_test.go b/ste/testJobPartTransferManager_test.go index a47f82bc9..55d8fb32d 100644 --- a/ste/testJobPartTransferManager_test.go +++ b/ste/testJobPartTransferManager_test.go @@ -292,7 +292,7 @@ func (t *testJobPartTransferManager) S2SSourceClientOptions() azcore.ClientOptio httpClient := NewAzcopyHTTPClient(4) - return NewClientOptions(retryOptions, telemetryOptions, httpClient, nil, LogOptions{}, nil) + return NewClientOptions(retryOptions, telemetryOptions, httpClient, LogOptions{}, nil) } func (t *testJobPartTransferManager) CredentialOpOptions() *common.CredentialOpOptions { From d4016554e45d0a0ac033de5af544618993d1970a Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Mon, 20 May 2024 14:51:17 -0700 Subject: [PATCH 4/5] only install azaccounts --- azure-pipelines.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index c75a382b9..467644dc1 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -165,7 +165,7 @@ jobs: - task: PowerShell@2 inputs: targetType: 'inline' - script: 'Install-Module -Name Az -Scope CurrentUser -Repository PSGallery -AllowClobber -Force' + script: 'Install-Module -Name Az.Accounts -Scope CurrentUser -Repository PSGallery -AllowClobber -Force' pwsh: 'true' displayName: 'Install Powershell Az Module' - task: AzureCLI@2 From 06e5f0cec34bc3d8125d822c90ff2753ae576678 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Mon, 20 May 2024 
14:58:40 -0700 Subject: [PATCH 5/5] move validation into its own file --- e2etest/newe2e_task_resourcemanagement.go | 188 --------------------- e2etest/newe2e_task_validation.go | 193 ++++++++++++++++++++++ 2 files changed, 193 insertions(+), 188 deletions(-) create mode 100644 e2etest/newe2e_task_validation.go diff --git a/e2etest/newe2e_task_resourcemanagement.go b/e2etest/newe2e_task_resourcemanagement.go index bae7ad4bb..f63e4b0a4 100644 --- a/e2etest/newe2e_task_resourcemanagement.go +++ b/e2etest/newe2e_task_resourcemanagement.go @@ -1,13 +1,7 @@ package e2etest import ( - "crypto/md5" - "encoding/hex" - "fmt" "github.com/Azure/azure-storage-azcopy/v10/cmd" - "github.com/Azure/azure-storage-azcopy/v10/common" - "io" - "strings" ) // ResourceTracker tracks resources @@ -72,185 +66,3 @@ func CreateResource[T ResourceManager](a Asserter, base ResourceManager, def Mat return matchingRes.(T) } - -func ValidatePropertyPtr[T any](a Asserter, name string, expected, real *T) { - if expected == nil { - return - } - - a.Assert(name+" must match", Equal{Deep: true}, expected, real) -} - -func ValidateMetadata(a Asserter, expected, real common.Metadata) { - if expected == nil { - return - } - - a.Assert("Metadata must match", Equal{Deep: true}, expected, real) -} - -func ValidateTags(a Asserter, expected, real map[string]string) { - if expected == nil { - return - } - - a.Assert("Tags must match", Equal{Deep: true}, expected, real) -} - -func ValidateResource[T ResourceManager](a Asserter, target T, definition MatchedResourceDefinition[T], validateObjectContent bool) { - a.AssertNow("Target resource and definition must not be null", Not{IsNil{}}, a, target, definition) - a.AssertNow("Target resource must be at a equal level to the resource definition", Equal{}, target.Level(), definition.DefinitionTarget()) - - if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { - return - } - - definition.ApplyDefinition(a, target, map[cmd.LocationLevel]func(Asserter, 
ResourceManager, ResourceDefinition){ - cmd.ELocationLevel.Container(): func(a Asserter, manager ResourceManager, definition ResourceDefinition) { - cRes := manager.(ContainerResourceManager) - - if !definition.ShouldExist() { - a.AssertNow("container must not exist", Equal{}, cRes.Exists(), false) - return - } - - cProps := cRes.GetProperties(a) - vProps := definition.(ResourceDefinitionContainer).Properties - - ValidateMetadata(a, vProps.Metadata, cProps.Metadata) - - if manager.Location() == common.ELocation.Blob() || manager.Location() == common.ELocation.BlobFS() { - ValidatePropertyPtr(a, "Public access", vProps.BlobContainerProperties.Access, cProps.BlobContainerProperties.Access) - } - - if manager.Location() == common.ELocation.File() { - ValidatePropertyPtr(a, "Enabled protocols", vProps.FileContainerProperties.EnabledProtocols, cProps.FileContainerProperties.EnabledProtocols) - ValidatePropertyPtr(a, "RootSquash", vProps.FileContainerProperties.RootSquash, cProps.FileContainerProperties.RootSquash) - ValidatePropertyPtr(a, "AccessTier", vProps.FileContainerProperties.AccessTier, cProps.FileContainerProperties.AccessTier) - ValidatePropertyPtr(a, "Quota", vProps.FileContainerProperties.Quota, cProps.FileContainerProperties.Quota) - } - }, - cmd.ELocationLevel.Object(): func(a Asserter, manager ResourceManager, definition ResourceDefinition) { - objMan := manager.(ObjectResourceManager) - objDef := definition.(ResourceDefinitionObject) - - if !objDef.ShouldExist() { - a.Assert(fmt.Sprintf("object %s must not exist", objMan.ObjectName()), Equal{}, objMan.Exists(), false) - return - } - - oProps := objMan.GetProperties(a) - vProps := objDef.ObjectProperties - - if validateObjectContent && objMan.EntityType() == common.EEntityType.File() && objDef.Body != nil { - objBody := objMan.Download(a) - validationBody := objDef.Body.Reader() - - objHash := md5.New() - valHash := md5.New() - - _, err := io.Copy(objHash, objBody) - a.NoError("hash object body", err) - 
_, err = io.Copy(valHash, validationBody) - a.NoError("hash validation body", err) - - a.Assert("bodies differ in hash", Equal{Deep: true}, hex.EncodeToString(objHash.Sum(nil)), hex.EncodeToString(valHash.Sum(nil))) - } - - // Properties - ValidateMetadata(a, vProps.Metadata, oProps.Metadata) - - // HTTP headers - ValidatePropertyPtr(a, "Cache control", vProps.HTTPHeaders.cacheControl, oProps.HTTPHeaders.cacheControl) - ValidatePropertyPtr(a, "Content disposition", vProps.HTTPHeaders.contentDisposition, oProps.HTTPHeaders.contentDisposition) - ValidatePropertyPtr(a, "Content encoding", vProps.HTTPHeaders.contentEncoding, oProps.HTTPHeaders.contentEncoding) - ValidatePropertyPtr(a, "Content language", vProps.HTTPHeaders.contentLanguage, oProps.HTTPHeaders.contentLanguage) - ValidatePropertyPtr(a, "Content type", vProps.HTTPHeaders.contentType, oProps.HTTPHeaders.contentType) - - switch manager.Location() { - case common.ELocation.Blob(): - ValidatePropertyPtr(a, "Blob type", vProps.BlobProperties.Type, oProps.BlobProperties.Type) - ValidateTags(a, vProps.BlobProperties.Tags, oProps.BlobProperties.Tags) - ValidatePropertyPtr(a, "Block blob access tier", vProps.BlobProperties.BlockBlobAccessTier, oProps.BlobProperties.BlockBlobAccessTier) - ValidatePropertyPtr(a, "Page blob access tier", vProps.BlobProperties.PageBlobAccessTier, oProps.BlobProperties.PageBlobAccessTier) - case common.ELocation.File(): - ValidatePropertyPtr(a, "Attributes", vProps.FileProperties.FileAttributes, oProps.FileProperties.FileAttributes) - ValidatePropertyPtr(a, "Creation time", vProps.FileProperties.FileCreationTime, oProps.FileProperties.FileCreationTime) - ValidatePropertyPtr(a, "Last write time", vProps.FileProperties.FileLastWriteTime, oProps.FileProperties.FileLastWriteTime) - ValidatePropertyPtr(a, "Permissions", vProps.FileProperties.FilePermissions, oProps.FileProperties.FilePermissions) - case common.ELocation.BlobFS(): - ValidatePropertyPtr(a, "Permissions", 
vProps.BlobFSProperties.Permissions, oProps.BlobFSProperties.Permissions) - ValidatePropertyPtr(a, "Owner", vProps.BlobFSProperties.Owner, oProps.BlobFSProperties.Owner) - ValidatePropertyPtr(a, "Group", vProps.BlobFSProperties.Group, oProps.BlobFSProperties.Group) - ValidatePropertyPtr(a, "ACL", vProps.BlobFSProperties.ACL, oProps.BlobFSProperties.ACL) - } - }, - }) -} - -type AzCopyOutputKey struct { - Path string - VersionId string - SnapshotId string -} - -func ValidateListOutput(a Asserter, stdout AzCopyStdout, expectedObjects map[AzCopyOutputKey]cmd.AzCopyListObject, expectedSummary *cmd.AzCopyListSummary) { - if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { - return - } - - listStdout, ok := stdout.(*AzCopyParsedListStdout) - a.AssertNow("stdout must be AzCopyParsedListStdout", Equal{}, ok, true) - - a.AssertNow("stdout and expected objects must not be null", Not{IsNil{}}, a, stdout, expectedObjects) - a.Assert("map of objects must be equivalent in size", Equal{}, len(expectedObjects), len(listStdout.Items)) - a.Assert("map of objects must match", MapContains[AzCopyOutputKey, cmd.AzCopyListObject]{TargetMap: expectedObjects}, listStdout.Items) - a.Assert("summary must match", Equal{}, listStdout.Summary, DerefOrZero(expectedSummary)) -} - -func ValidateErrorOutput(a Asserter, stdout AzCopyStdout, errorMsg string) { - if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { - return - } - for _, line := range stdout.RawStdout() { - if strings.Contains(line, errorMsg) { - return - } - } - fmt.Println(stdout.String()) - a.Error("expected error message not found in azcopy output") -} - -func ValidateStatsReturned(a Asserter, stdout AzCopyStdout) { - if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { - return - } - csrStdout, ok := stdout.(*AzCopyParsedCopySyncRemoveStdout) - a.AssertNow("stdout must be AzCopyParsedCopySyncRemoveStdout", Equal{}, ok, true) - // Check for any of the stats. 
It's possible for average iops, server busy percentage, network error percentage to be 0, but average e2e milliseconds should never be 0. - statsFound := csrStdout.FinalStatus.AverageIOPS != 0 || csrStdout.FinalStatus.AverageE2EMilliseconds != 0 || csrStdout.FinalStatus.ServerBusyPercentage != 0 || csrStdout.FinalStatus.NetworkErrorPercentage != 0 - a.Assert("stats must be returned", Equal{}, statsFound, true) -} - -func ValidateContainsError(a Asserter, stdout AzCopyStdout, errorMsg []string) { - if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { - return - } - for _, line := range stdout.RawStdout() { - if checkMultipleErrors(errorMsg, line) { - return - } - } - fmt.Println(stdout.String()) - a.Error("expected error message not found in azcopy output") -} - -func checkMultipleErrors(errorMsg []string, line string) bool { - for _, e := range errorMsg { - if strings.Contains(line, e) { - return true - } - } - - return false -} diff --git a/e2etest/newe2e_task_validation.go b/e2etest/newe2e_task_validation.go new file mode 100644 index 000000000..9b0ca5023 --- /dev/null +++ b/e2etest/newe2e_task_validation.go @@ -0,0 +1,193 @@ +package e2etest + +import ( + "crypto/md5" + "encoding/hex" + "fmt" + "github.com/Azure/azure-storage-azcopy/v10/cmd" + "github.com/Azure/azure-storage-azcopy/v10/common" + "io" + "strings" +) + +func ValidatePropertyPtr[T any](a Asserter, name string, expected, real *T) { + if expected == nil { + return + } + + a.Assert(name+" must match", Equal{Deep: true}, expected, real) +} + +func ValidateMetadata(a Asserter, expected, real common.Metadata) { + if expected == nil { + return + } + + a.Assert("Metadata must match", Equal{Deep: true}, expected, real) +} + +func ValidateTags(a Asserter, expected, real map[string]string) { + if expected == nil { + return + } + + a.Assert("Tags must match", Equal{Deep: true}, expected, real) +} + +func ValidateResource[T ResourceManager](a Asserter, target T, definition 
MatchedResourceDefinition[T], validateObjectContent bool) { + a.AssertNow("Target resource and definition must not be null", Not{IsNil{}}, a, target, definition) + a.AssertNow("Target resource must be at a equal level to the resource definition", Equal{}, target.Level(), definition.DefinitionTarget()) + + if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { + return + } + + definition.ApplyDefinition(a, target, map[cmd.LocationLevel]func(Asserter, ResourceManager, ResourceDefinition){ + cmd.ELocationLevel.Container(): func(a Asserter, manager ResourceManager, definition ResourceDefinition) { + cRes := manager.(ContainerResourceManager) + + if !definition.ShouldExist() { + a.AssertNow("container must not exist", Equal{}, cRes.Exists(), false) + return + } + + cProps := cRes.GetProperties(a) + vProps := definition.(ResourceDefinitionContainer).Properties + + ValidateMetadata(a, vProps.Metadata, cProps.Metadata) + + if manager.Location() == common.ELocation.Blob() || manager.Location() == common.ELocation.BlobFS() { + ValidatePropertyPtr(a, "Public access", vProps.BlobContainerProperties.Access, cProps.BlobContainerProperties.Access) + } + + if manager.Location() == common.ELocation.File() { + ValidatePropertyPtr(a, "Enabled protocols", vProps.FileContainerProperties.EnabledProtocols, cProps.FileContainerProperties.EnabledProtocols) + ValidatePropertyPtr(a, "RootSquash", vProps.FileContainerProperties.RootSquash, cProps.FileContainerProperties.RootSquash) + ValidatePropertyPtr(a, "AccessTier", vProps.FileContainerProperties.AccessTier, cProps.FileContainerProperties.AccessTier) + ValidatePropertyPtr(a, "Quota", vProps.FileContainerProperties.Quota, cProps.FileContainerProperties.Quota) + } + }, + cmd.ELocationLevel.Object(): func(a Asserter, manager ResourceManager, definition ResourceDefinition) { + objMan := manager.(ObjectResourceManager) + objDef := definition.(ResourceDefinitionObject) + + if !objDef.ShouldExist() { + a.Assert(fmt.Sprintf("object %s 
must not exist", objMan.ObjectName()), Equal{}, objMan.Exists(), false) + return + } + + oProps := objMan.GetProperties(a) + vProps := objDef.ObjectProperties + + if validateObjectContent && objMan.EntityType() == common.EEntityType.File() && objDef.Body != nil { + objBody := objMan.Download(a) + validationBody := objDef.Body.Reader() + + objHash := md5.New() + valHash := md5.New() + + _, err := io.Copy(objHash, objBody) + a.NoError("hash object body", err) + _, err = io.Copy(valHash, validationBody) + a.NoError("hash validation body", err) + + a.Assert("bodies differ in hash", Equal{Deep: true}, hex.EncodeToString(objHash.Sum(nil)), hex.EncodeToString(valHash.Sum(nil))) + } + + // Properties + ValidateMetadata(a, vProps.Metadata, oProps.Metadata) + + // HTTP headers + ValidatePropertyPtr(a, "Cache control", vProps.HTTPHeaders.cacheControl, oProps.HTTPHeaders.cacheControl) + ValidatePropertyPtr(a, "Content disposition", vProps.HTTPHeaders.contentDisposition, oProps.HTTPHeaders.contentDisposition) + ValidatePropertyPtr(a, "Content encoding", vProps.HTTPHeaders.contentEncoding, oProps.HTTPHeaders.contentEncoding) + ValidatePropertyPtr(a, "Content language", vProps.HTTPHeaders.contentLanguage, oProps.HTTPHeaders.contentLanguage) + ValidatePropertyPtr(a, "Content type", vProps.HTTPHeaders.contentType, oProps.HTTPHeaders.contentType) + + switch manager.Location() { + case common.ELocation.Blob(): + ValidatePropertyPtr(a, "Blob type", vProps.BlobProperties.Type, oProps.BlobProperties.Type) + ValidateTags(a, vProps.BlobProperties.Tags, oProps.BlobProperties.Tags) + ValidatePropertyPtr(a, "Block blob access tier", vProps.BlobProperties.BlockBlobAccessTier, oProps.BlobProperties.BlockBlobAccessTier) + ValidatePropertyPtr(a, "Page blob access tier", vProps.BlobProperties.PageBlobAccessTier, oProps.BlobProperties.PageBlobAccessTier) + case common.ELocation.File(): + ValidatePropertyPtr(a, "Attributes", vProps.FileProperties.FileAttributes, 
oProps.FileProperties.FileAttributes) + ValidatePropertyPtr(a, "Creation time", vProps.FileProperties.FileCreationTime, oProps.FileProperties.FileCreationTime) + ValidatePropertyPtr(a, "Last write time", vProps.FileProperties.FileLastWriteTime, oProps.FileProperties.FileLastWriteTime) + ValidatePropertyPtr(a, "Permissions", vProps.FileProperties.FilePermissions, oProps.FileProperties.FilePermissions) + case common.ELocation.BlobFS(): + ValidatePropertyPtr(a, "Permissions", vProps.BlobFSProperties.Permissions, oProps.BlobFSProperties.Permissions) + ValidatePropertyPtr(a, "Owner", vProps.BlobFSProperties.Owner, oProps.BlobFSProperties.Owner) + ValidatePropertyPtr(a, "Group", vProps.BlobFSProperties.Group, oProps.BlobFSProperties.Group) + ValidatePropertyPtr(a, "ACL", vProps.BlobFSProperties.ACL, oProps.BlobFSProperties.ACL) + } + }, + }) +} + +type AzCopyOutputKey struct { + Path string + VersionId string + SnapshotId string +} + +func ValidateListOutput(a Asserter, stdout AzCopyStdout, expectedObjects map[AzCopyOutputKey]cmd.AzCopyListObject, expectedSummary *cmd.AzCopyListSummary) { + if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { + return + } + + listStdout, ok := stdout.(*AzCopyParsedListStdout) + a.AssertNow("stdout must be AzCopyParsedListStdout", Equal{}, ok, true) + + a.AssertNow("stdout and expected objects must not be null", Not{IsNil{}}, a, stdout, expectedObjects) + a.Assert("map of objects must be equivalent in size", Equal{}, len(expectedObjects), len(listStdout.Items)) + a.Assert("map of objects must match", MapContains[AzCopyOutputKey, cmd.AzCopyListObject]{TargetMap: expectedObjects}, listStdout.Items) + a.Assert("summary must match", Equal{}, listStdout.Summary, DerefOrZero(expectedSummary)) +} + +func ValidateErrorOutput(a Asserter, stdout AzCopyStdout, errorMsg string) { + if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { + return + } + for _, line := range stdout.RawStdout() { + if strings.Contains(line, 
errorMsg) { + return + } + } + fmt.Println(stdout.String()) + a.Error("expected error message not found in azcopy output") +} + +func ValidateStatsReturned(a Asserter, stdout AzCopyStdout) { + if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { + return + } + csrStdout, ok := stdout.(*AzCopyParsedCopySyncRemoveStdout) + a.AssertNow("stdout must be AzCopyParsedCopySyncRemoveStdout", Equal{}, ok, true) + // Check for any of the stats. It's possible for average iops, server busy percentage, network error percentage to be 0, but average e2e milliseconds should never be 0. + statsFound := csrStdout.FinalStatus.AverageIOPS != 0 || csrStdout.FinalStatus.AverageE2EMilliseconds != 0 || csrStdout.FinalStatus.ServerBusyPercentage != 0 || csrStdout.FinalStatus.NetworkErrorPercentage != 0 + a.Assert("stats must be returned", Equal{}, statsFound, true) +} + +func ValidateContainsError(a Asserter, stdout AzCopyStdout, errorMsg []string) { + if dryrunner, ok := a.(DryrunAsserter); ok && dryrunner.Dryrun() { + return + } + for _, line := range stdout.RawStdout() { + if checkMultipleErrors(errorMsg, line) { + return + } + } + fmt.Println(stdout.String()) + a.Error("expected error message not found in azcopy output") +} + +func checkMultipleErrors(errorMsg []string, line string) bool { + for _, e := range errorMsg { + if strings.Contains(line, e) { + return true + } + } + + return false +}