From f0ce87d6f32f5c49c02fd2996f0418667f6a3998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammed=20Can=20K=C3=BC=C3=A7=C3=BCkaslan?= <32893303+Kucukaslan@users.noreply.github.com> Date: Fri, 16 Jun 2023 11:03:09 +0300 Subject: [PATCH] command: use external sort for comparison in sync (#483) It uses external sort instead of in-memory sort to dramatically reduce memory usage in the expense of the speed. It uses encoding/gob format to store to disk and restore from there. Fixes #441 Fixes #447 --- CHANGELOG.md | 5 + command/sync.go | 280 +++++--- e2e/sync_test.go | 5 +- go.mod | 4 +- go.sum | 16 +- storage/storage.go | 38 + storage/url/url.go | 44 ++ storage/url/url_test.go | 44 ++ vendor/github.com/lanrat/extsort/.gitignore | 1 + vendor/github.com/lanrat/extsort/LICENSE | 674 ++++++++++++++++++ vendor/github.com/lanrat/extsort/Makefile | 24 + vendor/github.com/lanrat/extsort/README.md | 79 ++ vendor/github.com/lanrat/extsort/config.go | 42 ++ .../lanrat/extsort/queue/priority_queue.go | 106 +++ .../lanrat/extsort/sort_sorttype.go | 309 ++++++++ .../github.com/lanrat/extsort/sort_strings.go | 294 ++++++++ .../lanrat/extsort/tempfile/mockfile.go | 106 +++ .../lanrat/extsort/tempfile/tempfile.go | 154 ++++ .../lanrat/extsort/tempfile/types.go | 23 + vendor/github.com/lanrat/extsort/types.go | 19 + vendor/github.com/lanrat/extsort/uniq.go | 24 + vendor/golang.org/x/sync/AUTHORS | 3 + vendor/golang.org/x/sync/CONTRIBUTORS | 3 + vendor/golang.org/x/sync/LICENSE | 27 + vendor/golang.org/x/sync/PATENTS | 22 + vendor/golang.org/x/sync/errgroup/errgroup.go | 66 ++ vendor/modules.txt | 10 +- 27 files changed, 2318 insertions(+), 104 deletions(-) create mode 100644 vendor/github.com/lanrat/extsort/.gitignore create mode 100644 vendor/github.com/lanrat/extsort/LICENSE create mode 100644 vendor/github.com/lanrat/extsort/Makefile create mode 100644 vendor/github.com/lanrat/extsort/README.md create mode 100644 vendor/github.com/lanrat/extsort/config.go create mode 100644 vendor/github.com/lanrat/extsort/queue/priority_queue.go create mode 100644 vendor/github.com/lanrat/extsort/sort_sorttype.go create mode 100644 vendor/github.com/lanrat/extsort/sort_strings.go create mode 100644 vendor/github.com/lanrat/extsort/tempfile/mockfile.go create mode 100644 vendor/github.com/lanrat/extsort/tempfile/tempfile.go create mode 100644 vendor/github.com/lanrat/extsort/tempfile/types.go create mode 100644 vendor/github.com/lanrat/extsort/types.go create mode 100644 vendor/github.com/lanrat/extsort/uniq.go create mode 100644 vendor/golang.org/x/sync/AUTHORS create mode 100644 vendor/golang.org/x/sync/CONTRIBUTORS create mode 100644 vendor/golang.org/x/sync/LICENSE create mode 100644 vendor/golang.org/x/sync/PATENTS create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6effb3710..c30dd9c20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ - Allow adjacent slashes to be used as keys when uploading to remote. ([#459](https://github.com/peak/s5cmd/pull/459)) - Debian packages are provided on [releases page](https://github.com/peak/s5cmd/releases) ([#380](https://github.com/peak/s5cmd/issues/380)) - Upgraded minimum required Go version to 1.17. +- The sync command uses `external sort` instead of `internal` sort. This change + reduces RAM usage from ~10 GB to ~1.5 GB for `sync` operation of a directory containing + 1,000,000 files at a cost of speed (20% slower for 1,000,000 objects). For smaller + directories (~50,000 files) there is no significant change in speed. ([#483](https://github.com/peak/s5cmd/pull/483)) + - Improve auto-completion support of s5cmd for `zsh` and `bash`, start supporting `pwsh` and stop the support for `fish`. Now s5cmd can complete bucket names, s3 keys in a bucket and the local files. However, `install-completion` flag no longer _installs_ the completion script to `*rc` files instead it merely gives instructions to install autocompletion and provides the autocompletion script ([#500](https://github.com/peak/s5cmd/pull/500)). #### Bugfixes diff --git a/command/sync.go b/command/sync.go index c35f27c85..23413fd5f 100644 --- a/command/sync.go +++ b/command/sync.go @@ -6,11 +6,11 @@ import ( "io" "os" "path/filepath" - "sort" "strings" "sync" "github.com/hashicorp/go-multierror" + "github.com/lanrat/extsort" "github.com/urfave/cli/v2" errorpkg "github.com/peak/s5cmd/error" @@ -20,6 +20,11 @@ import ( "github.com/peak/s5cmd/storage/url" ) +const ( + extsortChannelBufferSize = 1_000 + extsortChunkSize = 100_000 +) + var syncHelpTemplate = `Name: {{.HelpName}} - {{.Usage}} @@ -216,71 +221,66 @@ func (s Sync) Run(c *cli.Context) error { return multierror.Append(err, merrorWaiter).ErrorOrNil() } -// compareObjects compares source and destination objects. +// compareObjects compares source and destination objects. It assumes that +// sourceObjects and destObjects channels are already sorted in ascending order. // Returns objects those in only source, only destination // and both. -// The algorithm is taken from; -// https://github.com/rclone/rclone/blob/HEAD/fs/march/march.go#L304 -func compareObjects(sourceObjects, destObjects []*storage.Object) ([]*url.URL, []*url.URL, []*ObjectPair) { - // sort the source and destination objects. - sort.SliceStable(sourceObjects, func(i, j int) bool { - return sourceObjects[i].URL.Relative() < sourceObjects[j].URL.Relative() - }) - sort.SliceStable(destObjects, func(i, j int) bool { - return destObjects[i].URL.Relative() < destObjects[j].URL.Relative() - }) - +func compareObjects(sourceObjects, destObjects chan *storage.Object) (chan *url.URL, chan *url.URL, chan *ObjectPair) { var ( - srcOnly []*url.URL - dstOnly []*url.URL - commonObj []*ObjectPair + srcOnly = make(chan *url.URL, extsortChannelBufferSize) + dstOnly = make(chan *url.URL, extsortChannelBufferSize) + commonObj = make(chan *ObjectPair, extsortChannelBufferSize) + srcName string + dstName string ) - for iSrc, iDst := 0, 0; ; iSrc, iDst = iSrc+1, iDst+1 { - var srcObject, dstObject *storage.Object - var srcName, dstName string - - if iSrc < len(sourceObjects) { - srcObject = sourceObjects[iSrc] - srcName = filepath.ToSlash(srcObject.URL.Relative()) - } + go func() { + src, srcOk := <-sourceObjects + dst, dstOk := <-destObjects - if iDst < len(destObjects) { - dstObject = destObjects[iDst] - dstName = filepath.ToSlash(dstObject.URL.Relative()) - } + defer close(srcOnly) + defer close(dstOnly) + defer close(commonObj) - if srcObject == nil && dstObject == nil { - break - } + for { + if srcOk { + srcName = filepath.ToSlash(src.URL.Relative()) + } + if dstOk { + dstName = filepath.ToSlash(dst.URL.Relative()) + } - if srcObject != nil && dstObject != nil { - if srcName > dstName { - srcObject = nil - iSrc-- - } else if srcName == dstName { // if there is a match. - commonObj = append(commonObj, &ObjectPair{src: srcObject, dst: dstObject}) - } else { - dstObject = nil - iDst-- + if srcOk && dstOk { + if srcName < dstName { + srcOnly <- src.URL + src, srcOk = <-sourceObjects + } else if srcName == dstName { // if there is a match. + commonObj <- &ObjectPair{src: src, dst: dst} + src, srcOk = <-sourceObjects + dst, dstOk = <-destObjects + } else { + dstOnly <- dst.URL + dst, dstOk = <-destObjects + } + } else if srcOk { + srcOnly <- src.URL + src, srcOk = <-sourceObjects + } else if dstOk { + dstOnly <- dst.URL + dst, dstOk = <-destObjects + } else /* if !srcOK && !dstOk */ { + break } } + }() - switch { - case srcObject == nil && dstObject == nil: - // do nothing - case srcObject == nil: - dstOnly = append(dstOnly, dstObject.URL) - case dstObject == nil: - srcOnly = append(srcOnly, srcObject.URL) - } - } return srcOnly, dstOnly, commonObj } -// getSourceAndDestinationObjects returns source and destination -// objects from given urls. -func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl *url.URL) ([]*storage.Object, []*storage.Object, error) { +// getSourceAndDestinationObjects returns source and destination objects from +// given URLs. The returned channels gives objects sorted in ascending order +// with respect to their url.Relative path. See also storage.Less. +func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl *url.URL) (chan *storage.Object, chan *storage.Object, error) { sourceClient, err := storage.NewClient(ctx, srcurl, s.storageOpts) if err != nil { return nil, nil, err @@ -305,46 +305,105 @@ func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl } var ( - sourceObjects []*storage.Object - destObjects []*storage.Object - wg sync.WaitGroup + sourceObjects = make(chan *storage.Object, extsortChannelBufferSize) + destObjects = make(chan *storage.Object, extsortChannelBufferSize) ) + extsortDefaultConfig := extsort.DefaultConfig() + extsortConfig := &extsort.Config{ + ChunkSize: extsortChunkSize, + NumWorkers: extsortDefaultConfig.NumWorkers, + ChanBuffSize: extsortChannelBufferSize, + SortedChanBuffSize: extsortChannelBufferSize, + } + extsortDefaultConfig = nil + // get source objects. - wg.Add(1) go func() { - defer wg.Done() - srcObjectChannel := sourceClient.List(ctx, srcurl, s.followSymlinks) - for srcObject := range srcObjectChannel { - if s.shouldSkipObject(srcObject, true) { - continue + defer close(sourceObjects) + unfilteredSrcObjectChannel := sourceClient.List(ctx, srcurl, s.followSymlinks) + filteredSrcObjectChannel := make(chan extsort.SortType, extsortChannelBufferSize) + + go func() { + defer close(filteredSrcObjectChannel) + // filter and redirect objects + for st := range unfilteredSrcObjectChannel { + if s.shouldSkipObject(st, true) { + continue + } + filteredSrcObjectChannel <- *st } - sourceObjects = append(sourceObjects, srcObject) + }() + + var ( + sorter *extsort.SortTypeSorter + srcOutputChan chan extsort.SortType + ) + + sorter, srcOutputChan, srcErrCh := extsort.New(filteredSrcObjectChannel, storage.FromBytes, storage.Less, extsortConfig) + sorter.Sort(ctx) + + for srcObject := range srcOutputChan { + o := srcObject.(storage.Object) + sourceObjects <- &o } + + // read and print the external sort errors + go func() { + for err := range srcErrCh { + printError(s.fullCommand, s.op, err) + } + }() }() // get destination objects. - wg.Add(1) go func() { - defer wg.Done() - destObjectsChannel := destClient.List(ctx, destObjectsURL, false) - for destObject := range destObjectsChannel { - if s.shouldSkipObject(destObject, false) { - continue + defer close(destObjects) + unfilteredDestObjectsChannel := destClient.List(ctx, destObjectsURL, false) + filteredDstObjectChannel := make(chan extsort.SortType, extsortChannelBufferSize) + + go func() { + defer close(filteredDstObjectChannel) + + // filter and redirect objects + for dt := range unfilteredDestObjectsChannel { + if s.shouldSkipObject(dt, false) { + continue + } + filteredDstObjectChannel <- *dt } - destObjects = append(destObjects, destObject) + }() + + var ( + dstSorter *extsort.SortTypeSorter + dstOutputChan chan extsort.SortType + ) + + dstSorter, dstOutputChan, dstErrCh := extsort.New(filteredDstObjectChannel, storage.FromBytes, storage.Less, extsortConfig) + dstSorter.Sort(ctx) + + for destObject := range dstOutputChan { + o := destObject.(storage.Object) + destObjects <- &o } + + // read and print the external sort errors + go func() { + for err := range dstErrCh { + printError(s.fullCommand, s.op, err) + } + }() + }() - wg.Wait() return sourceObjects, destObjects, nil } // planRun prepares the commands and writes them to writer 'w'. func (s Sync) planRun( c *cli.Context, - onlySource, onlyDest []*url.URL, - common []*ObjectPair, + onlySource, onlyDest chan *url.URL, + common chan *ObjectPair, dsturl *url.URL, strategy SyncStrategy, w io.WriteCloser, @@ -359,44 +418,71 @@ func (s Sync) planRun( "raw": true, } + // it should wait until both of the child goroutines for onlySource and common channels + // are completed before closing the WriteCloser w to ensure that all URLs are processed. + var wg sync.WaitGroup + // only in source - for _, srcurl := range onlySource { - curDestURL := generateDestinationURL(srcurl, dsturl, isBatch) - command, err := generateCommand(c, "cp", defaultFlags, srcurl, curDestURL) - if err != nil { - printDebug(s.op, err, srcurl, curDestURL) - continue + wg.Add(1) + go func() { + defer wg.Done() + for srcurl := range onlySource { + curDestURL := generateDestinationURL(srcurl, dsturl, isBatch) + command, err := generateCommand(c, "cp", defaultFlags, srcurl, curDestURL) + if err != nil { + printDebug(s.op, err, srcurl, curDestURL) + continue + } + fmt.Fprintln(w, command) } - fmt.Fprintln(w, command) - } + }() // both in source and destination - for _, commonObject := range common { - sourceObject, destObject := commonObject.src, commonObject.dst - curSourceURL, curDestURL := sourceObject.URL, destObject.URL - err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied. - if err != nil { - printDebug(s.op, err, curSourceURL, curDestURL) - continue - } + wg.Add(1) + go func() { + defer wg.Done() + for commonObject := range common { + sourceObject, destObject := commonObject.src, commonObject.dst + curSourceURL, curDestURL := sourceObject.URL, destObject.URL + err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied. + if err != nil { + printDebug(s.op, err, curSourceURL, curDestURL) + continue + } - command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL) - if err != nil { - printDebug(s.op, err, curSourceURL, curDestURL) - continue + command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL) + if err != nil { + printDebug(s.op, err, curSourceURL, curDestURL) + continue + } + fmt.Fprintln(w, command) } - fmt.Fprintln(w, command) - } + }() // only in destination - if s.delete && len(onlyDest) > 0 { - command, err := generateCommand(c, "rm", defaultFlags, onlyDest...) + if s.delete { + // unfortunately we need to read them all! + // or rewrite generateCommand function? + dstURLs := make([]*url.URL, 0, extsortChunkSize) + + for d := range onlyDest { + dstURLs = append(dstURLs, d) + } + command, err := generateCommand(c, "rm", defaultFlags, dstURLs...) if err != nil { - printDebug(s.op, err, onlyDest...) + printDebug(s.op, err, dstURLs...) return } fmt.Fprintln(w, command) + } else { + // we only need to consume them from the channel so that rest of the objects + // can be sent to channel. + for d := range onlyDest { + _ = d + } } + + wg.Wait() } // generateDestinationURL generates destination url for given diff --git a/e2e/sync_test.go b/e2e/sync_test.go index 2df6f2547..e4dc00086 100644 --- a/e2e/sync_test.go +++ b/e2e/sync_test.go @@ -15,9 +15,7 @@ import ( // sync -n s3://bucket/object file func TestSyncFailForNonsharedFlagsFromCopyCommand(t *testing.T) { t.Parallel() - s3client, s5cmd := setup(t) - const ( filename = "source.go" ) @@ -222,6 +220,9 @@ func TestSyncLocalFolderToS3EmptyBucket(t *testing.T) { 3: equals(`cp %vtestfile.txt %vtestfile.txt`, src, dst), }, sortInput(true)) + // there should be no error, since "no object found" error for destination is ignored + assertLines(t, result.Stderr(), map[int]compareFunc{}) + // assert local filesystem expected := fs.Expected(t, folderLayout...) assert.Assert(t, fs.Equal(workdir.Path(), expected)) diff --git a/go.mod b/go.mod index 369e0fbfb..3980c6b9e 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/igungor/gofakes3 v0.0.13 github.com/karrick/godirwalk v1.15.3 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 + github.com/lanrat/extsort v1.0.0 github.com/stretchr/testify v1.4.0 github.com/termie/go-shutil v0.0.0-20140729215957-bcacb06fecae github.com/urfave/cli/v2 v2.11.2 @@ -21,7 +22,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/kr/pretty v0.2.0 // indirect + github.com/kr/pretty v0.3.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect @@ -30,6 +31,7 @@ require ( github.com/stretchr/objx v0.1.0 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.etcd.io/bbolt v1.3.6 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220405210540-1e041c57c461 // indirect golang.org/x/tools v0.0.0-20190624222133-a101b041ded4 // indirect golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect diff --git a/go.sum b/go.sum index 2393b5928..010c19273 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,7 @@ github.com/aws/aws-sdk-go v1.40.25 h1:Depnx7O86HWgOCLD5nMto6F9Ju85Q1QuFDnbpZYQWn github.com/aws/aws-sdk-go v1.40.25/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -27,16 +28,22 @@ github.com/karrick/godirwalk v1.15.3 h1:0a2pXOgtB16CqIqXTiT7+K9L73f74n/aNQUnH6Or github.com/karrick/godirwalk v1.15.3/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= -github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lanrat/extsort v1.0.0 h1:JjvkCUbD55+gs5s64FHmCU93kWjegEAM5n10XN6GB3c= +github.com/lanrat/extsort v1.0.0/go.mod h1:bkDEvem4UnD1h87yKICydXs63mKrIGW3W9OGPMg93Ww= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8= @@ -65,6 +72,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -85,6 +94,7 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY+ILlfndk4bRioVHaU= gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/storage/storage.go b/storage/storage.go index 52f7f5647..ea16bd951 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -2,12 +2,15 @@ package storage import ( + "bytes" "context" + "encoding/gob" "encoding/json" "fmt" "os" "time" + "github.com/lanrat/extsort" "github.com/peak/s5cmd/log" "github.com/peak/s5cmd/storage/url" "github.com/peak/s5cmd/strutil" @@ -280,3 +283,38 @@ func (m Metadata) SetContentEncoding(contentEncoding string) Metadata { m["ContentEncoding"] = contentEncoding return m } + +func (o Object) ToBytes() []byte { + buf := bytes.NewBuffer(make([]byte, 0, 200)) + enc := gob.NewEncoder(buf) + enc.Encode(o.URL.ToBytes()) + enc.Encode(o.ModTime.Format(time.RFC3339Nano)) + enc.Encode(o.Type.mode) + enc.Encode(o.Size) + + return buf.Bytes() +} + +func FromBytes(data []byte) extsort.SortType { + dec := gob.NewDecoder(bytes.NewBuffer(data)) + var gobUrl []byte + dec.Decode(&gobUrl) + u := url.FromBytes(gobUrl).(*url.URL) + o := Object{ + URL: u, + } + str := "" + dec.Decode(&str) + tmp, _ := time.Parse(time.RFC3339Nano, str) + o.ModTime = &tmp + dec.Decode(&o.Type.mode) + dec.Decode(&o.Size) + return o +} + +// Less returns if relative path of storage.Object a's URL comes before the one +// of b's in the lexicographic order. +// It assumes that both a, and b are the instances of Object +func Less(a, b extsort.SortType) bool { + return a.(Object).URL.Relative() < b.(Object).URL.Relative() +} diff --git a/storage/url/url.go b/storage/url/url.go index 4cfc53f2f..950245fb7 100644 --- a/storage/url/url.go +++ b/storage/url/url.go @@ -2,6 +2,8 @@ package url import ( + "bytes" + "encoding/gob" "encoding/json" "fmt" "net/url" @@ -11,6 +13,7 @@ import ( "runtime" "strings" + "github.com/lanrat/extsort" "github.com/peak/s5cmd/strutil" ) @@ -356,6 +359,31 @@ func (u *URL) MarshalJSON() ([]byte, error) { return json.Marshal(u.String()) } +func (u URL) ToBytes() []byte { + buf := bytes.NewBuffer(make([]byte, 0)) + enc := gob.NewEncoder(buf) + enc.Encode(u.Absolute()) + enc.Encode(u.relativePath) + enc.Encode(u.raw) + return buf.Bytes() +} + +func FromBytes(data []byte) extsort.SortType { + buf := bytes.NewBuffer(data) + dec := gob.NewDecoder(buf) + var ( + abs, rel string + raw bool + ) + dec.Decode(&abs) + dec.Decode(&rel) + dec.Decode(&raw) + + url, _ := New(abs, WithRaw(raw)) + url.relativePath = rel + return url +} + // IsWildcard reports whether if a string contains any wildcard chars. func (u *URL) IsWildcard() bool { return !u.raw && hasGlobCharacter(u.Path) @@ -424,3 +452,19 @@ func (u *URL) EscapedPath() string { } return strings.Join(sourceKeyElements, "/") } + +// check if all fields of URL equal +func (u *URL) deepEqual(url *URL) bool { + if url.Absolute() != u.Absolute() || + url.Type != u.Type || + url.Scheme != u.Scheme || + url.Bucket != u.Bucket || + url.Delimiter != u.Delimiter || + url.Path != u.Path || + url.Prefix != u.Prefix || + url.relativePath != u.relativePath || + url.filter != u.filter { + return false + } + return true +} diff --git a/storage/url/url_test.go b/storage/url/url_test.go index fa19f262c..87237cbbc 100644 --- a/storage/url/url_test.go +++ b/storage/url/url_test.go @@ -600,3 +600,47 @@ func TestURLSetRelative(t *testing.T) { }) } } + +func TestToFromBytes(t *testing.T) { + testcases := []struct { + name string + key string + relative string + }{ + { + name: "plain remote", + key: "s3://bucket/file", + relative: "file", + }, + { + name: "space char remote", + key: "s3://bucket/s ace/file", + relative: "s ace/file", + }, + { + name: "space char remote", + key: "s3://bucket/li\ne/file", + relative: "li\ne/file", + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + url, err := New(tc.key) + if err != nil { + t.Errorf("URL cannot be instantiated: \nPath: %v, Error: %v", tc.key, err) + } + + url.relativePath = tc.relative + + newURL := FromBytes(url.ToBytes()).(*URL) + + if !reflect.DeepEqual(url, newURL) { + t.Errorf("got = %q, want %q", url, newURL) + } + if !url.deepEqual(newURL) { + t.Errorf("Not equal") + } + }) + } +} diff --git a/vendor/github.com/lanrat/extsort/.gitignore b/vendor/github.com/lanrat/extsort/.gitignore new file mode 100644 index 000000000..1b3ac108d --- /dev/null +++ b/vendor/github.com/lanrat/extsort/.gitignore @@ -0,0 +1 @@ +coverage.out \ No newline at end of file diff --git a/vendor/github.com/lanrat/extsort/LICENSE b/vendor/github.com/lanrat/extsort/LICENSE new file mode 100644 index 000000000..f288702d2 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/LICENSE @@ -0,0 +1,674 @@ + GNU GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The GNU General Public License is a free, copyleft license for +software and other kinds of works. + + The licenses for most software and other practical works are designed +to take away your freedom to share and change the works. By contrast, +the GNU General Public License is intended to guarantee your freedom to +share and change all versions of a program--to make sure it remains free +software for all its users. We, the Free Software Foundation, use the +GNU General Public License for most of our software; it applies also to +any other work released this way by its authors. You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +them if you wish), that you receive source code or can get it if you +want it, that you can change the software or use pieces of it in new +free programs, and that you know you can do these things. + + To protect your rights, we need to prevent others from denying you +these rights or asking you to surrender the rights. Therefore, you have +certain responsibilities if you distribute copies of the software, or if +you modify it: responsibilities to respect the freedom of others. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must pass on to the recipients the same +freedoms that you received. You must make sure that they, too, receive +or can get the source code. And you must show them these terms so they +know their rights. + + Developers that use the GNU GPL protect your rights with two steps: +(1) assert copyright on the software, and (2) offer you this License +giving you legal permission to copy, distribute and/or modify it. + + For the developers' and authors' protection, the GPL clearly explains +that there is no warranty for this free software. For both users' and +authors' sake, the GPL requires that modified versions be marked as +changed, so that their problems will not be attributed erroneously to +authors of previous versions. + + Some devices are designed to deny users access to install or run +modified versions of the software inside them, although the manufacturer +can do so. This is fundamentally incompatible with the aim of +protecting users' freedom to change the software. The systematic +pattern of such abuse occurs in the area of products for individuals to +use, which is precisely where it is most unacceptable. Therefore, we +have designed this version of the GPL to prohibit the practice for those +products. If such problems arise substantially in other domains, we +stand ready to extend this provision to those domains in future versions +of the GPL, as needed to protect the freedom of users. + + Finally, every program is threatened constantly by software patents. +States should not allow patents to restrict development and use of +software on general-purpose computers, but in those that do, we wish to +avoid the special danger that patents applied to a free program could +make it effectively proprietary. To prevent this, the GPL assures that +patents cannot be used to render the program non-free. + + The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + + 0. Definitions. + + "This License" refers to version 3 of the GNU General Public License. + + "Copyright" also means copyright-like laws that apply to other kinds of +works, such as semiconductor masks. + + "The Program" refers to any copyrightable work licensed under this +License. Each licensee is addressed as "you". "Licensees" and +"recipients" may be individuals or organizations. + + To "modify" a work means to copy from or adapt all or part of the work +in a fashion requiring copyright permission, other than the making of an +exact copy. The resulting work is called a "modified version" of the +earlier work or a work "based on" the earlier work. + + A "covered work" means either the unmodified Program or a work based +on the Program. + + To "propagate" a work means to do anything with it that, without +permission, would make you directly or secondarily liable for +infringement under applicable copyright law, except executing it on a +computer or modifying a private copy. Propagation includes copying, +distribution (with or without modification), making available to the +public, and in some countries other activities as well. + + To "convey" a work means any kind of propagation that enables other +parties to make or receive copies. Mere interaction with a user through +a computer network, with no transfer of a copy, is not conveying. + + An interactive user interface displays "Appropriate Legal Notices" +to the extent that it includes a convenient and prominently visible +feature that (1) displays an appropriate copyright notice, and (2) +tells the user that there is no warranty for the work (except to the +extent that warranties are provided), that licensees may convey the +work under this License, and how to view a copy of this License. If +the interface presents a list of user commands or options, such as a +menu, a prominent item in the list meets this criterion. + + 1. Source Code. + + The "source code" for a work means the preferred form of the work +for making modifications to it. "Object code" means any non-source +form of a work. + + A "Standard Interface" means an interface that either is an official +standard defined by a recognized standards body, or, in the case of +interfaces specified for a particular programming language, one that +is widely used among developers working in that language. + + The "System Libraries" of an executable work include anything, other +than the work as a whole, that (a) is included in the normal form of +packaging a Major Component, but which is not part of that Major +Component, and (b) serves only to enable use of the work with that +Major Component, or to implement a Standard Interface for which an +implementation is available to the public in source code form. A +"Major Component", in this context, means a major essential component +(kernel, window system, and so on) of the specific operating system +(if any) on which the executable work runs, or a compiler used to +produce the work, or an object code interpreter used to run it. + + The "Corresponding Source" for a work in object code form means all +the source code needed to generate, install, and (for an executable +work) run the object code and to modify the work, including scripts to +control those activities. However, it does not include the work's +System Libraries, or general-purpose tools or generally available free +programs which are used unmodified in performing those activities but +which are not part of the work. For example, Corresponding Source +includes interface definition files associated with source files for +the work, and the source code for shared libraries and dynamically +linked subprograms that the work is specifically designed to require, +such as by intimate data communication or control flow between those +subprograms and other parts of the work. + + The Corresponding Source need not include anything that users +can regenerate automatically from other parts of the Corresponding +Source. + + The Corresponding Source for a work in source code form is that +same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of +copyright on the Program, and are irrevocable provided the stated +conditions are met. This License explicitly affirms your unlimited +permission to run the unmodified Program. The output from running a +covered work is covered by this License only if the output, given its +content, constitutes a covered work. This License acknowledges your +rights of fair use or other equivalent, as provided by copyright law. + + You may make, run and propagate covered works that you do not +convey, without conditions so long as your license otherwise remains +in force. You may convey covered works to others for the sole purpose +of having them make modifications exclusively for you, or provide you +with facilities for running those works, provided that you comply with +the terms of this License in conveying all material for which you do +not control copyright. Those thus making or running the covered works +for you must do so exclusively on your behalf, under your direction +and control, on terms that prohibit them from making any copies of +your copyrighted material outside their relationship with you. + + Conveying under any other circumstances is permitted solely under +the conditions stated below. Sublicensing is not allowed; section 10 +makes it unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. + + No covered work shall be deemed part of an effective technological +measure under any applicable law fulfilling obligations under article +11 of the WIPO copyright treaty adopted on 20 December 1996, or +similar laws prohibiting or restricting circumvention of such +measures. + + When you convey a covered work, you waive any legal power to forbid +circumvention of technological measures to the extent such circumvention +is effected by exercising rights under this License with respect to +the covered work, and you disclaim any intention to limit operation or +modification of the work as a means of enforcing, against the work's +users, your or third parties' legal rights to forbid circumvention of +technological measures. + + 4. Conveying Verbatim Copies. + + You may convey verbatim copies of the Program's source code as you +receive it, in any medium, provided that you conspicuously and +appropriately publish on each copy an appropriate copyright notice; +keep intact all notices stating that this License and any +non-permissive terms added in accord with section 7 apply to the code; +keep intact all notices of the absence of any warranty; and give all +recipients a copy of this License along with the Program. + + You may charge any price or no price for each copy that you convey, +and you may offer support or warranty protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to +produce it from the Program, in the form of source code under the +terms of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified + it, and giving a relevant date. + + b) The work must carry prominent notices stating that it is + released under this License and any conditions added under section + 7. This requirement modifies the requirement in section 4 to + "keep intact all notices". + + c) You must license the entire work, as a whole, under this + License to anyone who comes into possession of a copy. This + License will therefore apply, along with any applicable section 7 + additional terms, to the whole of the work, and all its parts, + regardless of how they are packaged. This License gives no + permission to license the work in any other way, but it does not + invalidate such permission if you have separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your + work need not make them do so. + + A compilation of a covered work with other separate and independent +works, which are not by their nature extensions of the covered work, +and which are not combined with it such as to form a larger program, +in or on a volume of a storage or distribution medium, is called an +"aggregate" if the compilation and its resulting copyright are not +used to limit the access or legal rights of the compilation's users +beyond what the individual works permit. Inclusion of a covered work +in an aggregate does not cause this License to apply to the other +parts of the aggregate. + + 6. Conveying Non-Source Forms. + + You may convey a covered work in object code form under the terms +of sections 4 and 5, provided that you also convey the +machine-readable Corresponding Source under the terms of this License, +in one of these ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium + customarily used for software interchange. + + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a + written offer, valid for at least three years and valid for as + long as you offer spare parts or customer support for that product + model, to give anyone who possesses the object code either (1) a + copy of the Corresponding Source for all the software in the + product that is covered by this License, on a durable physical + medium customarily used for software interchange, for a price no + more than your reasonable cost of physically performing this + conveying of source, or (2) access to copy the + Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. This + alternative is allowed only occasionally and noncommercially, and + only if you received the object code with such an offer, in accord + with subsection 6b. + + d) Convey the object code by offering access from a designated + place (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to + copy the object code is a network server, the Corresponding Source + may be on a different server (operated by you or a third party) + that supports equivalent copying facilities, provided you maintain + clear directions next to the object code saying where to find the + Corresponding Source. Regardless of what server hosts the + Corresponding Source, you remain obligated to ensure that it is + available for as long as needed to satisfy these requirements. + + e) Convey the object code using peer-to-peer transmission, provided + you inform other peers where the object code and Corresponding + Source of the work are being offered to the general public at no + charge under subsection 6d. + + A separable portion of the object code, whose source code is excluded +from the Corresponding Source as a System Library, need not be +included in conveying the object code work. + + A "User Product" is either (1) a "consumer product", which means any +tangible personal property which is normally used for personal, family, +or household purposes, or (2) anything designed or sold for incorporation +into a dwelling. In determining whether a product is a consumer product, +doubtful cases shall be resolved in favor of coverage. For a particular +product received by a particular user, "normally used" refers to a +typical or common use of that class of product, regardless of the status +of the particular user or of the way in which the particular user +actually uses, or expects or is expected to use, the product. A product +is a consumer product regardless of whether the product has substantial +commercial, industrial or non-consumer uses, unless such uses represent +the only significant mode of use of the product. + + "Installation Information" for a User Product means any methods, +procedures, authorization keys, or other information required to install +and execute modified versions of a covered work in that User Product from +a modified version of its Corresponding Source. The information must +suffice to ensure that the continued functioning of the modified object +code is in no case prevented or interfered with solely because +modification has been made. + + If you convey an object code work under this section in, or with, or +specifically for use in, a User Product, and the conveying occurs as +part of a transaction in which the right of possession and use of the +User Product is transferred to the recipient in perpetuity or for a +fixed term (regardless of how the transaction is characterized), the +Corresponding Source conveyed under this section must be accompanied +by the Installation Information. But this requirement does not apply +if neither you nor any third party retains the ability to install +modified object code on the User Product (for example, the work has +been installed in ROM). + + The requirement to provide Installation Information does not include a +requirement to continue to provide support service, warranty, or updates +for a work that has been modified or installed by the recipient, or for +the User Product in which it has been modified or installed. Access to a +network may be denied when the modification itself materially and +adversely affects the operation of the network or violates the rules and +protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, +in accord with this section must be in a format that is publicly +documented (and with an implementation available to the public in +source code form), and must require no special password or key for +unpacking, reading or copying. + + 7. Additional Terms. + + "Additional permissions" are terms that supplement the terms of this +License by making exceptions from one or more of its conditions. +Additional permissions that are applicable to the entire Program shall +be treated as though they were included in this License, to the extent +that they are valid under applicable law. If additional permissions +apply only to part of the Program, that part may be used separately +under those permissions, but the entire Program remains governed by +this License without regard to the additional permissions. + + When you convey a copy of a covered work, you may at your option +remove any additional permissions from that copy, or from any part of +it. (Additional permissions may be written to require their own +removal in certain cases when you modify the work.) You may place +additional permissions on material, added by you to a covered work, +for which you have or can give appropriate copyright permission. + + Notwithstanding any other provision of this License, for material you +add to a covered work, you may (if authorized by the copyright holders of +that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some + trade names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that + material by anyone who conveys the material (or modified versions of + it) with contractual assumptions of liability to the recipient, for + any liability that these contractual assumptions directly impose on + those licensors and authors. + + All other non-permissive additional terms are considered "further +restrictions" within the meaning of section 10. If the Program as you +received it, or any part of it, contains a notice stating that it is +governed by this License along with a term that is a further +restriction, you may remove that term. If a license document contains +a further restriction but permits relicensing or conveying under this +License, you may add to a covered work material governed by the terms +of that license document, provided that the further restriction does +not survive such relicensing or conveying. + + If you add terms to a covered work in accord with this section, you +must place, in the relevant source files, a statement of the +additional terms that apply to those files, or a notice indicating +where to find the applicable terms. + + Additional terms, permissive or non-permissive, may be stated in the +form of a separately written license, or stated as exceptions; +the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly +provided under this License. Any attempt otherwise to propagate or +modify it is void, and will automatically terminate your rights under +this License (including any patent licenses granted under the third +paragraph of section 11). + + However, if you cease all violation of this License, then your +license from a particular copyright holder is reinstated (a) +provisionally, unless and until the copyright holder explicitly and +finally terminates your license, and (b) permanently, if the copyright +holder fails to notify you of the violation by some reasonable means +prior to 60 days after the cessation. + + Moreover, your license from a particular copyright holder is +reinstated permanently if the copyright holder notifies you of the +violation by some reasonable means, this is the first time you have +received notice of violation of this License (for any work) from that +copyright holder, and you cure the violation prior to 30 days after +your receipt of the notice. + + Termination of your rights under this section does not terminate the +licenses of parties who have received copies or rights from you under +this License. If your rights have been terminated and not permanently +reinstated, you do not qualify to receive new licenses for the same +material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or +run a copy of the Program. Ancillary propagation of a covered work +occurring solely as a consequence of using peer-to-peer transmission +to receive a copy likewise does not require acceptance. However, +nothing other than this License grants you permission to propagate or +modify any covered work. These actions infringe copyright if you do +not accept this License. Therefore, by modifying or propagating a +covered work, you indicate your acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically +receives a license from the original licensors, to run, modify and +propagate that work, subject to this License. You are not responsible +for enforcing compliance by third parties with this License. + + An "entity transaction" is a transaction transferring control of an +organization, or substantially all assets of one, or subdividing an +organization, or merging organizations. If propagation of a covered +work results from an entity transaction, each party to that +transaction who receives a copy of the work also receives whatever +licenses to the work the party's predecessor in interest had or could +give under the previous paragraph, plus a right to possession of the +Corresponding Source of the work from the predecessor in interest, if +the predecessor has it or can get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the +rights granted or affirmed under this License. For example, you may +not impose a license fee, royalty, or other charge for exercise of +rights granted under this License, and you may not initiate litigation +(including a cross-claim or counterclaim in a lawsuit) alleging that +any patent claim is infringed by making, using, selling, offering for +sale, or importing the Program or any portion of it. + + 11. Patents. + + A "contributor" is a copyright holder who authorizes use under this +License of the Program or a work on which the Program is based. The +work thus licensed is called the contributor's "contributor version". + + A contributor's "essential patent claims" are all patent claims +owned or controlled by the contributor, whether already acquired or +hereafter acquired, that would be infringed by some manner, permitted +by this License, of making, using, or selling its contributor version, +but do not include claims that would be infringed only as a +consequence of further modification of the contributor version. For +purposes of this definition, "control" includes the right to grant +patent sublicenses in a manner consistent with the requirements of +this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free +patent license under the contributor's essential patent claims, to +make, use, sell, offer for sale, import and otherwise run, modify and +propagate the contents of its contributor version. + + In the following three paragraphs, a "patent license" is any express +agreement or commitment, however denominated, not to enforce a patent +(such as an express permission to practice a patent or covenant not to +sue for patent infringement). To "grant" such a patent license to a +party means to make such an agreement or commitment not to enforce a +patent against the party. + + If you convey a covered work, knowingly relying on a patent license, +and the Corresponding Source of the work is not available for anyone +to copy, free of charge and under the terms of this License, through a +publicly available network server or other readily accessible means, +then you must either (1) cause the Corresponding Source to be so +available, or (2) arrange to deprive yourself of the benefit of the +patent license for this particular work, or (3) arrange, in a manner +consistent with the requirements of this License, to extend the patent +license to downstream recipients. "Knowingly relying" means you have +actual knowledge that, but for the patent license, your conveying the +covered work in a country, or your recipient's use of the covered work +in a country, would infringe one or more identifiable patents in that +country that you have reason to believe are valid. + + If, pursuant to or in connection with a single transaction or +arrangement, you convey, or propagate by procuring conveyance of, a +covered work, and grant a patent license to some of the parties +receiving the covered work authorizing them to use, propagate, modify +or convey a specific copy of the covered work, then the patent license +you grant is automatically extended to all recipients of the covered +work and works based on it. + + A patent license is "discriminatory" if it does not include within +the scope of its coverage, prohibits the exercise of, or is +conditioned on the non-exercise of one or more of the rights that are +specifically granted under this License. You may not convey a covered +work if you are a party to an arrangement with a third party that is +in the business of distributing software, under which you make payment +to the third party based on the extent of your activity of conveying +the work, and under which the third party grants, to any of the +parties who would receive the covered work from you, a discriminatory +patent license (a) in connection with copies of the covered work +conveyed by you (or copies made from those copies), or (b) primarily +for and in connection with specific products or compilations that +contain the covered work, unless you entered into that arrangement, +or that patent license was granted, prior to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting +any implied license or other defenses to infringement that may +otherwise be available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot convey a +covered work so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you may +not convey it at all. For example, if you agree to terms that obligate you +to collect a royalty for further conveying from those to whom you convey +the Program, the only way you could satisfy both those terms and this +License would be to refrain entirely from conveying the Program. + + 13. Use with the GNU Affero General Public License. + + Notwithstanding any other provision of this License, you have +permission to link or combine any covered work with a work licensed +under version 3 of the GNU Affero General Public License into a single +combined work, and to convey the resulting work. The terms of this +License will continue to apply to the part which is the covered work, +but the special requirements of the GNU Affero General Public License, +section 13, concerning interaction through a network will apply to the +combination as such. + + 14. Revised Versions of this License. + + The Free Software Foundation may publish revised and/or new versions of +the GNU General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + + Each version is given a distinguishing version number. If the +Program specifies that a certain numbered version of the GNU General +Public License "or any later version" applies to it, you have the +option of following the terms and conditions either of that numbered +version or of any later version published by the Free Software +Foundation. If the Program does not specify a version number of the +GNU General Public License, you may choose any version ever published +by the Free Software Foundation. + + If the Program specifies that a proxy can decide which future +versions of the GNU General Public License can be used, that proxy's +public statement of acceptance of a version permanently authorizes you +to choose that version for the Program. + + Later license versions may give you additional or different +permissions. However, no additional obligations are imposed on any +author or copyright holder as a result of your choosing to follow a +later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF +SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. + + If the disclaimer of warranty and limitation of liability provided +above cannot be given local legal effect according to their terms, +reviewing courts shall apply local law that most closely approximates +an absolute waiver of all civil liability in connection with the +Program, unless a warranty or assumption of liability accompanies a +copy of the Program in return for a fee. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +state the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + + Copyright (C) + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + +Also add information on how to contact you by electronic and paper mail. + + If the program does terminal interaction, make it output a short +notice like this when it starts in an interactive mode: + + Copyright (C) + This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, your program's commands +might be different; for a GUI interface, you would use an "about box". + + You should also get your employer (if you work as a programmer) or school, +if any, to sign a "copyright disclaimer" for the program, if necessary. +For more information on this, and how to apply and follow the GNU GPL, see +. + + The GNU General Public License does not permit incorporating your program +into proprietary programs. If your program is a subroutine library, you +may consider it more useful to permit linking proprietary applications with +the library. If this is what you want to do, use the GNU Lesser General +Public License instead of this License. But first, please read +. diff --git a/vendor/github.com/lanrat/extsort/Makefile b/vendor/github.com/lanrat/extsort/Makefile new file mode 100644 index 000000000..8ed94562d --- /dev/null +++ b/vendor/github.com/lanrat/extsort/Makefile @@ -0,0 +1,24 @@ + +ALL_SOURCES := $(shell find . -type f -name '*.go') + +.PHONY: fmt check test cover coverhtml + +test: + go test -v ./... + @echo "< ALL TESTS PASS >" + +fmt: + gofmt -s -w -l . + +coverage.out: $(ALL_SOURCES) + go test -coverprofile=coverage.out ./... + +cover: coverage.out + go tool cover -func=coverage.out + +coverhtml: coverage.out + go tool cover -html=coverage.out + +check: + golangci-lint run ./... || true + staticcheck -checks all ./... diff --git a/vendor/github.com/lanrat/extsort/README.md b/vendor/github.com/lanrat/extsort/README.md new file mode 100644 index 000000000..e5ebe31c5 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/README.md @@ -0,0 +1,79 @@ +# extsort + +[![PkgGoDev](https://pkg.go.dev/badge/github.com/lanrat/extsort)](https://pkg.go.dev/github.com/lanrat/extsort) + +[![Go Report Card](https://goreportcard.com/badge/github.com/lanrat/extsort)](https://goreportcard.com/report/github.com/lanrat/extsort) + +An [external sorting](https://en.wikipedia.org/wiki/External_sorting) library for golang (i.e. on disk sorting) on an arbitrarily channel, even if the generated content doesn't all fit in memory at once. Once sorted, it returns a new channel returning data in sorted order. + +In order to remain efficient for all implementations, extsort doesn't handle serialization, but leaves that to the user by operating on types that implement the [`SortType.ToBytes`](https://pkg.go.dev/github.com/lanrat/extsort#SortType) and [`FromBytes`](https://pkg.go.dev/github.com/lanrat/extsort#FromBytes) interfaces. + +extsort also has a `Strings()` interface which does not require the overhead of converting everything to/from bytes and is faster for string types. + +extsort is not a [stable sort](https://en.wikipedia.org/wiki/Sorting_algorithm#Stability). + +## Example + +```go +package main + +import ( + "encoding/binary" + "fmt" + "math/rand" + + "github.com/lanrat/extsort" +) + +var count = int(1e7) // 10M + +type sortInt struct { + i int64 +} + +func (s sortInt) ToBytes() []byte { + buf := make([]byte, binary.MaxVarintLen64) + binary.PutVarint(buf, s.i) + return buf +} + +func sortIntFromBytes(b []byte) extsort.SortType { + i, _ := binary.Varint(b) + return sortInt{i: i} +} + +func compareSortIntLess(a, b extsort.SortType) bool { + return a.(sortInt).i < b.(sortInt).i +} + +func main() { + // create an input channel with unsorted data + inputChan := make(chan extsort.SortType) + go func() { + for i := 0; i < count; i++ { + inputChan <- sortInt{i: rand.Int63()} + } + close(inputChan) + }() + + // create the sorter and start sorting + sorter, outputChan, errChan := extsort.New(inputChan, sortIntFromBytes, compareSortIntLess, nil) + sorter.Sort(context.Background()) + + // print output sorted data + for data := range outputChan { + fmt.Printf("%d\n", data.(sortInt).i) + } + if err := <-errChan; err != nil { + fmt.Printf("err: %s", err.Error()) + } +} +``` + +## [extsort/diff](https://pkg.go.dev/github.com/lanrat/extsort/diff) + +The diff sub-package is a self-contained package that assists with diffing of channels of sorted data. It can be used with the extsort methods or on its own + +## TODO + +* parallelize merging after sorting diff --git a/vendor/github.com/lanrat/extsort/config.go b/vendor/github.com/lanrat/extsort/config.go new file mode 100644 index 000000000..6ab368c27 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/config.go @@ -0,0 +1,42 @@ +package extsort + +// Config holds configuration settings for extsort +type Config struct { + ChunkSize int // amount of records to store in each chunk which will be written to disk + NumWorkers int // maximum number of workers to use for parallel sorting + ChanBuffSize int // buffer size for merging chunks + SortedChanBuffSize int // buffer size for passing records to output + TempFilesDir string // empty for use OS default ex: /tmp +} + +// DefaultConfig returns the default configuration options sued if none provided +func DefaultConfig() *Config { + return &Config{ + ChunkSize: int(1e6), // 1M + NumWorkers: 2, + ChanBuffSize: 1, + SortedChanBuffSize: 10, + TempFilesDir: "", + } +} + +// mergeConfig takes a provided config and replaces any values not set with the defaults +func mergeConfig(c *Config) *Config { + d := DefaultConfig() + if c == nil { + return d + } + if c.ChunkSize <= 1 { + c.ChunkSize = d.ChunkSize + } + if c.NumWorkers <= 1 { + c.NumWorkers = d.NumWorkers + } + if c.ChanBuffSize < 0 { + c.ChanBuffSize = d.ChanBuffSize + } + if c.SortedChanBuffSize < 0 { + c.SortedChanBuffSize = d.SortedChanBuffSize + } + return c +} diff --git a/vendor/github.com/lanrat/extsort/queue/priority_queue.go b/vendor/github.com/lanrat/extsort/queue/priority_queue.go new file mode 100644 index 000000000..c5dbba001 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/queue/priority_queue.go @@ -0,0 +1,106 @@ +// Package queue provided a generic priority queue implemtation based on the internal heap +package queue + +// Priority queue based on +// https://golang.org/pkg/container/heap/#example__priorityQueue + +import ( + "container/heap" + "fmt" +) + +// item is a container for holding values with a priority in the queue +type item struct { + value interface{} + // The index is needed by update and is maintained by the heap.Interface methods. + index int // The index of the item in the heap. +} + +// innerPriorityQueue implements heap.Interface and holds Items +type innerPriorityQueue struct { + items []*item + lessFunc func(interface{}, interface{}) bool +} + +// PriorityQueue implemented using a heap +type PriorityQueue struct { + ipq innerPriorityQueue +} + +// NewPriorityQueue creates a new heap based PriorityQueue using cmpFunc as the comparison function +func NewPriorityQueue(cmpFunc func(interface{}, interface{}) bool) *PriorityQueue { + var pq PriorityQueue + pq.ipq.items = make([]*item, 0) + pq.ipq.lessFunc = cmpFunc + heap.Init(&pq.ipq) + return &pq +} + +// Len returns the number of items in the queue +func (pq *PriorityQueue) Len() int { + return pq.ipq.Len() +} + +// Push adds x to the queue +func (pq *PriorityQueue) Push(x interface{}) { + var i item + i.value = x + heap.Push(&pq.ipq, i) + heap.Fix(&pq.ipq, i.index) +} + +// Pop removes and returns the next item in the queue +func (pq *PriorityQueue) Pop() interface{} { + item := heap.Pop(&pq.ipq).(*item) + return item.value +} + +// Peek returns the next item in the queue without removing it +func (pq *PriorityQueue) Peek() interface{} { + return pq.ipq.items[0].value +} + +// PeekUpdate reorders the backing heap with the new values +func (pq *PriorityQueue) PeekUpdate() { + heap.Fix(&pq.ipq, 0) +} + +// Print prints the current ordered queue +func (pq *PriorityQueue) Print() { + fmt.Print("[") + for i := range pq.ipq.items { + fmt.Print(pq.ipq.items[i].value, ", ") + + } + fmt.Println("]") +} + +func (pq *innerPriorityQueue) Len() int { + return len(pq.items) +} + +func (pq *innerPriorityQueue) Less(i, j int) bool { + return pq.lessFunc(pq.items[i].value, pq.items[j].value) +} + +func (pq *innerPriorityQueue) Swap(i, j int) { + pq.items[i], pq.items[j] = pq.items[j], pq.items[i] + pq.items[i].index = i + pq.items[j].index = j +} + +func (pq *innerPriorityQueue) Push(x interface{}) { + n := len(pq.items) + i := x.(item) + i.index = n + pq.items = append(pq.items, &i) +} + +func (pq *innerPriorityQueue) Pop() interface{} { + old := pq.items + n := len(old) + item := old[n-1] + item.index = -1 // for safety + pq.items = old[0 : n-1] + return item +} diff --git a/vendor/github.com/lanrat/extsort/sort_sorttype.go b/vendor/github.com/lanrat/extsort/sort_sorttype.go new file mode 100644 index 000000000..45f56aa40 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/sort_sorttype.go @@ -0,0 +1,309 @@ +// Package extsort implements an unstable external sort for all the records in a chan or iterator +package extsort + +import ( + "bufio" + "context" + "encoding/binary" + "io" + "sort" + + "github.com/lanrat/extsort/queue" + "github.com/lanrat/extsort/tempfile" + + "golang.org/x/sync/errgroup" +) + +type chunk struct { + data []SortType + less CompareLessFunc +} + +func newChunk(size int, lessFunc CompareLessFunc) *chunk { + c := new(chunk) + c.less = lessFunc + c.data = make([]SortType, 0, size) + return c +} + +func (c *chunk) Len() int { + return len(c.data) +} + +func (c *chunk) Swap(i, j int) { + c.data[i], c.data[j] = c.data[j], c.data[i] +} + +func (c *chunk) Less(i, j int) bool { + return c.less(c.data[i], c.data[j]) +} + +// SortTypeSorter stores an input chan and feeds Sort to return a sorted chan +type SortTypeSorter struct { + config Config + buildSortCtx context.Context + saveCtx context.Context + mergeErrChan chan error + tempWriter tempfile.TempWriter + tempReader tempfile.TempReader + input chan SortType + chunkChan chan *chunk + saveChunkChan chan *chunk + mergeChunkChan chan SortType + lessFunc CompareLessFunc + fromBytes FromBytes +} + +func newSorter(i chan SortType, fromBytes FromBytes, lessFunc CompareLessFunc, config *Config) *SortTypeSorter { + s := new(SortTypeSorter) + s.input = i + s.lessFunc = lessFunc + s.fromBytes = fromBytes + s.config = *mergeConfig(config) + s.chunkChan = make(chan *chunk, s.config.ChanBuffSize) + s.saveChunkChan = make(chan *chunk, s.config.ChanBuffSize) + s.mergeChunkChan = make(chan SortType, s.config.SortedChanBuffSize) + s.mergeErrChan = make(chan error, 1) + return s +} + +// New returns a new Sorter instance that can be used to sort the input chan +// fromBytes is needed to unmarshal SortTypes from []byte on disk +// lessfunc is the comparator used for SortType +// config ca be nil to use the defaults, or only set the non-default values desired +// if errors or interupted, may leave temp files behind in config.TempFilesDir +// the returned chanels contain the data returned from calling Sort() +func New(i chan SortType, fromBytes FromBytes, lessFunc CompareLessFunc, config *Config) (*SortTypeSorter, chan SortType, chan error) { + var err error + s := newSorter(i, fromBytes, lessFunc, config) + s.tempWriter, err = tempfile.New(s.config.TempFilesDir) + if err != nil { + s.mergeErrChan <- err + close(s.mergeErrChan) + close(s.mergeChunkChan) + } + return s, s.mergeChunkChan, s.mergeErrChan +} + +// NewMock is the same as New() but is backed by memory instead of a temporary file on disk +// n is the size to initialize the backing bytes buffer too +func NewMock(i chan SortType, fromBytes FromBytes, lessFunc CompareLessFunc, config *Config, n int) (*SortTypeSorter, chan SortType, chan error) { + s := newSorter(i, fromBytes, lessFunc, config) + s.tempWriter = tempfile.Mock(n) + return s, s.mergeChunkChan, s.mergeErrChan +} + +// Sort sorts the Sorter's input chan and returns a new sorted chan, and error Chan +// Sort is a chunking operation that runs multiple workers asynchronously +// this blocks while sorting chunks and unblocks when merging +// NOTE: the context passed to Sort must outlive Sort() returning. +// Merge uses the same context and runs in a goroutine after Sort returns(). +// for example, if calling sort in an errGroup, you must pass the group's parent context into sort. +func (s *SortTypeSorter) Sort(ctx context.Context) { + var buildSortErrGroup, saveErrGroup *errgroup.Group + buildSortErrGroup, s.buildSortCtx = errgroup.WithContext(ctx) + saveErrGroup, s.saveCtx = errgroup.WithContext(ctx) + + //start creating chunks + buildSortErrGroup.Go(s.buildChunks) + + // sort chunks + for i := 0; i < s.config.NumWorkers; i++ { + buildSortErrGroup.Go(s.sortChunks) + } + + // save chunks + saveErrGroup.Go(s.saveChunks) + + err := buildSortErrGroup.Wait() + if err != nil { + s.mergeErrChan <- err + close(s.mergeErrChan) + close(s.mergeChunkChan) + return + } + + // need to close saveChunkChan + close(s.saveChunkChan) + err = saveErrGroup.Wait() + if err != nil { + s.mergeErrChan <- err + close(s.mergeErrChan) + close(s.mergeChunkChan) + return + } + + // read chunks and merge + // if this errors, it is returned in the errorChan + go s.mergeNChunks(ctx) +} + +// buildChunks reads data from the input chan to builds chunks and pushes them to chunkChan +func (s *SortTypeSorter) buildChunks() error { + defer close(s.chunkChan) // if this is not called on error, causes a deadlock + + for { + c := newChunk(s.config.ChunkSize, s.lessFunc) + for i := 0; i < s.config.ChunkSize; i++ { + select { + case rec, ok := <-s.input: + if !ok { + break + } + c.data = append(c.data, rec) + case <-s.buildSortCtx.Done(): + return s.buildSortCtx.Err() + } + } + if len(c.data) == 0 { + // the chunk is empty + break + } + + // chunk is now full + s.chunkChan <- c + } + + return nil +} + +// sortChunks is a worker for sorting the data stored in a chunk prior to save +func (s *SortTypeSorter) sortChunks() error { + for { + select { + case b, more := <-s.chunkChan: + if more { + // sort + sort.Sort(b) + // save + s.saveChunkChan <- b + } else { + return nil + } + case <-s.buildSortCtx.Done(): + return s.buildSortCtx.Err() + } + } +} + +// saveChunks is a worker for saving sorted data to disk +func (s *SortTypeSorter) saveChunks() error { + var err error + scratch := make([]byte, binary.MaxVarintLen64) + for { + select { + case b, more := <-s.saveChunkChan: + if more { + for _, d := range b.data { + // binary encoding for size + raw := d.ToBytes() + n := binary.PutUvarint(scratch, uint64(len(raw))) + _, err = s.tempWriter.Write(scratch[:n]) + if err != nil { + return err + } + // add data + _, err = s.tempWriter.Write(raw) + if err != nil { + return err + } + } + _, err = s.tempWriter.Next() + if err != nil { + return err + } + } else { + s.tempReader, err = s.tempWriter.Save() + return err + } + case <-s.saveCtx.Done(): + // delete the temp file from disk (error unchecked) + s.tempWriter.Close() + return s.saveCtx.Err() + } + } +} + +// mergeNChunks runs asynchronously in the background feeding data to getNext +// sends errors to s.mergeErrorChan +func (s *SortTypeSorter) mergeNChunks(ctx context.Context) { + //populate queue with data from mergeFile list + defer close(s.mergeChunkChan) + // close temp file when done + defer func() { + err := s.tempReader.Close() + if err != nil { + s.mergeErrChan <- err + } + }() + defer close(s.mergeErrChan) + pq := queue.NewPriorityQueue(func(a, b interface{}) bool { + return s.lessFunc(a.(*mergeFile).nextRec, b.(*mergeFile).nextRec) + }) + + for i := 0; i < s.tempReader.Size(); i++ { + merge := new(mergeFile) + merge.fromBytes = s.fromBytes + merge.reader = s.tempReader.Read(i) + _, ok, err := merge.getNext() // start the merge by preloading the values + if err == io.EOF || !ok { + continue + } + if err != nil { + s.mergeErrChan <- err + return + } + pq.Push(merge) + } + + for pq.Len() > 0 { + merge := pq.Peek().(*mergeFile) + rec, more, err := merge.getNext() + if err != nil { + s.mergeErrChan <- err + return + } + if more { + pq.PeekUpdate() + } else { + pq.Pop() + } + // check for err in context just in case + select { + case s.mergeChunkChan <- rec: + case <-ctx.Done(): + s.mergeErrChan <- err + return + } + } +} + +// mergefile represents each sorted chunk on disk and its next value +type mergeFile struct { + nextRec SortType + fromBytes FromBytes + reader *bufio.Reader +} + +// getNext returns the next value from the sorted chunk on disk +// the first call will return nil while the struct is initialized +func (m *mergeFile) getNext() (SortType, bool, error) { + var newRecBytes []byte + old := m.nextRec + + n, err := binary.ReadUvarint(m.reader) + if err == nil { + newRecBytes = make([]byte, int(n)) + _, err = io.ReadFull(m.reader, newRecBytes) + } + if err != nil { + if err == io.EOF { + m.nextRec = nil + return old, false, nil + } + return nil, false, err + } + + m.nextRec = m.fromBytes(newRecBytes) + return old, true, nil +} diff --git a/vendor/github.com/lanrat/extsort/sort_strings.go b/vendor/github.com/lanrat/extsort/sort_strings.go new file mode 100644 index 000000000..216821722 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/sort_strings.go @@ -0,0 +1,294 @@ +package extsort + +import ( + "bufio" + "context" + "encoding/binary" + "io" + "sort" + + "github.com/lanrat/extsort/queue" + "github.com/lanrat/extsort/tempfile" + + "golang.org/x/sync/errgroup" +) + +type stringChunk struct { + data []string +} + +func newStringChunk(size int) *stringChunk { + c := new(stringChunk) + c.data = make([]string, 0, size) + return c +} + +func (c *stringChunk) Len() int { + return len(c.data) +} + +func (c *stringChunk) Swap(i, j int) { + c.data[i], c.data[j] = c.data[j], c.data[i] +} + +func (c *stringChunk) Less(i, j int) bool { + return c.data[i] < c.data[j] +} + +// StringSorter stores an input chan and feeds Sort to return a sorted chan +type StringSorter struct { + config Config + buildSortCtx context.Context + saveCtx context.Context + mergeErrChan chan error + tempWriter tempfile.TempWriter + tempReader tempfile.TempReader + input chan string + chunkChan chan *stringChunk + saveChunkChan chan *stringChunk + mergeChunkChan chan string +} + +func newStringSorter(i chan string, config *Config) *StringSorter { + s := new(StringSorter) + s.input = i + s.config = *mergeConfig(config) + s.chunkChan = make(chan *stringChunk, s.config.ChanBuffSize) + s.saveChunkChan = make(chan *stringChunk, s.config.ChanBuffSize) + s.mergeChunkChan = make(chan string, s.config.SortedChanBuffSize) + s.mergeErrChan = make(chan error, 1) + return s +} + +// StringsMock is the same as Strings() but is backed by memory instead of a temporary file on disk +// n is the size to initialize the backing bytes buffer too +func StringsMock(i chan string, config *Config, n int) (*StringSorter, chan string, chan error) { + s := newStringSorter(i, config) + s.tempWriter = tempfile.Mock(n) + return s, s.mergeChunkChan, s.mergeErrChan +} + +// Strings returns a new Sorter instance that can be used to sort the input chan +func Strings(i chan string, config *Config) (*StringSorter, chan string, chan error) { + var err error + s := newStringSorter(i, config) + s.tempWriter, err = tempfile.New(s.config.TempFilesDir) + if err != nil { + s.mergeErrChan <- err + close(s.mergeErrChan) + close(s.mergeChunkChan) + } + return s, s.mergeChunkChan, s.mergeErrChan +} + +// Sort sorts the Sorter's input chan and returns a new sorted chan, and error Chan +// Sort is a chunking operation that runs multiple workers asynchronously +// this blocks while sorting chunks and unblocks when merging +// NOTE: the context passed to Sort must outlive Sort() returning. +// merge used the same context and runs in a goroutine after Sort returns() +// for example, if calling sort in an errGroup, you must pass the group's parent context into sort. +func (s *StringSorter) Sort(ctx context.Context) { + var buildSortErrGroup, saveErrGroup *errgroup.Group + buildSortErrGroup, s.buildSortCtx = errgroup.WithContext(ctx) + saveErrGroup, s.saveCtx = errgroup.WithContext(ctx) + + //start creating chunks + buildSortErrGroup.Go(s.buildChunks) + + // sort chunks + for i := 0; i < s.config.NumWorkers; i++ { + buildSortErrGroup.Go(s.sortChunks) + } + + // save chunks + saveErrGroup.Go(s.saveChunks) + + err := buildSortErrGroup.Wait() + if err != nil { + s.mergeErrChan <- err + close(s.mergeErrChan) + close(s.mergeChunkChan) + return + } + + // need to close saveChunkChan + close(s.saveChunkChan) + err = saveErrGroup.Wait() + if err != nil { + s.mergeErrChan <- err + close(s.mergeErrChan) + close(s.mergeChunkChan) + return + } + + // read chunks and merge + // if this errors, it is returned in the errorChan + go s.mergeNChunks(ctx) +} + +// buildChunks reads data from the input chan to builds chunks and pushes them to chunkChan +func (s *StringSorter) buildChunks() error { + defer close(s.chunkChan) // if this is not called on error, causes a deadlock + + for { + c := newStringChunk(s.config.ChunkSize) + for i := 0; i < s.config.ChunkSize; i++ { + select { + case rec, ok := <-s.input: + if !ok { + break + } + c.data = append(c.data, rec) + case <-s.buildSortCtx.Done(): + return s.buildSortCtx.Err() + } + } + if len(c.data) == 0 { + // the chunk is empty + break + } + + // chunk is now full + s.chunkChan <- c + } + + return nil +} + +// sortChunks is a worker for sorting the data stored in a chunk prior to save +func (s *StringSorter) sortChunks() error { + for { + select { + case b, more := <-s.chunkChan: + if more { + // sort + sort.Sort(b) + // save + s.saveChunkChan <- b + } else { + return nil + } + case <-s.buildSortCtx.Done(): + return s.buildSortCtx.Err() + } + } +} + +// saveChunks is a worker for saving sorted data to disk +func (s *StringSorter) saveChunks() error { + var err error + scratch := make([]byte, binary.MaxVarintLen64) + for { + select { + case b, more := <-s.saveChunkChan: + if more { + for _, d := range b.data { + // binary encoding for size + n := binary.PutUvarint(scratch, uint64(len(d))) + _, err = s.tempWriter.Write(scratch[:n]) + if err != nil { + return err + } + // add data + _, err = s.tempWriter.WriteString(d) + if err != nil { + return err + } + } + _, err = s.tempWriter.Next() + if err != nil { + return err + } + } else { + s.tempReader, err = s.tempWriter.Save() + return err + } + case <-s.saveCtx.Done(): + // delete the temp file from disk (error unchecked) + s.tempWriter.Close() + return s.saveCtx.Err() + } + } +} + +// mergeNChunks runs asynchronously in the background feeding data to getNext +// sends errors to s.mergeErrorChan +func (s *StringSorter) mergeNChunks(ctx context.Context) { + //populate queue with data from mergeFile list + defer close(s.mergeChunkChan) + // close temp file when done + defer func() { + err := s.tempReader.Close() + if err != nil { + s.mergeErrChan <- err + } + }() + defer close(s.mergeErrChan) + pq := queue.NewPriorityQueue(func(a, b interface{}) bool { + return a.(*mergeStringFile).nextRec < b.(*mergeStringFile).nextRec + }) + + for i := 0; i < s.tempReader.Size(); i++ { + merge := new(mergeStringFile) + merge.reader = s.tempReader.Read(i) + _, ok, err := merge.getNext() // start the merge by preloading the values + if err == io.EOF || !ok { + continue + } + if err != nil { + s.mergeErrChan <- err + return + } + pq.Push(merge) + } + + for pq.Len() > 0 { + merge := pq.Peek().(*mergeStringFile) + rec, more, err := merge.getNext() + if err != nil { + s.mergeErrChan <- err + return + } + if more { + pq.PeekUpdate() + } else { + pq.Pop() + } + // check for err in context just in case + select { + case s.mergeChunkChan <- rec: + case <-ctx.Done(): + s.mergeErrChan <- ctx.Err() + return + } + } +} + +// mergefile represents each sorted chunk on disk and its next value +type mergeStringFile struct { + nextRec string + reader *bufio.Reader +} + +// getNext returns the next value from the sorted chunk on disk +// the first call will return nil while the struct is initialized +func (m *mergeStringFile) getNext() (string, bool, error) { + var newRecBytes []byte + old := m.nextRec + + n, err := binary.ReadUvarint(m.reader) + if err == nil { + newRecBytes = make([]byte, int(n)) + _, err = io.ReadFull(m.reader, newRecBytes) + } + if err != nil { + if err == io.EOF { + m.nextRec = "" + return old, false, nil + } + return "", false, err + } + + m.nextRec = string(newRecBytes) + return old, true, nil +} diff --git a/vendor/github.com/lanrat/extsort/tempfile/mockfile.go b/vendor/github.com/lanrat/extsort/tempfile/mockfile.go new file mode 100644 index 000000000..37154f586 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/tempfile/mockfile.go @@ -0,0 +1,106 @@ +package tempfile + +import ( + "bufio" + "bytes" + "io" +) + +// MockFileWriter allows writeing to a temp file(s) that are backed by memory +type MockFileWriter struct { + data *bytes.Buffer + sections []int +} + +// mockFileReader allows reading from a temp file(s) that are backed by memory +type mockFileReader struct { + data *bytes.Reader + sections []int + readers []*bufio.Reader +} + +// Mock returns a new tempfileWrite backed by memory +func Mock(n int) *MockFileWriter { + var m MockFileWriter + m.data = bytes.NewBuffer(make([]byte, 0, n)) + return &m +} + +// Size return the number of "files" saved +func (w *MockFileWriter) Size() int { + // we add one because we only write to the sections when we are done + return len(w.sections) + 1 +} + +// Close stops the tempfile from accepting new data, +// works like an abort, unrecoverable +func (w *MockFileWriter) Close() error { + w.data.Reset() + w.sections = nil + w.data = nil + return nil +} + +// Write writes the byte to the temp file +func (w *MockFileWriter) Write(p []byte) (int, error) { + return w.data.Write(p) +} + +// WriteString writes the string to the temp file +func (w *MockFileWriter) WriteString(s string) (int, error) { + return w.data.WriteString(s) +} + +// Next stops writeing the the current section/file and prepaired the tempwriter for the next one +func (w *MockFileWriter) Next() (int64, error) { + // save offsets + pos := w.data.Len() + w.sections = append(w.sections, pos) + return int64(pos), nil +} + +// Save stops allowing new writes and returns a TempReader for reading the data back +func (w *MockFileWriter) Save() (TempReader, error) { + _, err := w.Next() + if err != nil { + return nil, err + } + return newMockTempReader(w.sections, w.data.Bytes()) +} + +func newMockTempReader(sections []int, data []byte) (*mockFileReader, error) { + // create TempReader + var r mockFileReader + r.data = bytes.NewReader(data) + r.sections = sections + r.readers = make([]*bufio.Reader, len(r.sections)) + + offset := 0 + for i, end := range r.sections { + section := io.NewSectionReader(r.data, int64(offset), int64(end-offset)) + offset = end + r.readers[i] = bufio.NewReaderSize(section, fileBufferSize) + } + + return &r, nil +} + +// Close does nothing much on a MockTempWriter +func (r *mockFileReader) Close() error { + r.readers = nil + r.data = nil + return nil +} + +// Size returns the number of sections/files in the reader +func (r *mockFileReader) Size() int { + return len(r.readers) +} + +// Read returns a reader for the provided section +func (r *mockFileReader) Read(i int) *bufio.Reader { + if i < 0 || i >= len(r.readers) { + panic("tempfile: read request out of range") + } + return r.readers[i] +} diff --git a/vendor/github.com/lanrat/extsort/tempfile/tempfile.go b/vendor/github.com/lanrat/extsort/tempfile/tempfile.go new file mode 100644 index 000000000..3dc34b769 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/tempfile/tempfile.go @@ -0,0 +1,154 @@ +// Package tempfile implements a virtual temp files that can be written to (in series) +// and then read back (series/parallel) and then removed from the filesystem when done +// if multiple "tempfiles" are needed on the application layer, they are mapped to +// sections of the same real file on the filesystem +package tempfile + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "os" +) + +// file IO buffer size for each file +const fileBufferSize = 1 << 16 // 64k + +// filename prefix for files put in temp directory +var mergeFilenamePrefix = fmt.Sprintf("extsort_%d_", os.Getpid()) + +// FileWriter allows for creating virtual temp files for reading/writing +type FileWriter struct { + file *os.File + bufWriter *bufio.Writer + sections []int64 +} + +type fileReader struct { + file *os.File + sections []int64 + readers []*bufio.Reader +} + +// New creates a new timefile in dir, if dir is empty the OS default is used +func New(dir string) (*FileWriter, error) { + var w FileWriter + var err error + w.file, err = ioutil.TempFile(dir, mergeFilenamePrefix) + if err != nil { + return nil, err + } + w.bufWriter = bufio.NewWriterSize(w.file, fileBufferSize) + w.sections = make([]int64, 0, 10) + + return &w, nil +} + +// Size returns the number of virtual files created +func (w *FileWriter) Size() int { + // we add one because we only write to the sections when we are done + return len(w.sections) + 1 +} + +// Name returns the full path of the underlying file on the OS +func (w *FileWriter) Name() string { + return w.file.Name() +} + +// Close stops the tempfile from accepting new data, +// closes the file, and removes the temp file from disk +// works like an abort, unrecoverable +func (w *FileWriter) Close() error { + err := w.file.Close() + if err != nil { + return err + } + w.sections = nil + w.bufWriter = nil + return os.Remove(w.file.Name()) +} + +// Write writes a byte to the current file section +func (w *FileWriter) Write(p []byte) (int, error) { + return w.bufWriter.Write(p) +} + +// WriteString writes s to the current file section +func (w *FileWriter) WriteString(s string) (int, error) { + return w.bufWriter.WriteString(s) +} + +// Next stops writeing the the current section/file and prepaired the tempwriter for the next one +func (w *FileWriter) Next() (int64, error) { + // save offsets + err := w.bufWriter.Flush() + if err != nil { + return 0, err + } + pos, err := w.file.Seek(0, io.SeekCurrent) + if err != nil { + return 0, err + } + w.sections = append(w.sections, pos) + + return pos, nil +} + +// Save stops writing new data/sections to the temp file and returns a reader for reading the data/sections back +func (w *FileWriter) Save() (TempReader, error) { + _, err := w.Next() + if err != nil { + return nil, err + } + err = w.file.Sync() + if err != nil { + return nil, err + } + err = w.file.Close() + if err != nil { + return nil, err + } + return newTempReader(w.file.Name(), w.sections) +} + +func newTempReader(filename string, sections []int64) (*fileReader, error) { + // create TempReader + var err error + var r fileReader + r.file, err = os.Open(filename) + if err != nil { + return nil, err + } + r.sections = sections + r.readers = make([]*bufio.Reader, len(r.sections)) + + offset := int64(0) + for i, end := range r.sections { + section := io.NewSectionReader(r.file, offset, end-offset) + offset = end + r.readers[i] = bufio.NewReaderSize(section, fileBufferSize) + } + + return &r, nil +} + +func (r *fileReader) Close() error { + r.readers = nil + err := r.file.Close() + if err != nil { + return err + } + return os.Remove(r.file.Name()) +} + +func (r *fileReader) Size() int { + return len(r.readers) +} + +func (r *fileReader) Read(i int) *bufio.Reader { + if i < 0 || i >= len(r.readers) { + panic("tempfile: read request out of range") + } + return r.readers[i] +} diff --git a/vendor/github.com/lanrat/extsort/tempfile/types.go b/vendor/github.com/lanrat/extsort/tempfile/types.go new file mode 100644 index 000000000..06c577155 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/tempfile/types.go @@ -0,0 +1,23 @@ +package tempfile + +import ( + "bufio" + "io" +) + +// TempWriter interface defines the virtual tempfile creation/writing stage +type TempWriter interface { + io.Closer + Size() int + Write(p []byte) (int, error) + WriteString(s string) (int, error) + Next() (int64, error) + Save() (TempReader, error) +} + +// TempReader defins the methods to allow reading the sections from tempfiles +type TempReader interface { + io.Closer + Size() int + Read(i int) *bufio.Reader +} diff --git a/vendor/github.com/lanrat/extsort/types.go b/vendor/github.com/lanrat/extsort/types.go new file mode 100644 index 000000000..64b3c1f91 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/types.go @@ -0,0 +1,19 @@ +package extsort + +import "context" + +// SortType defines the interface required by the extsort library to be able to sort the items +type SortType interface { + ToBytes() []byte // ToBytes used for marshaling with gob +} + +// FromBytes unmarshal bytes from gob to create a SortType when reading back the sorted items +type FromBytes func([]byte) SortType + +// CompareLessFunc compares two SortType items and returns true if a is less than b +type CompareLessFunc func(a, b SortType) bool + +// Sorter is the interface that all extsort sorters must satisfy +type Sorter interface { + Sort(context.Context) +} diff --git a/vendor/github.com/lanrat/extsort/uniq.go b/vendor/github.com/lanrat/extsort/uniq.go new file mode 100644 index 000000000..3efed1938 --- /dev/null +++ b/vendor/github.com/lanrat/extsort/uniq.go @@ -0,0 +1,24 @@ +package extsort + +// UniqStringChan returns a channel identical to the input but only with uniq elements +// only works on sorted inputs +func UniqStringChan(in chan string) chan string { + out := make(chan string) + go func() { + var prior string + priorSet := false + for d := range in { + if priorSet { + if d == prior { + continue + } + } else { + priorSet = true + } + out <- d + prior = d + } + close(out) + }() + return out +} diff --git a/vendor/golang.org/x/sync/AUTHORS b/vendor/golang.org/x/sync/AUTHORS new file mode 100644 index 000000000..15167cd74 --- /dev/null +++ b/vendor/golang.org/x/sync/AUTHORS @@ -0,0 +1,3 @@ +# This source code refers to The Go Authors for copyright purposes. +# The master list of authors is in the main Go distribution, +# visible at http://tip.golang.org/AUTHORS. diff --git a/vendor/golang.org/x/sync/CONTRIBUTORS b/vendor/golang.org/x/sync/CONTRIBUTORS new file mode 100644 index 000000000..1c4577e96 --- /dev/null +++ b/vendor/golang.org/x/sync/CONTRIBUTORS @@ -0,0 +1,3 @@ +# This source code was written by the Go contributors. +# The master list of contributors is in the main Go distribution, +# visible at http://tip.golang.org/CONTRIBUTORS. diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 000000000..6a66aea5e --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 000000000..9857fe53d --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,66 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +package errgroup + +import ( + "context" + "sync" +) + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid and does not cancel on error. +type Group struct { + cancel func() + + wg sync.WaitGroup + + errOnce sync.Once + err error +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + return g.err +} + +// Go calls the given function in a new goroutine. +// +// The first call to return a non-nil error cancels the group; its error will be +// returned by Wait. +func (g *Group) Go(f func() error) { + g.wg.Add(1) + + go func() { + defer g.wg.Done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index cd71373ef..da3238c41 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -92,8 +92,13 @@ github.com/karrick/godirwalk # github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 ## explicit github.com/kballard/go-shellquote -# github.com/kr/pretty v0.2.0 +# github.com/kr/pretty v0.3.0 ## explicit; go 1.12 +# github.com/lanrat/extsort v1.0.0 +## explicit; go 1.13 +github.com/lanrat/extsort +github.com/lanrat/extsort/queue +github.com/lanrat/extsort/tempfile # github.com/pkg/errors v0.9.1 ## explicit github.com/pkg/errors @@ -128,6 +133,9 @@ github.com/xrash/smetrics # go.etcd.io/bbolt v1.3.6 ## explicit; go 1.12 go.etcd.io/bbolt +# golang.org/x/sync v0.0.0-20210220032951-036812b2e83c +## explicit +golang.org/x/sync/errgroup # golang.org/x/sys v0.0.0-20220405210540-1e041c57c461 ## explicit; go 1.17 golang.org/x/sys/internal/unsafeheader