From f95f6ee5ee99c1e10572e4eb2575b189a4fe5197 Mon Sep 17 00:00:00 2001 From: rtreffer Date: Thu, 27 Oct 2022 21:52:09 +0100 Subject: [PATCH] Reduce the zstd encoder state pool size to one Currently a single zstd encoder with default concurrency is used. Default concurrency causes EncodeAll to create one encoder state per GOMAXPROCS, by default one per core. On high-core machines (32+) and high compression levels this leads to 1GB memory consumption per ~32 cores. A 1GB encoder is pretty expensive compared to the 1MB payloads usually sent to kafka. The new approach limits the encoder to a single core but allows dynamic allocation of additional encoders if no encoder is available. Encoders are returned after use, thus allowing for reuse. A benchmark emulating a 96 core system shows the effectiveness of the change. Previous result: ``` goos: linux goarch: amd64 pkg: github.com/Shopify/sarama cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz BenchmarkZstdMemoryConsumption-8 2 834830801 ns/op 3664055292 B/op 4710 allocs/op PASS ok github.com/Shopify/sarama 2.181s ``` Current result: ``` goos: linux goarch: amd64 pkg: github.com/Shopify/sarama cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz BenchmarkZstdMemoryConsumption-8 5 222605954 ns/op 38960185 B/op 814 allocs/op PASS ok github.com/Shopify/sarama 3.045s ``` ``` BenchmarkZstdMemoryConsumption-8 2 834830801 ns/op 3664055292 B/op 4710 allocs/op BenchmarkZstdMemoryConsumption-8 5 222605954 ns/op 38960185 B/op 814 allocs/op ``` A ~4x improvement on total runtime and a 96x improvement on memory usage for the first 2x96 messages. This patch will, as a downside, increase how often new encoders are created on the fly, and the maximum number of encoders might be even higher. 
--- zstd.go | 52 ++++++++++++++++++++++++++++++++++++++-------------- zstd_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 14 deletions(-) create mode 100644 zstd_test.go diff --git a/zstd.go b/zstd.go index 80507e14e..f28db806e 100644 --- a/zstd.go +++ b/zstd.go @@ -6,28 +6,49 @@ import ( "github.com/klauspost/compress/zstd" ) +// zstdMaxBufferedEncoders maximum number of not-in-use zstd encoders +// If the pool of encoders is exhausted then new encoders will be created on the fly +const zstdMaxBufferedEncoders = 1 + type ZstdEncoderParams struct { Level int } type ZstdDecoderParams struct { } -var zstdEncMap, zstdDecMap sync.Map +var zstdDecMap sync.Map + +var zstdAvailableEncoders sync.Map -func getEncoder(params ZstdEncoderParams) *zstd.Encoder { - if ret, ok := zstdEncMap.Load(params); ok { - return ret.(*zstd.Encoder) +func getZstdEncoderChannel(params ZstdEncoderParams) chan *zstd.Encoder { + if c, ok := zstdAvailableEncoders.Load(params); ok { + return c.(chan *zstd.Encoder) } - // It's possible to race and create multiple new writers. - // Only one will survive GC after use. 
- encoderLevel := zstd.SpeedDefault - if params.Level != CompressionLevelDefault { - encoderLevel = zstd.EncoderLevelFromZstd(params.Level) + c, _ := zstdAvailableEncoders.LoadOrStore(params, make(chan *zstd.Encoder, zstdMaxBufferedEncoders)) + return c.(chan *zstd.Encoder) +} + +func getZstdEncoder(params ZstdEncoderParams) *zstd.Encoder { + select { + case enc := <-getZstdEncoderChannel(params): + return enc + default: + encoderLevel := zstd.SpeedDefault + if params.Level != CompressionLevelDefault { + encoderLevel = zstd.EncoderLevelFromZstd(params.Level) + } + zstdEnc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true), + zstd.WithEncoderLevel(encoderLevel), + zstd.WithEncoderConcurrency(1)) + return zstdEnc + } +} + +func releaseEncoder(params ZstdEncoderParams, enc *zstd.Encoder) { + select { + case getZstdEncoderChannel(params) <- enc: + default: } - zstdEnc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true), - zstd.WithEncoderLevel(encoderLevel)) - zstdEncMap.Store(params, zstdEnc) - return zstdEnc } func getDecoder(params ZstdDecoderParams) *zstd.Decoder { @@ -46,5 +67,8 @@ func zstdDecompress(params ZstdDecoderParams, dst, src []byte) ([]byte, error) { } func zstdCompress(params ZstdEncoderParams, dst, src []byte) ([]byte, error) { - return getEncoder(params).EncodeAll(src, dst), nil + enc := getZstdEncoder(params) + out := enc.EncodeAll(src, dst) + releaseEncoder(params, enc) + return out, nil } diff --git a/zstd_test.go b/zstd_test.go new file mode 100644 index 000000000..efdc6d83d --- /dev/null +++ b/zstd_test.go @@ -0,0 +1,29 @@ +package sarama + +import ( + "runtime" + "testing" +) + +func BenchmarkZstdMemoryConsumption(b *testing.B) { + params := ZstdEncoderParams{Level: 9} + buf := make([]byte, 1024*1024) + for i := 0; i < len(buf); i++ { + buf[i] = byte((i / 256) + (i * 257)) + } + + cpus := 96 + + gomaxprocsBackup := runtime.GOMAXPROCS(cpus) + b.ReportAllocs() + for i := 0; i < b.N; i++ { + for j := 0; j < 2*cpus; j++ { + _, _ = 
zstdCompress(params, nil, buf) + } + // drain the buffered encoder + getZstdEncoder(params) + // previously this would be achieved with + // zstdEncMap.Delete(params) + } + runtime.GOMAXPROCS(gomaxprocsBackup) +}