Skip to content

Commit

Permalink
awslabs#38 - Support gzip compression
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulashok authored and Rahul Ashok committed Oct 28, 2015
1 parent cc3bff8 commit a9577f4
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 8 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ fails all retries an error log will be emitted.
Boolean, default is false.
If you encounter `Encoding::UndefinedConversionError` with multibyte text, enabling this option avoids the error.

### gzip_compression

Boolean, default is false.
Compresses the message data blob with zlib (`Zlib::Deflate.deflate`). Despite the option name, the output is a zlib stream, not a gzip file.
Each compressed message must remain within 1 megabyte in size.

### debug

Boolean. Enable if you need to debug Amazon Kinesis API calls. Default is false.
Expand Down
18 changes: 12 additions & 6 deletions lib/fluent/plugin/out_kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
require 'logger'
require 'securerandom'
require 'fluent/plugin/version'
require 'zlib'

module FluentPluginKinesis
class OutputFilter < Fluent::BufferedOutput
Expand Down Expand Up @@ -56,6 +57,7 @@ class OutputFilter < Fluent::BufferedOutput
config_param :order_events, :bool, default: false
config_param :retries_on_putrecords, :integer, default: 3
config_param :use_yajl, :bool, default: false
# When true, build_data_to_put runs each record's "data" value through Zlib::Deflate.deflate.
config_param :gzip_compression, :bool, default: false

config_param :debug, :bool, default: false

Expand Down Expand Up @@ -120,15 +122,15 @@ def format(tag, time, record)
end

def write(chunk)
data_list = chunk.to_enum(:msgpack_each).find_all{|record|
unless record_exceeds_max_size?(record['data'])
data_list = chunk.to_enum(:msgpack_each).map{|record|
build_data_to_put(record)
}.find_all{|record|
unless record_exceeds_max_size?(record[:data])
true
else
log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, record['data'])
log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, record[:data])
false
end
}.map{|record|
build_data_to_put(record)
}

if @order_events
Expand Down Expand Up @@ -220,7 +222,11 @@ def get_key(name, record)
end

# Builds the hash handed to the Kinesis client from a buffered record.
#
# Every key is converted with #to_sym. When @gzip_compression is truthy, the
# value stored under the string key "data" is passed through
# Zlib::Deflate.deflate; all other values are copied unchanged. Note that
# Deflate.deflate emits a zlib stream rather than a gzip container, despite
# the option's name.
#
# @param data [#map] key/value pairs whose keys respond to #to_sym
# @return [Hash] a new hash keyed by symbols; the input is not modified
def build_data_to_put(data)
  pairs = data.map do |key, value|
    # Only the payload is compressed; partition keys etc. must stay readable.
    compress = @gzip_compression && key == "data"
    [key.to_sym, compress ? Zlib::Deflate.deflate(value) : value]
  end
  Hash[pairs]
end

def put_record_for_order_events(data_list)
Expand Down
83 changes: 81 additions & 2 deletions test/plugin/test_out_kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ def setup
use_yajl true
]

# CONFIG with the gzip_compression option switched on.
CONFIG_WITH_COMPRESSION = CONFIG + %[
gzip_compression true
]

# CONFIG_YAJL with the gzip_compression option switched on.
CONFIG_YAJL_WITH_COMPRESSION = CONFIG_YAJL + %[
gzip_compression true
]

def create_driver(conf = CONFIG, tag='test')
Fluent::Test::BufferedOutputTestDriver
.new(FluentPluginKinesis::OutputFilter, tag).configure(conf)
Expand Down Expand Up @@ -237,7 +245,7 @@ def test_mode_configuration


data("json"=>CONFIG, "yajl"=>CONFIG_YAJL)
def test_format(config)
def test_format_without_compression(config)

d = create_driver(config)

Expand Down Expand Up @@ -276,6 +284,46 @@ def test_format(config)
d.run
end

data("json"=>CONFIG_WITH_COMPRESSION, "yajl"=>CONFIG_YAJL_WITH_COMPRESSION)
def test_format_with_compression(config)
  # Emits two records, expects the formatted output to carry the uncompressed
  # JSON, and expects put_records to receive the deflated JSON payloads.
  driver = create_driver(config)

  records = [
    {"test_partition_key"=>"key1","a"=>1,"time"=>"2011-01-02T13:14:15Z","tag"=>"test"},
    {"test_partition_key"=>"key2","a"=>2,"time"=>"2011-01-02T13:14:15Z","tag"=>"test"}
  ]

  emitted_at = Time.parse("2011-01-02 13:14:15 UTC").to_i
  records.each { |record| driver.emit(record, emitted_at) }

  records.each do |record|
    driver.expect_format({
      'data' => record.to_json,
      'partition_key' => record['test_partition_key'] }.to_msgpack
    )
  end

  client = create_mock_client
  client.describe_stream(stream_name: 'test_stream')

  # One expected Kinesis entry per emitted record, in emission order.
  expected_records = records.map do |record|
    {
      data: Zlib::Deflate.deflate(record.to_json),
      partition_key: record['test_partition_key']
    }
  end

  client.put_records(
    stream_name: 'test_stream',
    records: expected_records
  ) { {} }

  driver.run
end

def test_order_events

d = create_driver(CONFIG + "\norder_events true")
Expand Down Expand Up @@ -349,7 +397,7 @@ def test_format_at_lowlevel_with_more_options
)
end

def test_multibyte_with_yajl
def test_multibyte_with_yajl_without_compression

d = create_driver(CONFIG_YAJL)

Expand Down Expand Up @@ -380,6 +428,37 @@ def test_multibyte_with_yajl
d.run
end

def test_multibyte_with_yajl_with_compression
  # Emits one record holding a multibyte string re-tagged as ASCII-8BIT and
  # expects put_records to receive the deflated Yajl dump of that record.
  driver = create_driver(CONFIG_YAJL_WITH_COMPRESSION)

  multibyte_value = "\xE3\x82\xA4\xE3\x83\xB3\xE3\x82\xB9\xE3\x83\x88\xE3\x83\xBC\xE3\x83\xAB"
  record = {"test_partition_key"=>"key1","a"=>multibyte_value,"time"=>"2011-01-02T13:14:15Z","tag"=>"test"}

  # The expected JSON is dumped BEFORE the value is re-tagged as binary below;
  # keep this order.
  expected_json = Yajl.dump(record)
  multibyte_value.force_encoding("ASCII-8BIT")

  emitted_at = Time.parse("2011-01-02 13:14:15 UTC").to_i
  driver.emit(record, emitted_at)

  driver.expect_format({
    'data' => expected_json,
    'partition_key' => 'key1' }.to_msgpack
  )

  client = create_mock_client
  client.describe_stream(stream_name: 'test_stream')

  expected_records = [
    {
      data: Zlib::Deflate.deflate(expected_json),
      partition_key: 'key1'
    }
  ]

  client.put_records(
    stream_name: 'test_stream',
    records: expected_records
  ) { {} }

  driver.run
end

def test_get_key
d = create_driver
assert_equal(
Expand Down

0 comments on commit a9577f4

Please sign in to comment.