
Commit

Merge pull request #1276 from Azure/dev
AzCopy v10.8.0 Release
mohsha-msft authored Dec 11, 2020
2 parents 14f86d0 + 31036ed commit a3d0fe3
Showing 23 changed files with 406 additions and 49 deletions.
12 changes: 12 additions & 0 deletions ChangeLog.md
@@ -1,6 +1,18 @@

# Change Log

## Version 10.8.0

### New features
1. Added an option to [disable parallel blob listing](https://github.com/Azure/azure-storage-azcopy/pull/1263).
1. Added support for uploading [large files](https://github.com/Azure/azure-storage-azcopy/pull/1254/files) up to 4 TiB. Please refer to the [public documentation](https://docs.microsoft.com/en-us/rest/api/storageservices/create-file) for more information.
1. Added support for the `include-before` flag. Refer to [this issue](https://github.com/Azure/azure-storage-azcopy/issues/1075) for more information.
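   For example (an illustrative invocation; the account, container, and SAS token are placeholders): `azcopy copy '/data' 'https://<account>.blob.core.windows.net/<container>?<SAS>' --recursive --include-before '2020-12-01T00:00:00Z'`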

### Bug fixes

1. Fixed issue [#1246](https://github.com/Azure/azure-storage-azcopy/issues/1246): a security vulnerability in the x/text package.
1. Fixed [share snapshot->share copy](https://github.com/Azure/azure-storage-azcopy/pull/1258) with SMB permissions.

## Version 10.7.0

### New features
15 changes: 14 additions & 1 deletion cmd/copy.go
@@ -74,6 +74,7 @@ type rawCopyCmdArgs struct {
excludePath string
includeFileAttributes string
excludeFileAttributes string
includeBefore string
includeAfter string
legacyInclude string // used only for warnings
legacyExclude string // used only for warnings
@@ -429,6 +430,16 @@ func (raw rawCopyCmdArgs) cookWithId(jobId common.JobID) (cookedCopyCmdArgs, err
cooked.listOfFilesChannel = listChan
}

if raw.includeBefore != "" {
// must set chooseEarliest = false, so that if there's an ambiguous local date, the latest will be returned
// (since that's safest for includeBefore: better to choose the later time and do more work than to choose the earlier one and fail to pick up a changed file)
parsedIncludeBefore, err := includeBeforeDateFilter{}.ParseISO8601(raw.includeBefore, false)
if err != nil {
return cooked, err
}
cooked.includeBefore = &parsedIncludeBefore
}

if raw.includeAfter != "" {
// must set chooseEarliest = true, so that if there's an ambiguous local date, the earliest will be returned
// (since that's safest for includeAfter: better to choose the earlier time and do more work than to choose the later one and fail to pick up a changed file)
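To make the chooseEarliest trade-off concrete: during a daylight-saving fall-back, a zoneless local timestamp denotes two instants an hour apart. A minimal sketch of the selection policy (a hypothetical helper for illustration; the repository resolves the ambiguity inside its ISO 8601 parsing):

```go
package main

import (
	"fmt"
	"time"
)

// resolveAmbiguous picks between the instants an ambiguous wall-clock time
// could denote: the earliest for include-after, the latest for include-before.
func resolveAmbiguous(candidates []time.Time, chooseEarliest bool) time.Time {
	chosen := candidates[0]
	for _, c := range candidates[1:] {
		if (chooseEarliest && c.Before(chosen)) || (!chooseEarliest && c.After(chosen)) {
			chosen = c
		}
	}
	return chosen
}

func main() {
	// 2020-11-01 01:30 in US Eastern time occurred twice: as EDT (UTC-4) and again as EST (UTC-5)
	first := time.Date(2020, 11, 1, 5, 30, 0, 0, time.UTC)  // 01:30 EDT
	second := time.Date(2020, 11, 1, 6, 30, 0, 0, time.UTC) // 01:30 EST
	fmt.Println(resolveAmbiguous([]time.Time{first, second}, true))  // earliest: safest for include-after
	fmt.Println(resolveAmbiguous([]time.Time{first, second}, false)) // latest: safest for include-before
}
```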
@@ -888,6 +899,7 @@ type cookedCopyCmdArgs struct {
excludePathPatterns []string
includeFileAttributes []string
excludeFileAttributes []string
includeBefore *time.Time
includeAfter *time.Time

// list of version ids
@@ -1622,7 +1634,8 @@ func init() {

// filters change which files get transferred
cpCmd.PersistentFlags().BoolVar(&raw.followSymlinks, "follow-symlinks", false, "Follow symbolic links when uploading from local file system.")
cpCmd.PersistentFlags().StringVar(&raw.includeAfter, common.IncludeAfterFlagName, "", "Include only those files modified on or after the given date/time. The value should be in ISO8601 format. If no timezone is specified, the value is assumed to be in the local timezone of the machine running AzCopy. E.g. '2020-08-19T15:04:00Z' for a UTC time, or '2020-08-19' for midnight (00:00) in the local timezone. As at AzCopy 10.5, this flag applies only to files, not folders, so folder properties won't be copied when using this flag with --preserve-smb-info or --preserve-smb-permissions.")
cpCmd.PersistentFlags().StringVar(&raw.includeBefore, common.IncludeBeforeFlagName, "", "Include only those files modified before or on the given date/time. The value should be in ISO8601 format. If no timezone is specified, the value is assumed to be in the local timezone of the machine running AzCopy. E.g. '2020-08-19T15:04:00Z' for a UTC time, or '2020-08-19' for midnight (00:00) in the local timezone. As of AzCopy 10.7, this flag applies only to files, not folders, so folder properties won't be copied when using this flag with --preserve-smb-info or --preserve-smb-permissions.")
cpCmd.PersistentFlags().StringVar(&raw.includeAfter, common.IncludeAfterFlagName, "", "Include only those files modified on or after the given date/time. The value should be in ISO8601 format. If no timezone is specified, the value is assumed to be in the local timezone of the machine running AzCopy. E.g. '2020-08-19T15:04:00Z' for a UTC time, or '2020-08-19' for midnight (00:00) in the local timezone. As of AzCopy 10.5, this flag applies only to files, not folders, so folder properties won't be copied when using this flag with --preserve-smb-info or --preserve-smb-permissions.")
cpCmd.PersistentFlags().StringVar(&raw.include, "include-pattern", "", "Include only these files when copying. "+
"This option supports wildcard characters (*). Separate files by using a ';'.")
cpCmd.PersistentFlags().StringVar(&raw.includePath, "include-path", "", "Include only these paths when copying. "+
4 changes: 4 additions & 0 deletions cmd/copyEnumeratorInit.go
@@ -281,6 +281,10 @@ func (cca *cookedCopyCmdArgs) isDestDirectory(dst common.ResourceString, ctx *co
func (cca *cookedCopyCmdArgs) initModularFilters() []objectFilter {
filters := make([]objectFilter, 0) // same as []objectFilter{} under the hood

if cca.includeBefore != nil {
filters = append(filters, &includeBeforeDateFilter{threshold: *cca.includeBefore})
}

if cca.includeAfter != nil {
filters = append(filters, &includeAfterDateFilter{threshold: *cca.includeAfter})
}
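These modular filters compose as a simple chain: an object is transferred only if every filter's doesPass returns true. A minimal, self-contained sketch of that evaluation, with simplified stand-in types rather than the repository's exact definitions:

```go
package main

import (
	"fmt"
	"time"
)

// simplified stand-ins for the repository's storedObject and objectFilter
type storedObject struct {
	name             string
	lastModifiedTime time.Time
}

type objectFilter interface {
	doesPass(o storedObject) bool
}

// <= threshold, matching includeBeforeDateFilter's "before or on" semantics
type includeBeforeDateFilter struct{ threshold time.Time }

func (f includeBeforeDateFilter) doesPass(o storedObject) bool {
	return !o.lastModifiedTime.After(f.threshold)
}

func passesAll(filters []objectFilter, o storedObject) bool {
	for _, f := range filters {
		if !f.doesPass(o) {
			return false // the first failing filter excludes the object
		}
	}
	return true
}

func main() {
	cutoff := time.Date(2020, 12, 1, 0, 0, 0, 0, time.UTC)
	filters := []objectFilter{includeBeforeDateFilter{threshold: cutoff}}
	obj := storedObject{name: "a.txt", lastModifiedTime: cutoff.Add(-time.Hour)}
	fmt.Println(passesAll(filters, obj)) // true: modified before the cutoff
}
```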
3 changes: 2 additions & 1 deletion cmd/root.go
@@ -110,7 +110,8 @@ var rootCmd = &cobra.Command{
// job is running. I.e. using this later with --include-after is _guaranteed_ to pick up all files that changed during
// or after this job
adjustedTime := timeAtPrestart.Add(-5 * time.Second)
startTimeMessage := fmt.Sprintf("ISO 8601 START TIME: to copy files that changed after this job started, use the parameter --%s=%s",
startTimeMessage := fmt.Sprintf("ISO 8601 START TIME: to copy files that changed before or after this job started, use the parameter --%s=%s or --%s=%s",
common.IncludeBeforeFlagName, includeBeforeDateFilter{}.FormatAsUTC(adjustedTime),
common.IncludeAfterFlagName, includeAfterDateFilter{}.FormatAsUTC(adjustedTime))
ste.JobsAdmin.LogToJobLog(startTimeMessage, pipeline.LogInfo)

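The five-second padding guards against clock skew and against files changed while the job was starting up. A self-contained sketch of how the hint is produced (formatAsUTC, shown in cmd/zc_filter.go below, is just t.UTC().Format(time.RFC3339)):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// pad the prestart time so the printed value is guaranteed to precede
	// any file that changed during or after this job
	adjusted := time.Now().Add(-5 * time.Second)
	stamp := adjusted.UTC().Format(time.RFC3339) // same shape formatAsUTC produces
	fmt.Printf("ISO 8601 START TIME: to copy files that changed before or after this job started, "+
		"use --include-before=%s or --include-after=%s\n", stamp, stamp)
}
```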
53 changes: 49 additions & 4 deletions cmd/zc_filter.go
@@ -259,10 +259,55 @@ func (f *includeAfterDateFilter) doesPass(storedObject storedObject) bool {
storedObject.lastModifiedTime.Equal(f.threshold) // >= is easier for users to understand than >
}

// ParseISO8601 parses ISO 8601 dates. This routine is needed because GoLang's time.Parse* routines require all expected
func (_ includeAfterDateFilter) ParseISO8601(s string, chooseEarliest bool) (time.Time, error) {
return parseISO8601(s, chooseEarliest)
}

func (_ includeAfterDateFilter) FormatAsUTC(t time.Time) string {
return formatAsUTC(t)
}

// includeBeforeDateFilter includes files with Last Modified Times <= the specified threshold
// Used for copy, but doesn't make conceptual sense for sync
type includeBeforeDateFilter struct {
threshold time.Time
}

func (f *includeBeforeDateFilter) doesSupportThisOS() (msg string, supported bool) {
msg = ""
supported = true
return
}

func (f *includeBeforeDateFilter) appliesOnlyToFiles() bool {
return true
// because we don't currently (May 2020) have meaningful LMTs for folders. The meaningful time for a folder is the "change time" not the "last write time", and the change time can only be obtained via NtGetFileInformation, which we don't yet call.
// TODO: the consequence of this is that folder properties and folder acls can't be moved when using this filter.
// Can we live with that, for now?
}

func (f *includeBeforeDateFilter) doesPass(storedObject storedObject) bool {
zeroTime := time.Time{}
if storedObject.lastModifiedTime == zeroTime {
panic("cannot use includeBeforeDateFilter on an object for which no Last Modified Time has been retrieved")
}

return storedObject.lastModifiedTime.Before(f.threshold) ||
storedObject.lastModifiedTime.Equal(f.threshold) // <= is easier for users to understand than <
}

func (_ includeBeforeDateFilter) ParseISO8601(s string, chooseEarliest bool) (time.Time, error) {
return parseISO8601(s, chooseEarliest)
}

func (_ includeBeforeDateFilter) FormatAsUTC(t time.Time) string {
return formatAsUTC(t)
}

// parseISO8601 parses ISO 8601 dates. This routine is needed because GoLang's time.Parse* routines require all expected
// elements to be present. I.e. you can't specify just a date, and have the time default to 00:00. But ISO 8601 requires
// that and, for usability, that's what we want. (So that users can omit the whole time, or at least the seconds portion of it, if they wish)
func (_ includeAfterDateFilter) ParseISO8601(s string, chooseEarliest bool) (time.Time, error) {
func parseISO8601(s string, chooseEarliest bool) (time.Time, error) {

// list of ISO-8601 Go-lang formats in descending order of completeness
formats := []string{
@@ -310,7 +355,7 @@ func (_ includeAfterDateFilter) ParseISO8601(s string, chooseEarliest bool) (tim
return time.Time{}, err
}

//FormatAsUTC is inverse of parseISO8601 (and always uses the most detailed format)
func (_ includeAfterDateFilter) FormatAsUTC(t time.Time) string {
// formatAsUTC is inverse of parseISO8601 (and always uses the most detailed format)
func formatAsUTC(t time.Time) string {
return t.UTC().Format(time.RFC3339)
}
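A minimal sketch of this lenient-parsing approach (the layout list is an assumption for illustration, not necessarily the repository's exact set):

```go
package main

import (
	"fmt"
	"time"
)

// try progressively less complete ISO 8601 layouts; zoneless values are
// interpreted in loc (e.g. the machine's local time zone)
func parseISO8601Sketch(s string, loc *time.Location) (time.Time, error) {
	layouts := []string{
		"2006-01-02T15:04:05Z07:00", // full date-time with zone (RFC 3339)
		"2006-01-02T15:04:05",       // no zone
		"2006-01-02T15:04",          // no seconds
		"2006-01-02",                // date only: time defaults to 00:00
	}
	for _, layout := range layouts {
		if t, err := time.ParseInLocation(layout, s, loc); err == nil {
			return t, nil
		}
	}
	return time.Time{}, fmt.Errorf("could not parse %q as ISO 8601", s)
}

func main() {
	t, err := parseISO8601Sketch("2020-08-19", time.Local)
	fmt.Println(t, err) // midnight local time on 2020-08-19
}
```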
108 changes: 88 additions & 20 deletions cmd/zc_traverser_blob.go
@@ -41,6 +41,10 @@ type blobTraverser struct {
ctx context.Context
recursive bool

// parallel listing employs the hierarchical listing API, which is more expensive;
// customers should have the option to disable this optimization in the name of saving costs
parallelListing bool

// whether to include blobs that have metadata 'hdi_isfolder = true'
includeDirectoryStubs bool

@@ -93,7 +97,6 @@ func (t *blobTraverser) getPropertiesIfSingleBlob() (props *azblob.BlobGetProper

func (t *blobTraverser) traverse(preprocessor objectMorpher, processor objectProcessor, filters []objectFilter) (err error) {
blobUrlParts := azblob.NewBlobURLParts(*t.rawURL)
util := copyHandlerUtil{}

// check if the url points to a single blob
blobProperties, isBlob, isDirStub, propErr := t.getPropertiesIfSingleBlob()
@@ -162,6 +165,15 @@ func (t *blobTraverser) traverse(preprocessor objectMorpher, processor objectPro
// as a performance optimization, get an extra prefix to do pre-filtering. It's typically the start portion of a blob name.
extraSearchPrefix := filterSet(filters).GetEnumerationPreFilter(t.recursive)

if t.parallelListing {
return t.parallelList(containerURL, blobUrlParts.ContainerName, searchPrefix, extraSearchPrefix, preprocessor, processor, filters)
}

return t.serialList(containerURL, blobUrlParts.ContainerName, searchPrefix, extraSearchPrefix, preprocessor, processor, filters)
}

func (t *blobTraverser) parallelList(containerURL azblob.ContainerURL, containerName string, searchPrefix string,
extraSearchPrefix string, preprocessor objectMorpher, processor objectProcessor, filters []objectFilter) error {
// Define how to enumerate its contents
// This func must be thread safe/goroutine safe
enumerateOneDir := func(dir parallel.Directory, enqueueDir func(parallel.Directory), enqueueOutput func(parallel.DirectoryEntry, error)) error {
@@ -183,24 +195,11 @@ func (t *blobTraverser) traverse(preprocessor objectMorpher, processor objectPro
// process the blobs returned in this result segment
for _, blobInfo := range lResp.Segment.BlobItems {
// if the blob represents a hdi folder, then skip it
if util.doesBlobRepresentAFolder(blobInfo.Metadata) && !(t.includeDirectoryStubs && t.recursive) {
if t.doesBlobRepresentAFolder(blobInfo.Metadata) {
continue
}

relativePath := strings.TrimPrefix(blobInfo.Name, searchPrefix)
adapter := blobPropertiesAdapter{blobInfo.Properties}
storedObject := newStoredObject(
preprocessor,
getObjectNameOnly(blobInfo.Name),
relativePath,
common.EEntityType.File(),
blobInfo.Properties.LastModified,
*blobInfo.Properties.ContentLength,
adapter,
adapter, // adapter satisfies both interfaces
common.FromAzBlobMetadataToCommonMetadata(blobInfo.Metadata),
blobUrlParts.ContainerName,
)
storedObject := t.createStoredObjectForBlob(preprocessor, blobInfo, strings.TrimPrefix(blobInfo.Name, searchPrefix), containerName)
enqueueOutput(storedObject, nil)
}

Expand All @@ -220,8 +219,6 @@ func (t *blobTraverser) traverse(preprocessor objectMorpher, processor objectPro
return workerError
}



if t.incrementEnumerationCounter != nil {
t.incrementEnumerationCounter(common.EEntityType.File())
}
@@ -235,12 +232,83 @@ func (t *blobTraverser) traverse(preprocessor objectMorpher, processor objectPro
}
}

return
return nil
}

func (t *blobTraverser) createStoredObjectForBlob(preprocessor objectMorpher, blobInfo azblob.BlobItemInternal, relativePath string, containerName string) storedObject {
adapter := blobPropertiesAdapter{blobInfo.Properties}
return newStoredObject(
preprocessor,
getObjectNameOnly(blobInfo.Name),
relativePath,
common.EEntityType.File(),
blobInfo.Properties.LastModified,
*blobInfo.Properties.ContentLength,
adapter,
adapter, // adapter satisfies both interfaces
common.FromAzBlobMetadataToCommonMetadata(blobInfo.Metadata),
containerName,
)
}

func (t *blobTraverser) doesBlobRepresentAFolder(metadata azblob.Metadata) bool {
util := copyHandlerUtil{}
return util.doesBlobRepresentAFolder(metadata) && !(t.includeDirectoryStubs && t.recursive)
}

func (t *blobTraverser) serialList(containerURL azblob.ContainerURL, containerName string, searchPrefix string,
extraSearchPrefix string, preprocessor objectMorpher, processor objectProcessor, filters []objectFilter) error {

for marker := (azblob.Marker{}); marker.NotDone(); {
// see the TO DO in GetEnumerationPreFilter if/when we make this more directory-aware

// look for all blobs that start with the prefix
// TODO optimize for the case where recursive is off
listBlob, err := containerURL.ListBlobsFlatSegment(t.ctx, marker,
azblob.ListBlobsSegmentOptions{Prefix: searchPrefix + extraSearchPrefix, Details: azblob.BlobListingDetails{Metadata: true}})
if err != nil {
return fmt.Errorf("cannot list blobs. Failed with error %s", err.Error())
}

// process the blobs returned in this result segment
for _, blobInfo := range listBlob.Segment.BlobItems {
// if the blob represents a hdi folder, then skip it
if t.doesBlobRepresentAFolder(blobInfo.Metadata) {
continue
}

relativePath := strings.TrimPrefix(blobInfo.Name, searchPrefix)
// when not recursive, skip blobs that sit inside a subdirectory
if !t.recursive && strings.Contains(relativePath, common.AZCOPY_PATH_SEPARATOR_STRING) {
continue
}

storedObject := t.createStoredObjectForBlob(preprocessor, blobInfo, relativePath, containerName)
if t.incrementEnumerationCounter != nil {
t.incrementEnumerationCounter(common.EEntityType.File())
}

processErr := processIfPassedFilters(filters, storedObject, processor)
_, processErr = getProcessingError(processErr)
if processErr != nil {
return processErr
}
}

marker = listBlob.NextMarker
}

return nil
}

func newBlobTraverser(rawURL *url.URL, p pipeline.Pipeline, ctx context.Context, recursive, includeDirectoryStubs bool,
incrementEnumerationCounter enumerationCounterFunc) (t *blobTraverser) {
t = &blobTraverser{rawURL: rawURL, p: p, ctx: ctx, recursive: recursive, includeDirectoryStubs: includeDirectoryStubs,
incrementEnumerationCounter: incrementEnumerationCounter}
incrementEnumerationCounter: incrementEnumerationCounter, parallelListing: true}

if strings.ToLower(glcm.GetEnvironmentVariable(common.EEnvironmentVariable.DisableHierarchicalScanning())) == "true" {
// TODO log to frontend log that parallel listing was disabled, once the frontend log PR is merged
t.parallelListing = false
}
return
}
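The folder-stub check keys off the 'hdi_isfolder' metadata mentioned in the blobTraverser comments above. A minimal sketch of the assumed semantics (the real check lives in copyHandlerUtil, which this diff doesn't show):

```go
package main

import (
	"fmt"
	"strings"
)

// a blob is treated as a folder stub when its metadata marks it as one
// (assumed key/value, per the includeDirectoryStubs comment above)
func isFolderStub(metadata map[string]string) bool {
	v, ok := metadata["hdi_isfolder"]
	return ok && strings.EqualFold(v, "true")
}

// skip stubs unless directory stubs are included and the walk is recursive,
// mirroring doesBlobRepresentAFolder above
func shouldSkip(metadata map[string]string, includeDirectoryStubs, recursive bool) bool {
	return isFolderStub(metadata) && !(includeDirectoryStubs && recursive)
}

func main() {
	md := map[string]string{"hdi_isfolder": "true"}
	fmt.Println(shouldSkip(md, false, true)) // true: stub is skipped
	fmt.Println(shouldSkip(md, true, true))  // false: stub is kept
}
```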
58 changes: 58 additions & 0 deletions cmd/zt_copy_blob_upload_test.go
@@ -435,6 +435,64 @@ func (s *cmdIntegrationSuite) TestUploadDirectoryToContainerWithPattern(c *chk.C
})
}

func (s *cmdIntegrationSuite) TestUploadDirectoryToContainerWithIncludeBefore_UTC(c *chk.C) {
s.doTestUploadDirectoryToContainerWithIncludeBefore(true, c)
}

func (s *cmdIntegrationSuite) TestUploadDirectoryToContainerWithIncludeBefore_LocalTime(c *chk.C) {
s.doTestUploadDirectoryToContainerWithIncludeBefore(false, c)
}

func (s *cmdIntegrationSuite) doTestUploadDirectoryToContainerWithIncludeBefore(useUtc bool, c *chk.C) {
bsu := getBSU()

// set up the source directory
srcDirPath := scenarioHelper{}.generateLocalDirectory(c)
defer os.RemoveAll(srcDirPath)

// add newer files, which we wish to include
filesToInclude := []string{"important.txt", "includeSub/amazing.txt", "includeSub/wow/amazing.txt"}
scenarioHelper{}.generateLocalFilesFromList(c, srcDirPath, filesToInclude)

// sleep a little, to give clear LMT separation between the files above (included) and those below (which should not be copied)
time.Sleep(1500 * time.Millisecond)
includeFrom := time.Now()
extraIgnoredFiles := []string{"ignored.txt", "includeSub/ignored.txt", "includeSub/wow/ignored.txt"}
scenarioHelper{}.generateLocalFilesFromList(c, srcDirPath, extraIgnoredFiles)

// set up an empty container
containerURL, containerName := createNewContainer(c, bsu)
defer deleteContainer(c, containerURL)

// set up interceptor
mockedRPC := interceptor{}
Rpc = mockedRPC.intercept
mockedRPC.init()

// construct the raw input to simulate user input
rawContainerURLWithSAS := scenarioHelper{}.getRawContainerURLWithSAS(c, containerName)
raw := getDefaultCopyRawInput(srcDirPath, rawContainerURLWithSAS.String())
raw.recursive = true
if useUtc {
raw.includeBefore = includeFrom.UTC().Format(time.RFC3339)
} else {
raw.includeBefore = includeFrom.Format("2006-01-02T15:04:05") // local time, no timezone
}

runCopyAndVerify(c, raw, func(err error) {
c.Assert(err, chk.IsNil)

// validate that the right number of transfers were scheduled
c.Assert(len(mockedRPC.transfers), chk.Equals, len(filesToInclude))

// validate that the right transfers were sent
expectedTransfers := scenarioHelper{}.shaveOffPrefix(filesToInclude, filepath.Base(srcDirPath)+common.AZCOPY_PATH_SEPARATOR_STRING)
validateUploadTransfersAreScheduled(c, common.AZCOPY_PATH_SEPARATOR_STRING,
common.AZCOPY_PATH_SEPARATOR_STRING+filepath.Base(srcDirPath)+common.AZCOPY_PATH_SEPARATOR_STRING, expectedTransfers, mockedRPC)
})
}


func (s *cmdIntegrationSuite) TestUploadDirectoryToContainerWithIncludeAfter_UTC(c *chk.C) {
s.doTestUploadDirectoryToContainerWithIncludeAfter(true, c)
}
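The two test branches above exercise the two accepted timestamp shapes: RFC 3339 with an explicit zone, and a zoneless local wall-clock value. A small sketch of the difference:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	fmt.Println(now.UTC().Format(time.RFC3339))    // e.g. 2020-12-11T18:30:00Z (explicit UTC)
	fmt.Println(now.Format("2006-01-02T15:04:05")) // local wall-clock, no zone suffix
}
```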
