diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 04fc24394e..98f9e850d8 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -34,6 +34,8 @@ class FileBuffer < Fluent::Plugin::Buffer desc 'The path where buffer chunks are stored.' config_param :path, :string, default: nil + desc 'The suffix of buffer chunks' + config_param :path_suffix, :string, default: '.log' config_set_default :chunk_limit_size, DEFAULT_CHUNK_LIMIT_SIZE config_set_default :total_limit_size, DEFAULT_TOTAL_LIMIT_SIZE @@ -78,12 +80,12 @@ def configure(conf) if specified_directory_exists || unexisting_path_for_directory # directory if using_plugin_root_dir || !multi_workers_configured - @path = File.join(@path, 'buffer.*.log') + @path = File.join(@path, "buffer.*#{@path_suffix}") else - @path = File.join(@path, "worker#{fluentd_worker_id}", 'buffer.*.log') + @path = File.join(@path, "worker#{fluentd_worker_id}", "buffer.*#{@path_suffix}") if fluentd_worker_id == 0 # worker 0 always checks unflushed buffer chunks to be resumed (might be created while non-multi-worker configuration) - @additional_resume_path = File.join(File.expand_path("../../", @path), 'buffer.*.log') + @additional_resume_path = File.join(File.expand_path("../../", @path), "buffer.*#{@path_suffix}") end end @multi_workers_available = true @@ -91,10 +93,10 @@ def configure(conf) if File.basename(@path).include?('.*.') # valid file path elsif File.basename(@path).end_with?('.*') - @path = @path + '.log' + @path = @path + @path_suffix else # existing file will be ignored - @path = @path + '.*.log' + @path = @path + ".*#{@path_suffix}" end @multi_workers_available = false end diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index 958c930685..4a341e8143 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -57,20 +57,30 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) assert_equal File.join(@dir, 'buffer.*.file'), p.path end - test 'existing directory will be used with additional default file name' do + data('default' => [nil, 'log'], + 'conf' => ['.buf', 'buf']) + test 'existing directory will be used with additional default file name' do |params| + conf, suffix = params d = FluentPluginFileBufferTest::DummyOutputPlugin.new p = Fluent::Plugin::FileBuffer.new p.owner = d - p.configure(config_element('buffer', '', {'path' => @dir})) - assert_equal File.join(@dir, 'buffer.*.log'), p.path + c = {'path' => @dir} + c['path_suffix'] = conf if conf + p.configure(config_element('buffer', '', c)) + assert_equal File.join(@dir, "buffer.*.#{suffix}"), p.path end - test 'unexisting path without * handled as directory' do + data('default' => [nil, 'log'], + 'conf' => ['.buf', 'buf']) + test 'unexisting path without * handled as directory' do |params| + conf, suffix = params d = FluentPluginFileBufferTest::DummyOutputPlugin.new p = Fluent::Plugin::FileBuffer.new p.owner = d - p.configure(config_element('buffer', '', {'path' => File.join(@dir, 'buffer')})) - assert_equal File.join(@dir, 'buffer', 'buffer.*.log'), p.path + c = {'path' => File.join(@dir, 'buffer')} + c['path_suffix'] = conf if conf + p.configure(config_element('buffer', '', c)) + assert_equal File.join(@dir, 'buffer', "buffer.*.#{suffix}"), p.path end end @@ -312,10 +322,6 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) @d = FluentPluginFileBufferTest::DummyOutputPlugin.new @p = Fluent::Plugin::FileBuffer.new @p.owner = @d - Fluent::SystemConfig.overwrite_system_config('root_dir' => @root_dir) do - @d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'})) - @p.configure(config_element('buffer', '')) - end end teardown do @@ -329,8 +335,18 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) end end - test '#start creates directory for buffer chunks' do - expected_buffer_path = File.join(@root_dir, 'worker0', 'dummy_output_with_buf', 'buffer', 'buffer.*.log') + data('default' => [nil, 'log'], + 'conf' => ['.buf', 'buf']) + test '#start creates directory for buffer chunks' do |params| + conf, suffix = params + c = {} + c['path_suffix'] = conf if conf + Fluent::SystemConfig.overwrite_system_config('root_dir' => @root_dir) do + @d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'})) + @p.configure(config_element('buffer', '', c)) + end + + expected_buffer_path = File.join(@root_dir, 'worker0', 'dummy_output_with_buf', 'buffer', "buffer.*.#{suffix}") expected_buffer_dir = File.dirname(expected_buffer_path) assert_equal expected_buffer_path, @p.path assert_false Dir.exist?(expected_buffer_dir)