diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index a74d88c92a..b7bf261ebb 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -877,7 +877,7 @@ func (pc *partitionConsumer) initializeCompressionProvider( case pb.CompressionType_LZ4: return compression.NewLz4Provider(), nil case pb.CompressionType_ZSTD: - return compression.NewZStdProvider(), nil + return compression.NewZStdProvider(compression.Default), nil } return nil, fmt.Errorf("unsupported compression type: %v", compressionType) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 78be00ef57..61c5ced681 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -62,7 +62,7 @@ type BatchBuilder struct { // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container. func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, - compressionType pb.CompressionType) (*BatchBuilder, error) { + compressionType pb.CompressionType, level compression.Level) (*BatchBuilder, error) { if maxMessages == 0 { maxMessages = DefaultMaxMessagesPerBatch } @@ -84,7 +84,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p ProducerName: &producerName, }, callbacks: []interface{}{}, - compressionProvider: getCompressionProvider(compressionType), + compressionProvider: getCompressionProvider(compressionType, level), } if compressionType != pb.CompressionType_NONE { @@ -176,7 +176,8 @@ func (bb *BatchBuilder) Close() error { return bb.compressionProvider.Close() } -func getCompressionProvider(compressionType pb.CompressionType) compression.Provider { +func getCompressionProvider(compressionType pb.CompressionType, + level compression.Level) compression.Provider { switch compressionType { case pb.CompressionType_NONE: return compression.NewNoopProvider() @@ -185,7 +186,7 @@ func getCompressionProvider(compressionType pb.CompressionType) compression.Prov case pb.CompressionType_ZLIB: return compression.NewZLibProvider() case pb.CompressionType_ZSTD: - return compression.NewZStdProvider() + return compression.NewZStdProvider(level) default: log.Panic("unsupported compression type") return nil diff --git a/pulsar/internal/compression/compression.go b/pulsar/internal/compression/compression.go index e45d18ff5a..90adeeb8d3 100644 --- a/pulsar/internal/compression/compression.go +++ b/pulsar/internal/compression/compression.go @@ -19,6 +19,14 @@ package compression import "io" +type Level int + +const ( + Default Level = iota + Faster + Better +) + // Provider is a interface of compression providers type Provider interface { // Compress a []byte, the param is a []byte with the uncompressed content. diff --git a/pulsar/internal/compression/compression_bench_test.go b/pulsar/internal/compression/compression_bench_test.go index fea2eb81d5..1f4032178f 100644 --- a/pulsar/internal/compression/compression_bench_test.go +++ b/pulsar/internal/compression/compression_bench_test.go @@ -64,12 +64,12 @@ func testDecompression(b *testing.B, provider Provider) { var benchmarkProviders = []testProvider{ {"zlib", NewZLibProvider(), nil}, {"lz4", NewLz4Provider(), nil}, - {"zstd-pure-go-fastest", newPureGoZStdProvider(1), nil}, - {"zstd-pure-go-default", newPureGoZStdProvider(2), nil}, - {"zstd-pure-go-best", newPureGoZStdProvider(3), nil}, - {"zstd-cgo-level-fastest", newCGoZStdProvider(1), nil}, - {"zstd-cgo-level-default", newCGoZStdProvider(3), nil}, - {"zstd-cgo-level-best", newCGoZStdProvider(9), nil}, + {"zstd-pure-go-fastest", newPureGoZStdProvider(Faster), nil}, + {"zstd-pure-go-default", newPureGoZStdProvider(Default), nil}, + {"zstd-pure-go-best", newPureGoZStdProvider(Better), nil}, + {"zstd-cgo-level-fastest", newCGoZStdProvider(Faster), nil}, + {"zstd-cgo-level-default", newCGoZStdProvider(Default), nil}, + {"zstd-cgo-level-best", newCGoZStdProvider(Better), nil}, } func BenchmarkCompression(b *testing.B) { diff --git a/pulsar/internal/compression/compression_test.go b/pulsar/internal/compression/compression_test.go index 7df821ffaa..cfb00b230a 100644 --- a/pulsar/internal/compression/compression_test.go +++ b/pulsar/internal/compression/compression_test.go @@ -34,7 +34,7 @@ type testProvider struct { var providers = []testProvider{ {"zlib", NewZLibProvider(), []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}}, {"lz4", NewLz4Provider(), []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}}, - {"zstd", NewZStdProvider(), + {"zstd", NewZStdProvider(Default), []byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}}, } diff --git a/pulsar/internal/compression/zstd.go b/pulsar/internal/compression/zstd.go index a9124034ed..694f9e79c5 100644 --- a/pulsar/internal/compression/zstd.go +++ b/pulsar/internal/compression/zstd.go @@ -21,17 +21,15 @@ package compression import ( "fmt" - - "github.com/klauspost/compress/zstd" ) -func NewZStdProvider() Provider { - return newPureGoZStdProvider(zstd.SpeedDefault) +func NewZStdProvider(level Level) Provider { + return newPureGoZStdProvider(level) } -func newCGoZStdProvider(compressionLevel int) Provider { +func newCGoZStdProvider(level Level) Provider { // This is kept to avoid compile errors in benchmark code when cgo is disabled. // The warning is only shown when running the benchmark with CGO disabled. fmt.Println("WARNING: CGO is disabled, using pure Go implementation of ZStd. Use CGO_ENABLED=1 when running benchmark.") - return newPureGoZStdProvider(zstd.SpeedDefault) + return newPureGoZStdProvider(level) } diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go index f6f6c45ac4..9c43cc4ad7 100644 --- a/pulsar/internal/compression/zstd_cgo.go +++ b/pulsar/internal/compression/zstd_cgo.go @@ -29,23 +29,34 @@ import ( ) type zstdCGoProvider struct { - ctx zstd.Ctx - compressionLevel int + ctx zstd.Ctx + level Level + zstdLevel int } -func newCGoZStdProvider(compressionLevel int) Provider { - return &zstdCGoProvider{ - compressionLevel: compressionLevel, - ctx: zstd.NewCtx(), +func newCGoZStdProvider(level Level) Provider { + z := &zstdCGoProvider{ + ctx: zstd.NewCtx(), } + + switch level { + case Default: + z.zstdLevel = zstd.DefaultCompression + case Faster: + z.zstdLevel = zstd.BestSpeed + case Better: + z.zstdLevel = 9 + } + + return z } -func NewZStdProvider() Provider { - return newCGoZStdProvider(zstd.DefaultCompression) +func NewZStdProvider(level Level) Provider { + return newCGoZStdProvider(level) } func (z *zstdCGoProvider) Compress(data []byte) []byte { - out, err := z.ctx.CompressLevel(nil, data, z.compressionLevel) + out, err := z.ctx.CompressLevel(nil, data, z.zstdLevel) if err != nil { log.WithError(err).Fatal("Failed to compress") } @@ -62,5 +73,5 @@ func (z *zstdCGoProvider) Close() error { } func (z *zstdCGoProvider) Clone() Provider { - return newCGoZStdProvider(z.compressionLevel) + return newCGoZStdProvider(z.level) } diff --git a/pulsar/internal/compression/zstd_go.go b/pulsar/internal/compression/zstd_go.go index f9839c57bc..06f203e4d3 100644 --- a/pulsar/internal/compression/zstd_go.go +++ b/pulsar/internal/compression/zstd_go.go @@ -23,16 +23,23 @@ import ( ) type zstdProvider struct { - compressionLevel zstd.EncoderLevel + compressionLevel Level encoder *zstd.Encoder decoder *zstd.Decoder } -func newPureGoZStdProvider(compressionLevel zstd.EncoderLevel) Provider { - p := &zstdProvider{ - compressionLevel: compressionLevel, +func newPureGoZStdProvider(level Level) Provider { + var zstdLevel zstd.EncoderLevel + p := &zstdProvider{} + switch level { + case Default: + zstdLevel = zstd.SpeedDefault + case Faster: + zstdLevel = zstd.SpeedFastest + case Better: + zstdLevel = zstd.SpeedBetterCompression } - p.encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(compressionLevel)) + p.encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstdLevel)) p.decoder, _ = zstd.NewReader(nil) return p } diff --git a/pulsar/producer.go b/pulsar/producer.go index 2d630f1110..7d44a56e7b 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -40,6 +40,19 @@ const ( ZSTD ) +type CompressionLevel int + +const ( + // Default compression level + Default CompressionLevel = iota + + // Faster compression, with lower compression ration + Faster + + // Higher compression rate, but slower + Better +) + // TopicMetadata is a interface of topic metadata type TopicMetadata interface { // NumPartitions get the number of partitions for the specific topic @@ -87,6 +100,12 @@ type ProducerOptions struct { // release in order to be able to receive messages compressed with ZSTD. CompressionType + // Define the desired compression level. Options: + // - Default + // - Faster + // - Better + CompressionLevel + // MessageRouter set a custom message routing policy by passing an implementation of MessageRouter // The router is a function that given a particular message and the topic metadata, returns the // partition index where the message should be routed to diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 284e505236..32b0da0009 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -24,6 +24,8 @@ import ( "sync/atomic" "time" + "github.com/apache/pulsar-client-go/pulsar/internal/compression" + "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" @@ -152,7 +154,8 @@ func (p *partitionProducer) grabCnx() error { p.producerName = res.Response.ProducerSuccess.GetProducerName() if p.batchBuilder == nil { p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize, - p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType)) + p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType), + compression.Level(p.options.CompressionLevel)) if err != nil { return err }