diff --git a/go.mod b/go.mod
index 1f4642951..15a712bda 100644
--- a/go.mod
+++ b/go.mod
@@ -40,6 +40,7 @@ require (
 	github.com/googleapis/gax-go v2.0.2+incompatible // indirect
 	github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 // indirect
 	github.com/jung-kurt/gofpdf v1.13.0 // indirect
+	github.com/klauspost/compress v1.11.2
 	github.com/kylelemons/godebug v1.1.0
 	github.com/mattn/go-sqlite3 v1.11.0 // indirect
 	github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d // indirect
@@ -61,5 +62,6 @@ require (
 	golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
 	google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d
 	google.golang.org/grpc v1.31.1
+	google.golang.org/protobuf v1.25.0
 	gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0 // indirect
 )
diff --git a/go.sum b/go.sum
index 31bef0f0e..3b41b5f39 100644
--- a/go.sum
+++ b/go.sum
@@ -233,6 +233,8 @@ github.com/jung-kurt/gofpdf v1.13.0 h1:OrLyhb9VU2dNdxzDu5lpMhX5/vpfm6RY5Jlr4iPQ6
 github.com/jung-kurt/gofpdf v1.13.0/go.mod h1:1hl7y57EsiPAkLbOwzpzqgx1A30nQCk/YmFV8S2vmK0=
 github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.11.2 h1:MiK62aErc3gIiVEtyzKfeOHgW7atJb5g/KNX5m3c2nQ=
+github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/pty v1.1.3 h1:/Um6a/ZmD5tF7peoOJ5oN5KMQ0DrGVQSXLNwyckutPk=
diff --git a/go/pkg/chunker/BUILD.bazel b/go/pkg/chunker/BUILD.bazel
index 967a8c32f..fe8cd8a80 100644
--- a/go/pkg/chunker/BUILD.bazel
+++ b/go/pkg/chunker/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
         "//go/pkg/digest:go_default_library",
         "//go/pkg/reader:go_default_library",
         "@com_github_golang_protobuf//proto:go_default_library",
+        "@com_github_klauspost_compress//zstd:go_default_library",
     ],
 )
diff --git a/go/pkg/chunker/chunker.go b/go/pkg/chunker/chunker.go
index a60e1e203..2f1fd075e 100644
--- a/go/pkg/chunker/chunker.go
+++ b/go/pkg/chunker/chunker.go
@@ -10,6 +10,7 @@ import (
 	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
 	"github.com/bazelbuild/remote-apis-sdks/go/pkg/reader"
 	"github.com/golang/protobuf/proto"
+	"github.com/klauspost/compress/zstd"
 )
 
 // DefaultChunkSize is the default chunk size for ByteStream.Write RPCs.
@@ -21,6 +22,9 @@ var IOBufferSize = 10 * 1024 * 1024
 // ErrEOF is returned when Next is called when HasNext is false.
 var ErrEOF = errors.New("ErrEOF")
 
+// fullCompressor is a zstd encoder used to compress full, in-memory blobs.
+var fullCompressor *zstd.Encoder
+
 // Chunker can be used to chunk an input into uploadable-size byte slices.
 // A single Chunker is NOT thread-safe; it should be used by a single uploader thread.
 type Chunker struct {
@@ -38,6 +42,7 @@ type Chunker struct {
 	offset     int64
 	reachedEOF bool
 	path       string
+	compressed bool
 }
 
 // NewFromBlob initializes a Chunker from the provided bytes buffer.
@@ -60,6 +65,27 @@ func NewFromBlob(blob []byte, chunkSize int) *Chunker {
 // any time. This means that late errors due to mismatch of digest and
 // contents are possible.
 func NewFromFile(path string, dg digest.Digest, chunkSize int) *Chunker {
+	c := NewFromReader(reader.NewFileReadSeeker(path, IOBufferSize), dg, chunkSize)
+	c.path = path
+	return c
+}
+
+// NewCompressedFromFile is similar to NewFromFile, except that it compresses
+// the contents of the file with zstd.
+func NewCompressedFromFile(path string, dg digest.Digest, chunkSize int) (*Chunker, error) {
+	r, err := reader.NewCompressedFileSeeker(path, IOBufferSize)
+	if err != nil {
+		return nil, err
+	}
+	c := NewFromReader(r, dg, chunkSize)
+	c.path = path
+	return c, nil
+}
+
+// NewFromReader is similar to NewFromFile, except that it takes any
+// arbitrary reader.ReadSeeker. Note that the digest is kept for convenience
+// rather than actually being used by the Chunker.
+func NewFromReader(r reader.ReadSeeker, dg digest.Digest, chunkSize int) *Chunker {
 	if chunkSize < 1 {
 		chunkSize = DefaultChunkSize
 	}
@@ -67,13 +93,46 @@ func NewFromFile(path string, dg digest.Digest, chunkSize int) *Chunker {
 		chunkSize = IOBufferSize
 	}
 	return &Chunker{
-		r:         reader.NewFileReadSeeker(path, IOBufferSize),
+		r:         r,
 		chunkSize: chunkSize,
 		digest:    dg,
-		path:      path,
 	}
 }
 
+// CompressChunker modifies the given Chunker, if possible, so that it returns
+// compressed reads instead. This is irreversible.
+func CompressChunker(ch *Chunker) error {
+	var err error
+	if fullCompressor == nil {
+		fullCompressor, err = zstd.NewWriter(nil)
+		if err != nil {
+			return err
+		}
+	}
+
+	if ch.compressed {
+		return nil
+	}
+
+	if ch.contents != nil {
+		ch.contents = fullCompressor.EncodeAll(ch.contents, nil)
+		ch.compressed = true
+		return nil
+	}
+
+	if ch.r != nil {
+		cmpR, err := reader.NewCompressedSeeker(ch.r)
+		if err != nil {
+			return err
+		}
+		ch.r = cmpR
+		ch.compressed = true
+		return nil
+	}
+
+	return errors.New("provided Chunker is invalid")
+}
+
 // NewFromProto initializes a Chunker from the marshalled proto message.
 func NewFromProto(msg proto.Message, chunkSize int) (*Chunker, error) {
 	blob, err := proto.Marshal(msg)
@@ -112,6 +171,7 @@ func (c *Chunker) ChunkSize() int {
 // TODO(olaola): implement Seek(offset) when we have resumable uploads.
 func (c *Chunker) Reset() {
 	if c.r != nil {
+		// Ignore the error here so as not to change the function signature.
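+		// For file-based readers the seek is applied lazily, so any failure
+		// will surface at the next Initialize or Read anyway.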
 		c.r.SeekOffset(0)
 	}
 	c.offset = 0
diff --git a/go/pkg/reader/BUILD.bazel b/go/pkg/reader/BUILD.bazel
index 6fe0173fe..dcc259d60 100644
--- a/go/pkg/reader/BUILD.bazel
+++ b/go/pkg/reader/BUILD.bazel
@@ -5,11 +5,15 @@ go_library(
     srcs = ["reader.go"],
     importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/reader",
     visibility = ["//visibility:public"],
+    deps = ["@com_github_klauspost_compress//zstd:go_default_library"],
 )
 
 go_test(
     name = "go_default_test",
     srcs = ["reader_test.go"],
     embed = [":go_default_library"],
-    deps = ["@com_github_google_go_cmp//cmp:go_default_library"],
+    deps = [
+        "@com_github_google_go_cmp//cmp:go_default_library",
+        "@com_github_klauspost_compress//zstd:go_default_library",
+    ],
 )
diff --git a/go/pkg/reader/reader.go b/go/pkg/reader/reader.go
index bbdcbcab4..a1554f116 100644
--- a/go/pkg/reader/reader.go
+++ b/go/pkg/reader/reader.go
@@ -2,10 +2,13 @@ package reader
 
 import (
 	"bufio"
+	"bytes"
 	"errors"
 	"fmt"
 	"io"
 	"os"
+
+	"github.com/klauspost/compress/zstd"
 )
 
 type Initializable interface {
@@ -16,7 +19,7 @@ type Initializable interface {
 type ReadSeeker interface {
 	io.Reader
 	Initializable
-	SeekOffset(offset int64)
+	SeekOffset(offset int64) error
 }
 
 type fileSeeker struct {
@@ -53,10 +56,11 @@ func (fio *fileSeeker) Read(p []byte) (int, error) {
 
 // Seek is a simplified version of io.Seeker. It only supports offsets from the
 // beginning of the file, and it errors lazily at the next Initialize.
-func (fio *fileSeeker) SeekOffset(offset int64) {
+func (fio *fileSeeker) SeekOffset(offset int64) error {
 	fio.seekOffset = offset
 	fio.initialized = false
 	fio.reader = nil
+	return nil
 }
 
 // IsInitialized indicates whether this reader is ready. If false, Read calls
@@ -95,3 +99,69 @@ func (fio *fileSeeker) Initialize() error {
 	fio.initialized = true
 	return nil
 }
+
+type compressedSeeker struct {
+	fs   ReadSeeker
+	encd *zstd.Encoder
+	// buf holds compressed data that has not yet been returned to the caller.
+	buf *bytes.Buffer
+}
+
+// NewCompressedFileSeeker creates a ReadSeeker that yields the zstd-compressed
+// contents of the file at path.
+func NewCompressedFileSeeker(path string, buffsize int) (ReadSeeker, error) {
+	return NewCompressedSeeker(NewFileReadSeeker(path, buffsize))
+}
+
+// NewCompressedSeeker wraps an existing ReadSeeker so that it yields
+// zstd-compressed data.
+func NewCompressedSeeker(fs ReadSeeker) (ReadSeeker, error) {
+	if _, ok := fs.(*compressedSeeker); ok {
+		return nil, errors.New("cannot compress an already compressed reader")
+	}
+
+	buf := bytes.NewBuffer(nil)
+	encd, err := zstd.NewWriter(buf)
+	return &compressedSeeker{
+		fs:   fs,
+		encd: encd,
+		buf:  buf,
+	}, err
+}
+
+func (cfs *compressedSeeker) Read(p []byte) (int, error) {
+	var n int
+	var errR, errW error
+	for cfs.buf.Len() < len(p) && errR == nil && errW == nil {
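+		// Each iteration pulls up to len(p) uncompressed bytes from the
+		// underlying reader and feeds them to the encoder, which appends
+		// compressed output to cfs.buf, until this Read can be satisfied.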
+		// Read is allowed to use the entirety of p as a scratchpad.
+		n, errR = cfs.fs.Read(p)
+		// Per the io.Writer contract, errW is non-nil if fewer than n bytes
+		// were written.
+		_, errW = cfs.encd.Write(p[:n])
+	}
+	m, err := cfs.buf.Read(p)
+
+	var retErr error
+	if errR == io.EOF {
+		// The underlying reader is exhausted; closing the encoder flushes
+		// the final zstd frame into buf for subsequent Reads.
+		cfs.encd.Close()
+	} else if errR != nil {
+		retErr = errR
+	}
+	if retErr == nil && errW != nil {
+		retErr = errW
+	}
+	if retErr == nil {
+		retErr = err
+	}
+	return m, retErr
+}
+
+func (cfs *compressedSeeker) SeekOffset(offset int64) error {
+	// Close the encoder before resetting the buffer, so that the flushed
+	// final frame is discarded along with the rest of the stale stream.
+	err := cfs.encd.Close()
+	cfs.buf.Reset()
+	cfs.encd.Reset(cfs.buf)
+	err2 := cfs.fs.SeekOffset(offset)
+
+	if err != nil {
+		return err
+	}
+	return err2
+}
+
+func (cfs *compressedSeeker) IsInitialized() bool { return cfs.fs.IsInitialized() }
+func (cfs *compressedSeeker) Initialize() error   { return cfs.fs.Initialize() }
diff --git a/go/pkg/reader/reader_test.go b/go/pkg/reader/reader_test.go
index 401118f28..d0dc8f87e 100644
--- a/go/pkg/reader/reader_test.go
+++ b/go/pkg/reader/reader_test.go
@@ -1,18 +1,21 @@
 package reader
 
 import (
+	"bytes"
 	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strings"
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
+	"github.com/klauspost/compress/zstd"
 )
 
 func setTmpFile(t *testing.T, content []byte) (string, func()) {
 	t.Helper()
-	execRoot, err := ioutil.TempDir("", "")
+	execRoot, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1))
 	if err != nil {
 		t.Fatalf("failed to make temp dir: %v", err)
 	}
@@ -30,6 +33,7 @@ func setTmpFile(t *testing.T, content []byte) (string, func()) {
 }
 
 func TestFileReaderSeeks(t *testing.T) {
+	t.Parallel()
 	tests := []struct {
 		name       string
 		IOBuffSize int
@@ -58,6 +62,7 @@ func TestFileReaderSeeks(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
 			path, clean := setTmpFile(t, tc.blob)
 			defer clean()
 
@@ -110,6 +115,7 @@ func TestFileReaderSeeks(t *testing.T) {
 }
 
 func TestFileReaderSeeksPastOffset(t *testing.T) {
+	t.Parallel()
 	path, clean := setTmpFile(t, []byte("12345"))
 	defer clean()
 
@@ -125,3 +131,71 @@ func TestFileReaderSeeksPastOffset(t *testing.T) {
 		t.Errorf("Expected err, got nil")
 	}
 }
+
+func TestCompressedReader(t *testing.T) {
+	t.Parallel()
+	tests := []struct {
+		name string
+		blob []byte
+	}{
+		{
+			name: "basic",
+			blob: []byte("12345"),
+		},
+		{
+			name: "looong",
+			blob: []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."),
+		},
+		{
+			name: "empty blob",
+			blob: []byte(""),
+		},
+	}
+
+	for _, tc := range tests {
+		name := tc.name
+		blob := tc.blob
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+
+			path, clean := setTmpFile(t, blob)
+			defer clean()
+
+			buf := bytes.NewBuffer(nil)
+			encd, err := zstd.NewWriter(buf)
+			if err != nil {
+				t.Fatalf("Failed to initialize compressor: %v", err)
+			}
+			if _, err = encd.Write(blob); err != nil {
+				t.Fatalf("Failed to compress data: %v", err)
+			}
+			if err = encd.Close(); err != nil {
+				t.Fatalf("Failed to finish compressing data: %v", err)
+			}
+			compressedData := buf.Bytes()
+
+			r, err := NewCompressedFileSeeker(path, 10)
+			if err != nil {
+				t.Fatalf("Failed to initialize compressed reader: %v", err)
+			}
+			if err := r.Initialize(); err != nil {
+				t.Fatalf("Failed to initialize reader: %v", err)
+			}
+
+			// It is theoretically possible for the compressed data to be
+			// larger than the original.
+			data := make([]byte, len(blob)+100)
+			var n, m int
+			for err = nil; err == nil; m, err = r.Read(data[n:]) {
+				n += m
+			}
+			if err != io.EOF {
+				t.Errorf("Expected io.EOF, got %v", err)
+			}
+
+			if diff := cmp.Diff(compressedData, data[:n]); diff != "" {
+				t.Errorf("Read() = incorrect result, diff(-want, +got): %v",
+					diff)
+			}
+		})
+	}
+}
diff --git a/remote-apis-sdks-deps.bzl b/remote-apis-sdks-deps.bzl
index 55bd8ed65..b48c7b57b 100644
--- a/remote-apis-sdks-deps.bzl
+++ b/remote-apis-sdks-deps.bzl
@@ -100,3 +100,9 @@ def remote_apis_sdks_go_deps():
         commit = "09ad026a62f0561b7f7e276569eda11a6afc9773",
         importpath = "cloud.google.com/go",
     )
+    _maybe(
+        go_repository,
+        name = "com_github_klauspost_compress",
+        importpath = "github.com/klauspost/compress",
+        tag = "v1.11.2",
+    )
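
Reviewer note: a minimal sketch of how the new API fits together end to end. The file path is hypothetical, and digest.NewFromFile plus the Chunker's existing HasNext/Next/Chunk API are assumed from the current SDK rather than introduced by this change.

package main

import (
	"fmt"
	"log"

	"github.com/bazelbuild/remote-apis-sdks/go/pkg/chunker"
	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
)

func main() {
	const path = "/tmp/blob.bin" // hypothetical input file

	// The digest is computed over the uncompressed contents, as before.
	dg, err := digest.NewFromFile(path)
	if err != nil {
		log.Fatalf("digest: %v", err)
	}

	// Stream the file as zstd-compressed chunks.
	c, err := chunker.NewCompressedFromFile(path, dg, chunker.DefaultChunkSize)
	if err != nil {
		log.Fatalf("chunker: %v", err)
	}
	for c.HasNext() {
		chunk, err := c.Next()
		if err != nil {
			log.Fatalf("next: %v", err)
		}
		fmt.Printf("offset %d: %d compressed bytes\n", chunk.Offset, len(chunk.Data))
	}

	// Alternatively, compress an existing in-memory Chunker in place.
	bc := chunker.NewFromBlob([]byte("hello world"), chunker.DefaultChunkSize)
	if err := chunker.CompressChunker(bc); err != nil {
		log.Fatalf("compress: %v", err)
	}
}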