From 87bc95fd2092eff253e0b04292118921828768eb Mon Sep 17 00:00:00 2001 From: Anes Hasicic Date: Wed, 26 Jul 2017 17:58:51 +0200 Subject: [PATCH] Paralellized gps.WriteDepTree Decoupled concurrency code from the WriteDepTree func to workerPool All errors are now being reported and WriteDepTree decides how to act (currently calls removeAll) Broken down WriteDepTree to child functions in order to receive feedback if on the right track Fixed accidental err race condition gps.WriteDepTree now writes vendor/ tree in parallel with configurable number of worker routines --- internal/gps/result.go | 160 +++++++++++++++++++++++++++++++++--- internal/gps/result_test.go | 16 +++- 2 files changed, 163 insertions(+), 13 deletions(-) diff --git a/internal/gps/result.go b/internal/gps/result.go index eb9d6ad0e1..04bbe97944 100644 --- a/internal/gps/result.go +++ b/internal/gps/result.go @@ -8,6 +8,7 @@ import ( "fmt" "os" "path/filepath" + "strings" ) // A Solution is returned by a solver run. It is mostly just a Lock, with some @@ -61,24 +62,163 @@ func WriteDepTree(basedir string, l Lock, sm SourceManager, sv bool) error { return err } - // TODO(sdboyer) parallelize - for _, p := range l.Projects() { - to := filepath.FromSlash(filepath.Join(basedir, string(p.Ident().ProjectRoot))) + wp := newWorkerPool(l.Projects(), 2) - err = sm.ExportProject(p.Ident(), p.Version(), to) - if err != nil { - removeAll(basedir) - return fmt.Errorf("error while exporting %s: %s", p.Ident().ProjectRoot, err) + err = wp.writeDepTree( + func(p LockedProject) error { + to := filepath.FromSlash(filepath.Join(basedir, string(p.Ident().ProjectRoot))) + + if err := sm.ExportProject(p.Ident(), p.Version(), to); err != nil { + return err + } + + if sv { + filepath.Walk(to, stripVendor) + } + + // TODO(sdboyer) dump version metadata file + + return nil + }, + ) + + if err != nil { + removeAll(basedir) + return err + } + + return nil +} + +func newWorkerPool(pjs []LockedProject, wc int) *workerPool { + wp := &workerPool{ + workChan: make(chan LockedProject), + doneChan: make(chan done), + projects: pjs, + workerCount: wc, + } + + wp.spawnWorkers() + + return wp +} + +type workerPool struct { + workChan chan LockedProject + doneChan chan done + projects []LockedProject + wpFunc writeProjectFunc + workerCount int +} + +type done struct { + root ProjectRoot + err error +} + +type writeProjectFunc func(p LockedProject) error + +func (wp *workerPool) spawnWorkers() { + for i := 0; i < wp.workerCount; i++ { + go func() { + for p := range wp.workChan { + wp.doneChan <- done{p.Ident().ProjectRoot, wp.wpFunc(p)} + } + }() + } +} + +func (wp *workerPool) writeDepTree(f writeProjectFunc) error { + wp.wpFunc = f + wp.dispatchJobs() + return wp.reportProgress() +} + +func (wp *workerPool) dispatchJobs() { + go func() { + for _, p := range wp.projects { + wp.workChan <- p } - if sv { - filepath.Walk(to, stripVendor) + close(wp.workChan) + }() +} + +func (wp *workerPool) reportProgress() error { + var errs []string + + for i := 0; i < len(wp.projects); i++ { + d := <-wp.doneChan + if d.err != nil { + errs = append(errs, fmt.Sprintf("error while exporting %s: %s", d.root, d.err)) } - // TODO(sdboyer) dump version metadata file + } + + if errs != nil { + return fmt.Errorf(strings.Join(errs, "\n")) } return nil } +/* +func (a workerPool) writeTrees(f func(p LockedProject) error) error { + dc := a.writeProjects(f) + return a.reportProgress(dc) +} + +func (a workerPool) writeProjects(f func(p LockedProject) error) chan done { + dc := make(chan done) + wc := make(chan LockedProject) + + for i := 0; i < a.numWorkers; i++ { + go func() { + for p := range wc { + dc <- done{p.Ident().ProjectRoot, f(p)} + } + // TODO(sdboyer) dump version metadata file + }() + } + + go func() { + for _, p := range a.projects { + wc <- p + } + close(wc) + }() + + return dc +} + +func (a workerPool) reportProgress(dc chan done) error { + // doneChan <- fmt.Errorf("error while exporting %s: %s", p.Ident().ProjectRoot, err) + for i := 0; i < len(a.projects); i++ { + d := <-dc + if d.err != nil { + return d.err + } + } + + return nil +} + +*/ +/* +root := p.Ident().ProjectRoot + to := filepath.FromSlash(filepath.Join(dj.basedir, string(p.Ident().ProjectRoot))) + + err := dj.sm.ExportProject(p.Ident(), p.Version(), to) + if err != nil { + dc <- done{root, err} + continue + } + + if sv { + filepath.Walk(to, stripVendor) + } + + dc <- done{root, nil} +*/ + func (r solution) Projects() []LockedProject { return r.p } diff --git a/internal/gps/result_test.go b/internal/gps/result_test.go index cc96a8e2cb..b38df02643 100644 --- a/internal/gps/result_test.go +++ b/internal/gps/result_test.go @@ -112,10 +112,20 @@ func testWriteDepTree(t *testing.T) { } func BenchmarkCreateVendorTree(b *testing.B) { - // We're fs-bound here, so restrict to single parallelism - b.SetParallelism(1) - r := basicResult + + r.p = append( + r.p, + pa2lp(atom{ + id: pi("launchpad.net/govcstestbzrrepo"), + v: NewVersion("1.0.0").Pair(Revision("matt@mattfarina.com-20150731135137-pbphasfppmygpl68")), + }, nil), + pa2lp(atom{ + id: pi("bitbucket.org/sdboyer/withbm"), + v: NewVersion("v1.0.0").Pair(Revision("aa110802a0c64195d0a6c375c9f66668827c90b4")), + }, nil), + ) + tmp := path.Join(os.TempDir(), "vsolvtest") clean := true