Skip to content
This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

Commit

Permalink
Paralellized gps.WriteDepTree
Browse files Browse the repository at this point in the history
Decoupled concurrency code from the WriteDepTree func to workerPool
All errors are now being reported and WriteDepTree decides how to act
(currently calls removeAll)

Broken down WriteDepTree to child functions in order
to receive feedback if on the right track

Fixed accidental err race condition

gps.WriteDepTree now writes vendor/ tree in parallel with
configurable number of worker routines
  • Loading branch information
aneshas committed Jul 27, 2017
1 parent 7d36525 commit 87bc95f
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 13 deletions.
160 changes: 150 additions & 10 deletions internal/gps/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
)

// A Solution is returned by a solver run. It is mostly just a Lock, with some
Expand Down Expand Up @@ -61,24 +62,163 @@ func WriteDepTree(basedir string, l Lock, sm SourceManager, sv bool) error {
return err
}

// TODO(sdboyer) parallelize
for _, p := range l.Projects() {
to := filepath.FromSlash(filepath.Join(basedir, string(p.Ident().ProjectRoot)))
wp := newWorkerPool(l.Projects(), 2)

err = sm.ExportProject(p.Ident(), p.Version(), to)
if err != nil {
removeAll(basedir)
return fmt.Errorf("error while exporting %s: %s", p.Ident().ProjectRoot, err)
err = wp.writeDepTree(
func(p LockedProject) error {
to := filepath.FromSlash(filepath.Join(basedir, string(p.Ident().ProjectRoot)))

if err := sm.ExportProject(p.Ident(), p.Version(), to); err != nil {
return err
}

if sv {
filepath.Walk(to, stripVendor)
}

// TODO(sdboyer) dump version metadata file

return nil
},
)

if err != nil {
removeAll(basedir)
return err
}

return nil
}

func newWorkerPool(pjs []LockedProject, wc int) *workerPool {
wp := &workerPool{
workChan: make(chan LockedProject),
doneChan: make(chan done),
projects: pjs,
workerCount: wc,
}

wp.spawnWorkers()

return wp
}

type workerPool struct {
workChan chan LockedProject
doneChan chan done
projects []LockedProject
wpFunc writeProjectFunc
workerCount int
}

type done struct {
root ProjectRoot
err error
}

type writeProjectFunc func(p LockedProject) error

func (wp *workerPool) spawnWorkers() {
for i := 0; i < wp.workerCount; i++ {
go func() {
for p := range wp.workChan {
wp.doneChan <- done{p.Ident().ProjectRoot, wp.wpFunc(p)}
}
}()
}
}

func (wp *workerPool) writeDepTree(f writeProjectFunc) error {
wp.wpFunc = f
wp.dispatchJobs()
return wp.reportProgress()
}

func (wp *workerPool) dispatchJobs() {
go func() {
for _, p := range wp.projects {
wp.workChan <- p
}
if sv {
filepath.Walk(to, stripVendor)
close(wp.workChan)
}()
}

func (wp *workerPool) reportProgress() error {
var errs []string

for i := 0; i < len(wp.projects); i++ {
d := <-wp.doneChan
if d.err != nil {
errs = append(errs, fmt.Sprintf("error while exporting %s: %s", d.root, d.err))
}
// TODO(sdboyer) dump version metadata file
}

if errs != nil {
return fmt.Errorf(strings.Join(errs, "\n"))
}

return nil
}

/*
func (a workerPool) writeTrees(f func(p LockedProject) error) error {
dc := a.writeProjects(f)
return a.reportProgress(dc)
}
func (a workerPool) writeProjects(f func(p LockedProject) error) chan done {
dc := make(chan done)
wc := make(chan LockedProject)
for i := 0; i < a.numWorkers; i++ {
go func() {
for p := range wc {
dc <- done{p.Ident().ProjectRoot, f(p)}
}
// TODO(sdboyer) dump version metadata file
}()
}
go func() {
for _, p := range a.projects {
wc <- p
}
close(wc)
}()
return dc
}
func (a workerPool) reportProgress(dc chan done) error {
// doneChan <- fmt.Errorf("error while exporting %s: %s", p.Ident().ProjectRoot, err)
for i := 0; i < len(a.projects); i++ {
d := <-dc
if d.err != nil {
return d.err
}
}
return nil
}
*/
/*
root := p.Ident().ProjectRoot
to := filepath.FromSlash(filepath.Join(dj.basedir, string(p.Ident().ProjectRoot)))
err := dj.sm.ExportProject(p.Ident(), p.Version(), to)
if err != nil {
dc <- done{root, err}
continue
}
if sv {
filepath.Walk(to, stripVendor)
}
dc <- done{root, nil}
*/

func (r solution) Projects() []LockedProject {
return r.p
}
Expand Down
16 changes: 13 additions & 3 deletions internal/gps/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,20 @@ func testWriteDepTree(t *testing.T) {
}

func BenchmarkCreateVendorTree(b *testing.B) {
// We're fs-bound here, so restrict to single parallelism
b.SetParallelism(1)

r := basicResult

r.p = append(
r.p,
pa2lp(atom{
id: pi("launchpad.net/govcstestbzrrepo"),
v: NewVersion("1.0.0").Pair(Revision("matt@mattfarina.com-20150731135137-pbphasfppmygpl68")),
}, nil),
pa2lp(atom{
id: pi("bitbucket.org/sdboyer/withbm"),
v: NewVersion("v1.0.0").Pair(Revision("aa110802a0c64195d0a6c375c9f66668827c90b4")),
}, nil),
)

tmp := path.Join(os.TempDir(), "vsolvtest")

clean := true
Expand Down

0 comments on commit 87bc95f

Please sign in to comment.