Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow applications to configure the compression level #290

Merged
merged 1 commit into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ func (pc *partitionConsumer) initializeCompressionProvider(
case pb.CompressionType_LZ4:
return compression.NewLz4Provider(), nil
case pb.CompressionType_ZSTD:
return compression.NewZStdProvider(), nil
return compression.NewZStdProvider(compression.Default), nil
}

return nil, fmt.Errorf("unsupported compression type: %v", compressionType)
Expand Down
9 changes: 5 additions & 4 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type BatchBuilder struct {

// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType) (*BatchBuilder, error) {
compressionType pb.CompressionType, level compression.Level) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
Expand All @@ -84,7 +84,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
ProducerName: &producerName,
},
callbacks: []interface{}{},
compressionProvider: getCompressionProvider(compressionType),
compressionProvider: getCompressionProvider(compressionType, level),
}

if compressionType != pb.CompressionType_NONE {
Expand Down Expand Up @@ -176,7 +176,8 @@ func (bb *BatchBuilder) Close() error {
return bb.compressionProvider.Close()
}

func getCompressionProvider(compressionType pb.CompressionType) compression.Provider {
func getCompressionProvider(compressionType pb.CompressionType,
level compression.Level) compression.Provider {
switch compressionType {
case pb.CompressionType_NONE:
return compression.NewNoopProvider()
Expand All @@ -185,7 +186,7 @@ func getCompressionProvider(compressionType pb.CompressionType) compression.Prov
case pb.CompressionType_ZLIB:
return compression.NewZLibProvider()
case pb.CompressionType_ZSTD:
return compression.NewZStdProvider()
return compression.NewZStdProvider(level)
default:
log.Panic("unsupported compression type")
return nil
Expand Down
8 changes: 8 additions & 0 deletions pulsar/internal/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ package compression

import "io"

type Level int

const (
Default Level = iota
Faster
Better
)

// Provider is a interface of compression providers
type Provider interface {
// Compress a []byte, the param is a []byte with the uncompressed content.
Expand Down
12 changes: 6 additions & 6 deletions pulsar/internal/compression/compression_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ func testDecompression(b *testing.B, provider Provider) {
var benchmarkProviders = []testProvider{
{"zlib", NewZLibProvider(), nil},
{"lz4", NewLz4Provider(), nil},
{"zstd-pure-go-fastest", newPureGoZStdProvider(1), nil},
{"zstd-pure-go-default", newPureGoZStdProvider(2), nil},
{"zstd-pure-go-best", newPureGoZStdProvider(3), nil},
{"zstd-cgo-level-fastest", newCGoZStdProvider(1), nil},
{"zstd-cgo-level-default", newCGoZStdProvider(3), nil},
{"zstd-cgo-level-best", newCGoZStdProvider(9), nil},
{"zstd-pure-go-fastest", newPureGoZStdProvider(Faster), nil},
{"zstd-pure-go-default", newPureGoZStdProvider(Default), nil},
{"zstd-pure-go-best", newPureGoZStdProvider(Better), nil},
{"zstd-cgo-level-fastest", newCGoZStdProvider(Faster), nil},
{"zstd-cgo-level-default", newCGoZStdProvider(Default), nil},
{"zstd-cgo-level-best", newCGoZStdProvider(Better), nil},
}

func BenchmarkCompression(b *testing.B) {
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/compression/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type testProvider struct {
var providers = []testProvider{
{"zlib", NewZLibProvider(), []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}},
{"lz4", NewLz4Provider(), []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
{"zstd", NewZStdProvider(),
{"zstd", NewZStdProvider(Default),
[]byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
}

Expand Down
10 changes: 4 additions & 6 deletions pulsar/internal/compression/zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@ package compression

import (
"fmt"

"github.com/klauspost/compress/zstd"
)

func NewZStdProvider() Provider {
return newPureGoZStdProvider(zstd.SpeedDefault)
func NewZStdProvider(level Level) Provider {
return newPureGoZStdProvider(level)
}

func newCGoZStdProvider(compressionLevel int) Provider {
func newCGoZStdProvider(level Level) Provider {
// This is kept to avoid compile errors in benchmark code when cgo is disabled.
// The warning is only shown when running the benchmark with CGO disabled.
fmt.Println("WARNING: CGO is disabled, using pure Go implementation of ZStd. Use CGO_ENABLED=1 when running benchmark.")
return newPureGoZStdProvider(zstd.SpeedDefault)
return newPureGoZStdProvider(level)
}
31 changes: 21 additions & 10 deletions pulsar/internal/compression/zstd_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,34 @@ import (
)

type zstdCGoProvider struct {
ctx zstd.Ctx
compressionLevel int
ctx zstd.Ctx
level Level
zstdLevel int
}

func newCGoZStdProvider(compressionLevel int) Provider {
return &zstdCGoProvider{
compressionLevel: compressionLevel,
ctx: zstd.NewCtx(),
func newCGoZStdProvider(level Level) Provider {
z := &zstdCGoProvider{
ctx: zstd.NewCtx(),
}

switch level {
case Default:
z.zstdLevel = zstd.DefaultCompression
case Faster:
z.zstdLevel = zstd.BestSpeed
case Better:
z.zstdLevel = 9
}

return z
}

func NewZStdProvider() Provider {
return newCGoZStdProvider(zstd.DefaultCompression)
func NewZStdProvider(level Level) Provider {
return newCGoZStdProvider(level)
}

func (z *zstdCGoProvider) Compress(data []byte) []byte {
out, err := z.ctx.CompressLevel(nil, data, z.compressionLevel)
out, err := z.ctx.CompressLevel(nil, data, z.zstdLevel)
if err != nil {
log.WithError(err).Fatal("Failed to compress")
}
Expand All @@ -62,5 +73,5 @@ func (z *zstdCGoProvider) Close() error {
}

func (z *zstdCGoProvider) Clone() Provider {
return newCGoZStdProvider(z.compressionLevel)
return newCGoZStdProvider(z.level)
}
17 changes: 12 additions & 5 deletions pulsar/internal/compression/zstd_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,23 @@ import (
)

type zstdProvider struct {
compressionLevel zstd.EncoderLevel
compressionLevel Level
encoder *zstd.Encoder
decoder *zstd.Decoder
}

func newPureGoZStdProvider(compressionLevel zstd.EncoderLevel) Provider {
p := &zstdProvider{
compressionLevel: compressionLevel,
func newPureGoZStdProvider(level Level) Provider {
var zstdLevel zstd.EncoderLevel
p := &zstdProvider{}
switch level {
case Default:
zstdLevel = zstd.SpeedDefault
case Faster:
zstdLevel = zstd.SpeedFastest
case Better:
zstdLevel = zstd.SpeedBetterCompression
}
p.encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(compressionLevel))
p.encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstdLevel))
p.decoder, _ = zstd.NewReader(nil)
return p
}
Expand Down
19 changes: 19 additions & 0 deletions pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ const (
ZSTD
)

type CompressionLevel int

const (
// Default compression level
Default CompressionLevel = iota

// Faster compression, with lower compression ration
Faster

// Higher compression rate, but slower
Better
)

// TopicMetadata is a interface of topic metadata
type TopicMetadata interface {
// NumPartitions get the number of partitions for the specific topic
Expand Down Expand Up @@ -87,6 +100,12 @@ type ProducerOptions struct {
// release in order to be able to receive messages compressed with ZSTD.
CompressionType

// Define the desired compression level. Options:
// - Default
// - Faster
// - Better
CompressionLevel

// MessageRouter set a custom message routing policy by passing an implementation of MessageRouter
// The router is a function that given a particular message and the topic metadata, returns the
// partition index where the message should be routed to
Expand Down
5 changes: 4 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"

"github.com/golang/protobuf/proto"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -152,7 +154,8 @@ func (p *partitionProducer) grabCnx() error {
p.producerName = res.Response.ProducerSuccess.GetProducerName()
if p.batchBuilder == nil {
p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType))
p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
compression.Level(p.options.CompressionLevel))
if err != nil {
return err
}
Expand Down