Skip to content

Commit

Permalink
Support Warning header aggregation and reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
imjasonh committed Mar 18, 2023
1 parent 53189d3 commit e1a4877
Show file tree
Hide file tree
Showing 19 changed files with 168 additions and 51 deletions.
32 changes: 31 additions & 1 deletion cmd/crane/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"os"
"path/filepath"
"sort"

"github.com/docker/cli/cli/config"
"github.com/google/go-containerregistry/internal/cmd"
Expand All @@ -43,6 +44,7 @@ func New(use, short string, options []crane.Option) *cobra.Command {
insecure := false
ndlayers := false
platform := &platformValue{}
wc := &warnCollector{map[string]struct{}{}}

root := &cobra.Command{
Use: use,
Expand Down Expand Up @@ -70,7 +72,10 @@ func New(use, short string, options []crane.Option) *cobra.Command {
options = append(options, crane.WithUserAgent(fmt.Sprintf("%s/%s", binary, Version)))
}

options = append(options, crane.WithPlatform(platform.platform))
options = append(options,
crane.WithPlatform(platform.platform),
crane.WithWarnCollector(wc),
)

transport := remote.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = &tls.Config{
Expand All @@ -92,6 +97,7 @@ func New(use, short string, options []crane.Option) *cobra.Command {

options = append(options, crane.WithTransport(rt))
},
PersistentPostRun: func(cmd *cobra.Command, args []string) { wc.Report() },
}

root.AddCommand(
Expand Down Expand Up @@ -146,3 +152,27 @@ func (ht *headerTransport) RoundTrip(in *http.Request) (*http.Response, error) {
}
return ht.inner.RoundTrip(in)
}

type warnCollector struct {
warns map[string]struct{}
}

func (d *warnCollector) Add(warning string) {
d.warns[warning] = struct{}{}
}

func (d *warnCollector) Report() {
warns := make([]string, 0, len(d.warns))
for k := range d.warns {
warns = append(warns, k)
}
sort.Strings(warns)
prefix := "\033[1;33m[WARNING]\033[0m:"
if v, set := os.LookupEnv("NO_COLOR"); set && v != "0" {
prefix = "[WARNING]:"
}
for _, w := range warns {
logs.Warn.Println(prefix, w)
}
d.warns = map[string]struct{}{}
}
6 changes: 5 additions & 1 deletion cmd/registry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ func main() {
log.Printf("serving on port %d", porti)
s := &http.Server{
ReadHeaderTimeout: 5 * time.Second, // prevent slowloris, quiet linter
Handler: registry.New(),
Handler: registry.New(
registry.WithWarning(1.0, "This registry is cool."),
registry.WithWarning(0.6, "60% of the time, it works every time."),
registry.WithWarning(0.1, "Today is your lucky day!"),
),
}
log.Fatal(s.Serve(listener))
}
8 changes: 8 additions & 0 deletions pkg/crane/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,11 @@ func WithContext(ctx context.Context) Option {
o.Remote = append(o.Remote, remote.WithContext(ctx))
}
}

// WithWarnCollector sets the warning collector to collect and
// report warnings for HTTP operations.
func WithWarnCollector(wc remote.WarnCollector) Option {
return func(o *Options) {
o.Remote = append(o.Remote, remote.WithWarnCollector(wc))
}
}
21 changes: 21 additions & 0 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
package registry

import (
"fmt"
"log"
"math/rand"
"net/http"
"os"
)
Expand All @@ -34,11 +36,21 @@ type registry struct {
blobs blobs
manifests manifests
referrersEnabled bool
warnings map[float64]string
}

// https://docs.docker.com/registry/spec/api/#api-version-check
// https://github.com/opencontainers/distribution-spec/blob/master/spec.md#api-version-check
func (r *registry) v2(resp http.ResponseWriter, req *http.Request) *regError {
if r.warnings != nil {
rnd := rand.Float64()
for prob, msg := range r.warnings {
if prob > rnd {
resp.Header().Add("Warning", fmt.Sprintf(`299 - "%s"`, msg))
}
}
}

if isBlob(req) {
return r.blobs.handle(resp, req)
}
Expand Down Expand Up @@ -115,3 +127,12 @@ func WithReferrersSupport(enabled bool) Option {
r.referrersEnabled = enabled
}
}

func WithWarning(prob float64, msg string) Option {
return func(r *registry) {
if r.warnings == nil {
r.warnings = map[float64]string{}
}
r.warnings[prob] = msg
}
}
2 changes: 2 additions & 0 deletions pkg/v1/remote/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func CatalogPage(target name.Registry, last string, n int, options ...Option) ([
if err != nil {
return nil, err
}
o.collectWarnings(resp)
defer resp.Body.Close()

if err := transport.CheckError(resp, http.StatusOK); err != nil {
Expand Down Expand Up @@ -127,6 +128,7 @@ func Catalog(ctx context.Context, target name.Registry, options ...Option) ([]st
if err != nil {
return nil, err
}
o.collectWarnings(resp)

if err := transport.CheckError(resp, http.StatusOK); err != nil {
return nil, err
Expand Down
5 changes: 3 additions & 2 deletions pkg/v1/remote/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func CheckPushPermission(ref name.Reference, kc authn.Keychain, t http.RoundTrip
// authorize a push. Figure out how to return early here when we can,
// to avoid a roundtrip for spec-compliant registries.
w := writer{
repo: ref.Context(),
client: &http.Client{Transport: tr},
repo: ref.Context(),
client: &http.Client{Transport: tr},
options: &options{},
}
loc, _, err := w.initiateUpload(context.Background(), "", "", "")
if loc != "" {
Expand Down
1 change: 1 addition & 0 deletions pkg/v1/remote/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Delete(ref name.Reference, options ...Option) error {
if err != nil {
return err
}
o.collectWarnings(resp)
defer resp.Body.Close()

return transport.CheckError(resp, http.StatusOK, http.StatusAccepted)
Expand Down
18 changes: 12 additions & 6 deletions pkg/v1/remote/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (d *Descriptor) remoteIndex() *remoteIndex {
type fetcher struct {
Ref name.Reference
Client *http.Client
context context.Context
options *options
}

func makeFetcher(ref name.Reference, o *options) (*fetcher, error) {
Expand All @@ -226,7 +226,7 @@ func makeFetcher(ref name.Reference, o *options) (*fetcher, error) {
return &fetcher{
Ref: ref,
Client: &http.Client{Transport: tr},
context: o.context,
options: o,
}, nil
}

Expand Down Expand Up @@ -257,6 +257,7 @@ func (f *fetcher) fetchReferrers(ctx context.Context, filter map[string]string,
if err != nil {
return nil, err
}
f.options.collectWarnings(resp)
defer resp.Body.Close()

if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound, http.StatusBadRequest); err != nil {
Expand Down Expand Up @@ -301,10 +302,11 @@ func (f *fetcher) fetchManifest(ref name.Reference, acceptable []types.MediaType
}
req.Header.Set("Accept", strings.Join(accept, ","))

resp, err := f.Client.Do(req.WithContext(f.context))
resp, err := f.Client.Do(req.WithContext(f.options.context))
if err != nil {
return nil, nil, err
}
f.options.collectWarnings(resp)
defer resp.Body.Close()

if err := transport.CheckError(resp, http.StatusOK); err != nil {
Expand Down Expand Up @@ -375,10 +377,11 @@ func (f *fetcher) headManifest(ref name.Reference, acceptable []types.MediaType)
}
req.Header.Set("Accept", strings.Join(accept, ","))

resp, err := f.Client.Do(req.WithContext(f.context))
resp, err := f.Client.Do(req.WithContext(f.options.context))
if err != nil {
return nil, err
}
f.options.collectWarnings(resp)
defer resp.Body.Close()

if err := transport.CheckError(resp, http.StatusOK); err != nil {
Expand Down Expand Up @@ -431,6 +434,7 @@ func (f *fetcher) fetchBlob(ctx context.Context, size int64, h v1.Hash) (io.Read
if err != nil {
return nil, redact.Error(err)
}
f.options.collectWarnings(resp)

if err := transport.CheckError(resp, http.StatusOK); err != nil {
resp.Body.Close()
Expand Down Expand Up @@ -458,10 +462,11 @@ func (f *fetcher) headBlob(h v1.Hash) (*http.Response, error) {
return nil, err
}

resp, err := f.Client.Do(req.WithContext(f.context))
resp, err := f.Client.Do(req.WithContext(f.options.context))
if err != nil {
return nil, redact.Error(err)
}
f.options.collectWarnings(resp)

if err := transport.CheckError(resp, http.StatusOK); err != nil {
resp.Body.Close()
Expand All @@ -478,10 +483,11 @@ func (f *fetcher) blobExists(h v1.Hash) (bool, error) {
return false, err
}

resp, err := f.Client.Do(req.WithContext(f.context))
resp, err := f.Client.Do(req.WithContext(f.options.context))
if err != nil {
return false, redact.Error(err)
}
f.options.collectWarnings(resp)
defer resp.Body.Close()

if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/v1/remote/descriptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ func TestRedactFetchBlob(t *testing.T) {
Client: &http.Client{
Transport: errTransport{},
},
context: ctx,
options: &options{
context: ctx,
},
}
h, err := v1.NewHash("sha256:0000000000000000000000000000000000000000000000000000000000000000")
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/v1/remote/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (r *remoteImage) RawConfigFile() ([]byte, error) {
return r.config, nil
}

body, err := r.fetchBlob(r.context, m.Config.Size, m.Config.Digest)
body, err := r.fetchBlob(r.options.context, m.Config.Size, m.Config.Digest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -165,7 +165,7 @@ func (rl *remoteImageLayer) Compressed() (io.ReadCloser, error) {
}

// We don't want to log binary layers -- this can break terminals.
ctx := redact.NewContext(rl.ri.context, "omitting binary blobs from logs")
ctx := redact.NewContext(rl.ri.options.context, "omitting binary blobs from logs")

for _, s := range d.URLs {
u, err := url.Parse(s)
Expand All @@ -191,6 +191,7 @@ func (rl *remoteImageLayer) Compressed() (io.ReadCloser, error) {
lastErr = err
continue
}
rl.ri.options.collectWarnings(resp)

if err := transport.CheckError(resp, http.StatusOK); err != nil {
resp.Body.Close()
Expand Down
8 changes: 4 additions & 4 deletions pkg/v1/remote/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestRawManifestDigests(t *testing.T) {
fetcher: fetcher{
Ref: ref,
Client: http.DefaultClient,
context: context.Background(),
options: &options{context: context.Background()},
},
}

Expand Down Expand Up @@ -216,7 +216,7 @@ func TestRawManifestNotFound(t *testing.T) {
fetcher: fetcher{
Ref: mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)),
Client: http.DefaultClient,
context: context.Background(),
options: &options{context: context.Background()},
},
}

Expand Down Expand Up @@ -256,7 +256,7 @@ func TestRawConfigFileNotFound(t *testing.T) {
fetcher: fetcher{
Ref: mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)),
Client: http.DefaultClient,
context: context.Background(),
options: &options{context: context.Background()},
},
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func TestAcceptHeaders(t *testing.T) {
fetcher: fetcher{
Ref: mustNewTag(t, fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo)),
Client: http.DefaultClient,
context: context.Background(),
options: &options{context: context.Background()},
},
}
manifest, err := rmt.RawManifest()
Expand Down
2 changes: 1 addition & 1 deletion pkg/v1/remote/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (r *remoteIndex) childDescriptor(child v1.Descriptor, platform v1.Platform)
fetcher: fetcher{
Ref: ref,
Client: r.Client,
context: r.context,
options: r.options,
},
Manifest: manifest,
Descriptor: child,
Expand Down
2 changes: 1 addition & 1 deletion pkg/v1/remote/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestIndexRawManifestDigests(t *testing.T) {
fetcher: fetcher{
Ref: ref,
Client: http.DefaultClient,
context: context.Background(),
options: &options{context: context.Background()},
},
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/v1/remote/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ type remoteLayer struct {
// Compressed implements partial.CompressedLayer
func (rl *remoteLayer) Compressed() (io.ReadCloser, error) {
// We don't want to log binary layers -- this can break terminals.
ctx := redact.NewContext(rl.context, "omitting binary blobs from logs")
ctx := redact.NewContext(rl.options.context, "omitting binary blobs from logs")
return rl.fetchBlob(ctx, verify.SizeUnknown, rl.digest)
}

// Compressed implements partial.CompressedLayer
func (rl *remoteLayer) Size() (int64, error) {
resp, err := rl.headBlob(rl.digest)
resp, err := rl.headBlob(rl.digest) // TODO(jason): cache this.
if err != nil {
return -1, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/v1/remote/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func List(repo name.Repository, options ...Option) ([]string, error) {
if err != nil {
return nil, err
}
o.collectWarnings(resp)

if err := transport.CheckError(resp, http.StatusOK); err != nil {
return nil, err
Expand Down
7 changes: 3 additions & 4 deletions pkg/v1/remote/multi_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
return err
}
w := writer{
repo: repo,
client: &http.Client{Transport: tr},
backoff: o.retryBackoff,
predicate: o.retryPredicate,
repo: repo,
client: &http.Client{Transport: tr},
options: o,
}

// Collect the total size of blobs and manifests we're about to write.
Expand Down
Loading

0 comments on commit e1a4877

Please sign in to comment.