From a403ffcec224d6edf76ce4c2ca82743837693b6c Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sat, 2 Sep 2023 12:51:10 +0900 Subject: [PATCH] [attempt by ycombinator] concurrent dcopy (#122) * Use worker pool * Introduce Concurrency() functional option * Collect all errors * Simplify type of Concurrency option * Make channels buffered * Make input channel unbuffered * Logging for debugging * 10x channel capacities * 10x concurrency * 100x concurrency * 100x in channel * Make output channel 1000x * Reducing numWorkers multiplier by 1/10 * Removing numWorkers multiplier * Reducing in channel capacity by 1/10 * Reducing output channel capacity by 1/10 * Reducing output channel capacity by further 1/10 * Remove input channel capacity multiplier * Removing multiplication factor for output channel capacity * Remove debugging statement --------- Co-authored-by: Shaunak Kashyap --- copy.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++-------- go.mod | 1 + go.sum | 6 ++++ options.go | 5 ++++ 4 files changed, 81 insertions(+), 12 deletions(-) diff --git a/copy.go b/copy.go index 085db78..560e9eb 100644 --- a/copy.go +++ b/copy.go @@ -1,11 +1,13 @@ package copy import ( + "go.uber.org/multierr" "io" "io/fs" "io/ioutil" "os" "path/filepath" + "sync" "time" ) @@ -18,36 +20,57 @@ type timespec struct { // Copy copies src to dest, doesn't matter if src is a directory or a file. func Copy(src, dest string, opts ...Options) error { opt := assureOptions(src, dest, opts...) + + var numCopyWorkers uint = 1 + if opt.Concurrency > 1 { + numCopyWorkers = opt.Concurrency + } + + inCh := make(chan workerInput, numCopyWorkers) + outCh := make(chan workerOutput, numCopyWorkers) + errCh := make(chan error) + go startWorkers(numCopyWorkers, inCh, outCh) + go processResults(outCh, errCh) + if opt.FS != nil { info, err := fs.Stat(opt.FS, src) if err != nil { return onError(src, dest, err, opt) } - return switchboard(src, dest, info, opt) + return switchboard(src, dest, info, opt, inCh) } info, err := os.Lstat(src) if err != nil { return onError(src, dest, err, opt) } - return switchboard(src, dest, info, opt) + + err = switchboard(src, dest, info, opt, inCh) + if err != nil { + close(inCh) + close(outCh) + return err + } + close(inCh) + + return <-errCh } // switchboard switches proper copy functions regarding file type, etc... // If there would be anything else here, add a case to this switchboard. -func switchboard(src, dest string, info os.FileInfo, opt Options) (err error) { +func switchboard(src, dest string, info os.FileInfo, opt Options, inCh chan workerInput) (err error) { if info.Mode()&os.ModeDevice != 0 && !opt.Specials { return onError(src, dest, err, opt) } switch { case info.Mode()&os.ModeSymlink != 0: - err = onsymlink(src, dest, opt) + err = onsymlink(src, dest, opt, inCh) case info.IsDir(): - err = dcopy(src, dest, info, opt) + err = dcopy(src, dest, info, opt, inCh) case info.Mode()&os.ModeNamedPipe != 0: err = pcopy(dest, info) default: - err = fcopy(src, dest, info, opt) + inCh <- workerInput{src, dest, info, opt} } return onError(src, dest, err, opt) @@ -56,7 +79,7 @@ func switchboard(src, dest string, info os.FileInfo, opt Options) (err error) { // copyNextOrSkip decide if this src should be copied or not. // Because this "copy" could be called recursively, // "info" MUST be given here, NOT nil. -func copyNextOrSkip(src, dest string, info os.FileInfo, opt Options) error { +func copyNextOrSkip(src, dest string, info os.FileInfo, opt Options, inCh chan workerInput) error { if opt.Skip != nil { skip, err := opt.Skip(info, src, dest) if err != nil { @@ -66,7 +89,7 @@ func copyNextOrSkip(src, dest string, info os.FileInfo, opt Options) error { return nil } } - return switchboard(src, dest, info, opt) + return switchboard(src, dest, info, opt, inCh) } // fcopy is for just a file, @@ -145,7 +168,7 @@ func fcopy(src, dest string, info os.FileInfo, opt Options) (err error) { // dcopy is for a directory, // with scanning contents inside the directory // and pass everything to "copy" recursively. -func dcopy(srcdir, destdir string, info os.FileInfo, opt Options) (err error) { +func dcopy(srcdir, destdir string, info os.FileInfo, opt Options, inCh chan workerInput) (err error) { if skip, err := onDirExists(opt, srcdir, destdir); err != nil { return err } else if skip { @@ -186,7 +209,7 @@ func dcopy(srcdir, destdir string, info os.FileInfo, opt Options) (err error) { for _, content := range contents { cs, cd := filepath.Join(srcdir, content.Name()), filepath.Join(destdir, content.Name()) - if err = copyNextOrSkip(cs, cd, content, opt); err != nil { + if err = copyNextOrSkip(cs, cd, content, opt, inCh); err != nil { // If any error, exit immediately return } @@ -224,7 +247,7 @@ func onDirExists(opt Options, srcdir, destdir string) (bool, error) { return false, nil } -func onsymlink(src, dest string, opt Options) error { +func onsymlink(src, dest string, opt Options, inCh chan workerInput) error { switch opt.OnSymlink(src) { case Shallow: if err := lcopy(src, dest); err != nil { @@ -243,7 +266,7 @@ func onsymlink(src, dest string, opt Options) error { if err != nil { return err } - return copyNextOrSkip(orig, dest, info, opt) + return copyNextOrSkip(orig, dest, info, opt, inCh) case Skip: fallthrough default: @@ -282,3 +305,37 @@ func onError(src, dest string, err error, opt Options) error { return opt.OnError(src, dest, err) } + +type workerInput struct { + src string + dest string + info os.FileInfo + opt Options +} + +type workerOutput error + +func startWorkers(numWorkers uint, inCh chan workerInput, outCh chan workerOutput) { + var wg sync.WaitGroup + for workerID := uint(0); workerID < numWorkers; workerID++ { + wg.Add(1) + go worker(&wg, inCh, outCh) + } + wg.Wait() + close(outCh) +} + +func worker(wg *sync.WaitGroup, inCh chan workerInput, outCh chan workerOutput) { + for i := range inCh { + outCh <- fcopy(i.src, i.dest, i.info, i.opt) + } + wg.Done() +} + +func processResults(out chan workerOutput, result chan error) { + var err error + for o := range out { + err = multierr.Append(err, o) + } + result <- err +} diff --git a/go.mod b/go.mod index 2263c9e..c8a1cab 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,6 @@ go 1.18 require ( github.com/otiai10/mint v1.5.1 + go.uber.org/multierr v1.11.0 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 ) diff --git a/go.sum b/go.sum index 7fc5834..7ad8053 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/options.go b/options.go index 1b4e508..35c8d2f 100644 --- a/options.go +++ b/options.go @@ -65,6 +65,11 @@ type Options struct { // e.g., You can use embed.FS to copy files from embedded filesystem. FS fs.FS + // If given, returns the number of workers to use to concurrently perform + // the copying operation. It the returned value is <= 1, a value of 1 is + // used and copying will proceed serially. + Concurrency uint + intent struct { src string dest string