Skip to content

Commit

Permalink
Merge branch 'main' into hotfix-add-skip-cosign-find-images
Browse files Browse the repository at this point in the history
  • Loading branch information
AustinAbro321 committed Apr 9, 2024
2 parents c99c661 + 5ab393a commit 7d7977b
Showing 1 changed file with 2 additions and 162 deletions.
164 changes: 2 additions & 162 deletions src/internal/packager/images/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ package images

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"

Expand All @@ -28,7 +25,6 @@ import (
"github.com/google/go-containerregistry/pkg/v1/empty"
clayout "github.com/google/go-containerregistry/pkg/v1/layout"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/stream"
"github.com/moby/moby/client"
)

Expand Down Expand Up @@ -182,170 +178,16 @@ func (i *ImageConfig) PullAll() ([]ImgInfo, error) {
updateText := fmt.Sprintf("Pulling %d images", imageCount)
go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, doneSaving, updateText, updateText)

// Spawn a goroutine for each layer to write it to disk using crane

layerWritingConcurrency := helpers.NewConcurrencyTools[bool, error](len(processedLayers))

defer layerWritingConcurrency.Cancel()

for _, layer := range processedLayers {
layer := layer
// Function is a combination of https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L270-L305
// and https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L198-L262
// with modifications. This allows us to dedupe layers for all images and write them concurrently.
go func() {
digest, err := layer.Digest()
if errors.Is(err, stream.ErrNotComputed) {
// Allow digest errors, since streams may not have calculated the hash
// yet. Instead, use an empty value, which will be transformed into a
// random file name with `os.CreateTemp` and the final digest will be
// calculated after writing to a temp file and before renaming to the
// final path.
digest = v1.Hash{Algorithm: "sha256", Hex: ""}
} else if err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

size, err := layer.Size()
if errors.Is(err, stream.ErrNotComputed) {
// Allow size errors, since streams may not have calculated the size
// yet. Instead, use -1 as a sentinel value meaning that no size
// comparison can be done and any sized blob file should be considered
// valid and not overwritten.
//
// TODO: Provide an option to always overwrite blobs.
size = -1
} else if err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

if layerWritingConcurrency.IsDone() {
return
}

readCloser, err := layer.Compressed()
if err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

// Create the directory for the blob if it doesn't exist
dir := filepath.Join(string(cranePath), "blobs", digest.Algorithm)
if err := helpers.CreateDirectory(dir, os.ModePerm); err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

if layerWritingConcurrency.IsDone() {
return
}

// Check if blob already exists and is the correct size
file := filepath.Join(dir, digest.Hex)
if s, err := os.Stat(file); err == nil && !s.IsDir() && (s.Size() == size || size == -1) {
layerWritingConcurrency.ProgressChan <- true
return
}

if layerWritingConcurrency.IsDone() {
return
}

// Write to a temporary file
w, err := os.CreateTemp(dir, digest.Hex)
if err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}
// Delete temp file if an error is encountered before renaming
defer func() {
if err := os.Remove(w.Name()); err != nil && !errors.Is(err, os.ErrNotExist) {
message.Warnf("error removing temporary file after encountering an error while writing blob: %v", err)
}
}()

defer w.Close()

if layerWritingConcurrency.IsDone() {
return
}

// Write to file rename
if n, err := io.Copy(w, readCloser); err != nil {
layerWritingConcurrency.ErrorChan <- err
return
} else if size != -1 && n != size {
layerWritingConcurrency.ErrorChan <- fmt.Errorf("expected blob size %d, but only wrote %d", size, n)
return
}

if layerWritingConcurrency.IsDone() {
return
}

// Always close reader before renaming, since Close computes the digest in
// the case of streaming layers. If Close is not called explicitly, it will
// occur in a goroutine that is not guaranteed to succeed before renamer is
// called. When renamer is the layer's Digest method, it can return
// ErrNotComputed.
if err := readCloser.Close(); err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

// Always close file before renaming
if err := w.Close(); err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

// Rename file based on the final hash
renamePath := filepath.Join(string(cranePath), "blobs", digest.Algorithm, digest.Hex)
os.Rename(w.Name(), renamePath)

if layerWritingConcurrency.IsDone() {
return
}

layerWritingConcurrency.ProgressChan <- true
}()
}

onLayerWritingError := func(err error) error {
// Send a signal to the progress bar that we're done and wait for the thread to finish
doneSaving <- err
<-doneSaving
message.WarnErr(err, "Failed to write image layers, trying again up to 3 times...")
if strings.HasPrefix(err.Error(), "expected blob size") {
message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error())
}
return err
}

if err := layerWritingConcurrency.WaitWithoutProgress(onLayerWritingError); err != nil {
return nil, err
}

imageSavingConcurrency := helpers.NewConcurrencyTools[digestInfo, error](len(refInfoToImage))

defer imageSavingConcurrency.Cancel()

// Spawn a goroutine for each image to write it's config and manifest to disk using crane
// All layers should already be in place so this should be extremely fast
for refInfo, img := range refInfoToImage {
// Create a closure so that we can pass the refInfo and img into the goroutine
refInfo, img := refInfo, img
go func() {
// Save the image via crane
err := cranePath.WriteImage(img)

if imageSavingConcurrency.IsDone() {
return
}

if err != nil {
if err := cranePath.WriteImage(img); err != nil {
// Check if the cache has been invalidated, and warn the user if so
if strings.HasPrefix(err.Error(), "error writing layer: expected blob size") {
message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error())
Expand Down Expand Up @@ -390,15 +232,13 @@ func (i *ImageConfig) PullAll() ([]ImgInfo, error) {
}

// for every image sequentially append OCI descriptor

for refInfo, img := range refInfoToImage {
desc, err := partial.Descriptor(img)
if err != nil {
return nil, err
}

cranePath.AppendDescriptor(*desc)
if err != nil {
if err := cranePath.AppendDescriptor(*desc); err != nil {
return nil, err
}

Expand Down

0 comments on commit 7d7977b

Please sign in to comment.