From 6ac92e8a8b422c27c069f5e69b8f71eccfcc4019 Mon Sep 17 00:00:00 2001 From: Jon Johnson Date: Thu, 6 Apr 2023 06:47:45 -0700 Subject: [PATCH] Refactor fetcher, writer, and progress (#1625) This will allow reuse across a repository. One major difference is that keychains are no longer resolved within the option execution, but lazily during fetcher/writer construction, which enables us to create options without knowing the repo. --- pkg/v1/remote/catalog.go | 24 ++---- pkg/v1/remote/delete.go | 7 +- pkg/v1/remote/descriptor.go | 95 ++++++++++++++++-------- pkg/v1/remote/descriptor_test.go | 4 +- pkg/v1/remote/image.go | 5 +- pkg/v1/remote/image_test.go | 24 +++--- pkg/v1/remote/index.go | 20 +++-- pkg/v1/remote/index_test.go | 5 +- pkg/v1/remote/layer.go | 4 +- pkg/v1/remote/list.go | 8 +- pkg/v1/remote/multi_write.go | 21 ++---- pkg/v1/remote/options.go | 15 ++-- pkg/v1/remote/progress.go | 7 ++ pkg/v1/remote/referrers.go | 4 +- pkg/v1/remote/write.go | 121 ++++++++++++------------------- 15 files changed, 179 insertions(+), 185 deletions(-) diff --git a/pkg/v1/remote/catalog.go b/pkg/v1/remote/catalog.go index eb4306f28..baa1c771b 100644 --- a/pkg/v1/remote/catalog.go +++ b/pkg/v1/remote/catalog.go @@ -31,32 +31,27 @@ type catalog struct { // CatalogPage calls /_catalog, returning the list of repositories on the registry. func CatalogPage(target name.Registry, last string, n int, options ...Option) ([]string, error) { - o, err := makeOptions(target, options...) + o, err := makeOptions(options...) if err != nil { return nil, err } - - scopes := []string{target.Scope(transport.PullScope)} - tr, err := transport.NewWithContext(o.context, target, o.auth, o.transport, scopes) + f, err := makeFetcher(o.context, target, o) if err != nil { return nil, err } - query := fmt.Sprintf("last=%s&n=%d", url.QueryEscape(last), n) - uri := url.URL{ Scheme: target.Scheme(), Host: target.RegistryStr(), Path: "/v2/_catalog", - RawQuery: query, + RawQuery: fmt.Sprintf("last=%s&n=%d", url.QueryEscape(last), n), } - client := http.Client{Transport: tr} req, err := http.NewRequest(http.MethodGet, uri.String(), nil) if err != nil { return nil, err } - resp, err := client.Do(req.WithContext(o.context)) + resp, err := f.client.Do(req.WithContext(o.context)) if err != nil { return nil, err } @@ -76,13 +71,11 @@ func CatalogPage(target name.Registry, last string, n int, options ...Option) ([ // Catalog calls /_catalog, returning the list of repositories on the registry. func Catalog(ctx context.Context, target name.Registry, options ...Option) ([]string, error) { - o, err := makeOptions(target, options...) + o, err := makeOptions(options...) if err != nil { return nil, err } - - scopes := []string{target.Scope(transport.PullScope)} - tr, err := transport.NewWithContext(o.context, target, o.auth, o.transport, scopes) + f, err := makeFetcher(o.context, target, o) if err != nil { return nil, err } @@ -92,13 +85,10 @@ func Catalog(ctx context.Context, target name.Registry, options ...Option) ([]st Host: target.RegistryStr(), Path: "/v2/_catalog", } - if o.pageSize > 0 { uri.RawQuery = fmt.Sprintf("n=%d", o.pageSize) } - client := http.Client{Transport: tr} - // WithContext overrides the ctx passed directly. if o.context != context.Background() { ctx = o.context @@ -123,7 +113,7 @@ func Catalog(ctx context.Context, target name.Registry, options ...Option) ([]st } req = req.WithContext(ctx) - resp, err := client.Do(req) + resp, err := f.client.Do(req) if err != nil { return nil, err } diff --git a/pkg/v1/remote/delete.go b/pkg/v1/remote/delete.go index 74a06fd22..78868c8ff 100644 --- a/pkg/v1/remote/delete.go +++ b/pkg/v1/remote/delete.go @@ -25,16 +25,15 @@ import ( // Delete removes the specified image reference from the remote registry. func Delete(ref name.Reference, options ...Option) error { - o, err := makeOptions(ref.Context(), options...) + o, err := makeOptions(options...) if err != nil { return err } - scopes := []string{ref.Scope(transport.DeleteScope)} - tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes) + w, err := makeWriter(o.context, ref.Context(), nil, o) if err != nil { return err } - c := &http.Client{Transport: tr} + c := w.client u := url.URL{ Scheme: ref.Context().Registry.Scheme(), diff --git a/pkg/v1/remote/descriptor.go b/pkg/v1/remote/descriptor.go index 78919d7a8..31247077d 100644 --- a/pkg/v1/remote/descriptor.go +++ b/pkg/v1/remote/descriptor.go @@ -27,6 +27,7 @@ import ( "github.com/google/go-containerregistry/internal/redact" "github.com/google/go-containerregistry/internal/verify" + "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/logs" "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" @@ -59,6 +60,8 @@ func (e *ErrSchema1) Error() string { type Descriptor struct { fetcher v1.Descriptor + + ref name.Reference Manifest []byte // So we can share this implementation with Image. @@ -100,36 +103,37 @@ func Head(ref name.Reference, options ...Option) (*v1.Descriptor, error) { acceptable = append(acceptable, acceptableImageMediaTypes...) acceptable = append(acceptable, acceptableIndexMediaTypes...) - o, err := makeOptions(ref.Context(), options...) + o, err := makeOptions(options...) if err != nil { return nil, err } - f, err := makeFetcher(ref, o) + f, err := makeFetcher(o.context, ref.Context(), o) if err != nil { return nil, err } - return f.headManifest(ref, acceptable) + return f.headManifest(o.context, ref, acceptable) } // Handle options and fetch the manifest with the acceptable MediaTypes in the // Accept header. func get(ref name.Reference, acceptable []types.MediaType, options ...Option) (*Descriptor, error) { - o, err := makeOptions(ref.Context(), options...) + o, err := makeOptions(options...) if err != nil { return nil, err } - f, err := makeFetcher(ref, o) + f, err := makeFetcher(o.context, ref.Context(), o) if err != nil { return nil, err } - b, desc, err := f.fetchManifest(ref, acceptable) + b, desc, err := f.fetchManifest(o.context, ref, acceptable) if err != nil { return nil, err } return &Descriptor{ fetcher: *f, + ref: ref, Manifest: b, Descriptor: *desc, platform: o.platform, @@ -169,7 +173,7 @@ func (d *Descriptor) Image() (v1.Image, error) { } return &mountableImage{ Image: imgCore, - Reference: d.Ref, + Reference: d.ref, }, nil } @@ -196,6 +200,7 @@ func (d *Descriptor) ImageIndex() (v1.ImageIndex, error) { func (d *Descriptor) remoteImage() *remoteImage { return &remoteImage{ fetcher: d.fetcher, + ref: d.ref, manifest: d.Manifest, mediaType: d.MediaType, descriptor: &d.Descriptor, @@ -205,38 +210,70 @@ func (d *Descriptor) remoteImage() *remoteImage { func (d *Descriptor) remoteIndex() *remoteIndex { return &remoteIndex{ fetcher: d.fetcher, + ref: d.ref, manifest: d.Manifest, mediaType: d.MediaType, descriptor: &d.Descriptor, } } +type resource interface { + Scheme() string + RegistryStr() string + Scope(string) string + + authn.Resource +} + // fetcher implements methods for reading from a registry. type fetcher struct { - Ref name.Reference - Client *http.Client + target resource + client *http.Client context context.Context } -func makeFetcher(ref name.Reference, o *options) (*fetcher, error) { - tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, []string{ref.Scope(transport.PullScope)}) +func makeFetcher(ctx context.Context, target resource, o *options) (*fetcher, error) { + auth := o.auth + if o.keychain != nil { + kauth, err := o.keychain.Resolve(target) + if err != nil { + return nil, err + } + auth = kauth + } + + reg, ok := target.(name.Registry) + if !ok { + repo, ok := target.(name.Repository) + if !ok { + return nil, fmt.Errorf("unexpected resource: %T", target) + } + reg = repo.Registry + } + + tr, err := transport.NewWithContext(ctx, reg, auth, o.transport, []string{target.Scope(transport.PullScope)}) if err != nil { return nil, err } return &fetcher{ - Ref: ref, - Client: &http.Client{Transport: tr}, - context: o.context, + target: target, + client: &http.Client{Transport: tr}, + context: ctx, }, nil } // url returns a url.Url for the specified path in the context of this remote image reference. func (f *fetcher) url(resource, identifier string) url.URL { - return url.URL{ - Scheme: f.Ref.Context().Registry.Scheme(), - Host: f.Ref.Context().RegistryStr(), - Path: fmt.Sprintf("/v2/%s/%s/%s", f.Ref.Context().RepositoryStr(), resource, identifier), + u := url.URL{ + Scheme: f.target.Scheme(), + Host: f.target.RegistryStr(), + // Default path if this is not a repository. + Path: "/v2/_catalog", + } + if repo, ok := f.target.(name.Repository); ok { + u.Path = fmt.Sprintf("/v2/%s/%s/%s", repo.RepositoryStr(), resource, identifier) } + return u } // https://github.com/opencontainers/distribution-spec/blob/main/spec.md#referrers-tag-schema @@ -253,7 +290,7 @@ func (f *fetcher) fetchReferrers(ctx context.Context, filter map[string]string, } req.Header.Set("Accept", string(types.OCIImageIndex)) - resp, err := f.Client.Do(req) + resp, err := f.client.Do(req) if err != nil { return nil, err } @@ -271,7 +308,7 @@ func (f *fetcher) fetchReferrers(ctx context.Context, filter map[string]string, } // The registry doesn't support the Referrers API endpoint, so we'll use the fallback tag scheme. - b, _, err := f.fetchManifest(fallbackTag(d), []types.MediaType{types.OCIImageIndex}) + b, _, err := f.fetchManifest(ctx, fallbackTag(d), []types.MediaType{types.OCIImageIndex}) if err != nil { return nil, err } @@ -289,7 +326,7 @@ func (f *fetcher) fetchReferrers(ctx context.Context, filter map[string]string, return filterReferrersResponse(filter, &im), nil } -func (f *fetcher) fetchManifest(ref name.Reference, acceptable []types.MediaType) ([]byte, *v1.Descriptor, error) { +func (f *fetcher) fetchManifest(ctx context.Context, ref name.Reference, acceptable []types.MediaType) ([]byte, *v1.Descriptor, error) { u := f.url("manifests", ref.Identifier()) req, err := http.NewRequest(http.MethodGet, u.String(), nil) if err != nil { @@ -301,7 +338,7 @@ func (f *fetcher) fetchManifest(ref name.Reference, acceptable []types.MediaType } req.Header.Set("Accept", strings.Join(accept, ",")) - resp, err := f.Client.Do(req.WithContext(f.context)) + resp, err := f.client.Do(req.WithContext(ctx)) if err != nil { return nil, nil, err } @@ -332,7 +369,7 @@ func (f *fetcher) fetchManifest(ref name.Reference, acceptable []types.MediaType // Validate the digest matches what we asked for, if pulling by digest. if dgst, ok := ref.(name.Digest); ok { if digest.String() != dgst.DigestStr() { - return nil, nil, fmt.Errorf("manifest digest: %q does not match requested digest: %q for %q", digest, dgst.DigestStr(), f.Ref) + return nil, nil, fmt.Errorf("manifest digest: %q does not match requested digest: %q for %q", digest, dgst.DigestStr(), ref) } } @@ -363,7 +400,7 @@ func (f *fetcher) fetchManifest(ref name.Reference, acceptable []types.MediaType return manifest, &desc, nil } -func (f *fetcher) headManifest(ref name.Reference, acceptable []types.MediaType) (*v1.Descriptor, error) { +func (f *fetcher) headManifest(ctx context.Context, ref name.Reference, acceptable []types.MediaType) (*v1.Descriptor, error) { u := f.url("manifests", ref.Identifier()) req, err := http.NewRequest(http.MethodHead, u.String(), nil) if err != nil { @@ -375,7 +412,7 @@ func (f *fetcher) headManifest(ref name.Reference, acceptable []types.MediaType) } req.Header.Set("Accept", strings.Join(accept, ",")) - resp, err := f.Client.Do(req.WithContext(f.context)) + resp, err := f.client.Do(req.WithContext(ctx)) if err != nil { return nil, err } @@ -408,7 +445,7 @@ func (f *fetcher) headManifest(ref name.Reference, acceptable []types.MediaType) // Validate the digest matches what we asked for, if pulling by digest. if dgst, ok := ref.(name.Digest); ok { if digest.String() != dgst.DigestStr() { - return nil, fmt.Errorf("manifest digest: %q does not match requested digest: %q for %q", digest, dgst.DigestStr(), f.Ref) + return nil, fmt.Errorf("manifest digest: %q does not match requested digest: %q for %q", digest, dgst.DigestStr(), ref) } } @@ -427,7 +464,7 @@ func (f *fetcher) fetchBlob(ctx context.Context, size int64, h v1.Hash) (io.Read return nil, err } - resp, err := f.Client.Do(req.WithContext(ctx)) + resp, err := f.client.Do(req.WithContext(ctx)) if err != nil { return nil, redact.Error(err) } @@ -458,7 +495,7 @@ func (f *fetcher) headBlob(h v1.Hash) (*http.Response, error) { return nil, err } - resp, err := f.Client.Do(req.WithContext(f.context)) + resp, err := f.client.Do(req.WithContext(f.context)) if err != nil { return nil, redact.Error(err) } @@ -478,7 +515,7 @@ func (f *fetcher) blobExists(h v1.Hash) (bool, error) { return false, err } - resp, err := f.Client.Do(req.WithContext(f.context)) + resp, err := f.client.Do(req.WithContext(f.context)) if err != nil { return false, redact.Error(err) } diff --git a/pkg/v1/remote/descriptor_test.go b/pkg/v1/remote/descriptor_test.go index 1b77f80b0..fe167f628 100644 --- a/pkg/v1/remote/descriptor_test.go +++ b/pkg/v1/remote/descriptor_test.go @@ -225,8 +225,8 @@ func TestHead_MissingHeaders(t *testing.T) { func TestRedactFetchBlob(t *testing.T) { ctx := context.Background() f := fetcher{ - Ref: mustNewTag(t, "original.com/repo:latest"), - Client: &http.Client{ + target: mustNewTag(t, "original.com/repo:latest").Context(), + client: &http.Client{ Transport: errTransport{}, }, context: ctx, diff --git a/pkg/v1/remote/image.go b/pkg/v1/remote/image.go index fde614274..ec655d84e 100644 --- a/pkg/v1/remote/image.go +++ b/pkg/v1/remote/image.go @@ -38,6 +38,7 @@ var acceptableImageMediaTypes = []types.MediaType{ // remoteImage accesses an image from a remote registry type remoteImage struct { fetcher + ref name.Reference manifestLock sync.Mutex // Protects manifest manifest []byte configLock sync.Mutex // Protects config @@ -84,7 +85,7 @@ func (r *remoteImage) RawManifest() ([]byte, error) { // NOTE(jonjohnsonjr): We should never get here because the public entrypoints // do type-checking via remote.Descriptor. I've left this here for tests that // directly instantiate a remoteImage. - manifest, desc, err := r.fetchManifest(r.Ref, acceptableImageMediaTypes) + manifest, desc, err := r.fetchManifest(r.context, r.ref, acceptableImageMediaTypes) if err != nil { return nil, err } @@ -186,7 +187,7 @@ func (rl *remoteImageLayer) Compressed() (io.ReadCloser, error) { return nil, err } - resp, err := rl.ri.Client.Do(req.WithContext(ctx)) + resp, err := rl.ri.client.Do(req.WithContext(ctx)) if err != nil { lastErr = err continue diff --git a/pkg/v1/remote/image_test.go b/pkg/v1/remote/image_test.go index 4a6c29d32..0930b2346 100644 --- a/pkg/v1/remote/image_test.go +++ b/pkg/v1/remote/image_test.go @@ -178,9 +178,10 @@ func TestRawManifestDigests(t *testing.T) { } rmt := remoteImage{ + ref: ref, fetcher: fetcher{ - Ref: ref, - Client: http.DefaultClient, + target: ref.Context(), + client: http.DefaultClient, context: context.Background(), }, } @@ -212,10 +213,12 @@ func TestRawManifestNotFound(t *testing.T) { t.Fatalf("url.Parse(%v) = %v", server.URL, err) } + ref := mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)) img := remoteImage{ + ref: ref, fetcher: fetcher{ - Ref: mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)), - Client: http.DefaultClient, + target: ref.Context(), + client: http.DefaultClient, context: context.Background(), }, } @@ -252,10 +255,12 @@ func TestRawConfigFileNotFound(t *testing.T) { t.Fatalf("url.Parse(%v) = %v", server.URL, err) } + ref := mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)) rmt := remoteImage{ + ref: ref, fetcher: fetcher{ - Ref: mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)), - Client: http.DefaultClient, + target: ref.Context(), + client: http.DefaultClient, context: context.Background(), }, } @@ -292,11 +297,12 @@ func TestAcceptHeaders(t *testing.T) { if err != nil { t.Fatalf("url.Parse(%v) = %v", server.URL, err) } - + ref := mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)) rmt := &remoteImage{ + ref: ref, fetcher: fetcher{ - Ref: mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)), - Client: http.DefaultClient, + target: ref.Context(), + client: http.DefaultClient, context: context.Background(), }, } diff --git a/pkg/v1/remote/index.go b/pkg/v1/remote/index.go index 0939947e3..23409cc21 100644 --- a/pkg/v1/remote/index.go +++ b/pkg/v1/remote/index.go @@ -34,6 +34,7 @@ var acceptableIndexMediaTypes = []types.MediaType{ // remoteIndex accesses an index from a remote registry type remoteIndex struct { fetcher + ref name.Reference manifestLock sync.Mutex // Protects manifest manifest []byte mediaType types.MediaType @@ -75,7 +76,7 @@ func (r *remoteIndex) RawManifest() ([]byte, error) { // NOTE(jonjohnsonjr): We should never get here because the public entrypoints // do type-checking via remote.Descriptor. I've left this here for tests that // directly instantiate a remoteIndex. - manifest, desc, err := r.fetchManifest(r.Ref, acceptableIndexMediaTypes) + manifest, desc, err := r.fetchManifest(r.context, r.ref, acceptableIndexMediaTypes) if err != nil { return nil, err } @@ -140,7 +141,7 @@ func (r *remoteIndex) Layer(h v1.Hash) (v1.Layer, error) { } return &MountableLayer{ Layer: l, - Reference: r.Ref.Context().Digest(h.String()), + Reference: r.ref.Context().Digest(h.String()), }, nil } } @@ -216,7 +217,7 @@ func (r *remoteIndex) childByPlatform(platform v1.Platform) (*Descriptor, error) return r.childDescriptor(childDesc, platform) } } - return nil, fmt.Errorf("no child with platform %+v in index %s", platform, r.Ref) + return nil, fmt.Errorf("no child with platform %+v in index %s", platform, r.ref) } func (r *remoteIndex) childByHash(h v1.Hash) (*Descriptor, error) { @@ -229,12 +230,12 @@ func (r *remoteIndex) childByHash(h v1.Hash) (*Descriptor, error) { return r.childDescriptor(childDesc, defaultPlatform) } } - return nil, fmt.Errorf("no child with digest %s in index %s", h, r.Ref) + return nil, fmt.Errorf("no child with digest %s in index %s", h, r.ref) } // Convert one of this index's child's v1.Descriptor into a remote.Descriptor, with the given platform option. func (r *remoteIndex) childDescriptor(child v1.Descriptor, platform v1.Platform) (*Descriptor, error) { - ref := r.Ref.Context().Digest(child.Digest.String()) + ref := r.ref.Context().Digest(child.Digest.String()) var ( manifest []byte err error @@ -245,7 +246,7 @@ func (r *remoteIndex) childDescriptor(child v1.Descriptor, platform v1.Platform) } manifest = child.Data } else { - manifest, _, err = r.fetchManifest(ref, []types.MediaType{child.MediaType}) + manifest, _, err = r.fetchManifest(r.context, ref, []types.MediaType{child.MediaType}) if err != nil { return nil, err } @@ -261,11 +262,8 @@ func (r *remoteIndex) childDescriptor(child v1.Descriptor, platform v1.Platform) } return &Descriptor{ - fetcher: fetcher{ - Ref: ref, - Client: r.Client, - context: r.context, - }, + fetcher: r.fetcher, + ref: ref, Manifest: manifest, Descriptor: child, platform: platform, diff --git a/pkg/v1/remote/index_test.go b/pkg/v1/remote/index_test.go index 4399b1635..52d6b5bc1 100644 --- a/pkg/v1/remote/index_test.go +++ b/pkg/v1/remote/index_test.go @@ -140,9 +140,10 @@ func TestIndexRawManifestDigests(t *testing.T) { } rmt := remoteIndex{ + ref: ref, fetcher: fetcher{ - Ref: ref, - Client: http.DefaultClient, + target: ref.Context(), + client: http.DefaultClient, context: context.Background(), }, } diff --git a/pkg/v1/remote/layer.go b/pkg/v1/remote/layer.go index b2126f599..8bf32c297 100644 --- a/pkg/v1/remote/layer.go +++ b/pkg/v1/remote/layer.go @@ -68,11 +68,11 @@ func (rl *remoteLayer) Exists() (bool, error) { // digest of the blob to be read and the repository portion is the repo where // that blob lives. func Layer(ref name.Digest, options ...Option) (v1.Layer, error) { - o, err := makeOptions(ref.Context(), options...) + o, err := makeOptions(options...) if err != nil { return nil, err } - f, err := makeFetcher(ref, o) + f, err := makeFetcher(o.context, ref.Context(), o) if err != nil { return nil, err } diff --git a/pkg/v1/remote/list.go b/pkg/v1/remote/list.go index e643c49aa..5e32abc26 100644 --- a/pkg/v1/remote/list.go +++ b/pkg/v1/remote/list.go @@ -41,12 +41,11 @@ func ListWithContext(ctx context.Context, repo name.Repository, options ...Optio // List calls /tags/list for the given repository, returning the list of tags // in the "tags" property. func List(repo name.Repository, options ...Option) ([]string, error) { - o, err := makeOptions(repo, options...) + o, err := makeOptions(options...) if err != nil { return nil, err } - scopes := []string{repo.Scope(transport.PullScope)} - tr, err := transport.NewWithContext(o.context, repo.Registry, o.auth, o.transport, scopes) + f, err := makeFetcher(o.context, repo, o) if err != nil { return nil, err } @@ -61,7 +60,6 @@ func List(repo name.Repository, options ...Option) ([]string, error) { uri.RawQuery = fmt.Sprintf("n=%d", o.pageSize) } - client := http.Client{Transport: tr} tagList := []string{} parsed := tags{} @@ -78,7 +76,7 @@ func List(repo name.Repository, options ...Option) ([]string, error) { return nil, err } - resp, err := client.Do(req) + resp, err := f.client.Do(req) if err != nil { return nil, err } diff --git a/pkg/v1/remote/multi_write.go b/pkg/v1/remote/multi_write.go index 7f32413ce..705e09281 100644 --- a/pkg/v1/remote/multi_write.go +++ b/pkg/v1/remote/multi_write.go @@ -17,12 +17,10 @@ package remote import ( "context" "fmt" - "net/http" "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/partial" - "github.com/google/go-containerregistry/pkg/v1/remote/transport" "github.com/google/go-containerregistry/pkg/v1/types" "golang.org/x/sync/errgroup" ) @@ -46,7 +44,7 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) { } } - o, err := makeOptions(repo, options...) + o, err := makeOptions(options...) if err != nil { return err } @@ -81,24 +79,15 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) { for _, l := range blobs { ls = append(ls, l) } - scopes := scopesForUploadingImage(repo, ls) - tr, err := transport.NewWithContext(o.context, repo.Registry, o.auth, o.transport, scopes) + w, err := makeWriter(o.context, repo, ls, o) if err != nil { return err } - w := writer{ - repo: repo, - client: &http.Client{Transport: tr}, - backoff: o.retryBackoff, - predicate: o.retryPredicate, - } // Collect the total size of blobs and manifests we're about to write. - if o.updates != nil { - w.progress = &progress{updates: o.updates} - w.progress.lastUpdate = &v1.Update{} - defer close(o.updates) - defer func() { _ = w.progress.err(rerr) }() + if w.progress != nil { + defer func() { w.progress.Close(rerr) }() + for _, b := range blobs { size, err := b.Size() if err != nil { diff --git a/pkg/v1/remote/options.go b/pkg/v1/remote/options.go index 595496c65..aaafb9bf5 100644 --- a/pkg/v1/remote/options.go +++ b/pkg/v1/remote/options.go @@ -42,7 +42,7 @@ type options struct { jobs int userAgent string allowNondistributableArtifacts bool - updates chan<- v1.Update + progress *progress pageSize int retryBackoff Backoff retryPredicate retry.Predicate @@ -113,9 +113,11 @@ var DefaultTransport http.RoundTripper = &http.Transport{ IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, + // We usually are dealing with 2 hosts (at most), split MaxIdleConns between them. + MaxIdleConnsPerHost: 50, } -func makeOptions(target authn.Resource, opts ...Option) (*options, error) { +func makeOptions(opts ...Option) (*options, error) { o := &options{ transport: DefaultTransport, platform: defaultPlatform, @@ -137,12 +139,6 @@ func makeOptions(target authn.Resource, opts ...Option) (*options, error) { // It is a better experience to explicitly tell a caller their auth is misconfigured // than potentially fail silently when the correct auth is overridden by option misuse. return nil, errors.New("provide an option for either authn.Authenticator or authn.Keychain, not both") - case o.keychain != nil: - auth, err := o.keychain.Resolve(target) - if err != nil { - return nil, err - } - o.auth = auth case o.auth == nil: o.auth = authn.Anonymous } @@ -274,7 +270,8 @@ func WithNondistributable(o *options) error { // should provide a buffered channel to avoid potential deadlocks. func WithProgress(updates chan<- v1.Update) Option { return func(o *options) error { - o.updates = updates + o.progress = &progress{updates: updates} + o.progress.lastUpdate = &v1.Update{} return nil } } diff --git a/pkg/v1/remote/progress.go b/pkg/v1/remote/progress.go index 1f4396350..fe60c8c35 100644 --- a/pkg/v1/remote/progress.go +++ b/pkg/v1/remote/progress.go @@ -29,6 +29,8 @@ type progress struct { } func (p *progress) total(delta int64) { + p.Lock() + defer p.Unlock() atomic.AddInt64(&p.lastUpdate.Total, delta) } @@ -48,6 +50,11 @@ func (p *progress) err(err error) error { return err } +func (p *progress) Close(err error) { + _ = p.err(err) + close(p.updates) +} + type progressReader struct { rc io.ReadCloser diff --git a/pkg/v1/remote/referrers.go b/pkg/v1/remote/referrers.go index b3db863d1..159785f10 100644 --- a/pkg/v1/remote/referrers.go +++ b/pkg/v1/remote/referrers.go @@ -23,11 +23,11 @@ import ( // // The subject manifest doesn't have to exist in the registry for there to be descriptors that refer to it. func Referrers(d name.Digest, options ...Option) (*v1.IndexManifest, error) { - o, err := makeOptions(d.Context(), options...) + o, err := makeOptions(options...) if err != nil { return nil, err } - f, err := makeFetcher(d, o) + f, err := makeFetcher(o.context, d.Context(), o) if err != nil { return nil, err } diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go index 5dbaa7c23..9606d7e19 100644 --- a/pkg/v1/remote/write.go +++ b/pkg/v1/remote/write.go @@ -45,42 +45,31 @@ type Taggable interface { // Write pushes the provided img to the specified image reference. func Write(ref name.Reference, img v1.Image, options ...Option) (rerr error) { - o, err := makeOptions(ref.Context(), options...) + o, err := makeOptions(options...) if err != nil { return err } - var p *progress - if o.updates != nil { - p = &progress{updates: o.updates} - p.lastUpdate = &v1.Update{} - p.lastUpdate.Total, err = countImage(img, o.allowNondistributableArtifacts) + if o.progress != nil { + o.progress.lastUpdate.Total, err = countImage(img, o.allowNondistributableArtifacts) if err != nil { return err } - defer close(o.updates) - defer func() { _ = p.err(rerr) }() + defer func() { o.progress.Close(rerr) }() } - return writeImage(o.context, ref, img, o, p) + return writeImage(o.context, ref, img, o) } -func writeImage(ctx context.Context, ref name.Reference, img v1.Image, o *options, progress *progress) error { +func writeImage(ctx context.Context, ref name.Reference, img v1.Image, o *options) error { ls, err := img.Layers() if err != nil { return err } - scopes := scopesForUploadingImage(ref.Context(), ls) - tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes) + + w, err := makeWriter(ctx, ref.Context(), ls, o) if err != nil { return err } - w := writer{ - repo: ref.Context(), - client: &http.Client{Transport: tr}, - progress: progress, - backoff: o.retryBackoff, - predicate: o.retryPredicate, - } // Upload individual blobs and collect any errors. blobChan := make(chan v1.Layer, 2*o.jobs) @@ -178,6 +167,29 @@ type writer struct { predicate retry.Predicate } +func makeWriter(ctx context.Context, repo name.Repository, ls []v1.Layer, o *options) (*writer, error) { + auth := o.auth + if o.keychain != nil { + kauth, err := o.keychain.Resolve(repo) + if err != nil { + return nil, err + } + auth = kauth + } + scopes := scopesForUploadingImage(repo, ls) + tr, err := transport.NewWithContext(ctx, repo.Registry, auth, o.transport, scopes) + if err != nil { + return nil, err + } + return &writer{ + repo: repo, + client: &http.Client{Transport: tr}, + progress: o.progress, + backoff: o.retryBackoff, + predicate: o.retryPredicate, + }, nil +} + // url returns a url.Url for the specified path in the context of this remote image reference. func (w *writer) url(path string) url.URL { return url.URL{ @@ -478,17 +490,12 @@ type withLayer interface { Layer(v1.Hash) (v1.Layer, error) } -func (w *writer) writeIndex(ctx context.Context, ref name.Reference, ii v1.ImageIndex, options ...Option) error { +func (w *writer) writeIndex(ctx context.Context, ref name.Reference, ii v1.ImageIndex, o *options) error { index, err := ii.IndexManifest() if err != nil { return err } - o, err := makeOptions(ref.Context(), options...) - if err != nil { - return err - } - // TODO(#803): Pipe through remote.WithJobs and upload these in parallel. for _, desc := range index.Manifests { ref := ref.Context().Digest(desc.Digest.String()) @@ -507,7 +514,7 @@ func (w *writer) writeIndex(ctx context.Context, ref name.Reference, ii v1.Image if err != nil { return err } - if err := w.writeIndex(ctx, ref, ii, options...); err != nil { + if err := w.writeIndex(ctx, ref, ii, o); err != nil { return err } case types.OCIManifestSchema1, types.DockerManifestSchema2: @@ -515,7 +522,7 @@ func (w *writer) writeIndex(ctx context.Context, ref name.Reference, ii v1.Image if err != nil { return err } - if err := writeImage(ctx, ref, img, o, w.progress); err != nil { + if err := writeImage(ctx, ref, img, o); err != nil { return err } default: @@ -770,29 +777,17 @@ func scopesForUploadingImage(repo name.Repository, layers []v1.Layer) []string { // WriteIndex will attempt to push all of the referenced manifests before // attempting to push the ImageIndex, to retain referential integrity. func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) (rerr error) { - o, err := makeOptions(ref.Context(), options...) + o, err := makeOptions(options...) if err != nil { return err } - scopes := []string{ref.Scope(transport.PushScope)} - tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes) + w, err := makeWriter(o.context, ref.Context(), nil, o) if err != nil { return err } - w := writer{ - repo: ref.Context(), - client: &http.Client{Transport: tr}, - backoff: o.retryBackoff, - predicate: o.retryPredicate, - } - - if o.updates != nil { - w.progress = &progress{updates: o.updates} - w.progress.lastUpdate = &v1.Update{} - - defer close(o.updates) - defer func() { w.progress.err(rerr) }() + if w.progress != nil { + defer func() { w.progress.Close(rerr) }() w.progress.lastUpdate.Total, err = countIndex(ii, o.allowNondistributableArtifacts) if err != nil { @@ -800,7 +795,7 @@ func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) (rerr e } } - return w.writeIndex(o.context, ref, ii, options...) + return w.writeIndex(o.context, ref, ii, o) } // countImage counts the total size of all layers + config blob + manifest for @@ -913,28 +908,17 @@ func countIndex(idx v1.ImageIndex, allowNondistributableArtifacts bool) (int64, // WriteLayer uploads the provided Layer to the specified repo. func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) (rerr error) { - o, err := makeOptions(repo, options...) + o, err := makeOptions(options...) if err != nil { return err } - scopes := scopesForUploadingImage(repo, []v1.Layer{layer}) - tr, err := transport.NewWithContext(o.context, repo.Registry, o.auth, o.transport, scopes) + w, err := makeWriter(o.context, repo, []v1.Layer{layer}, o) if err != nil { return err } - w := writer{ - repo: repo, - client: &http.Client{Transport: tr}, - backoff: o.retryBackoff, - predicate: o.retryPredicate, - } - if o.updates != nil { - w.progress = &progress{updates: o.updates} - w.progress.lastUpdate = &v1.Update{} - - defer close(o.updates) - defer func() { w.progress.err(rerr) }() + if w.progress != nil { + defer func() { w.progress.Close(rerr) }() // TODO: support streaming layers which update the total count as they write. if _, ok := layer.(*stream.Layer); ok { @@ -976,28 +960,15 @@ func Tag(tag name.Tag, t Taggable, options ...Option) error { // should ensure that all blobs or manifests that are referenced by t exist // in the target registry. func Put(ref name.Reference, t Taggable, options ...Option) error { - o, err := makeOptions(ref.Context(), options...) + repo := ref.Context() + o, err := makeOptions(options...) if err != nil { return err } - scopes := []string{ref.Scope(transport.PushScope)} - - // TODO: This *always* does a token exchange. For some registries, - // that's pretty slow. Some ideas; - // * Tag could take a list of tags. - // * Allow callers to pass in a transport.Transport, typecheck - // it to allow them to reuse the transport across multiple calls. - // * WithTag option to do multiple manifest PUTs in commitManifest. - tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes) + w, err := makeWriter(o.context, repo, nil, o) if err != nil { return err } - w := writer{ - repo: ref.Context(), - client: &http.Client{Transport: tr}, - backoff: o.retryBackoff, - predicate: o.retryPredicate, - } return w.commitManifest(o.context, t, ref) }