-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merge ValuesParser and TypeConverter into Parser #1286
Changes from 8 commits
e8a4d6b
b9bd8da
366b707
d220fa1
23e3b02
801dda4
f4e52b8
fdbc981
defa5cf
3088387
4a96091
06c8387
b12ed03
e44dbab
c0b5e6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,9 @@ | |
require 'fluent/plugin/parser' | ||
require 'fluent/mixin' | ||
|
||
require 'fluent/config' | ||
require 'fluent/compat/type_converter' | ||
|
||
require 'fluent/plugin/parser_regexp' | ||
require 'fluent/plugin/parser_json' | ||
require 'fluent/plugin/parser_tsv' | ||
|
@@ -56,13 +59,14 @@ def configure(conf, required=true) | |
format = conf['format'] | ||
|
||
@parser = TextParser.lookup(format) | ||
if ! @estimate_current_event.nil? && @parser.respond_to?(:'estimate_current_event=') | ||
@parser.estimate_current_event = @estimate_current_event | ||
end | ||
|
||
if @parser.respond_to?(:configure) | ||
@parser.configure(conf) | ||
end | ||
if !@estimate_current_event.nil? && @parser.respond_to?(:'estimate_current_event=') | ||
# external code sets parser.estimate_current_event = false | ||
@parser.estimate_current_event = @estimate_current_event | ||
end | ||
|
||
return true | ||
end | ||
|
@@ -116,48 +120,170 @@ def self.lookup(format) | |
end | ||
end | ||
|
||
class TimeParser < Fluent::Plugin::Parser::TimeParser | ||
module TypeConverterCompatParameters | ||
def convert_type_converter_parameters!(conf) | ||
if conf["types"] | ||
delimiter = conf["types_delimiter"] || ',' | ||
label_delimiter = conf["types_label_delimiter"] || ':' | ||
types = {} | ||
conf['types'].split(delimiter).each do |pair| | ||
key, value = pair.split(label_delimiter, 2) | ||
if value.start_with?("time#{label_delimiter}") | ||
value = value.split(label_delimiter, 2).join(':') | ||
end | ||
types[key] = value | ||
end | ||
conf["types"] = JSON.dump(types) | ||
end | ||
end | ||
end | ||
|
||
class TimeParser < Fluent::TimeParser | ||
# TODO: warn when deprecated | ||
end | ||
|
||
class RegexpParser < Fluent::Plugin::RegexpParser | ||
include TypeConverterCompatParameters | ||
|
||
# TODO: warn when deprecated | ||
def initialize(regexp, conf = {}) | ||
super() | ||
|
||
@stored_regexp = regexp | ||
@manually_configured = false | ||
unless conf.empty? | ||
unless conf.is_a?(Config::Element) | ||
conf = Config::Element.new('default_regexp_conf', '', conf, []) | ||
end | ||
configure(conf) | ||
conf_init = if conf.is_a?(Fluent::Config::Element) | ||
conf | ||
else | ||
Fluent::Config::Element.new('parse', '', conf, []) | ||
end | ||
self.configure(conf_init) | ||
@manually_configured = true | ||
end | ||
end | ||
|
||
def configure(conf) | ||
return if @manually_configured # not to run twice | ||
|
||
conf['expression'] ||= @stored_regexp.source | ||
convert_type_converter_parameters!(conf) | ||
|
||
@regexp = regexp | ||
super | ||
end | ||
|
||
def patterns | ||
{'format' => @regexp, 'time_format' => @time_format} | ||
end | ||
end | ||
|
||
class ValuesParser < Fluent::Plugin::ValuesParser | ||
# TODO: warn when deprecated | ||
class ValuesParser < Parser | ||
include Fluent::Compat::TypeConverter | ||
|
||
config_param :keys, :array, default: [] | ||
config_param :time_key, :string, default: nil | ||
config_param :null_value_pattern, :string, default: nil | ||
config_param :null_empty_string, :bool, default: false | ||
|
||
def configure(conf) | ||
super | ||
|
||
if @time_key && !@keys.include?(@time_key) && @estimate_current_event | ||
raise Fluent::ConfigError, "time_key (#{@time_key.inspect}) is not included in keys (#{@keys.inspect})" | ||
end | ||
|
||
if @time_format && !@time_key | ||
raise Fluent::ConfigError, "time_format parameter is ignored because time_key parameter is not set. at #{conf.inspect}" | ||
end | ||
|
||
@time_parser = time_parser_create | ||
|
||
if @null_value_pattern | ||
@null_value_pattern = Regexp.new(@null_value_pattern) | ||
end | ||
|
||
@mutex = Mutex.new | ||
end | ||
|
||
def values_map(values) | ||
record = Hash[keys.zip(values.map { |value| convert_value_to_nil(value) })] | ||
|
||
if @time_key | ||
value = @keep_time_key ? record[@time_key] : record.delete(@time_key) | ||
time = if value.nil? | ||
if @estimate_current_event | ||
Fluent::EventTime.now | ||
else | ||
nil | ||
end | ||
else | ||
@mutex.synchronize { @time_parser.parse(value) } | ||
end | ||
elsif @estimate_current_event | ||
time = Fluent::EventTime.now | ||
else | ||
time = nil | ||
end | ||
|
||
convert_field_type!(record) if @type_converters | ||
|
||
return time, record | ||
end | ||
|
||
private | ||
|
||
def convert_field_type!(record) | ||
@type_converters.each_key { |key| | ||
if value = record[key] | ||
record[key] = convert_type(key, value) | ||
end | ||
} | ||
end | ||
|
||
def convert_value_to_nil(value) | ||
if value and @null_empty_string | ||
value = (value == '') ? nil : value | ||
end | ||
if value and @null_value_pattern | ||
value = ::Fluent::StringUtil.match_regexp(@null_value_pattern, value) ? nil : value | ||
end | ||
value | ||
end | ||
end | ||
|
||
class JSONParser < Fluent::Plugin::JSONParser | ||
include TypeConverterCompatParameters | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. It is a subclass of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. JSONParser doesn't use TypeConverter feature. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I missed to update code of JSONParser. I pushed some commits including one to do it. |
||
# TODO: warn when deprecated | ||
def configure(conf) | ||
convert_type_converter_parameters!(conf) | ||
super | ||
end | ||
end | ||
|
||
class TSVParser < Fluent::Plugin::TSVParser | ||
include TypeConverterCompatParameters | ||
# TODO: warn when deprecated | ||
def configure(conf) | ||
convert_type_converter_parameters!(conf) | ||
super | ||
end | ||
end | ||
|
||
class LabeledTSVParser < Fluent::Plugin::LabeledTSVParser | ||
include TypeConverterCompatParameters | ||
# TODO: warn when deprecated | ||
def configure(conf) | ||
convert_type_converter_parameters!(conf) | ||
super | ||
end | ||
end | ||
|
||
class CSVParser < Fluent::Plugin::CSVParser | ||
include TypeConverterCompatParameters | ||
# TODO: warn when deprecated | ||
def configure(conf) | ||
convert_type_converter_parameters!(conf) | ||
super | ||
end | ||
end | ||
|
||
class NoneParser < Fluent::Plugin::NoneParser | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
array
should be also care?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be. I'll add a fix.