Skip to content

Commit

Permalink
Correct behavior when syncing with delete-destination-files (#2818)
Browse files Browse the repository at this point in the history
* Correct behavior when syncing with delete-destination-files

* Fix testing

* fixed new sync test reference of NewRandomObjectContentContainer

---------

Co-authored-by: Gauri Lamunion <51212198+gapra-msft@users.noreply.github.com>
Co-authored-by: Gauri Lamunion <gapra@microsoft.com>
  • Loading branch information
3 people authored Oct 21, 2024
1 parent 6412580 commit 183ea22
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 41 deletions.
27 changes: 8 additions & 19 deletions cmd/syncComparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,12 @@ type syncDestinationComparator struct {

comparisonHashType common.SyncHashType

preferSMBTime bool
disableComparison bool
deleteDestinationFileSync bool
preferSMBTime bool
disableComparison bool
}

func newSyncDestinationComparator(i *objectIndexer, copyScheduler, cleaner objectProcessor, comparisonHashType common.SyncHashType, preferSMBTime, disableComparison bool, deleteDestinationFile bool) *syncDestinationComparator {
return &syncDestinationComparator{sourceIndex: i, copyTransferScheduler: copyScheduler, destinationCleaner: cleaner, preferSMBTime: preferSMBTime, disableComparison: disableComparison, comparisonHashType: comparisonHashType, deleteDestinationFileSync: deleteDestinationFile}
func newSyncDestinationComparator(i *objectIndexer, copyScheduler, cleaner objectProcessor, comparisonHashType common.SyncHashType, preferSMBTime, disableComparison bool) *syncDestinationComparator {
return &syncDestinationComparator{sourceIndex: i, copyTransferScheduler: copyScheduler, destinationCleaner: cleaner, preferSMBTime: preferSMBTime, disableComparison: disableComparison, comparisonHashType: comparisonHashType}
}

// it will only schedule transfers for destination objects that are present in the indexer but stale compared to the entry in the map
Expand All @@ -90,11 +89,6 @@ func (f *syncDestinationComparator) processIfNecessary(destinationObject StoredO
if present {
defer delete(f.sourceIndex.indexMap, destinationObject.relativePath)

if f.deleteDestinationFileSync { // when delete-destination-file flag is turned on via sync command, we want to overwrite the file at destination
syncComparatorLog(sourceObjectInMap.relativePath, syncStatusOverwritten, syncOverwriteReasonDeleteDestinationFile, false)
return f.copyTransferScheduler(sourceObjectInMap)
}

if f.disableComparison {
syncComparatorLog(sourceObjectInMap.relativePath, syncStatusOverwritten, syncOverwriteReasonNewerHash, false)
return f.copyTransferScheduler(sourceObjectInMap)
Expand Down Expand Up @@ -148,13 +142,12 @@ type syncSourceComparator struct {

comparisonHashType common.SyncHashType

preferSMBTime bool
disableComparison bool
deleteDestinationFileSync bool
preferSMBTime bool
disableComparison bool
}

func newSyncSourceComparator(i *objectIndexer, copyScheduler objectProcessor, comparisonHashType common.SyncHashType, preferSMBTime, disableComparison bool, deleteDestNew bool) *syncSourceComparator {
return &syncSourceComparator{destinationIndex: i, copyTransferScheduler: copyScheduler, preferSMBTime: preferSMBTime, disableComparison: disableComparison, comparisonHashType: comparisonHashType, deleteDestinationFileSync: deleteDestNew}
func newSyncSourceComparator(i *objectIndexer, copyScheduler objectProcessor, comparisonHashType common.SyncHashType, preferSMBTime, disableComparison bool) *syncSourceComparator {
return &syncSourceComparator{destinationIndex: i, copyTransferScheduler: copyScheduler, preferSMBTime: preferSMBTime, disableComparison: disableComparison, comparisonHashType: comparisonHashType}
}

// it will only transfer source items that are:
Expand All @@ -174,10 +167,6 @@ func (f *syncSourceComparator) processIfNecessary(sourceObject StoredObject) err
if present {
defer delete(f.destinationIndex.indexMap, relPath)

if f.deleteDestinationFileSync { // when delete-destination-file flag is turned on via sync command, we want to overwrite the file at destination
syncComparatorLog(sourceObject.relativePath, syncStatusOverwritten, syncOverwriteReasonDeleteDestinationFile, false)
return f.copyTransferScheduler(sourceObject)
}
// if destination is stale, schedule source for transfer
if f.disableComparison {
syncComparatorLog(sourceObject.relativePath, syncStatusOverwritten, syncOverwriteReasonNewerHash, false)
Expand Down
4 changes: 2 additions & 2 deletions cmd/syncEnumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
// we ALREADY have available a complete map of everything that exists locally
// so as soon as we see a remote destination object we can know whether it exists in the local source

comparator = newSyncDestinationComparator(indexer, transferScheduler.scheduleCopyTransfer, destCleanerFunc, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode, cca.deleteDestinationFileIfNecessary).processIfNecessary
comparator = newSyncDestinationComparator(indexer, transferScheduler.scheduleCopyTransfer, destCleanerFunc, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode).processIfNecessary
finalize = func() error {
// schedule every local file that doesn't exist at the destination
err = indexer.traverse(transferScheduler.scheduleCopyTransfer, filters)
Expand All @@ -270,7 +270,7 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
indexer.isDestinationCaseInsensitive = IsDestinationCaseInsensitive(cca.fromTo)
// in all other cases (download and S2S), the destination is scanned/indexed first
// then the source is scanned and filtered based on what the destination contains
comparator = newSyncSourceComparator(indexer, transferScheduler.scheduleCopyTransfer, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode, cca.deleteDestinationFileIfNecessary).processIfNecessary
comparator = newSyncSourceComparator(indexer, transferScheduler.scheduleCopyTransfer, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode).processIfNecessary

finalize = func() error {
// remove the extra files at the destination that were not present at the source
Expand Down
8 changes: 4 additions & 4 deletions cmd/zt_sync_file_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestSyncSourceComparator(t *testing.T) {

// set up the indexer as well as the source comparator
indexer := newObjectIndexer()
sourceComparator := newSyncSourceComparator(indexer, dummyCopyScheduler.process, common.ESyncHashType.None(), false, false, false)
sourceComparator := newSyncSourceComparator(indexer, dummyCopyScheduler.process, common.ESyncHashType.None(), false, false)

// create a sample destination object
sampleDestinationObject := StoredObject{name: "test", relativePath: "/usr/test", lastModifiedTime: time.Now(), md5: destMD5}
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestSyncSrcCompDisableComparator(t *testing.T) {

// set up the indexer as well as the source comparator
indexer := newObjectIndexer()
sourceComparator := newSyncSourceComparator(indexer, dummyCopyScheduler.process, common.ESyncHashType.None(), false, true, false)
sourceComparator := newSyncSourceComparator(indexer, dummyCopyScheduler.process, common.ESyncHashType.None(), false, true)

// test the comparator in case a given source object is not present at the destination
// meaning no entry in the index, so the comparator should pass the given object to schedule a transfer
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestSyncDestinationComparator(t *testing.T) {

// set up the indexer as well as the destination comparator
indexer := newObjectIndexer()
destinationComparator := newSyncDestinationComparator(indexer, dummyCopyScheduler.process, dummyCleaner.process, common.ESyncHashType.None(), false, false, false)
destinationComparator := newSyncDestinationComparator(indexer, dummyCopyScheduler.process, dummyCleaner.process, common.ESyncHashType.None(), false, false)

// create a sample source object
sampleSourceObject := StoredObject{name: "test", relativePath: "/usr/test", lastModifiedTime: time.Now(), md5: srcMD5}
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestSyncDestCompDisableComparison(t *testing.T) {

// set up the indexer as well as the destination comparator
indexer := newObjectIndexer()
destinationComparator := newSyncDestinationComparator(indexer, dummyCopyScheduler.process, dummyCleaner.process, common.ESyncHashType.None(), false, true, false)
destinationComparator := newSyncDestinationComparator(indexer, dummyCopyScheduler.process, dummyCleaner.process, common.ESyncHashType.None(), false, true)

// create a sample source object
currTime := time.Now()
Expand Down
6 changes: 4 additions & 2 deletions e2etest/newe2e_task_azcopy_job_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ func ValidatePlanFiles(sm *ScenarioVariationManager, stdOut AzCopyStdout, expect
mmf.Unmap()
}

for path, _ := range expected.Objects {
sm.Assert("object src: "+path.SrcPath+", dst: "+path.DstPath+"; was missing from the plan file.", Always{})
for path, obj := range expected.Objects {
if DerefOrDefault(obj.ShouldBePresent, true) {
sm.Assert("object src: "+path.SrcPath+", dst: "+path.DstPath+"; was missing from the plan file.", Always{})
}
}
}
2 changes: 2 additions & 0 deletions e2etest/newe2e_task_runazcopy_parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ type SyncFlags struct {
CompareHash *common.SyncHashType `flag:"compare-hash"`
LocalHashDir *string `flag:"hash-meta-dir"`
LocalHashStorageMode *common.HashStorageMode `flag:"local-hash-storage-mode"`
// The real flag name is not all that great due to `delete-destination`, but, it works.
DeleteIfNecessary *bool `flag:"delete-destination-file"`
}

// RemoveFlags is not tiered like CopySyncCommonFlags is, because it is dissimilar in functionality, and would be hard to test in the same scenario.
Expand Down
17 changes: 3 additions & 14 deletions e2etest/zt_basic_copy_sync_remove_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package e2etest

import (
"crypto/md5"
"encoding/base64"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -1128,20 +1129,8 @@ func TestCopySync_DeleteDestinationFileFlag(t *testing.T) {
&hooks{
beforeRunJob: func(h hookHelper) {
blobClient := h.GetDestination().(*resourceBlobContainer).containerClient.NewBlockBlobClient("filea")
// initial stage block
id := []string{BlockIDIntToBase64(1)}
_, err := blobClient.StageBlock(ctx, id[0], streaming.NopCloser(strings.NewReader(blockBlobDefaultData)), nil)
if err != nil {
t.Errorf("error staging block %s", err)
}

_, err = blobClient.CommitBlockList(ctx, id, nil)
if err != nil {
t.Errorf("error committing block %s", err)
}

// second stage block
_, err = blobClient.StageBlock(ctx, id[0], streaming.NopCloser(strings.NewReader(blockBlobDefaultData)), nil)
// initial stage block, with block id incompatible with us
_, err := blobClient.StageBlock(ctx, base64.StdEncoding.EncodeToString([]byte("foobar")), streaming.NopCloser(strings.NewReader(blockBlobDefaultData)), nil)
if err != nil {
t.Errorf("error staging block %s", err)
}
Expand Down
91 changes: 91 additions & 0 deletions e2etest/zt_newe2e_sync_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package e2etest

import (
"bytes"
"encoding/base64"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-storage-azcopy/v10/common"
"io/fs"
"os"
Expand Down Expand Up @@ -170,3 +172,92 @@ func (s *SyncTestSuite) Scenario_TestSyncRemoveDestination(svm *ScenarioVariatio
},
}, false)
}

// Scenario_TestSyncDeleteDestinationIfNecessary tests that sync is
// - capable of deleting blobs of the wrong type
func (s *SyncTestSuite) Scenario_TestSyncDeleteDestinationIfNecessary(svm *ScenarioVariationManager) {
dstLoc := ResolveVariation(svm, []common.Location{common.ELocation.Blob(), common.ELocation.BlobFS()})
dstRes := CreateResource[ContainerResourceManager](svm,
GetRootResource(svm, dstLoc, GetResourceOptions{
PreferredAccount: common.Iff(dstLoc == common.ELocation.Blob(),
pointerTo(PrimaryStandardAcct), //
pointerTo(PrimaryHNSAcct),
),
}),
ResourceDefinitionContainer{})

overwriteName := "copyme.txt"
ignoreName := "ignore.txt"

if !svm.Dryrun() { // We're working directly with raw clients, so, we need to be careful.
buf := streaming.NopCloser(bytes.NewReader([]byte("foo")))

switch dstRes.Location() {
case common.ELocation.Blob(): // In this case, we want to submit a block ID with a different length.
ctClient := dstRes.(*BlobContainerResourceManager).internalClient
blobClient := ctClient.NewBlockBlobClient(overwriteName)

_, err := blobClient.StageBlock(ctx, base64.StdEncoding.EncodeToString([]byte("foobar")), buf, nil)
svm.Assert("stage block error", IsNil{}, err)
case common.ELocation.BlobFS(): // In this case, we want to upload a blob via DFS.
ctClient := dstRes.(*BlobFSFileSystemResourceManager).internalClient
pathClient := ctClient.NewFileClient(overwriteName)

_, err := pathClient.Create(ctx, nil)
svm.Assert("Create error", IsNil{}, err)
err = pathClient.UploadStream(ctx, buf, nil)
svm.Assert("Upload stream error", IsNil{}, err)
}

// Sleep so it's in the past.
time.Sleep(time.Second * 10)
}

srcData := NewRandomObjectContentContainer(1024)
srcRes := CreateResource[ContainerResourceManager](svm, GetRootResource(svm, common.ELocation.Blob()), ResourceDefinitionContainer{
Objects: ObjectResourceMappingFlat{
overwriteName: ResourceDefinitionObject{Body: srcData},
ignoreName: ResourceDefinitionObject{Body: srcData},
},
})

dstData := NewRandomObjectContentContainer(1024)
if !svm.Dryrun() {
time.Sleep(time.Second * 10) // Make sure this file is newer

CreateResource[ObjectResourceManager](svm, dstRes, ResourceDefinitionObject{
ObjectName: &ignoreName,
Body: dstData,
})
}

stdout, _ := RunAzCopy(svm, AzCopyCommand{
Verb: AzCopyVerbSync,
Targets: []ResourceManager{srcRes, dstRes},
Flags: SyncFlags{
DeleteIfNecessary: pointerTo(true),
},
})

ValidatePlanFiles(svm, stdout, ExpectedPlanFile{
Objects: map[PlanFilePath]PlanFileObject{
PlanFilePath{"/" + overwriteName, "/" + overwriteName}: {
ShouldBePresent: pointerTo(true),
},
PlanFilePath{"/" + ignoreName, "/" + ignoreName}: {
ShouldBePresent: pointerTo(false),
},
},
})

ValidateResource(svm, dstRes, ResourceDefinitionContainer{
Objects: ObjectResourceMappingFlat{
overwriteName: ResourceDefinitionObject{
Body: srcData, // Validate we overwrote this one
},
ignoreName: ResourceDefinitionObject{
Body: dstData, // Validate we did not overwrite this one
},
},
}, true)
}

0 comments on commit 183ea22

Please sign in to comment.