Skip to content
This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

[WIP] Parallelised gps.WriteDepTree #903

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 90 additions & 10 deletions internal/gps/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
)

// A Solution is returned by a solver run. It is mostly just a Lock, with some
Expand Down Expand Up @@ -61,20 +62,99 @@ func WriteDepTree(basedir string, l Lock, sm SourceManager, sv bool) error {
return err
}

// TODO(sdboyer) parallelize
for _, p := range l.Projects() {
to := filepath.FromSlash(filepath.Join(basedir, string(p.Ident().ProjectRoot)))
wp := newWorkerPool(l.Projects(), 2)

err = sm.ExportProject(p.Ident(), p.Version(), to)
if err != nil {
removeAll(basedir)
return fmt.Errorf("error while exporting %s: %s", p.Ident().ProjectRoot, err)
}
if sv {
filepath.Walk(to, stripVendor)
err = wp.writeDepTree(
func(p LockedProject) error {
to := filepath.FromSlash(filepath.Join(basedir, string(p.Ident().ProjectRoot)))

if err := sm.ExportProject(p.Ident(), p.Version(), to); err != nil {
return err
}

if sv {
filepath.Walk(to, stripVendor)
}

// TODO(sdboyer) dump version metadata file

return nil
},
)

if err != nil {
removeAll(basedir)
return err
}

return nil
}

func newWorkerPool(pjs []LockedProject, wc int) *workerPool {
wp := &workerPool{
workChan: make(chan LockedProject),
doneChan: make(chan done),
projects: pjs,
workerCount: wc,
}

wp.spawnWorkers()

return wp
}

type workerPool struct {
workChan chan LockedProject
doneChan chan done
projects []LockedProject
wpFunc writeProjectFunc
workerCount int
}

type done struct {
root ProjectRoot
err error
}

type writeProjectFunc func(p LockedProject) error

func (wp *workerPool) spawnWorkers() {
for i := 0; i < wp.workerCount; i++ {
go func() {
for p := range wp.workChan {
wp.doneChan <- done{p.Ident().ProjectRoot, wp.wpFunc(p)}
}
}()
}
}

func (wp *workerPool) writeDepTree(f writeProjectFunc) error {
wp.wpFunc = f
go wp.dispatchJobs()
return wp.reportProgress()
}

func (wp *workerPool) dispatchJobs() {
defer close(wp.workChan)
for _, p := range wp.projects {
wp.workChan <- p
}
}

func (wp *workerPool) reportProgress() error {
var errs []string

for i := 0; i < len(wp.projects); i++ {
d := <-wp.doneChan
if d.err != nil {
errs = append(errs, fmt.Sprintf("error while exporting %s: %s", d.root, d.err))
}
}

if errs != nil {
return fmt.Errorf(strings.Join(errs, "\n"))
}

return nil
}

Expand Down
16 changes: 13 additions & 3 deletions internal/gps/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,20 @@ func testWriteDepTree(t *testing.T) {
}

func BenchmarkCreateVendorTree(b *testing.B) {
// We're fs-bound here, so restrict to single parallelism
b.SetParallelism(1)

r := basicResult

r.p = append(
r.p,
pa2lp(atom{
id: pi("launchpad.net/govcstestbzrrepo"),
v: NewVersion("1.0.0").Pair(Revision("matt@mattfarina.com-20150731135137-pbphasfppmygpl68")),
}, nil),
pa2lp(atom{
id: pi("bitbucket.org/sdboyer/withbm"),
v: NewVersion("v1.0.0").Pair(Revision("aa110802a0c64195d0a6c375c9f66668827c90b4")),
}, nil),
)

tmp := path.Join(os.TempDir(), "vsolvtest")

clean := true
Expand Down