Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to cache/push compressed layers #517

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions pkg/v1/partial/uncompressed.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type uncompressedImageExtender struct {

lock sync.Mutex
manifest *v1.Manifest

layers []v1.Layer
layerOnce sync.Once
}

// Assert that our extender type completes the v1.Image interface
Expand Down Expand Up @@ -192,19 +195,28 @@ func (i *uncompressedImageExtender) ConfigFile() (*v1.ConfigFile, error) {

// Layers implements v1.Image
func (i *uncompressedImageExtender) Layers() ([]v1.Layer, error) {
diffIDs, err := DiffIDs(i)
if err != nil {
return nil, err
}
ls := make([]v1.Layer, 0, len(diffIDs))
for _, h := range diffIDs {
l, err := i.LayerByDiffID(h)
var outerErr error
i.layerOnce.Do(func() {
diffIDs, err := DiffIDs(i)
if err != nil {
return nil, err
outerErr = err
return
}
i.layers = make([]v1.Layer, 0, len(diffIDs))
for _, h := range diffIDs {
l, err := i.LayerByDiffID(h)
if err != nil {
outerErr = err
return
}
i.layers = append(i.layers, l)
}
ls = append(ls, l)
})
if outerErr != nil {
return nil, outerErr
}
return ls, nil

return i.layers, nil
}

// LayerByDiffID implements v1.Image
Expand Down
84 changes: 70 additions & 14 deletions pkg/v1/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"time"

"github.com/google/go-containerregistry/pkg/internal/retry"
"github.com/google/go-containerregistry/pkg/logs"
"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/cache"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
"github.com/google/go-containerregistry/pkg/v1/types"
Expand Down Expand Up @@ -56,16 +59,37 @@ func Write(ref name.Reference, img v1.Image, options ...Option) error {
if err != nil {
return err
}
w := writer{
ref: ref,
client: &http.Client{Transport: tr},
w, err := newWriter(ref, &http.Client{Transport: tr})
if err != nil {
return err
}
defer w.Close()

// Upload individual layers in goroutines and collect any errors.
// If we can dedupe by the layer digest, try to do so. If we can't determine
// the digest for whatever reason, we can't dedupe and might re-upload.
var g errgroup.Group
uploaded := map[v1.Hash]bool{}

// precompute digests in parallel
// computing digests is expensive, and the next stage does it serially
// since digests are cached, there is no need to store them in an array
for _, l := range ls {
l := l

g.Go(func() error {
// Streaming layers calculate their digests while uploading them. Assume
// an error here indicates we need to upload the layer.
_, _ = l.Digest()
return nil
})
}
err = g.Wait()
if err != nil {
return err
}

g = errgroup.Group{}
for _, l := range ls {
l := l

Expand Down Expand Up @@ -133,6 +157,27 @@ func Write(ref name.Reference, img v1.Image, options ...Option) error {
type writer struct {
ref name.Reference
client *http.Client

layerCachePath string // this is where the cached layers will be stored
layerCache cache.Cache // a cache to store compressed layers
}

func newWriter(ref name.Reference, client *http.Client) (*writer, error) {
cacheDir, err := ioutil.TempDir("", "go-containerregistry-layercache")
if err != nil {
return nil, err
}

return &writer{
ref: ref,
client: client,
layerCachePath: cacheDir,
layerCache: cache.NewFilesystemCache(cacheDir),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't actually want to embed the cache into the writer -- callers should wrap the image they are going to write with a cache.Image. This isn't very discoverable, so I opened a PR to add an example to the godoc: #522

}, nil
}

func (w *writer) Close() error {
return os.RemoveAll(w.layerCachePath)
}

// url returns a url.Url for the specified path in the context of this remote image reference.
Expand Down Expand Up @@ -319,6 +364,18 @@ func (w *writer) uploadOne(l v1.Layer) error {
}
}

// pull the layer into RAM before sending it
blob, err := l.Compressed()
if err != nil {
return err
}
blobContent, err := ioutil.ReadAll(blob)
if err != nil {
return err
}
blob.Close()
compressedContentReader := ioutil.NopCloser(bytes.NewBuffer(blobContent))

tryUpload := func() error {
location, mounted, err := w.initiateUpload(from, mount)
if err != nil {
Expand All @@ -332,11 +389,7 @@ func (w *writer) uploadOne(l v1.Layer) error {
return nil
}

blob, err := l.Compressed()
if err != nil {
return err
}
location, err = w.streamBlob(blob, location)
location, err = w.streamBlob(compressedContentReader, location)
if err != nil {
return err
}
Expand All @@ -345,6 +398,7 @@ func (w *writer) uploadOne(l v1.Layer) error {
if err != nil {
return err
}

digest := h.String()

if err := w.commitBlob(location, digest); err != nil {
Expand Down Expand Up @@ -447,10 +501,11 @@ func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) error {
if err != nil {
return err
}
w := writer{
ref: ref,
client: &http.Client{Transport: tr},
w, err := newWriter(ref, &http.Client{Transport: tr})
if err != nil {
return err
}
defer w.Close()

for _, desc := range index.Manifests {
ref, err := name.ParseReference(fmt.Sprintf("%s@%s", ref.Context(), desc.Digest), name.StrictValidation)
Expand Down Expand Up @@ -503,10 +558,11 @@ func WriteLayer(ref name.Digest, layer v1.Layer, options ...Option) error {
if err != nil {
return err
}
w := writer{
ref: ref,
client: &http.Client{Transport: tr},
w, err := newWriter(ref, &http.Client{Transport: tr})
if err != nil {
return err
}
defer w.Close()

return w.uploadOne(layer)
}