From 183ea22b0bfbe057763f2a7ba092eccdde5d6007 Mon Sep 17 00:00:00 2001 From: adreed-msft <49764384+adreed-msft@users.noreply.github.com> Date: Mon, 21 Oct 2024 13:33:23 -0700 Subject: [PATCH] Correct behavior when syncing with delete-destination-files (#2818) * Correct behavior when syncing with delete-destination-files * Fix testing * fixed new sync test reference of NewRandomObjectContentContainer --------- Co-authored-by: Gauri Lamunion <51212198+gapra-msft@users.noreply.github.com> Co-authored-by: Gauri Lamunion --- cmd/syncComparator.go | 27 ++---- cmd/syncEnumerator.go | 4 +- cmd/zt_sync_file_file_test.go | 8 +- e2etest/newe2e_task_azcopy_job_validate.go | 6 +- e2etest/newe2e_task_runazcopy_parameters.go | 2 + e2etest/zt_basic_copy_sync_remove_test.go | 17 +--- e2etest/zt_newe2e_sync_test.go | 91 +++++++++++++++++++++ 7 files changed, 114 insertions(+), 41 deletions(-) diff --git a/cmd/syncComparator.go b/cmd/syncComparator.go index 64e76b6b7..b99bfe733 100644 --- a/cmd/syncComparator.go +++ b/cmd/syncComparator.go @@ -65,13 +65,12 @@ type syncDestinationComparator struct { comparisonHashType common.SyncHashType - preferSMBTime bool - disableComparison bool - deleteDestinationFileSync bool + preferSMBTime bool + disableComparison bool } -func newSyncDestinationComparator(i *objectIndexer, copyScheduler, cleaner objectProcessor, comparisonHashType common.SyncHashType, preferSMBTime, disableComparison bool, deleteDestinationFile bool) *syncDestinationComparator { - return &syncDestinationComparator{sourceIndex: i, copyTransferScheduler: copyScheduler, destinationCleaner: cleaner, preferSMBTime: preferSMBTime, disableComparison: disableComparison, comparisonHashType: comparisonHashType, deleteDestinationFileSync: deleteDestinationFile} +func newSyncDestinationComparator(i *objectIndexer, copyScheduler, cleaner objectProcessor, comparisonHashType common.SyncHashType, preferSMBTime, disableComparison bool) *syncDestinationComparator { + return &syncDestinationComparator{sourceIndex: i, copyTransferScheduler: copyScheduler, destinationCleaner: cleaner, preferSMBTime: preferSMBTime, disableComparison: disableComparison, comparisonHashType: comparisonHashType} } // it will only schedule transfers for destination objects that are present in the indexer but stale compared to the entry in the map @@ -90,11 +89,6 @@ func (f *syncDestinationComparator) processIfNecessary(destinationObject StoredO if present { defer delete(f.sourceIndex.indexMap, destinationObject.relativePath) - if f.deleteDestinationFileSync { // when delete-destination-file flag is turned on via sync command, we want to overwrite the file at destination - syncComparatorLog(sourceObjectInMap.relativePath, syncStatusOverwritten, syncOverwriteReasonDeleteDestinationFile, false) - return f.copyTransferScheduler(sourceObjectInMap) - } - if f.disableComparison { syncComparatorLog(sourceObjectInMap.relativePath, syncStatusOverwritten, syncOverwriteReasonNewerHash, false) return f.copyTransferScheduler(sourceObjectInMap) @@ -148,13 +142,12 @@ type syncSourceComparator struct { comparisonHashType common.SyncHashType - preferSMBTime bool - disableComparison bool - deleteDestinationFileSync bool + preferSMBTime bool + disableComparison bool } -func newSyncSourceComparator(i *objectIndexer, copyScheduler objectProcessor, comparisonHashType common.SyncHashType, preferSMBTime, disableComparison bool, deleteDestNew bool) *syncSourceComparator { - return &syncSourceComparator{destinationIndex: i, copyTransferScheduler: copyScheduler, preferSMBTime: preferSMBTime, disableComparison: disableComparison, comparisonHashType: comparisonHashType, deleteDestinationFileSync: deleteDestNew} +func newSyncSourceComparator(i *objectIndexer, copyScheduler objectProcessor, comparisonHashType common.SyncHashType, preferSMBTime, disableComparison bool) *syncSourceComparator { + return &syncSourceComparator{destinationIndex: i, copyTransferScheduler: copyScheduler, preferSMBTime: preferSMBTime, disableComparison: disableComparison, comparisonHashType: comparisonHashType} } // it will only transfer source items that are: @@ -174,10 +167,6 @@ func (f *syncSourceComparator) processIfNecessary(sourceObject StoredObject) err if present { defer delete(f.destinationIndex.indexMap, relPath) - if f.deleteDestinationFileSync { // when delete-destination-file flag is turned on via sync command, we want to overwrite the file at destination - syncComparatorLog(sourceObject.relativePath, syncStatusOverwritten, syncOverwriteReasonDeleteDestinationFile, false) - return f.copyTransferScheduler(sourceObject) - } // if destination is stale, schedule source for transfer if f.disableComparison { syncComparatorLog(sourceObject.relativePath, syncStatusOverwritten, syncOverwriteReasonNewerHash, false) diff --git a/cmd/syncEnumerator.go b/cmd/syncEnumerator.go index c651598ed..71ae2e8bf 100644 --- a/cmd/syncEnumerator.go +++ b/cmd/syncEnumerator.go @@ -246,7 +246,7 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s // we ALREADY have available a complete map of everything that exists locally // so as soon as we see a remote destination object we can know whether it exists in the local source - comparator = newSyncDestinationComparator(indexer, transferScheduler.scheduleCopyTransfer, destCleanerFunc, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode, cca.deleteDestinationFileIfNecessary).processIfNecessary + comparator = newSyncDestinationComparator(indexer, transferScheduler.scheduleCopyTransfer, destCleanerFunc, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode).processIfNecessary finalize = func() error { // schedule every local file that doesn't exist at the destination err = indexer.traverse(transferScheduler.scheduleCopyTransfer, filters) @@ -270,7 +270,7 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s indexer.isDestinationCaseInsensitive = IsDestinationCaseInsensitive(cca.fromTo) // in all other cases (download and S2S), the destination is scanned/indexed first // then the source is scanned and filtered based on what the destination contains - comparator = newSyncSourceComparator(indexer, transferScheduler.scheduleCopyTransfer, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode, cca.deleteDestinationFileIfNecessary).processIfNecessary + comparator = newSyncSourceComparator(indexer, transferScheduler.scheduleCopyTransfer, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode).processIfNecessary finalize = func() error { // remove the extra files at the destination that were not present at the source diff --git a/cmd/zt_sync_file_file_test.go b/cmd/zt_sync_file_file_test.go index 09b8d2e58..2b35c3b71 100644 --- a/cmd/zt_sync_file_file_test.go +++ b/cmd/zt_sync_file_file_test.go @@ -35,7 +35,7 @@ func TestSyncSourceComparator(t *testing.T) { // set up the indexer as well as the source comparator indexer := newObjectIndexer() - sourceComparator := newSyncSourceComparator(indexer, dummyCopyScheduler.process, common.ESyncHashType.None(), false, false, false) + sourceComparator := newSyncSourceComparator(indexer, dummyCopyScheduler.process, common.ESyncHashType.None(), false, false) // create a sample destination object sampleDestinationObject := StoredObject{name: "test", relativePath: "/usr/test", lastModifiedTime: time.Now(), md5: destMD5} @@ -89,7 +89,7 @@ func TestSyncSrcCompDisableComparator(t *testing.T) { // set up the indexer as well as the source comparator indexer := newObjectIndexer() - sourceComparator := newSyncSourceComparator(indexer, dummyCopyScheduler.process, common.ESyncHashType.None(), false, true, false) + sourceComparator := newSyncSourceComparator(indexer, dummyCopyScheduler.process, common.ESyncHashType.None(), false, true) // test the comparator in case a given source object is not present at the destination // meaning no entry in the index, so the comparator should pass the given object to schedule a transfer @@ -139,7 +139,7 @@ func TestSyncDestinationComparator(t *testing.T) { // set up the indexer as well as the destination comparator indexer := newObjectIndexer() - destinationComparator := newSyncDestinationComparator(indexer, dummyCopyScheduler.process, dummyCleaner.process, common.ESyncHashType.None(), false, false, false) + destinationComparator := newSyncDestinationComparator(indexer, dummyCopyScheduler.process, dummyCleaner.process, common.ESyncHashType.None(), false, false) // create a sample source object sampleSourceObject := StoredObject{name: "test", relativePath: "/usr/test", lastModifiedTime: time.Now(), md5: srcMD5} @@ -197,7 +197,7 @@ func TestSyncDestCompDisableComparison(t *testing.T) { // set up the indexer as well as the destination comparator indexer := newObjectIndexer() - destinationComparator := newSyncDestinationComparator(indexer, dummyCopyScheduler.process, dummyCleaner.process, common.ESyncHashType.None(), false, true, false) + destinationComparator := newSyncDestinationComparator(indexer, dummyCopyScheduler.process, dummyCleaner.process, common.ESyncHashType.None(), false, true) // create a sample source object currTime := time.Now() diff --git a/e2etest/newe2e_task_azcopy_job_validate.go b/e2etest/newe2e_task_azcopy_job_validate.go index 9c40decf6..55ef05fc8 100644 --- a/e2etest/newe2e_task_azcopy_job_validate.go +++ b/e2etest/newe2e_task_azcopy_job_validate.go @@ -386,7 +386,9 @@ func ValidatePlanFiles(sm *ScenarioVariationManager, stdOut AzCopyStdout, expect mmf.Unmap() } - for path, _ := range expected.Objects { - sm.Assert("object src: "+path.SrcPath+", dst: "+path.DstPath+"; was missing from the plan file.", Always{}) + for path, obj := range expected.Objects { + if DerefOrDefault(obj.ShouldBePresent, true) { + sm.Assert("object src: "+path.SrcPath+", dst: "+path.DstPath+"; was missing from the plan file.", Always{}) + } } } diff --git a/e2etest/newe2e_task_runazcopy_parameters.go b/e2etest/newe2e_task_runazcopy_parameters.go index 58d115054..f8ea9d91f 100644 --- a/e2etest/newe2e_task_runazcopy_parameters.go +++ b/e2etest/newe2e_task_runazcopy_parameters.go @@ -379,6 +379,8 @@ type SyncFlags struct { CompareHash *common.SyncHashType `flag:"compare-hash"` LocalHashDir *string `flag:"hash-meta-dir"` LocalHashStorageMode *common.HashStorageMode `flag:"local-hash-storage-mode"` + // The real flag name is not all that great due to `delete-destination`, but, it works. + DeleteIfNecessary *bool `flag:"delete-destination-file"` } // RemoveFlags is not tiered like CopySyncCommonFlags is, because it is dissimilar in functionality, and would be hard to test in the same scenario. diff --git a/e2etest/zt_basic_copy_sync_remove_test.go b/e2etest/zt_basic_copy_sync_remove_test.go index bd9af5a23..4655ec97d 100644 --- a/e2etest/zt_basic_copy_sync_remove_test.go +++ b/e2etest/zt_basic_copy_sync_remove_test.go @@ -22,6 +22,7 @@ package e2etest import ( "crypto/md5" + "encoding/base64" "errors" "fmt" "os" @@ -1128,20 +1129,8 @@ func TestCopySync_DeleteDestinationFileFlag(t *testing.T) { &hooks{ beforeRunJob: func(h hookHelper) { blobClient := h.GetDestination().(*resourceBlobContainer).containerClient.NewBlockBlobClient("filea") - // initial stage block - id := []string{BlockIDIntToBase64(1)} - _, err := blobClient.StageBlock(ctx, id[0], streaming.NopCloser(strings.NewReader(blockBlobDefaultData)), nil) - if err != nil { - t.Errorf("error staging block %s", err) - } - - _, err = blobClient.CommitBlockList(ctx, id, nil) - if err != nil { - t.Errorf("error committing block %s", err) - } - - // second stage block - _, err = blobClient.StageBlock(ctx, id[0], streaming.NopCloser(strings.NewReader(blockBlobDefaultData)), nil) + // initial stage block, with block id incompatible with us + _, err := blobClient.StageBlock(ctx, base64.StdEncoding.EncodeToString([]byte("foobar")), streaming.NopCloser(strings.NewReader(blockBlobDefaultData)), nil) if err != nil { t.Errorf("error staging block %s", err) } diff --git a/e2etest/zt_newe2e_sync_test.go b/e2etest/zt_newe2e_sync_test.go index 8d220f191..1b0bb0477 100644 --- a/e2etest/zt_newe2e_sync_test.go +++ b/e2etest/zt_newe2e_sync_test.go @@ -1,7 +1,9 @@ package e2etest import ( + "bytes" "encoding/base64" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-storage-azcopy/v10/common" "io/fs" "os" @@ -170,3 +172,92 @@ func (s *SyncTestSuite) Scenario_TestSyncRemoveDestination(svm *ScenarioVariatio }, }, false) } + +// Scenario_TestSyncDeleteDestinationIfNecessary tests that sync is +// - capable of deleting blobs of the wrong type +func (s *SyncTestSuite) Scenario_TestSyncDeleteDestinationIfNecessary(svm *ScenarioVariationManager) { + dstLoc := ResolveVariation(svm, []common.Location{common.ELocation.Blob(), common.ELocation.BlobFS()}) + dstRes := CreateResource[ContainerResourceManager](svm, + GetRootResource(svm, dstLoc, GetResourceOptions{ + PreferredAccount: common.Iff(dstLoc == common.ELocation.Blob(), + pointerTo(PrimaryStandardAcct), // + pointerTo(PrimaryHNSAcct), + ), + }), + ResourceDefinitionContainer{}) + + overwriteName := "copyme.txt" + ignoreName := "ignore.txt" + + if !svm.Dryrun() { // We're working directly with raw clients, so, we need to be careful. + buf := streaming.NopCloser(bytes.NewReader([]byte("foo"))) + + switch dstRes.Location() { + case common.ELocation.Blob(): // In this case, we want to submit a block ID with a different length. + ctClient := dstRes.(*BlobContainerResourceManager).internalClient + blobClient := ctClient.NewBlockBlobClient(overwriteName) + + _, err := blobClient.StageBlock(ctx, base64.StdEncoding.EncodeToString([]byte("foobar")), buf, nil) + svm.Assert("stage block error", IsNil{}, err) + case common.ELocation.BlobFS(): // In this case, we want to upload a blob via DFS. + ctClient := dstRes.(*BlobFSFileSystemResourceManager).internalClient + pathClient := ctClient.NewFileClient(overwriteName) + + _, err := pathClient.Create(ctx, nil) + svm.Assert("Create error", IsNil{}, err) + err = pathClient.UploadStream(ctx, buf, nil) + svm.Assert("Upload stream error", IsNil{}, err) + } + + // Sleep so it's in the past. + time.Sleep(time.Second * 10) + } + + srcData := NewRandomObjectContentContainer(1024) + srcRes := CreateResource[ContainerResourceManager](svm, GetRootResource(svm, common.ELocation.Blob()), ResourceDefinitionContainer{ + Objects: ObjectResourceMappingFlat{ + overwriteName: ResourceDefinitionObject{Body: srcData}, + ignoreName: ResourceDefinitionObject{Body: srcData}, + }, + }) + + dstData := NewRandomObjectContentContainer(1024) + if !svm.Dryrun() { + time.Sleep(time.Second * 10) // Make sure this file is newer + + CreateResource[ObjectResourceManager](svm, dstRes, ResourceDefinitionObject{ + ObjectName: &ignoreName, + Body: dstData, + }) + } + + stdout, _ := RunAzCopy(svm, AzCopyCommand{ + Verb: AzCopyVerbSync, + Targets: []ResourceManager{srcRes, dstRes}, + Flags: SyncFlags{ + DeleteIfNecessary: pointerTo(true), + }, + }) + + ValidatePlanFiles(svm, stdout, ExpectedPlanFile{ + Objects: map[PlanFilePath]PlanFileObject{ + PlanFilePath{"/" + overwriteName, "/" + overwriteName}: { + ShouldBePresent: pointerTo(true), + }, + PlanFilePath{"/" + ignoreName, "/" + ignoreName}: { + ShouldBePresent: pointerTo(false), + }, + }, + }) + + ValidateResource(svm, dstRes, ResourceDefinitionContainer{ + Objects: ObjectResourceMappingFlat{ + overwriteName: ResourceDefinitionObject{ + Body: srcData, // Validate we overwrote this one + }, + ignoreName: ResourceDefinitionObject{ + Body: dstData, // Validate we did not overwrite this one + }, + }, + }, true) +}