From 74dedc7cc32d00d8b35227d446cd0b6db4332461 Mon Sep 17 00:00:00 2001 From: Anes Hasicic Date: Wed, 26 Jul 2017 17:58:51 +0200 Subject: [PATCH] Paralellized gps.WriteDepTree Removed commented out code Decoupled concurrency code from the WriteDepTree func to workerPool All errors are now being reported and WriteDepTree decides how to act (currently calls removeAll) Broken down WriteDepTree to child functions in order to receive feedback if on the right track Fixed accidental err race condition gps.WriteDepTree now writes vendor/ tree in parallel with configurable number of worker routines --- internal/gps/result.go | 100 ++++++++++++++++++++++++++++++++---- internal/gps/result_test.go | 16 ++++-- 2 files changed, 104 insertions(+), 12 deletions(-) diff --git a/internal/gps/result.go b/internal/gps/result.go index 504b02368b..c655129195 100644 --- a/internal/gps/result.go +++ b/internal/gps/result.go @@ -8,6 +8,7 @@ import ( "fmt" "os" "path/filepath" + "strings" ) // A Solution is returned by a solver run. It is mostly just a Lock, with some @@ -61,20 +62,101 @@ func WriteDepTree(basedir string, l Lock, sm SourceManager, sv bool) error { return err } - // TODO(sdboyer) parallelize - for _, p := range l.Projects() { - to := filepath.FromSlash(filepath.Join(basedir, string(p.Ident().ProjectRoot))) + wp := newWorkerPool(l.Projects(), 2) - err = sm.ExportProject(p.Ident(), p.Version(), to) - if err != nil { - removeAll(basedir) - return fmt.Errorf("error while exporting %s: %s", p.Ident().ProjectRoot, err) + err = wp.writeDepTree( + func(p LockedProject) error { + to := filepath.FromSlash(filepath.Join(basedir, string(p.Ident().ProjectRoot))) + + if err := sm.ExportProject(p.Ident(), p.Version(), to); err != nil { + return err + } + + if sv { + filepath.Walk(to, stripVendor) + } + + // TODO(sdboyer) dump version metadata file + + return nil + }, + ) + + if err != nil { + removeAll(basedir) + return err + } + + return nil +} + +func newWorkerPool(pjs []LockedProject, wc int) *workerPool { + wp := &workerPool{ + workChan: make(chan LockedProject), + doneChan: make(chan done), + projects: pjs, + workerCount: wc, + } + + wp.spawnWorkers() + + return wp +} + +type workerPool struct { + workChan chan LockedProject + doneChan chan done + projects []LockedProject + wpFunc writeProjectFunc + workerCount int +} + +type done struct { + root ProjectRoot + err error +} + +type writeProjectFunc func(p LockedProject) error + +func (wp *workerPool) spawnWorkers() { + for i := 0; i < wp.workerCount; i++ { + go func() { + for p := range wp.workChan { + wp.doneChan <- done{p.Ident().ProjectRoot, wp.wpFunc(p)} + } + }() + } +} + +func (wp *workerPool) writeDepTree(f writeProjectFunc) error { + wp.wpFunc = f + wp.dispatchJobs() + return wp.reportProgress() +} + +func (wp *workerPool) dispatchJobs() { + go func() { + for _, p := range wp.projects { + wp.workChan <- p } - if sv { - filepath.Walk(to, stripVendor) + close(wp.workChan) + }() +} + +func (wp *workerPool) reportProgress() error { + var errs []string + + for i := 0; i < len(wp.projects); i++ { + d := <-wp.doneChan + if d.err != nil { + errs = append(errs, fmt.Sprintf("error while exporting %s: %s", d.root, d.err)) } } + if errs != nil { + return fmt.Errorf(strings.Join(errs, "\n")) + } + return nil } diff --git a/internal/gps/result_test.go b/internal/gps/result_test.go index cc96a8e2cb..b38df02643 100644 --- a/internal/gps/result_test.go +++ b/internal/gps/result_test.go @@ -112,10 +112,20 @@ func testWriteDepTree(t *testing.T) { } func BenchmarkCreateVendorTree(b *testing.B) { - // We're fs-bound here, so restrict to single parallelism - b.SetParallelism(1) - r := basicResult + + r.p = append( + r.p, + pa2lp(atom{ + id: pi("launchpad.net/govcstestbzrrepo"), + v: NewVersion("1.0.0").Pair(Revision("matt@mattfarina.com-20150731135137-pbphasfppmygpl68")), + }, nil), + pa2lp(atom{ + id: pi("bitbucket.org/sdboyer/withbm"), + v: NewVersion("v1.0.0").Pair(Revision("aa110802a0c64195d0a6c375c9f66668827c90b4")), + }, nil), + ) + tmp := path.Join(os.TempDir(), "vsolvtest") clean := true