diff --git a/message.go b/message.go index 2e4995a0e..44d5cc91b 100644 --- a/message.go +++ b/message.go @@ -7,7 +7,6 @@ import ( "io/ioutil" "time" - "github.com/DataDog/zstd" "github.com/eapache/go-xerial-snappy" "github.com/pierrec/lz4" ) @@ -116,7 +115,7 @@ func (m *Message) encode(pe packetEncoder) error { m.compressedCache = buf.Bytes() payload = m.compressedCache case CompressionZSTD: - c, err := zstd.CompressLevel(nil, m.Value, m.CompressionLevel) + c, err := zstdCompressLevel(nil, m.Value, m.CompressionLevel) if err != nil { return err } @@ -219,7 +218,7 @@ func (m *Message) decode(pd packetDecoder) (err error) { if m.Value == nil { break } - if m.Value, err = zstd.Decompress(nil, m.Value); err != nil { + if m.Value, err = zstdDecompress(nil, m.Value); err != nil { return err } if err := m.decodeSet(); err != nil { diff --git a/record_batch.go b/record_batch.go index b145b2dce..5444557f1 100644 --- a/record_batch.go +++ b/record_batch.go @@ -7,7 +7,6 @@ import ( "io/ioutil" "time" - "github.com/DataDog/zstd" "github.com/eapache/go-xerial-snappy" "github.com/pierrec/lz4" ) @@ -195,7 +194,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) { return err } case CompressionZSTD: - if recBuffer, err = zstd.Decompress(nil, recBuffer); err != nil { + if recBuffer, err = zstdDecompress(nil, recBuffer); err != nil { return err } default: @@ -254,7 +253,7 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error { } b.compressedRecords = buf.Bytes() case CompressionZSTD: - c, err := zstd.CompressLevel(nil, raw, b.CompressionLevel) + c, err := zstdCompressLevel(nil, raw, b.CompressionLevel) if err != nil { return err } diff --git a/zstd_cgo.go b/zstd_cgo.go new file mode 100644 index 000000000..f5ccb31a1 --- /dev/null +++ b/zstd_cgo.go @@ -0,0 +1,13 @@ +// +build cgo + +package sarama + +import "github.com/DataDog/zstd" + +func zstdDecompress(dst, src []byte) ([]byte, error) { + return zstd.Decompress(dst, src) +} + +func zstdCompressLevel(dst, src []byte, level int) ([]byte, error) { + return zstd.CompressLevel(dst, src, level) +} diff --git a/zstd_fallback.go b/zstd_fallback.go new file mode 100644 index 000000000..381a56bdc --- /dev/null +++ b/zstd_fallback.go @@ -0,0 +1,17 @@ +// +build !cgo + +package sarama + +import ( + "errors" +) + +var errZstdCgo = errors.New("zstd compression requires building with cgo enabled") + +func zstdDecompress(dst, src []byte) ([]byte, error) { + return nil, errZstdCgo +} + +func zstdCompressLevel(dst, src []byte, level int) ([]byte, error) { + return nil, errZstdCgo +}