Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Add a timeout to all registry requests #1970

Merged
merged 2 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ required = ["k8s.io/code-generator/cmd/client-gen"]
[[constraint]]
name = "github.com/docker/distribution"
branch = "master"
source = "github.com/2opremio/distribution"

# Pin to master branch until there is a more recent stable release:
# https://github.com/prometheus/client_golang/issues/375
Expand Down
72 changes: 52 additions & 20 deletions registry/cache/repocachemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,35 @@ type imageToUpdate struct {

// repoCacheManager handles cache operations for a container image repository
type repoCacheManager struct {
now time.Time
repoID image.Name
burst int
trace bool
logger log.Logger
cacheClient Client
now time.Time
repoID image.Name
client registry.Client
clientTimeout time.Duration
burst int
trace bool
logger log.Logger
cacheClient Client
sync.Mutex
}

func newRepoCacheManager(now time.Time, repoId image.Name, burst int, trace bool, logger log.Logger,
cacheClient Client) *repoCacheManager {
return &repoCacheManager{
now: now,
repoID: repoId,
burst: burst,
trace: trace,
logger: logger,
cacheClient: cacheClient,
func newRepoCacheManager(now time.Time,
repoID image.Name, clientFactory registry.ClientFactory, creds registry.Credentials, repoClientTimeout time.Duration,
burst int, trace bool, logger log.Logger, cacheClient Client) (*repoCacheManager, error) {
Copy link
Contributor Author

@2opremio 2opremio Apr 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the number of parameters (which comes from the previous refactoring I made) is reasonable at all.

I tried a few alternatives but they were worse. Any suggestions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some people (including me occasionally) wrap optional values in a config struct, which is passed by value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhm, only one (or maybe two) of them is really optional.

client, err := clientFactory.ClientFor(repoID.CanonicalName(), creds)
if err != nil {
return nil, err
}
manager := &repoCacheManager{
now: now,
repoID: repoID,
client: client,
clientTimeout: repoClientTimeout,
burst: burst,
trace: trace,
logger: logger,
cacheClient: cacheClient,
}
return manager, nil
}

// fetchRepository fetches the repository from the cache
Expand All @@ -59,6 +69,17 @@ func (c *repoCacheManager) fetchRepository() (ImageRepository, error) {
return result, nil
}

// getTags lists the tags available in the remote repository, bounding the
// request with the manager's configured client timeout.
func (c *repoCacheManager) getTags(ctx context.Context) ([]string, error) {
	timedCtx, cancel := context.WithTimeout(ctx, c.clientTimeout)
	defer cancel()
	tags, err := c.client.Tags(timedCtx)
	// Surface a descriptive timeout error when the deadline was the cause.
	if timedCtx.Err() == context.DeadlineExceeded {
		return nil, c.clientTimeoutError()
	}
	return tags, err
}

// storeRepository stores the repository in the cache
func (c *repoCacheManager) storeRepository(repo ImageRepository) error {
repoKey := NewRepositoryKey(c.repoID.CanonicalName())
Expand Down Expand Up @@ -154,7 +175,7 @@ func (c *repoCacheManager) fetchImages(tags []string) (fetchImagesResult, error)
// updateImages refreshes the cache entries for the images passed. It may not succeed for all images.
// It returns the values stored in cache, the number of images it succeeded for and the number
// of images whose manifest wasn't found in the registry.
func (c *repoCacheManager) updateImages(ctx context.Context, registryClient registry.Client, images []imageToUpdate) (map[string]image.Info, int, int) {
func (c *repoCacheManager) updateImages(ctx context.Context, images []imageToUpdate) (map[string]image.Info, int, int) {
// The upper bound for concurrent fetches against a single host is
// w.Burst, so limit the number of fetching goroutines to that.
fetchers := make(chan struct{}, c.burst)
Expand All @@ -179,9 +200,11 @@ updates:
awaitFetchers.Add(1)
go func() {
defer func() { awaitFetchers.Done(); <-fetchers }()
entry, err := c.updateImage(ctxc, registryClient, upCopy)
ctxcc, cancel := context.WithTimeout(ctxc, c.clientTimeout)
defer cancel()
entry, err := c.updateImage(ctxcc, upCopy)
if err != nil {
if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
if err, ok := errors.Cause(err).(net.Error); (ok && err.Timeout()) || ctxcc.Err() == context.DeadlineExceeded {
// This was due to a context timeout, don't bother logging
return
}
Expand Down Expand Up @@ -216,16 +239,21 @@ updates:
return result, successCount, manifestUnknownCount
}

func (c *repoCacheManager) updateImage(ctx context.Context, registryClient registry.Client, update imageToUpdate) (registry.ImageEntry, error) {
func (c *repoCacheManager) updateImage(ctx context.Context, update imageToUpdate) (registry.ImageEntry, error) {
imageID := update.ref

if c.trace {
c.logger.Log("trace", "refreshing manifest", "ref", imageID, "previous_refresh", update.previousRefresh.String())
}

ctx, cancel := context.WithTimeout(ctx, c.clientTimeout)
defer cancel()
// Get the image from the remote
entry, err := registryClient.Manifest(ctx, imageID.Tag)
entry, err := c.client.Manifest(ctx, imageID.Tag)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return registry.ImageEntry{}, c.clientTimeoutError()
}
return registry.ImageEntry{}, err
}

Expand Down Expand Up @@ -266,3 +294,7 @@ func (c *repoCacheManager) updateImage(ctx context.Context, registryClient regis
}
return entry, nil
}

// clientTimeoutError returns the error reported when a registry request
// exceeds the configured per-request client timeout.
// The receiver is named c for consistency with the other methods of
// repoCacheManager (Go convention: one receiver name per type).
func (c *repoCacheManager) clientTimeoutError() error {
	return fmt.Errorf("client timeout (%s) exceeded", c.clientTimeout)
}
59 changes: 59 additions & 0 deletions registry/cache/repocachemanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package cache

import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"

"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/registry"
"github.com/weaveworks/flux/registry/middleware"
)

// Test_ClientTimeouts verifies that a registry request which exceeds the
// configured client timeout fails with the descriptive timeout error
// produced by repoCacheManager.clientTimeoutError.
func Test_ClientTimeouts(t *testing.T) {
	timeout := 2 * time.Millisecond
	server := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
		// make sure we exceed the timeout
		time.Sleep(timeout * 10)
	}))
	defer server.Close()
	// Named serverURL rather than url so the net/url package is not shadowed.
	serverURL, err := url.Parse(server.URL)
	assert.NoError(t, err)
	logger := log.NewLogfmtLogger(os.Stdout)
	cf := &registry.RemoteClientFactory{
		// Reuse the single logger instead of constructing a second one.
		Logger: logger,
		Limiters: &middleware.RateLimiters{
			RPS:    100,
			Burst:  100,
			Logger: logger,
		},
		Trace:         false,
		InsecureHosts: []string{serverURL.Host},
	}
	name := image.Name{
		Domain: serverURL.Host,
		Image:  "foo/bar",
	}
	rcm, err := newRepoCacheManager(
		time.Now(),
		name,
		cf,
		registry.NoCredentials(),
		timeout,
		100,
		false,
		logger,
		nil,
	)
	assert.NoError(t, err)
	_, err = rcm.getTags(context.Background())
	assert.Error(t, err)
	assert.Equal(t, "client timeout (2ms) exceeded", err.Error())
}
8 changes: 3 additions & 5 deletions registry/cache/warming.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,12 @@ func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem {
func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id image.Name, creds registry.Credentials) {
errorLogger := log.With(logger, "canonical_name", id.CanonicalName(), "auth", creds)

client, err := w.clientFactory.ClientFor(id.CanonicalName(), creds)
cacheManager, err := newRepoCacheManager(now, id, w.clientFactory, creds, time.Minute, w.burst, w.Trace, errorLogger, w.cache)
if err != nil {
errorLogger.Log("err", err.Error())
return
}

cacheManager := newRepoCacheManager(now, id, w.burst, w.Trace, errorLogger, w.cache)

// This is what we're going to write back to the cache
var repo ImageRepository
repo, err = cacheManager.fetchRepository()
Expand All @@ -176,7 +174,7 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
}
}()

tags, err := client.Tags(ctx)
tags, err := cacheManager.getTags(ctx)
if err != nil {
if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") {
errorLogger.Log("err", errors.Wrap(err, "requesting tags"))
Expand All @@ -201,7 +199,7 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
"to_update", len(fetchResult.imagesToUpdate),
"of_which_refresh", fetchResult.imagesToUpdateRefreshCount, "of_which_missing", fetchResult.imagesToUpdateMissingCount)
var images map[string]image.Info
images, successCount, manifestUnknownCount = cacheManager.updateImages(ctx, client, fetchResult.imagesToUpdate)
images, successCount, manifestUnknownCount = cacheManager.updateImages(ctx, fetchResult.imagesToUpdate)
for k, v := range images {
newImages[k] = v
}
Expand Down