Skip to content

Commit

Permalink
Add compressed file reader.
Browse files Browse the repository at this point in the history
Follow up for #228.
  • Loading branch information
rubensf committed Nov 6, 2020
1 parent 47c3bc8 commit 0371e47
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 6 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 // indirect
github.com/jung-kurt/gofpdf v1.13.0 // indirect
github.com/klauspost/compress v1.11.2
github.com/kylelemons/godebug v1.1.0
github.com/mattn/go-sqlite3 v1.11.0 // indirect
github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d // indirect
Expand All @@ -61,5 +62,6 @@ require (
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d
google.golang.org/grpc v1.31.1
google.golang.org/protobuf v1.25.0
gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ github.com/jung-kurt/gofpdf v1.13.0 h1:OrLyhb9VU2dNdxzDu5lpMhX5/vpfm6RY5Jlr4iPQ6
github.com/jung-kurt/gofpdf v1.13.0/go.mod h1:1hl7y57EsiPAkLbOwzpzqgx1A30nQCk/YmFV8S2vmK0=
github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.11.2 h1:MiK62aErc3gIiVEtyzKfeOHgW7atJb5g/KNX5m3c2nQ=
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3 h1:/Um6a/ZmD5tF7peoOJ5oN5KMQ0DrGVQSXLNwyckutPk=
Expand Down
1 change: 1 addition & 0 deletions go/pkg/chunker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//go/pkg/digest:go_default_library",
"//go/pkg/reader:go_default_library",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_klauspost_compress//zstd:go_default_library",
],
)

Expand Down
64 changes: 62 additions & 2 deletions go/pkg/chunker/chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/reader"
"github.com/golang/protobuf/proto"
"github.com/klauspost/compress/zstd"
)

// DefaultChunkSize is the default chunk size for ByteStream.Write RPCs.
Expand All @@ -21,6 +22,9 @@ var IOBufferSize = 10 * 1024 * 1024
// ErrEOF is returned when Next is called when HasNext is false.
var ErrEOF = errors.New("ErrEOF")

// fullCompressor is a shared zstd encoder used to compress whole in-memory
// blobs via EncodeAll. It is lazily initialized by CompressChunker.
// NOTE(review): the lazy initialization is not synchronized — assumes
// chunkers are compressed from a single goroutine; confirm with callers.
var fullCompressor *zstd.Encoder

// Chunker can be used to chunk an input into uploadable-size byte slices.
// A single Chunker is NOT thread-safe; it should be used by a single uploader thread.
type Chunker struct {
Expand All @@ -38,6 +42,7 @@ type Chunker struct {
offset int64
reachedEOF bool
path string
compressed bool
}

// NewFromBlob initializes a Chunker from the provided bytes buffer.
Expand All @@ -60,20 +65,74 @@ func NewFromBlob(blob []byte, chunkSize int) *Chunker {
// any time. This means that late errors due to mismatch of digest and
// contents are possible.
func NewFromFile(path string, dg digest.Digest, chunkSize int) *Chunker {
	// Build a buffered file reader and wrap it in a generic reader-backed
	// chunker, then remember the path for later reference.
	fileReader := reader.NewFileReadSeeker(path, IOBufferSize)
	chunker := NewFromReader(fileReader, dg, chunkSize)
	chunker.path = path
	return chunker
}

// NewCompressedFromFile is similar to NewFromFile, except that it'll compress
// the contents of the file using zstd compression.
func NewCompressedFromFile(path string, dg digest.Digest, chunkSize int) (*Chunker, error) {
	// Wrap the file in an on-the-fly zstd-compressing reader; creation can
	// fail, unlike the plain file reader.
	compressedReader, err := reader.NewCompressedFileSeeker(path, IOBufferSize)
	if err != nil {
		return nil, err
	}
	chunker := NewFromReader(compressedReader, dg, chunkSize)
	chunker.path = path
	return chunker, nil
}

// NewFromReader is similar to NewFromFile, except that it'll take in any
// arbitrary reader.ReadSeeker. Note that the digest is still for convenience,
// rather than being actual used by the Chunker.
func NewFromReader(r reader.ReadSeeker, dg digest.Digest, chunkSize int) *Chunker {
	// Clamp the chunk size into a sane range: non-positive values fall back
	// to the default, and it is never allowed to exceed the I/O buffer size.
	if chunkSize < 1 {
		chunkSize = DefaultChunkSize
	}
	if chunkSize > IOBufferSize {
		chunkSize = IOBufferSize
	}
	return &Chunker{
		r:         r,
		chunkSize: chunkSize,
		digest:    dg,
	}
}

// CompressChunker will, if possible, modify the chunker so it'll return compressed
// reads instead. This is irreversible.
// CompressChunker will, if possible, modify the chunker so it'll return
// compressed reads instead. This is irreversible.
//
// It returns an error if the zstd encoder cannot be created, if wrapping the
// chunker's reader fails, or if the chunker has neither contents nor a reader.
func CompressChunker(ch *Chunker) error {
	// Already-compressed chunkers are left untouched; check before doing any
	// other work so repeated calls are cheap no-ops.
	if ch.compressed {
		return nil
	}

	// Lazily create the shared blob encoder used by EncodeAll.
	// NOTE(review): this init is unsynchronized — assumes single-goroutine
	// use, consistent with Chunker not being thread-safe; confirm.
	if fullCompressor == nil {
		var err error
		fullCompressor, err = zstd.NewWriter(nil)
		if err != nil {
			return err
		}
	}

	// In-memory chunkers are compressed eagerly, in one shot.
	if ch.contents != nil {
		ch.contents = fullCompressor.EncodeAll(ch.contents, nil)
		ch.compressed = true
		return nil
	}

	// Reader-backed chunkers are wrapped so compression happens on the fly.
	if ch.r != nil {
		cmpR, err := reader.NewCompressedSeeker(ch.r)
		if err != nil {
			return err
		}
		ch.r = cmpR
		ch.compressed = true
		return nil
	}

	return errors.New("provided Chunker is invalid")
}

// NewFromProto initializes a Chunker from the marshalled proto message.
func NewFromProto(msg proto.Message, chunkSize int) (*Chunker, error) {
blob, err := proto.Marshal(msg)
Expand Down Expand Up @@ -112,6 +171,7 @@ func (c *Chunker) ChunkSize() int {
// TODO(olaola): implement Seek(offset) when we have resumable uploads.
func (c *Chunker) Reset() {
if c.r != nil {
// We're ignoring the error here, as not to change the fn signature.
c.r.SeekOffset(0)
}
c.offset = 0
Expand Down
6 changes: 5 additions & 1 deletion go/pkg/reader/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ go_library(
srcs = ["reader.go"],
importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/reader",
visibility = ["//visibility:public"],
deps = ["@com_github_klauspost_compress//zstd:go_default_library"],
)

go_test(
name = "go_default_test",
srcs = ["reader_test.go"],
embed = [":go_default_library"],
deps = ["@com_github_google_go_cmp//cmp:go_default_library"],
deps = [
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_klauspost_compress//zstd:go_default_library",
],
)
74 changes: 72 additions & 2 deletions go/pkg/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package reader

import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"

"github.com/klauspost/compress/zstd"
)

type Initializable interface {
Expand All @@ -16,7 +19,7 @@ type Initializable interface {
// ReadSeeker is an initializable reader that can also be repositioned to an
// absolute offset before its next (re-)initialization.
type ReadSeeker interface {
	io.Reader
	Initializable
	// SeekOffset positions the reader at offset bytes from the start of the
	// underlying data.
	SeekOffset(offset int64) error
}

type fileSeeker struct {
Expand Down Expand Up @@ -53,10 +56,11 @@ func (fio *fileSeeker) Read(p []byte) (int, error) {

// Seek is a simplified version of io.Seeker. It only supports offsets from the
// beginning of the file, and it errors lazily at the next Initialize.
func (fio *fileSeeker) SeekOffset(offset int64) {
func (fio *fileSeeker) SeekOffset(offset int64) error {
fio.seekOffset = offset
fio.initialized = false
fio.reader = nil
return nil
}

// IsInitialized indicates whether this reader is ready. If false, Read calls
Expand Down Expand Up @@ -95,3 +99,69 @@ func (fio *fileSeeker) Initialize() error {
fio.initialized = true
return nil
}

// compressedSeeker wraps a ReadSeeker and exposes its contents as an
// on-the-fly zstd-compressed stream.
type compressedSeeker struct {
	fs ReadSeeker
	encd *zstd.Encoder
	// This keeps the compressed data that has not yet been returned by Read.
	buf *bytes.Buffer
}

// NewCompressedFileSeeker returns a ReadSeeker that serves the contents of
// the file at path compressed with zstd, using buffsize for the underlying
// buffered file reader.
func NewCompressedFileSeeker(path string, buffsize int) (ReadSeeker, error) {
	fileReader := NewFileReadSeeker(path, buffsize)
	return NewCompressedSeeker(fileReader)
}

// NewCompressedSeeker wraps an existing ReadSeeker so its contents are
// zstd-compressed as they are read. Wrapping an already-compressed seeker is
// rejected to avoid double compression.
func NewCompressedSeeker(fs ReadSeeker) (ReadSeeker, error) {
	if _, ok := fs.(*compressedSeeker); ok {
		return nil, errors.New("trying to double compress files")
	}

	buf := bytes.NewBuffer(nil)
	encd, err := zstd.NewWriter(buf)
	if err != nil {
		// Return nil on error: a half-built seeker must not be usable.
		return nil, err
	}
	return &compressedSeeker{
		fs:   fs,
		encd: encd,
		buf:  buf,
	}, nil
}

// Read fills p with compressed bytes. It pulls uncompressed data from the
// wrapped reader, feeds it through the encoder into buf, and then drains buf
// into p. Once the source reaches EOF, the encoder is flushed so the final
// frame becomes readable; subsequent calls return io.EOF when buf is empty.
func (cfs *compressedSeeker) Read(p []byte) (int, error) {
	var errR, errW error
	// Keep compressing until enough output is buffered to fill p, or until
	// either the source read or the encoder write fails.
	for cfs.buf.Len() < len(p) && errR == nil && errW == nil {
		// Read is allowed to use the entirety of p as a scratchpad.
		var n int
		n, errR = cfs.fs.Read(p)
		// Per the io.Writer contract, errW is non-nil whenever fewer than n
		// bytes were written. Skip empty writes so a closed encoder is never
		// poked after EOF.
		if n > 0 {
			_, errW = cfs.encd.Write(p[:n])
		}
	}
	m, err := cfs.buf.Read(p)

	var retErr error
	if errR == io.EOF {
		// Flush the final compressed frame into buf. The Close error is
		// deliberately dropped (as before) so post-EOF reads keep draining
		// buf and eventually surface io.EOF from buf.Read below.
		cfs.encd.Close()
	} else if errR != nil {
		retErr = errR
	}
	if retErr == nil && errW != nil {
		retErr = errW
	}
	if retErr == nil {
		retErr = err
	}
	// Bug fix: previously returned err (buf.Read's error), silently
	// discarding read/write errors collected in retErr.
	return m, retErr
}

// SeekOffset repositions the wrapped reader and resets the compression
// state so the next Read starts a fresh zstd stream at the new offset.
func (cfs *compressedSeeker) SeekOffset(offset int64) error {
	// Close the encoder BEFORE resetting the buffer: Close flushes any
	// pending compressed bytes into buf, and flushing into an
	// already-reset buffer would leak the old stream's tail into the new
	// one. (Previously buf.Reset ran first — a stale-data bug.)
	closeErr := cfs.encd.Close()
	cfs.buf.Reset()
	cfs.encd.Reset(cfs.buf)
	seekErr := cfs.fs.SeekOffset(offset)

	// Prefer the close error, matching the original precedence.
	if closeErr != nil {
		return closeErr
	}
	return seekErr
}

// IsInitialized reports whether the wrapped reader is ready for reads.
func (cfs *compressedSeeker) IsInitialized() bool { return cfs.fs.IsInitialized() }

// Initialize delegates initialization to the wrapped reader.
func (cfs *compressedSeeker) Initialize() error { return cfs.fs.Initialize() }
76 changes: 75 additions & 1 deletion go/pkg/reader/reader_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package reader

import (
"bytes"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/klauspost/compress/zstd"
)

func setTmpFile(t *testing.T, content []byte) (string, func()) {
t.Helper()
execRoot, err := ioutil.TempDir("", "")
execRoot, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1))
if err != nil {
t.Fatalf("failed to make temp dir: %v", err)
}
Expand All @@ -30,6 +33,7 @@ func setTmpFile(t *testing.T, content []byte) (string, func()) {
}

func TestFileReaderSeeks(t *testing.T) {
t.Parallel()
tests := []struct {
name string
IOBuffSize int
Expand Down Expand Up @@ -58,6 +62,7 @@ func TestFileReaderSeeks(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
path, clean := setTmpFile(t, tc.blob)
defer clean()

Expand Down Expand Up @@ -110,6 +115,7 @@ func TestFileReaderSeeks(t *testing.T) {
}

func TestFileReaderSeeksPastOffset(t *testing.T) {
t.Parallel()
path, clean := setTmpFile(t, []byte("12345"))
defer clean()

Expand All @@ -125,3 +131,71 @@ func TestFileReaderSeeksPastOffset(t *testing.T) {
t.Errorf("Expected err, got nil")
}
}

// TestCompressedReader verifies that NewCompressedFileSeeker produces exactly
// the bytes obtained by compressing the file contents directly with zstd.
func TestCompressedReader(t *testing.T) {
	t.Parallel()
	tests := []struct {
		name string
		blob []byte
	}{
		{
			name: "basic",
			blob: []byte("12345"),
		},
		{
			name: "looong",
			blob: []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."),
		},
		{
			name: "empty blob",
			blob: []byte(""),
		},
	}

	for _, tc := range tests {
		name := tc.name
		blob := tc.blob
		t.Run(name, func(t *testing.T) {
			t.Parallel()

			path, clean := setTmpFile(t, blob)
			defer clean()

			// Compress the blob directly to obtain the expected output.
			buf := bytes.NewBuffer(nil)
			encd, err := zstd.NewWriter(buf)
			if err != nil {
				t.Fatalf("Failed to initialize compressor: %v", err)
			}
			if _, err = encd.Write(blob); err != nil {
				t.Fatalf("Failed to compress data: %v", err)
			}
			if err = encd.Close(); err != nil {
				t.Fatalf("Failed to finish compressing data: %v", err)
			}
			compressedData := buf.Bytes()

			r, err := NewCompressedFileSeeker(path, 10)
			if err != nil {
				t.Fatalf("Failed to initialize compressor reader: %v", err)
			}
			if err := r.Initialize(); err != nil {
				t.Fatalf("Failed to initialize reader: %v", err)
			}

			// It is theoretically possible for the compressed data to be
			// larger than the original.
			data := make([]byte, len(blob)+100)
			var n int
			for {
				m, err := r.Read(data[n:])
				// Count bytes before inspecting err: a Read may legally
				// return data together with io.EOF, and the old loop shape
				// dropped that final m.
				n += m
				if err == io.EOF {
					break
				}
				if err != nil {
					t.Fatalf("Read() returned unexpected error: %v", err)
				}
			}

			// Compare against the FULL expected output (not expected[:n]),
			// so a short read fails instead of passing vacuously.
			if diff := cmp.Diff(compressedData, data[:n]); diff != "" {
				t.Errorf("Read() = incorrect result, diff(-want, +got): %v", diff)
			}
		})
	}
}
6 changes: 6 additions & 0 deletions remote-apis-sdks-deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,9 @@ def remote_apis_sdks_go_deps():
commit = "09ad026a62f0561b7f7e276569eda11a6afc9773",
importpath = "cloud.google.com/go",
)
_maybe(
go_repository,
name = "com_github_klauspost_compress",
importpath = "github.com/klauspost/compress",
tag = "v1.11.2",
)

0 comments on commit 0371e47

Please sign in to comment.