Add compressed writes to cas.go. #232

Closed
wants to merge 5 commits
2 changes: 2 additions & 0 deletions go.mod
@@ -40,6 +40,7 @@ require (
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 // indirect
github.com/jung-kurt/gofpdf v1.13.0 // indirect
github.com/klauspost/compress v1.11.2
github.com/kylelemons/godebug v1.1.0
github.com/mattn/go-sqlite3 v1.11.0 // indirect
github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d // indirect
@@ -61,5 +62,6 @@ require (
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d
google.golang.org/grpc v1.31.1
google.golang.org/protobuf v1.25.0
gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
@@ -233,6 +233,8 @@ github.com/jung-kurt/gofpdf v1.13.0 h1:OrLyhb9VU2dNdxzDu5lpMhX5/vpfm6RY5Jlr4iPQ6
github.com/jung-kurt/gofpdf v1.13.0/go.mod h1:1hl7y57EsiPAkLbOwzpzqgx1A30nQCk/YmFV8S2vmK0=
github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.11.2 h1:MiK62aErc3gIiVEtyzKfeOHgW7atJb5g/KNX5m3c2nQ=
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3 h1:/Um6a/ZmD5tF7peoOJ5oN5KMQ0DrGVQSXLNwyckutPk=
2 changes: 2 additions & 0 deletions go/pkg/chunker/BUILD.bazel
@@ -7,7 +7,9 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//go/pkg/digest:go_default_library",
"//go/pkg/reader:go_default_library",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_klauspost_compress//zstd:go_default_library",
],
)

176 changes: 125 additions & 51 deletions go/pkg/chunker/chunker.go
@@ -2,15 +2,15 @@
package chunker

import (
"errors"
"io"
"io/ioutil"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/reader"
"github.com/golang/protobuf/proto"
"github.com/klauspost/compress/zstd"
)

// DefaultChunkSize is the default chunk size for ByteStream.Write RPCs.
@@ -22,21 +22,27 @@ var IOBufferSize = 10 * 1024 * 1024
// ErrEOF is returned when Next is called while HasNext is false.
var ErrEOF = errors.New("ErrEOF")

// fullCompressor is a shared zstd encoder used to compress full in-memory blobs.
var fullCompressor *zstd.Encoder

// Chunker can be used to chunk an input into uploadable-size byte slices.
// A single Chunker is NOT thread-safe; it should be used by a single uploader thread.
type Chunker struct {
chunkSize int
r reader.ReadSeeker
// An optional cache of the full data. It will be present in these cases:
// * The Chunker was initialized from a []byte.
// * Chunker.FullData was called at least once.
// * Next() was called and the read was less than IOBufferSize.
// Once contents are initialized, they are immutable.
contents []byte
// The digest carried here is for ease of data access *only*. It is never
// checked anywhere in the Chunker logic.
digest digest.Digest
offset int64
reachedEOF bool
path string
compressed bool
}

// NewFromBlob initializes a Chunker from the provided bytes buffer.
@@ -54,23 +60,79 @@ func NewFromBlob(blob []byte, chunkSize int) *Chunker {
}

// NewFromFile initializes a Chunker from the provided file.
// The provided Digest does NOT have to match the contents of the file! It is
// carried for informational purposes only, and the Chunker never validates it
// against the data, so a digest/contents mismatch can surface as a late error.
func NewFromFile(path string, dg digest.Digest, chunkSize int) *Chunker {
c := NewFromReader(reader.NewFileReadSeeker(path, IOBufferSize), dg, chunkSize)
c.path = path
return c
}

// NewCompressedFromFile is similar to NewFromFile, except that it compresses
// the contents of the file with zstd.
func NewCompressedFromFile(path string, dg digest.Digest, chunkSize int) (*Chunker, error) {
r, err := reader.NewCompressedFileSeeker(path, IOBufferSize)
if err != nil {
return nil, err
}
c := NewFromReader(r, dg, chunkSize)
c.path = path
return c, nil
}
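For orientation (not part of this diff), a minimal usage sketch of the two constructors; the path, contents, and digest below are hypothetical:

    // Hypothetical caller code; the digest is informational only
    // (see the doc comments above).
    dg := digest.NewFromBlob([]byte("hello"))
    plain := chunker.NewFromFile("/tmp/hello.txt", dg, chunker.DefaultChunkSize)
    zc, err := chunker.NewCompressedFromFile("/tmp/hello.txt", dg, chunker.DefaultChunkSize)
    if err != nil {
        // Compressed construction can fail while setting up the zstd reader.
    }
    _, _ = plain, zc

Both return a chunker that yields chunks via HasNext/Next; only the bytes differ.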

// NewFromReader is similar to NewFromFile, except that it takes an arbitrary
// reader.ReadSeeker. Note that the digest is still carried for convenience
// rather than actually used by the Chunker.
func NewFromReader(r reader.ReadSeeker, dg digest.Digest, chunkSize int) *Chunker {
if chunkSize < 1 {
chunkSize = DefaultChunkSize
}
if chunkSize > IOBufferSize {
chunkSize = IOBufferSize
}
return &Chunker{
r: r,
chunkSize: chunkSize,
digest: dg,
}
}

// CompressChunker modifies the chunker, if possible, so that it returns
// compressed reads instead. This is irreversible.
func CompressChunker(ch *Chunker) error {
var err error
if fullCompressor == nil {
fullCompressor, err = zstd.NewWriter(nil)
if err != nil {
return err
}
}

if ch.compressed {
return nil
}

if ch.contents != nil {
ch.contents = fullCompressor.EncodeAll(ch.contents, nil)
ch.compressed = true
return nil
}

if ch.r != nil {
cmpR, err := reader.NewCompressedSeeker(ch.r)
if err != nil {
return err
}
ch.r = cmpR
ch.compressed = true
return nil
}

return errors.New("Provided Chunker is invalid")
}
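As a rough sketch of what CompressChunker enables for an in-memory chunker (hypothetical values; the decode side uses the same klauspost/compress zstd package to show the round trip):

    ch := chunker.NewFromBlob([]byte("some blob"), chunker.DefaultChunkSize)
    if err := chunker.CompressChunker(ch); err != nil {
        // handle error
    }
    compressed, _ := ch.FullData() // zstd-compressed bytes after CompressChunker
    dec, _ := zstd.NewReader(nil)
    defer dec.Close()
    original, _ := dec.DecodeAll(compressed, nil) // back to []byte("some blob")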

// NewFromProto initializes a Chunker from the marshalled proto message.
func NewFromProto(msg proto.Message, chunkSize int) (*Chunker, error) {
blob, err := proto.Marshal(msg)
@@ -99,10 +161,6 @@ func (c *Chunker) Offset() int64 {
return c.offset
}

// ChunkSize returns the maximum size of each chunk.
func (c *Chunker) ChunkSize() int {
return c.chunkSize
Expand All @@ -112,9 +170,12 @@ func (c *Chunker) ChunkSize() int {
// Useful for upload retries.
// TODO(olaola): implement Seek(offset) when we have resumable uploads.
func (c *Chunker) Reset() {
if c.r != nil {
// We're ignoring the error here, so as not to change the fn signature.
c.r.SeekOffset(0)
}
c.offset = 0
c.reachedEOF = false
}

// FullData returns the overall (non-chunked) underlying data. The Chunker is Reset.
Expand All @@ -125,15 +186,21 @@ func (c *Chunker) FullData() ([]byte, error) {
return c.contents, nil
}
var err error
if !c.r.IsInitialized() {
err = c.r.Initialize()
}
if err != nil {
return nil, err
}
// Cache contents so that the next call to FullData() doesn't result in a file read.
c.contents, err = ioutil.ReadAll(c.r)
return c.contents, err
}

// HasNext returns whether a subsequent call to Next will return a valid chunk. Always true for a
// newly created Chunker.
func (c *Chunker) HasNext() bool {
return !c.reachedEOF
}

// Chunk is a piece of a byte[] blob suitable for being uploaded.
Expand All @@ -149,47 +216,54 @@ func (c *Chunker) Next() (*Chunk, error) {
if !c.HasNext() {
return nil, ErrEOF
}
if c.digest.Size == 0 {
c.reachedEOF = true
return &Chunk{}, nil
}

var data []byte
if c.contents != nil {
// As long as we have data in memory, it's much more efficient to return
// a view slice than to copy it around. Contents are immutable so it's okay
// to return the slice.
endRead := int(c.offset) + c.chunkSize
if endRead >= len(c.contents) {
endRead = len(c.contents)
c.reachedEOF = true
}
data = c.contents[c.offset:endRead]
} else {
if !c.r.IsInitialized() {
err := c.r.Initialize()
if err != nil {
return nil, err
}
}

// We don't need to check how many bytes were read: io.ReadFull returns an
// error whenever it reads fewer than len(data) bytes.
data = make([]byte, c.chunkSize)
n, err := io.ReadFull(c.r, data)
data = data[:n]
switch err {
case io.ErrUnexpectedEOF:
// Cache the contents to avoid further IO for small files.
if c.offset == 0 {
c.contents = data
}
fallthrough
case io.EOF:
c.reachedEOF = true
case nil:
default:
return nil, err
}
}

res := &Chunk{
Offset: c.offset,
Data:   data,
}
c.offset += int64(len(data))
return res, nil
}
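For reference, the consumption pattern is unchanged by this rewrite; only the source of the bytes differs. A sketch (upload is a hypothetical sink):

    for ch.HasNext() {
        chunk, err := ch.Next()
        if err != nil {
            return err
        }
        // For a compressed chunker, Offset and Data refer to the compressed
        // stream, not to the original file contents.
        upload(chunk.Offset, chunk.Data)
    }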
36 changes: 1 addition & 35 deletions go/pkg/chunker/chunker_test.go
@@ -281,40 +281,6 @@ func TestChunkerErrors_ErrEOF(t *testing.T) {
}
}

func TestChunkerResetOptimization_SmallFile(t *testing.T) {
// Files smaller than IOBufferSize are loaded into memory once and not re-read on Reset.
execRoot, err := ioutil.TempDir("", t.Name())
Expand All @@ -330,7 +296,7 @@ func TestChunkerResetOptimization_SmallFile(t *testing.T) {
}
dg := digest.NewFromBlob(blob)
IOBufferSize = 10
c := NewFromFile(path, dg, 4)
got, err := c.Next()
if err != nil {
t.Errorf("c.Next() gave error %v", err)
4 changes: 4 additions & 0 deletions go/pkg/client/bytestream.go
@@ -37,6 +37,10 @@ func (c *Client) WriteChunked(ctx context.Context, name string, ch *chunker.Chun
return err
}
if chunk.Offset == 0 {
// Notice that the digest in the chunker might be misleading.
// Specifically, for compressed blob uploads the resource
// name should include the uncompressed digest, while the
// chunker carries the compressed digest.
req.ResourceName = name
}
req.WriteOffset = chunk.Offset
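Under the Remote Execution API convention this comment refers to, a compressed upload resource name has roughly this shape (illustrative placeholders):

    {instance_name}/uploads/{uuid}/compressed-blobs/zstd/{uncompressed_hash}/{uncompressed_size}

while the chunks written against it carry zstd-compressed bytes and compressed-stream offsets.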