diff --git a/compress.go b/compress.go index 94b716e4b..9247c3553 100644 --- a/compress.go +++ b/compress.go @@ -68,7 +68,7 @@ func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) { } return buf.Bytes(), nil case CompressionZSTD: - return zstdCompressLevel(nil, data, level) + return zstdCompress(nil, data) default: return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)} } diff --git a/go.mod b/go.mod index 41c37116b..4337c009a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/Shopify/sarama go 1.13 require ( - github.com/DataDog/zstd v1.4.0 github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/davecgh/go-spew v1.1.1 github.com/eapache/go-resiliency v1.1.0 @@ -14,6 +13,7 @@ require ( github.com/golang/snappy v0.0.1 // indirect github.com/hashicorp/go-uuid v1.0.1 // indirect github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 // indirect + github.com/klauspost/compress v1.8.2 github.com/pierrec/lz4 v2.2.6+incompatible github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/stretchr/testify v1.3.0 diff --git a/go.sum b/go.sum index d3417e5b4..d2f04eedc 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,10 @@ github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1 github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/klauspost/compress v1.8.1 h1:oygt2ychZFHOB6M9gUgajzgKrwRgHbGC77NwA4COVgI= +github.com/klauspost/compress v1.8.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs= +github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/zstd.go b/zstd.go new file mode 100644 index 000000000..58880e2b4 --- /dev/null +++ b/zstd.go @@ -0,0 +1,27 @@ +package sarama + +import ( + "github.com/klauspost/compress/zstd" + "sync" +) + +var ( + zstdDec *zstd.Decoder + zstdEnc *zstd.Encoder + + zstdEncOnce, zstdDecOnce sync.Once +) + +func zstdDecompress(dst, src []byte) ([]byte, error) { + zstdDecOnce.Do(func() { + zstdDec, _ = zstd.NewReader(nil) + }) + return zstdDec.DecodeAll(src, dst) +} + +func zstdCompress(dst, src []byte) ([]byte, error) { + zstdEncOnce.Do(func() { + zstdEnc, _ = zstd.NewWriter(nil, zstd.WithZeroFrames(true)) + }) + return zstdEnc.EncodeAll(src, dst), nil +} diff --git a/zstd_cgo.go b/zstd_cgo.go deleted file mode 100644 index f5ccb31a1..000000000 --- a/zstd_cgo.go +++ /dev/null @@ -1,13 +0,0 @@ -// +build cgo - -package sarama - -import "github.com/DataDog/zstd" - -func zstdDecompress(dst, src []byte) ([]byte, error) { - return zstd.Decompress(dst, src) -} - -func zstdCompressLevel(dst, src []byte, level int) ([]byte, error) { - return zstd.CompressLevel(dst, src, level) -} diff --git a/zstd_fallback.go b/zstd_fallback.go deleted file mode 100644 index 381a56bdc..000000000 --- a/zstd_fallback.go +++ /dev/null @@ -1,17 +0,0 @@ -// +build !cgo - -package sarama - -import ( - "errors" -) - -var errZstdCgo = errors.New("zstd compression requires building with cgo enabled") - -func zstdDecompress(dst, src []byte) ([]byte, error) { - return nil, errZstdCgo -} - -func zstdCompressLevel(dst, src []byte, level int) ([]byte, error) { - return nil, errZstdCgo -}