fix: remove duplicate logic for writing image layers to disk concurrently #2409

Merged · 3 commits · Apr 9, 2024
164 changes: 2 additions & 162 deletions src/internal/packager/images/pull.go
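This PR deletes Zarf's hand-rolled loop that wrote each layer blob to disk in its own goroutine (itself copied from go-containerregistry's layout writer) and leans on the retained `cranePath.WriteImage` call, which already writes an image's layers, config, and manifest into `blobs/<algorithm>/<hex>` and skips blobs that are already present. As a rough stand-alone sketch of that retained flow — not Zarf code; the image reference and output directory are placeholders — using the same go-containerregistry APIs that appear in the diff:

```go
// Sketch (not Zarf code): write a pulled image into an OCI layout with
// go-containerregistry, letting layout.Path.WriteImage handle the layer
// blobs that the removed goroutines used to write by hand.
package main

import (
	"log"

	"github.com/google/go-containerregistry/pkg/crane"
	"github.com/google/go-containerregistry/pkg/v1/empty"
	"github.com/google/go-containerregistry/pkg/v1/layout"
	"github.com/google/go-containerregistry/pkg/v1/partial"
)

func main() {
	// Placeholder reference; Zarf pulls the images listed in its package definition.
	img, err := crane.Pull("ghcr.io/stefanprodan/podinfo:6.5.4")
	if err != nil {
		log.Fatal(err)
	}

	// Create (or reuse) an OCI image layout on disk, starting from an empty index.
	path, err := layout.Write("./oci-layout", empty.Index)
	if err != nil {
		log.Fatal(err)
	}

	// WriteImage writes the image's layers, config, and manifest under
	// blobs/<algorithm>/<hex>, skipping blobs that already exist on disk.
	if err := path.WriteImage(img); err != nil {
		log.Fatal(err)
	}

	// Record the image in the layout's index.json, as the diff does with
	// cranePath.AppendDescriptor.
	desc, err := partial.Descriptor(img)
	if err != nil {
		log.Fatal(err)
	}
	if err := path.AppendDescriptor(*desc); err != nil {
		log.Fatal(err)
	}
}
```

Because `WriteImage` dedupes blobs on disk, pulling many images that share base layers still writes each shared layer only once, which is what the removed concurrency block was reimplementing.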
@@ -6,10 +6,7 @@ package images

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"

@@ -28,7 +25,6 @@ import (
"github.com/google/go-containerregistry/pkg/v1/empty"
clayout "github.com/google/go-containerregistry/pkg/v1/layout"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/stream"
"github.com/moby/moby/client"
)

@@ -182,170 +178,16 @@ func (i *ImageConfig) PullAll() ([]ImgInfo, error) {
updateText := fmt.Sprintf("Pulling %d images", imageCount)
go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, doneSaving, updateText, updateText)

// Spawn a goroutine for each layer to write it to disk using crane

layerWritingConcurrency := helpers.NewConcurrencyTools[bool, error](len(processedLayers))

defer layerWritingConcurrency.Cancel()

for _, layer := range processedLayers {
layer := layer
// Function is a combination of https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L270-L305
// and https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L198-L262
// with modifications. This allows us to dedupe layers for all images and write them concurrently.
go func() {
digest, err := layer.Digest()
if errors.Is(err, stream.ErrNotComputed) {
// Allow digest errors, since streams may not have calculated the hash
// yet. Instead, use an empty value, which will be transformed into a
// random file name with `os.CreateTemp` and the final digest will be
// calculated after writing to a temp file and before renaming to the
// final path.
digest = v1.Hash{Algorithm: "sha256", Hex: ""}
} else if err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

size, err := layer.Size()
if errors.Is(err, stream.ErrNotComputed) {
// Allow size errors, since streams may not have calculated the size
// yet. Instead, use -1 as a sentinel value meaning that no size
// comparison can be done and any sized blob file should be considered
// valid and not overwritten.
//
// TODO: Provide an option to always overwrite blobs.
size = -1
} else if err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

if layerWritingConcurrency.IsDone() {
return
}

readCloser, err := layer.Compressed()
if err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

// Create the directory for the blob if it doesn't exist
dir := filepath.Join(string(cranePath), "blobs", digest.Algorithm)
if err := helpers.CreateDirectory(dir, os.ModePerm); err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

if layerWritingConcurrency.IsDone() {
return
}

// Check if blob already exists and is the correct size
file := filepath.Join(dir, digest.Hex)
if s, err := os.Stat(file); err == nil && !s.IsDir() && (s.Size() == size || size == -1) {
layerWritingConcurrency.ProgressChan <- true
return
}

if layerWritingConcurrency.IsDone() {
return
}

// Write to a temporary file
w, err := os.CreateTemp(dir, digest.Hex)
if err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}
// Delete temp file if an error is encountered before renaming
defer func() {
if err := os.Remove(w.Name()); err != nil && !errors.Is(err, os.ErrNotExist) {
message.Warnf("error removing temporary file after encountering an error while writing blob: %v", err)
}
}()

defer w.Close()

if layerWritingConcurrency.IsDone() {
return
}

// Write to file rename
if n, err := io.Copy(w, readCloser); err != nil {
layerWritingConcurrency.ErrorChan <- err
return
} else if size != -1 && n != size {
layerWritingConcurrency.ErrorChan <- fmt.Errorf("expected blob size %d, but only wrote %d", size, n)
return
}

if layerWritingConcurrency.IsDone() {
return
}

// Always close reader before renaming, since Close computes the digest in
// the case of streaming layers. If Close is not called explicitly, it will
// occur in a goroutine that is not guaranteed to succeed before renamer is
// called. When renamer is the layer's Digest method, it can return
// ErrNotComputed.
if err := readCloser.Close(); err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

// Always close file before renaming
if err := w.Close(); err != nil {
layerWritingConcurrency.ErrorChan <- err
return
}

// Rename file based on the final hash
renamePath := filepath.Join(string(cranePath), "blobs", digest.Algorithm, digest.Hex)
os.Rename(w.Name(), renamePath)

if layerWritingConcurrency.IsDone() {
return
}

layerWritingConcurrency.ProgressChan <- true
}()
}

onLayerWritingError := func(err error) error {
// Send a signal to the progress bar that we're done and wait for the thread to finish
doneSaving <- err
<-doneSaving
message.WarnErr(err, "Failed to write image layers, trying again up to 3 times...")
if strings.HasPrefix(err.Error(), "expected blob size") {
message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error())
}
return err
}

if err := layerWritingConcurrency.WaitWithoutProgress(onLayerWritingError); err != nil {
return nil, err
}

imageSavingConcurrency := helpers.NewConcurrencyTools[digestInfo, error](len(refInfoToImage))

defer imageSavingConcurrency.Cancel()

// Spawn a goroutine for each image to write its config and manifest to disk using crane
// All layers should already be in place so this should be extremely fast
for refInfo, img := range refInfoToImage {
// Create a closure so that we can pass the refInfo and img into the goroutine
refInfo, img := refInfo, img
go func() {
// Save the image via crane
err := cranePath.WriteImage(img)

if imageSavingConcurrency.IsDone() {
return
}

if err != nil {
if err := cranePath.WriteImage(img); err != nil {
// Check if the cache has been invalidated, and warn the user if so
if strings.HasPrefix(err.Error(), "error writing layer: expected blob size") {
message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error())
@@ -390,15 +232,13 @@ func (i *ImageConfig) PullAll() ([]ImgInfo, error) {
}

// for every image sequentially append OCI descriptor

for refInfo, img := range refInfoToImage {
desc, err := partial.Descriptor(img)
if err != nil {
return nil, err
}

cranePath.AppendDescriptor(*desc)
if err != nil {
if err := cranePath.AppendDescriptor(*desc); err != nil {
return nil, err
}

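For context on the fan-out pattern that survives in the image-saving loop (one goroutine per image, early exit once an error is seen), here is a minimal stand-alone sketch using golang.org/x/sync/errgroup rather than Zarf's helpers.NewConcurrencyTools; `writeImage` and the image list are placeholders:

```go
// Sketch only: bounded fan-out for writing images, analogous to the
// goroutine-per-image loop in pull.go but expressed with errgroup
// instead of Zarf's concurrency helper.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// writeImage stands in for cranePath.WriteImage(img) in the real code.
func writeImage(ctx context.Context, ref string) error {
	fmt.Println("writing", ref)
	return nil
}

func main() {
	refs := []string{"nginx:1.25", "alpine:3.19"} // placeholder image list

	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(4) // bound concurrency instead of one goroutine per image

	for _, ref := range refs {
		ref := ref // capture the loop variable (pre-Go 1.22 idiom, as in the diff)
		g.Go(func() error {
			// The first error cancels ctx, letting the other goroutines bail out
			// early, similar to the IsDone() checks in the removed code.
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				return writeImage(ctx, ref)
			}
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```

The retained loop in pull.go has the same shape: `cranePath.WriteImage` is the per-item work, and the concurrency helper collects progress and the first error.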