From e800a66c67d03fbf747b823720cd43186c4dcfc8 Mon Sep 17 00:00:00 2001 From: Ivan Ilves Date: Wed, 9 May 2018 18:52:04 +0200 Subject: [PATCH 1/4] feat(api/v1): Add optional delay between request batches --- api/v1/v1.go | 11 +++++++++++ main.go | 2 ++ tag/remote/remote.go | 5 +++++ 3 files changed, 18 insertions(+) diff --git a/api/v1/v1.go b/api/v1/v1.go index f2c4ed7..f7d9f72 100644 --- a/api/v1/v1.go +++ b/api/v1/v1.go @@ -27,6 +27,8 @@ type Config struct { DockerJSONConfigFile string // ConcurrentRequests defines how much requests to registry we could run in parallel ConcurrentRequests int + // WaitBetween defines how much we will wait between batches of requests (incl. pull and push) + WaitBetween time.Duration // TraceRequests sets if we will print out registry HTTP request traces TraceRequests bool // RetryRequests defines how much retries we will do to the failed HTTP request @@ -172,6 +174,8 @@ func (api *API) CollectTags(refs ...string) (*collection.Collection, error) { if err := wait.Until(done); err != nil { return nil, err } + + time.Sleep(api.config.WaitBetween) } step := 1 @@ -280,6 +284,8 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co return }(repo, i, done) + + time.Sleep(api.config.WaitBetween) } if err := wait.Until(done); err != nil { @@ -331,6 +337,8 @@ func (api *API) PullTags(cn *collection.Collection) error { done <- nil } }(repo, tags, done) + + time.Sleep(api.config.WaitBetween) } return wait.WithTolerance(done) @@ -388,6 +396,8 @@ func (api *API) PushTags(cn *collection.Collection, push PushConfig) error { done <- err } }(repo, tags, done) + + time.Sleep(api.config.WaitBetween) } return wait.WithTolerance(done) @@ -411,6 +421,7 @@ func New(config Config) (*API, error) { config.ConcurrentRequests = 1 } remote.ConcurrentRequests = config.ConcurrentRequests + remote.WaitBetween = config.WaitBetween remote.TraceRequests = config.TraceRequests remote.RetryRequests = config.RetryRequests remote.RetryDelay = config.RetryDelay diff --git a/main.go b/main.go index 8c0a0d5..f8bfd1f 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ type Options struct { PushPrefix string `short:"R" long:"push-prefix" description:"[Re]Push pulled images with a specified repo path prefix" env:"PUSH_PREFIX"` PushUpdate bool `short:"U" long:"push-update" description:"Update our pushed images if remote image digest changes" env:"PUSH_UPDATE"` ConcurrentRequests int `short:"c" long:"concurrent-requests" default:"32" description:"Limit of concurrent requests to the registry" env:"CONCURRENT_REQUESTS"` + WaitBetween time.Duration `short:"w" long:"wait-between" default:"0" description:"Time to wait between batches of requests (incl. pulls and pushes)" env:"WAIT_BETWEEN"` RetryRequests int `short:"y" long:"retry-requests" default:"2" description:"Number of retries for failed Docker registry requests" env:"RETRY_REQUESTS"` RetryDelay time.Duration `short:"D" long:"retry-delay" default:"30s" description:"Delay between retries of failed registry requests" env:"RETRY_DELAY"` InsecureRegistryEx string `short:"I" long:"insecure-registry-ex" description:"Expression to match insecure registry hostnames" env:"INSECURE_REGISTRY_EX"` @@ -99,6 +100,7 @@ func main() { apiConfig := v1.Config{ DockerJSONConfigFile: o.DockerJSON, ConcurrentRequests: o.ConcurrentRequests, + WaitBetween: o.WaitBetween, TraceRequests: o.TraceRequests, RetryRequests: o.RetryRequests, RetryDelay: o.RetryDelay, diff --git a/tag/remote/remote.go b/tag/remote/remote.go index d40eaec..a439549 100644 --- a/tag/remote/remote.go +++ b/tag/remote/remote.go @@ -22,6 +22,9 @@ import ( // ConcurrentRequests defines maximum number of concurrent requests we could maintain against the registry var ConcurrentRequests = 32 +// WaitBetween defines how much we will wait between batches of requests +var WaitBetween time.Duration + // RetryRequests is a number of retries we do in case of request failure var RetryRequests = 0 @@ -372,6 +375,8 @@ func FetchTags(repo *repository.Repository, username, password string) (map[stri }(repo, tagNames[tagIndex], authorization, ch) tagIndex++ + + time.Sleep(WaitBetween) } for s := 1; s <= stepSize; s++ { From 23a19c5beb23b5b984bf740dc4522e759c0e16b6 Mon Sep 17 00:00:00 2001 From: Ivan Ilves Date: Thu, 10 May 2018 08:02:45 +0200 Subject: [PATCH 2/4] fix(api/v1): Avoid another concurrent map writes... --- api/v1/v1.go | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/api/v1/v1.go b/api/v1/v1.go index f7d9f72..caab72c 100644 --- a/api/v1/v1.go +++ b/api/v1/v1.go @@ -103,6 +103,11 @@ func getBatchedSlices(batchSize int, unbatched ...string) [][]string { return batchedSlices } +type rtags struct { + ref string + tags []*tag.Tag +} + // CollectTags collects information on tags present in remote registry and [local] Docker daemon, // makes required comparisons between them and spits organized info back as collection.Collection func (api *API) CollectTags(refs ...string) (*collection.Collection, error) { @@ -115,13 +120,7 @@ func (api *API) CollectTags(refs ...string) (*collection.Collection, error) { return nil, err } - type rtags struct { - ref string - tags []*tag.Tag - } - tagc := make(chan rtags, len(refs)) - tags := make(map[string][]*tag.Tag) batchedSlicesOfRefs := getBatchedSlices(api.config.ConcurrentRequests, refs...) @@ -224,6 +223,7 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co refs := make([]string, len(cn.Refs())) done := make(chan error, len(cn.Refs())) + tagc := make(chan rtags, len(refs)) tags := make(map[string][]*tag.Tag) for i, repo := range cn.Repos() { @@ -276,10 +276,9 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co tagsToPush = append(tagsToPush, tg) } } - log.Debugf("%s tags to push: %+v", fn(repo.Ref()), tagsToPush) - - tags[repo.Ref()] = tagsToPush + log.Debugf("%s sending 'push' tags: %+v", fn(repo.Ref()), tagsToPush) + tagc <- rtags{ref: repo.Ref(), tags: tagsToPush} done <- nil return @@ -291,6 +290,21 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co if err := wait.Until(done); err != nil { return nil, err } + + step := 1 + size := cap(tagc) + for t := range tagc { + log.Debugf("[%s] receiving 'push' tags: %+v", t.ref, t.tags) + + tags[t.ref] = t.tags + + if step >= size { + close(tagc) + } + + step++ + } + log.Debugf("%s 'push' tags: %+v", fn(), tags) return collection.New(refs, tags) @@ -418,7 +432,7 @@ func New(config Config) (*API, error) { log.Debugf("%s API config: %+v", fn(), config) if config.ConcurrentRequests == 0 { - config.ConcurrentRequests = 1 + config.ConcurrentRequests = 8 } remote.ConcurrentRequests = config.ConcurrentRequests remote.WaitBetween = config.WaitBetween From ae5d00a4ea009aa402128c15607f0ccd3fd7e138 Mon Sep 17 00:00:00 2001 From: Ivan Ilves Date: Thu, 10 May 2018 09:04:42 +0200 Subject: [PATCH 3/4] chore(api/v1): Move registry package. Again ;) --- .../registry/container/container.go} | 6 +++--- .../registry/container/container_test.go} | 14 +++++++------- api/v1/v1_test.go | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) rename api/{internal/registry/registry.go => v1/registry/container/container.go} (96%) rename api/{internal/registry/registry_test.go => v1/registry/container/container_test.go} (94%) diff --git a/api/internal/registry/registry.go b/api/v1/registry/container/container.go similarity index 96% rename from api/internal/registry/registry.go rename to api/v1/registry/container/container.go index 13c4083..1f6aa77 100644 --- a/api/internal/registry/registry.go +++ b/api/v1/registry/container/container.go @@ -1,4 +1,4 @@ -package registry +package container import ( "bufio" @@ -78,8 +78,8 @@ func verify(hostname string) error { return err } -// LaunchContainer launches a Docker container with Docker registry inside -func LaunchContainer() (*Container, error) { +// Launch launches a Docker container with Docker registry inside +func Launch() (*Container, error) { dockerClient, _ := getDockerClient() hostPort := getRandomPort() diff --git a/api/internal/registry/registry_test.go b/api/v1/registry/container/container_test.go similarity index 94% rename from api/internal/registry/registry_test.go rename to api/v1/registry/container/container_test.go index cbcc932..c9e4759 100644 --- a/api/internal/registry/registry_test.go +++ b/api/v1/registry/container/container_test.go @@ -1,4 +1,4 @@ -package registry +package container import ( "testing" @@ -64,7 +64,7 @@ func TestRunGuaranteedFailure(t *testing.T) { } func testVerify(t *testing.T) { - c, _ := LaunchContainer() + c, _ := Launch() defer c.Destroy() @@ -81,8 +81,8 @@ func testVerifyGuaranteedFailure(t *testing.T) { } } -func TestLaunchContainerAndThanDestroyIt(t *testing.T) { - c, err := LaunchContainer() +func TestLaunchAndThanDestroyIt(t *testing.T) { + c, err := Launch() if err != nil { t.Fatal(err) } @@ -113,7 +113,7 @@ func TestLaunchManyContainersWithoutNamingCollisions(t *testing.T) { for c := 0; c < createContainers; c++ { go func() { - c, err := LaunchContainer() + c, err := Launch() if err != nil { done <- err return @@ -131,7 +131,7 @@ func TestLaunchManyContainersWithoutNamingCollisions(t *testing.T) { } func TestSeedContainerWithImages(t *testing.T) { - c, err := LaunchContainer() + c, err := Launch() if err != nil { t.Fatal(err) } @@ -175,7 +175,7 @@ func TestSeedContainerWithImages(t *testing.T) { } func TestSeedContainerWithImagesGuaranteedFailure(t *testing.T) { - c, err := LaunchContainer() + c, err := Launch() if err != nil { t.Fatal(err) } diff --git a/api/v1/v1_test.go b/api/v1/v1_test.go index 8b6a646..2f6b572 100644 --- a/api/v1/v1_test.go +++ b/api/v1/v1_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/ivanilves/lstags/api/internal/registry" + registrycontainer "github.com/ivanilves/lstags/api/v1/registry/container" "github.com/ivanilves/lstags/repository" ) @@ -28,7 +28,7 @@ func runEnd2EndJob(pullRefs, seedRefs []string) ([]string, error) { return nil, err } - registryContainer, err := registry.LaunchContainer() + registryContainer, err := registrycontainer.Launch() if err != nil { return nil, err } From 85a24851c142d559eda126560c8fd9dd610b7725 Mon Sep 17 00:00:00 2001 From: Ivan Ilves Date: Thu, 10 May 2018 18:47:00 +0200 Subject: [PATCH 4/4] refactor(api/v1): DRY the way we avoid concurrent map writes --- api/v1/v1.go | 57 ++++++++++++++++++++++++---------------------------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/api/v1/v1.go b/api/v1/v1.go index caab72c..4a91323 100644 --- a/api/v1/v1.go +++ b/api/v1/v1.go @@ -58,6 +58,12 @@ type API struct { dockerClient *dockerclient.DockerClient } +// rtags is a structure to send collection of referenced tags using chan +type rtags struct { + ref string + tags []*tag.Tag +} + // fn gives the name of the calling function (e.g. enriches log.Debugf() output) // + optionally attaches free form string labels (mainly to identify goroutines) func fn(labels ...string) string { @@ -103,9 +109,24 @@ func getBatchedSlices(batchSize int, unbatched ...string) [][]string { return batchedSlices } -type rtags struct { - ref string - tags []*tag.Tag +func receiveTags(tagc chan rtags) map[string][]*tag.Tag { + tags := make(map[string][]*tag.Tag) + + step := 1 + size := cap(tagc) + for t := range tagc { + log.Debugf("[%s] receiving tags: %+v", t.ref, t.tags) + + tags[t.ref] = t.tags + + if step >= size { + close(tagc) + } + + step++ + } + + return tags } // CollectTags collects information on tags present in remote registry and [local] Docker daemon, @@ -121,7 +142,6 @@ func (api *API) CollectTags(refs ...string) (*collection.Collection, error) { } tagc := make(chan rtags, len(refs)) - tags := make(map[string][]*tag.Tag) batchedSlicesOfRefs := getBatchedSlices(api.config.ConcurrentRequests, refs...) @@ -177,19 +197,7 @@ func (api *API) CollectTags(refs ...string) (*collection.Collection, error) { time.Sleep(api.config.WaitBetween) } - step := 1 - size := cap(tagc) - for t := range tagc { - log.Debugf("[%s] receiving tags: %+v", t.ref, t.tags) - - tags[t.ref] = t.tags - - if step >= size { - close(tagc) - } - - step++ - } + tags := receiveTags(tagc) log.Debugf("%s tags: %+v", fn(), tags) @@ -224,7 +232,6 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co refs := make([]string, len(cn.Refs())) done := make(chan error, len(cn.Refs())) tagc := make(chan rtags, len(refs)) - tags := make(map[string][]*tag.Tag) for i, repo := range cn.Repos() { go func(repo *repository.Repository, i int, done chan error) { @@ -291,19 +298,7 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co return nil, err } - step := 1 - size := cap(tagc) - for t := range tagc { - log.Debugf("[%s] receiving 'push' tags: %+v", t.ref, t.tags) - - tags[t.ref] = t.tags - - if step >= size { - close(tagc) - } - - step++ - } + tags := receiveTags(tagc) log.Debugf("%s 'push' tags: %+v", fn(), tags)