From 8cfdc25c7729a388ddaa9359f473e6d4008dae19 Mon Sep 17 00:00:00 2001 From: Rahul Ashok Date: Thu, 22 Oct 2015 16:39:15 -0500 Subject: [PATCH] #38 - Support gzip compression --- README.md | 6 +++ lib/fluent/plugin/out_kinesis.rb | 18 ++++--- test/plugin/test_out_kinesis.rb | 83 +++++++++++++++++++++++++++++++- 3 files changed, 99 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index b465427..ae6fac9 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,12 @@ fails all retries an error log will be emitted. Boolean, default is false. In case you find error `Encoding::UndefinedConversionError` with multibyte texts, you can avoid that error with this option. +### zlib_compression + +Boolean, default is false. +Zlib compresses the message data blob. +Each zlib compressed message must remain within megabyte in size. + ### debug Boolean. Enable if you need to debug Amazon Kinesis API call. Default is false. diff --git a/lib/fluent/plugin/out_kinesis.rb b/lib/fluent/plugin/out_kinesis.rb index 317674a..097c58e 100644 --- a/lib/fluent/plugin/out_kinesis.rb +++ b/lib/fluent/plugin/out_kinesis.rb @@ -17,6 +17,7 @@ require 'logger' require 'securerandom' require 'fluent/plugin/version' +require 'zlib' module FluentPluginKinesis class OutputFilter < Fluent::BufferedOutput @@ -56,6 +57,7 @@ class OutputFilter < Fluent::BufferedOutput config_param :order_events, :bool, default: false config_param :retries_on_putrecords, :integer, default: 3 config_param :use_yajl, :bool, default: false + config_param :zlib_compression, :bool, default: false config_param :debug, :bool, default: false @@ -120,15 +122,15 @@ def format(tag, time, record) end def write(chunk) - data_list = chunk.to_enum(:msgpack_each).find_all{|record| - unless record_exceeds_max_size?(record['data']) + data_list = chunk.to_enum(:msgpack_each).map{|record| + build_data_to_put(record) + }.find_all{|record| + unless record_exceeds_max_size?(record[:data]) true else - log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, record['data']) + log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, record[:data]) false end - }.map{|record| - build_data_to_put(record) } if @order_events @@ -220,7 +222,11 @@ def get_key(name, record) end def build_data_to_put(data) - Hash[data.map{|k, v| [k.to_sym, v] }] + if @zlib_compression + Hash[data.map{|k, v| [k.to_sym, k=="data" ? Zlib::Deflate.deflate(v) : v] }] + else + Hash[data.map{|k, v| [k.to_sym, v] }] + end end def put_record_for_order_events(data_list) diff --git a/test/plugin/test_out_kinesis.rb b/test/plugin/test_out_kinesis.rb index ec588dc..9361dfd 100644 --- a/test/plugin/test_out_kinesis.rb +++ b/test/plugin/test_out_kinesis.rb @@ -30,6 +30,14 @@ def setup use_yajl true ] + CONFIG_WITH_COMPRESSION = CONFIG + %[ + zlib_compression true + ] + + CONFIG_YAJL_WITH_COMPRESSION = CONFIG_YAJL + %[ + zlib_compression true + ] + def create_driver(conf = CONFIG, tag='test') Fluent::Test::BufferedOutputTestDriver .new(FluentPluginKinesis::OutputFilter, tag).configure(conf) @@ -237,7 +245,7 @@ def test_mode_configuration data("json"=>CONFIG, "yajl"=>CONFIG_YAJL) - def test_format(config) + def test_format_without_compression(config) d = create_driver(config) @@ -276,6 +284,46 @@ def test_format(config) d.run end + data("json"=>CONFIG_WITH_COMPRESSION, "yajl"=>CONFIG_YAJL_WITH_COMPRESSION) + def test_format_with_compression(config) + + d = create_driver(config) + + data1 = {"test_partition_key"=>"key1","a"=>1,"time"=>"2011-01-02T13:14:15Z","tag"=>"test"} + data2 = {"test_partition_key"=>"key2","a"=>2,"time"=>"2011-01-02T13:14:15Z","tag"=>"test"} + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + d.emit(data1, time) + d.emit(data2, time) + + d.expect_format({ + 'data' => data1.to_json, + 'partition_key' => 'key1' }.to_msgpack + ) + d.expect_format({ + 'data' => data2.to_json, + 'partition_key' => 'key2' }.to_msgpack + ) + + client = create_mock_client + client.describe_stream(stream_name: 'test_stream') + client.put_records( + stream_name: 'test_stream', + records: [ + { + data: Zlib::Deflate.deflate(data1.to_json), + partition_key: 'key1' + }, + { + data: Zlib::Deflate.deflate(data2.to_json), + partition_key: 'key2' + } + ] + ) { {} } + + d.run + end + def test_order_events d = create_driver(CONFIG + "\norder_events true") @@ -349,7 +397,7 @@ def test_format_at_lowlevel_with_more_options ) end - def test_multibyte_with_yajl + def test_multibyte_with_yajl_without_compression d = create_driver(CONFIG_YAJL) @@ -380,6 +428,37 @@ def test_multibyte_with_yajl d.run end + def test_multibyte_with_yajl_with_compression + + d = create_driver(CONFIG_YAJL_WITH_COMPRESSION) + + data1 = {"test_partition_key"=>"key1","a"=>"\xE3\x82\xA4\xE3\x83\xB3\xE3\x82\xB9\xE3\x83\x88\xE3\x83\xBC\xE3\x83\xAB","time"=>"2011-01-02T13:14:15Z","tag"=>"test"} + json = Yajl.dump(data1) + data1["a"].force_encoding("ASCII-8BIT") + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + d.emit(data1, time) + + d.expect_format({ + 'data' => json, + 'partition_key' => 'key1' }.to_msgpack + ) + + client = create_mock_client + client.describe_stream(stream_name: 'test_stream') + client.put_records( + stream_name: 'test_stream', + records: [ + { + data: Zlib::Deflate.deflate(json), + partition_key: 'key1' + } + ] + ) { {} } + + d.run + end + def test_get_key d = create_driver assert_equal(