Skip to content

Commit

Permalink
Implement remote.Pusher (#1633)
Browse files Browse the repository at this point in the history
* Implement remote.Pusher

The Pusher handles multiplexing across multiple repositories and
deduplicating in-flight uploads. This removes the need to do an awkward
dance around re-using auth handshakes by passing custom transports.

This also fixes limitations in remote.MultiWrite around streaming layers
and multiple repositories.

* Use remote.Pusher in crane edit

This will be faster than doing the handshake twice.
  • Loading branch information
jonjohnsonjr authored Apr 7, 2023
1 parent 0962e29 commit 6743ec9
Show file tree
Hide file tree
Showing 7 changed files with 674 additions and 318 deletions.
14 changes: 10 additions & 4 deletions internal/cmd/edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cmd
import (
"archive/tar"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -70,7 +71,7 @@ func NewCmdEditConfig(options *[]crane.Option) *cobra.Command {
echo '{}' | crane edit config ubuntu`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
ref, err := editConfig(cmd.InOrStdin(), cmd.OutOrStdout(), args[0], dst, *options...)
ref, err := editConfig(cmd.Context(), cmd.InOrStdin(), cmd.OutOrStdout(), args[0], dst, *options...)
if err != nil {
return fmt.Errorf("editing config: %w", err)
}
Expand Down Expand Up @@ -154,7 +155,7 @@ func interactiveFile(i any) bool {
return (stat.Mode() & os.ModeCharDevice) != 0
}

func editConfig(in io.Reader, out io.Writer, src, dst string, options ...crane.Option) (name.Reference, error) {
func editConfig(ctx context.Context, in io.Reader, out io.Writer, src, dst string, options ...crane.Option) (name.Reference, error) {
o := crane.GetOptions(options...)

img, err := crane.Pull(src, options...)
Expand Down Expand Up @@ -255,11 +256,16 @@ func editConfig(in io.Reader, out io.Writer, src, dst string, options ...crane.O
return nil, err
}

if err := remote.WriteLayer(dstRef.Context(), l, o.Remote...); err != nil {
pusher, err := remote.NewPusher(o.Remote...)
if err != nil {
return nil, err
}

if err := remote.Put(dstRef, rm, o.Remote...); err != nil {
if err := pusher.Upload(ctx, dstRef.Context(), l); err != nil {
return nil, err
}

if err := pusher.Push(ctx, dstRef, rm); err != nil {
return nil, err
}

Expand Down
26 changes: 18 additions & 8 deletions pkg/v1/remote/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type Descriptor struct {
platform v1.Platform
}

// toDesc returns d's Descriptor field by value as a v1.Descriptor.
func (d *Descriptor) toDesc() v1.Descriptor {
return d.Descriptor
}

// RawManifest exists to satisfy the Taggable interface.
func (d *Descriptor) RawManifest() ([]byte, error) {
return d.Manifest, nil
Expand Down Expand Up @@ -117,7 +121,11 @@ func get(ref name.Reference, acceptable []types.MediaType, options ...Option) (*
if err != nil {
return nil, err
}
b, desc, err := f.fetchManifest(o.context, ref, acceptable)
return f.get(o.context, ref, acceptable)
}

func (f *fetcher) get(ctx context.Context, ref name.Reference, acceptable []types.MediaType) (*Descriptor, error) {
b, desc, err := f.fetchManifest(ctx, ref, acceptable)
if err != nil {
return nil, err
}
Expand All @@ -126,7 +134,7 @@ func get(ref name.Reference, acceptable []types.MediaType, options ...Option) (*
ref: ref,
Manifest: b,
Descriptor: *desc,
platform: o.platform,
platform: f.platform,
}, nil
}

Expand Down Expand Up @@ -237,9 +245,10 @@ type resource interface {

// fetcher implements methods for reading from a registry.
type fetcher struct {
target resource
client *http.Client
context context.Context
target resource
client *http.Client
context context.Context
platform v1.Platform
}

func makeFetcher(ctx context.Context, target resource, o *options) (*fetcher, error) {
Expand All @@ -266,9 +275,10 @@ func makeFetcher(ctx context.Context, target resource, o *options) (*fetcher, er
return nil, err
}
return &fetcher{
target: target,
client: &http.Client{Transport: tr},
context: ctx,
target: target,
client: &http.Client{Transport: tr},
context: ctx,
platform: o.platform,
}, nil
}

Expand Down
269 changes: 12 additions & 257 deletions pkg/v1/remote/multi_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,277 +15,32 @@
package remote

import (
"context"
"fmt"

"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/types"
"golang.org/x/sync/errgroup"
)

// MultiWrite writes the given Images or ImageIndexes to the given refs, as
// efficiently as possible, by deduping shared layer blobs and uploading layers
// in parallel, then uploading all manifests in parallel.
//
// Current limitations:
// - All refs must share the same repository.
// - Images cannot consist of stream.Layers.
func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
// Determine the repository being pushed to; if asked to push to
// multiple repositories, give up.
var repo, zero name.Repository
for ref := range m {
if repo == zero {
repo = ref.Context()
} else if ref.Context() != repo {
return fmt.Errorf("MultiWrite can only push to the same repository (saw %q and %q)", repo, ref.Context())
}
}

// efficiently as possible, by deduping shared layer blobs while uploading them
// in parallel.
func MultiWrite(todo map[name.Reference]Taggable, options ...Option) (rerr error) {
o, err := makeOptions(options...)
if err != nil {
return err
}

// Collect unique blobs (layers and config blobs).
blobs := map[v1.Hash]v1.Layer{}
newManifests := []map[name.Reference]Taggable{}
// Separate originally requested images and indexes, so we can push images first.
images, indexes := map[name.Reference]Taggable{}, map[name.Reference]Taggable{}
for ref, i := range m {
if img, ok := i.(v1.Image); ok {
images[ref] = i
if err := addImageBlobs(img, blobs, o.allowNondistributableArtifacts); err != nil {
return err
}
continue
}
if idx, ok := i.(v1.ImageIndex); ok {
indexes[ref] = i
newManifests, err = addIndexBlobs(idx, blobs, repo, newManifests, 0, o.allowNondistributableArtifacts)
if err != nil {
return err
}
continue
}
return fmt.Errorf("pushable resource was not Image or ImageIndex: %T", i)
}

// Determine if any of the layers are Mountable, because if so we need
// to request Pull scope too.
ls := []v1.Layer{}
for _, l := range blobs {
ls = append(ls, l)
}
w, err := makeWriter(o.context, repo, ls, o)
if err != nil {
return err
if o.progress != nil {
defer func() { o.progress.Close(rerr) }()
}
p := newPusher(o)

// Collect the total size of blobs and manifests we're about to write.
if w.progress != nil {
defer func() { w.progress.Close(rerr) }()

for _, b := range blobs {
size, err := b.Size()
if err != nil {
return err
}
w.progress.total(size)
}
countManifest := func(t Taggable) error {
b, err := t.RawManifest()
if err != nil {
return err
}
w.progress.total(int64(len(b)))
return nil
}
for _, i := range images {
if err := countManifest(i); err != nil {
return err
}
}
for _, nm := range newManifests {
for _, i := range nm {
if err := countManifest(i); err != nil {
return err
}
}
}
for _, i := range indexes {
if err := countManifest(i); err != nil {
return err
}
}
}
g, ctx := errgroup.WithContext(o.context)
g.SetLimit(o.jobs)

// Upload individual blobs and collect any errors.
blobChan := make(chan v1.Layer, 2*o.jobs)
ctx := o.context
g, gctx := errgroup.WithContext(o.context)
for i := 0; i < o.jobs; i++ {
// Start N workers consuming blobs to upload.
for ref, t := range todo {
ref, t := ref, t
g.Go(func() error {
for b := range blobChan {
if err := w.uploadOne(gctx, b); err != nil {
return err
}
}
return nil
return p.Push(ctx, ref, t)
})
}
g.Go(func() error {
defer close(blobChan)
for _, b := range blobs {
select {
case blobChan <- b:
case <-gctx.Done():
return gctx.Err()
}
}
return nil
})
if err := g.Wait(); err != nil {
return err
}

commitMany := func(ctx context.Context, m map[name.Reference]Taggable) error {
g, ctx := errgroup.WithContext(ctx)
// With all of the constituent elements uploaded, upload the manifests
// to commit the images and indexes, and collect any errors.
type task struct {
i Taggable
ref name.Reference
}
taskChan := make(chan task, 2*o.jobs)
for i := 0; i < o.jobs; i++ {
// Start N workers consuming tasks to upload manifests.
g.Go(func() error {
for t := range taskChan {
if err := w.commitManifest(ctx, t.i, t.ref); err != nil {
return err
}
}
return nil
})
}
go func() {
for ref, i := range m {
taskChan <- task{i, ref}
}
close(taskChan)
}()
return g.Wait()
}
// Push originally requested image manifests. These have no
// dependencies.
if err := commitMany(ctx, images); err != nil {
return err
}
// Push new manifests from lowest levels up.
for i := len(newManifests) - 1; i >= 0; i-- {
if err := commitMany(ctx, newManifests[i]); err != nil {
return err
}
}
// Push originally requested index manifests, which might depend on
// newly discovered manifests.

return commitMany(ctx, indexes)
}

// addIndexBlobs walks the manifests referenced by idx, records every blob it
// finds in blobs, and returns the (possibly grown) per-level collection of
// child manifests that must be pushed before idx itself.
//
// newManifests is indexed by nesting depth: children of the index at depth
// lvl are stored in newManifests[lvl], keyed by their digest reference in
// repo. A new empty level is appended when lvl has no slot yet.
//
// Child entries are handled by media type:
//   - image indexes are descended into recursively at lvl+1 and then recorded;
//   - image manifests have their blobs collected via addImageBlobs and are
//     then recorded;
//   - anything else is fetched as a layer when idx implements withLayer
//     (workaround for #819), otherwise an "unknown media type" error is
//     returned.
//
// On any error the returned slice is nil.
func addIndexBlobs(idx v1.ImageIndex, blobs map[v1.Hash]v1.Layer, repo name.Repository, newManifests []map[name.Reference]Taggable, lvl int, allowNondistributableArtifacts bool) ([]map[name.Reference]Taggable, error) {
	// Make sure there is a bucket for this depth.
	if lvl >= len(newManifests) {
		newManifests = append(newManifests, map[name.Reference]Taggable{})
	}

	manifest, err := idx.IndexManifest()
	if err != nil {
		return nil, err
	}

	for _, child := range manifest.Manifests {
		switch child.MediaType {
		case types.OCIImageIndex, types.DockerManifestList:
			subIndex, err := idx.ImageIndex(child.Digest)
			if err != nil {
				return nil, err
			}

			// Descend first so deeper levels are populated before this one.
			grown, err := addIndexBlobs(subIndex, blobs, repo, newManifests, lvl+1, allowNondistributableArtifacts)
			if err != nil {
				return nil, err
			}
			newManifests = grown

			// Remember the nested index so it can be pushed by digest later.
			newManifests[lvl][repo.Digest(child.Digest.String())] = subIndex

		case types.OCIManifestSchema1, types.DockerManifestSchema2:
			subImage, err := idx.Image(child.Digest)
			if err != nil {
				return nil, err
			}
			if err := addImageBlobs(subImage, blobs, allowNondistributableArtifacts); err != nil {
				return nil, err
			}

			// Remember the nested image so it can be pushed by digest later.
			newManifests[lvl][repo.Digest(child.Digest.String())] = subImage

		default:
			// Workaround for #819: treat unrecognized entries as plain
			// layers when the index is able to hand them out.
			layerSource, ok := idx.(withLayer)
			if !ok {
				return nil, fmt.Errorf("unknown media type: %v", child.MediaType)
			}
			blob, err := layerSource.Layer(child.Digest)
			if err != nil {
				return nil, err
			}
			if err := addLayerBlob(blob, blobs, allowNondistributableArtifacts); err != nil {
				return nil, err
			}
		}
	}

	return newManifests, nil
}

// addLayerBlob records l in blobs under its digest.
//
// Layers whose media type is not distributable are skipped unless
// allowNondistributableArtifacts is set. Any error from reading the layer's
// media type or digest is returned unchanged.
func addLayerBlob(l v1.Layer, blobs map[v1.Hash]v1.Layer, allowNondistributableArtifacts bool) error {
	mediaType, err := l.MediaType()
	if err != nil {
		return err
	}

	// Foreign (non-distributable) layers are ignored unless explicitly allowed.
	if !mediaType.IsDistributable() && !allowNondistributableArtifacts {
		return nil
	}

	digest, err := l.Digest()
	if err != nil {
		return err
	}
	blobs[digest] = l
	return nil
}

func addImageBlobs(img v1.Image, blobs map[v1.Hash]v1.Layer, allowNondistributableArtifacts bool) error {
ls, err := img.Layers()
if err != nil {
return err
}
// Collect all layers.
for _, l := range ls {
if err := addLayerBlob(l, blobs, allowNondistributableArtifacts); err != nil {
return err
}
}

// Collect config blob.
cl, err := partial.ConfigLayer(img)
if err != nil {
return err
}
return addLayerBlob(cl, blobs, allowNondistributableArtifacts)
return g.Wait()
}
Loading

0 comments on commit 6743ec9

Please sign in to comment.