Implement compression for incoming/outgoing records and for sstables (#…
purplefox authored Jan 12, 2025
1 parent 8703a18 commit 0b84be5
Showing 40 changed files with 916 additions and 300 deletions.
4 changes: 1 addition & 3 deletions agent/agent.go
@@ -313,9 +313,7 @@ func (o *fetchCacheGetter) get(tableID sst.SSTableID) (*sst.SSTable, error) {
if len(bytes) == 0 {
return nil, errors.Errorf("cannot find sstable %s", tableID)
}
-table := &sst.SSTable{}
-table.Deserialize(bytes, 0)
-return table, nil
+return sst.GetSSTableFromBytes(bytes)
}

type compactionWorkerControllerClient struct {
14 changes: 14 additions & 0 deletions agent/conf.go
@@ -2,6 +2,7 @@ package agent

import (
"fmt"
"github.com/spirit-labs/tektite/compress"

"github.com/pkg/errors"
"github.com/spirit-labs/tektite/cluster"
@@ -41,6 +42,8 @@ type CommandConf struct {
AutoCreateNumPartitions int `help:"the number of partitions for auto-created topics" default:"1"`
DefaultMaxMessageSizeBytes int `help:"the maximum size of a message batch that can be sent to a topic - can be overridden at topic level" default:"1048576"`
MetadataWriteIntervalMs int `help:"interval between writing database metadata to permanent storage, in milliseconds" default:"100"`
StorageCompressionType string `help:"determines how data is compressed before writing to object storage. one of 'lz4' or 'zstd'" default:"lz4"`
FetchCompressionType string `help:"determines how data is compressed before returning a fetched batch to a consumer. one of 'gzip', 'snappy', 'lz4', 'zstd' or 'none'" default:"lz4"`
}

var authTypeMapping = map[string]kafkaserver.AuthenticationType{
@@ -151,6 +154,17 @@ func CreateConfFromCommandConf(commandConf CommandConf) (Conf, error) {
commandConf.MetadataWriteIntervalMs)
}
cfg.ControllerConf.LsmStateWriteInterval = time.Duration(commandConf.MetadataWriteIntervalMs) * time.Millisecond
storageCompressionType := compress.FromString(commandConf.StorageCompressionType)
if storageCompressionType != compress.CompressionTypeLz4 && storageCompressionType != compress.CompressionTypeZstd {
return Conf{}, errors.Errorf("invalid compression-type: %s", commandConf.StorageCompressionType)
}
cfg.PusherConf.TableCompressionType = storageCompressionType
fetchCompressionType := compress.FromString(commandConf.FetchCompressionType)
if fetchCompressionType == compress.CompressionTypeUnknown {
return Conf{}, errors.Errorf("invalid compression-type: %s", commandConf.FetchCompressionType)
}
cfg.FetcherConf.FetchCompressionType = fetchCompressionType
return cfg, nil
}

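For illustration, the validation pattern above in isolation: compress.FromString maps a config string to a codec, and CompressionTypeUnknown flags anything unrecognized (a standalone sketch; the name variable is hypothetical):

package main

import (
	"fmt"

	"github.com/spirit-labs/tektite/compress"
)

func main() {
	name := "zstd" // hypothetical user-supplied setting
	ct := compress.FromString(name)
	if ct == compress.CompressionTypeUnknown {
		fmt.Printf("invalid compression-type: %s\n", name)
		return
	}
	fmt.Printf("using codec %s\n", ct)
}

Note that the storage path above is stricter than the fetch path: tables written to object storage must use lz4 or zstd, while fetch responses may use any Kafka-supported codec or none.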
48 changes: 45 additions & 3 deletions agent/fetch_test.go
@@ -6,13 +6,15 @@ import (
"fmt"
"github.com/spirit-labs/tektite/apiclient"
"github.com/spirit-labs/tektite/common"
"github.com/spirit-labs/tektite/compress"
"github.com/spirit-labs/tektite/kafkaencoding"
"github.com/spirit-labs/tektite/kafkaprotocol"
"github.com/spirit-labs/tektite/objstore/dev"
"github.com/spirit-labs/tektite/testutils"
"github.com/spirit-labs/tektite/topicmeta"
"github.com/spirit-labs/tektite/transport"
"github.com/stretchr/testify/require"
"hash/crc32"
"math"
"math/rand"
"sync"
@@ -92,8 +94,17 @@ func testFetchSimple(t *testing.T, apiVersion int16) {
require.Equal(t, 1, len(topicResp.Partitions))
partResp := topicResp.Partitions[0]
require.Equal(t, kafkaprotocol.ErrorCodeNone, int(partResp.ErrorCode))
-require.Equal(t, 100, int(partResp.HighWatermark))
-require.Equal(t, batch, partResp.Records)
+require.Equal(t, 1, int(partResp.HighWatermark))
+decompressed, err := maybeDecompressBatches([][]byte{partResp.Records})
+require.NoError(t, err)
+
+// Zero the CRC on both batches before comparing - the fetch path recompresses,
+// which changes the checksum but not the records
+bNoCrc1 := batch
+bNoCrc2 := decompressed[0]
+
+kafkaencoding.SetCrc(bNoCrc1, 0)
+kafkaencoding.SetCrc(bNoCrc2, 0)
+
+require.Equal(t, bNoCrc1, bNoCrc2)
}

func TestFetchSingleSenderAndFetcherShortWriteTimeout(t *testing.T) {
@@ -366,6 +377,10 @@ func (f *fetchRunner) fetch() {
}
if len(partResp.Records) > 0 {
batches := extractBatches(partResp.Records)
batches, err := maybeDecompressBatches(batches)
if err != nil {
panic(fmt.Sprintf("failed to decompress batches: %v", err))
}
f.batches = append(f.batches, batches...)
for _, batch := range batches {
baseOffset := kafkaencoding.BaseOffset(batch)
@@ -381,6 +396,33 @@
}
}

func maybeDecompressBatches(batches [][]byte) ([][]byte, error) {
var newBatches [][]byte
for _, batch := range batches {
compressType := compress.CompressionType(kafkaencoding.CompressionType(batch))
if compressType != compress.CompressionTypeNone {
// Only the records section of a v2 Kafka record batch is compressed - the
// fixed 61 byte batch header is not - so decompress from offset 61 onwards
// and reattach a copy of the header
decompressedPart, err := compress.Decompress(compressType, batch[61:])
if err != nil {
return nil, err
}
decompressed := append(common.ByteSliceCopy(batch[:61]), decompressedPart...)
// Now set the new batch length - the length field at offset 8 excludes the
// 8 byte base offset and the 4 byte length field itself
newBatchLen := len(decompressed) - 12
binary.BigEndian.PutUint32(decompressed[8:], uint32(newBatchLen))
kafkaencoding.SetCompressionType(decompressed, 0)
// And recalc the CRC - CRC-32C computed over everything from the attributes
// field at offset 21 to the end of the batch
crc := crc32.Checksum(decompressed[21:], crcTable)
kafkaencoding.SetCrc(decompressed, crc)
newBatches = append(newBatches, decompressed)
} else {
newBatches = append(newBatches, batch)
}
}
return newBatches, nil
}

var crcTable = crc32.MakeTable(crc32.Castagnoli)
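
The offsets used in maybeDecompressBatches come from the v2 Kafka record batch layout; as a reference sketch (the constant names here are illustrative, the offsets are from the Kafka protocol spec):

// Layout of a v2 Kafka record batch (all integers big-endian):
const (
	offBaseOffset  = 0  // baseOffset: int64
	offBatchLength = 8  // batchLength: int32 - excludes baseOffset and this field (hence len-12)
	offLeaderEpoch = 12 // partitionLeaderEpoch: int32
	offMagic       = 16 // magic: int8 (2 for this format)
	offCrc         = 17 // crc: uint32 - CRC-32C over offsets 21..end
	offAttributes  = 21 // attributes: int16 - low 3 bits hold the compression codec
	headerLen      = 61 // fixed header size; the (optionally compressed) records start here
)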

func extractBatches(buff []byte) [][]byte {
// Multiple record batches are concatenated together
var batches [][]byte
@@ -409,7 +451,7 @@ func (f *fetchRunner) sendFetch(req *kafkaprotocol.FetchRequest) *kafkaprotocol.
}

func produceBatch(t *testing.T, topicName string, partitionID int, address string) []byte {
-batch := testutils.CreateKafkaRecordBatchWithIncrementingKVs(0, 100)
+batch := testutils.CreateKafkaRecordBatchWithIncrementingKVs(0, 1)
req := kafkaprotocol.ProduceRequest{
TransactionalId: nil,
Acks: -1,
7 changes: 4 additions & 3 deletions agent/produce_test.go
@@ -7,6 +7,7 @@ import (
"github.com/spirit-labs/tektite/apiclient"
"github.com/spirit-labs/tektite/asl/encoding"
"github.com/spirit-labs/tektite/common"
"github.com/spirit-labs/tektite/compress"
"github.com/spirit-labs/tektite/control"
"github.com/spirit-labs/tektite/iteration"
"github.com/spirit-labs/tektite/kafkaencoding"
@@ -380,6 +381,8 @@ func TestProduceSimpleWithReload(t *testing.T) {
func setupAgentWithArgs(t *testing.T, cfg Conf, objStore objstore.Client, inMemMemberships *InMemClusterMemberships, localTransports *transport.LocalTransports) (*Agent, func(t *testing.T)) {
kafkaAddress, err := common.AddressWithPort("localhost")
require.NoError(t, err)
cfg.PusherConf.TableCompressionType = compress.CompressionTypeLz4
cfg.FetcherConf.FetchCompressionType = compress.CompressionTypeLz4
cfg.KafkaListenerConfig.Address = kafkaAddress
transportServer, err := localTransports.NewLocalServer(uuid.New().String())
require.NoError(t, err)
@@ -507,7 +510,5 @@ func (n *tableGetter) GetSSTable(tableID sst.SSTableID) (*sst.SSTable, error) {
if err != nil {
return nil, err
}
-var table sst.SSTable
-table.Deserialize(buff, 0)
-return &table, nil
+return sst.GetSSTableFromBytes(buff)
}
36 changes: 36 additions & 0 deletions compress/compression_bench_test.go
@@ -0,0 +1,36 @@
package compress

import "testing"

func BenchmarkCompressGzip(b *testing.B) {
benchmarkCompress(b, CompressionTypeGzip)
}

func BenchmarkCompressSnappy(b *testing.B) {
benchmarkCompress(b, CompressionTypeSnappy)
}

func BenchmarkCompressLz4(b *testing.B) {
benchmarkCompress(b, CompressionTypeLz4)
}

func BenchmarkCompressZstd(b *testing.B) {
benchmarkCompress(b, CompressionTypeZstd)
}

func benchmarkCompress(b *testing.B, compressionType CompressionType) {
// Random data is essentially incompressible, so this measures codec overhead in the worst case
buff := randomBytes(100000)
for i := 0; i < b.N; i++ {
compressed, err := Compress(compressionType, nil, buff)
if err != nil {
panic(err)
}
decompressed, err := Decompress(compressionType, compressed)
if err != nil {
panic(err)
}
if len(decompressed) != len(buff) {
panic("wrong compressed size")
}
}
}
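
Assuming a standard Go toolchain, these benchmarks can be run from the repository root with: go test -bench . ./compress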
124 changes: 124 additions & 0 deletions compress/compressor.go
@@ -0,0 +1,124 @@
package compress

import (
"bytes"
"compress/gzip"
"github.com/DataDog/zstd"
"github.com/klauspost/compress/s2"
"github.com/pierrec/lz4/v4"
"github.com/pkg/errors"
"io"
)

type CompressionType byte

// The values match the compression codec ids used in the Kafka record batch attributes
const (
CompressionTypeNone CompressionType = 0
CompressionTypeGzip CompressionType = 1
CompressionTypeSnappy CompressionType = 2
CompressionTypeLz4 CompressionType = 3
CompressionTypeZstd CompressionType = 4
CompressionTypeUnknown CompressionType = 255
)

func FromString(str string) CompressionType {
switch str {
case "none":
return CompressionTypeNone
case "gzip":
return CompressionTypeGzip
case "snappy":
return CompressionTypeSnappy
case "lz4":
return CompressionTypeLz4
case "zstd":
return CompressionTypeZstd
default:
return CompressionTypeUnknown
}
}

func (t CompressionType) String() string {
switch t {
case CompressionTypeNone:
return "none"
case CompressionTypeGzip:
return "gzip"
case CompressionTypeSnappy:
return "snappy"
case CompressionTypeLz4:
return "lz4"
case CompressionTypeZstd:
return "zstd"
case CompressionTypeUnknown:
return "unknown"
default:
panic("unknown compression type")
}
}

// Compress compresses data with the given compressionType, appending the result to
// buff - buff may be nil, or may already contain prefix bytes (e.g. a header),
// which are preserved. The combined slice is returned.
func Compress(compressionType CompressionType, buff []byte, data []byte) ([]byte, error) {
var w io.WriteCloser
var buf *bytes.Buffer
switch compressionType {
case CompressionTypeGzip:
buf = bytes.NewBuffer(buff)
w = gzip.NewWriter(buf)
if _, err := w.Write(data); err != nil {
return nil, err
}
case CompressionTypeSnappy:
compressed := s2.EncodeSnappy(nil, data)
if buff == nil {
return compressed, nil
}
return append(buff, compressed...), nil
case CompressionTypeLz4:
buf = bytes.NewBuffer(buff)
w = lz4.NewWriter(buf)
if _, err := w.Write(data); err != nil {
return nil, err
}
case CompressionTypeZstd:
compressed, err := zstd.Compress(nil, data)
if err != nil {
return nil, err
}
if buff == nil {
return compressed, nil
}
return append(buff, compressed...), nil
default:
return nil, errors.Errorf("unexpected compression type: %d", compressionType)
}
if err := w.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// Decompress decompresses data that was compressed with the given compressionType.
func Decompress(compressionType CompressionType, data []byte) ([]byte, error) {
var r io.Reader
switch compressionType {
case CompressionTypeGzip:
var err error
r, err = gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
case CompressionTypeSnappy:
return s2.Decode(nil, data)
case CompressionTypeLz4:
r = lz4.NewReader(bytes.NewReader(data))
case CompressionTypeZstd:
return zstd.Decompress(nil, data)
default:
return nil, errors.Errorf("unexpected compression type: %d", compressionType)
}
buf := new(bytes.Buffer)
_, err := io.Copy(buf, r)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
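
A quick usage sketch of the new API - Compress appends to the supplied buffer, so a caller can lay down a header first and compress the payload directly behind it (a standalone example; the values are illustrative, and compressor_test.go below verifies the same prefix behaviour):

package main

import (
	"fmt"

	"github.com/spirit-labs/tektite/compress"
)

func main() {
	header := []byte{0xAB, 0xCD} // e.g. a pre-written envelope header
	payload := []byte("hello, tektite")

	// Compressed bytes are appended after the existing header bytes
	buff, err := compress.Compress(compress.CompressionTypeLz4, header, payload)
	if err != nil {
		panic(err)
	}

	// Strip the header before decompressing
	out, err := compress.Decompress(compress.CompressionTypeLz4, buff[len(header):])
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // hello, tektite
}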
53 changes: 53 additions & 0 deletions compress/compressor_test.go
@@ -0,0 +1,53 @@
package compress

import (
"crypto/rand"
"github.com/stretchr/testify/require"
"testing"
)

func TestCompressionGzip(t *testing.T) {
testCompressor(t, CompressionTypeGzip)
}

func TestCompressionSnappy(t *testing.T) {
testCompressor(t, CompressionTypeSnappy)
}

func TestCompressionLz4(t *testing.T) {
testCompressor(t, CompressionTypeLz4)
}

func TestCompressionZstd(t *testing.T) {
testCompressor(t, CompressionTypeZstd)
}

func testCompressor(t *testing.T, compressionType CompressionType) {
testCompressorWithInitialBytes(t, compressionType, 0)
testCompressorWithInitialBytes(t, compressionType, 100)
}

func testCompressorWithInitialBytes(t *testing.T, compressionType CompressionType, numInitialBytes int) {
data := randomBytes(10000)
var initialBytes []byte
if numInitialBytes > 0 {
initialBytes = randomBytes(numInitialBytes)
}
compressed, err := Compress(compressionType, initialBytes, data)
require.NoError(t, err)
if numInitialBytes > 0 {
require.Equal(t, initialBytes, compressed[:len(initialBytes)])
}
decompressed, err := Decompress(compressionType, compressed[len(initialBytes):])
require.NoError(t, err)
require.Equal(t, data, decompressed)
}

func randomBytes(n int) []byte {
b := make([]byte, n)
_, err := rand.Read(b)
if err != nil {
panic(err)
}
return b
}
