Skip to content

Commit

Permalink
buf_file: Add path_suffix parameter. ref fluent#2236
Browse files Browse the repository at this point in the history
Signed-off-by: Masahiro Nakagawa <repeatedly@gmail.com>
  • Loading branch information
repeatedly authored and ganmacs committed Jul 30, 2019
1 parent cce3112 commit 358b69b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 17 deletions.
12 changes: 7 additions & 5 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class FileBuffer < Fluent::Plugin::Buffer

desc 'The path where buffer chunks are stored.'
config_param :path, :string, default: nil
desc 'The suffix of buffer chunks'
config_param :path_suffix, :string, default: '.log'

config_set_default :chunk_limit_size, DEFAULT_CHUNK_LIMIT_SIZE
config_set_default :total_limit_size, DEFAULT_TOTAL_LIMIT_SIZE
Expand Down Expand Up @@ -78,23 +80,23 @@ def configure(conf)

if specified_directory_exists || unexisting_path_for_directory # directory
if using_plugin_root_dir || !multi_workers_configured
@path = File.join(@path, 'buffer.*.log')
@path = File.join(@path, "buffer.*#{@path_suffix}")
else
@path = File.join(@path, "worker#{fluentd_worker_id}", 'buffer.*.log')
@path = File.join(@path, "worker#{fluentd_worker_id}", "buffer.*#{@path_suffix}")
if fluentd_worker_id == 0
# worker 0 always checks unflushed buffer chunks to be resumed (might be created while non-multi-worker configuration)
@additional_resume_path = File.join(File.expand_path("../../", @path), 'buffer.*.log')
@additional_resume_path = File.join(File.expand_path("../../", @path), "buffer.*#{@path_suffix}")
end
end
@multi_workers_available = true
else # specified path is file path
if File.basename(@path).include?('.*.')
# valid file path
elsif File.basename(@path).end_with?('.*')
@path = @path + '.log'
@path = @path + @path_suffix
else
# existing file will be ignored
@path = @path + '.*.log'
@path = @path + ".*#{@path_suffix}"
end
@multi_workers_available = false
end
Expand Down
40 changes: 28 additions & 12 deletions test/plugin/test_buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,30 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
assert_equal File.join(@dir, 'buffer.*.file'), p.path
end

test 'existing directory will be used with additional default file name' do
data('default' => [nil, 'log'],
'conf' => ['.buf', 'buf'])
test 'existing directory will be used with additional default file name' do |params|
conf, suffix = params
d = FluentPluginFileBufferTest::DummyOutputPlugin.new
p = Fluent::Plugin::FileBuffer.new
p.owner = d
p.configure(config_element('buffer', '', {'path' => @dir}))
assert_equal File.join(@dir, 'buffer.*.log'), p.path
c = {'path' => @dir}
c['path_suffix'] = conf if conf
p.configure(config_element('buffer', '', c))
assert_equal File.join(@dir, "buffer.*.#{suffix}"), p.path
end

test 'unexisting path without * handled as directory' do
data('default' => [nil, 'log'],
'conf' => ['.buf', 'buf'])
test 'unexisting path without * handled as directory' do |params|
conf, suffix = params
d = FluentPluginFileBufferTest::DummyOutputPlugin.new
p = Fluent::Plugin::FileBuffer.new
p.owner = d
p.configure(config_element('buffer', '', {'path' => File.join(@dir, 'buffer')}))
assert_equal File.join(@dir, 'buffer', 'buffer.*.log'), p.path
c = {'path' => File.join(@dir, 'buffer')}
c['path_suffix'] = conf if conf
p.configure(config_element('buffer', '', c))
assert_equal File.join(@dir, 'buffer', "buffer.*.#{suffix}"), p.path
end
end

Expand Down Expand Up @@ -312,10 +322,6 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
Fluent::SystemConfig.overwrite_system_config('root_dir' => @root_dir) do
@d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'}))
@p.configure(config_element('buffer', ''))
end
end

teardown do
Expand All @@ -329,8 +335,18 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
end
end

test '#start creates directory for buffer chunks' do
expected_buffer_path = File.join(@root_dir, 'worker0', 'dummy_output_with_buf', 'buffer', 'buffer.*.log')
data('default' => [nil, 'log'],
'conf' => ['.buf', 'buf'])
test '#start creates directory for buffer chunks' do |params|
conf, suffix = params
c = {}
c['path_suffix'] = conf if conf
Fluent::SystemConfig.overwrite_system_config('root_dir' => @root_dir) do
@d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'}))
@p.configure(config_element('buffer', '', c))
end

expected_buffer_path = File.join(@root_dir, 'worker0', 'dummy_output_with_buf', 'buffer', "buffer.*.#{suffix}")
expected_buffer_dir = File.dirname(expected_buffer_path)
assert_equal expected_buffer_path, @p.path
assert_false Dir.exist?(expected_buffer_dir)
Expand Down

0 comments on commit 358b69b

Please sign in to comment.