Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use memory friendly file split logic #2264

Merged
merged 11 commits into from
Feb 7, 2024
4 changes: 2 additions & 2 deletions src/pkg/cluster/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *Cluster) createPayloadConfigmaps(seedImagesDir, tarPath string, spinner
return configMaps, "", err
}

chunks, sha256sum, err := utils.SplitFile(tarPath, payloadChunkSize)
chunks, sha256sum, err := utils.ReadFileByChunks(tarPath, payloadChunkSize)
if err != nil {
return configMaps, "", err
}
Expand Down Expand Up @@ -373,7 +373,7 @@ func (c *Cluster) buildInjectionPod(node, image string, payloadConfigmaps []stri
corev1.ResourceMemory: injectorRequestedMemory,
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: injectorLimitCPU,
corev1.ResourceCPU: injectorLimitCPU,
corev1.ResourceMemory: injectorLimitMemory,
},
},
Expand Down
8 changes: 8 additions & 0 deletions src/pkg/message/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,11 @@ func (p *ProgressBar) Errorf(err error, format string, a ...any) {
p.Stop()
WarnErrf(err, format, a...)
}

// GetCurrent returns the progress bar's current value, or -1 when no
// underlying progress bar has been initialized.
func (p *ProgressBar) GetCurrent() int {
	if p.progress == nil {
		return -1
	}
	return p.progress.Current
}
39 changes: 5 additions & 34 deletions src/pkg/packager/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package packager

import (
"encoding/json"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -336,50 +335,22 @@ func (p *Packager) archivePackage(destinationTarball string) error {
if err != nil {
return fmt.Errorf("unable to read the package archive: %w", err)
}
spinner.Successf("Package saved to %q", destinationTarball)

// Convert Megabytes to bytes.
chunkSize := p.cfg.CreateOpts.MaxPackageSizeMB * 1000 * 1000

// If a chunk size was specified and the package is larger than the chunk size, split it into chunks.
if p.cfg.CreateOpts.MaxPackageSizeMB > 0 && fi.Size() > int64(chunkSize) {
spinner.Updatef("Package is larger than %dMB, splitting into multiple files", p.cfg.CreateOpts.MaxPackageSizeMB)
chunks, sha256sum, err := utils.SplitFile(destinationTarball, chunkSize)
if err != nil {
return fmt.Errorf("unable to split the package archive into multiple files: %w", err)
}
if len(chunks) > 999 {
if fi.Size()/int64(chunkSize) > 999 {
return fmt.Errorf("unable to split the package archive into multiple files: must be less than 1,000 files")
}

status := fmt.Sprintf("Package split into %d files, original sha256sum is %s", len(chunks)+1, sha256sum)
spinner.Updatef(status)
message.Debug(status)
_ = os.RemoveAll(destinationTarball)

// Marshal the data into a json file.
jsonData, err := json.Marshal(types.ZarfSplitPackageData{
Count: len(chunks),
Bytes: fi.Size(),
Sha256Sum: sha256sum,
})
message.Notef("Package is larger than %dMB, splitting into multiple files", p.cfg.CreateOpts.MaxPackageSizeMB)
err := utils.SplitFile(destinationTarball, chunkSize)
if err != nil {
return fmt.Errorf("unable to marshal the split package data: %w", err)
}

// Prepend the json data to the first chunk.
chunks = append([][]byte{jsonData}, chunks...)

for idx, chunk := range chunks {
path := fmt.Sprintf("%s.part%03d", destinationTarball, idx)
status := fmt.Sprintf("Writing %s", path)
spinner.Updatef(status)
message.Debug(status)
if err := os.WriteFile(path, chunk, 0644); err != nil {
return fmt.Errorf("unable to write the file %s: %w", path, err)
}
return fmt.Errorf("unable to split the package archive into multiple files: %w", err)
}
}
spinner.Successf("Package saved to %q", destinationTarball)
return nil
}

Expand Down
139 changes: 137 additions & 2 deletions src/pkg/utils/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"archive/tar"
"bufio"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"io/fs"
Expand Down Expand Up @@ -297,8 +298,8 @@ func GetFinalExecutableCommand() (string, error) {
return zarfCommand, err
}

// SplitFile splits a file into multiple parts by the given size.
func SplitFile(path string, chunkSizeBytes int) (chunks [][]byte, sha256sum string, err error) {
// ReadFileByChunks reads a file into multiple chunks by the given size.
func ReadFileByChunks(path string, chunkSizeBytes int) (chunks [][]byte, sha256sum string, err error) {
var file []byte

// Open the created archive for io.Copy
Expand Down Expand Up @@ -327,6 +328,140 @@ func SplitFile(path string, chunkSizeBytes int) (chunks [][]byte, sha256sum stri
return chunks, sha256sum, nil
}

// SplitFile will take a srcFile path and split it into files based on chunkSizeBytes
// the first file will be a metadata file containing:
// - sha256sum of the original file
// - number of bytes in the original file
// - number of files the srcFile was split into
// SplitFile will delete the original file
//
// Returns:
// - fileNames: list of file paths srcFile was split across
// - sha256sum: sha256sum of the srcFile before splitting
// - err: any errors encountered
func SplitFile(srcFile string, chunkSizeBytes int) (err error) {
var fileNames []string
var sha256sum string
hash := sha256.New()

// Set buffer size to some multiple of 4096 KiB for modern file system cluster sizes
bufferSize := 16 * 1024 * 1024 // 16 MiB
// if chunkSizeBytes is less than bufferSize, use chunkSizeBytes as bufferSize for simplicity
if chunkSizeBytes < bufferSize {
bufferSize = chunkSizeBytes
}
buf := make([]byte, bufferSize)

// get file size
fi, err := os.Stat(srcFile)
if err != nil {
return err
}
fileSize := fi.Size()

// start progress bar
title := fmt.Sprintf("[0/%d] MB bytes written", fileSize/1000/1000)
progressBar := message.NewProgressBar(fileSize, title)
defer progressBar.Stop()

// open file
file, err := os.Open(srcFile)
defer file.Close()
if err != nil {
return err
}

// create file path starting from part 001
path := fmt.Sprintf("%s.part001", srcFile)
chunkFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
fileNames = append(fileNames, path)
defer chunkFile.Close()

// setup counter for tracking how many bytes are left to write to file
chunkBytesRemaining := chunkSizeBytes
// Loop over the tarball hashing as we go and breaking it into chunks based on the chunkSizeBytes
for {
bytesRead, err := file.Read(buf)

if err != nil {
if err == io.EOF {
// At end of file, break out of loop
break
}
return err
}

// Pass data to hash
hash.Write(buf[0:bytesRead])

// handle if we should split the data between two chunks
if chunkBytesRemaining < bytesRead {
// write the remaining chunk size to file
_, err := chunkFile.Write(buf[0:chunkBytesRemaining])
if err != nil {
return err
}

// create new file
path = fmt.Sprintf("%s.part%03d", srcFile, len(fileNames)+1)
chunkFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
fileNames = append(fileNames, path)
defer chunkFile.Close()

// write to new file where we left off
_, err = chunkFile.Write(buf[chunkBytesRemaining:bytesRead])
if err != nil {
return err
}

// set chunkBytesRemaining considering how many bytes are already written to new file
chunkBytesRemaining = chunkSizeBytes - (bufferSize - chunkBytesRemaining)
} else {
_, err := chunkFile.Write(buf[0:bytesRead])
if err != nil {
return err
}
chunkBytesRemaining = chunkBytesRemaining - bytesRead
}

// update progress bar
progressBar.Add(bufferSize)
title := fmt.Sprintf("[%d/%d] MB bytes written", progressBar.GetCurrent()/1000/1000, fileSize/1000/1000)
progressBar.UpdateTitle(title)
}
file.Close()
Noxsios marked this conversation as resolved.
Show resolved Hide resolved
_ = os.RemoveAll(srcFile)

// calculate sha256 sum
sha256sum = fmt.Sprintf("%x", hash.Sum(nil))

// Marshal the data into a json file.
jsonData, err := json.Marshal(types.ZarfSplitPackageData{
Count: len(fileNames),
Bytes: fileSize,
Sha256Sum: sha256sum,
})
if err != nil {
return fmt.Errorf("unable to marshal the split package data: %w", err)
}

// write header file
path = fmt.Sprintf("%s.part000", srcFile)
if err := os.WriteFile(path, jsonData, 0644); err != nil {
return fmt.Errorf("unable to write the file %s: %w", path, err)
}
fileNames = append(fileNames, path)
progressBar.Successf("Package split across %d files", len(fileNames))

return nil
}

// IsTextFile returns true if the given file is a text file.
func IsTextFile(path string) (bool, error) {
// Open the file
Expand Down
9 changes: 6 additions & 3 deletions src/test/e2e/05_tarball_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ func TestMultiPartPackage(t *testing.T) {
stdOut, stdErr, err := e2e.Zarf("package", "create", createPath, "--max-package-size=1", "--confirm")
require.NoError(t, err, stdOut, stdErr)

list, err := filepath.Glob("zarf-package-multi-part-*")
parts, err := filepath.Glob("zarf-package-multi-part-*")
require.NoError(t, err)
// Length is 7 because there are 6 parts and 1 manifest
require.Len(t, list, 7)
require.Len(t, parts, 7)

stdOut, stdErr, err = e2e.Zarf("package", "deploy", deployPath, "--confirm")
require.NoError(t, err, stdOut, stdErr)

// Verify the package was deployed
require.FileExists(t, outputFile)

e2e.CleanFiles(deployPath, outputFile)
// deploying package combines parts back into single archive, check dir again to find all files
parts, err = filepath.Glob("zarf-package-multi-part-*")
e2e.CleanFiles(parts...)
e2e.CleanFiles(outputFile)
}

func TestReproducibleTarballs(t *testing.T) {
Expand Down
Loading