diff --git a/CHANGELOG.md b/CHANGELOG.md index c30dd9c20..aa41f73d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,12 @@ - Added `--profile` flag to allow users to specify a [named profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html). ([#353](https://github.com/peak/s5cmd/issues/353)) - Added `--credentials-file` flag to allow users to specify path for the AWS credentials file instead of using the [default location](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html#cli-configure-files-where). - Added `bench.py` script under new `benchmark` folder to compare performances of two different builds of s5cmd. ([#471](https://github.com/peak/s5cmd/pull/471)) +- Added `--all-versions` flag to `ls`, `rm`, `du` and `select` subcommands to apply operation on(/over) all versions of the objects. ([#475](https://github.com/peak/s5cmd/pull/475)) +- Added `--version-id` flag to `cat`, `cp`/`mv`, `rm`, `du` and `select` subcommands to apply operation on(/over) a specific versions of the object. ([#475](https://github.com/peak/s5cmd/pull/475)) +- Added `bucket-version` command to configure bucket versioning. Bucket name + alone returns the bucket versioning status of the bucket. Bucket versioning can + be configured with `set` flag. ([#475](https://github.com/peak/s5cmd/pull/475)) +- Added `--raw` flag to `cat` and `select` subcommands. It disables the wildcard operations. ([#475](https://github.com/peak/s5cmd/pull/475)) #### Improvements - Disable AWS SDK logger if log level is not `trace`. ([##460](https://github.com/peak/s5cmd/pull/460)) diff --git a/command/app.go b/command/app.go index 283d843aa..ceafb7394 100644 --- a/command/app.go +++ b/command/app.go @@ -207,6 +207,7 @@ func Commands() []*cli.Command { NewRunCommand(), NewSyncCommand(), NewVersionCommand(), + NewBucketVersionCommand(), } } diff --git a/command/bucket_version.go b/command/bucket_version.go new file mode 100644 index 000000000..fb0875b76 --- /dev/null +++ b/command/bucket_version.go @@ -0,0 +1,151 @@ +package command + +import ( + "context" + "fmt" + "strings" + + "github.com/urfave/cli/v2" + + "github.com/peak/s5cmd/log" + "github.com/peak/s5cmd/storage" + "github.com/peak/s5cmd/storage/url" + "github.com/peak/s5cmd/strutil" +) + +var bucketVersionHelpTemplate = `Name: + {{.HelpName}} - {{.Usage}} + +Usage: + {{.HelpName}} [options] s3://bucketname + +Options: + {{range .VisibleFlags}}{{.}} + {{end}} +Examples: + 1. Get bucket versioning status of a bucket + > s5cmd {{.HelpName}} s3://bucketname + + 2. Enable bucket versioning for the bucket + > s5cmd {{.HelpName}} --set Enabled s3://bucketname + + 3. Suspend bucket versioning for the bucket + > s5cmd {{.HelpName}} --set Suspended s3://bucketname +` + +func NewBucketVersionCommand() *cli.Command { + cmd := &cli.Command{ + Name: "bucket-version", + CustomHelpTemplate: bucketVersionHelpTemplate, + HelpName: "bucket-version", + Usage: "configure bucket versioning", + Flags: []cli.Flag{ + &cli.GenericFlag{ + Name: "set", + Value: &EnumValue{ + Enum: []string{"Suspended", "Enabled"}, + Default: "", + ConditionFunction: strings.EqualFold, + }, + Usage: "set versioning status of bucket: (Suspended, Enabled)", + }, + }, + Before: func(ctx *cli.Context) error { + if err := checkNumberOfArguments(ctx, 1, 1); err != nil { + printError(commandFromContext(ctx), ctx.Command.Name, err) + return err + } + return nil + }, + Action: func(c *cli.Context) error { + status := c.String("set") + + fullCommand := commandFromContext(c) + + bucket, err := url.New(c.Args().First()) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } + + return BucketVersion{ + src: bucket, + op: c.Command.Name, + fullCommand: fullCommand, + + status: status, + storageOpts: NewStorageOpts(c), + }.Run(c.Context) + }, + } + cmd.BashComplete = getBashCompleteFn(cmd, true, true) + return cmd +} + +type BucketVersion struct { + src *url.URL + op string + fullCommand string + + status string + storageOpts storage.Options +} + +func (v BucketVersion) Run(ctx context.Context) error { + client, err := storage.NewRemoteClient(ctx, &url.URL{}, v.storageOpts) + if err != nil { + printError(v.fullCommand, v.op, err) + return err + } + + if v.status != "" { + v.status = strutil.CapitalizeFirstRune(v.status) + + err := client.SetBucketVersioning(ctx, v.status, v.src.Bucket) + if err != nil { + printError(v.fullCommand, v.op, err) + return err + } + msg := BucketVersionMessage{ + Bucket: v.src.Bucket, + Status: v.status, + isSet: true, + } + log.Info(msg) + return nil + } + + status, err := client.GetBucketVersioning(ctx, v.src.Bucket) + if err != nil { + printError(v.fullCommand, v.op, err) + return err + } + + msg := BucketVersionMessage{ + Bucket: v.src.Bucket, + Status: status, + isSet: false, + } + log.Info(msg) + return nil +} + +type BucketVersionMessage struct { + Bucket string `json:"bucket"` + Status string `json:"status"` + isSet bool +} + +func (v BucketVersionMessage) String() string { + if v.isSet { + return fmt.Sprintf("Bucket versioning for %q is set to %q", v.Bucket, v.Status) + } + if v.Status != "" { + return fmt.Sprintf("Bucket versioning for %q is %q", v.Bucket, v.Status) + } + return fmt.Sprintf("%q is an unversioned bucket", v.Bucket) +} + +func (v BucketVersionMessage) JSON() string { + return strutil.JSON(v) +} diff --git a/command/cat.go b/command/cat.go index 2a3b1242c..6ed2af502 100644 --- a/command/cat.go +++ b/command/cat.go @@ -25,13 +25,26 @@ Options: Examples: 1. Print a remote object's content to stdout > s5cmd {{.HelpName}} s3://bucket/prefix/object + + 2. Print specific version of a remote object's content to stdout + > s5cmd {{.HelpName}} --version-id VERSION_ID s3://bucket/prefix/object ` func NewCatCommand() *cli.Command { cmd := &cli.Command{ - Name: "cat", - HelpName: "cat", - Usage: "print remote object content", + Name: "cat", + HelpName: "cat", + Usage: "print remote object content", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "raw", + Usage: "disable the wildcard operations, useful with filenames that contains glob characters", + }, + &cli.StringFlag{ + Name: "version-id", + Usage: "use the specified version of an object", + }, + }, CustomHelpTemplate: catHelpTemplate, Before: func(c *cli.Context) error { err := validateCatCommand(c) @@ -43,9 +56,11 @@ func NewCatCommand() *cli.Command { Action: func(c *cli.Context) (err error) { defer stat.Collect(c.Command.FullName(), &err)() - src, err := url.New(c.Args().Get(0)) op := c.Command.Name fullCommand := commandFromContext(c) + + src, err := url.New(c.Args().Get(0), url.WithVersion(c.String("version-id")), + url.WithRaw(c.Bool("raw"))) if err != nil { printError(fullCommand, op, err) return err @@ -102,8 +117,8 @@ func validateCatCommand(c *cli.Context) error { return fmt.Errorf("expected only one argument") } - src, err := url.New(c.Args().Get(0)) - + src, err := url.New(c.Args().Get(0), url.WithVersion(c.String("version-id")), + url.WithRaw(c.Bool("raw"))) if err != nil { return err } @@ -119,5 +134,10 @@ func validateCatCommand(c *cli.Context) error { if src.IsWildcard() { return fmt.Errorf("remote source %q can not contain glob characters", src) } + + if err := checkVersioningWithGoogleEndpoint(c); err != nil { + return err + } + return nil } diff --git a/command/cp.go b/command/cp.go index b6b766d65..cb94076e1 100644 --- a/command/cp.go +++ b/command/cp.go @@ -100,6 +100,9 @@ Examples: 21. Upload a file to S3 with a content-type and content-encoding header > s5cmd --content-type "text/css" --content-encoding "br" myfile.css.br s3://bucket/ + + 22. Download the specific version of a remote object to working directory + > s5cmd {{.HelpName}} --version-id VERSION_ID s3://bucket/prefix/object . ` func NewSharedFlags() []cli.Flag { @@ -207,6 +210,10 @@ func NewCopyCommandFlags() []cli.Flag { Aliases: []string{"u"}, Usage: "only overwrite destination if source modtime is newer", }, + &cli.StringFlag{ + Name: "version-id", + Usage: "use the specified version of an object", + }, } sharedFlags := NewSharedFlags() return append(copyFlags, sharedFlags...) @@ -230,7 +237,11 @@ func NewCopyCommand() *cli.Command { defer stat.Collect(c.Command.FullName(), &err)() // don't delete source - return NewCopy(c, false).Run(c.Context) + copy, err := NewCopy(c, false) + if err != nil { + return err + } + return copy.Run(c.Context) }, } @@ -240,8 +251,8 @@ func NewCopyCommand() *cli.Command { // Copy holds copy operation flags and states. type Copy struct { - src string - dst string + src *url.URL + dst *url.URL op string fullCommand string @@ -259,7 +270,6 @@ type Copy struct { acl string forceGlacierTransfer bool ignoreGlacierWarnings bool - raw bool exclude []string cacheControl string expires string @@ -277,12 +287,27 @@ type Copy struct { } // NewCopy creates Copy from cli.Context. -func NewCopy(c *cli.Context, deleteSource bool) Copy { - return Copy{ - src: c.Args().Get(0), - dst: c.Args().Get(1), +func NewCopy(c *cli.Context, deleteSource bool) (*Copy, error) { + fullCommand := commandFromContext(c) + + src, err := url.New(c.Args().Get(0), url.WithVersion(c.String("version-id")), + url.WithRaw(c.Bool("raw"))) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return nil, err + } + + dst, err := url.New(c.Args().Get(1), url.WithRaw(c.Bool("raw"))) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return nil, err + } + + return &Copy{ + src: src, + dst: dst, op: c.Command.Name, - fullCommand: commandFromContext(c), + fullCommand: fullCommand, deleteSource: deleteSource, // flags noClobber: c.Bool("no-clobber"), @@ -299,7 +324,6 @@ func NewCopy(c *cli.Context, deleteSource bool) Copy { forceGlacierTransfer: c.Bool("force-glacier-transfer"), ignoreGlacierWarnings: c.Bool("ignore-glacier-warnings"), exclude: c.StringSlice("exclude"), - raw: c.Bool("raw"), cacheControl: c.String("cache-control"), expires: c.String("expires"), contentType: c.String("content-type"), @@ -309,7 +333,7 @@ func NewCopy(c *cli.Context, deleteSource bool) Copy { dstRegion: c.String("destination-region"), storageOpts: NewStorageOpts(c), - } + }, nil } const fdlimitWarning = ` @@ -320,30 +344,18 @@ increase the open file limit or try to decrease the number of workers with // Run starts copying given source objects to destination. func (c Copy) Run(ctx context.Context) error { - srcurl, err := url.New(c.src, url.WithRaw(c.raw)) - if err != nil { - printError(c.fullCommand, c.op, err) - return err - } - - dsturl, err := url.New(c.dst, url.WithRaw(c.raw)) - if err != nil { - printError(c.fullCommand, c.op, err) - return err - } - // override source region if set if c.srcRegion != "" { c.storageOpts.SetRegion(c.srcRegion) } - client, err := storage.NewClient(ctx, srcurl, c.storageOpts) + client, err := storage.NewClient(ctx, c.src, c.storageOpts) if err != nil { printError(c.fullCommand, c.op, err) return err } - objch, err := expandSource(ctx, client, c.followSymlinks, srcurl) + objch, err := expandSource(ctx, client, c.followSymlinks, c.src) if err != nil { printError(c.fullCommand, c.op, err) @@ -372,9 +384,9 @@ func (c Copy) Run(ctx context.Context) error { } }() - isBatch := srcurl.IsWildcard() - if !isBatch && !srcurl.IsRemote() { - obj, _ := client.Stat(ctx, srcurl) + isBatch := c.src.IsWildcard() + if !isBatch && !c.src.IsRemote() { + obj, _ := client.Stat(ctx, c.src) isBatch = obj != nil && obj.Type.IsDir() } @@ -404,7 +416,7 @@ func (c Copy) Run(ctx context.Context) error { continue } - if isURLExcluded(excludePatterns, object.URL.Path, srcurl.Prefix) { + if isURLExcluded(excludePatterns, object.URL.Path, c.src.Prefix) { continue } @@ -412,12 +424,12 @@ func (c Copy) Run(ctx context.Context) error { var task parallel.Task switch { - case srcurl.Type == dsturl.Type: // local->local or remote->remote - task = c.prepareCopyTask(ctx, srcurl, dsturl, isBatch) + case srcurl.Type == c.dst.Type: // local->local or remote->remote + task = c.prepareCopyTask(ctx, srcurl, c.dst, isBatch) case srcurl.IsRemote(): // remote->local - task = c.prepareDownloadTask(ctx, srcurl, dsturl, isBatch) - case dsturl.IsRemote(): // local->remote - task = c.prepareUploadTask(ctx, srcurl, dsturl, isBatch) + task = c.prepareDownloadTask(ctx, srcurl, c.dst, isBatch) + case c.dst.IsRemote(): // local->remote + task = c.prepareUploadTask(ctx, srcurl, c.dst, isBatch) default: panic("unexpected src-dst pair") } @@ -848,7 +860,8 @@ func validateCopyCommand(c *cli.Context) error { src := c.Args().Get(0) dst := c.Args().Get(1) - srcurl, err := url.New(src, url.WithRaw(c.Bool("raw"))) + srcurl, err := url.New(src, url.WithVersion(c.String("version-id")), + url.WithRaw(c.Bool("raw"))) if err != nil { return err } @@ -874,6 +887,14 @@ func validateCopyCommand(c *cli.Context) error { return fmt.Errorf("target %q must be a bucket or a prefix", dsturl) } + if err := checkVersinoningURLRemote(srcurl); err != nil { + return err + } + + if err := checkVersioningWithGoogleEndpoint(c); err != nil { + return err + } + switch { case srcurl.Type == dsturl.Type: return validateCopy(srcurl, dsturl) diff --git a/command/du.go b/command/du.go index 380ab47c1..a380f4283 100644 --- a/command/du.go +++ b/command/du.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + urlpkg "net/url" + "github.com/hashicorp/go-multierror" "github.com/urfave/cli/v2" @@ -26,13 +28,25 @@ Options: {{end}} Examples: 1. Show disk usage of all objects in a bucket - > s5cmd {{.HelpName}} s3://bucket/* + > s5cmd {{.HelpName}} "s3://bucket/*" 2. Show disk usage of all objects that match a wildcard, grouped by storage class - > s5cmd {{.HelpName}} --group s3://bucket/prefix/obj*.gz + > s5cmd {{.HelpName}} --group "s3://bucket/prefix/obj*.gz" 3. Show disk usage of all objects in a bucket but exclude the ones with py extension or starts with main - > s5cmd {{.HelpName}} --exclude "*.py" --exclude "main*" s3://bucket/* + > s5cmd {{.HelpName}} --exclude "*.py" --exclude "main*" "s3://bucket/*" + + 4. Show disk usage of all versions of an object in the bucket + > s5cmd {{.HelpName}} --all-versions s3://bucket/object + + 5. Show disk usage of all versions of all objects that starts with a prefix in the bucket + > s5cmd {{.HelpName}} --all-versions "s3://bucket/prefix*" + + 6. Show disk usage of all versions of all objects in the bucket + > s5cmd {{.HelpName}} --all-versions "s3://bucket/*" + + 7. Show disk usage of a specific version of an object in the bucket + > s5cmd {{.HelpName}} --version-id VERSION_ID s3://bucket/object ` func NewSizeCommand() *cli.Command { @@ -56,6 +70,14 @@ func NewSizeCommand() *cli.Command { Name: "exclude", Usage: "exclude objects with given pattern", }, + &cli.BoolFlag{ + Name: "all-versions", + Usage: "list all versions of object(s)", + }, + &cli.StringFlag{ + Name: "version-id", + Usage: "use the specified version of an object", + }, }, Before: func(c *cli.Context) error { err := validateDUCommand(c) @@ -67,10 +89,20 @@ func NewSizeCommand() *cli.Command { Action: func(c *cli.Context) (err error) { defer stat.Collect(c.Command.FullName(), &err)() + fullCommand := commandFromContext(c) + + srcurl, err := url.New(c.Args().First(), + url.WithAllVersions(c.Bool("all-versions")), + url.WithVersion(c.String("version-id"))) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } + return Size{ - src: c.Args().First(), + src: srcurl, op: c.Command.Name, - fullCommand: commandFromContext(c), + fullCommand: fullCommand, // flags groupByClass: c.Bool("group"), humanize: c.Bool("humanize"), @@ -87,7 +119,7 @@ func NewSizeCommand() *cli.Command { // Size holds disk usage (du) operation flags and states. type Size struct { - src string + src *url.URL op string fullCommand string @@ -101,12 +133,7 @@ type Size struct { // Run calculates disk usage of given source. func (sz Size) Run(ctx context.Context) error { - srcurl, err := url.New(sz.src) - if err != nil { - return err - } - - client, err := storage.NewClient(ctx, srcurl, sz.storageOpts) + client, err := storage.NewClient(ctx, sz.src, sz.storageOpts) if err != nil { printError(sz.fullCommand, sz.op, err) return err @@ -123,7 +150,7 @@ func (sz Size) Run(ctx context.Context) error { return err } - for object := range client.List(ctx, srcurl, false) { + for object := range client.List(ctx, sz.src, false) { if object.Type.IsDir() || errorpkg.IsCancelation(object.Err) { continue } @@ -134,7 +161,7 @@ func (sz Size) Run(ctx context.Context) error { continue } - if isURLExcluded(excludePatterns, object.URL.Path, srcurl.Prefix) { + if isURLExcluded(excludePatterns, object.URL.Path, sz.src.Prefix) { continue } @@ -148,7 +175,7 @@ func (sz Size) Run(ctx context.Context) error { if !sz.groupByClass { msg := SizeMessage{ - Source: srcurl.String(), + Source: sz.src.String(), Count: total.count, Size: total.size, showHumanized: sz.humanize, @@ -159,7 +186,7 @@ func (sz Size) Run(ctx context.Context) error { for k, v := range storageTotal { msg := SizeMessage{ - Source: srcurl.String(), + Source: sz.src.String(), StorageClass: k, Count: v.count, Size: v.size, @@ -222,5 +249,27 @@ func validateDUCommand(c *cli.Context) error { if c.Args().Len() != 1 { return fmt.Errorf("expected only 1 argument") } + + if err := checkVersioningFlagCompatibility(c); err != nil { + return err + } + + srcurl, err := url.New(c.Args().First(), + url.WithAllVersions(c.Bool("all-versions"))) + if err != nil { + return err + } + + if err := checkVersinoningURLRemote(srcurl); err != nil { + return err + } + + // the "all-versions" flag of du command works with GCS, because it does not + // depend on the generation numbers. + endpoint, err := urlpkg.Parse(c.String("endpoint-url")) + if err == nil && c.String("version-id") != "" && storage.IsGoogleEndpoint(*endpoint) { + return fmt.Errorf(versioningNotSupportedWarning, endpoint) + } + return nil } diff --git a/command/expand.go b/command/expand.go index f58c29586..9f419834d 100644 --- a/command/expand.go +++ b/command/expand.go @@ -32,7 +32,7 @@ func expandSource( } // call storage.List for only walking operations. - if srcurl.IsWildcard() || isDir { + if srcurl.IsWildcard() || srcurl.AllVersions || isDir { return client.List(ctx, srcurl, followSymlinks), nil } diff --git a/command/expand_test.go b/command/expand_test.go index 059da8e9b..0681c86e4 100644 --- a/command/expand_test.go +++ b/command/expand_test.go @@ -132,7 +132,7 @@ func TestExpandSources(t *testing.T) { for _, tc := range tests { tc := tc t.Run(tc.name, func(t *testing.T) { - srcurls, err := newURLs(false, keys(tc.src)...) + srcurls, err := newURLs(false, "", false, keys(tc.src)...) if err != nil { t.Errorf("unexpected error: %v", err) return diff --git a/command/flag.go b/command/flag.go index b0b9e261c..f29cdb08b 100644 --- a/command/flag.go +++ b/command/flag.go @@ -6,14 +6,23 @@ import ( ) type EnumValue struct { - Enum []string - Default string - selected string + Enum []string + Default string + // ConditionFunction is used to check if the value passed to Set method is valid + // or not. + // If ConditionFunction is not set, it defaults to string '==' comparison. + ConditionFunction func(str, target string) bool + selected string } func (e *EnumValue) Set(value string) error { + if e.ConditionFunction == nil { + e.ConditionFunction = func(str, target string) bool { + return str == target + } + } for _, enum := range e.Enum { - if enum == value { + if e.ConditionFunction(enum, value) { e.selected = value return nil } @@ -28,3 +37,7 @@ func (e EnumValue) String() string { } return e.selected } + +func (e EnumValue) Get() interface{} { + return e +} diff --git a/command/ls.go b/command/ls.go index 2f1183301..86e6efbc0 100644 --- a/command/ls.go +++ b/command/ls.go @@ -32,19 +32,29 @@ Examples: > s5cmd {{.HelpName}} s3://bucket/ 3. List all objects in a bucket - > s5cmd {{.HelpName}} s3://bucket/* + > s5cmd {{.HelpName}} "s3://bucket/*" 4. List all objects that matches a wildcard - > s5cmd {{.HelpName}} s3://bucket/prefix/*/*.gz + > s5cmd {{.HelpName}} "s3://bucket/prefix/*/*.gz" 5. List all objects in a public bucket - > s5cmd --no-sign-request {{.HelpName}} s3://bucket/* + > s5cmd --no-sign-request {{.HelpName}} "s3://bucket/*" 6. List all objects in a bucket but exclude the ones with prefix abc - > s5cmd {{.HelpName}} --exclude "abc*" s3://bucket/* + > s5cmd {{.HelpName}} --exclude "abc*" "s3://bucket/*" 7. List all object in a requester pays bucket - > s5cmd --request-payer=requester {{.HelpName}} s3://bucket/* + > s5cmd --request-payer=requester {{.HelpName}} "s3://bucket/*" + + 8. List all versions of an object in the bucket + > s5cmd {{.HelpName}} --all-versions s3://bucket/object + + 9. List all versions of all objects that starts with a prefix in the bucket + > s5cmd {{.HelpName}} --all-versions "s3://bucket/prefix*" + + 10. List all versions of all objects in the bucket + > s5cmd {{.HelpName}} --all-versions "s3://bucket/*" + ` func NewListCommand() *cli.Command { @@ -73,6 +83,10 @@ func NewListCommand() *cli.Command { Name: "exclude", Usage: "exclude objects with given pattern", }, + &cli.BoolFlag{ + Name: "all-versions", + Usage: "list all versions of object(s)", + }, }, Before: func(c *cli.Context) error { err := validateLSCommand(c) @@ -91,10 +105,18 @@ func NewListCommand() *cli.Command { return err } + fullCommand := commandFromContext(c) + + srcurl, err := url.New(c.Args().First(), + url.WithAllVersions(c.Bool("all-versions"))) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } return List{ - src: c.Args().First(), + src: srcurl, op: c.Command.Name, - fullCommand: commandFromContext(c), + fullCommand: fullCommand, // flags showEtag: c.Bool("etag"), humanize: c.Bool("humanize"), @@ -112,7 +134,7 @@ func NewListCommand() *cli.Command { // List holds list operation flags and states. type List struct { - src string + src *url.URL op string fullCommand string @@ -148,13 +170,8 @@ func ListBuckets(ctx context.Context, storageOpts storage.Options) error { // Run prints objects at given source. func (l List) Run(ctx context.Context) error { - srcurl, err := url.New(l.src) - if err != nil { - printError(l.fullCommand, l.op, err) - return err - } - client, err := storage.NewClient(ctx, srcurl, l.storageOpts) + client, err := storage.NewClient(ctx, l.src, l.storageOpts) if err != nil { printError(l.fullCommand, l.op, err) return err @@ -168,7 +185,7 @@ func (l List) Run(ctx context.Context) error { return err } - for object := range client.List(ctx, srcurl, false) { + for object := range client.List(ctx, l.src, false) { if errorpkg.IsCancelation(object.Err) { continue } @@ -179,7 +196,7 @@ func (l List) Run(ctx context.Context) error { continue } - if isURLExcluded(excludePatterns, object.URL.Path, srcurl.Prefix) { + if isURLExcluded(excludePatterns, object.URL.Path, l.src.Prefix) { continue } @@ -222,21 +239,37 @@ const ( // String returns the string representation of ListMessage. func (l ListMessage) String() string { - var listFormat = "%19s %2s %-1s %12s %s" var etag string + // date and storage fiels + var listFormat = "%19s %2s" + + // align etag if l.showEtag { etag = l.Object.Etag - listFormat = "%19s %2s %-38s %12s %s" + listFormat = listFormat + " %-38s" + } else { + listFormat = listFormat + " %-1s" } + // format file size + listFormat = listFormat + " %12s " + // format key and version ID + if l.Object.URL.VersionID != "" { + listFormat = listFormat + " %-50s %s" + } else { + listFormat = listFormat + " %s%s" + } + + var s string if l.Object.Type.IsDir() { - s := fmt.Sprintf( + s = fmt.Sprintf( listFormat, "", "", "", "DIR", l.Object.URL.Relative(), + "", ) return s } @@ -246,14 +279,16 @@ func (l ListMessage) String() string { stclass = fmt.Sprintf("%v", l.Object.StorageClass) } - s := fmt.Sprintf( + s = fmt.Sprintf( listFormat, l.Object.ModTime.Format(dateFormat), stclass, etag, l.humanize(), l.Object.URL.Relative(), + l.Object.URL.VersionID, ) + return s } @@ -266,5 +301,20 @@ func validateLSCommand(c *cli.Context) error { if c.Args().Len() > 1 { return fmt.Errorf("expected only 1 argument") } + + srcurl, err := url.New(c.Args().First(), + url.WithAllVersions(c.Bool("all-versions"))) + if err != nil { + return err + } + + if err := checkVersinoningURLRemote(srcurl); err != nil { + return err + } + + if err := checkVersioningWithGoogleEndpoint(c); err != nil { + return err + } + return nil } diff --git a/command/mv.go b/command/mv.go index 0693fd3da..ade2f4112 100644 --- a/command/mv.go +++ b/command/mv.go @@ -52,7 +52,11 @@ func NewMoveCommand() *cli.Command { defer stat.Collect(c.Command.FullName(), &err)() // delete source - return NewCopy(c, true).Run(c.Context) + copy, err := NewCopy(c, true) + if err != nil { + return err + } + return copy.Run(c.Context) }, } diff --git a/command/rm.go b/command/rm.go index 1f193b0a5..9f1e9bc96 100644 --- a/command/rm.go +++ b/command/rm.go @@ -28,16 +28,28 @@ Examples: > s5cmd {{.HelpName}} s3://bucketname/prefix/object.gz 2. Delete all objects with a prefix - > s5cmd {{.HelpName}} s3://bucketname/prefix/* + > s5cmd {{.HelpName}} "s3://bucketname/prefix/*" 3. Delete all objects that matches a wildcard - > s5cmd {{.HelpName}} s3://bucketname/*/obj*.gz + > s5cmd {{.HelpName}} "s3://bucketname/*/obj*.gz" 4. Delete all matching objects and a specific object - > s5cmd {{.HelpName}} s3://bucketname/prefix/* s3://bucketname/object1.gz + > s5cmd {{.HelpName}} "s3://bucketname/prefix/*" s3://bucketname/object1.gz 5. Delete all matching objects but exclude the ones with .txt extension or starts with "main" - > s5cmd {{.HelpName}} --exclude "*.txt" --exclude "main*" s3://bucketname/prefix/* + > s5cmd {{.HelpName}} --exclude "*.txt" --exclude "main*" "s3://bucketname/prefix/*" + + 6. Delete the specific version of a remote object's content to stdout + > s5cmd {{.HelpName}} --version-id VERSION_ID s3://bucket/prefix/object + + 7. Delete all versions of an object in the bucket + > s5cmd {{.HelpName}} --all-versions s3://bucket/object + + 8. Delete all versions of all objects that starts with a prefix in the bucket + > s5cmd {{.HelpName}} --all-versions "s3://bucket/prefix*" + + 9. Delete all versions of all objects in the bucket + > s5cmd {{.HelpName}} --all-versions "s3://bucket/*" ` func NewDeleteCommand() *cli.Command { @@ -54,6 +66,14 @@ func NewDeleteCommand() *cli.Command { Name: "exclude", Usage: "exclude objects with given pattern", }, + &cli.BoolFlag{ + Name: "all-versions", + Usage: "list all versions of object(s)", + }, + &cli.StringFlag{ + Name: "version-id", + Usage: "use the specified version of an object", + }, }, CustomHelpTemplate: deleteHelpTemplate, Before: func(c *cli.Context) error { @@ -65,13 +85,21 @@ func NewDeleteCommand() *cli.Command { }, Action: func(c *cli.Context) (err error) { defer stat.Collect(c.Command.FullName(), &err)() + fullCommand := commandFromContext(c) + + sources := c.Args().Slice() + srcUrls, err := newURLs(c.Bool("raw"), c.String("version-id"), c.Bool("all-versions"), sources...) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } + return Delete{ - src: c.Args().Slice(), + src: srcUrls, op: c.Command.Name, - fullCommand: commandFromContext(c), + fullCommand: fullCommand, // flags - raw: c.Bool("raw"), exclude: c.StringSlice("exclude"), storageOpts: NewStorageOpts(c), @@ -85,13 +113,12 @@ func NewDeleteCommand() *cli.Command { // Delete holds delete operation flags and states. type Delete struct { - src []string + src []*url.URL op string fullCommand string // flag options exclude []string - raw bool // storage options storageOpts storage.Options @@ -99,12 +126,8 @@ type Delete struct { // Run remove given sources. func (d Delete) Run(ctx context.Context) error { - srcurls, err := newURLs(d.raw, d.src...) - if err != nil { - printError(d.fullCommand, d.op, err) - return err - } - srcurl := srcurls[0] + + srcurl := d.src[0] client, err := storage.NewClient(ctx, srcurl, d.storageOpts) if err != nil { @@ -118,7 +141,7 @@ func (d Delete) Run(ctx context.Context) error { return err } - objch := expandSources(ctx, client, false, srcurls...) + objch := expandSources(ctx, client, false, d.src...) var ( merrorObjects error @@ -173,13 +196,18 @@ func (d Delete) Run(ctx context.Context) error { } // newSources creates object URL list from given sources. -func newURLs(urlMode bool, sources ...string) ([]*url.URL, error) { +func newURLs(isRaw bool, versionID string, isAllVersions bool, sources ...string) ([]*url.URL, error) { var urls []*url.URL for _, src := range sources { - srcurl, err := url.New(src, url.WithRaw(urlMode)) + srcurl, err := url.New(src, url.WithRaw(isRaw), url.WithVersion(versionID), + url.WithAllVersions(isAllVersions)) if err != nil { return nil, err } + + if err := checkVersinoningURLRemote(srcurl); err != nil { + return nil, err + } urls = append(urls, srcurl) } return urls, nil @@ -190,11 +218,29 @@ func validateRMCommand(c *cli.Context) error { return fmt.Errorf("expected at least 1 object to remove") } - srcurls, err := newURLs(c.Bool("raw"), c.Args().Slice()...) + // It might be a reasonable request too. Consider that user wants to delete + // all-versions of "a" and "b", but want to delete only a single + // version of "c" "someversion". User might want to express this as + // `s5cmd rm --all-versions a --all-versions b version-id someversion c` + // but, current implementation does not take repetitive flags into account, + // anyway, this is not supported in the current implementation. + if err := checkVersioningFlagCompatibility(c); err != nil { + return err + } + + if len(c.Args().Slice()) > 1 && c.String("version-id") != "" { + return fmt.Errorf("version-id flag can only be used with single source object") + } + + srcurls, err := newURLs(c.Bool("raw"), c.String("version-id"), c.Bool("all-versions"), c.Args().Slice()...) if err != nil { return err } + if err := checkVersioningWithGoogleEndpoint(c); err != nil { + return err + } + var ( firstBucket string hasRemote, hasLocal bool diff --git a/command/select.go b/command/select.go index 99f4cd5bd..d391f485f 100644 --- a/command/select.go +++ b/command/select.go @@ -67,6 +67,18 @@ func NewSelectCommand() *cli.Command { Name: "ignore-glacier-warnings", Usage: "turns off glacier warnings: ignore errors encountered during selecting objects", }, + &cli.BoolFlag{ + Name: "raw", + Usage: "disable the wildcard operations, useful with filenames that contains glob characters", + }, + &cli.BoolFlag{ + Name: "all-versions", + Usage: "list all versions of object(s)", + }, + &cli.StringFlag{ + Name: "version-id", + Usage: "use the specified version of the object", + }, }, CustomHelpTemplate: selectHelpTemplate, Before: func(c *cli.Context) error { @@ -79,10 +91,18 @@ func NewSelectCommand() *cli.Command { Action: func(c *cli.Context) (err error) { defer stat.Collect(c.Command.FullName(), &err)() + fullCommand := commandFromContext(c) + + src, err := url.New(c.Args().Get(0), url.WithVersion(c.String("version-id")), + url.WithRaw(c.Bool("raw")), url.WithAllVersions(c.Bool("all-versions"))) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } return Select{ - src: c.Args().Get(0), + src: src, op: c.Command.Name, - fullCommand: commandFromContext(c), + fullCommand: fullCommand, // flags query: c.String("query"), compressionType: c.String("compression"), @@ -101,7 +121,7 @@ func NewSelectCommand() *cli.Command { // Select holds select operation flags and states. type Select struct { - src string + src *url.URL op string fullCommand string @@ -117,13 +137,7 @@ type Select struct { // Run starts copying given source objects to destination. func (s Select) Run(ctx context.Context) error { - srcurl, err := url.New(s.src) - if err != nil { - printError(s.fullCommand, s.op, err) - return err - } - - client, err := storage.NewRemoteClient(ctx, srcurl, s.storageOpts) + client, err := storage.NewRemoteClient(ctx, s.src, s.storageOpts) if err != nil { printError(s.fullCommand, s.op, err) return err @@ -132,7 +146,7 @@ func (s Select) Run(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - objch, err := expandSource(ctx, client, false, srcurl) + objch, err := expandSource(ctx, client, false, s.src) if err != nil { printError(s.fullCommand, s.op, err) return err @@ -203,7 +217,7 @@ func (s Select) Run(ctx context.Context) error { continue } - if isURLExcluded(excludePatterns, object.URL.Path, srcurl.Prefix) { + if isURLExcluded(excludePatterns, object.URL.Path, s.src.Prefix) { continue } @@ -237,9 +251,16 @@ func validateSelectCommand(c *cli.Context) error { return fmt.Errorf("expected source argument") } - src := c.Args().Get(0) + if err := checkVersioningFlagCompatibility(c); err != nil { + return err + } + + if err := checkVersioningWithGoogleEndpoint(c); err != nil { + return err + } - srcurl, err := url.New(src) + srcurl, err := url.New(c.Args().Get(0), url.WithVersion(c.String("version-id")), + url.WithRaw(c.Bool("raw")), url.WithAllVersions(c.Bool("all-versions"))) if err != nil { return err } diff --git a/command/validation.go b/command/validation.go new file mode 100644 index 000000000..8dc2a9ff5 --- /dev/null +++ b/command/validation.go @@ -0,0 +1,75 @@ +package command + +import ( + "fmt" + urlpkg "net/url" + + "github.com/peak/s5cmd/storage" + "github.com/peak/s5cmd/storage/url" + "github.com/urfave/cli/v2" +) + +const ( + versioningNotSupportedWarning = "versioning related features are not supported with the given endpoint %q" + allVersionsFlagName = "all-versions" + versionIDFlagName = "version-id" +) + +// checkVersinoningURLRemote checks if the versioning related flags are used with +// local objects. Because the versioning is only supported with s3. +func checkVersinoningURLRemote(url *url.URL) error { + if !url.IsRemote() && url.IsVersioned() { + return fmt.Errorf("%q, and %q flags can only be used with remote objects", allVersionsFlagName, versionIDFlagName) + } + return nil +} + +// checkVersioningFlagCompatibility checks if the incompatible versioning flags +// are used together. Because it is not allowed to refer to both "all versions" and +// a specific version of an object together. +func checkVersioningFlagCompatibility(ctx *cli.Context) error { + if ctx.Bool(allVersionsFlagName) && ctx.String(versionIDFlagName) != "" { + return fmt.Errorf("it is not allowed to combine %q and %q flags", allVersionsFlagName, versionIDFlagName) + } + return nil +} + +// checkVersioningWithGoogleEndpoint checks if the versioning flags are used with +// the Google Endpoint. Because the s3 versioning operations are not compatible with +// GCS's versioning API. +func checkVersioningWithGoogleEndpoint(ctx *cli.Context) error { + endpoint := ctx.String("endpoint-url") + if endpoint == "" { + return nil + } + + u, err := urlpkg.Parse(endpoint) + if err != nil { + return err + } + + if storage.IsGoogleEndpoint(*u) && (ctx.Bool(allVersionsFlagName) || ctx.String(versionIDFlagName) != "") { + return fmt.Errorf(versioningNotSupportedWarning, endpoint) + } + + return nil +} + +// checkNumberOfArguments checks if the number of the arguments is valid. +// if the max is negative then there is no upper limit of arguments. +func checkNumberOfArguments(ctx *cli.Context, min, max int) error { + l := ctx.Args().Len() + if min == 1 && max == 1 && l != 1 { + return fmt.Errorf("expected only one argument") + } + if min == 2 && max == 2 && l != 2 { + return fmt.Errorf("expected source and destination arguments") + } + if l < min { + return fmt.Errorf("expected at least %d arguments but was given %d: %q", min, l, ctx.Args().Slice()) + } + if max >= 0 && l > max { + return fmt.Errorf("expected at most %d arguments but was given %d: %q", min, l, ctx.Args().Slice()) + } + return nil +} diff --git a/e2e/bucket_version_test.go b/e2e/bucket_version_test.go new file mode 100644 index 000000000..78483d9f2 --- /dev/null +++ b/e2e/bucket_version_test.go @@ -0,0 +1,79 @@ +package e2e + +import ( + "testing" + + "gotest.tools/v3/icmd" +) + +func TestBucketVersioning(t *testing.T) { + skipTestIfGCS(t, "versioning is not supported in GCS") + + t.Parallel() + + bucket := s3BucketFromTestName(t) + s3client, s5cmd := setup(t, withS3Backend("mem")) + + createBucket(t, s3client, bucket) + + // check that when bucket is created, it is unversioned + cmd := s5cmd("bucket-version", "s3://"+bucket) + result := icmd.RunCmd(cmd) + + result.Assert(t, icmd.Success) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: equals("%q is an unversioned bucket", bucket), + }) + + testcases := []struct { + name string + versioningStatus string + expectedVersioningStatus string + }{ + { + name: "Enable Bucket Versioning", + versioningStatus: "Enabled", + expectedVersioningStatus: "Enabled", + }, + { + name: "Suspend Bucket Versioning", + versioningStatus: "Suspended", + expectedVersioningStatus: "Suspended", + }, + { + name: "Enable Bucket Versioning Case Insensitive", + versioningStatus: "eNaBlEd", + expectedVersioningStatus: "Enabled", + }, + { + name: "Suspend Bucket Versioning Case Insensitive", + versioningStatus: "sUsPenDeD", + expectedVersioningStatus: "Suspended", + }, + } + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + // set bucket versioning to and check if the change succeeded + cmd = s5cmd("bucket-version", "--set", tc.versioningStatus, "s3://"+bucket) + result = icmd.RunCmd(cmd) + + result.Assert(t, icmd.Success) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: equals("Bucket versioning for %q is set to %q", bucket, tc.expectedVersioningStatus), + }) + + cmd = s5cmd("bucket-version", "s3://"+bucket) + result = icmd.RunCmd(cmd) + + result.Assert(t, icmd.Success) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: equals("Bucket versioning for %q is %q", bucket, tc.expectedVersioningStatus), + }) + + }) + } +} diff --git a/e2e/cat_test.go b/e2e/cat_test.go index af5e31d86..6c4c4099d 100644 --- a/e2e/cat_test.go +++ b/e2e/cat_test.go @@ -5,6 +5,8 @@ import ( "strings" "testing" + "github.com/google/go-cmp/cmp" + "gotest.tools/v3/assert" "gotest.tools/v3/icmd" ) @@ -213,3 +215,66 @@ func getSequentialFileContent() (string, map[int]compareFunc) { return sb.String(), expectedLines } + +func TestCatByVersionID(t *testing.T) { + skipTestIfGCS(t, "versioning is not supported in GCS") + + t.Parallel() + + bucket := s3BucketFromTestName(t) + + // versioninng is only supported with in memory backend! + s3client, s5cmd := setup(t, withS3Backend("mem")) + + const filename = "testfile.txt" + + var contents = []string{ + "This is first content", + "Second content it is, and it is a bit longer!!!", + } + + // create a bucket and Enable versioning + createBucket(t, s3client, bucket) + setBucketVersioning(t, s3client, bucket, "Enabled") + + // upload two versions of the file with same key + putFile(t, s3client, bucket, filename, contents[0]) + putFile(t, s3client, bucket, filename, contents[1]) + + // get content of the latest + cmd := s5cmd("cat", "s3://"+bucket+"/"+filename) + result := icmd.RunCmd(cmd) + + assert.Assert(t, result.Stdout() == contents[1]) + + if diff := cmp.Diff(contents[1], result.Stdout()); diff != "" { + t.Errorf("(-want +got):\n%v", diff) + } + + // now we will list and parse their version IDs + cmd = s5cmd("ls", "--all-versions", "s3://"+bucket+"/"+filename) + result = icmd.RunCmd(cmd) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: contains("%v", filename), + 1: contains("%v", filename), + }) + + versionIDs := make([]string, 0) + for _, row := range strings.Split(result.Stdout(), "\n") { + if row != "" { + arr := strings.Split(row, " ") + versionIDs = append(versionIDs, arr[len(arr)-1]) + } + } + + for i, version := range versionIDs { + cmd = s5cmd("cat", "--version-id", version, + fmt.Sprintf("s3://%v/%v", bucket, filename)) + result = icmd.RunCmd(cmd) + + if diff := cmp.Diff(contents[i], result.Stdout()); diff != "" { + t.Errorf("(-want +got):\n%v", diff) + } + } +} diff --git a/e2e/cp_test.go b/e2e/cp_test.go index 6e2d21ab2..ad4a64896 100644 --- a/e2e/cp_test.go +++ b/e2e/cp_test.go @@ -27,6 +27,8 @@ import ( "os" "path/filepath" "runtime" + "strconv" + "strings" "testing" "time" @@ -4057,6 +4059,65 @@ func TestCopySingleFileToS3WithNoSuchUploadRetryCount(t *testing.T) { assert.Assert(t, ensureS3Object(s3client, bucket, filename, content)) } +func TestVersionedDownload(t *testing.T) { + t.Parallel() + + bucket := s3BucketFromTestName(t) + + // versioninng is only supported with in memory backend! + s3client, s5cmd := setup(t, withS3Backend("mem")) + + const filename = "testfile.txt" + + var contents = []string{ + "This is first content", + "Second content it is, and it is a bit longer!!!", + } + + workdir := fs.NewDir(t, t.Name(), fs.WithFile(filename+"1", contents[0]), fs.WithFile(filename+"2", contents[1])) + defer workdir.Remove() + + // create a bucket and Enable versioning + createBucket(t, s3client, bucket) + setBucketVersioning(t, s3client, bucket, "Enabled") + + // upload two versions of the file with same key + putFile(t, s3client, bucket, filename, contents[0]) + putFile(t, s3client, bucket, filename, contents[1]) + + // we expect to see 2 versions of objects + cmd := s5cmd("ls", "--all-versions", "s3://"+bucket+"/"+filename) + result := icmd.RunCmd(cmd) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: contains("%v", filename), + 1: contains("%v", filename), + }) + + // now we will parse their version IDs in the order we put them into s3 server. + // the rest of the tests depends on this assumption + versionIDs := make([]string, 0) + for _, row := range strings.Split(result.Stdout(), "\n") { + if row != "" { + arr := strings.Split(row, " ") + versionIDs = append(versionIDs, arr[len(arr)-1]) + } + } + + // create new dir to download files + newDir := fs.NewDir(t, t.Name()) + defer newDir.Remove() + + // download both old and new versions of the file to newDir + for i, version := range versionIDs { + cmd = s5cmd("cp", "--version-id", version, + fmt.Sprintf("s3://%v/%v", bucket, filename), newDir.Path()+"/"+filename+strconv.Itoa(1+i)) + _ = icmd.RunCmd(cmd) + } + + assert.Assert(t, fs.Equal(workdir.Path(), fs.ManifestFromDir(t, newDir.Path()))) +} + // Before downloading a file from s3 a local target file is created. If download // fails the created file should be deleted. func TestDeleteFileWhenDownloadFailed(t *testing.T) { diff --git a/e2e/du_test.go b/e2e/du_test.go index b72b2ef49..0dfb04ad7 100644 --- a/e2e/du_test.go +++ b/e2e/du_test.go @@ -1,6 +1,7 @@ package e2e import ( + "fmt" "strings" "testing" @@ -228,3 +229,74 @@ func TestDiskUsageWildcardWithExcludeFilters(t *testing.T) { 0: suffix(`84 bytes in 3 objects: s3://%v/*`, bucket), }) } + +func TestDiskUsageByVersionIDAndAllVersions(t *testing.T) { + skipTestIfGCS(t, "versioning is not supported in GCS") + + t.Parallel() + + bucket := s3BucketFromTestName(t) + + // versioninng is only supported with in memory backend! + s3client, s5cmd := setup(t, withS3Backend("mem")) + + const filename = "testfile.txt" + + var ( + contents = []string{ + "This is first content", + "Second content it is, and it is a bit longer!!!", + } + sizes = []int{len(contents[0]), len(contents[1])} + ) + + // create a bucket and Enable versioning + createBucket(t, s3client, bucket) + setBucketVersioning(t, s3client, bucket, "Enabled") + + // upload two versions of the file with same key + putFile(t, s3client, bucket, filename, contents[0]) + putFile(t, s3client, bucket, filename, contents[1]) + + // get disk usage + cmd := s5cmd("du", "s3://"+bucket+"/"+filename) + result := icmd.RunCmd(cmd) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: equals(fmt.Sprintf("%d bytes in %d objects: s3://%v/%v", sizes[1], 1, bucket, filename)), + }) + + // we expect to see disk usage of 2 versions of objects + cmd = s5cmd("du", "--all-versions", "s3://"+bucket+"/"+filename) + result = icmd.RunCmd(cmd) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: equals(fmt.Sprintf("%d bytes in %d objects: s3://%v/%v", sizes[0]+sizes[1], 2, bucket, filename)), + }) + + // now we will list and parse their version IDs + cmd = s5cmd("ls", "--all-versions", "s3://"+bucket+"/"+filename) + result = icmd.RunCmd(cmd) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: contains("%v", filename), + 1: contains("%v", filename), + }) + + versionIDs := make([]string, 0) + for _, row := range strings.Split(result.Stdout(), "\n") { + if row != "" { + arr := strings.Split(row, " ") + versionIDs = append(versionIDs, arr[len(arr)-1]) + } + } + + for i, version := range versionIDs { + cmd = s5cmd("du", "--version-id", version, + fmt.Sprintf("s3://%v/%v", bucket, filename)) + result = icmd.RunCmd(cmd) + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: equals(fmt.Sprintf("%d bytes in %d objects: s3://%v/%v", sizes[i], 1, bucket, filename)), + }) + } +} diff --git a/e2e/rm_test.go b/e2e/rm_test.go index 4efbd62aa..4a01b6a40 100644 --- a/e2e/rm_test.go +++ b/e2e/rm_test.go @@ -4,6 +4,7 @@ import ( "fmt" "path/filepath" "runtime" + "strings" "testing" "gotest.tools/v3/assert" @@ -1167,3 +1168,136 @@ func TestRemovetNonexistingLocalFile(t *testing.T) { 0: equals(`ERROR "rm nonexistentfile": no object found`), }, strictLineCheck(true)) } + +func TestVersionedListAndRemove(t *testing.T) { + skipTestIfGCS(t, "versioning is not supported in GCS") + + t.Parallel() + + bucket := s3BucketFromTestName(t) + + // versioninng is only supported with in memory backend! + s3client, s5cmd := setup(t, withS3Backend("mem")) + + const ( + filename = "testfile.txt" + firstContent = "this is the first content" + firstExpectedContent = firstContent + "\n" + secondContent = "this is the second content: different than the first." + secondExpectedContent = secondContent + "\n" + ) + + workdir := fs.NewDir(t, t.Name(), fs.WithFile(filename+"1", firstContent), fs.WithFile(filename+"2", firstContent)) + defer workdir.Remove() + + // create a bucket and Enable versioning + createBucket(t, s3client, bucket) + setBucketVersioning(t, s3client, bucket, "Enabled") + + // upload two versions of the file with same key + putFile(t, s3client, bucket, filename, firstExpectedContent) + putFile(t, s3client, bucket, filename, secondExpectedContent) + + // remove (add a delete marker) + cmd := s5cmd("rm", "s3://"+bucket+"/"+filename) + result := icmd.RunCmd(cmd) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: contains("rm s3://%v/%v", bucket, filename), + }) + + // we expect to see it empty + cmd = s5cmd("ls", "s3://"+bucket+"/"+filename) + result = icmd.RunCmd(cmd) + + assert.Assert(t, result.Stdout() == "") + + // we expect to see 3 versions of objects one of which is a delete marker + cmd = s5cmd("ls", "--all-versions", "s3://"+bucket+"/"+filename) + result = icmd.RunCmd(cmd) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: contains("%v", filename), + 1: contains("%v", filename), + 2: contains("%v", filename), + }) + + // we expect all 3 versions of objects (one of which is a delete marker) to be deleted + cmd = s5cmd("rm", "--all-versions", "s3://"+bucket+"/"+filename) + result = icmd.RunCmd(cmd) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: contains("rm s3://%v/%v", bucket, filename), + 1: contains("rm s3://%v/%v", bucket, filename), + 2: contains("rm s3://%v/%v", bucket, filename), + }) + + // all versions are deleted so we don't expect to see any result + cmd = s5cmd("ls", "--all-versions", "s3://"+bucket+"/"+filename) + result = icmd.RunCmd(cmd) + assert.Assert(t, result.Stdout() == "") +} + +func TestRemoveByVersionID(t *testing.T) { + skipTestIfGCS(t, "versioning is not supported in GCS") + + t.Parallel() + + bucket := s3BucketFromTestName(t) + + // versioninng is only supported with in memory backend! + s3client, s5cmd := setup(t, withS3Backend("mem")) + + const filename = "testfile.txt" + + var contents = []string{ + "This is first content", + "Second content it is, and it is a bit longer!!!", + } + + // create a bucket and Enable versioning + createBucket(t, s3client, bucket) + setBucketVersioning(t, s3client, bucket, "Enabled") + + // upload two versions of the file with same key + putFile(t, s3client, bucket, filename, contents[0]) + putFile(t, s3client, bucket, filename, contents[1]) + + // remove (add a delete marker) + cmd := s5cmd("rm", "s3://"+bucket+"/"+filename) + result := icmd.RunCmd(cmd) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: contains("rm s3://%v/%v", bucket, filename), + }) + + // we expect to see 3 versions of objects + cmd = s5cmd("ls", "--all-versions", "s3://"+bucket+"/"+filename) + result = icmd.RunCmd(cmd) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: contains("%v", filename), + 1: contains("%v", filename), + 2: contains("%v", filename), + }) + + // now we will parse their version IDs and remove them one by one + versionIDs := make([]string, 0) + for _, row := range strings.Split(result.Stdout(), "\n") { + if row != "" { + arr := strings.Split(row, " ") + versionIDs = append(versionIDs, arr[len(arr)-1]) + } + } + + for _, version := range versionIDs { + cmd = s5cmd("rm", "--version-id", version, + fmt.Sprintf("s3://%v/%v", bucket, filename)) + _ = icmd.RunCmd(cmd) + } + + // all versions are deleted so we don't expect to see any result + cmd = s5cmd("ls", "--all-versions", "s3://"+bucket+"/"+filename) + result = icmd.RunCmd(cmd) + assert.Assert(t, result.Stdout() == "") +} diff --git a/e2e/util_test.go b/e2e/util_test.go index 06d5044c8..3bf64efeb 100644 --- a/e2e/util_test.go +++ b/e2e/util_test.go @@ -380,6 +380,10 @@ func createBucket(t *testing.T, client *s3.S3, bucket string) { t.Fatal(err) } + if !isEndpointFromEnv() { + return + } + t.Cleanup(func() { // cleanup if bucket exists. _, err := client.HeadBucket(&s3.HeadBucketInput{Bucket: aws.String(bucket)}) @@ -413,24 +417,51 @@ func createBucket(t *testing.T, client *s3.S3, bucket string) { keys = make([]*s3.ObjectIdentifier, 0) } - err = client.ListObjectsPages(&listInput, func(p *s3.ListObjectsOutput, lastPage bool) bool { - for _, c := range p.Contents { - objid := &s3.ObjectIdentifier{Key: c.Key} - keys = append(keys, objid) + listVersionsInput := s3.ListObjectVersionsInput{ + Bucket: aws.String(bucket), + } - if len(keys) == chunkSize { - _, err := client.DeleteObjects(&s3.DeleteObjectsInput{ - Bucket: aws.String(bucket), - Delete: &s3.Delete{Objects: keys}, - }) - if err != nil { - t.Fatal(err) + err = client.ListObjectVersionsPages(&listVersionsInput, + func(p *s3.ListObjectVersionsOutput, lastPage bool) bool { + for _, v := range p.Versions { + objid := &s3.ObjectIdentifier{ + Key: v.Key, + VersionId: v.VersionId, + } + keys = append(keys, objid) + + if len(keys) == chunkSize { + _, err := client.DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3.Delete{Objects: keys}, + }) + if err != nil { + t.Fatal(err) + } + initKeys() } - initKeys() } - } - return !lastPage - }) + + for _, d := range p.DeleteMarkers { + objid := &s3.ObjectIdentifier{ + Key: d.Key, + VersionId: d.VersionId, + } + keys = append(keys, objid) + + if len(keys) == chunkSize { + _, err := client.DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3.Delete{Objects: keys}, + }) + if err != nil { + t.Fatal(err) + } + initKeys() + } + } + return !lastPage + }) if err != nil { t.Fatal(err) } @@ -464,6 +495,19 @@ func isGoogleEndpointFromEnv(t *testing.T) bool { return storage.IsGoogleEndpoint(*endpoint) } +func setBucketVersioning(t *testing.T, s3client *s3.S3, bucket string, versioning string) { + t.Helper() + _, err := s3client.PutBucketVersioning(&s3.PutBucketVersioningInput{ + Bucket: aws.String(bucket), + VersioningConfiguration: &s3.VersioningConfiguration{ + Status: aws.String(versioning), + }, + }) + if err != nil { + t.Fatal(err) + } +} + var errS3NoSuchKey = fmt.Errorf("s3: no such key") type ensureOpts struct { diff --git a/e2e/version_test.go b/e2e/version_test.go index d6bfd45c2..f5cc90810 100644 --- a/e2e/version_test.go +++ b/e2e/version_test.go @@ -14,7 +14,7 @@ func TestVersion(t *testing.T) { cmd := s5cmd("version") result := icmd.RunCmd(cmd) - // make sure that -version flag works as expected: + // make sure that version subcommand works as expected: // https://github.com/peak/s5cmd/issues/70#issuecomment-592218542 result.Assert(t, icmd.Success) } diff --git a/log/message.go b/log/message.go index 0938ff066..b29621841 100644 --- a/log/message.go +++ b/log/message.go @@ -20,6 +20,10 @@ type InfoMessage struct { Source *url.URL `json:"source"` Destination *url.URL `json:"destination,omitempty"` Object Message `json:"object,omitempty"` + + // the VersionID field exist only for JSON Marshall, it must not be used for + // any other purpose. + VersionId string `json:"version_id,omitempty"` } // String is the string representation of InfoMessage. @@ -27,11 +31,17 @@ func (i InfoMessage) String() string { if i.Destination != nil { return fmt.Sprintf("%v %v %v", i.Operation, i.Source, i.Destination) } + if i.Source != nil && i.Source.VersionID != "" { + return fmt.Sprintf("%v %-50v %v", i.Operation, i.Source, i.Source.VersionID) + } return fmt.Sprintf("%v %v", i.Operation, i.Source) } // JSON is the JSON representation of InfoMessage. func (i InfoMessage) JSON() string { + if i.Destination == nil && i.Source != nil { + i.VersionId = i.Source.VersionID + } i.Success = true return strutil.JSON(i) } diff --git a/storage/s3.go b/storage/s3.go index 701b525db..a3e73ab26 100644 --- a/storage/s3.go +++ b/storage/s3.go @@ -115,11 +115,16 @@ func newS3Storage(ctx context.Context, opts Options) (*S3, error) { // Stat retrieves metadata from S3 object without returning the object itself. func (s *S3) Stat(ctx context.Context, url *url.URL) (*Object, error) { - output, err := s.api.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ + input := &s3.HeadObjectInput{ Bucket: aws.String(url.Bucket), Key: aws.String(url.Path), RequestPayer: s.RequestPayer(), - }) + } + if url.VersionID != "" { + input.SetVersionId(url.VersionID) + } + + output, err := s.api.HeadObjectWithContext(ctx, input) if err != nil { if errHasCode(err, "NotFound") { return nil, &ErrGivenObjectNotFound{ObjectAbsPath: url.Absolute()} @@ -150,6 +155,9 @@ func (s *S3) Stat(ctx context.Context, url *url.URL) (*Object, error) { // keys. If no object found or an error is encountered during this period, // it sends these errors to object channel. func (s *S3) List(ctx context.Context, url *url.URL, _ bool) <-chan *Object { + if url.VersionID != "" || url.AllVersions { + return s.listObjectVersions(ctx, url) + } if IsGoogleEndpoint(s.endpointURL) || s.useListObjectsV1 { return s.listObjects(ctx, url) } @@ -157,6 +165,136 @@ func (s *S3) List(ctx context.Context, url *url.URL, _ bool) <-chan *Object { return s.listObjectsV2(ctx, url) } +func (s *S3) listObjectVersions(ctx context.Context, url *url.URL) <-chan *Object { + listInput := s3.ListObjectVersionsInput{ + Bucket: aws.String(url.Bucket), + Prefix: aws.String(url.Prefix), + } + + if url.Delimiter != "" { + listInput.SetDelimiter(url.Delimiter) + } + + objCh := make(chan *Object) + + go func() { + defer close(objCh) + objectFound := false + + var now time.Time + + err := s.api.ListObjectVersionsPagesWithContext(ctx, &listInput, + func(p *s3.ListObjectVersionsOutput, lastPage bool) bool { + for _, c := range p.CommonPrefixes { + prefix := aws.StringValue(c.Prefix) + if !url.Match(prefix) { + continue + } + + newurl := url.Clone() + newurl.Path = prefix + objCh <- &Object{ + URL: newurl, + Type: ObjectType{os.ModeDir}, + } + + objectFound = true + } + // track the instant object iteration began, + // so it can be used to bypass objects created after this instant + if now.IsZero() { + now = time.Now().UTC() + } + + // iterate over all versions of the objects (except the delete markers) + for _, v := range p.Versions { + key := aws.StringValue(v.Key) + if !url.Match(key) { + continue + } + if url.VersionID != "" && url.VersionID != aws.StringValue(v.VersionId) { + continue + } + + mod := aws.TimeValue(v.LastModified).UTC() + if mod.After(now) { + objectFound = true + continue + } + + var objtype os.FileMode + if strings.HasSuffix(key, "/") { + objtype = os.ModeDir + } + + newurl := url.Clone() + newurl.Path = aws.StringValue(v.Key) + newurl.VersionID = aws.StringValue(v.VersionId) + etag := aws.StringValue(v.ETag) + + objCh <- &Object{ + URL: newurl, + Etag: strings.Trim(etag, `"`), + ModTime: &mod, + Type: ObjectType{objtype}, + Size: aws.Int64Value(v.Size), + StorageClass: StorageClass(aws.StringValue(v.StorageClass)), + } + + objectFound = true + } + + // iterate over all delete marker versions of the objects + for _, d := range p.DeleteMarkers { + key := aws.StringValue(d.Key) + if !url.Match(key) { + continue + } + if url.VersionID != "" && url.VersionID != aws.StringValue(d.VersionId) { + continue + } + + mod := aws.TimeValue(d.LastModified).UTC() + if mod.After(now) { + objectFound = true + continue + } + + var objtype os.FileMode + if strings.HasSuffix(key, "/") { + objtype = os.ModeDir + } + + newurl := url.Clone() + newurl.Path = aws.StringValue(d.Key) + newurl.VersionID = aws.StringValue(d.VersionId) + + objCh <- &Object{ + URL: newurl, + ModTime: &mod, + Type: ObjectType{objtype}, + Size: 0, + } + + objectFound = true + } + + return !lastPage + }) + + if err != nil { + objCh <- &Object{Err: err} + return + } + + if !objectFound { + objCh <- &Object{Err: ErrNoObjectFound} + } + }() + + return objCh +} + func (s *S3) listObjectsV2(ctx context.Context, url *url.URL) <-chan *Object { listInput := s3.ListObjectsV2Input{ Bucket: aws.String(url.Bucket), @@ -355,6 +493,14 @@ func (s *S3) Copy(ctx context.Context, from, to *url.URL, metadata Metadata) err CopySource: aws.String(copySource), RequestPayer: s.RequestPayer(), } + if from.VersionID != "" { + // Unlike many other *Input and *Output types version ID is not a field, + // but rather something that must be appended to CopySource string. + // This is same in both v1 and v2 SDKs: + // https://pkg.go.dev/github.com/aws/aws-sdk-go/service/s3#CopyObjectInput + // https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/s3#CopyObjectInput + input.CopySource = aws.String(copySource + "?versionId=" + from.VersionID) + } storageClass := metadata.StorageClass() if storageClass != "" { @@ -395,11 +541,17 @@ func (s *S3) Copy(ctx context.Context, from, to *url.URL, metadata Metadata) err // Read fetches the remote object and returns its contents as an io.ReadCloser. func (s *S3) Read(ctx context.Context, src *url.URL) (io.ReadCloser, error) { - resp, err := s.api.GetObjectWithContext(ctx, &s3.GetObjectInput{ + input := &s3.GetObjectInput{ Bucket: aws.String(src.Bucket), Key: aws.String(src.Path), RequestPayer: s.RequestPayer(), - }) + } + if src.VersionID != "" { + input.SetVersionId(src.VersionID) + } + + resp, err := s.api.GetObjectWithContext(ctx, input) + if err != nil { return nil, err } @@ -420,11 +572,16 @@ func (s *S3) Get( return 0, nil } - return s.downloader.DownloadWithContext(ctx, to, &s3.GetObjectInput{ + input := &s3.GetObjectInput{ Bucket: aws.String(from.Bucket), Key: aws.String(from.Path), RequestPayer: s.RequestPayer(), - }, func(u *s3manager.Downloader) { + } + if from.VersionID != "" { + input.VersionId = aws.String(from.VersionID) + } + + return s.downloader.DownloadWithContext(ctx, to, input, func(u *s3manager.Downloader) { u.PartSize = partSize u.Concurrency = concurrency }) @@ -650,6 +807,10 @@ func (s *S3) calculateChunks(ch <-chan *url.URL) <-chan chunk { bucket = url.Bucket objid := &s3.ObjectIdentifier{Key: aws.String(url.Path)} + if url.VersionID != "" { + objid.VersionId = &url.VersionID + } + keys = append(keys, objid) if len(keys) == chunkSize { chunkch <- chunk{ @@ -695,6 +856,7 @@ func (s *S3) doDelete(ctx context.Context, chunk chunk, resultch chan *Object) { for _, k := range chunk.Keys { key := fmt.Sprintf("s3://%v/%v", chunk.Bucket, aws.StringValue(k.Key)) url, _ := url.New(key) + url.VersionID = aws.StringValue(k.VersionId) resultch <- &Object{URL: url} } return @@ -733,12 +895,15 @@ func (s *S3) doDelete(ctx context.Context, chunk chunk, resultch chan *Object) { for _, d := range o.Deleted { key := fmt.Sprintf("s3://%v/%v", bucket, aws.StringValue(d.Key)) url, _ := url.New(key) + url.VersionID = aws.StringValue(d.VersionId) resultch <- &Object{URL: url} } for _, e := range o.Errors { key := fmt.Sprintf("s3://%v/%v", bucket, aws.StringValue(e.Key)) url, _ := url.New(key) + url.VersionID = aws.StringValue(e.VersionId) + resultch <- &Object{ URL: url, Err: fmt.Errorf(aws.StringValue(e.Message)), @@ -826,6 +991,34 @@ func (s *S3) RemoveBucket(ctx context.Context, name string) error { return err } +// SetBucketVersioning sets the versioning property of the bucket +func (s *S3) SetBucketVersioning(ctx context.Context, versioningStatus, bucket string) error { + if s.dryRun { + return nil + } + + _, err := s.api.PutBucketVersioningWithContext(ctx, &s3.PutBucketVersioningInput{ + Bucket: aws.String(bucket), + VersioningConfiguration: &s3.VersioningConfiguration{ + Status: aws.String(versioningStatus), + }, + }) + return err +} + +// GetBucketVersioning returnsversioning property of the bucket +func (s *S3) GetBucketVersioning(ctx context.Context, bucket string) (string, error) { + output, err := s.api.GetBucketVersioningWithContext(ctx, &s3.GetBucketVersioningInput{ + Bucket: aws.String(bucket), + }) + if err != nil || output.Status == nil { + return "", err + } + + return *output.Status, nil + +} + type sdkLogger struct{} func (l sdkLogger) Log(args ...interface{}) { diff --git a/storage/storage.go b/storage/storage.go index ea16bd951..7725d9375 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -60,6 +60,7 @@ func NewLocalClient(opts Options) *Filesystem { func NewRemoteClient(ctx context.Context, url *url.URL, opts Options) (*S3, error) { newOpts := Options{ MaxRetries: opts.MaxRetries, + NoSuchUploadRetryCount: opts.NoSuchUploadRetryCount, Endpoint: opts.Endpoint, NoVerifySSL: opts.NoVerifySSL, DryRun: opts.DryRun, @@ -71,7 +72,6 @@ func NewRemoteClient(ctx context.Context, url *url.URL, opts Options) (*S3, erro LogLevel: opts.LogLevel, bucket: url.Bucket, region: opts.region, - NoSuchUploadRetryCount: opts.NoSuchUploadRetryCount, } return newS3Storage(ctx, newOpts) } @@ -114,6 +114,10 @@ type Object struct { StorageClass StorageClass `json:"storage_class,omitempty"` Err error `json:"error,omitempty"` retryID string + + // the VersionID field exist only for JSON Marshall, it must not be used for + // any other purpose. URL.VersionID must be used instead. + VersionID string `json:"version_id,omitempty"` } // String returns the string representation of Object. @@ -123,6 +127,9 @@ func (o *Object) String() string { // JSON returns the JSON representation of Object. func (o *Object) JSON() string { + if o.URL != nil { + o.VersionID = o.URL.VersionID + } return strutil.JSON(o) } diff --git a/storage/url/url.go b/storage/url/url.go index 950245fb7..f03298e05 100644 --- a/storage/url/url.go +++ b/storage/url/url.go @@ -40,12 +40,14 @@ const ( // URL is the canonical representation of an object, either on local or remote // storage. type URL struct { - Type urlType - Scheme string - Bucket string - Path string - Delimiter string - Prefix string + Type urlType + Scheme string + Bucket string + Path string + Delimiter string + Prefix string + VersionID string + AllVersions bool relativePath string filter string @@ -61,6 +63,18 @@ func WithRaw(mode bool) Option { } } +func WithVersion(versionId string) Option { + return func(u *URL) { + u.VersionID = versionId + } +} + +func WithAllVersions(isAllVersions bool) Option { + return func(u *URL) { + u.AllVersions = isAllVersions + } +} + // New creates a new URL from given path string. func New(s string, opts ...Option) (*URL, error) { // TODO Change strCut to strings.Cut when minimum required Go version is 1.18 @@ -150,6 +164,11 @@ func (u *URL) IsBucket() bool { return u.IsRemote() && u.Path == "" } +// IsVersioned returns true if the URL has versioning related values +func (u *URL) IsVersioned() bool { + return u.AllVersions || u.VersionID != "" +} + // Absolute returns the absolute URL format of the object. func (u *URL) Absolute() string { if !u.IsRemote() { @@ -280,16 +299,19 @@ func (u *URL) setPrefixAndFilter() error { // Clone creates a copy of the receiver. func (u *URL) Clone() *URL { return &URL{ - Type: u.Type, - Scheme: u.Scheme, - Bucket: u.Bucket, - Delimiter: u.Delimiter, - Path: u.Path, - Prefix: u.Prefix, + Type: u.Type, + Scheme: u.Scheme, + Bucket: u.Bucket, + Path: u.Path, + Delimiter: u.Delimiter, + Prefix: u.Prefix, + VersionID: u.VersionID, + AllVersions: u.AllVersions, relativePath: u.relativePath, filter: u.filter, filterRegex: u.filterRegex, + raw: u.raw, } } diff --git a/strutil/strutil.go b/strutil/strutil.go index a0cec37a0..49c66183d 100644 --- a/strutil/strutil.go +++ b/strutil/strutil.go @@ -43,6 +43,17 @@ func JSON(v interface{}) string { return string(bytes) } +// CapitalizeFirstRune converts first rune to uppercase, and converts rest of +// the string to lower case. +func CapitalizeFirstRune(str string) string { + if str == "" { + return str + } + runes := []rune(str) + first, rest := runes[0], runes[1:] + return strings.ToUpper(string(first)) + strings.ToLower(string(rest)) +} + // AddNewLineFlag adds a flag that allows . to match new line character "\n". // It assumes that the pattern does not have any flags. func AddNewLineFlag(pattern string) string { diff --git a/strutil/strutil_test.go b/strutil/strutil_test.go index 2741f89d6..9884279fd 100644 --- a/strutil/strutil_test.go +++ b/strutil/strutil_test.go @@ -2,6 +2,47 @@ package strutil import "testing" +func TestCapitalizeFirstLetter(t *testing.T) { + tests := []struct { + name string + arg string + want string + }{ + { + name: "empty string", + arg: "", + want: "", + }, + { + name: "single rune", + arg: "s", + want: "S", + }, + { + name: "normal word", + arg: "sUsPend", + want: "Suspend", + }, + { + name: "with number", + arg: "numb3r", + want: "Numb3r", + }, + { + name: "two words", + arg: "two words", + want: "Two words", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := CapitalizeFirstRune(tt.arg); got != tt.want { + t.Errorf("CapitalizeFirstRune() = %v, want %v", got, tt.want) + } + }) + } +} + func Test_WildCardToRegexp(t *testing.T) { t.Parallel() tests := []struct {