Buffer the output of gzip.Writer to avoid stalling #923

Merged · 1 commit · Jan 28, 2021
pkg/v1/internal/gzip/zip.go (22 additions, 2 deletions)
@@ -15,6 +15,7 @@
 package gzip
 
 import (
+	"bufio"
 	"bytes"
 	"compress/gzip"
 	"io"
@@ -38,11 +39,19 @@ func ReadCloser(r io.ReadCloser) io.ReadCloser {
 func ReadCloserLevel(r io.ReadCloser, level int) io.ReadCloser {
 	pr, pw := io.Pipe()
 
+	// For highly compressible layers, gzip.Writer will output a very small
+	// number of bytes per Write(). This is normally fine, but when pushing
+	// to a registry, we want to ensure that we're taking full advantage of
+	// the available bandwidth instead of sending tons of tiny writes over
+	// the wire.
+	// 128K ought to be small enough for anybody.
+	bw := bufio.NewWriterSize(pw, 2<<16)
+
 	// Returns err so we can pw.CloseWithError(err)
 	go func() error {
 		// TODO(go1.14): Just defer {pw,gw,r}.Close like you'd expect.
 		// Context: https://golang.org/issue/24283
-		gw, err := gzip.NewWriterLevel(pw, level)
+		gw, err := gzip.NewWriterLevel(bw, level)
 		if err != nil {
 			return pw.CloseWithError(err)
 		}
@@ -52,9 +61,20 @@ func ReadCloserLevel(r io.ReadCloser, level int) io.ReadCloser {
 			defer gw.Close()
 			return pw.CloseWithError(err)
 		}
 
+		// Close gzip writer to Flush it and write gzip trailers.
+		if err := gw.Close(); err != nil {
+			return pw.CloseWithError(err)
+		}
+
+		// Flush bufio writer to ensure we write out everything.
+		if err := bw.Flush(); err != nil {
+			return pw.CloseWithError(err)
+		}
+
 		// We don't really care if these fail.
 		defer pw.Close()
 		defer r.Close()
-		defer gw.Close()
 
 		return nil
 	}()
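For context, here is a minimal standalone sketch (not part of the PR) of the buffered-pipe pattern this change adopts. The helper name compressStream is made up for illustration, and it uses the default gzip level rather than mirroring ReadCloserLevel exactly. Note the ordering the PR is careful about: close the gzip writer first so its trailer lands in the bufio.Writer, then flush the buffer into the pipe.

package main

import (
	"bufio"
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// compressStream is a hypothetical helper in the spirit of ReadCloserLevel:
// it returns a reader yielding the gzip-compressed bytes of r, with writes
// into the pipe coalesced by a 128K bufio.Writer.
func compressStream(r io.Reader) io.ReadCloser {
	pr, pw := io.Pipe()
	bw := bufio.NewWriterSize(pw, 2<<16)
	go func() {
		gw := gzip.NewWriter(bw)
		if _, err := io.Copy(gw, r); err != nil {
			gw.Close()
			pw.CloseWithError(err)
			return
		}
		// Order matters: close gzip first (writes the trailer into bw),
		// then flush bw so everything reaches the pipe, then close it.
		if err := gw.Close(); err != nil {
			pw.CloseWithError(err)
			return
		}
		if err := bw.Flush(); err != nil {
			pw.CloseWithError(err)
			return
		}
		pw.Close()
	}()
	return pr
}

func main() {
	rc := compressStream(strings.NewReader(strings.Repeat("a", 1<<20)))
	n, err := io.Copy(io.Discard, rc)
	fmt.Println(n, err) // a highly compressible 1MB input gzips to ~1KB
}

Writes into an io.Pipe block until the reader consumes them, so without the bufio.Writer each tiny gzip write becomes a separate blocking hand-off, and, when pushing to a registry, a separate small write over the wire.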
pkg/v1/stream/layer.go (16 additions, 1 deletion)
@@ -15,6 +15,7 @@
 package stream
 
 import (
+	"bufio"
 	"compress/gzip"
 	"crypto/sha256"
 	"encoding/hex"
@@ -130,6 +131,7 @@ type compressedReader struct {

 	h, zh hash.Hash // collects digests of compressed and uncompressed stream.
 	pr    io.Reader
+	bw    *bufio.Writer
 	count *countWriter
 
 	l *Layer // stream.Layer to update upon Close.
@@ -144,14 +146,22 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
 	// capture compressed digest, and a countWriter to capture compressed
 	// size.
 	pr, pw := io.Pipe()
-	zw, err := gzip.NewWriterLevel(io.MultiWriter(pw, zh, count), l.compression)
+
+	// Write compressed bytes to be read by the pipe.Reader, hashed by zh, and counted by count.
+	mw := io.MultiWriter(pw, zh, count)
+
+	// Buffer the output of the gzip writer so we don't have to wait on pr to keep writing.
+	// 128K ought to be small enough for anybody.
+	bw := bufio.NewWriterSize(mw, 2<<16)
+	zw, err := gzip.NewWriterLevel(bw, l.compression)
 	if err != nil {
 		return nil, err
 	}
 
 	cr := &compressedReader{
 		closer: newMultiCloser(zw, l.blob),
 		pr:     pr,
+		bw:     bw,
 		h:      h,
 		zh:     zh,
 		count:  count,
@@ -183,6 +193,11 @@ func (cr *compressedReader) Close() error {
 		return err
 	}
 
+	// Flush the buffer.
+	if err := cr.bw.Flush(); err != nil {
+		return err
+	}
+
 	diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(cr.h.Sum(nil)))
 	if err != nil {
 		return err
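Likewise, a hedged sketch of the tee arrangement newCompressedReader sets up: io.MultiWriter fans the buffered gzip output out to the pipe, a hash, and a byte counter, so the compressed digest and size are ready by the time the stream has been consumed. The countWriter below is a stand-in for the stream package's unexported type.

package main

import (
	"bufio"
	"compress/gzip"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

// countWriter tallies the bytes written through it; a stand-in for the
// stream package's unexported countWriter.
type countWriter struct{ n int64 }

func (c *countWriter) Write(p []byte) (int, error) {
	c.n += int64(len(p))
	return len(p), nil
}

func main() {
	pr, pw := io.Pipe()
	zh := sha256.New()
	count := &countWriter{}

	// Every compressed byte goes to the pipe, the hash, and the counter.
	mw := io.MultiWriter(pw, zh, count)
	bw := bufio.NewWriterSize(mw, 2<<16)
	zw := gzip.NewWriter(bw)

	go func() {
		if _, err := io.Copy(zw, strings.NewReader("hello world")); err != nil {
			pw.CloseWithError(err)
			return
		}
		if err := zw.Close(); err != nil { // writes the gzip trailer into bw
			pw.CloseWithError(err)
			return
		}
		pw.CloseWithError(bw.Flush()) // push everything through mw, then close
	}()

	// Consume the compressed stream; the digest and size are ready afterwards.
	io.Copy(io.Discard, pr)
	fmt.Printf("sha256:%s (%d compressed bytes)\n", hex.EncodeToString(zh.Sum(nil)), count.n)
}

Because the bufio.Writer sits in front of the MultiWriter, the hash and counter also see fewer, larger writes; the final bw.Flush (mirrored by cr.bw.Flush in Close above) is what guarantees they have seen every byte before the digest and size are read.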