From 6ffd3785c6d442ed7e9f6dfd435e644eb093862e Mon Sep 17 00:00:00 2001 From: Ben Krieger Date: Fri, 31 Dec 2021 21:45:28 -0500 Subject: [PATCH 1/9] Add layout.WriteLayer for streaming layers and undersized layers from incomplete downloads WriteLayer is an alternative to WriteBlob which can be used with streaming layers which have not yet calculated a digest or size. A temporary file is first written and then renamed when the reader has been fully consumed. Also, existing layers which do not have the correct size (and thus invalid hashes, but the hash is not checked) are overwritten. {Append,Replace,Write}{Image,Layer} functions utilize WriteLayer instead of WriteBlob to correctly overwrite undersized layers which were previously ignored without error. Undersized layers should no longer occur, since WriteLayer writes to a temporary file first, but when WriteBlob was used, these were common due to incomplete downloads. --- pkg/v1/layout/write.go | 68 ++++++++-- pkg/v1/layout/write_test.go | 258 ++++++++++++++++++++++++++++++++++++ 2 files changed, 313 insertions(+), 13 deletions(-) diff --git a/pkg/v1/layout/write.go b/pkg/v1/layout/write.go index f912b124e..f27f29340 100644 --- a/pkg/v1/layout/write.go +++ b/pkg/v1/layout/write.go @@ -16,6 +16,8 @@ package layout import ( "bytes" + "crypto/rand" + "encoding/hex" "encoding/json" "io" "io/ioutil" @@ -221,35 +223,75 @@ func (l Path) WriteFile(name string, data []byte, perm os.FileMode) error { // WriteBlob copies a file to the blobs/ directory in the Path from the given ReadCloser at // blobs/{hash.Algorithm}/{hash.Hex}. 
func (l Path) WriteBlob(hash v1.Hash, r io.ReadCloser) error { + return l.writeBlob(hash, 0, r, nil) +} + +func (l Path) writeBlob(hash v1.Hash, size int64, r io.ReadCloser, renamer func() (v1.Hash, error)) error { dir := l.path("blobs", hash.Algorithm) if err := os.MkdirAll(dir, os.ModePerm); err != nil && !os.IsExist(err) { return err } + // Check if blob already exists and is the correct size file := filepath.Join(dir, hash.Hex) - if _, err := os.Stat(file); err == nil { - // Blob already exists, that's fine. + if s, err := os.Stat(file); err == nil && (s.Size() == size || size == 0) { return nil } + + // If a rename func was provided, use a temporary file suffix + if renamer != nil { + file += ".tmp" + } w, err := os.Create(file) if err != nil { return err } defer w.Close() - _, err = io.Copy(w, r) - return err + // Write to file and optionally rename + if _, err := io.Copy(w, r); err != nil || renamer == nil { + return err + } + finalHash, err := renamer() + if err != nil { + return err + } + // Always close file before renaming + if err := w.Close(); err != nil { + return err + } + return os.Rename(file, l.path("blobs", finalHash.Algorithm, finalHash.Hex)) } -// TODO: A streaming version of WriteBlob so we don't have to know the hash -// before we write it. - -// TODO: For streaming layers we should write to a tmp file then Rename to the -// final digest. -func (l Path) writeLayer(layer v1.Layer) error { +// WriteLayer writes the compressed layer to a blob. Unlike WriteBlob it will +// write to a temporary file (suffixed with .tmp) within the layout until the +// compressed reader is fully consumed and written to disk. Also unlike +// WriteBlob, it will not skip writing and exit without error when a blob file +// exists, but does not have the correct size. (The blob hash is not +// considered, because it may be expensive to compute.) 
+func (l Path) WriteLayer(layer v1.Layer) error { d, err := layer.Digest() if err != nil { - return err + // Allow digest errors, since streams may not have calculated the hash + // yet. Instead, use a random value for the digest and require it to be + // calculated after writing to a temp file and before renaming to the + // final path. + var randHash [32]byte + if _, err := rand.Read(randHash[:]); err != nil { + return err + } + d = v1.Hash{Algorithm: "sha256", Hex: hex.EncodeToString(randHash[:])} + } + + s, err := layer.Size() + if err != nil { + // Allow size errors, since streams may not have calculated the size + // yet. Instead, use zero as a sentinel value meaning that no size + // comparison can be done and any sized blob file should be considered + // valid and not overwritten. + // + // TODO: Provide an option to always overwrite blobs. + s = 0 } r, err := layer.Compressed() @@ -257,7 +299,7 @@ func (l Path) writeLayer(layer v1.Layer) error { return err } - return l.WriteBlob(d, r) + return l.writeBlob(d, s, r, layer.Digest) } // RemoveBlob removes a file from the blobs directory in the Path @@ -290,7 +332,7 @@ func (l Path) WriteImage(img v1.Image) error { for _, layer := range layers { layer := layer g.Go(func() error { - return l.writeLayer(layer) + return l.WriteLayer(layer) }) } if err := g.Wait(); err != nil { diff --git a/pkg/v1/layout/write_test.go b/pkg/v1/layout/write_test.go index 6466dbb7b..59f0c6cfc 100644 --- a/pkg/v1/layout/write_test.go +++ b/pkg/v1/layout/write_test.go @@ -1,6 +1,21 @@ +// Copyright 2022 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package layout import ( + "archive/tar" "bytes" "io" "io/ioutil" @@ -12,7 +27,9 @@ import ( v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/empty" "github.com/google/go-containerregistry/pkg/v1/match" + "github.com/google/go-containerregistry/pkg/v1/mutate" "github.com/google/go-containerregistry/pkg/v1/random" + "github.com/google/go-containerregistry/pkg/v1/stream" "github.com/google/go-containerregistry/pkg/v1/types" "github.com/google/go-containerregistry/pkg/v1/validate" ) @@ -463,3 +480,244 @@ func TestRemoveBlob(t *testing.T) { t.Fatal("still existed after deletion") } } + +func TestStreamingWriteLayer(t *testing.T) { + // need to set up a basic path + tmp, err := ioutil.TempDir("", "streaming-write-layer-test") + if err != nil { + t.Fatal(err) + } + + defer os.RemoveAll(tmp) + + var ii v1.ImageIndex = empty.Index + l, err := Write(tmp, ii) + if err != nil { + t.Fatal(err) + } + + // create a random streaming image and persist + pr, pw := io.Pipe() + tw := tar.NewWriter(pw) + go func() { + pw.CloseWithError(func() error { + body := "test file" + if err := tw.WriteHeader(&tar.Header{ + Name: "test.txt", + Mode: 0600, + Size: int64(len(body)), + Typeflag: tar.TypeReg, + }); err != nil { + return err + } + if _, err := tw.Write([]byte(body)); err != nil { + return err + } + return tw.Close() + }()) + }() + img, err := mutate.Append(empty.Image, mutate.Addendum{ + Layer: stream.NewLayer(pr), + }) + if err != nil { + t.Fatalf("creating random streaming image failed: %v", err) + } + if _, err := 
img.Digest(); err == nil { + t.Fatal("digesting image before stream is consumed; (v1.Image).Digest() = nil, expected err") + } + // AppendImage uses WriteLayer + if err := l.AppendImage(img); err != nil { + t.Fatalf("(Path).AppendImage() = %v", err) + } + + // Check that image was persisted and is valid + imgDigest, err := img.Digest() + if err != nil { + t.Fatalf("(v1.Image).Digest() = %v", err) + } + img, err = l.Image(imgDigest) + if err != nil { + t.Fatalf("error loading image after WriteLayer for validation; (Path).Image = %v", err) + } + if err := validate.Image(img); err != nil { + t.Fatalf("validate.Image() = %v", err) + } +} + +func TestOverwriteWithWriteLayer(t *testing.T) { + // need to set up a basic path + tmp, err := ioutil.TempDir("", "overwrite-with-write-layer-test") + if err != nil { + t.Fatal(err) + } + + defer os.RemoveAll(tmp) + + var ii v1.ImageIndex = empty.Index + l, err := Write(tmp, ii) + if err != nil { + t.Fatal(err) + } + + // create a random image and persist + img, err := random.Image(1024, 1) + if err != nil { + t.Fatalf("random.Image() = %v", err) + } + imgDigest, err := img.Digest() + if err != nil { + t.Fatalf("(v1.Image).Digest() = %v", err) + } + if err := l.AppendImage(img); err != nil { + t.Fatalf("(Path).AppendImage() = %v", err) + } + if err := validate.Image(img); err != nil { + t.Fatalf("validate.Image() = %v", err) + } + + // get the random image's layer + layers, err := img.Layers() + if err != nil { + t.Fatal(err) + } + if n := len(layers); n != 1 { + t.Fatalf("expected image with 1 layer, got %d", n) + } + + layer := layers[0] + layerDigest, err := layer.Digest() + if err != nil { + t.Fatalf("(v1.Layer).Digest() = %v", err) + } + + // truncate the layer contents on disk + completeLayerBytes, err := l.Bytes(layerDigest) + if err != nil { + t.Fatalf("(Path).Bytes() = %v", err) + } + truncatedLayerBytes := completeLayerBytes[:512] + + path := l.path("blobs", layerDigest.Algorithm, layerDigest.Hex) + if err := 
ioutil.WriteFile(path, truncatedLayerBytes, os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(layerPath, truncated) = %v", err) + } + + // ensure validation fails + img, err = l.Image(imgDigest) + if err != nil { + t.Fatalf("error loading truncated image for validation; (Path).Image = %v", err) + } + if err := validate.Image(img); err == nil { + t.Fatal("validating image after truncating layer; validate.Image() = nil, expected err") + } + + // try writing expected contents with WriteBlob + if err := l.WriteBlob(layerDigest, ioutil.NopCloser(bytes.NewBuffer(completeLayerBytes))); err != nil { + t.Fatalf("error attempting to overwrite truncated layer with valid layer; (Path).WriteBlob = %v", err) + } + + // validation should still fail + img, err = l.Image(imgDigest) + if err != nil { + t.Fatalf("error loading truncated image after WriteBlob for validation; (Path).Image = %v", err) + } + if err := validate.Image(img); err == nil { + t.Fatal("validating image after attempting repair of truncated layer with WriteBlob; validate.Image() = nil, expected err") + } + + // try writing expected contents with WriteLayer + if err := l.WriteLayer(layer); err != nil { + t.Fatalf("error attempting to overwrite truncated layer with valid layer; (Path).WriteLayer = %v", err) + } + + // validation should now succeed + img, err = l.Image(imgDigest) + if err != nil { + t.Fatalf("error loading truncated image after WriteLayer for validation; (Path).Image = %v", err) + } + if err := validate.Image(img); err != nil { + t.Fatalf("validating image after attempting repair of truncated layer with WriteLayer; validate.Image() = %v", err) + } +} + +func TestOverwriteWithReplaceImage(t *testing.T) { + // need to set up a basic path + tmp, err := ioutil.TempDir("", "overwrite-with-replace-image-test") + if err != nil { + t.Fatal(err) + } + + defer os.RemoveAll(tmp) + + var ii v1.ImageIndex = empty.Index + l, err := Write(tmp, ii) + if err != nil { + t.Fatal(err) + } + + // create a random 
image and persist + img, err := random.Image(1024, 1) + if err != nil { + t.Fatalf("random.Image() = %v", err) + } + imgDigest, err := img.Digest() + if err != nil { + t.Fatalf("(v1.Image).Digest() = %v", err) + } + if err := l.AppendImage(img); err != nil { + t.Fatalf("(Path).AppendImage() = %v", err) + } + if err := validate.Image(img); err != nil { + t.Fatalf("validate.Image() = %v", err) + } + + // get the random image's layer + layers, err := img.Layers() + if err != nil { + t.Fatal(err) + } + if n := len(layers); n != 1 { + t.Fatalf("expected image with 1 layer, got %d", n) + } + + layer := layers[0] + layerDigest, err := layer.Digest() + if err != nil { + t.Fatalf("(v1.Layer).Digest() = %v", err) + } + + // truncate the layer contents on disk + completeLayerBytes, err := l.Bytes(layerDigest) + if err != nil { + t.Fatalf("(Path).Bytes() = %v", err) + } + truncatedLayerBytes := completeLayerBytes[:512] + + path := l.path("blobs", layerDigest.Algorithm, layerDigest.Hex) + if err := ioutil.WriteFile(path, truncatedLayerBytes, os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(layerPath, truncated) = %v", err) + } + + // ensure validation fails + truncatedImg, err := l.Image(imgDigest) + if err != nil { + t.Fatalf("error loading truncated image for validation; (Path).Image = %v", err) + } + if err := validate.Image(truncatedImg); err == nil { + t.Fatal("validating image after truncating layer; validate.Image() = nil, expected err") + } + + // try writing expected contents with ReplaceImage + if err := l.ReplaceImage(img, match.Digests(imgDigest)); err != nil { + t.Fatalf("error attempting to overwrite truncated layer with valid layer; (Path).ReplaceImage = %v", err) + } + + // validation should now succeed + repairedImg, err := l.Image(imgDigest) + if err != nil { + t.Fatalf("error loading truncated image after ReplaceImage for validation; (Path).Image = %v", err) + } + if err := validate.Image(repairedImg); err != nil { + t.Fatalf("validating image after 
attempting repair of truncated layer with ReplaceImage; validate.Image() = %v", err) + } +} From a559ccd1198fb353acfa3f61148b80d86652e4be Mon Sep 17 00:00:00 2001 From: Ben Krieger Date: Sat, 1 Jan 2022 14:29:45 -0500 Subject: [PATCH 2/9] Improve error messages for validate.Image and validate.Index when layer blob content is truncated --- pkg/v1/layout/write_test.go | 3 +++ pkg/v1/validate/image.go | 26 +++++++++++++++----------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/pkg/v1/layout/write_test.go b/pkg/v1/layout/write_test.go index 59f0c6cfc..ea2b5e7ee 100644 --- a/pkg/v1/layout/write_test.go +++ b/pkg/v1/layout/write_test.go @@ -21,6 +21,7 @@ import ( "io/ioutil" "log" "os" + "strings" "testing" "github.com/google/go-cmp/cmp" @@ -705,6 +706,8 @@ func TestOverwriteWithReplaceImage(t *testing.T) { } if err := validate.Image(truncatedImg); err == nil { t.Fatal("validating image after truncating layer; validate.Image() = nil, expected err") + } else if strings.Contains(err.Error(), "unexpected EOF") { + t.Fatalf("validating image after truncating layer; validate.Image() error is not helpful: %v", err) } // try writing expected contents with ReplaceImage diff --git a/pkg/v1/validate/image.go b/pkg/v1/validate/image.go index de9bdda12..0820efec1 100644 --- a/pkg/v1/validate/image.go +++ b/pkg/v1/validate/image.go @@ -18,6 +18,7 @@ import ( "bytes" "errors" "fmt" + "io" "strings" "github.com/google/go-cmp/cmp" @@ -113,12 +114,25 @@ func validateLayers(img v1.Image, opt ...Option) error { return layersExist(layers) } + cf, err := img.ConfigFile() + if err != nil { + return err + } + + m, err := img.Manifest() + if err != nil { + return err + } + digests := []v1.Hash{} diffids := []v1.Hash{} udiffids := []v1.Hash{} sizes := []int64{} - for _, layer := range layers { + for i, layer := range layers { cl, err := computeLayer(layer) + if errors.Is(err, io.ErrUnexpectedEOF) { + return fmt.Errorf("undersized layer[%d] content: 
Manifest.Layers[%d].Size=%d", i, i, m.Layers[i].Size) + } if err != nil { return err } @@ -130,16 +144,6 @@ func validateLayers(img v1.Image, opt ...Option) error { sizes = append(sizes, cl.size) } - cf, err := img.ConfigFile() - if err != nil { - return err - } - - m, err := img.Manifest() - if err != nil { - return err - } - errs := []string{} for i, layer := range layers { digest, err := layer.Digest() From ab33ba12efa9290f70514ed25c9c732fa0b714dd Mon Sep 17 00:00:00 2001 From: Ben Krieger Date: Mon, 3 Jan 2022 16:21:40 -0500 Subject: [PATCH 3/9] Unexport `layout.WriteLayer`; Implement suggested changes for validateLayers and temp file naming --- pkg/v1/layout/write.go | 71 +++++++++++++++++++++++-------------- pkg/v1/layout/write_test.go | 16 ++++----- pkg/v1/stream/layer.go | 18 ++++++---- pkg/v1/validate/image.go | 27 ++++++++------ 4 files changed, 80 insertions(+), 52 deletions(-) diff --git a/pkg/v1/layout/write.go b/pkg/v1/layout/write.go index f27f29340..84aecbe69 100644 --- a/pkg/v1/layout/write.go +++ b/pkg/v1/layout/write.go @@ -16,9 +16,9 @@ package layout import ( "bytes" - "crypto/rand" - "encoding/hex" "encoding/json" + "errors" + "fmt" "io" "io/ioutil" "os" @@ -28,6 +28,7 @@ import ( "github.com/google/go-containerregistry/pkg/v1/match" "github.com/google/go-containerregistry/pkg/v1/mutate" "github.com/google/go-containerregistry/pkg/v1/partial" + "github.com/google/go-containerregistry/pkg/v1/stream" "github.com/google/go-containerregistry/pkg/v1/types" "golang.org/x/sync/errgroup" ) @@ -223,10 +224,15 @@ func (l Path) WriteFile(name string, data []byte, perm os.FileMode) error { // WriteBlob copies a file to the blobs/ directory in the Path from the given ReadCloser at // blobs/{hash.Algorithm}/{hash.Hex}. 
func (l Path) WriteBlob(hash v1.Hash, r io.ReadCloser) error { - return l.writeBlob(hash, 0, r, nil) + return l.writeBlob(hash, -1, r, nil) } -func (l Path) writeBlob(hash v1.Hash, size int64, r io.ReadCloser, renamer func() (v1.Hash, error)) error { +func (l Path) writeBlob(hash v1.Hash, size int64, r io.Reader, renamer func() (v1.Hash, error)) error { + if hash.Hex == "" && renamer == nil { + // Should we panic, since this is a programming error? + return errors.New("writeBlob called an invalid hash and no renamer") + } + dir := l.path("blobs", hash.Algorithm) if err := os.MkdirAll(dir, os.ModePerm); err != nil && !os.IsExist(err) { return err @@ -234,72 +240,83 @@ func (l Path) writeBlob(hash v1.Hash, size int64, r io.ReadCloser, renamer func( // Check if blob already exists and is the correct size file := filepath.Join(dir, hash.Hex) - if s, err := os.Stat(file); err == nil && (s.Size() == size || size == 0) { + if s, err := os.Stat(file); err == nil && !s.IsDir() && (s.Size() == size || size == -1) { return nil } - // If a rename func was provided, use a temporary file suffix + // If a renamer func was provided write to a temporary file + open := func() (*os.File, error) { return os.Create(file) } if renamer != nil { - file += ".tmp" + open = func() (*os.File, error) { return ioutil.TempFile(dir, hash.Hex) } } - w, err := os.Create(file) + w, err := open() if err != nil { return err } defer w.Close() - // Write to file and optionally rename + // Write to file and exit if not renaming if _, err := io.Copy(w, r); err != nil || renamer == nil { return err } - finalHash, err := renamer() - if err != nil { - return err - } + // Always close file before renaming if err := w.Close(); err != nil { return err } - return os.Rename(file, l.path("blobs", finalHash.Algorithm, finalHash.Hex)) + + // Rename file based on the final hash + finalHash, err := renamer() + if err != nil { + return fmt.Errorf("error getting final digest of layer: %w", err) + } + + renamePath := 
l.path("blobs", finalHash.Algorithm, finalHash.Hex) + return os.Rename(w.Name(), renamePath) } -// WriteLayer writes the compressed layer to a blob. Unlike WriteBlob it will +// writeLayer writes the compressed layer to a blob. Unlike WriteBlob it will // write to a temporary file (suffixed with .tmp) within the layout until the // compressed reader is fully consumed and written to disk. Also unlike // WriteBlob, it will not skip writing and exit without error when a blob file // exists, but does not have the correct size. (The blob hash is not // considered, because it may be expensive to compute.) -func (l Path) WriteLayer(layer v1.Layer) error { +func (l Path) writeLayer(layer v1.Layer) error { d, err := layer.Digest() - if err != nil { + if errors.Is(err, stream.ErrNotComputed) { // Allow digest errors, since streams may not have calculated the hash - // yet. Instead, use a random value for the digest and require it to be + // yet. Instead, use an empty value, which will be transformed into a + // random file name with `ioutil.TempFile` and the final digest will be // calculated after writing to a temp file and before renaming to the // final path. - var randHash [32]byte - if _, err := rand.Read(randHash[:]); err != nil { - return err - } - d = v1.Hash{Algorithm: "sha256", Hex: hex.EncodeToString(randHash[:])} + d = v1.Hash{Algorithm: "sha256", Hex: ""} + } else if err != nil { + return err } s, err := layer.Size() - if err != nil { + if errors.Is(err, stream.ErrNotComputed) { // Allow size errors, since streams may not have calculated the size // yet. Instead, use zero as a sentinel value meaning that no size // comparison can be done and any sized blob file should be considered // valid and not overwritten. // // TODO: Provide an option to always overwrite blobs. 
- s = 0 + s = -1 + } else if err != nil { + return err } r, err := layer.Compressed() if err != nil { return err } + defer r.Close() - return l.writeBlob(d, s, r, layer.Digest) + if err := l.writeBlob(d, s, r, layer.Digest); err != nil { + return fmt.Errorf("error writing layer: %w", err) + } + return nil } // RemoveBlob removes a file from the blobs directory in the Path @@ -332,7 +349,7 @@ func (l Path) WriteImage(img v1.Image) error { for _, layer := range layers { layer := layer g.Go(func() error { - return l.WriteLayer(layer) + return l.writeLayer(layer) }) } if err := g.Wait(); err != nil { diff --git a/pkg/v1/layout/write_test.go b/pkg/v1/layout/write_test.go index ea2b5e7ee..66b53b068 100644 --- a/pkg/v1/layout/write_test.go +++ b/pkg/v1/layout/write_test.go @@ -526,7 +526,7 @@ func TestStreamingWriteLayer(t *testing.T) { if _, err := img.Digest(); err == nil { t.Fatal("digesting image before stream is consumed; (v1.Image).Digest() = nil, expected err") } - // AppendImage uses WriteLayer + // AppendImage uses writeLayer if err := l.AppendImage(img); err != nil { t.Fatalf("(Path).AppendImage() = %v", err) } @@ -538,14 +538,14 @@ func TestStreamingWriteLayer(t *testing.T) { } img, err = l.Image(imgDigest) if err != nil { - t.Fatalf("error loading image after WriteLayer for validation; (Path).Image = %v", err) + t.Fatalf("error loading image after writeLayer for validation; (Path).Image = %v", err) } if err := validate.Image(img); err != nil { t.Fatalf("validate.Image() = %v", err) } } -func TestOverwriteWithWriteLayer(t *testing.T) { +func TestOverwriteWithwriteLayer(t *testing.T) { // need to set up a basic path tmp, err := ioutil.TempDir("", "overwrite-with-write-layer-test") if err != nil { @@ -626,18 +626,18 @@ func TestOverwriteWithWriteLayer(t *testing.T) { t.Fatal("validating image after attempting repair of truncated layer with WriteBlob; validate.Image() = nil, expected err") } - // try writing expected contents with WriteLayer - if err := 
l.WriteLayer(layer); err != nil { - t.Fatalf("error attempting to overwrite truncated layer with valid layer; (Path).WriteLayer = %v", err) + // try writing expected contents with writeLayer + if err := l.writeLayer(layer); err != nil { + t.Fatalf("error attempting to overwrite truncated layer with valid layer; (Path).writeLayer = %v", err) } // validation should now succeed img, err = l.Image(imgDigest) if err != nil { - t.Fatalf("error loading truncated image after WriteLayer for validation; (Path).Image = %v", err) + t.Fatalf("error loading truncated image after writeLayer for validation; (Path).Image = %v", err) } if err := validate.Image(img); err != nil { - t.Fatalf("validating image after attempting repair of truncated layer with WriteLayer; validate.Image() = %v", err) + t.Fatalf("validating image after attempting repair of truncated layer with writeLayer; validate.Image() = %v", err) } } diff --git a/pkg/v1/stream/layer.go b/pkg/v1/stream/layer.go index e91f57ab3..9f42f6301 100644 --- a/pkg/v1/stream/layer.go +++ b/pkg/v1/stream/layer.go @@ -159,13 +159,17 @@ func newCompressedReader(l *Layer) (*compressedReader, error) { } cr := &compressedReader{ - closer: newMultiCloser(zw, l.blob), - pr: pr, - bw: bw, - h: h, - zh: zh, - count: count, - l: l, + // NOTE: Order matters! If zw is closed first, then it will panic, + // because io.Copy in the goroutine below will still be copying the + // contents of l.blob into it. 
+ closer: newMultiCloser(l.blob, zw), + + pr: pr, + bw: bw, + h: h, + zh: zh, + count: count, + l: l, } go func() { if _, err := io.Copy(io.MultiWriter(h, zw), l.blob); err != nil { diff --git a/pkg/v1/validate/image.go b/pkg/v1/validate/image.go index 0820efec1..94fb767bb 100644 --- a/pkg/v1/validate/image.go +++ b/pkg/v1/validate/image.go @@ -114,16 +114,6 @@ func validateLayers(img v1.Image, opt ...Option) error { return layersExist(layers) } - cf, err := img.ConfigFile() - if err != nil { - return err - } - - m, err := img.Manifest() - if err != nil { - return err - } - digests := []v1.Hash{} diffids := []v1.Hash{} udiffids := []v1.Hash{} @@ -131,6 +121,13 @@ func validateLayers(img v1.Image, opt ...Option) error { for i, layer := range layers { cl, err := computeLayer(layer) if errors.Is(err, io.ErrUnexpectedEOF) { + // Errored while reading tar content of layer because a header or + // content section was not the correct length. This is most likely + // due to an incomplete download or otherwise interrupted process. 
+ m, err := img.Manifest() + if err != nil { + return fmt.Errorf("undersized layer[%d] content", i) + } return fmt.Errorf("undersized layer[%d] content: Manifest.Layers[%d].Size=%d", i, i, m.Layers[i].Size) } if err != nil { @@ -144,6 +141,16 @@ func validateLayers(img v1.Image, opt ...Option) error { sizes = append(sizes, cl.size) } + cf, err := img.ConfigFile() + if err != nil { + return err + } + + m, err := img.Manifest() + if err != nil { + return err + } + errs := []string{} for i, layer := range layers { digest, err := layer.Digest() From 4d9503b3007ba2558770273ef39d13dbcd20b7c9 Mon Sep 17 00:00:00 2001 From: Ben Krieger Date: Wed, 5 Jan 2022 16:59:17 -0500 Subject: [PATCH 4/9] Add tests for closing stream.Layer --- pkg/v1/layout/write.go | 1 - pkg/v1/stream/layer.go | 50 ++++++++++++++++--------------------- pkg/v1/stream/layer_test.go | 49 ++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 29 deletions(-) diff --git a/pkg/v1/layout/write.go b/pkg/v1/layout/write.go index 84aecbe69..ca91ec937 100644 --- a/pkg/v1/layout/write.go +++ b/pkg/v1/layout/write.go @@ -311,7 +311,6 @@ func (l Path) writeLayer(layer v1.Layer) error { if err != nil { return err } - defer r.Close() if err := l.writeBlob(d, s, r, layer.Digest); err != nil { return fmt.Errorf("error writing layer: %w", err) diff --git a/pkg/v1/stream/layer.go b/pkg/v1/stream/layer.go index 9f42f6301..740e6c54a 100644 --- a/pkg/v1/stream/layer.go +++ b/pkg/v1/stream/layer.go @@ -127,8 +127,6 @@ func (l *Layer) Compressed() (io.ReadCloser, error) { } type compressedReader struct { - closer io.Closer // original blob's Closer. - h, zh hash.Hash // collects digests of compressed and uncompressed stream. pr io.Reader bw *bufio.Writer @@ -159,11 +157,6 @@ func newCompressedReader(l *Layer) (*compressedReader, error) { } cr := &compressedReader{ - // NOTE: Order matters! 
If zw is closed first, then it will panic, - // because io.Copy in the goroutine below will still be copying the - // contents of l.blob into it. - closer: newMultiCloser(l.blob, zw), - pr: pr, bw: bw, h: h, @@ -172,10 +165,26 @@ func newCompressedReader(l *Layer) (*compressedReader, error) { l: l, } go func() { - if _, err := io.Copy(io.MultiWriter(h, zw), l.blob); err != nil { - pw.CloseWithError(err) + // Copy blob into the gzip writer - which also hashes and counts the + // size of the compressed output - and hasher of the raw contents. + _, copyErr := io.Copy(io.MultiWriter(h, zw), l.blob) + + // Close the gzip writer once copying is done. If this is done in the + // Close method of compressedReader instead, then it can cause a panic + // when the compressedReader is closed before the blob is fully + // consumed and io.Copy in this goroutine is still blocking. + closeErr := zw.Close() + + // Check errors from writing and closing streams. + if copyErr != nil { + pw.CloseWithError(copyErr) + return + } + if closeErr != nil { + pw.CloseWithError(closeErr) return } + // Now close the compressed reader, to flush the gzip stream // and calculate digest/diffID/size. This will cause pr to // return EOF which will cause readers of the Compressed stream @@ -193,7 +202,10 @@ func (cr *compressedReader) Close() error { defer cr.l.mu.Unlock() // Close the inner ReadCloser. - if err := cr.closer.Close(); err != nil { + // + // NOTE: net/http will call close on success, so if we've already + // closed the inner rc, it's not an error. + if err := cr.l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) { return err } @@ -226,21 +238,3 @@ func (c *countWriter) Write(p []byte) (int, error) { c.n += int64(len(p)) return len(p), nil } - -// multiCloser is a Closer that collects multiple Closers and Closes them in order. 
-type multiCloser []io.Closer - -var _ io.Closer = (multiCloser)(nil) - -func newMultiCloser(c ...io.Closer) multiCloser { return multiCloser(c) } - -func (m multiCloser) Close() error { - for _, c := range m { - // NOTE: net/http will call close on success, so if we've already - // closed the inner rc, it's not an error. - if err := c.Close(); err != nil && !errors.Is(err, os.ErrClosed) { - return err - } - } - return nil -} diff --git a/pkg/v1/stream/layer_test.go b/pkg/v1/stream/layer_test.go index d44fd5f97..d77497408 100644 --- a/pkg/v1/stream/layer_test.go +++ b/pkg/v1/stream/layer_test.go @@ -223,6 +223,55 @@ func TestConsumed(t *testing.T) { } } +func TestCloseTextStreamBeforeConsume(t *testing.T) { + // Create stream layer from tar pipe + l := NewLayer(ioutil.NopCloser(strings.NewReader("hello"))) + rc, err := l.Compressed() + if err != nil { + t.Fatalf("Compressed: %v", err) + } + + // Close stream layer before consuming + if err := rc.Close(); err != nil { + t.Fatalf("Close: %v", err) + } +} + +func TestCloseTarStreamBeforeConsume(t *testing.T) { + // Write small tar to pipe + pr, pw := io.Pipe() + tw := tar.NewWriter(pw) + go func() { + pw.CloseWithError(func() error { + body := "test file" + if err := tw.WriteHeader(&tar.Header{ + Name: "test.txt", + Mode: 0600, + Size: int64(len(body)), + Typeflag: tar.TypeReg, + }); err != nil { + return err + } + if _, err := tw.Write([]byte(body)); err != nil { + return err + } + return tw.Close() + }()) + }() + + // Create stream layer from tar pipe + l := NewLayer(pr) + rc, err := l.Compressed() + if err != nil { + t.Fatalf("Compressed: %v", err) + } + + // Close stream layer before consuming + if err := rc.Close(); err != nil { + t.Fatalf("Close: %v", err) + } +} + func TestMediaType(t *testing.T) { l := NewLayer(ioutil.NopCloser(strings.NewReader("hello"))) mediaType, err := l.MediaType() From 575f86dd318429140f6f76a71ec6c9acd180fa11 Mon Sep 17 00:00:00 2001 From: Ben Krieger Date: Wed, 5 Jan 2022 18:49:55 
-0500 Subject: [PATCH 5/9] Check size of blob written when expected size is known --- pkg/v1/layout/write.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/v1/layout/write.go b/pkg/v1/layout/write.go index ca91ec937..6564fef9b 100644 --- a/pkg/v1/layout/write.go +++ b/pkg/v1/layout/write.go @@ -256,8 +256,10 @@ func (l Path) writeBlob(hash v1.Hash, size int64, r io.Reader, renamer func() (v defer w.Close() // Write to file and exit if not renaming - if _, err := io.Copy(w, r); err != nil || renamer == nil { + if n, err := io.Copy(w, r); err != nil || renamer == nil { return err + } else if size != -1 && n != size { + return fmt.Errorf("expected blob size %d, but only wrote %d", size, n) } // Always close file before renaming From 1efe3efcbdfac114bf8e4cced94971f1346c75c1 Mon Sep 17 00:00:00 2001 From: Ben Krieger Date: Wed, 5 Jan 2022 18:54:39 -0500 Subject: [PATCH 6/9] Clean up temp file when layout.WriteBlob errors --- pkg/v1/layout/write.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/v1/layout/write.go b/pkg/v1/layout/write.go index 6564fef9b..cd0d2671b 100644 --- a/pkg/v1/layout/write.go +++ b/pkg/v1/layout/write.go @@ -253,6 +253,10 @@ func (l Path) writeBlob(hash v1.Hash, size int64, r io.Reader, renamer func() (v if err != nil { return err } + if renamer != nil { + // Delete temp file if an error is encountered before renaming + defer os.Remove(w.Name()) + } defer w.Close() // Write to file and exit if not renaming From b2322b65f0e694d2c8422ce0c111f75bb0c3a8f9 Mon Sep 17 00:00:00 2001 From: Ben Krieger Date: Fri, 7 Jan 2022 08:11:07 -0500 Subject: [PATCH 7/9] Refactor `stream.compressedReader` to have less access to mutable fields and fix data race --- pkg/v1/stream/layer.go | 125 ++++++++++++++++++++++++----------------- 1 file changed, 74 insertions(+), 51 deletions(-) diff --git a/pkg/v1/stream/layer.go b/pkg/v1/stream/layer.go index 740e6c54a..eeed00b3c 100644 --- a/pkg/v1/stream/layer.go +++ 
b/pkg/v1/stream/layer.go @@ -126,16 +126,36 @@ func (l *Layer) Compressed() (io.ReadCloser, error) { return newCompressedReader(l) } -type compressedReader struct { - h, zh hash.Hash // collects digests of compressed and uncompressed stream. - pr io.Reader - bw *bufio.Writer - count *countWriter +// finalize sets the layer to consumed and computes all hash and size values. +func (l *Layer) finalize(uncompressed, compressed hash.Hash, size int64) error { + l.mu.Lock() + defer l.mu.Unlock() + + diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(uncompressed.Sum(nil))) + if err != nil { + return err + } + l.diffID = &diffID - l *Layer // stream.Layer to update upon Close. + digest, err := v1.NewHash("sha256:" + hex.EncodeToString(compressed.Sum(nil))) + if err != nil { + return err + } + l.digest = &digest + + l.size = size + l.consumed = true + return nil +} + +type compressedReader struct { + pr io.Reader + closer func() error } func newCompressedReader(l *Layer) (*compressedReader, error) { + // Collect digests of compressed and uncompressed stream and size of + // compressed stream. h := sha256.New() zh := sha256.New() count := &countWriter{} @@ -156,17 +176,41 @@ func newCompressedReader(l *Layer) (*compressedReader, error) { return nil, err } + doneDigesting := make(chan struct{}) + cr := &compressedReader{ - pr: pr, - bw: bw, - h: h, - zh: zh, - count: count, - l: l, + pr: pr, + closer: func() error { + // Immediately close pw without error. There are three ways to get + // here. + // + // 1. There was a copy error from the underlying reader, in which + // case the error will not be overwritten. + // 2. Copying from the underlying reader completed successfully. + // 3. Close has been called before the underlying reader has been + // fully consumed. In this case pw must be closed in order to + // keep the flush of bw from blocking indefinitely. + // + // NOTE: pw.Close never returns an error. The signature is only to + // implement io.Closer. 
+ _ = pw.Close() + + // Close the inner ReadCloser. + // + // NOTE: net/http will call close on success, so if we've already + // closed the inner rc, it's not an error. + if err := l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + return err + } + + // Finalize layer with its digest and size values. + <-doneDigesting + return l.finalize(h, zh, count.n) + }, } go func() { - // Copy blob into the gzip writer - which also hashes and counts the - // size of the compressed output - and hasher of the raw contents. + // Copy blob into the gzip writer, which also hashes and counts the + // size of the compressed output, and hasher of the raw contents. _, copyErr := io.Copy(io.MultiWriter(h, zw), l.blob) // Close the gzip writer once copying is done. If this is done in the @@ -177,18 +221,29 @@ func newCompressedReader(l *Layer) (*compressedReader, error) { // Check errors from writing and closing streams. if copyErr != nil { + close(doneDigesting) pw.CloseWithError(copyErr) return } if closeErr != nil { + close(doneDigesting) pw.CloseWithError(closeErr) return } - // Now close the compressed reader, to flush the gzip stream - // and calculate digest/diffID/size. This will cause pr to - // return EOF which will cause readers of the Compressed stream - // to finish reading. + // Flush the buffer once all writes are complete to the gzip writer. + if err := bw.Flush(); err != nil { + close(doneDigesting) + pw.CloseWithError(err) + return + } + + // Notify closer that digests are done being written. + close(doneDigesting) + + // Close the compressed reader to calculate digest/diffID/size. This + // will cause pr to return EOF which will cause readers of the + // Compressed stream to finish reading. 
pw.CloseWithError(cr.Close()) }() @@ -197,39 +252,7 @@ func newCompressedReader(l *Layer) (*compressedReader, error) { func (cr *compressedReader) Read(b []byte) (int, error) { return cr.pr.Read(b) } -func (cr *compressedReader) Close() error { - cr.l.mu.Lock() - defer cr.l.mu.Unlock() - - // Close the inner ReadCloser. - // - // NOTE: net/http will call close on success, so if we've already - // closed the inner rc, it's not an error. - if err := cr.l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) { - return err - } - - // Flush the buffer. - if err := cr.bw.Flush(); err != nil { - return err - } - - diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(cr.h.Sum(nil))) - if err != nil { - return err - } - cr.l.diffID = &diffID - - digest, err := v1.NewHash("sha256:" + hex.EncodeToString(cr.zh.Sum(nil))) - if err != nil { - return err - } - cr.l.digest = &digest - - cr.l.size = cr.count.n - cr.l.consumed = true - return nil -} +func (cr *compressedReader) Close() error { return cr.closer() } // countWriter counts bytes written to it. type countWriter struct{ n int64 } From ecf1181e3813893b78f3a031926d7be473a89cbb Mon Sep 17 00:00:00 2001 From: Ben Krieger Date: Fri, 7 Jan 2022 08:14:10 -0500 Subject: [PATCH 8/9] Panic when `(layout.Path).writeBlob` is called with invalid arguments --- pkg/v1/layout/write.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/v1/layout/write.go b/pkg/v1/layout/write.go index cd0d2671b..b8b72483a 100644 --- a/pkg/v1/layout/write.go +++ b/pkg/v1/layout/write.go @@ -229,8 +229,7 @@ func (l Path) WriteBlob(hash v1.Hash, r io.ReadCloser) error { func (l Path) writeBlob(hash v1.Hash, size int64, r io.Reader, renamer func() (v1.Hash, error)) error { if hash.Hex == "" && renamer == nil { - // Should we panic, since this is a programming error? 
- return errors.New("writeBlob called an invalid hash and no renamer") + panic("writeBlob called an invalid hash and no renamer") } dir := l.path("blobs", hash.Algorithm) From 4c3cdb6ef91831012693850e062e27db2a1d0762 Mon Sep 17 00:00:00 2001 From: Ben Krieger Date: Fri, 7 Jan 2022 17:32:01 -0500 Subject: [PATCH 9/9] Warn when temp file cleanup fails after failing to write a blob --- pkg/v1/layout/write.go | 7 ++++++- pkg/v1/layout/write_test.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/v1/layout/write.go b/pkg/v1/layout/write.go index b8b72483a..4c580e253 100644 --- a/pkg/v1/layout/write.go +++ b/pkg/v1/layout/write.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" + "github.com/google/go-containerregistry/pkg/logs" v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/match" "github.com/google/go-containerregistry/pkg/v1/mutate" @@ -254,7 +255,11 @@ func (l Path) writeBlob(hash v1.Hash, size int64, r io.Reader, renamer func() (v } if renamer != nil { // Delete temp file if an error is encountered before renaming - defer os.Remove(w.Name()) + defer func() { + if err := os.Remove(w.Name()); err != nil && !errors.Is(err, os.ErrNotExist) { + logs.Warn.Printf("error removing temporary file after encountering an error while writing blob: %v", err) + } + }() } defer w.Close() diff --git a/pkg/v1/layout/write_test.go b/pkg/v1/layout/write_test.go index 66b53b068..6e610fe4b 100644 --- a/pkg/v1/layout/write_test.go +++ b/pkg/v1/layout/write_test.go @@ -545,7 +545,7 @@ func TestStreamingWriteLayer(t *testing.T) { } } -func TestOverwriteWithwriteLayer(t *testing.T) { +func TestOverwriteWithWriteLayer(t *testing.T) { // need to set up a basic path tmp, err := ioutil.TempDir("", "overwrite-with-write-layer-test") if err != nil {