Skip to content
This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

Commit

Permalink
Adding sync.Errgroup concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
xmattstrongx committed Oct 9, 2017
1 parent 80d4e7a commit f18d789
Showing 1 changed file with 32 additions and 21 deletions.
53 changes: 32 additions & 21 deletions cmd/dep/root_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@
package main

import (
"context"
"io/ioutil"
"log"
"strings"

"golang.org/x/sync/errgroup"

"github.com/golang/dep"
fb "github.com/golang/dep/internal/feedback"
"github.com/golang/dep/internal/gps"
"github.com/golang/dep/internal/gps/paths"
"github.com/golang/dep/internal/importers"
)

const concurrency = 4

// rootAnalyzer supplies manifest/lock data from both dep and external tool's
// configuration files.
// * When used on the root project, it imports only from external tools.
Expand Down Expand Up @@ -47,7 +52,9 @@ func (a *rootAnalyzer) InitializeRootManifestAndLock(dir string, pr gps.ProjectR

if rootM == nil {
rootM = dep.NewManifest()
a.cacheDeps(pr)
if err := a.cacheDeps(pr); err != nil {
return nil, nil, err
}
}
if rootL == nil {
rootL = &dep.Lock{}
Expand All @@ -56,18 +63,19 @@ func (a *rootAnalyzer) InitializeRootManifestAndLock(dir string, pr gps.ProjectR
return
}

func (a *rootAnalyzer) cacheDeps(pr gps.ProjectRoot) {
packages := make(map[string]bool)
dependencies := make(map[gps.ProjectRoot][]string)
func (a *rootAnalyzer) cacheDeps(pr gps.ProjectRoot) error {
deps := make(map[gps.ProjectRoot]bool)
logger := a.ctx.Err
concurrency := 4
sem := make(chan bool, concurrency)
g, ctx := errgroup.WithContext(context.TODO())
sem := make(chan struct{}, concurrency)

syncDep := func(pr gps.ProjectRoot, sm gps.SourceManager) {
syncDep := func(pr gps.ProjectRoot, sm gps.SourceManager) error {
if err := sm.SyncSourceFor(gps.ProjectIdentifier{ProjectRoot: pr}); err != nil {
logger.Printf("Unable to cache %s - %s", pr, err)
return err
}
logger.Printf("Cached %s", pr)
return nil
}

for ip := range a.directDeps {
Expand All @@ -81,27 +89,30 @@ func (a *rootAnalyzer) cacheDeps(pr gps.ProjectRoot) {

pr, err := a.sm.DeduceProjectRoot(ip)
if err != nil {
logger.Fatalf("DeduceProjectRoot err: %s", err)
return err
}

packages[ip] = true
if _, ok := dependencies[pr]; ok {
if !contains(dependencies[pr], ip) {
dependencies[pr] = append(dependencies[pr], ip)
}
if _, ok := deps[pr]; ok {
continue
}

sem <- true
go func() {
defer func() { <-sem }()
go syncDep(pr, a.sm)
}()
dependencies[pr] = []string{ip}
g.Go(func() error {
select {
case sem <- struct{}{}:
defer func() { <-sem }()
case <-ctx.Done():
return ctx.Err()
}
err := syncDep(pr, a.sm)
return err
})

deps[pr] = true
}
for i := 0; i < cap(sem); i++ {
sem <- true
if err := g.Wait(); err == nil {
logger.Printf("Successfully cached all deps.")
}
return nil
}

func hasImportPathPrefix(s, prefix string) bool {
Expand Down

0 comments on commit f18d789

Please sign in to comment.