From 0b4e2a9d94f7442171aaf484c309450c1122efc2 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 15 Nov 2020 21:20:08 -0600 Subject: [PATCH] add uploader compress writer (#566) * add uploader compress writer * fix hounder * fix hounder again * ut * update Makefile * address comments * delete unused code * fix go fmt * address comments, add zlib compresser * fix hound * fix dot * address comment * Revert "address comment" This reverts commit f0317d77e85380066bcf408acd64152dd30962a1. * address comment * fix fmt Co-authored-by: 3pointer --- Makefile | 6 +- pkg/storage/writer.go | 109 +++++++++++++++++++++++++++++++++++-- pkg/storage/writer_test.go | 87 ++++++++++++++++++++++++++++- 3 files changed, 194 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index 812da028a..eefe627c4 100644 --- a/Makefile +++ b/Makefile @@ -87,7 +87,7 @@ check: @make tools errdoc static lint static: export GO111MODULE=on -static: tools +static: prepare tools @ # Not running vet and fmt through metalinter becauase it ends up looking at vendor tools/bin/goimports -w -d -format-only -local $(BR_PKG) $$($(PACKAGE_DIRECTORIES)) 2>&1 | $(CHECKER) tools/bin/govet --shadow $$($(PACKAGE_DIRECTORIES)) 2>&1 | $(CHECKER) @@ -125,10 +125,12 @@ static: tools @# TODO: allow more APIs when we need to support "workaound". grep -Rn --exclude="*_test.go" -E "(\t| )errors\.[A-Z]" cmd pkg | \ grep -vE "Normalize|Annotate|Trace|Cause" 2>&1 | $(CHECKER) + $(FINISH_MOD) -lint: tools +lint: prepare tools @echo "linting" CGO_ENABLED=0 tools/bin/revive -formatter friendly -config revive.toml $$($(PACKAGES)) + $(FINISH_MOD) tidy: @echo "go mod tidy" diff --git a/pkg/storage/writer.go b/pkg/storage/writer.go index 7787448f9..e10530ff5 100644 --- a/pkg/storage/writer.go +++ b/pkg/storage/writer.go @@ -2,11 +2,108 @@ package storage import ( "bytes" + "compress/gzip" "context" + "io" ) +// CompressType represents the type of compression. +type CompressType uint8 + +const ( + // NoCompression won't compress given bytes. + NoCompression CompressType = iota + // Gzip will compress given bytes in gzip format. + Gzip +) + +type interceptBuffer interface { + io.WriteCloser + Len() int + Cap() int + Bytes() []byte + Flush() error + Reset() +} + +func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer { + switch compressType { + case NoCompression: + return newNoCompressionBuffer(chunkSize) + case Gzip: + return newGzipBuffer(chunkSize) + default: + return nil + } +} + +type noCompressionBuffer struct { + *bytes.Buffer +} + +func (b *noCompressionBuffer) Flush() error { + return nil +} + +func (b *noCompressionBuffer) Close() error { + return nil +} + +func newNoCompressionBuffer(chunkSize int) *noCompressionBuffer { + return &noCompressionBuffer{bytes.NewBuffer(make([]byte, 0, chunkSize))} +} + +type simpleCompressWriter interface { + io.WriteCloser + Flush() error +} + +type simpleCompressBuffer struct { + *bytes.Buffer + compressWriter simpleCompressWriter + len int + cap int +} + +func (b *simpleCompressBuffer) Write(p []byte) (int, error) { + written, err := b.compressWriter.Write(p) + b.len += written + return written, err +} + +func (b *simpleCompressBuffer) Len() int { + return b.len +} + +func (b *simpleCompressBuffer) Cap() int { + return b.cap +} + +func (b *simpleCompressBuffer) Reset() { + b.len = 0 + b.Buffer.Reset() +} + +func (b *simpleCompressBuffer) Flush() error { + return b.compressWriter.Flush() +} + +func (b *simpleCompressBuffer) Close() error { + return b.compressWriter.Close() +} + +func newGzipBuffer(chunkSize int) *simpleCompressBuffer { + bf := bytes.NewBuffer(make([]byte, 0, chunkSize)) + return &simpleCompressBuffer{ + Buffer: bf, + len: 0, + cap: chunkSize, + compressWriter: gzip.NewWriter(bf), + } +} + type uploaderWriter struct { - buf *bytes.Buffer + buf interceptBuffer uploader Uploader } @@ -27,6 +124,7 @@ func (u *uploaderWriter) Write(ctx context.Context, p []byte) (int, error) { } p = p[w:] } + u.buf.Flush() err := u.uploadChunk(ctx) if err != nil { return 0, err @@ -47,6 +145,7 @@ func (u *uploaderWriter) uploadChunk(ctx context.Context) error { } func (u *uploaderWriter) Close(ctx context.Context) error { + u.buf.Close() err := u.uploadChunk(ctx) if err != nil { return err @@ -55,15 +154,15 @@ func (u *uploaderWriter) Close(ctx context.Context) error { } // NewUploaderWriter wraps the Writer interface over an uploader. -func NewUploaderWriter(uploader Uploader, chunkSize int) Writer { - return newUploaderWriter(uploader, chunkSize) +func NewUploaderWriter(uploader Uploader, chunkSize int, compressType CompressType) Writer { + return newUploaderWriter(uploader, chunkSize, compressType) } // newUploaderWriter is used for testing only. -func newUploaderWriter(uploader Uploader, chunkSize int) *uploaderWriter { +func newUploaderWriter(uploader Uploader, chunkSize int, compressType CompressType) *uploaderWriter { return &uploaderWriter{ uploader: uploader, - buf: bytes.NewBuffer(make([]byte, 0, chunkSize))} + buf: newInterceptBuffer(chunkSize, compressType)} } // BufferWriter is a Writer implementation on top of bytes.Buffer that is useful for testing. diff --git a/pkg/storage/writer_test.go b/pkg/storage/writer_test.go index 0c5979db0..41dfdfa6d 100644 --- a/pkg/storage/writer_test.go +++ b/pkg/storage/writer_test.go @@ -3,8 +3,12 @@ package storage import ( + "bytes" + "compress/gzip" "context" + "io" "io/ioutil" + "os" "path/filepath" "strings" @@ -29,7 +33,7 @@ func (r *testStorageSuite) TestUploaderWriter(c *C) { fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt" uploader, err := storage.CreateUploader(ctx, fileName) c.Assert(err, IsNil) - writer := newUploaderWriter(uploader, test.chunkSize) + writer := newUploaderWriter(uploader, test.chunkSize, NoCompression) for _, str := range test.content { p := []byte(str) written, err2 := writer.Write(ctx, p) @@ -91,3 +95,84 @@ func (r *testStorageSuite) TestUploaderWriter(c *C) { testFn(&tests[i], c) } } + +func (r *testStorageSuite) TestUploaderCompressWriter(c *C) { + dir := c.MkDir() + + type testcase struct { + name string + content []string + chunkSize int + compressType CompressType + } + testFn := func(test *testcase, c *C) { + c.Log(test.name) + backend, err := ParseBackend("local:///"+dir, nil) + c.Assert(err, IsNil) + ctx := context.Background() + storage, err := Create(ctx, backend, true) + c.Assert(err, IsNil) + fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt.gz" + uploader, err := storage.CreateUploader(ctx, fileName) + c.Assert(err, IsNil) + writer := newUploaderWriter(uploader, test.chunkSize, test.compressType) + for _, str := range test.content { + p := []byte(str) + written, err2 := writer.Write(ctx, p) + c.Assert(err2, IsNil) + c.Assert(written, Equals, len(p)) + } + err = writer.Close(ctx) + c.Assert(err, IsNil) + file, err := os.Open(filepath.Join(dir, fileName)) + c.Assert(err, IsNil) + var r io.Reader + switch test.compressType { + case Gzip: + r, err = gzip.NewReader(file) + default: + c.Fatal("unknown compressType") + } + c.Assert(err, IsNil) + var bf bytes.Buffer + _, err = bf.ReadFrom(r) + c.Assert(err, IsNil) + c.Assert(bf.String(), Equals, strings.Join(test.content, "")) + // Sanity check we didn't write past the chunk size + c.Assert(writer.buf.Cap(), Equals, test.chunkSize) + c.Assert(file.Close(), IsNil) + } + compressTypeArr := []CompressType{Gzip} + tests := []testcase{ + { + name: "long text medium chunks", + content: []string{ + "hello world", + "hello world", + "hello world", + "hello world", + "hello world", + "hello world", + }, + chunkSize: 30, + }, + { + name: "long text large chunks", + content: []string{ + "hello world", + "hello world", + "hello world", + "hello world", + "hello world", + "hello world", + }, + chunkSize: 500, + }, + } + for i := range tests { + for _, compressType := range compressTypeArr { + tests[i].compressType = compressType + testFn(&tests[i], c) + } + } +}