Skip to content

Commit

Permalink
fix(api/v1): Avoid another concurrent map writes...
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanilves committed May 10, 2018
1 parent e800a66 commit 23a19c5
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions api/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func getBatchedSlices(batchSize int, unbatched ...string) [][]string {
return batchedSlices
}

type rtags struct {
ref string
tags []*tag.Tag
}

// CollectTags collects information on tags present in remote registry and [local] Docker daemon,
// makes required comparisons between them and spits organized info back as collection.Collection
func (api *API) CollectTags(refs ...string) (*collection.Collection, error) {
Expand All @@ -115,13 +120,7 @@ func (api *API) CollectTags(refs ...string) (*collection.Collection, error) {
return nil, err
}

type rtags struct {
ref string
tags []*tag.Tag
}

tagc := make(chan rtags, len(refs))

tags := make(map[string][]*tag.Tag)

batchedSlicesOfRefs := getBatchedSlices(api.config.ConcurrentRequests, refs...)
Expand Down Expand Up @@ -224,6 +223,7 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co

refs := make([]string, len(cn.Refs()))
done := make(chan error, len(cn.Refs()))
tagc := make(chan rtags, len(refs))
tags := make(map[string][]*tag.Tag)

for i, repo := range cn.Repos() {
Expand Down Expand Up @@ -276,10 +276,9 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co
tagsToPush = append(tagsToPush, tg)
}
}
log.Debugf("%s tags to push: %+v", fn(repo.Ref()), tagsToPush)

tags[repo.Ref()] = tagsToPush
log.Debugf("%s sending 'push' tags: %+v", fn(repo.Ref()), tagsToPush)

tagc <- rtags{ref: repo.Ref(), tags: tagsToPush}
done <- nil

return
Expand All @@ -291,6 +290,21 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co
if err := wait.Until(done); err != nil {
return nil, err
}

step := 1
size := cap(tagc)
for t := range tagc {
log.Debugf("[%s] receiving 'push' tags: %+v", t.ref, t.tags)

tags[t.ref] = t.tags

if step >= size {
close(tagc)
}

step++
}

log.Debugf("%s 'push' tags: %+v", fn(), tags)

return collection.New(refs, tags)
Expand Down Expand Up @@ -418,7 +432,7 @@ func New(config Config) (*API, error) {
log.Debugf("%s API config: %+v", fn(), config)

if config.ConcurrentRequests == 0 {
config.ConcurrentRequests = 1
config.ConcurrentRequests = 8
}
remote.ConcurrentRequests = config.ConcurrentRequests
remote.WaitBetween = config.WaitBetween
Expand Down

0 comments on commit 23a19c5

Please sign in to comment.