chunked: rework GetBlobAt usage
rewrite how the result from GetBlobAt is used, to make sure 1) that
the streams are always closed, and 2) that any error is processed.

Closes: https://issues.redhat.com/browse/OCPBUGS-43968

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
giuseppe committed Nov 7, 2024
1 parent ad5f2a4 commit 5e80346
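
The heart of the change is replacing the two channels returned by GetBlobAt (one for streams, one for errors) with a single channel of a streamOrErr sum type, so a consumer cannot read one and forget the other. Below is a minimal, self-contained sketch of that idea — not code from this commit; `merge` is a simplified stand-in for the `getBlobAtConverterGoroutine` added in the diff, without its stream-count cap:

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// streamOrErr mirrors the type added by this commit: exactly one of the
// two fields is set for each value received.
type streamOrErr struct {
	stream io.ReadCloser
	err    error
}

// merge funnels the two channels returned by GetBlobAt into one channel,
// so the consumer has a single thing to drain (simplified sketch).
func merge(streams chan io.ReadCloser, errs chan error) chan streamOrErr {
	out := make(chan streamOrErr)
	go func() {
		defer close(out)
		for streams != nil || errs != nil {
			select {
			case s, ok := <-streams:
				if !ok {
					streams = nil // stop selecting on the closed channel
					continue
				}
				out <- streamOrErr{stream: s}
			case e, ok := <-errs:
				if !ok {
					errs = nil
					continue
				}
				out <- streamOrErr{err: e}
			}
		}
	}()
	return out
}

func main() {
	streams := make(chan io.ReadCloser, 1)
	errs := make(chan error, 1)
	streams <- io.NopCloser(strings.NewReader("chunk data"))
	errs <- errors.New("second chunk failed")
	close(streams)
	close(errs)

	// Draining one channel makes it hard to miss a Close or an error.
	for soe := range merge(streams, errs) {
		if soe.stream != nil {
			data, _ := io.ReadAll(soe.stream)
			_ = soe.stream.Close() // every stream is closed exactly once
			fmt.Printf("got %d bytes\n", len(data))
		} else {
			fmt.Println("got error:", soe.err)
		}
	}
}
```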
Showing 3 changed files with 336 additions and 75 deletions.
123 changes: 69 additions & 54 deletions pkg/chunked/compression_linux.go
@@ -48,21 +48,21 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
 		Offset: uint64(blobSize - footerSize),
 		Length: uint64(footerSize),
 	}
-	parts, errs, err := blobStream.GetBlobAt([]ImageSourceChunk{chunk})
+
+	footer := make([]byte, footerSize)
+	streamsOrErrors, err := getBlobAt(blobStream, []ImageSourceChunk{chunk})
 	if err != nil {
 		return nil, 0, err
 	}
-	var reader io.ReadCloser
-	select {
-	case r := <-parts:
-		reader = r
-	case err := <-errs:
-		return nil, 0, err
-	}
-	defer reader.Close()
-	footer := make([]byte, footerSize)
-	if _, err := io.ReadFull(reader, footer); err != nil {
-		return nil, 0, err
+
+	for soe := range streamsOrErrors {
+		if soe.stream != nil {
+			_, err = io.ReadFull(soe.stream, footer)
+			_ = soe.stream.Close()
+		}
+		if soe.err != nil && err == nil {
+			err = soe.err
+		}
 	}
 
 	/* Read the ToC offset:
@@ -89,40 +89,50 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
 		Offset: uint64(tocOffset),
 		Length: uint64(size),
 	}
-	parts, errs, err = blobStream.GetBlobAt([]ImageSourceChunk{chunk})
+	streamsOrErrors, err = getBlobAt(blobStream, []ImageSourceChunk{chunk})
 	if err != nil {
 		return nil, 0, err
 	}
 
-	var tocReader io.ReadCloser
-	select {
-	case r := <-parts:
-		tocReader = r
-	case err := <-errs:
-		return nil, 0, err
-	}
-	defer tocReader.Close()
-
-	r, err := pgzip.NewReader(tocReader)
-	if err != nil {
-		return nil, 0, err
-	}
-	defer r.Close()
-
-	aTar := archivetar.NewReader(r)
-
-	header, err := aTar.Next()
-	if err != nil {
-		return nil, 0, err
-	}
-	// set a reasonable limit
-	if header.Size > (1<<20)*50 {
-		return nil, 0, errors.New("manifest too big")
+	var manifestUncompressed []byte
+
+	for soe := range streamsOrErrors {
+		if soe.stream != nil {
+			err1 := func() error {
+				defer soe.stream.Close()
+
+				r, err := pgzip.NewReader(soe.stream)
+				if err != nil {
+					return err
+				}
+				defer r.Close()
+
+				aTar := archivetar.NewReader(r)
+
+				header, err := aTar.Next()
+				if err != nil {
+					return err
+				}
+				// set a reasonable limit
+				if header.Size > (1<<20)*50 {
+					return errors.New("manifest too big")
+				}
+
+				manifestUncompressed = make([]byte, header.Size)
+				if _, err := io.ReadFull(aTar, manifestUncompressed); err != nil {
+					return err
+				}
+				return nil
+			}()
+			if err == nil {
+				err = err1
+			}
+		} else if err == nil {
+			err = soe.err
+		}
 	}
-
-	manifestUncompressed := make([]byte, header.Size)
-	if _, err := io.ReadFull(aTar, manifestUncompressed); err != nil {
-		return nil, 0, err
+	if manifestUncompressed == nil {
+		return nil, 0, errors.New("manifest not found")
 	}
 
 	manifestDigester := digest.Canonical.Digester()
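
Note the `err1 := func() error { ... }()` shape in the hunk above: wrapping each stream's processing in an immediately invoked closure makes `defer soe.stream.Close()` and `defer r.Close()` run at the end of each loop iteration rather than at function exit. A minimal sketch of the idiom, with example readers invented for illustration:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	readers := []io.ReadCloser{
		io.NopCloser(strings.NewReader("toc")),
		io.NopCloser(strings.NewReader("tar-split")),
	}

	for _, rc := range readers {
		// An immediately invoked closure, as in the new manifest-reading
		// loop: its defers fire at the end of each iteration, so every
		// stream is closed even when an early return path is taken.
		err := func() error {
			defer rc.Close()
			data, err := io.ReadAll(rc)
			if err != nil {
				return err
			}
			fmt.Printf("read %d bytes\n", len(data))
			return nil
		}()
		if err != nil {
			fmt.Println("error:", err)
		}
	}
}
```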
@@ -140,7 +150,7 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
 
 // readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream.
 // Returns (manifest blob, parsed manifest, tar-split blob or nil, manifest offset).
-func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) ([]byte, *internal.TOC, []byte, int64, error) {
+func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *internal.TOC, _ []byte, _ int64, retErr error) {
 	offsetMetadata := annotations[internal.ManifestInfoKey]
 	if offsetMetadata == "" {
 		return nil, nil, nil, 0, fmt.Errorf("%q annotation missing", internal.ManifestInfoKey)
@@ -175,26 +185,31 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
 	if tarSplitChunk.Offset > 0 {
 		chunks = append(chunks, tarSplitChunk)
 	}
-	parts, errs, err := blobStream.GetBlobAt(chunks)
+
+	streamsOrErrors, err := getBlobAt(blobStream, chunks)
 	if err != nil {
 		return nil, nil, nil, 0, err
 	}
 
+	defer func() {
+		err := ensureAllBlobsDone(streamsOrErrors)
+		if retErr == nil {
+			retErr = err
+		}
+	}()
+
 	readBlob := func(len uint64) ([]byte, error) {
-		var reader io.ReadCloser
-		select {
-		case r := <-parts:
-			reader = r
-		case err := <-errs:
-			return nil, err
+		soe, ok := <-streamsOrErrors
+		if !ok {
+			return nil, errors.New("stream closed")
+		}
+		if soe.err != nil {
+			return nil, soe.err
 		}
+		defer soe.stream.Close()
 
 		blob := make([]byte, len)
-		if _, err := io.ReadFull(reader, blob); err != nil {
-			reader.Close()
-			return nil, err
-		}
-		if err := reader.Close(); err != nil {
+		if _, err := io.ReadFull(soe.stream, blob); err != nil {
 			return nil, err
 		}
 		return blob, nil
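
The signature change in readZstdChunkedManifest (named results with `retErr`) exists so the deferred ensureAllBlobsDone can report a cleanup failure without clobbering an earlier error. A minimal sketch of that idiom — `cleanup` and `readManifest` are invented stand-ins for ensureAllBlobsDone and the real function:

```go
package main

import (
	"errors"
	"fmt"
)

// cleanup stands in for ensureAllBlobsDone: draining the channel can
// itself surface an error even when the main body succeeded.
func cleanup() error { return errors.New("stream 2: connection reset") }

// readManifest mirrors the named-return pattern readZstdChunkedManifest
// adopts: the deferred cleanup error is surfaced only if the function
// body had not already failed.
func readManifest() (_ []byte, retErr error) {
	defer func() {
		if err := cleanup(); retErr == nil {
			retErr = err
		}
	}()
	return []byte("manifest"), nil
}

func main() {
	_, err := readManifest()
	fmt.Println("err:", err) // prints: err: stream 2: connection reset
}
```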
117 changes: 96 additions & 21 deletions pkg/chunked/storage_linux.go
@@ -1141,41 +1141,116 @@ func makeEntriesFlat(mergedEntries []fileMetadata) ([]fileMetadata, error) {
 	return new, nil
 }
 
-func (c *chunkedDiffer) copyAllBlobToFile(destination *os.File) (digest.Digest, error) {
-	var payload io.ReadCloser
-	var streams chan io.ReadCloser
-	var errs chan error
-	var err error
+type streamOrErr struct {
+	stream io.ReadCloser
+	err    error
+}
+
+// ensureAllBlobsDone ensures that all blobs are closed and returns the first error encountered.
+func ensureAllBlobsDone(streamsOrErrors chan streamOrErr) (retErr error) {
+	for soe := range streamsOrErrors {
+		if soe.stream != nil {
+			_ = soe.stream.Close()
+		} else if retErr == nil {
+			retErr = soe.err
+		}
+	}
+	return
+}
+
+func getBlobAtConverterGoroutine(stream chan streamOrErr, streams chan io.ReadCloser, errs chan error, maxStreams int) {
+	tooManyStreams := false
+	streamsSoFar := 0
+
+	err := errors.New("Unexpected error in getBlobAtGoroutine")
+
+	defer func() {
+		if err != nil {
+			stream <- streamOrErr{err: err}
+		}
+		close(stream)
+	}()
+loop:
+	for {
+		select {
+		case p, ok := <-streams:
+			if !ok {
+				streams = nil
+				break loop
+			}
+			if maxStreams > 0 && streamsSoFar >= maxStreams {
+				tooManyStreams = true
+				_ = p.Close()
+				continue
+			}
+			streamsSoFar++
+			stream <- streamOrErr{stream: p}
+		case err, ok := <-errs:
+			if !ok {
+				errs = nil
+				break loop
+			}
+			stream <- streamOrErr{err: err}
+		}
+	}
+	if streams != nil {
+		for p := range streams {
+			if maxStreams > 0 && streamsSoFar >= maxStreams {
+				tooManyStreams = true
+				_ = p.Close()
+				continue
+			}
+			streamsSoFar++
+			stream <- streamOrErr{stream: p}
+		}
+	}
+	if errs != nil {
+		for err := range errs {
+			stream <- streamOrErr{err: err}
+		}
+	}
+	if tooManyStreams {
+		stream <- streamOrErr{err: fmt.Errorf("too many streams returned, got more than %d", maxStreams)}
+	}
+	err = nil
+}
+
+func getBlobAt(is ImageSourceSeekable, chunksToRequest []ImageSourceChunk) (chan streamOrErr, error) {
+	streams, errs, err := is.GetBlobAt(chunksToRequest)
+	if err != nil {
+		return nil, err
+	}
+	stream := make(chan streamOrErr)
+	go getBlobAtConverterGoroutine(stream, streams, errs, len(chunksToRequest))
+	return stream, nil
+}
+
+func (c *chunkedDiffer) copyAllBlobToFile(destination *os.File) (digest.Digest, error) {
 	chunksToRequest := []ImageSourceChunk{
 		{
 			Offset: 0,
 			Length: uint64(c.blobSize),
 		},
 	}
 
-	streams, errs, err = c.stream.GetBlobAt(chunksToRequest)
+	streamsOrErrors, err := getBlobAt(c.stream, chunksToRequest)
 	if err != nil {
 		return "", err
 	}
-	select {
-	case p := <-streams:
-		payload = p
-	case err := <-errs:
-		return "", err
-	}
-	if payload == nil {
-		return "", errors.New("invalid stream returned")
-	}
-	defer payload.Close()
 
 	originalRawDigester := digest.Canonical.Digester()
+	for soe := range streamsOrErrors {
+		if soe.stream != nil {
+			r := io.TeeReader(soe.stream, originalRawDigester.Hash())
 
-	r := io.TeeReader(payload, originalRawDigester.Hash())
-
-	// copy the entire tarball and compute its digest
-	_, err = io.CopyBuffer(destination, r, c.copyBuffer)
+			// copy the entire tarball and compute its digest
+			_, err = io.CopyBuffer(destination, r, c.copyBuffer)
+			_ = soe.stream.Close()
+		}
+		if soe.err != nil && err == nil {
+			err = soe.err
+		}
+	}
 	return originalRawDigester.Digest(), err
 }
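
ensureAllBlobsDone is what lets a caller such as readZstdChunkedManifest bail out early: whatever its readBlob helper has not consumed is drained, every leftover stream is closed, and the first queued error is reported. A small sketch of that contract, with the channel contents invented for the example:

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

type streamOrErr struct {
	stream io.ReadCloser
	err    error
}

// Same behavior as the ensureAllBlobsDone added above: close every
// remaining stream and keep the first error seen.
func ensureAllBlobsDone(streamsOrErrors chan streamOrErr) (retErr error) {
	for soe := range streamsOrErrors {
		if soe.stream != nil {
			_ = soe.stream.Close()
		} else if retErr == nil {
			retErr = soe.err
		}
	}
	return
}

func main() {
	ch := make(chan streamOrErr, 3)
	ch <- streamOrErr{stream: io.NopCloser(strings.NewReader("first chunk"))}
	ch <- streamOrErr{err: errors.New("range request failed")}
	ch <- streamOrErr{stream: io.NopCloser(strings.NewReader("unread chunk"))}
	close(ch)

	// Consume only the first stream, then let the helper drain the rest:
	// the leftover stream is still closed and the queued error surfaces.
	first := <-ch
	_ = first.stream.Close()
	fmt.Println(ensureAllBlobsDone(ch)) // prints: range request failed
}
```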
