From 5faaf0c9cbcd8fe736b02c334818e4268845b5ec Mon Sep 17 00:00:00 2001 From: Jon Johnson Date: Mon, 24 Apr 2023 10:55:41 -0700 Subject: [PATCH 1/5] code organization: split fetcher from descriptor --- pkg/v1/remote/descriptor.go | 366 ------------------------------------ pkg/v1/remote/fetcher.go | 312 ++++++++++++++++++++++++++++++ pkg/v1/remote/referrers.go | 85 +++++++++ 3 files changed, 397 insertions(+), 366 deletions(-) create mode 100644 pkg/v1/remote/fetcher.go diff --git a/pkg/v1/remote/descriptor.go b/pkg/v1/remote/descriptor.go index 59da03a75..ed6b8e0e1 100644 --- a/pkg/v1/remote/descriptor.go +++ b/pkg/v1/remote/descriptor.go @@ -15,25 +15,12 @@ package remote import ( - "bytes" - "context" - "errors" "fmt" - "io" - "net/http" - "net/url" - "strings" - "github.com/google/go-containerregistry/internal/redact" - "github.com/google/go-containerregistry/internal/verify" - "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/logs" "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" - "github.com/google/go-containerregistry/pkg/v1/empty" - "github.com/google/go-containerregistry/pkg/v1/mutate" "github.com/google/go-containerregistry/pkg/v1/partial" - "github.com/google/go-containerregistry/pkg/v1/remote/transport" "github.com/google/go-containerregistry/pkg/v1/types" ) @@ -125,20 +112,6 @@ func get(ref name.Reference, acceptable []types.MediaType, options ...Option) (* return f.get(o.context, ref, acceptable) } -func (f *fetcher) get(ctx context.Context, ref name.Reference, acceptable []types.MediaType) (*Descriptor, error) { - b, desc, err := f.fetchManifest(ctx, ref, acceptable) - if err != nil { - return nil, err - } - return &Descriptor{ - fetcher: *f, - ref: ref, - Manifest: b, - Descriptor: *desc, - platform: f.platform, - }, nil -} - // Image converts the Descriptor into a v1.Image. // // If the fetched artifact is already an image, it will just return it. 
@@ -235,342 +208,3 @@ func (d *Descriptor) remoteIndex() *remoteIndex { descriptor: &d.Descriptor, } } - -type resource interface { - Scheme() string - RegistryStr() string - Scope(string) string - - authn.Resource -} - -// fetcher implements methods for reading from a registry. -type fetcher struct { - target resource - client *http.Client - context context.Context - platform v1.Platform - pageSize int -} - -func makeFetcher(ctx context.Context, target resource, o *options) (*fetcher, error) { - auth := o.auth - if o.keychain != nil { - kauth, err := o.keychain.Resolve(target) - if err != nil { - return nil, err - } - auth = kauth - } - - reg, ok := target.(name.Registry) - if !ok { - repo, ok := target.(name.Repository) - if !ok { - return nil, fmt.Errorf("unexpected resource: %T", target) - } - reg = repo.Registry - } - - tr, err := transport.NewWithContext(ctx, reg, auth, o.transport, []string{target.Scope(transport.PullScope)}) - if err != nil { - return nil, err - } - return &fetcher{ - target: target, - client: &http.Client{Transport: tr}, - context: ctx, - platform: o.platform, - pageSize: o.pageSize, - }, nil -} - -// url returns a url.Url for the specified path in the context of this remote image reference. -func (f *fetcher) url(resource, identifier string) url.URL { - u := url.URL{ - Scheme: f.target.Scheme(), - Host: f.target.RegistryStr(), - // Default path if this is not a repository. - Path: "/v2/_catalog", - } - if repo, ok := f.target.(name.Repository); ok { - u.Path = fmt.Sprintf("/v2/%s/%s/%s", repo.RepositoryStr(), resource, identifier) - } - return u -} - -// https://github.com/opencontainers/distribution-spec/blob/main/spec.md#referrers-tag-schema -func fallbackTag(d name.Digest) name.Tag { - return d.Context().Tag(strings.Replace(d.DigestStr(), ":", "-", 1)) -} - -func (f *fetcher) fetchReferrers(ctx context.Context, filter map[string]string, d name.Digest) (v1.ImageIndex, error) { - // Check the Referrers API endpoint first. 
- u := f.url("referrers", d.DigestStr()) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) - if err != nil { - return nil, err - } - req.Header.Set("Accept", string(types.OCIImageIndex)) - - resp, err := f.client.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound, http.StatusBadRequest); err != nil { - return nil, err - } - - var b []byte - if resp.StatusCode == http.StatusOK { - b, err = io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - } else { - // The registry doesn't support the Referrers API endpoint, so we'll use the fallback tag scheme. - b, _, err = f.fetchManifest(ctx, fallbackTag(d), []types.MediaType{types.OCIImageIndex}) - var terr *transport.Error - if errors.As(err, &terr) && terr.StatusCode == http.StatusNotFound { - // Not found just means there are no attachments yet. Start with an empty manifest. - return empty.Index, nil - } else if err != nil { - return nil, err - } - } - - h, sz, err := v1.SHA256(bytes.NewReader(b)) - if err != nil { - return nil, err - } - idx := &remoteIndex{ - fetcher: *f, - manifest: b, - mediaType: types.OCIImageIndex, - descriptor: &v1.Descriptor{ - Digest: h, - MediaType: types.OCIImageIndex, - Size: sz, - }, - } - return filterReferrersResponse(filter, idx), nil -} - -func (f *fetcher) fetchManifest(ctx context.Context, ref name.Reference, acceptable []types.MediaType) ([]byte, *v1.Descriptor, error) { - u := f.url("manifests", ref.Identifier()) - req, err := http.NewRequest(http.MethodGet, u.String(), nil) - if err != nil { - return nil, nil, err - } - accept := []string{} - for _, mt := range acceptable { - accept = append(accept, string(mt)) - } - req.Header.Set("Accept", strings.Join(accept, ",")) - - resp, err := f.client.Do(req.WithContext(ctx)) - if err != nil { - return nil, nil, err - } - defer resp.Body.Close() - - if err := transport.CheckError(resp, http.StatusOK); 
err != nil { - return nil, nil, err - } - - manifest, err := io.ReadAll(resp.Body) - if err != nil { - return nil, nil, err - } - - digest, size, err := v1.SHA256(bytes.NewReader(manifest)) - if err != nil { - return nil, nil, err - } - - mediaType := types.MediaType(resp.Header.Get("Content-Type")) - contentDigest, err := v1.NewHash(resp.Header.Get("Docker-Content-Digest")) - if err == nil && mediaType == types.DockerManifestSchema1Signed { - // If we can parse the digest from the header, and it's a signed schema 1 - // manifest, let's use that for the digest to appease older registries. - digest = contentDigest - } - - // Validate the digest matches what we asked for, if pulling by digest. - if dgst, ok := ref.(name.Digest); ok { - if digest.String() != dgst.DigestStr() { - return nil, nil, fmt.Errorf("manifest digest: %q does not match requested digest: %q for %q", digest, dgst.DigestStr(), ref) - } - } - - var artifactType string - mf, _ := v1.ParseManifest(bytes.NewReader(manifest)) - // Failing to parse as a manifest should just be ignored. - // The manifest might not be valid, and that's okay. - if mf != nil && !mf.Config.MediaType.IsConfig() { - artifactType = string(mf.Config.MediaType) - } - - // Do nothing for tags; I give up. - // - // We'd like to validate that the "Docker-Content-Digest" header matches what is returned by the registry, - // but so many registries implement this incorrectly that it's not worth checking. - // - // For reference: - // https://github.com/GoogleContainerTools/kaniko/issues/298 - - // Return all this info since we have to calculate it anyway. 
- desc := v1.Descriptor{ - Digest: digest, - Size: size, - MediaType: mediaType, - ArtifactType: artifactType, - } - - return manifest, &desc, nil -} - -func (f *fetcher) headManifest(ctx context.Context, ref name.Reference, acceptable []types.MediaType) (*v1.Descriptor, error) { - u := f.url("manifests", ref.Identifier()) - req, err := http.NewRequest(http.MethodHead, u.String(), nil) - if err != nil { - return nil, err - } - accept := []string{} - for _, mt := range acceptable { - accept = append(accept, string(mt)) - } - req.Header.Set("Accept", strings.Join(accept, ",")) - - resp, err := f.client.Do(req.WithContext(ctx)) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - if err := transport.CheckError(resp, http.StatusOK); err != nil { - return nil, err - } - - mth := resp.Header.Get("Content-Type") - if mth == "" { - return nil, fmt.Errorf("HEAD %s: response did not include Content-Type header", u.String()) - } - mediaType := types.MediaType(mth) - - size := resp.ContentLength - if size == -1 { - return nil, fmt.Errorf("GET %s: response did not include Content-Length header", u.String()) - } - - dh := resp.Header.Get("Docker-Content-Digest") - if dh == "" { - return nil, fmt.Errorf("HEAD %s: response did not include Docker-Content-Digest header", u.String()) - } - digest, err := v1.NewHash(dh) - if err != nil { - return nil, err - } - - // Validate the digest matches what we asked for, if pulling by digest. - if dgst, ok := ref.(name.Digest); ok { - if digest.String() != dgst.DigestStr() { - return nil, fmt.Errorf("manifest digest: %q does not match requested digest: %q for %q", digest, dgst.DigestStr(), ref) - } - } - - // Return all this info since we have to calculate it anyway. 
- return &v1.Descriptor{ - Digest: digest, - Size: size, - MediaType: mediaType, - }, nil -} - -func (f *fetcher) fetchBlob(ctx context.Context, size int64, h v1.Hash) (io.ReadCloser, error) { - u := f.url("blobs", h.String()) - req, err := http.NewRequest(http.MethodGet, u.String(), nil) - if err != nil { - return nil, err - } - - resp, err := f.client.Do(req.WithContext(ctx)) - if err != nil { - return nil, redact.Error(err) - } - - if err := transport.CheckError(resp, http.StatusOK); err != nil { - resp.Body.Close() - return nil, err - } - - // Do whatever we can. - // If we have an expected size and Content-Length doesn't match, return an error. - // If we don't have an expected size and we do have a Content-Length, use Content-Length. - if hsize := resp.ContentLength; hsize != -1 { - if size == verify.SizeUnknown { - size = hsize - } else if hsize != size { - return nil, fmt.Errorf("GET %s: Content-Length header %d does not match expected size %d", u.String(), hsize, size) - } - } - - return verify.ReadCloser(resp.Body, size, h) -} - -func (f *fetcher) headBlob(h v1.Hash) (*http.Response, error) { - u := f.url("blobs", h.String()) - req, err := http.NewRequest(http.MethodHead, u.String(), nil) - if err != nil { - return nil, err - } - - resp, err := f.client.Do(req.WithContext(f.context)) - if err != nil { - return nil, redact.Error(err) - } - - if err := transport.CheckError(resp, http.StatusOK); err != nil { - resp.Body.Close() - return nil, err - } - - return resp, nil -} - -func (f *fetcher) blobExists(h v1.Hash) (bool, error) { - u := f.url("blobs", h.String()) - req, err := http.NewRequest(http.MethodHead, u.String(), nil) - if err != nil { - return false, err - } - - resp, err := f.client.Do(req.WithContext(f.context)) - if err != nil { - return false, redact.Error(err) - } - defer resp.Body.Close() - - if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil { - return false, err - } - - return resp.StatusCode == 
http.StatusOK, nil -} - -// If filter applied, filter out by artifactType. -// See https://github.com/opencontainers/distribution-spec/blob/main/spec.md#listing-referrers -func filterReferrersResponse(filter map[string]string, in v1.ImageIndex) v1.ImageIndex { - if filter == nil { - return in - } - v, ok := filter["artifactType"] - if !ok { - return in - } - return mutate.RemoveManifests(in, func(desc v1.Descriptor) bool { - return desc.ArtifactType != v - }) -} diff --git a/pkg/v1/remote/fetcher.go b/pkg/v1/remote/fetcher.go new file mode 100644 index 000000000..7c0baf8c2 --- /dev/null +++ b/pkg/v1/remote/fetcher.go @@ -0,0 +1,312 @@ +// Copyright 2023 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/google/go-containerregistry/internal/redact" + "github.com/google/go-containerregistry/internal/verify" + "github.com/google/go-containerregistry/pkg/authn" + "github.com/google/go-containerregistry/pkg/name" + v1 "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/remote/transport" + "github.com/google/go-containerregistry/pkg/v1/types" +) + +// fetcher implements methods for reading from a registry. 
+type fetcher struct { + target resource + client *http.Client + context context.Context + platform v1.Platform + pageSize int +} + +func makeFetcher(ctx context.Context, target resource, o *options) (*fetcher, error) { + auth := o.auth + if o.keychain != nil { + kauth, err := o.keychain.Resolve(target) + if err != nil { + return nil, err + } + auth = kauth + } + + reg, ok := target.(name.Registry) + if !ok { + repo, ok := target.(name.Repository) + if !ok { + return nil, fmt.Errorf("unexpected resource: %T", target) + } + reg = repo.Registry + } + + tr, err := transport.NewWithContext(ctx, reg, auth, o.transport, []string{target.Scope(transport.PullScope)}) + if err != nil { + return nil, err + } + return &fetcher{ + target: target, + client: &http.Client{Transport: tr}, + context: ctx, + platform: o.platform, + pageSize: o.pageSize, + }, nil +} + +type resource interface { + Scheme() string + RegistryStr() string + Scope(string) string + + authn.Resource +} + +// url returns a url.Url for the specified path in the context of this remote image reference. +func (f *fetcher) url(resource, identifier string) url.URL { + u := url.URL{ + Scheme: f.target.Scheme(), + Host: f.target.RegistryStr(), + // Default path if this is not a repository. 
+ Path: "/v2/_catalog", + } + if repo, ok := f.target.(name.Repository); ok { + u.Path = fmt.Sprintf("/v2/%s/%s/%s", repo.RepositoryStr(), resource, identifier) + } + return u +} + +func (f *fetcher) get(ctx context.Context, ref name.Reference, acceptable []types.MediaType) (*Descriptor, error) { + b, desc, err := f.fetchManifest(ctx, ref, acceptable) + if err != nil { + return nil, err + } + return &Descriptor{ + fetcher: *f, + ref: ref, + Manifest: b, + Descriptor: *desc, + platform: f.platform, + }, nil +} + +func (f *fetcher) fetchManifest(ctx context.Context, ref name.Reference, acceptable []types.MediaType) ([]byte, *v1.Descriptor, error) { + u := f.url("manifests", ref.Identifier()) + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return nil, nil, err + } + accept := []string{} + for _, mt := range acceptable { + accept = append(accept, string(mt)) + } + req.Header.Set("Accept", strings.Join(accept, ",")) + + resp, err := f.client.Do(req.WithContext(ctx)) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + + if err := transport.CheckError(resp, http.StatusOK); err != nil { + return nil, nil, err + } + + manifest, err := io.ReadAll(resp.Body) + if err != nil { + return nil, nil, err + } + + digest, size, err := v1.SHA256(bytes.NewReader(manifest)) + if err != nil { + return nil, nil, err + } + + mediaType := types.MediaType(resp.Header.Get("Content-Type")) + contentDigest, err := v1.NewHash(resp.Header.Get("Docker-Content-Digest")) + if err == nil && mediaType == types.DockerManifestSchema1Signed { + // If we can parse the digest from the header, and it's a signed schema 1 + // manifest, let's use that for the digest to appease older registries. + digest = contentDigest + } + + // Validate the digest matches what we asked for, if pulling by digest. 
+ if dgst, ok := ref.(name.Digest); ok { + if digest.String() != dgst.DigestStr() { + return nil, nil, fmt.Errorf("manifest digest: %q does not match requested digest: %q for %q", digest, dgst.DigestStr(), ref) + } + } + + var artifactType string + mf, _ := v1.ParseManifest(bytes.NewReader(manifest)) + // Failing to parse as a manifest should just be ignored. + // The manifest might not be valid, and that's okay. + if mf != nil && !mf.Config.MediaType.IsConfig() { + artifactType = string(mf.Config.MediaType) + } + + // Do nothing for tags; I give up. + // + // We'd like to validate that the "Docker-Content-Digest" header matches what is returned by the registry, + // but so many registries implement this incorrectly that it's not worth checking. + // + // For reference: + // https://github.com/GoogleContainerTools/kaniko/issues/298 + + // Return all this info since we have to calculate it anyway. + desc := v1.Descriptor{ + Digest: digest, + Size: size, + MediaType: mediaType, + ArtifactType: artifactType, + } + + return manifest, &desc, nil +} + +func (f *fetcher) headManifest(ctx context.Context, ref name.Reference, acceptable []types.MediaType) (*v1.Descriptor, error) { + u := f.url("manifests", ref.Identifier()) + req, err := http.NewRequest(http.MethodHead, u.String(), nil) + if err != nil { + return nil, err + } + accept := []string{} + for _, mt := range acceptable { + accept = append(accept, string(mt)) + } + req.Header.Set("Accept", strings.Join(accept, ",")) + + resp, err := f.client.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if err := transport.CheckError(resp, http.StatusOK); err != nil { + return nil, err + } + + mth := resp.Header.Get("Content-Type") + if mth == "" { + return nil, fmt.Errorf("HEAD %s: response did not include Content-Type header", u.String()) + } + mediaType := types.MediaType(mth) + + size := resp.ContentLength + if size == -1 { + return nil, fmt.Errorf("GET %s: response did not 
include Content-Length header", u.String()) + } + + dh := resp.Header.Get("Docker-Content-Digest") + if dh == "" { + return nil, fmt.Errorf("HEAD %s: response did not include Docker-Content-Digest header", u.String()) + } + digest, err := v1.NewHash(dh) + if err != nil { + return nil, err + } + + // Validate the digest matches what we asked for, if pulling by digest. + if dgst, ok := ref.(name.Digest); ok { + if digest.String() != dgst.DigestStr() { + return nil, fmt.Errorf("manifest digest: %q does not match requested digest: %q for %q", digest, dgst.DigestStr(), ref) + } + } + + // Return all this info since we have to calculate it anyway. + return &v1.Descriptor{ + Digest: digest, + Size: size, + MediaType: mediaType, + }, nil +} + +func (f *fetcher) fetchBlob(ctx context.Context, size int64, h v1.Hash) (io.ReadCloser, error) { + u := f.url("blobs", h.String()) + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return nil, err + } + + resp, err := f.client.Do(req.WithContext(ctx)) + if err != nil { + return nil, redact.Error(err) + } + + if err := transport.CheckError(resp, http.StatusOK); err != nil { + resp.Body.Close() + return nil, err + } + + // Do whatever we can. + // If we have an expected size and Content-Length doesn't match, return an error. + // If we don't have an expected size and we do have a Content-Length, use Content-Length. 
+ if hsize := resp.ContentLength; hsize != -1 { + if size == verify.SizeUnknown { + size = hsize + } else if hsize != size { + return nil, fmt.Errorf("GET %s: Content-Length header %d does not match expected size %d", u.String(), hsize, size) + } + } + + return verify.ReadCloser(resp.Body, size, h) +} + +func (f *fetcher) headBlob(h v1.Hash) (*http.Response, error) { + u := f.url("blobs", h.String()) + req, err := http.NewRequest(http.MethodHead, u.String(), nil) + if err != nil { + return nil, err + } + + resp, err := f.client.Do(req.WithContext(f.context)) + if err != nil { + return nil, redact.Error(err) + } + + if err := transport.CheckError(resp, http.StatusOK); err != nil { + resp.Body.Close() + return nil, err + } + + return resp, nil +} + +func (f *fetcher) blobExists(h v1.Hash) (bool, error) { + u := f.url("blobs", h.String()) + req, err := http.NewRequest(http.MethodHead, u.String(), nil) + if err != nil { + return false, err + } + + resp, err := f.client.Do(req.WithContext(f.context)) + if err != nil { + return false, redact.Error(err) + } + defer resp.Body.Close() + + if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil { + return false, err + } + + return resp.StatusCode == http.StatusOK, nil +} diff --git a/pkg/v1/remote/referrers.go b/pkg/v1/remote/referrers.go index 546940e8a..58a4aa27b 100644 --- a/pkg/v1/remote/referrers.go +++ b/pkg/v1/remote/referrers.go @@ -15,8 +15,19 @@ package remote import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "strings" + "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/empty" + "github.com/google/go-containerregistry/pkg/v1/mutate" + "github.com/google/go-containerregistry/pkg/v1/remote/transport" + "github.com/google/go-containerregistry/pkg/v1/types" ) // Referrers returns a list of descriptors that refer to the given manifest digest. 
@@ -33,3 +44,77 @@ func Referrers(d name.Digest, options ...Option) (v1.ImageIndex, error) { } return f.fetchReferrers(o.context, o.filter, d) } + +// https://github.com/opencontainers/distribution-spec/blob/main/spec.md#referrers-tag-schema +func fallbackTag(d name.Digest) name.Tag { + return d.Context().Tag(strings.Replace(d.DigestStr(), ":", "-", 1)) +} + +func (f *fetcher) fetchReferrers(ctx context.Context, filter map[string]string, d name.Digest) (v1.ImageIndex, error) { + // Check the Referrers API endpoint first. + u := f.url("referrers", d.DigestStr()) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", string(types.OCIImageIndex)) + + resp, err := f.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound, http.StatusBadRequest); err != nil { + return nil, err + } + + var b []byte + if resp.StatusCode == http.StatusOK { + b, err = io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + } else { + // The registry doesn't support the Referrers API endpoint, so we'll use the fallback tag scheme. + b, _, err = f.fetchManifest(ctx, fallbackTag(d), []types.MediaType{types.OCIImageIndex}) + var terr *transport.Error + if errors.As(err, &terr) && terr.StatusCode == http.StatusNotFound { + // Not found just means there are no attachments yet. Start with an empty manifest. + return empty.Index, nil + } else if err != nil { + return nil, err + } + } + + h, sz, err := v1.SHA256(bytes.NewReader(b)) + if err != nil { + return nil, err + } + idx := &remoteIndex{ + fetcher: *f, + manifest: b, + mediaType: types.OCIImageIndex, + descriptor: &v1.Descriptor{ + Digest: h, + MediaType: types.OCIImageIndex, + Size: sz, + }, + } + return filterReferrersResponse(filter, idx), nil +} + +// If filter applied, filter out by artifactType. 
+// See https://github.com/opencontainers/distribution-spec/blob/main/spec.md#listing-referrers +func filterReferrersResponse(filter map[string]string, in v1.ImageIndex) v1.ImageIndex { + if filter == nil { + return in + } + v, ok := filter["artifactType"] + if !ok { + return in + } + return mutate.RemoveManifests(in, func(desc v1.Descriptor) bool { + return desc.ArtifactType != v + }) +} From 85f203ec0ee821570ff14e0922dbda3cd9921bde Mon Sep 17 00:00:00 2001 From: Jon Johnson Date: Mon, 24 Apr 2023 11:12:06 -0700 Subject: [PATCH 2/5] Refactor everything to go through Puller --- pkg/v1/remote/catalog.go | 5 ++-- pkg/v1/remote/descriptor.go | 13 ++------- pkg/v1/remote/layer.go | 21 +-------------- pkg/v1/remote/puller.go | 54 ++++++++++++++++++++++++++++++++----- pkg/v1/remote/referrers.go | 6 +---- 5 files changed, 55 insertions(+), 44 deletions(-) diff --git a/pkg/v1/remote/catalog.go b/pkg/v1/remote/catalog.go index 79d237dfa..2e8c1a071 100644 --- a/pkg/v1/remote/catalog.go +++ b/pkg/v1/remote/catalog.go @@ -36,7 +36,8 @@ func CatalogPage(target name.Registry, last string, n int, options ...Option) ([ if err != nil { return nil, err } - f, err := makeFetcher(o.context, target, o) + + r, err := newPuller(o).reader(o.context, target) if err != nil { return nil, err } @@ -52,7 +53,7 @@ func CatalogPage(target name.Registry, last string, n int, options ...Option) ([ if err != nil { return nil, err } - resp, err := f.client.Do(req.WithContext(o.context)) + resp, err := r.f.client.Do(req.WithContext(o.context)) if err != nil { return nil, err } diff --git a/pkg/v1/remote/descriptor.go b/pkg/v1/remote/descriptor.go index ed6b8e0e1..90ae15cba 100644 --- a/pkg/v1/remote/descriptor.go +++ b/pkg/v1/remote/descriptor.go @@ -90,12 +90,7 @@ func Head(ref name.Reference, options ...Option) (*v1.Descriptor, error) { return nil, err } - f, err := makeFetcher(o.context, ref.Context(), o) - if err != nil { - return nil, err - } - - return f.headManifest(o.context, ref, 
allManifestMediaTypes) + return newPuller(o).Head(o.context, ref) } // Handle options and fetch the manifest with the acceptable MediaTypes in the @@ -105,11 +100,7 @@ func get(ref name.Reference, acceptable []types.MediaType, options ...Option) (* if err != nil { return nil, err } - f, err := makeFetcher(o.context, ref.Context(), o) - if err != nil { - return nil, err - } - return f.get(o.context, ref, acceptable) + return newPuller(o).get(o.context, ref, acceptable) } // Image converts the Descriptor into a v1.Image. diff --git a/pkg/v1/remote/layer.go b/pkg/v1/remote/layer.go index 8bf32c297..a03ae7ae9 100644 --- a/pkg/v1/remote/layer.go +++ b/pkg/v1/remote/layer.go @@ -21,7 +21,6 @@ import ( "github.com/google/go-containerregistry/internal/verify" "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" - "github.com/google/go-containerregistry/pkg/v1/partial" "github.com/google/go-containerregistry/pkg/v1/types" ) @@ -72,23 +71,5 @@ func Layer(ref name.Digest, options ...Option) (v1.Layer, error) { if err != nil { return nil, err } - f, err := makeFetcher(o.context, ref.Context(), o) - if err != nil { - return nil, err - } - h, err := v1.NewHash(ref.Identifier()) - if err != nil { - return nil, err - } - l, err := partial.CompressedToLayer(&remoteLayer{ - fetcher: *f, - digest: h, - }) - if err != nil { - return nil, err - } - return &MountableLayer{ - Layer: l, - Reference: ref, - }, nil + return newPuller(o).Layer(o.context, ref) } diff --git a/pkg/v1/remote/puller.go b/pkg/v1/remote/puller.go index a8b74881c..d49ac4896 100644 --- a/pkg/v1/remote/puller.go +++ b/pkg/v1/remote/puller.go @@ -20,6 +20,8 @@ import ( "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/partial" + "github.com/google/go-containerregistry/pkg/v1/types" ) type Puller struct { @@ -61,10 +63,10 @@ func (r *reader) init(ctx context.Context) error { 
return r.err } -func (p *Puller) reader(ctx context.Context, target resource, o *options) (*reader, error) { +func (p *Puller) reader(ctx context.Context, target resource) (*reader, error) { v, _ := p.readers.LoadOrStore(target, &reader{ target: target, - o: o, + o: p.o, }) rr := v.(*reader) return rr, rr.init(ctx) @@ -72,7 +74,7 @@ func (p *Puller) reader(ctx context.Context, target resource, o *options) (*read // Head is like remote.Head, but avoids re-authenticating when possible. func (p *Puller) Head(ctx context.Context, ref name.Reference) (*v1.Descriptor, error) { - r, err := p.reader(ctx, ref.Context(), p.o) + r, err := p.reader(ctx, ref.Context()) if err != nil { return nil, err } @@ -82,13 +84,37 @@ func (p *Puller) Head(ctx context.Context, ref name.Reference) (*v1.Descriptor, // Get is like remote.Get, but avoids re-authenticating when possible. func (p *Puller) Get(ctx context.Context, ref name.Reference) (*Descriptor, error) { - r, err := p.reader(ctx, ref.Context(), p.o) + r, err := p.reader(ctx, ref.Context()) if err != nil { return nil, err } return r.f.get(ctx, ref, allManifestMediaTypes) } +// Layer is like remote.Layer, but avoids re-authenticating when possible. +func (p *Puller) Layer(ctx context.Context, ref name.Digest) (v1.Layer, error) { + r, err := p.reader(ctx, ref.Context()) + if err != nil { + return nil, err + } + + h, err := v1.NewHash(ref.Identifier()) + if err != nil { + return nil, err + } + l, err := partial.CompressedToLayer(&remoteLayer{ + fetcher: *r.f, + digest: h, + }) + if err != nil { + return nil, err + } + return &MountableLayer{ + Layer: l, + Reference: ref, + }, nil +} + // List lists tags in a repo and handles pagination, returning the full list of tags.
func (p *Puller) List(ctx context.Context, repo name.Repository) ([]string, error) { lister, err := p.Lister(ctx, repo) @@ -110,7 +136,7 @@ func (p *Puller) List(ctx context.Context, repo name.Repository) ([]string, erro // Lister lists tags in a repo and returns a Lister for paginating through the results. func (p *Puller) Lister(ctx context.Context, repo name.Repository) (*Lister, error) { - r, err := p.reader(ctx, repo, p.o) + r, err := p.reader(ctx, repo) if err != nil { return nil, err } @@ -145,7 +171,7 @@ func (p *Puller) Catalog(ctx context.Context, reg name.Registry) ([]string, erro // Catalogger lists repos in a registry and returns a Catalogger for paginating through the results. func (p *Puller) Catalogger(ctx context.Context, reg name.Registry) (*Catalogger, error) { - r, err := p.reader(ctx, reg, p.o) + r, err := p.reader(ctx, reg) if err != nil { return nil, err } @@ -160,3 +186,19 @@ func (p *Puller) Catalogger(ctx context.Context, reg name.Registry) (*Catalogger err: err, }, nil } + +func (p *Puller) get(ctx context.Context, ref name.Reference, acceptable []types.MediaType) (*Descriptor, error) { + r, err := p.reader(ctx, ref.Context()) + if err != nil { + return nil, err + } + return r.f.get(ctx, ref, acceptable) +} + +func (p *Puller) referrers(ctx context.Context, d name.Digest, filter map[string]string) (v1.ImageIndex, error) { + r, err := p.reader(ctx, d.Context()) + if err != nil { + return nil, err + } + return r.f.fetchReferrers(ctx, filter, d) +} diff --git a/pkg/v1/remote/referrers.go b/pkg/v1/remote/referrers.go index 58a4aa27b..271f7ed97 100644 --- a/pkg/v1/remote/referrers.go +++ b/pkg/v1/remote/referrers.go @@ -38,11 +38,7 @@ func Referrers(d name.Digest, options ...Option) (v1.ImageIndex, error) { if err != nil { return nil, err } - f, err := makeFetcher(o.context, d.Context(), o) - if err != nil { - return nil, err - } - return f.fetchReferrers(o.context, o.filter, d) + return newPuller(o).referrers(o.context, d, o.filter) } // 
https://github.com/opencontainers/distribution-spec/blob/main/spec.md#referrers-tag-schema From cf386dadc230fe25c13f91e1fbb696d376be2b5a Mon Sep 17 00:00:00 2001 From: Jon Johnson Date: Mon, 24 Apr 2023 16:02:10 -0700 Subject: [PATCH 3/5] crane: use puller/pusher in copy to drop legacy This package was only used to copy schema 1 images, which we don't need anymore. --- internal/legacy/copy.go | 57 --------------------- internal/legacy/copy_test.go | 97 ------------------------------------ pkg/crane/copy.go | 54 ++++++-------------- pkg/crane/options.go | 3 ++ 4 files changed, 18 insertions(+), 193 deletions(-) delete mode 100644 internal/legacy/copy.go delete mode 100644 internal/legacy/copy_test.go diff --git a/internal/legacy/copy.go b/internal/legacy/copy.go deleted file mode 100644 index 10467ba10..000000000 --- a/internal/legacy/copy.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2019 Google LLC All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package legacy provides methods for interacting with legacy image formats. -package legacy - -import ( - "bytes" - "encoding/json" - - "github.com/google/go-containerregistry/pkg/name" - "github.com/google/go-containerregistry/pkg/v1/remote" -) - -// CopySchema1 allows `[g]crane cp` to work with old images without adding -// full support for schema 1 images to this package. 
-func CopySchema1(desc *remote.Descriptor, srcRef, dstRef name.Reference, opts ...remote.Option) error { - m := schema1{} - if err := json.NewDecoder(bytes.NewReader(desc.Manifest)).Decode(&m); err != nil { - return err - } - - for _, layer := range m.FSLayers { - src := srcRef.Context().Digest(layer.BlobSum) - dst := dstRef.Context().Digest(layer.BlobSum) - - blob, err := remote.Layer(src, opts...) - if err != nil { - return err - } - - if err := remote.WriteLayer(dst.Context(), blob, opts...); err != nil { - return err - } - } - - return remote.Put(dstRef, desc, opts...) -} - -type fslayer struct { - BlobSum string `json:"blobSum"` -} - -type schema1 struct { - FSLayers []fslayer `json:"fsLayers"` -} diff --git a/internal/legacy/copy_test.go b/internal/legacy/copy_test.go deleted file mode 100644 index b8ca799e8..000000000 --- a/internal/legacy/copy_test.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2019 Google LLC All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package legacy - -import ( - "encoding/json" - "fmt" - "net/http/httptest" - "net/url" - "strings" - "testing" - - "github.com/google/go-containerregistry/pkg/name" - "github.com/google/go-containerregistry/pkg/registry" - v1 "github.com/google/go-containerregistry/pkg/v1" - "github.com/google/go-containerregistry/pkg/v1/random" - "github.com/google/go-containerregistry/pkg/v1/remote" - "github.com/google/go-containerregistry/pkg/v1/types" -) - -func TestCopySchema1(t *testing.T) { - // Set up a fake registry. - s := httptest.NewServer(registry.New()) - defer s.Close() - u, err := url.Parse(s.URL) - if err != nil { - t.Fatal(err) - } - - // We'll copy from src to dst. - src := fmt.Sprintf("%s/schema1/src", u.Host) - srcRef, err := name.ParseReference(src) - if err != nil { - t.Fatal(err) - } - dst := fmt.Sprintf("%s/schema1/dst", u.Host) - dstRef, err := name.ParseReference(dst) - if err != nil { - t.Fatal(err) - } - - // Create a random layer. - layer, err := random.Layer(1024, types.DockerLayer) - if err != nil { - t.Fatal(err) - } - digest, err := layer.Digest() - if err != nil { - t.Fatal(err) - } - layerRef, err := name.NewDigest(fmt.Sprintf("%s@%s", src, digest)) - if err != nil { - t.Fatal(err) - } - - // Populate the registry with a layer and a schema 1 manifest referencing it. 
- if err := remote.WriteLayer(layerRef.Context(), layer); err != nil { - t.Fatal(err) - } - manifest := schema1{ - FSLayers: []fslayer{{ - BlobSum: digest.String(), - }}, - } - b, err := json.Marshal(manifest) - if err != nil { - t.Fatal(err) - } - desc := &remote.Descriptor{ - Manifest: b, - Descriptor: v1.Descriptor{ - MediaType: types.DockerManifestSchema1, - Digest: v1.Hash{Algorithm: "sha256", - Hex: strings.Repeat("a", 64), - }, - }, - } - if err := remote.Put(dstRef, desc); err != nil { - t.Fatal(err) - } - - if err := CopySchema1(desc, srcRef, dstRef); err != nil { - t.Errorf("failed to copy schema 1: %v", err) - } -} diff --git a/pkg/crane/copy.go b/pkg/crane/copy.go index a606f9654..ecebae946 100644 --- a/pkg/crane/copy.go +++ b/pkg/crane/copy.go @@ -17,11 +17,9 @@ package crane import ( "fmt" - "github.com/google/go-containerregistry/internal/legacy" "github.com/google/go-containerregistry/pkg/logs" "github.com/google/go-containerregistry/pkg/name" "github.com/google/go-containerregistry/pkg/v1/remote" - "github.com/google/go-containerregistry/pkg/v1/types" ) // Copy copies a remote image or index from src to dst. @@ -37,52 +35,30 @@ func Copy(src, dst string, opt ...Option) error { return fmt.Errorf("parsing reference for %q: %w", dst, err) } - logs.Progress.Printf("Copying from %v to %v", srcRef, dstRef) - desc, err := remote.Get(srcRef, o.Remote...) + pusher, err := remote.NewPusher(o.Remote...) if err != nil { - return fmt.Errorf("fetching %q: %w", src, err) + return err } - switch desc.MediaType { - case types.OCIImageIndex, types.DockerManifestList: - // Handle indexes separately. - if o.Platform != nil { - // If platform is explicitly set, don't copy the whole index, just the appropriate image. 
- if err := copyImage(desc, dstRef, o); err != nil { - return fmt.Errorf("failed to copy image: %w", err) - } - } else { - if err := copyIndex(desc, dstRef, o); err != nil { - return fmt.Errorf("failed to copy index: %w", err) - } - } - case types.DockerManifestSchema1, types.DockerManifestSchema1Signed: - // Handle schema 1 images separately. - if err := legacy.CopySchema1(desc, srcRef, dstRef, o.Remote...); err != nil { - return fmt.Errorf("failed to copy schema 1 image: %w", err) - } - default: - // Assume anything else is an image, since some registries don't set mediaTypes properly. - if err := copyImage(desc, dstRef, o); err != nil { - return fmt.Errorf("failed to copy image: %w", err) - } + puller, err := remote.NewPuller(o.Remote...) + if err != nil { + return err } - return nil -} - -func copyImage(desc *remote.Descriptor, dstRef name.Reference, o Options) error { - img, err := desc.Image() + logs.Progress.Printf("Copying from %v to %v", srcRef, dstRef) + desc, err := puller.Get(o.ctx, srcRef) if err != nil { - return err + return fmt.Errorf("fetching %q: %w", src, err) } - return remote.Write(dstRef, img, o.Remote...) -} -func copyIndex(desc *remote.Descriptor, dstRef name.Reference, o Options) error { - idx, err := desc.ImageIndex() + if o.Platform == nil { + return pusher.Push(o.ctx, dstRef, desc) + } + + // If platform is explicitly set, don't copy the whole index, just the appropriate image. + img, err := desc.Image() if err != nil { return err } - return remote.WriteIndex(dstRef, idx, o.Remote...) 
+ return pusher.Push(o.ctx, dstRef, img) } diff --git a/pkg/crane/options.go b/pkg/crane/options.go index 5d2e0e4b2..be4d9bbb1 100644 --- a/pkg/crane/options.go +++ b/pkg/crane/options.go @@ -34,6 +34,7 @@ type Options struct { transport http.RoundTripper insecure bool + ctx context.Context } // GetOptions exposes the underlying []remote.Option, []name.Option, and @@ -50,6 +51,7 @@ func makeOptions(opts ...Option) Options { remote.WithAuthFromKeychain(authn.DefaultKeychain), }, Keychain: authn.DefaultKeychain, + ctx: context.Background(), } for _, o := range opts { @@ -144,6 +146,7 @@ func WithNondistributable() Option { // WithContext is a functional option for setting the context. func WithContext(ctx context.Context) Option { return func(o *Options) { + o.ctx = ctx o.Remote = append(o.Remote, remote.WithContext(ctx)) } } From 860cfab1efc3d067b81bce4cce628c1d45c9f261 Mon Sep 17 00:00:00 2001 From: Jon Johnson Date: Mon, 24 Apr 2023 16:03:41 -0700 Subject: [PATCH 4/5] Add remote.Reuse for Pusher/Puller This is a big change, but it helps callers out a lot. The Pusher/Puller interfaces allow us to deduplicate a bunch of work (largely, ping and auth), but they only work if you actually use them. It's a huge pain to migrate callers from remote.{Image,Index,...} interfaces to start using Pusher/Puller because the remote functions can be called from anywhere, which means plumbing pushers and pullers around the entire callgraph, which is super painful. Happily, though, most callers already plumb remote.Options around the callgraph so that they can set their options in one place and have them be consistent throughout their application. This change takes advantage of that fact by introducing remote.Reuse, which takes either a Puller or a Pusher, and calls equivalent methods on said pusher/puller whenever you pass remote.Reuse into a remote function.
The end result is that we can get almost all of the performance wins of using Puller/Pusher directly with a 3 line change to most applications rather than refactoring everything. --- pkg/v1/remote/catalog.go | 19 +- pkg/v1/remote/delete.go | 34 +-- pkg/v1/remote/descriptor.go | 15 +- pkg/v1/remote/descriptor_test.go | 4 +- pkg/v1/remote/fetcher.go | 40 ++-- pkg/v1/remote/image.go | 19 +- pkg/v1/remote/image_test.go | 24 +-- pkg/v1/remote/index.go | 12 +- pkg/v1/remote/index_test.go | 6 +- pkg/v1/remote/layer.go | 14 +- pkg/v1/remote/list.go | 13 +- pkg/v1/remote/options.go | 29 ++- pkg/v1/remote/puller.go | 97 ++++++--- pkg/v1/remote/pusher.go | 39 +++- pkg/v1/remote/referrers.go | 1 + pkg/v1/remote/schema1.go | 5 +- pkg/v1/remote/write.go | 345 +------------------------------ pkg/v1/remote/write_test.go | 92 +++------ 18 files changed, 267 insertions(+), 541 deletions(-) diff --git a/pkg/v1/remote/catalog.go b/pkg/v1/remote/catalog.go index 2e8c1a071..a0281b9fd 100644 --- a/pkg/v1/remote/catalog.go +++ b/pkg/v1/remote/catalog.go @@ -37,7 +37,7 @@ func CatalogPage(target name.Registry, last string, n int, options ...Option) ([ return nil, err } - r, err := newPuller(o).reader(o.context, target) + f, err := newPuller(o).fetcher(o.context, target) if err != nil { return nil, err } @@ -53,7 +53,7 @@ func CatalogPage(target name.Registry, last string, n int, options ...Option) ([ if err != nil { return nil, err } - resp, err := r.f.client.Do(req.WithContext(o.context)) + resp, err := f.client.Do(req.WithContext(o.context)) if err != nil { return nil, err } @@ -83,18 +83,18 @@ func Catalog(ctx context.Context, target name.Registry, options ...Option) ([]st ctx = o.context } - return newPuller(o).Catalog(ctx, target) + return newPuller(o).catalog(ctx, target, o.pageSize) } -func (f *fetcher) catalogPage(ctx context.Context, reg name.Registry, next string) (*Catalogs, error) { +func (f *fetcher) catalogPage(ctx context.Context, reg name.Registry, next string, pageSize 
int) (*Catalogs, error) { if next == "" { uri := &url.URL{ Scheme: reg.Scheme(), Host: reg.RegistryStr(), Path: "/v2/_catalog", } - if f.pageSize > 0 { - uri.RawQuery = fmt.Sprintf("n=%d", f.pageSize) + if pageSize > 0 { + uri.RawQuery = fmt.Sprintf("n=%d", pageSize) } next = uri.String() } @@ -135,8 +135,9 @@ func (f *fetcher) catalogPage(ctx context.Context, reg name.Registry, next strin } type Catalogger struct { - f *fetcher - reg name.Registry + f *fetcher + reg name.Registry + pageSize int page *Catalogs err error @@ -146,7 +147,7 @@ type Catalogger struct { func (l *Catalogger) Next(ctx context.Context) (*Catalogs, error) { if l.needMore { - l.page, l.err = l.f.catalogPage(ctx, l.reg, l.page.Next) + l.page, l.err = l.f.catalogPage(ctx, l.reg, l.page.Next, l.pageSize) } else { l.needMore = true } diff --git a/pkg/v1/remote/delete.go b/pkg/v1/remote/delete.go index 78868c8ff..36e1d0816 100644 --- a/pkg/v1/remote/delete.go +++ b/pkg/v1/remote/delete.go @@ -15,12 +15,7 @@ package remote import ( - "fmt" - "net/http" - "net/url" - "github.com/google/go-containerregistry/pkg/name" - "github.com/google/go-containerregistry/pkg/v1/remote/transport" ) // Delete removes the specified image reference from the remote registry. 
@@ -29,32 +24,5 @@ func Delete(ref name.Reference, options ...Option) error { if err != nil { return err } - w, err := makeWriter(o.context, ref.Context(), nil, o) - if err != nil { - return err - } - c := w.client - - u := url.URL{ - Scheme: ref.Context().Registry.Scheme(), - Host: ref.Context().RegistryStr(), - Path: fmt.Sprintf("/v2/%s/manifests/%s", ref.Context().RepositoryStr(), ref.Identifier()), - } - - req, err := http.NewRequest(http.MethodDelete, u.String(), nil) - if err != nil { - return err - } - - resp, err := c.Do(req.WithContext(o.context)) - if err != nil { - return err - } - defer resp.Body.Close() - - return transport.CheckError(resp, http.StatusOK, http.StatusAccepted) - - // TODO(jason): If the manifest had a `subject`, and if the registry - // doesn't support Referrers, update the index pointed to by the - // subject's fallback tag to remove the descriptor for this manifest. + return newPusher(o).Delete(o.context, ref) } diff --git a/pkg/v1/remote/descriptor.go b/pkg/v1/remote/descriptor.go index 90ae15cba..61f28f4c0 100644 --- a/pkg/v1/remote/descriptor.go +++ b/pkg/v1/remote/descriptor.go @@ -15,6 +15,7 @@ package remote import ( + "context" "fmt" "github.com/google/go-containerregistry/pkg/logs" @@ -51,11 +52,12 @@ func (e *ErrSchema1) Error() string { // Descriptor provides access to metadata about remote artifact and accessors // for efficiently converting it into a v1.Image or v1.ImageIndex. type Descriptor struct { - fetcher + fetcher fetcher v1.Descriptor ref name.Reference Manifest []byte + ctx context.Context // So we can share this implementation with Image. platform v1.Platform @@ -100,7 +102,7 @@ func get(ref name.Reference, acceptable []types.MediaType, options ...Option) (* if err != nil { return nil, err } - return newPuller(o).get(o.context, ref, acceptable) + return newPuller(o).get(o.context, ref, acceptable, o.platform) } // Image converts the Descriptor into a v1.Image. 
@@ -147,8 +149,9 @@ func (d *Descriptor) Image() (v1.Image, error) { // This is separate from Image() to avoid a backward incompatible change for callers expecting ErrSchema1. func (d *Descriptor) Schema1() (v1.Image, error) { i := &schema1{ - fetcher: d.fetcher, ref: d.ref, + fetcher: d.fetcher, + ctx: d.ctx, manifest: d.Manifest, mediaType: d.MediaType, descriptor: &d.Descriptor, @@ -182,8 +185,9 @@ func (d *Descriptor) ImageIndex() (v1.ImageIndex, error) { func (d *Descriptor) remoteImage() *remoteImage { return &remoteImage{ - fetcher: d.fetcher, ref: d.ref, + ctx: d.ctx, + fetcher: d.fetcher, manifest: d.Manifest, mediaType: d.MediaType, descriptor: &d.Descriptor, @@ -192,8 +196,9 @@ func (d *Descriptor) remoteImage() *remoteImage { func (d *Descriptor) remoteIndex() *remoteIndex { return &remoteIndex{ - fetcher: d.fetcher, ref: d.ref, + ctx: d.ctx, + fetcher: d.fetcher, manifest: d.Manifest, mediaType: d.MediaType, descriptor: &d.Descriptor, diff --git a/pkg/v1/remote/descriptor_test.go b/pkg/v1/remote/descriptor_test.go index fe167f628..4f0b36b79 100644 --- a/pkg/v1/remote/descriptor_test.go +++ b/pkg/v1/remote/descriptor_test.go @@ -30,9 +30,10 @@ import ( "github.com/google/go-containerregistry/pkg/v1/types" ) +var fakeDigest = "sha256:0000000000000000000000000000000000000000000000000000000000000000" + func TestGetSchema1(t *testing.T) { expectedRepo := "foo/bar" - fakeDigest := "sha256:0000000000000000000000000000000000000000000000000000000000000000" manifestPath := fmt.Sprintf("/v2/%s/manifests/latest", expectedRepo) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -229,7 +230,6 @@ func TestRedactFetchBlob(t *testing.T) { client: &http.Client{ Transport: errTransport{}, }, - context: ctx, } h, err := v1.NewHash("sha256:0000000000000000000000000000000000000000000000000000000000000000") if err != nil { diff --git a/pkg/v1/remote/fetcher.go b/pkg/v1/remote/fetcher.go index 7c0baf8c2..1b5851626 100644 --- 
a/pkg/v1/remote/fetcher.go +++ b/pkg/v1/remote/fetcher.go @@ -32,13 +32,17 @@ import ( "github.com/google/go-containerregistry/pkg/v1/types" ) +func fetcherFromWriter(w *writer) *fetcher { + return &fetcher{ + target: w.repo, + client: w.client, + } +} + // fetcher implements methods for reading from a registry. type fetcher struct { - target resource - client *http.Client - context context.Context - platform v1.Platform - pageSize int + target resource + client *http.Client } func makeFetcher(ctx context.Context, target resource, o *options) (*fetcher, error) { @@ -65,14 +69,15 @@ func makeFetcher(ctx context.Context, target resource, o *options) (*fetcher, er return nil, err } return &fetcher{ - target: target, - client: &http.Client{Transport: tr}, - context: ctx, - platform: o.platform, - pageSize: o.pageSize, + target: target, + client: &http.Client{Transport: tr}, }, nil } +func (f *fetcher) Do(req *http.Request) (*http.Response, error) { + return f.client.Do(req) +} + type resource interface { Scheme() string RegistryStr() string @@ -95,17 +100,18 @@ func (f *fetcher) url(resource, identifier string) url.URL { return u } -func (f *fetcher) get(ctx context.Context, ref name.Reference, acceptable []types.MediaType) (*Descriptor, error) { +func (f *fetcher) get(ctx context.Context, ref name.Reference, acceptable []types.MediaType, platform v1.Platform) (*Descriptor, error) { b, desc, err := f.fetchManifest(ctx, ref, acceptable) if err != nil { return nil, err } return &Descriptor{ - fetcher: *f, ref: ref, + ctx: ctx, + fetcher: *f, Manifest: b, Descriptor: *desc, - platform: f.platform, + platform: platform, }, nil } @@ -271,14 +277,14 @@ func (f *fetcher) fetchBlob(ctx context.Context, size int64, h v1.Hash) (io.Read return verify.ReadCloser(resp.Body, size, h) } -func (f *fetcher) headBlob(h v1.Hash) (*http.Response, error) { +func (f *fetcher) headBlob(ctx context.Context, h v1.Hash) (*http.Response, error) { u := f.url("blobs", h.String()) req, err := 
http.NewRequest(http.MethodHead, u.String(), nil) if err != nil { return nil, err } - resp, err := f.client.Do(req.WithContext(f.context)) + resp, err := f.client.Do(req.WithContext(ctx)) if err != nil { return nil, redact.Error(err) } @@ -291,14 +297,14 @@ func (f *fetcher) headBlob(h v1.Hash) (*http.Response, error) { return resp, nil } -func (f *fetcher) blobExists(h v1.Hash) (bool, error) { +func (f *fetcher) blobExists(ctx context.Context, h v1.Hash) (bool, error) { u := f.url("blobs", h.String()) req, err := http.NewRequest(http.MethodHead, u.String(), nil) if err != nil { return false, err } - resp, err := f.client.Do(req.WithContext(f.context)) + resp, err := f.client.Do(req.WithContext(ctx)) if err != nil { return false, redact.Error(err) } diff --git a/pkg/v1/remote/image.go b/pkg/v1/remote/image.go index 82d02e94f..f085967ed 100644 --- a/pkg/v1/remote/image.go +++ b/pkg/v1/remote/image.go @@ -16,6 +16,7 @@ package remote import ( "bytes" + "context" "io" "net/http" "net/url" @@ -37,8 +38,9 @@ var acceptableImageMediaTypes = []types.MediaType{ // remoteImage accesses an image from a remote registry type remoteImage struct { - fetcher + fetcher fetcher ref name.Reference + ctx context.Context manifestLock sync.Mutex // Protects manifest manifest []byte configLock sync.Mutex // Protects config @@ -85,7 +87,7 @@ func (r *remoteImage) RawManifest() ([]byte, error) { // NOTE(jonjohnsonjr): We should never get here because the public entrypoints // do type-checking via remote.Descriptor. I've left this here for tests that // directly instantiate a remoteImage. 
- manifest, desc, err := r.fetchManifest(r.context, r.ref, acceptableImageMediaTypes) + manifest, desc, err := r.fetcher.fetchManifest(r.ctx, r.ref, acceptableImageMediaTypes) if err != nil { return nil, err } @@ -118,7 +120,7 @@ func (r *remoteImage) RawConfigFile() ([]byte, error) { return r.config, nil } - body, err := r.fetchBlob(r.context, m.Config.Size, m.Config.Digest) + body, err := r.fetcher.fetchBlob(r.ctx, m.Config.Size, m.Config.Digest) if err != nil { return nil, err } @@ -151,6 +153,7 @@ func (r *remoteImage) ConfigLayer() (v1.Layer, error) { return partial.CompressedToLayer(&remoteImageLayer{ ri: r, + ctx: r.ctx, digest: m.Config.Digest, }) } @@ -158,6 +161,7 @@ func (r *remoteImage) ConfigLayer() (v1.Layer, error) { // remoteImageLayer implements partial.CompressedLayer type remoteImageLayer struct { ri *remoteImage + ctx context.Context digest v1.Hash } @@ -168,7 +172,7 @@ func (rl *remoteImageLayer) Digest() (v1.Hash, error) { // Compressed implements partial.CompressedLayer func (rl *remoteImageLayer) Compressed() (io.ReadCloser, error) { - urls := []url.URL{rl.ri.url("blobs", rl.digest.String())} + urls := []url.URL{rl.ri.fetcher.url("blobs", rl.digest.String())} // Add alternative layer sources from URLs (usually none). d, err := partial.BlobDescriptor(rl, rl.digest) @@ -181,7 +185,7 @@ func (rl *remoteImageLayer) Compressed() (io.ReadCloser, error) { } // We don't want to log binary layers -- this can break terminals. 
- ctx := redact.NewContext(rl.ri.context, "omitting binary blobs from logs") + ctx := redact.NewContext(rl.ctx, "omitting binary blobs from logs") for _, s := range d.URLs { u, err := url.Parse(s) @@ -202,7 +206,7 @@ func (rl *remoteImageLayer) Compressed() (io.ReadCloser, error) { return nil, err } - resp, err := rl.ri.client.Do(req.WithContext(ctx)) + resp, err := rl.ri.fetcher.Do(req.WithContext(ctx)) if err != nil { lastErr = err continue @@ -260,13 +264,14 @@ func (rl *remoteImageLayer) Descriptor() (*v1.Descriptor, error) { // See partial.Exists. func (rl *remoteImageLayer) Exists() (bool, error) { - return rl.ri.blobExists(rl.digest) + return rl.ri.fetcher.blobExists(rl.ri.ctx, rl.digest) } // LayerByDigest implements partial.CompressedLayer func (r *remoteImage) LayerByDigest(h v1.Hash) (partial.CompressedLayer, error) { return &remoteImageLayer{ ri: r, + ctx: r.ctx, digest: h, }, nil } diff --git a/pkg/v1/remote/image_test.go b/pkg/v1/remote/image_test.go index 0930b2346..f15e96a6d 100644 --- a/pkg/v1/remote/image_test.go +++ b/pkg/v1/remote/image_test.go @@ -179,10 +179,10 @@ func TestRawManifestDigests(t *testing.T) { rmt := remoteImage{ ref: ref, + ctx: context.Background(), fetcher: fetcher{ - target: ref.Context(), - client: http.DefaultClient, - context: context.Background(), + target: ref.Context(), + client: http.DefaultClient, }, } @@ -216,10 +216,10 @@ func TestRawManifestNotFound(t *testing.T) { ref := mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)) img := remoteImage{ ref: ref, + ctx: context.Background(), fetcher: fetcher{ - target: ref.Context(), - client: http.DefaultClient, - context: context.Background(), + target: ref.Context(), + client: http.DefaultClient, }, } @@ -258,10 +258,10 @@ func TestRawConfigFileNotFound(t *testing.T) { ref := mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)) rmt := remoteImage{ ref: ref, + ctx: context.Background(), fetcher: fetcher{ - target: ref.Context(), - client: 
http.DefaultClient, - context: context.Background(), + target: ref.Context(), + client: http.DefaultClient, }, } @@ -300,10 +300,10 @@ func TestAcceptHeaders(t *testing.T) { ref := mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)) rmt := &remoteImage{ ref: ref, + ctx: context.Background(), fetcher: fetcher{ - target: ref.Context(), - client: http.DefaultClient, - context: context.Background(), + target: ref.Context(), + client: http.DefaultClient, }, } manifest, err := rmt.RawManifest() diff --git a/pkg/v1/remote/index.go b/pkg/v1/remote/index.go index 6b50b1a94..b80972c80 100644 --- a/pkg/v1/remote/index.go +++ b/pkg/v1/remote/index.go @@ -16,6 +16,7 @@ package remote import ( "bytes" + "context" "fmt" "sync" @@ -33,8 +34,9 @@ var acceptableIndexMediaTypes = []types.MediaType{ // remoteIndex accesses an index from a remote registry type remoteIndex struct { - fetcher + fetcher fetcher ref name.Reference + ctx context.Context manifestLock sync.Mutex // Protects manifest manifest []byte mediaType types.MediaType @@ -76,7 +78,7 @@ func (r *remoteIndex) RawManifest() ([]byte, error) { // NOTE(jonjohnsonjr): We should never get here because the public entrypoints // do type-checking via remote.Descriptor. I've left this here for tests that // directly instantiate a remoteIndex. 
- manifest, desc, err := r.fetchManifest(r.context, r.ref, acceptableIndexMediaTypes) + manifest, desc, err := r.fetcher.fetchManifest(r.ctx, r.ref, acceptableIndexMediaTypes) if err != nil { return nil, err } @@ -134,6 +136,7 @@ func (r *remoteIndex) Layer(h v1.Hash) (v1.Layer, error) { if h == childDesc.Digest { l, err := partial.CompressedToLayer(&remoteLayer{ fetcher: r.fetcher, + ctx: r.ctx, digest: h, }) if err != nil { @@ -212,7 +215,7 @@ func (r *remoteIndex) childDescriptor(child v1.Descriptor, platform v1.Platform) } manifest = child.Data } else { - manifest, _, err = r.fetchManifest(r.context, ref, []types.MediaType{child.MediaType}) + manifest, _, err = r.fetcher.fetchManifest(r.ctx, ref, []types.MediaType{child.MediaType}) if err != nil { return nil, err } @@ -228,8 +231,9 @@ func (r *remoteIndex) childDescriptor(child v1.Descriptor, platform v1.Platform) } return &Descriptor{ - fetcher: r.fetcher, ref: ref, + ctx: r.ctx, + fetcher: r.fetcher, Manifest: manifest, Descriptor: child, platform: platform, diff --git a/pkg/v1/remote/index_test.go b/pkg/v1/remote/index_test.go index 52d6b5bc1..1b4b5cf1e 100644 --- a/pkg/v1/remote/index_test.go +++ b/pkg/v1/remote/index_test.go @@ -141,10 +141,10 @@ func TestIndexRawManifestDigests(t *testing.T) { rmt := remoteIndex{ ref: ref, + ctx: context.Background(), fetcher: fetcher{ - target: ref.Context(), - client: http.DefaultClient, - context: context.Background(), + target: ref.Context(), + client: http.DefaultClient, }, } diff --git a/pkg/v1/remote/layer.go b/pkg/v1/remote/layer.go index a03ae7ae9..39c205950 100644 --- a/pkg/v1/remote/layer.go +++ b/pkg/v1/remote/layer.go @@ -15,6 +15,7 @@ package remote import ( + "context" "io" "github.com/google/go-containerregistry/internal/redact" @@ -26,20 +27,21 @@ import ( // remoteImagelayer implements partial.CompressedLayer type remoteLayer struct { - fetcher - digest v1.Hash + ctx context.Context + fetcher fetcher + digest v1.Hash } // Compressed implements 
partial.CompressedLayer func (rl *remoteLayer) Compressed() (io.ReadCloser, error) { // We don't want to log binary layers -- this can break terminals. - ctx := redact.NewContext(rl.context, "omitting binary blobs from logs") - return rl.fetchBlob(ctx, verify.SizeUnknown, rl.digest) + ctx := redact.NewContext(rl.ctx, "omitting binary blobs from logs") + return rl.fetcher.fetchBlob(ctx, verify.SizeUnknown, rl.digest) } // Compressed implements partial.CompressedLayer func (rl *remoteLayer) Size() (int64, error) { - resp, err := rl.headBlob(rl.digest) + resp, err := rl.fetcher.headBlob(rl.ctx, rl.digest) if err != nil { return -1, err } @@ -59,7 +61,7 @@ func (rl *remoteLayer) MediaType() (types.MediaType, error) { // See partial.Exists. func (rl *remoteLayer) Exists() (bool, error) { - return rl.blobExists(rl.digest) + return rl.fetcher.blobExists(rl.ctx, rl.digest) } // Layer reads the given blob reference from a registry as a Layer. A blob diff --git a/pkg/v1/remote/list.go b/pkg/v1/remote/list.go index 332ae50d7..910d2a94c 100644 --- a/pkg/v1/remote/list.go +++ b/pkg/v1/remote/list.go @@ -49,15 +49,15 @@ type Tags struct { Next string `json:"next,omitempty"` } -func (f *fetcher) listPage(ctx context.Context, repo name.Repository, next string) (*Tags, error) { +func (f *fetcher) listPage(ctx context.Context, repo name.Repository, next string, pageSize int) (*Tags, error) { if next == "" { uri := &url.URL{ Scheme: repo.Scheme(), Host: repo.RegistryStr(), Path: fmt.Sprintf("/v2/%s/tags/list", repo.RepositoryStr()), } - if f.pageSize > 0 { - uri.RawQuery = fmt.Sprintf("n=%d", f.pageSize) + if pageSize > 0 { + uri.RawQuery = fmt.Sprintf("n=%d", pageSize) } next = uri.String() } @@ -128,8 +128,9 @@ func getNextPageURL(resp *http.Response) (*url.URL, error) { } type Lister struct { - f *fetcher - repo name.Repository + f *fetcher + repo name.Repository + pageSize int page *Tags err error @@ -139,7 +140,7 @@ type Lister struct { func (l *Lister) Next(ctx context.Context) 
(*Tags, error) { if l.needMore { - l.page, l.err = l.f.listPage(ctx, l.repo, l.page.Next) + l.page, l.err = l.f.listPage(ctx, l.repo, l.page.Next, l.pageSize) } else { l.needMore = true } diff --git a/pkg/v1/remote/options.go b/pkg/v1/remote/options.go index 6e5bb04a3..a722c2ca6 100644 --- a/pkg/v1/remote/options.go +++ b/pkg/v1/remote/options.go @@ -37,17 +37,23 @@ type options struct { auth authn.Authenticator keychain authn.Keychain transport http.RoundTripper - platform v1.Platform context context.Context jobs int userAgent string allowNondistributableArtifacts bool progress *progress - pageSize int retryBackoff Backoff retryPredicate retry.Predicate retryStatusCodes []int - filter map[string]string + + // Only these options can overwrite Reuse()d options. + platform v1.Platform + pageSize int + filter map[string]string + + // Set by Reuse, we currently store one or the other. + puller *Puller + pusher *Pusher } var defaultPlatform = v1.Platform{ @@ -323,3 +329,20 @@ func WithFilter(key string, value string) Option { return nil } } + +// Reuse takes a Puller or Pusher and reuses it for remote interactions +// rather than starting from a clean slate. For example, it will reuse token exchanges +// when possible and avoid sending redundant HEAD requests. +// +// Reuse will take precedence over other options passed to most remote functions because +// most options deal with setting up auth and transports, which Reuse intetionally skips. 
+func Reuse[I *Puller | *Pusher](i I) Option { + return func(o *options) error { + if puller, ok := any(i).(*Puller); ok { + o.puller = puller + } else if pusher, ok := any(i).(*Pusher); ok { + o.pusher = pusher + } + return nil + } +} diff --git a/pkg/v1/remote/puller.go b/pkg/v1/remote/puller.go index d49ac4896..cd1a713cd 100644 --- a/pkg/v1/remote/puller.go +++ b/pkg/v1/remote/puller.go @@ -41,16 +41,23 @@ func NewPuller(options ...Option) (*Puller, error) { } func newPuller(o *options) *Puller { + if o.puller != nil { + return o.puller + } return &Puller{ o: o, } } type reader struct { + // in target resource o *options - once sync.Once + // f() + once sync.Once + + // out f *fetcher err error } @@ -63,37 +70,56 @@ func (r *reader) init(ctx context.Context) error { return r.err } -func (p *Puller) reader(ctx context.Context, target resource) (*reader, error) { +func (p *Puller) fetcher(ctx context.Context, target resource) (*fetcher, error) { + // If we are Reuse()ing a Pusher, we want to use that for token handshakes and scopes, + // but we want to do read requests via a fetcher{}. + // + // TODO(jonjohnsonjr): Unify fetcher, writer, and repoWriter. + if p.o.pusher != nil { + if repo, ok := target.(name.Repository); ok { + w, err := p.o.pusher.writer(ctx, repo, p.o) + if err != nil { + return nil, err + } + return fetcherFromWriter(w.w), nil + } + } + + // Normal path for NewPuller. v, _ := p.readers.LoadOrStore(target, &reader{ target: target, o: p.o, }) rr := v.(*reader) - return rr, rr.init(ctx) + return rr.f, rr.init(ctx) } // Head is like remote.Head, but avoids re-authenticating when possible. 
func (p *Puller) Head(ctx context.Context, ref name.Reference) (*v1.Descriptor, error) { - r, err := p.reader(ctx, ref.Context()) + f, err := p.fetcher(ctx, ref.Context()) if err != nil { return nil, err } - return r.f.headManifest(ctx, ref, allManifestMediaTypes) + return f.headManifest(ctx, ref, allManifestMediaTypes) } // Get is like remote.Get, but avoids re-authenticating when possible. func (p *Puller) Get(ctx context.Context, ref name.Reference) (*Descriptor, error) { - r, err := p.reader(ctx, ref.Context()) + return p.get(ctx, ref, allManifestMediaTypes, p.o.platform) +} + +func (p *Puller) get(ctx context.Context, ref name.Reference, acceptable []types.MediaType, platform v1.Platform) (*Descriptor, error) { + f, err := p.fetcher(ctx, ref.Context()) if err != nil { return nil, err } - return r.f.get(ctx, ref, allManifestMediaTypes) + return f.get(ctx, ref, acceptable, platform) } // Layer is like remote.Layer, but avoids re-authenticated when possible. func (p *Puller) Layer(ctx context.Context, ref name.Digest) (v1.Layer, error) { - r, err := p.reader(ctx, ref.Context()) + f, err := p.fetcher(ctx, ref.Context()) if err != nil { return nil, err } @@ -103,7 +129,8 @@ func (p *Puller) Layer(ctx context.Context, ref name.Digest) (v1.Layer, error) { return nil, err } l, err := partial.CompressedToLayer(&remoteLayer{ - fetcher: *r.f, + fetcher: *f, + ctx: ctx, digest: h, }) if err != nil { @@ -136,25 +163,34 @@ func (p *Puller) List(ctx context.Context, repo name.Repository) ([]string, erro // Lister lists tags in a repo and returns a Lister for paginating through the results. 
func (p *Puller) Lister(ctx context.Context, repo name.Repository) (*Lister, error) { - r, err := p.reader(ctx, repo) + return p.lister(ctx, repo, p.o.pageSize) +} + +func (p *Puller) lister(ctx context.Context, repo name.Repository, pageSize int) (*Lister, error) { + f, err := p.fetcher(ctx, repo) if err != nil { return nil, err } - page, err := r.f.listPage(ctx, repo, "") + page, err := f.listPage(ctx, repo, "", pageSize) if err != nil { return nil, err } return &Lister{ - f: r.f, - repo: repo, - page: page, - err: err, + f: f, + repo: repo, + pageSize: pageSize, + page: page, + err: err, }, nil } // Catalog lists repos in a registry and handles pagination, returning the full list of repos. func (p *Puller) Catalog(ctx context.Context, reg name.Registry) ([]string, error) { - catalogger, err := p.Catalogger(ctx, reg) + return p.catalog(ctx, reg, p.o.pageSize) +} + +func (p *Puller) catalog(ctx context.Context, reg name.Registry, pageSize int) ([]string, error) { + catalogger, err := p.catalogger(ctx, reg, pageSize) if err != nil { return nil, err } @@ -171,34 +207,31 @@ func (p *Puller) Catalog(ctx context.Context, reg name.Registry) ([]string, erro // Catalogger lists repos in a registry and returns a Catalogger for paginating through the results. 
func (p *Puller) Catalogger(ctx context.Context, reg name.Registry) (*Catalogger, error) { - r, err := p.reader(ctx, reg) + return p.catalogger(ctx, reg, p.o.pageSize) +} + +func (p *Puller) catalogger(ctx context.Context, reg name.Registry, pageSize int) (*Catalogger, error) { + f, err := p.fetcher(ctx, reg) if err != nil { return nil, err } - page, err := r.f.catalogPage(ctx, reg, "") + page, err := f.catalogPage(ctx, reg, "", pageSize) if err != nil { return nil, err } return &Catalogger{ - f: r.f, - reg: reg, - page: page, - err: err, + f: f, + reg: reg, + pageSize: pageSize, + page: page, + err: err, }, nil } -func (p *Puller) get(ctx context.Context, ref name.Reference, acceptable []types.MediaType) (*Descriptor, error) { - r, err := p.reader(ctx, ref.Context()) - if err != nil { - return nil, err - } - return r.f.get(ctx, ref, acceptable) -} - func (p *Puller) referrers(ctx context.Context, d name.Digest, filter map[string]string) (v1.ImageIndex, error) { - r, err := p.reader(ctx, d.Context()) + f, err := p.fetcher(ctx, d.Context()) if err != nil { return nil, err } - return r.f.fetchReferrers(ctx, filter, d) + return f.fetchReferrers(ctx, filter, d) } diff --git a/pkg/v1/remote/pusher.go b/pkg/v1/remote/pusher.go index 0c9dd42f2..1a216b1eb 100644 --- a/pkg/v1/remote/pusher.go +++ b/pkg/v1/remote/pusher.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "net/http" + "net/url" "sync" "github.com/google/go-containerregistry/pkg/logs" @@ -107,6 +108,9 @@ func NewPusher(options ...Option) (*Pusher, error) { } func newPusher(o *options) *Pusher { + if o.pusher != nil { + return o.pusher + } return &Pusher{ o: o, } @@ -137,6 +141,36 @@ func (p *Pusher) Upload(ctx context.Context, repo name.Repository, l v1.Layer) e return w.writeLayer(ctx, l) } +func (p *Pusher) Delete(ctx context.Context, ref name.Reference) error { + w, err := p.writer(ctx, ref.Context(), p.o) + if err != nil { + return err + } + + u := url.URL{ + Scheme: ref.Context().Registry.Scheme(), + Host: 
ref.Context().RegistryStr(), + Path: fmt.Sprintf("/v2/%s/manifests/%s", ref.Context().RepositoryStr(), ref.Identifier()), + } + + req, err := http.NewRequest(http.MethodDelete, u.String(), nil) + if err != nil { + return err + } + + resp, err := w.w.client.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + + return transport.CheckError(resp, http.StatusOK, http.StatusAccepted) + + // TODO(jason): If the manifest had a `subject`, and if the registry + // doesn't support Referrers, update the index pointed to by the + // subject's fallback tag to remove the descriptor for this manifest. +} + type repoWriter struct { repo name.Repository o *options @@ -354,9 +388,8 @@ func (rw *repoWriter) writeChild(ctx context.Context, child partial.Describable, func (rw *repoWriter) manifestExists(ctx context.Context, ref name.Reference, t Taggable) (bool, error) { f := &fetcher{ - target: ref.Context(), - client: rw.w.client, - context: ctx, + target: ref.Context(), + client: rw.w.client, } m, err := taggableToManifest(t) diff --git a/pkg/v1/remote/referrers.go b/pkg/v1/remote/referrers.go index 271f7ed97..e30ca57ed 100644 --- a/pkg/v1/remote/referrers.go +++ b/pkg/v1/remote/referrers.go @@ -89,6 +89,7 @@ func (f *fetcher) fetchReferrers(ctx context.Context, filter map[string]string, } idx := &remoteIndex{ fetcher: *f, + ctx: ctx, manifest: b, mediaType: types.OCIImageIndex, descriptor: &v1.Descriptor{ diff --git a/pkg/v1/remote/schema1.go b/pkg/v1/remote/schema1.go index 96456a323..4bc1c4c45 100644 --- a/pkg/v1/remote/schema1.go +++ b/pkg/v1/remote/schema1.go @@ -16,6 +16,7 @@ package remote import ( "bytes" + "context" "encoding/json" "github.com/google/go-containerregistry/pkg/name" @@ -25,8 +26,9 @@ import ( ) type schema1 struct { - fetcher ref name.Reference + ctx context.Context + fetcher fetcher manifest []byte mediaType types.MediaType descriptor *v1.Descriptor @@ -91,6 +93,7 @@ func (s *schema1) RawManifest() ([]byte, error) { func (s 
*schema1) LayerByDigest(h v1.Hash) (v1.Layer, error) { l, err := partial.CompressedToLayer(&remoteLayer{ fetcher: s.fetcher, + ctx: s.ctx, digest: h, }) if err != nil { diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go index 5eea1b181..55abae950 100644 --- a/pkg/v1/remote/write.go +++ b/pkg/v1/remote/write.go @@ -33,11 +33,9 @@ import ( "github.com/google/go-containerregistry/pkg/logs" "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" - "github.com/google/go-containerregistry/pkg/v1/partial" "github.com/google/go-containerregistry/pkg/v1/remote/transport" "github.com/google/go-containerregistry/pkg/v1/stream" "github.com/google/go-containerregistry/pkg/v1/types" - "golang.org/x/sync/errgroup" ) // Taggable is an interface that enables a manifest PUT (e.g. for tagging). @@ -51,112 +49,10 @@ func Write(ref name.Reference, img v1.Image, options ...Option) (rerr error) { if err != nil { return err } - if o.progress != nil { defer func() { o.progress.Close(rerr) }() - o.progress.lastUpdate.Total, err = countImage(img, o.allowNondistributableArtifacts) - if err != nil { - return err - } } - return writeImage(o.context, ref, img, o) -} - -func writeImage(ctx context.Context, ref name.Reference, img v1.Image, o *options) error { - ls, err := img.Layers() - if err != nil { - return err - } - - w, err := makeWriter(ctx, ref.Context(), ls, o) - if err != nil { - return err - } - - // Upload individual blobs and collect any errors. - blobChan := make(chan v1.Layer, 2*o.jobs) - g, gctx := errgroup.WithContext(ctx) - for i := 0; i < o.jobs; i++ { - // Start N workers consuming blobs to upload. - g.Go(func() error { - for b := range blobChan { - if err := w.uploadOne(gctx, b); err != nil { - return err - } - } - return nil - }) - } - - // Upload individual layers in goroutines and collect any errors. - // If we can dedupe by the layer digest, try to do so. 
If we can't determine - // the digest for whatever reason, we can't dedupe and might re-upload. - g.Go(func() error { - defer close(blobChan) - uploaded := map[v1.Hash]bool{} - for _, l := range ls { - l := l - - // Handle foreign layers. - mt, err := l.MediaType() - if err != nil { - return err - } - if !mt.IsDistributable() && !o.allowNondistributableArtifacts { - continue - } - - // Streaming layers calculate their digests while uploading them. Assume - // an error here indicates we need to upload the layer. - h, err := l.Digest() - if err == nil { - // If we can determine the layer's digest ahead of - // time, use it to dedupe uploads. - if uploaded[h] { - continue // Already uploading. - } - uploaded[h] = true - } - select { - case blobChan <- l: - case <-gctx.Done(): - return gctx.Err() - } - } - return nil - }) - - if l, err := partial.ConfigLayer(img); err != nil { - // We can't read the ConfigLayer, possibly because of streaming layers, - // since the layer DiffIDs haven't been calculated yet. Attempt to wait - // for the other layers to be uploaded, then try the config again. - if err := g.Wait(); err != nil { - return err - } - - // Now that all the layers are uploaded, try to upload the config file blob. - l, err := partial.ConfigLayer(img) - if err != nil { - return err - } - if err := w.uploadOne(ctx, l); err != nil { - return err - } - } else { - // We *can* read the ConfigLayer, so upload it concurrently with the layers. - g.Go(func() error { - return w.uploadOne(gctx, l) - }) - - // Wait for the layers + config. - if err := g.Wait(); err != nil { - return err - } - } - - // With all of the constituent elements uploaded, upload the manifest - // to commit the image. - return w.commitManifest(ctx, img, ref) + return newPusher(o).Push(o.context, ref, img) } // writer writes the elements of an image to a remote image reference. 
@@ -164,7 +60,8 @@ type writer struct { repo name.Repository auth authn.Authenticator transport http.RoundTripper - client *http.Client + + client *http.Client progress *progress backoff Backoff @@ -286,30 +183,6 @@ func (w *writer) checkExistingBlob(ctx context.Context, h v1.Hash) (bool, error) return resp.StatusCode == http.StatusOK, nil } -// checkExistingManifest checks if a manifest exists already in the repository -// by making a HEAD request to the manifest API. -func (w *writer) checkExistingManifest(ctx context.Context, h v1.Hash, mt types.MediaType) (bool, error) { - u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), h.String())) - - req, err := http.NewRequest(http.MethodHead, u.String(), nil) - if err != nil { - return false, err - } - req.Header.Set("Accept", string(mt)) - - resp, err := w.client.Do(req.WithContext(ctx)) - if err != nil { - return false, err - } - defer resp.Body.Close() - - if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil { - return false, err - } - - return resp.StatusCode == http.StatusOK, nil -} - // initiateUpload initiates the blob upload, which starts with a POST that can // optionally include the hash of the layer and a list of repositories from // which that layer might be read. On failure, an error is returned. @@ -535,64 +408,6 @@ func (w *writer) uploadOne(ctx context.Context, l v1.Layer) error { return retry.Retry(tryUpload, w.predicate, w.backoff) } -type withLayer interface { - Layer(v1.Hash) (v1.Layer, error) -} - -func (w *writer) writeIndex(ctx context.Context, ref name.Reference, ii v1.ImageIndex, o *options) error { - index, err := ii.IndexManifest() - if err != nil { - return err - } - - // TODO(#803): Pipe through remote.WithJobs and upload these in parallel. 
- for _, desc := range index.Manifests { - ref := ref.Context().Digest(desc.Digest.String()) - exists, err := w.checkExistingManifest(ctx, desc.Digest, desc.MediaType) - if err != nil { - return err - } - if exists { - logs.Progress.Print("existing manifest: ", desc.Digest) - continue - } - - switch desc.MediaType { - case types.OCIImageIndex, types.DockerManifestList: - ii, err := ii.ImageIndex(desc.Digest) - if err != nil { - return err - } - if err := w.writeIndex(ctx, ref, ii, o); err != nil { - return err - } - case types.OCIManifestSchema1, types.DockerManifestSchema2: - img, err := ii.Image(desc.Digest) - if err != nil { - return err - } - if err := writeImage(ctx, ref, img, o); err != nil { - return err - } - default: - // Workaround for #819. - if wl, ok := ii.(withLayer); ok { - layer, err := wl.Layer(desc.Digest) - if err != nil { - return err - } - if err := w.uploadOne(ctx, layer); err != nil { - return err - } - } - } - } - - // With all of the constituent elements uploaded, upload the manifest - // to commit the image. - return w.commitManifest(ctx, ii, ref) -} - type withMediaType interface { MediaType() (types.MediaType, error) } @@ -830,129 +645,10 @@ func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) (rerr e if err != nil { return err } - - w, err := makeWriter(o.context, ref.Context(), nil, o) - if err != nil { - return err - } - if w.progress != nil { - defer func() { w.progress.Close(rerr) }() - - w.progress.lastUpdate.Total, err = countIndex(ii, o.allowNondistributableArtifacts) - if err != nil { - return err - } - } - - return w.writeIndex(o.context, ref, ii, o) -} - -// countImage counts the total size of all layers + config blob + manifest for -// an image. It de-dupes duplicate layers. 
-func countImage(img v1.Image, allowNondistributableArtifacts bool) (int64, error) { - var total int64 - ls, err := img.Layers() - if err != nil { - return 0, err - } - seen := map[v1.Hash]bool{} - for _, l := range ls { - // Handle foreign layers. - mt, err := l.MediaType() - if err != nil { - return 0, err - } - if !mt.IsDistributable() && !allowNondistributableArtifacts { - continue - } - - // TODO: support streaming layers which update the total count as they write. - if _, ok := l.(*stream.Layer); ok { - return 0, errors.New("cannot use stream.Layer and WithProgress") - } - - // Dedupe layers. - d, err := l.Digest() - if err != nil { - return 0, err - } - if seen[d] { - continue - } - seen[d] = true - - size, err := l.Size() - if err != nil { - return 0, err - } - total += size - } - b, err := img.RawConfigFile() - if err != nil { - return 0, err - } - total += int64(len(b)) - size, err := img.Size() - if err != nil { - return 0, err - } - total += size - return total, nil -} - -// countIndex counts the total size of all images + sub-indexes for an index. -// It does not attempt to de-dupe duplicate images, etc. -func countIndex(idx v1.ImageIndex, allowNondistributableArtifacts bool) (int64, error) { - var total int64 - mf, err := idx.IndexManifest() - if err != nil { - return 0, err - } - - for _, desc := range mf.Manifests { - switch desc.MediaType { - case types.OCIImageIndex, types.DockerManifestList: - sidx, err := idx.ImageIndex(desc.Digest) - if err != nil { - return 0, err - } - size, err := countIndex(sidx, allowNondistributableArtifacts) - if err != nil { - return 0, err - } - total += size - case types.OCIManifestSchema1, types.DockerManifestSchema2: - simg, err := idx.Image(desc.Digest) - if err != nil { - return 0, err - } - size, err := countImage(simg, allowNondistributableArtifacts) - if err != nil { - return 0, err - } - total += size - default: - // Workaround for #819. 
- if wl, ok := idx.(withLayer); ok { - layer, err := wl.Layer(desc.Digest) - if err != nil { - return 0, err - } - size, err := layer.Size() - if err != nil { - return 0, err - } - total += size - } - } - } - - size, err := idx.Size() - if err != nil { - return 0, err + if o.progress != nil { + defer func() { o.progress.Close(rerr) }() } - total += size - return total, nil + return newPusher(o).Push(o.context, ref, ii) } // WriteLayer uploads the provided Layer to the specified repo. @@ -961,25 +657,10 @@ func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) (rerr e if err != nil { return err } - w, err := makeWriter(o.context, repo, []v1.Layer{layer}, o) - if err != nil { - return err - } - - if w.progress != nil { - defer func() { w.progress.Close(rerr) }() - - // TODO: support streaming layers which update the total count as they write. - if _, ok := layer.(*stream.Layer); ok { - return errors.New("cannot use stream.Layer and WithProgress") - } - size, err := layer.Size() - if err != nil { - return err - } - w.progress.total(size) + if o.progress != nil { + defer func() { o.progress.Close(rerr) }() } - return w.uploadOne(o.context, layer) + return newPusher(o).Upload(o.context, repo, layer) } // Tag adds a tag to the given Taggable via PUT /v2/.../manifests/ @@ -1009,15 +690,9 @@ func Tag(tag name.Tag, t Taggable, options ...Option) error { // should ensure that all blobs or manifests that are referenced by t exist // in the target registry. func Put(ref name.Reference, t Taggable, options ...Option) error { - repo := ref.Context() o, err := makeOptions(options...) 
if err != nil { return err } - w, err := makeWriter(o.context, repo, nil, o) - if err != nil { - return err - } - - return w.commitManifest(o.context, t, ref) + return newPusher(o).Push(o.context, ref, t) } diff --git a/pkg/v1/remote/write_test.go b/pkg/v1/remote/write_test.go index 7235c96a6..3f2696d7f 100644 --- a/pkg/v1/remote/write_test.go +++ b/pkg/v1/remote/write_test.go @@ -568,6 +568,12 @@ func TestDedupeLayers(t *testing.T) { case commitPath: http.Error(w, "Created", http.StatusCreated) case manifestPath: + if r.Method == http.MethodHead { + w.Header().Set("Content-Type", string(types.DockerManifestSchema1Signed)) + w.Header().Set("Docker-Content-Digest", fakeDigest) + w.Write([]byte("doesn't matter")) + return + } if r.Method != http.MethodPut { t.Errorf("Method; got %v, want %v", r.Method, http.MethodPut) } @@ -927,6 +933,10 @@ func TestWrite(t *testing.T) { } http.Error(w, "Mounted", http.StatusCreated) case manifestPath: + if r.Method == http.MethodHead { + w.WriteHeader(http.StatusNotFound) + return + } if r.Method != http.MethodPut { t.Errorf("Method; got %v, want %v", r.Method, http.MethodPut) } @@ -955,6 +965,7 @@ func TestWriteWithErrors(t *testing.T) { expectedRepo := "write/time" headPathPrefix := fmt.Sprintf("/v2/%s/blobs/", expectedRepo) initiatePath := fmt.Sprintf("/v2/%s/blobs/uploads/", expectedRepo) + manifestPath := fmt.Sprintf("/v2/%s/manifests/latest", expectedRepo) errorBody := `{"errors":[{"code":"NAME_INVALID","message":"some explanation of how things were messed up."}],"StatusCode":400}` expectedErrMsg, err := regexp.Compile(`POST .+ NAME_INVALID: some explanation of how things were messed up.`) @@ -970,6 +981,8 @@ func TestWriteWithErrors(t *testing.T) { switch r.URL.Path { case "/v2/": w.WriteHeader(http.StatusOK) + case manifestPath: + w.WriteHeader(http.StatusNotFound) case initiatePath: if r.Method != http.MethodPost { t.Errorf("Method; got %v, want %v", r.Method, http.MethodPost) @@ -1216,65 +1229,6 @@ func 
TestScopesForUploadingImage(t *testing.T) { } } -func TestCheckExistingManifest(t *testing.T) { - tests := []struct { - name string - status int - existing bool - wantErr bool - }{{ - name: "success", - status: http.StatusOK, - existing: true, - }, { - name: "not found", - status: http.StatusNotFound, - existing: false, - }, { - name: "error", - status: http.StatusInternalServerError, - existing: false, - wantErr: true, - }} - - img := setupImage(t) - h := mustDigest(t, img) - mt := mustMediaType(t, img) - expectedRepo := "foo/bar" - expectedPath := fmt.Sprintf("/v2/%s/manifests/%s", expectedRepo, h.String()) - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - w, closer, err := setupWriter(expectedRepo, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodHead { - t.Errorf("Method; got %v, want %v", r.Method, http.MethodHead) - } - if r.URL.Path != expectedPath { - t.Errorf("URL; got %v, want %v", r.URL.Path, expectedPath) - } - if got, want := r.Header.Get("Accept"), string(mt); got != want { - t.Errorf("r.Header['Accept']; got %v, want %v", got, want) - } - http.Error(w, http.StatusText(test.status), test.status) - })) - if err != nil { - t.Fatalf("setupWriter() = %v", err) - } - defer closer.Close() - - existing, err := w.checkExistingManifest(context.Background(), h, mt) - if test.existing != existing { - t.Errorf("checkExistingManifest() = %v, want %v", existing, test.existing) - } - if err != nil && !test.wantErr { - t.Errorf("checkExistingManifest() = %v", err) - } else if err == nil && test.wantErr { - t.Error("checkExistingManifest() wanted err, got nil") - } - }) - } -} - func TestWriteIndex(t *testing.T) { idx := setupIndex(t, 2) expectedRepo := "write/time" @@ -1283,8 +1237,8 @@ func TestWriteIndex(t *testing.T) { manifestPath := fmt.Sprintf("/v2/%s/manifests/latest", expectedRepo) childDigest := mustIndexManifest(t, idx).Manifests[0].Digest childPath := fmt.Sprintf("/v2/%s/manifests/%s", 
expectedRepo, childDigest) - existinChildDigest := mustIndexManifest(t, idx).Manifests[1].Digest - existingChildPath := fmt.Sprintf("/v2/%s/manifests/%s", expectedRepo, existinChildDigest) + existingChildDigest := mustIndexManifest(t, idx).Manifests[1].Digest + existingChildPath := fmt.Sprintf("/v2/%s/manifests/%s", expectedRepo, existingChildDigest) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodHead && strings.HasPrefix(r.URL.Path, headPathPrefix) && r.URL.Path != initiatePath { @@ -1300,13 +1254,19 @@ func TestWriteIndex(t *testing.T) { } http.Error(w, "Mounted", http.StatusCreated) case manifestPath: + if r.Method == http.MethodHead { + w.WriteHeader(http.StatusNotFound) + return + } if r.Method != http.MethodPut { t.Errorf("Method; got %v, want %v", r.Method, http.MethodPut) } http.Error(w, "Created", http.StatusCreated) case existingChildPath: if r.Method == http.MethodHead { - http.Error(w, http.StatusText(http.StatusOK), http.StatusOK) + w.Header().Set("Content-Type", string(types.DockerManifestSchema1)) + w.Header().Set("Docker-Content-Digest", existingChildDigest.String()) + w.Header().Set("Content-Length", "123") return } t.Errorf("Unexpected method; got %v, want %v", r.Method, http.MethodHead) @@ -1440,7 +1400,13 @@ func TestWriteForeignLayerIfOptionSet(t *testing.T) { case commitPath: http.Error(w, "Created", http.StatusCreated) case manifestPath: - if r.Method != http.MethodPut { + if r.Method == http.MethodHead { + w.Header().Set("Content-Type", string(types.DockerManifestSchema1Signed)) + w.Header().Set("Docker-Content-Digest", fakeDigest) + w.Header().Set("Content-Length", "123") + return + } + if r.Method != http.MethodPut && r.Method != http.MethodHead { t.Errorf("Method; got %v, want %v", r.Method, http.MethodPut) } http.Error(w, "Created", http.StatusCreated) From a65ff5f9ef22db57ba2dd6df795cee94d8687489 Mon Sep 17 00:00:00 2001 From: Jon Johnson Date: Tue, 25 Apr 2023 
09:09:28 -0700 Subject: [PATCH 5/5] Address review feedback --- pkg/v1/remote/descriptor_test.go | 5 ++--- pkg/v1/remote/puller.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/v1/remote/descriptor_test.go b/pkg/v1/remote/descriptor_test.go index 4f0b36b79..f18deb85a 100644 --- a/pkg/v1/remote/descriptor_test.go +++ b/pkg/v1/remote/descriptor_test.go @@ -134,7 +134,6 @@ func TestGetImageAsIndex(t *testing.T) { func TestHeadSchema1(t *testing.T) { expectedRepo := "foo/bar" mediaType := types.DockerManifestSchema1Signed - fakeDigest := "sha256:0000000000000000000000000000000000000000000000000000000000000000" response := []byte("doesn't matter") manifestPath := fmt.Sprintf("/v2/%s/manifests/latest", expectedRepo) @@ -203,7 +202,7 @@ func TestHead_MissingHeaders(t *testing.T) { w.Header().Set("Content-Length", "10") } if !strings.Contains(r.URL.Path, missingDigest) { - w.Header().Set("Docker-Content-Digest", "sha256:0000000000000000000000000000000000000000000000000000000000000000") + w.Header().Set("Docker-Content-Digest", fakeDigest) } })) defer server.Close() @@ -231,7 +230,7 @@ func TestRedactFetchBlob(t *testing.T) { Transport: errTransport{}, }, } - h, err := v1.NewHash("sha256:0000000000000000000000000000000000000000000000000000000000000000") + h, err := v1.NewHash(fakeDigest) if err != nil { t.Fatal("NewHash:", err) } diff --git a/pkg/v1/remote/puller.go b/pkg/v1/remote/puller.go index cd1a713cd..8d979bd3c 100644 --- a/pkg/v1/remote/puller.go +++ b/pkg/v1/remote/puller.go @@ -117,7 +117,7 @@ func (p *Puller) get(ctx context.Context, ref name.Reference, acceptable []types return f.get(ctx, ref, acceptable, platform) } -// Layer is like remote.Layer, but avoids re-authenticated when possible. +// Layer is like remote.Layer, but avoids re-authenticating when possible. func (p *Puller) Layer(ctx context.Context, ref name.Digest) (v1.Layer, error) { f, err := p.fetcher(ctx, ref.Context()) if err != nil {