diff --git a/go/pkg/chunker/chunker.go b/go/pkg/chunker/chunker.go
index 57fbb362c..d3c75cd83 100644
--- a/go/pkg/chunker/chunker.go
+++ b/go/pkg/chunker/chunker.go
@@ -23,7 +23,9 @@ var IOBufferSize = 10 * 1024 * 1024
 var ErrEOF = errors.New("ErrEOF")
 
-// Compressor for full blobs
-var fullCompressor *zstd.Encoder
+// fullCompressor compresses full blobs. The error from zstd.NewWriter is
+// discarded. NOTE(review): NewWriter is understood to fail only on invalid
+// encoder options and none are passed here; confirm for the zstd version used.
+var fullCompressor, _ = zstd.NewWriter(nil)
 
 // Chunker can be used to chunk an input into uploadable-size byte slices.
 // A single Chunker is NOT thread-safe; it should be used by a single uploader thread.
@@ -45,20 +47,41 @@ type Chunker struct {
 	compressed bool
 }
 
-// NewFromBlob initializes a Chunker from the provided bytes buffer.
-func NewFromBlob(blob []byte, chunkSize int) *Chunker {
+// NewChunkerFromBlob initializes a Chunker from the provided bytes buffer.
+// A chunkSize below 1 selects DefaultChunkSize. If compressed is true the
+// Chunker holds the zstd-encoded form of blob; otherwise it holds a private
+// copy of blob. In both cases the digest is computed over the original,
+// uncompressed blob.
+func NewChunkerFromBlob(blob []byte, chunkSize int, compressed bool) *Chunker {
 	if chunkSize < 1 {
 		chunkSize = DefaultChunkSize
 	}
-	contents := make([]byte, len(blob))
-	copy(contents, blob)
+	var contents []byte
+	if compressed {
+		contents = fullCompressor.EncodeAll(blob, nil)
+	} else {
+		contents = make([]byte, len(blob))
+		copy(contents, blob)
+	}
 	return &Chunker{
-		contents:  contents,
-		chunkSize: chunkSize,
-		digest:    digest.NewFromBlob(blob),
+		compressed: compressed,
+		contents:   contents,
+		chunkSize:  chunkSize,
+		digest:     digest.NewFromBlob(blob),
 	}
 }
 
+// NewFromBlob initializes a Chunker from the provided bytes buffer.
+func NewFromBlob(blob []byte, chunkSize int) *Chunker {
+	return NewChunkerFromBlob(blob, chunkSize, false)
+}
+
+// NewCompressedFromBlob initializes a Chunker from the provided bytes buffer, but it compresses
+// the bytes first.
+func NewCompressedFromBlob(blob []byte, chunkSize int) *Chunker {
+	return NewChunkerFromBlob(blob, chunkSize, true)
+}
+
 // NewFromFile initializes a Chunker from the provided file.
 // The provided Digest does NOT have to match the contents of the file! It is
 // for informational purposes only. Chunker won't check Digest information at
@@ -79,6 +102,7 @@ func NewCompressedFromFile(path string, dg digest.Digest, chunkSize int) (*Chunk
 	}
 	c := NewFromReader(r, dg, chunkSize)
 	c.path = path
+	c.compressed = true
 	return c, nil
 }
 
@@ -99,47 +123,29 @@ func NewFromReader(r reader.ReadSeeker, dg digest.Digest, chunkSize int) *Chunke
 	}
 }
 
-// CompressChunker will, if possible, modify the chunker so it'll return compressed
-// reads instead. This is irreversible.
-func CompressChunker(ch *Chunker) error {
-	var err error
-	if fullCompressor == nil {
-		fullCompressor, err = zstd.NewWriter(nil)
-		if err != nil {
-			return err
-		}
-	}
-
-	if ch.compressed {
-		return nil
-	}
-
-	if ch.contents != nil {
-		ch.contents = fullCompressor.EncodeAll(ch.contents, nil)
-		ch.compressed = true
-		return nil
-	}
-
-	if ch.r != nil {
-		cmpR, err := reader.NewCompressedSeeker(ch.r)
-		if err != nil {
-			return err
-		}
-		ch.r = cmpR
-		ch.compressed = true
-		return nil
+// NewChunkerFromProto marshals msg and initializes a Chunker from the
+// resulting bytes, compressing them when compressed is true. It returns the
+// marshalling error, if any.
+func NewChunkerFromProto(msg proto.Message, chunkSize int, compressed bool) (*Chunker, error) {
+	blob, err := proto.Marshal(msg)
+	if err != nil {
+		return nil, err
 	}
-
-	return errors.New("Provided Chunker is invalid")
+	return NewChunkerFromBlob(blob, chunkSize, compressed), nil
 }
 
 // NewFromProto initializes a Chunker from the marshalled proto message.
 func NewFromProto(msg proto.Message, chunkSize int) (*Chunker, error) {
-	blob, err := proto.Marshal(msg)
-	if err != nil {
-		return nil, err
-	}
-	return NewFromBlob(blob, chunkSize), nil
+	return NewChunkerFromProto(msg, chunkSize, false)
+}
+
+// NewCompressedFromProto initializes a Chunker from the marshalled proto
+// message, but compresses the data. The compressed argument is ignored; the
+// result is always compressed.
+// NOTE(review): the parameter looks unintentional (NewCompressedFromBlob has
+// no equivalent); consider removing it after checking callers.
+func NewCompressedFromProto(msg proto.Message, chunkSize int, compressed bool) (*Chunker, error) {
+	return NewChunkerFromProto(msg, chunkSize, true)
 }
 
 // String returns an identifiable representation of the Chunker.