From c65bdddf6cab993859d2d4ddc36fa958324760ad Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Tue, 3 Sep 2019 14:47:59 -0700 Subject: [PATCH 1/3] Remove cgo zstd package Switch from cgo package to pure Go implementation. This will (when fixed) work even if no cgo is present. https://github.com/klauspost/compress/tree/master/zstd#zstd Compression level is removed. It could be made so the first request determines the compression level. But since there is currently only two levels (fast and default) I am not sure it is worth it. This does NOT fix #1252 - only updates the code used for compression/decompression. --- compress.go | 2 +- go.mod | 2 +- zstd.go | 28 ++++++++++++++++++++++++++++ zstd_cgo.go | 13 ------------- zstd_fallback.go | 17 ----------------- 5 files changed, 30 insertions(+), 32 deletions(-) create mode 100644 zstd.go delete mode 100644 zstd_cgo.go delete mode 100644 zstd_fallback.go diff --git a/compress.go b/compress.go index 94b716e4b..9247c3553 100644 --- a/compress.go +++ b/compress.go @@ -68,7 +68,7 @@ func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) { } return buf.Bytes(), nil case CompressionZSTD: - return zstdCompressLevel(nil, data, level) + return zstdCompress(nil, data) default: return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)} } diff --git a/go.mod b/go.mod index d072c8e78..8ceacd9d6 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,6 @@ module github.com/Shopify/sarama require ( - github.com/DataDog/zstd v1.4.0 github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/davecgh/go-spew v1.1.1 github.com/eapache/go-resiliency v1.1.0 @@ -12,6 +11,7 @@ require ( github.com/golang/snappy v0.0.1 // indirect github.com/hashicorp/go-uuid v1.0.1 // indirect github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 // indirect + github.com/klauspost/compress v1.8.1 github.com/pierrec/lz4 v2.2.6+incompatible github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/stretchr/testify v1.3.0 diff --git a/zstd.go b/zstd.go new file mode 100644 index 000000000..bdb21b33c --- /dev/null +++ b/zstd.go @@ -0,0 +1,28 @@ +package sarama + +import ( + "github.com/klauspost/compress/zstd" + "sync" +) + +var ( + zstdDec *zstd.Decoder + zstdEnc *zstd.Encoder + + zstdEncOnce, zstdDecOnce sync.Once +) + + +func zstdDecompress(dst, src []byte) ([]byte, error) { + zstdDecOnce.Do(func() { + zstdDec, _ = zstd.NewReader(nil) + }) + return zstdDec.DecodeAll(src, dst) +} + +func zstdCompress(dst, src []byte) ([]byte, error) { + zstdEncOnce.Do(func() { + zstdEnc, _ = zstd.NewWriter(nil) + }) + return zstdEnc.EncodeAll(src, dst), nil +} diff --git a/zstd_cgo.go b/zstd_cgo.go deleted file mode 100644 index f5ccb31a1..000000000 --- a/zstd_cgo.go +++ /dev/null @@ -1,13 +0,0 @@ -// +build cgo - -package sarama - -import "github.com/DataDog/zstd" - -func zstdDecompress(dst, src []byte) ([]byte, error) { - return zstd.Decompress(dst, src) -} - -func zstdCompressLevel(dst, src []byte, level int) ([]byte, error) { - return zstd.CompressLevel(dst, src, level) -} diff --git a/zstd_fallback.go b/zstd_fallback.go deleted file mode 100644 index 381a56bdc..000000000 --- a/zstd_fallback.go +++ /dev/null @@ -1,17 +0,0 @@ -// +build !cgo - -package sarama - -import ( - "errors" -) - -var errZstdCgo = errors.New("zstd compression requires building with cgo enabled") - -func zstdDecompress(dst, src []byte) ([]byte, error) { - return nil, errZstdCgo -} - -func zstdCompressLevel(dst, src []byte, level int) ([]byte, error) { - return nil, errZstdCgo -} From 99ec304387bd35f6c3699775462ce66ec2a46199 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Thu, 5 Sep 2019 11:03:57 -0700 Subject: [PATCH 2/3] Use zero frames for encoding. --- go.mod | 2 +- go.sum | 4 ++++ zstd.go | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 8ceacd9d6..5192cd110 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/golang/snappy v0.0.1 // indirect github.com/hashicorp/go-uuid v1.0.1 // indirect github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 // indirect - github.com/klauspost/compress v1.8.1 + github.com/klauspost/compress v1.8.2 github.com/pierrec/lz4 v2.2.6+incompatible github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/stretchr/testify v1.3.0 diff --git a/go.sum b/go.sum index d3417e5b4..d2f04eedc 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,10 @@ github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1 github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/klauspost/compress v1.8.1 h1:oygt2ychZFHOB6M9gUgajzgKrwRgHbGC77NwA4COVgI= +github.com/klauspost/compress v1.8.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs= +github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/zstd.go b/zstd.go index bdb21b33c..6be3b34bc 100644 --- a/zstd.go +++ b/zstd.go @@ -22,7 +22,7 @@ func zstdDecompress(dst, src []byte) ([]byte, error) { func zstdCompress(dst, src []byte) ([]byte, error) { zstdEncOnce.Do(func() { - zstdEnc, _ = zstd.NewWriter(nil) + zstdEnc, _ = zstd.NewWriter(nil, zstd.WithZeroFrames(true)) }) return zstdEnc.EncodeAll(src, dst), nil } From 2fbc232314df58690b8bee3c306ff3f25900400f Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 6 Sep 2019 11:32:08 -0700 Subject: [PATCH 3/3] Remove double newline --- zstd.go | 1 - 1 file changed, 1 deletion(-) diff --git a/zstd.go b/zstd.go index 6be3b34bc..58880e2b4 100644 --- a/zstd.go +++ b/zstd.go @@ -12,7 +12,6 @@ var ( zstdEncOnce, zstdDecOnce sync.Once ) - func zstdDecompress(dst, src []byte) ([]byte, error) { zstdDecOnce.Do(func() { zstdDec, _ = zstd.NewReader(nil)