Add compressed file reader.
Follow up for #228.
rubensf committed Nov 24, 2020
1 parent 6098ee8 commit abc1ab3
Showing 8 changed files with 166 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
@@ -40,6 +40,7 @@ require (
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 // indirect
github.com/jung-kurt/gofpdf v1.13.0 // indirect
github.com/klauspost/compress v1.11.2
github.com/kylelemons/godebug v1.1.0
github.com/mattn/go-sqlite3 v1.11.0 // indirect
github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -233,6 +233,8 @@ github.com/jung-kurt/gofpdf v1.13.0 h1:OrLyhb9VU2dNdxzDu5lpMhX5/vpfm6RY5Jlr4iPQ6
github.com/jung-kurt/gofpdf v1.13.0/go.mod h1:1hl7y57EsiPAkLbOwzpzqgx1A30nQCk/YmFV8S2vmK0=
github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.11.2 h1:MiK62aErc3gIiVEtyzKfeOHgW7atJb5g/KNX5m3c2nQ=
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3 h1:/Um6a/ZmD5tF7peoOJ5oN5KMQ0DrGVQSXLNwyckutPk=
1 change: 1 addition & 0 deletions go/pkg/chunker/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
deps = [
"//go/pkg/reader:go_default_library",
"//go/pkg/uploadinfo:go_default_library",
"@com_github_klauspost_compress//zstd:go_default_library",
],
)

5 changes: 5 additions & 0 deletions go/pkg/chunker/chunker.go
@@ -9,6 +9,7 @@ import (

"github.com/bazelbuild/remote-apis-sdks/go/pkg/reader"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
"github.com/klauspost/compress/zstd"
)

// DefaultChunkSize is the default chunk size for ByteStream.Write RPCs.
@@ -20,6 +21,9 @@ var IOBufferSize = 10 * 1024 * 1024
// ErrEOF is returned when Next is called when HasNext is false.
var ErrEOF = errors.New("ErrEOF")

// fullCompressor is a shared zstd encoder used to compress full blobs.
var fullCompressor, _ = zstd.NewWriter(nil)

// Chunker can be used to chunk an input into uploadable-size byte slices.
// A single Chunker is NOT thread-safe; it should be used by a single uploader thread.
type Chunker struct {
@@ -94,6 +98,7 @@ func (c *Chunker) ChunkSize() int {
// TODO(olaola): implement Seek(offset) when we have resumable uploads.
func (c *Chunker) Reset() {
if c.r != nil {
// The error is ignored here so as not to change the function signature.
c.r.SeekOffset(0)
}
c.offset = 0
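Note on the new fullCompressor variable: a zstd.Encoder created with a nil writer is intended to be used through its stateless EncodeAll helper rather than as a streaming writer. The chunker's actual use of fullCompressor is outside the excerpt shown above; the following is only a minimal sketch of that pattern, with compressBlob as a hypothetical helper that is not part of this commit.

package main

import (
	"fmt"

	"github.com/klauspost/compress/zstd"
)

// A shared encoder created with a nil writer, mirroring fullCompressor in chunker.go.
var fullCompressor, _ = zstd.NewWriter(nil)

// compressBlob is a hypothetical helper: EncodeAll compresses src and appends
// the result to dst, so passing nil allocates a fresh output buffer.
func compressBlob(blob []byte) []byte {
	return fullCompressor.EncodeAll(blob, nil)
}

func main() {
	blob := []byte("example payload to compress")
	compressed := compressBlob(blob)
	fmt.Printf("compressed %d input bytes into %d output bytes\n", len(blob), len(compressed))
}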
2 changes: 2 additions & 0 deletions go/pkg/reader/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
srcs = ["reader.go"],
importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/reader",
visibility = ["//visibility:public"],
deps = ["@com_github_klauspost_compress//zstd:go_default_library"],
)

go_test(
@@ -14,5 +15,6 @@ go_test(
deps = [
"//go/pkg/testutil:go_default_library",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_klauspost_compress//zstd:go_default_library",
],
)
79 changes: 77 additions & 2 deletions go/pkg/reader/reader.go
@@ -2,10 +2,13 @@ package reader

import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"

"github.com/klauspost/compress/zstd"
)

type Initializable interface {
@@ -17,7 +20,7 @@ type ReadSeeker interface {
io.Reader
io.Closer
Initializable
SeekOffset(offset int64)
SeekOffset(offset int64) error
}

type fileSeeker struct {
@@ -65,10 +68,11 @@ func (fio *fileSeeker) Read(p []byte) (int, error) {

// Seek is a simplified version of io.Seeker. It only supports offsets from the
// beginning of the file, and it errors lazily at the next Initialize.
func (fio *fileSeeker) SeekOffset(offset int64) {
func (fio *fileSeeker) SeekOffset(offset int64) error {
fio.seekOffset = offset
fio.initialized = false
fio.reader = nil
return nil
}

// IsInitialized indicates whether this reader is ready. If false, Read calls
@@ -107,3 +111,74 @@ func (fio *fileSeeker) Initialize() error {
fio.initialized = true
return nil
}

type compressedSeeker struct {
fs ReadSeeker
encd *zstd.Encoder
// buf holds the compressed data produced by encd.
buf *bytes.Buffer
}

func NewCompressedFileSeeker(path string, buffsize int) (ReadSeeker, error) {
return NewCompressedSeeker(NewFileReadSeeker(path, buffsize))
}

func NewCompressedSeeker(fs ReadSeeker) (ReadSeeker, error) {
if _, ok := fs.(*compressedSeeker); ok {
return nil, errors.New("trying to double compress files")
}

buf := bytes.NewBuffer(nil)
encd, err := zstd.NewWriter(buf)
return &compressedSeeker{
fs: fs,
encd: encd,
buf: buf,
}, err
}

func (cfs *compressedSeeker) Read(p []byte) (int, error) {
var n int
var errR, errW error
for cfs.buf.Len() < len(p) && errR == nil && errW == nil {
// Read is allowed to use the entirety of p as a scratchpad.
n, errR = cfs.fs.Read(p)
// errW must be non-nil if the number of bytes written differs from n.
_, errW = cfs.encd.Write(p[:n])
}
m, errR2 := cfs.buf.Read(p)

var retErr error
if errR != nil {
retErr = errR
} else if errW != nil {
retErr = errW
} else {
retErr = errR2
}

if retErr != nil {
cfs.encd.Close()
}

return m, retErr
}

func (cfs *compressedSeeker) SeekOffset(offset int64) error {
cfs.buf.Reset()
if err := cfs.encd.Close(); err != nil {
return err
}

cfs.encd.Reset(cfs.buf)
if err := cfs.fs.SeekOffset(offset); err != nil {
return err
}

return nil
}

func (cfs *compressedSeeker) IsInitialized() bool { return cfs.fs.IsInitialized() }
func (cfs *compressedSeeker) Initialize() error { return cfs.fs.Initialize() }

func (cfs *compressedSeeker) Close() error { return cfs.fs.Close() }
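Putting the new pieces together: compressedSeeker wraps any ReadSeeker, feeds whatever it reads from the underlying source through a zstd.Encoder, and serves the compressed output from an internal buffer; SeekOffset rewinds the wrapped reader and resets both the buffer and the encoder so the stream can be replayed. Below is a rough usage sketch of the API added here; the file path and buffer sizes are placeholder values, not anything from this commit.

package main

import (
	"fmt"
	"io"
	"log"

	"github.com/bazelbuild/remote-apis-sdks/go/pkg/reader"
)

func main() {
	// Wrap a file in a compressing reader; 1 MiB is an arbitrary I/O buffer size.
	r, err := reader.NewCompressedFileSeeker("/tmp/blob.bin", 1024*1024)
	if err != nil {
		log.Fatalf("creating compressed seeker: %v", err)
	}
	if err := r.Initialize(); err != nil {
		log.Fatalf("initializing reader: %v", err)
	}
	defer r.Close()

	// Drain the compressed stream chunk by chunk, as an uploader would.
	buf := make([]byte, 64*1024)
	total := 0
	for {
		n, readErr := r.Read(buf)
		total += n
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			log.Fatalf("reading compressed data: %v", readErr)
		}
	}
	fmt.Printf("produced %d compressed bytes\n", total)

	// SeekOffset(0) rewinds the underlying file and resets the encoder,
	// so the compressed stream can be regenerated from the start.
	if err := r.SeekOffset(0); err != nil {
		log.Fatalf("rewinding: %v", err)
	}
}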
72 changes: 72 additions & 0 deletions go/pkg/reader/reader_test.go
@@ -1,11 +1,13 @@
package reader

import (
"bytes"
"io"
"testing"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/testutil"
"github.com/google/go-cmp/cmp"
"github.com/klauspost/compress/zstd"
)

func TestFileReaderSeeks(t *testing.T) {
@@ -112,3 +114,73 @@ func TestFileReaderSeeksPastOffset(t *testing.T) {
t.Errorf("Expected err, got nil")
}
}

func TestCompressedReader(t *testing.T) {
t.Parallel()
tests := []struct {
name string
blob string
}{
{
name: "basic",
blob: "12345",
},
{
name: "looong",
blob: "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
},
{
name: "empty blob",
blob: "",
},
}

for _, tc := range tests {
name := tc.name
blob := tc.blob
t.Run(name, func(t *testing.T) {
t.Parallel()

path, err := testutil.CreateFile(t, false, blob)
if err != nil {
t.Fatalf("Failed to initialize temp file: %v", err)
}

buf := bytes.NewBuffer(nil)
encd, err := zstd.NewWriter(buf)
if err != nil {
t.Fatalf("Failed to initialize compressor: %v", err)
}
if _, err = encd.Write([]byte(blob)); err != nil {
t.Fatalf("Failed to compress data: %v", err)
}
if err = encd.Close(); err != nil {
t.Fatalf("Failed to finish compressing data: %v", err)
}
compressedData := buf.Bytes()

r, err := NewCompressedFileSeeker(path, 10)
if err != nil {
t.Fatalf("Failed to initialize compressor reader: %v", err)
}
if err := r.Initialize(); err != nil {
t.Fatalf("Failed to initialize reader: %v", err)
}

// It is theoretically possible for the compressed data to be
// larger than the original
data := make([]byte, len(blob)+100)
var n, m int
for err = nil; err == nil; m, err = r.Read(data[n:]) {
n += m
}
if err != io.EOF {
t.Errorf("Expected err, got nil")
}

if diff := cmp.Diff(compressedData[:n], data[:n]); diff != "" {
t.Errorf("Read() = incorrect result, diff(-want, +got): %v", diff)
}
})
}
}
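The test above verifies the reader's output by comparing it against independently compressed bytes rather than by decompressing it. For completeness, the decode side of the klauspost zstd package looks roughly like this; a standalone roundtrip sketch that is independent of this change.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/klauspost/compress/zstd"
)

func main() {
	blob := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit.")

	// Stateless compression: EncodeAll appends the compressed frame to dst.
	enc, err := zstd.NewWriter(nil)
	if err != nil {
		log.Fatalf("creating encoder: %v", err)
	}
	compressed := enc.EncodeAll(blob, nil)

	// Stateless decompression: a nil reader is allowed when only DecodeAll is used.
	dec, err := zstd.NewReader(nil)
	if err != nil {
		log.Fatalf("creating decoder: %v", err)
	}
	decompressed, err := dec.DecodeAll(compressed, nil)
	if err != nil {
		log.Fatalf("decompressing: %v", err)
	}

	fmt.Println("roundtrip ok:", bytes.Equal(blob, decompressed))
}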
6 changes: 6 additions & 0 deletions remote-apis-sdks-deps.bzl
@@ -100,3 +100,9 @@ def remote_apis_sdks_go_deps():
commit = "09ad026a62f0561b7f7e276569eda11a6afc9773",
importpath = "cloud.google.com/go",
)
_maybe(
go_repository,
name = "com_github_klauspost_compress",
importpath = "github.com/klauspost/compress",
tag = "v1.11.2",
)
