Skip to content

Commit

Permalink
Address comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
rubensf committed Nov 11, 2020
1 parent 7503ac6 commit 6c40fe7
Showing 1 changed file with 36 additions and 44 deletions.
80 changes: 36 additions & 44 deletions go/pkg/chunker/chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var IOBufferSize = 10 * 1024 * 1024
var ErrEOF = errors.New("ErrEOF")

// Compressor for full blobs
var fullCompressor *zstd.Encoder
var fullCompressor, _ = zstd.NewWriter(nil)

// Chunker can be used to chunk an input into uploadable-size byte slices.
// A single Chunker is NOT thread-safe; it should be used by a single uploader thread.
Expand All @@ -45,20 +45,36 @@ type Chunker struct {
compressed bool
}

// NewFromBlob initializes a Chunker from the provided bytes buffer.
func NewFromBlob(blob []byte, chunkSize int) *Chunker {
func NewChunkerFromBlob(blob []byte, chunkSize int, compressed bool) *Chunker {
if chunkSize < 1 {
chunkSize = DefaultChunkSize
}
contents := make([]byte, len(blob))
copy(contents, blob)
var contents []byte
if compressed {
contents = fullCompressor.EncodeAll(blob, nil)
} else {
contents = make([]byte, len(blob))
copy(contents, blob)
}
return &Chunker{
contents: contents,
chunkSize: chunkSize,
digest: digest.NewFromBlob(blob),
compressed: compressed,
contents: contents,
chunkSize: chunkSize,
digest: digest.NewFromBlob(blob),
}
}

// NewFromBlob initializes a Chunker from the provided bytes buffer.
func NewFromBlob(blob []byte, chunkSize int) *Chunker {
return NewChunkerFromBlob(blob, chunkSize, false)
}

// NewCompressedFromBlob initializes a Chunker from the provided bytes buffer, but it compresses
// the bytes first.
func NewCompressedFromBlob(blob []byte, chunkSize int) *Chunker {
return NewChunkerFromBlob(blob, chunkSize, true)
}

// NewFromFile initializes a Chunker from the provided file.
// The provided Digest does NOT have to match the contents of the file! It is
// for informational purposes only. Chunker won't check Digest information at
Expand All @@ -79,6 +95,7 @@ func NewCompressedFromFile(path string, dg digest.Digest, chunkSize int) (*Chunk
}
c := NewFromReader(r, dg, chunkSize)
c.path = path
c.compressed = true
return c, nil
}

Expand All @@ -99,47 +116,22 @@ func NewFromReader(r reader.ReadSeeker, dg digest.Digest, chunkSize int) *Chunke
}
}

// CompressChunker will, if possible, modify the chunker so it'll return compressed
// reads instead. This is irreversible.
func CompressChunker(ch *Chunker) error {
var err error
if fullCompressor == nil {
fullCompressor, err = zstd.NewWriter(nil)
if err != nil {
return err
}
}

if ch.compressed {
return nil
}

if ch.contents != nil {
ch.contents = fullCompressor.EncodeAll(ch.contents, nil)
ch.compressed = true
return nil
}

if ch.r != nil {
cmpR, err := reader.NewCompressedSeeker(ch.r)
if err != nil {
return err
}
ch.r = cmpR
ch.compressed = true
return nil
func NewChunkerFromProto(msg proto.Message, chunkSize int, compressed bool) (*Chunker, error) {
blob, err := proto.Marshal(msg)
if err != nil {
return nil, err
}

return errors.New("Provided Chunker is invalid")
return NewChunkerFromBlob(blob, chunkSize, compressed), nil
}

// NewFromProto initializes a Chunker from the marshalled proto message.
func NewFromProto(msg proto.Message, chunkSize int) (*Chunker, error) {
blob, err := proto.Marshal(msg)
if err != nil {
return nil, err
}
return NewFromBlob(blob, chunkSize), nil
return NewChunkerFromProto(msg, chunkSize, false)
}

// NewCompressedFromProto initializes a Chunker from the marshalled proto message, but compresses the data.
func NewCompressedFromProto(msg proto.Message, chunkSize int, compressed bool) (*Chunker, error) {
return NewChunkerFromProto(msg, chunkSize, true)
}

// String returns an identifiable representation of the Chunker.
Expand Down

0 comments on commit 6c40fe7

Please sign in to comment.