From 90434e74264d4fb1004854f025fb7f671827bbce Mon Sep 17 00:00:00 2001 From: Lucas Rodriguez Date: Fri, 29 Mar 2024 14:39:20 -0500 Subject: [PATCH] Remove duplicate logic and do some cleanup --- src/internal/packager/images/pull.go | 164 +-------------------------- 1 file changed, 2 insertions(+), 162 deletions(-) diff --git a/src/internal/packager/images/pull.go b/src/internal/packager/images/pull.go index de5ab03ff2..df839e4f07 100644 --- a/src/internal/packager/images/pull.go +++ b/src/internal/packager/images/pull.go @@ -6,10 +6,7 @@ package images import ( "context" - "errors" "fmt" - "io" - "os" "path/filepath" "strings" @@ -28,7 +25,6 @@ import ( "github.com/google/go-containerregistry/pkg/v1/empty" clayout "github.com/google/go-containerregistry/pkg/v1/layout" "github.com/google/go-containerregistry/pkg/v1/partial" - "github.com/google/go-containerregistry/pkg/v1/stream" "github.com/moby/moby/client" ) @@ -182,170 +178,16 @@ func (i *ImageConfig) PullAll() ([]ImgInfo, error) { updateText := fmt.Sprintf("Pulling %d images", imageCount) go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, doneSaving, updateText, updateText) - // Spawn a goroutine for each layer to write it to disk using crane - - layerWritingConcurrency := helpers.NewConcurrencyTools[bool, error](len(processedLayers)) - - defer layerWritingConcurrency.Cancel() - - for _, layer := range processedLayers { - layer := layer - // Function is a combination of https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L270-L305 - // and https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L198-L262 - // with modifications. This allows us to dedupe layers for all images and write them concurrently. - go func() { - digest, err := layer.Digest() - if errors.Is(err, stream.ErrNotComputed) { - // Allow digest errors, since streams may not have calculated the hash - // yet. Instead, use an empty value, which will be transformed into a - // random file name with `os.CreateTemp` and the final digest will be - // calculated after writing to a temp file and before renaming to the - // final path. - digest = v1.Hash{Algorithm: "sha256", Hex: ""} - } else if err != nil { - layerWritingConcurrency.ErrorChan <- err - return - } - - size, err := layer.Size() - if errors.Is(err, stream.ErrNotComputed) { - // Allow size errors, since streams may not have calculated the size - // yet. Instead, use -1 as a sentinel value meaning that no size - // comparison can be done and any sized blob file should be considered - // valid and not overwritten. - // - // TODO: Provide an option to always overwrite blobs. - size = -1 - } else if err != nil { - layerWritingConcurrency.ErrorChan <- err - return - } - - if layerWritingConcurrency.IsDone() { - return - } - - readCloser, err := layer.Compressed() - if err != nil { - layerWritingConcurrency.ErrorChan <- err - return - } - - // Create the directory for the blob if it doesn't exist - dir := filepath.Join(string(cranePath), "blobs", digest.Algorithm) - if err := helpers.CreateDirectory(dir, os.ModePerm); err != nil { - layerWritingConcurrency.ErrorChan <- err - return - } - - if layerWritingConcurrency.IsDone() { - return - } - - // Check if blob already exists and is the correct size - file := filepath.Join(dir, digest.Hex) - if s, err := os.Stat(file); err == nil && !s.IsDir() && (s.Size() == size || size == -1) { - layerWritingConcurrency.ProgressChan <- true - return - } - - if layerWritingConcurrency.IsDone() { - return - } - - // Write to a temporary file - w, err := os.CreateTemp(dir, digest.Hex) - if err != nil { - layerWritingConcurrency.ErrorChan <- err - return - } - // Delete temp file if an error is encountered before renaming - defer func() { - if err := os.Remove(w.Name()); err != nil && !errors.Is(err, os.ErrNotExist) { - message.Warnf("error removing temporary file after encountering an error while writing blob: %v", err) - } - }() - - defer w.Close() - - if layerWritingConcurrency.IsDone() { - return - } - - // Write to file rename - if n, err := io.Copy(w, readCloser); err != nil { - layerWritingConcurrency.ErrorChan <- err - return - } else if size != -1 && n != size { - layerWritingConcurrency.ErrorChan <- fmt.Errorf("expected blob size %d, but only wrote %d", size, n) - return - } - - if layerWritingConcurrency.IsDone() { - return - } - - // Always close reader before renaming, since Close computes the digest in - // the case of streaming layers. If Close is not called explicitly, it will - // occur in a goroutine that is not guaranteed to succeed before renamer is - // called. When renamer is the layer's Digest method, it can return - // ErrNotComputed. - if err := readCloser.Close(); err != nil { - layerWritingConcurrency.ErrorChan <- err - return - } - - // Always close file before renaming - if err := w.Close(); err != nil { - layerWritingConcurrency.ErrorChan <- err - return - } - - // Rename file based on the final hash - renamePath := filepath.Join(string(cranePath), "blobs", digest.Algorithm, digest.Hex) - os.Rename(w.Name(), renamePath) - - if layerWritingConcurrency.IsDone() { - return - } - - layerWritingConcurrency.ProgressChan <- true - }() - } - - onLayerWritingError := func(err error) error { - // Send a signal to the progress bar that we're done and wait for the thread to finish - doneSaving <- err - <-doneSaving - message.WarnErr(err, "Failed to write image layers, trying again up to 3 times...") - if strings.HasPrefix(err.Error(), "expected blob size") { - message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error()) - } - return err - } - - if err := layerWritingConcurrency.WaitWithoutProgress(onLayerWritingError); err != nil { - return nil, err - } - imageSavingConcurrency := helpers.NewConcurrencyTools[digestInfo, error](len(refInfoToImage)) defer imageSavingConcurrency.Cancel() // Spawn a goroutine for each image to write it's config and manifest to disk using crane - // All layers should already be in place so this should be extremely fast for refInfo, img := range refInfoToImage { // Create a closure so that we can pass the refInfo and img into the goroutine refInfo, img := refInfo, img go func() { - // Save the image via crane - err := cranePath.WriteImage(img) - - if imageSavingConcurrency.IsDone() { - return - } - - if err != nil { + if err := cranePath.WriteImage(img); err != nil { // Check if the cache has been invalidated, and warn the user if so if strings.HasPrefix(err.Error(), "error writing layer: expected blob size") { message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error()) @@ -390,15 +232,13 @@ func (i *ImageConfig) PullAll() ([]ImgInfo, error) { } // for every image sequentially append OCI descriptor - for refInfo, img := range refInfoToImage { desc, err := partial.Descriptor(img) if err != nil { return nil, err } - cranePath.AppendDescriptor(*desc) - if err != nil { + if err := cranePath.AppendDescriptor(*desc); err != nil { return nil, err }