Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support zstd compression #724

Merged
merged 2 commits into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ Compression is enabled by passing the `compression_codec` parameter to `#produce
* `:snappy` for [Snappy](http://google.github.io/snappy/) compression.
* `:gzip` for [gzip](https://en.wikipedia.org/wiki/Gzip) compression.
* `:lz4` for [LZ4](https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)) compression.
* `:zstd` for [zstd](https://facebook.github.io/zstd/) compression.

By default, all message sets will be compressed if you specify a compression codec. To increase the compression threshold, set `compression_threshold` to an integer value higher than one.

Expand Down
4 changes: 2 additions & 2 deletions lib/kafka/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partit
# result in {BufferOverflow} being raised.
#
# @param compression_codec [Symbol, nil] the name of the compression codec to
# use, or nil if no compression should be performed. Valid codecs: `:snappy`
# and `:gzip`.
# use, or nil if no compression should be performed. Valid codecs: `:snappy`,
# `:gzip`, `:lz4`, `:zstd`
#
# @param compression_threshold [Integer] the number of messages that needs to
# be in a message set before it should be compressed. Note that message sets
Expand Down
24 changes: 13 additions & 11 deletions lib/kafka/compression.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,27 @@
require "kafka/snappy_codec"
require "kafka/gzip_codec"
require "kafka/lz4_codec"
require "kafka/zstd_codec"

module Kafka
module Compression
CODEC_NAMES = {
1 => :gzip,
2 => :snappy,
3 => :lz4,
}.freeze

CODECS = {
CODECS_BY_NAME = {
:gzip => GzipCodec.new,
:snappy => SnappyCodec.new,
:lz4 => LZ4Codec.new,
:zstd => ZstdCodec.new,
}.freeze

CODECS_BY_ID = CODECS_BY_NAME.each_with_object({}) do |(_, codec), hash|
hash[codec.codec_id] = codec
end.freeze

def self.codecs
CODECS.keys
CODECS_BY_NAME.keys
end

def self.find_codec(name)
codec = CODECS.fetch(name) do
codec = CODECS_BY_NAME.fetch(name) do
raise "Unknown compression codec #{name}"
end

Expand All @@ -33,11 +33,13 @@ def self.find_codec(name)
end

def self.find_codec_by_id(codec_id)
codec_name = CODEC_NAMES.fetch(codec_id) do
codec = CODECS_BY_ID.fetch(codec_id) do
raise "Unknown codec id #{codec_id}"
end

find_codec(codec_name)
codec.load

codec
end
end
end
1 change: 1 addition & 0 deletions lib/kafka/compressor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module Kafka
# * `compressed_bytesize` – the byte size of the compressed data.
#
class Compressor
attr_reader :codec

# @param codec_name [Symbol, nil]
# @param threshold [Integer] the minimum number of messages in a message set
Expand Down
4 changes: 4 additions & 0 deletions lib/kafka/gzip_codec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ def codec_id
1
end

def produce_api_min_version
0
end

def load
require "zlib"
end
Expand Down
4 changes: 4 additions & 0 deletions lib/kafka/lz4_codec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ def codec_id
3
end

def produce_api_min_version
0
end

def load
require "extlz4"
rescue LoadError
Expand Down
2 changes: 2 additions & 0 deletions lib/kafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ module Kafka
#
# * `:snappy` for [Snappy](http://google.github.io/snappy/) compression.
# * `:gzip` for [gzip](https://en.wikipedia.org/wiki/Gzip) compression.
# * `:lz4` for [LZ4](https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)) compression.
# * `:zstd` for [zstd](https://facebook.github.io/zstd/) compression.
#
# By default, all message sets will be compressed if you specify a compression
# codec. To increase the compression threshold, set `compression_threshold` to
Expand Down
4 changes: 3 additions & 1 deletion lib/kafka/protocol/produce_request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ module Protocol
# Value => bytes
#
class ProduceRequest
API_MIN_VERSION = 3

attr_reader :transactional_id, :required_acks, :timeout, :messages_for_topics, :compressor

# @param required_acks [Integer]
Expand All @@ -45,7 +47,7 @@ def api_key
end

def api_version
3
compressor.codec.nil? ? API_MIN_VERSION : [compressor.codec.produce_api_min_version, API_MIN_VERSION].max
end

def response_class
Expand Down
4 changes: 4 additions & 0 deletions lib/kafka/snappy_codec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ def codec_id
2
end

def produce_api_min_version
0
end

def load
require "snappy"
rescue LoadError
Expand Down
27 changes: 27 additions & 0 deletions lib/kafka/zstd_codec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

module Kafka
class ZstdCodec
def codec_id
4
end

def produce_api_min_version
7
end

def load
require "zstd-ruby"
rescue LoadError
raise LoadError, "using zstd compression requires adding a dependency on the `zstd-ruby` gem to your Gemfile."
end

def compress(data)
Zstd.compress(data)
end

def decompress(data)
Zstd.decompress(data)
end
end
end
1 change: 1 addition & 0 deletions ruby-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "activesupport"
spec.add_development_dependency "snappy"
spec.add_development_dependency "extlz4"
spec.add_development_dependency "zstd-ruby"
spec.add_development_dependency "colored"
spec.add_development_dependency "rspec_junit_formatter", "0.2.2"
spec.add_development_dependency "dogstatsd-ruby", ">= 3.0.0", "< 5.0.0"
Expand Down
62 changes: 22 additions & 40 deletions spec/functional/compression_spec.rb
Original file line number Diff line number Diff line change
@@ -1,54 +1,36 @@
# frozen_string_literal: true

require "snappy"

describe "Compression", functional: true do
let!(:topic) { create_random_topic(num_partitions: 3) }

example "producing and consuming snappy-compressed messages" do
producer = kafka.producer(
compression_codec: :snappy,
max_retries: 0,
retry_backoff: 0
)

last_offset = fetch_last_offset

producer.produce("message1", topic: topic, partition: 0)
producer.produce("message2", topic: topic, partition: 0)

producer.deliver_messages

messages = kafka.fetch_messages(
topic: topic,
partition: 0,
offset: last_offset + 1,
)

expect(messages.last(2).map(&:value)).to eq ["message1", "message2"]
end
Kafka::Compression.codecs.each do |codec_name|
example "producing and consuming #{codec_name}-compressed messages" do
codec = Kafka::Compression.find_codec(codec_name)
unless kafka.supports_api?(Kafka::Protocol::PRODUCE_API, codec.produce_api_min_version)
skip("This Kafka version does not support #{codec_name}")
end

example "producing and consuming gzip-compressed messages" do
producer = kafka.producer(
compression_codec: :gzip,
max_retries: 0,
retry_backoff: 0
)
producer = kafka.producer(
compression_codec: codec_name,
max_retries: 0,
retry_backoff: 0
)

last_offset = fetch_last_offset
last_offset = fetch_last_offset

producer.produce("message1", topic: topic, partition: 0)
producer.produce("message2", topic: topic, partition: 0)
producer.produce("message1", topic: topic, partition: 0)
producer.produce("message2", topic: topic, partition: 0)

producer.deliver_messages
producer.deliver_messages

messages = kafka.fetch_messages(
topic: topic,
partition: 0,
offset: last_offset + 1,
)
messages = kafka.fetch_messages(
topic: topic,
partition: 0,
offset: last_offset + 1,
)

expect(messages.last(2).map(&:value)).to eq ["message1", "message2"]
expect(messages.last(2).map(&:value)).to eq ["message1", "message2"]
end
end

def fetch_last_offset
Expand Down
42 changes: 42 additions & 0 deletions spec/protocol/record_batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,27 @@
].flatten
end

let(:sample_record_batch_zstd_bytes) do
[
# First offset
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1,
# Record Batch Length
0x0, 0x0, 0x0, 0x67,
# Partition Leader Epoch
0x0, 0x0, 0x0, 0x2,
# Magic byte
0x2,
# CRC
0x0, 0x0, 0x0, 0x0,
# Attributes
0x0, 0b00110100,
sample_record_batch_metadata_bytes,
Kafka::ZstdCodec.new.compress(
(record_1_bytes + record_2_bytes).pack("C*")
).bytes
].flatten
end

context '#encode' do
let(:buffer) { StringIO.new }
let(:encoder) { Kafka::Protocol::Encoder.new(buffer) }
Expand Down Expand Up @@ -271,6 +292,15 @@
expect(strip_crc(buffer.string.bytes)).to eql sample_record_batch_lz4_bytes
end
end

context 'Compress with Zstd' do
let(:codec_id) { 4 }

it 'encodes the record batch using zstd compressor' do
sample_record_batch.encode(encoder)
expect(strip_crc(buffer.string.bytes)).to eql sample_record_batch_zstd_bytes
end
end
end

context '.decode' do
Expand Down Expand Up @@ -343,6 +373,18 @@
expect_matched_records(record_batch.records)
end
end

context 'Compress with Zstd' do
let(:decoder) do
Kafka::Protocol::Decoder.new(byte_array_to_io(sample_record_batch_zstd_bytes))
end

it 'decodes records with Zstd decompressor' do
record_batch = Kafka::Protocol::RecordBatch.decode(decoder)
expect_matched_batch_metadata(record_batch)
expect_matched_records(record_batch.records)
end
end
end
end

Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
require "securerandom"
require 'snappy'
require 'extlz4'
require 'zstd-ruby'

Dotenv.load

Expand Down