Skip to content

Commit

Permalink
Add remote.MultiWrite (#798)
Browse files Browse the repository at this point in the history
This method takes multiple images or manifest lists, and attempts to
efficiently upload all of them in parallel, deduplicating shared layer
and config blobs and avoiding unnecessary auth token exchanges.

Blobs are uploaded in parallel first, then manifests are tagged in order
with as much parallelism as we can support.

This method is currently limited in that:
- all images must be uploaded to the same repository (to simplify auth
  and transports),
- no image may contain streaming layers (to simplify digests), and
- blobs are uploaded and manifests are tagged in parallel with a
  constant max parallelism, which should be a user-controlled option.
The first two limitations can be removed in future changes.
  • Loading branch information
imjasonh committed Nov 1, 2020
1 parent b7100fc commit ab3252b
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/crane/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,7 @@ func Push(img v1.Image, dst string, opt ...Option) error {
if err != nil {
return fmt.Errorf("parsing tag %q: %v", dst, err)
}
return remote.Write(tag, img, o.remote...)
return remote.MultiWrite(map[name.Reference]remote.Taggable{
tag: img,
}, o.remote...)
}
30 changes: 30 additions & 0 deletions pkg/registry/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"net/http"
"strings"
"sync"

v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/types"
)

type manifest struct {
Expand Down Expand Up @@ -123,6 +126,33 @@ func (m *manifests) handle(resp http.ResponseWriter, req *http.Request) *regErro
blob: b.Bytes(),
contentType: req.Header.Get("Content-Type"),
}

// If the manifest is a manifest list, check that the manifest
// list's constituent manifests are already uploaded.
// This isn't strictly required by the registry API, but some
// registries require this.
if mf.contentType == string(types.OCIImageIndex) ||
mf.contentType == string(types.DockerManifestList) {

im, err := v1.ParseIndexManifest(b)
if err != nil {
return &regError{
Status: http.StatusNotFound,
Code: "MANIFEST_UNKNOWN",
Message: err.Error(),
}
}
for _, desc := range im.Manifests {
if _, found := m.manifests[repo][desc.Digest.String()]; !found {
return &regError{
Status: http.StatusNotFound,
Code: "MANIFEST_UNKNOWN",
Message: fmt.Sprintf("Sub-manifest %q not found", desc.Digest),
}
}
}
}

// Allow future references by target (tag) and immutable digest.
// See https://docs.docker.com/engine/reference/commandline/pull/#pull-an-image-by-digest-immutable-identifier.
m.manifests[repo][target] = mf
Expand Down
245 changes: 245 additions & 0 deletions pkg/v1/remote/multi_write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Copyright 2020 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
"fmt"
"net/http"

"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
"github.com/google/go-containerregistry/pkg/v1/types"
"golang.org/x/sync/errgroup"
)

// jobs is the number of concurrent workers MultiWrite starts for each upload
// phase (blob uploads and manifest commits); the work channels feeding those
// workers are buffered to twice this value.
// TODO(jasonhall): Make this an Option.
const jobs = 4

// MultiWrite writes the given Images or ImageIndexes to the given refs, as
// efficiently as possible, by deduping shared layer blobs and uploading layers
// in parallel, then uploading all manifests in parallel.
//
// Manifests are committed in dependency order: the requested images first,
// then the manifests discovered inside the requested indexes (deepest nesting
// level first), and finally the requested indexes themselves.
//
// It returns an error if the refs span more than one repository, if a value
// in m is neither a v1.Image nor a v1.ImageIndex, or if any step of the
// upload fails.
//
// Current limitations:
// - All refs must share the same repository.
// - Images cannot consist of stream.Layers.
func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
	// Determine the repository being pushed to; if asked to push to
	// multiple repositories, give up.
	var repo, zero name.Repository
	for ref := range m {
		if repo == zero {
			repo = ref.Context()
		} else if ref.Context() != repo {
			return fmt.Errorf("MultiWrite can only push to the same repository (saw %q and %q)", repo, ref.Context())
		}
	}

	// Collect unique blobs (layers and config blobs).
	blobs := map[v1.Hash]v1.Layer{}
	// newManifests holds manifests discovered inside the requested indexes,
	// bucketed by nesting depth (see addIndexBlobs).
	newManifests := []map[name.Reference]Taggable{}
	// Separate originally requested images and indexes, so we can push images first.
	images, indexes := map[name.Reference]Taggable{}, map[name.Reference]Taggable{}
	var err error
	for ref, i := range m {
		// Cases are tried in order, so a value that satisfies both
		// interfaces is treated as an Image.
		switch t := i.(type) {
		case v1.Image:
			images[ref] = i
			if err := addImageBlobs(t, blobs); err != nil {
				return err
			}
		case v1.ImageIndex:
			indexes[ref] = i
			newManifests, err = addIndexBlobs(t, blobs, repo, newManifests, 0)
			if err != nil {
				return err
			}
		default:
			return fmt.Errorf("pushable resource was not Image or ImageIndex: %T", i)
		}
	}

	o, err := makeOptions(repo, options...)
	if err != nil {
		return err
	}
	// Determine if any of the layers are Mountable, because if so we need
	// to request Pull scope too.
	ls := make([]v1.Layer, 0, len(blobs))
	for _, l := range blobs {
		ls = append(ls, l)
	}
	scopes := scopesForUploadingImage(repo, ls)
	tr, err := transport.New(repo.Registry, o.auth, o.transport, scopes)
	if err != nil {
		return err
	}
	w := writer{
		repo:    repo,
		client:  &http.Client{Transport: tr},
		context: o.context,
	}

	// Upload individual blobs and collect any errors.
	blobChan := make(chan v1.Layer, 2*jobs)
	// blobsDone is closed once every worker has exited. Without it, the
	// producer below would block forever on a send if all workers bailed
	// out early on an error, leaking the goroutine.
	blobsDone := make(chan struct{})
	var g errgroup.Group
	for i := 0; i < jobs; i++ {
		// Start N workers consuming blobs to upload.
		g.Go(func() error {
			for b := range blobChan {
				if err := w.uploadOne(b); err != nil {
					return err
				}
			}
			return nil
		})
	}
	go func() {
		defer close(blobChan)
		for _, b := range blobs {
			select {
			case blobChan <- b:
			case <-blobsDone:
				return
			}
		}
	}()
	err = g.Wait()
	close(blobsDone)
	if err != nil {
		return err
	}

	commitMany := func(manifests map[name.Reference]Taggable) error {
		// With all of the constituent elements uploaded, upload the manifests
		// to commit the images and indexes, and collect any errors.
		type task struct {
			i   Taggable
			ref name.Reference
		}
		taskChan := make(chan task, 2*jobs)
		// Closed when this function returns (i.e. after all workers have
		// exited) so the producer can never be left blocked on a send.
		done := make(chan struct{})
		defer close(done)

		// Each batch gets its own group rather than reusing one across
		// phases, so no state is shared between Wait calls.
		var g errgroup.Group
		for i := 0; i < jobs; i++ {
			// Start N workers consuming tasks to upload manifests.
			g.Go(func() error {
				for t := range taskChan {
					if err := w.commitImage(t.i, t.ref); err != nil {
						return err
					}
				}
				return nil
			})
		}
		go func() {
			defer close(taskChan)
			for ref, i := range manifests {
				select {
				case taskChan <- task{i, ref}:
				case <-done:
					return
				}
			}
		}()
		return g.Wait()
	}
	// Push originally requested image manifests. These have no
	// dependencies.
	if err := commitMany(images); err != nil {
		return err
	}
	// Push new manifests from lowest levels up.
	for i := len(newManifests) - 1; i >= 0; i-- {
		if err := commitMany(newManifests[i]); err != nil {
			return err
		}
	}
	// Push originally requested index manifests, which might depend on
	// newly discovered manifests.
	return commitMany(indexes)
}

// addIndexBlobs walks the manifest of idx, adding every blob reachable from it
// to blobs and recording each child manifest in newManifests so it can later
// be pushed by digest into repo.
//
// newManifests is bucketed by nesting depth: the direct children of an index
// visited at depth lvl are stored in newManifests[lvl], and children of a
// nested index are handled by recursing with lvl+1. The (possibly grown)
// slice is returned; on any error the returned slice is nil.
func addIndexBlobs(idx v1.ImageIndex, blobs map[v1.Hash]v1.Layer, repo name.Repository, newManifests []map[name.Reference]Taggable, lvl int) ([]map[name.Reference]Taggable, error) {
	// Make sure a bucket exists for this depth before storing into it.
	if len(newManifests) <= lvl {
		newManifests = append(newManifests, map[name.Reference]Taggable{})
	}

	manifest, err := idx.IndexManifest()
	if err != nil {
		return nil, err
	}
	for _, d := range manifest.Manifests {
		var child Taggable
		switch d.MediaType {
		case types.OCIImageIndex, types.DockerManifestList:
			sub, err := idx.ImageIndex(d.Digest)
			if err != nil {
				return nil, err
			}
			// Descend first so the nested index's own children land one
			// level deeper than the nested index itself.
			if newManifests, err = addIndexBlobs(sub, blobs, repo, newManifests, lvl+1); err != nil {
				return nil, err
			}
			child = sub
		case types.OCIManifestSchema1, types.DockerManifestSchema2:
			img, err := idx.Image(d.Digest)
			if err != nil {
				return nil, err
			}
			if err := addImageBlobs(img, blobs); err != nil {
				return nil, err
			}
			child = img
		default:
			return nil, fmt.Errorf("unknown media type: %v", d.MediaType)
		}

		// Track the child manifest to upload later, addressed by digest.
		newManifests[lvl][repo.Digest(d.Digest.String())] = child
	}
	return newManifests, nil
}

// addImageBlobs records the distributable layers of img and its config blob
// in blobs, keyed by digest. Layers whose media type is not distributable
// (foreign layers) are skipped. Any error from reading the image, a layer's
// digest or media type, or the config layer is returned as-is.
func addImageBlobs(img v1.Image, blobs map[v1.Hash]v1.Layer) error {
	layers, err := img.Layers()
	if err != nil {
		return err
	}
	for _, layer := range layers {
		digest, err := layer.Digest()
		if err != nil {
			return err
		}
		mediaType, err := layer.MediaType()
		if err != nil {
			return err
		}

		// Only distributable layers are uploaded; foreign layers are ignored.
		// TODO(jonjohnsonjr): Add "allow-nondistributable-artifacts" option.
		if mediaType.IsDistributable() {
			blobs[digest] = layer
		}
	}

	// The config blob is uploaded like any other layer.
	config, err := partial.ConfigLayer(img)
	if err != nil {
		return err
	}
	configDigest, err := config.Digest()
	if err != nil {
		return err
	}
	blobs[configDigest] = config
	return nil
}
Loading

0 comments on commit ab3252b

Please sign in to comment.