Skip to content
This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

Commit

Permalink
Paralellized gps.WriteDepTree
Browse files Browse the repository at this point in the history
Removed commented out code

Decoupled concurrency code from the WriteDepTree func to workerPool
All errors are now being reported and WriteDepTree decides how to act
(currently calls removeAll)

Broken down WriteDepTree to child functions in order
to receive feedback if on the right track

Fixed accidental err race condition

gps.WriteDepTree now writes vendor/ tree in parallel with
configurable number of worker routines
  • Loading branch information
aneshas committed Jul 27, 2017
1 parent d558b52 commit 74dedc7
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 12 deletions.
100 changes: 91 additions & 9 deletions internal/gps/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
)

// A Solution is returned by a solver run. It is mostly just a Lock, with some
Expand Down Expand Up @@ -61,20 +62,101 @@ func WriteDepTree(basedir string, l Lock, sm SourceManager, sv bool) error {
return err
}

// TODO(sdboyer) parallelize
for _, p := range l.Projects() {
to := filepath.FromSlash(filepath.Join(basedir, string(p.Ident().ProjectRoot)))
wp := newWorkerPool(l.Projects(), 2)

err = sm.ExportProject(p.Ident(), p.Version(), to)
if err != nil {
removeAll(basedir)
return fmt.Errorf("error while exporting %s: %s", p.Ident().ProjectRoot, err)
err = wp.writeDepTree(
func(p LockedProject) error {
to := filepath.FromSlash(filepath.Join(basedir, string(p.Ident().ProjectRoot)))

if err := sm.ExportProject(p.Ident(), p.Version(), to); err != nil {
return err
}

if sv {
filepath.Walk(to, stripVendor)
}

// TODO(sdboyer) dump version metadata file

return nil
},
)

if err != nil {
removeAll(basedir)
return err
}

return nil
}

func newWorkerPool(pjs []LockedProject, wc int) *workerPool {
wp := &workerPool{
workChan: make(chan LockedProject),
doneChan: make(chan done),
projects: pjs,
workerCount: wc,
}

wp.spawnWorkers()

return wp
}

type workerPool struct {
workChan chan LockedProject
doneChan chan done
projects []LockedProject
wpFunc writeProjectFunc
workerCount int
}

type done struct {
root ProjectRoot
err error
}

type writeProjectFunc func(p LockedProject) error

func (wp *workerPool) spawnWorkers() {
for i := 0; i < wp.workerCount; i++ {
go func() {
for p := range wp.workChan {
wp.doneChan <- done{p.Ident().ProjectRoot, wp.wpFunc(p)}
}
}()
}
}

func (wp *workerPool) writeDepTree(f writeProjectFunc) error {
wp.wpFunc = f
wp.dispatchJobs()
return wp.reportProgress()
}

func (wp *workerPool) dispatchJobs() {
go func() {
for _, p := range wp.projects {
wp.workChan <- p
}
if sv {
filepath.Walk(to, stripVendor)
close(wp.workChan)
}()
}

func (wp *workerPool) reportProgress() error {
var errs []string

for i := 0; i < len(wp.projects); i++ {
d := <-wp.doneChan
if d.err != nil {
errs = append(errs, fmt.Sprintf("error while exporting %s: %s", d.root, d.err))
}
}

if errs != nil {
return fmt.Errorf(strings.Join(errs, "\n"))
}

return nil
}

Expand Down
16 changes: 13 additions & 3 deletions internal/gps/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,20 @@ func testWriteDepTree(t *testing.T) {
}

func BenchmarkCreateVendorTree(b *testing.B) {
// We're fs-bound here, so restrict to single parallelism
b.SetParallelism(1)

r := basicResult

r.p = append(
r.p,
pa2lp(atom{
id: pi("launchpad.net/govcstestbzrrepo"),
v: NewVersion("1.0.0").Pair(Revision("matt@mattfarina.com-20150731135137-pbphasfppmygpl68")),
}, nil),
pa2lp(atom{
id: pi("bitbucket.org/sdboyer/withbm"),
v: NewVersion("v1.0.0").Pair(Revision("aa110802a0c64195d0a6c375c9f66668827c90b4")),
}, nil),
)

tmp := path.Join(os.TempDir(), "vsolvtest")

clean := true
Expand Down

0 comments on commit 74dedc7

Please sign in to comment.