Skip to content

Commit

Permalink
Add <worker n> section to set a configuration for a specific worker, c…
Browse files Browse the repository at this point in the history
…loses fluent#1392
  • Loading branch information
Yuki Ito committed Mar 17, 2017
1 parent fb76ec5 commit 295dd08
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 11 deletions.
11 changes: 9 additions & 2 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def initialize(log:)
@log = log
@event_router = EventRouter.new(NoMatchMatch.new(log), self)
@error_collector = nil

@worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
end

attr_reader :log
Expand All @@ -62,6 +64,7 @@ def configure(conf)

# initialize <match> and <filter> elements
conf.elements('filter', 'match').each { |e|
next if e.has_target? && !e.is_target?(@worker_id)
pattern = e.arg.empty? ? '**' : e.arg
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
Expand Down Expand Up @@ -121,10 +124,12 @@ def lifecycle(desc: false)
end

def add_match(type, pattern, conf)
log.info :worker0, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
log_type = conf.is_target?(@worker_id) ? :default : :worker0
log.info log_type, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

output = Plugin.new_output(type)
output.context_router = @event_router
output.system_config_override(workers: 1) if conf.is_target?(@worker_id)
output.configure(conf)
@outputs << output
if output.respond_to?(:outputs) && output.respond_to?(:multi_output?) && output.multi_output?
Expand All @@ -142,10 +147,12 @@ def add_match(type, pattern, conf)
end

def add_filter(type, pattern, conf)
log.info :worker0, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
log_type = conf.is_target?(@worker_id) ? :default : :worker0
log.info log_type, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

filter = Plugin.new_filter(type)
filter.context_router = @event_router
filter.system_config_override(workers: 1) if conf.is_target?(@worker_id)
filter.configure(conf)
@filters << filter
@event_router.add_rule(pattern, filter)
Expand Down
15 changes: 15 additions & 0 deletions lib/fluent/config/element.rb
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,21 @@ def self.unescape_parameter(v)
v.each_char { |c| result << LiteralParser.unescape_char(c) }
result
end

def set_target_worker_id(worker_id)
@target_worker_id = worker_id
@elements.each { |e|
e.set_target_worker_id(worker_id)
}
end

def has_target?
!!@target_worker_id
end

def is_target?(worker_id)
@target_worker_id == worker_id
end
end
end
end
27 changes: 19 additions & 8 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,21 @@ def run_configure(conf)
else
"section <#{e.name}> is not used in <#{parent_name}>"
end
$log.warn :worker0, message
if !e.has_target?
$log.warn :worker0, message
elsif e.is_target?(worker_id)
$log.warn message
end
next
end
unless e.name == 'system'
unless @without_source && e.name == 'source'
$log.warn :worker0, "parameter '#{key}' in #{e.to_s.strip} is not used."
message = "parameter '#{key}' in #{e.to_s.strip} is not used."
if !e.has_target?
$log.warn :worker0, message
elsif e.is_target?(worker_id)
$log.warn message
end
end
end
}
Expand All @@ -128,7 +137,7 @@ def configure(conf)

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn :worker0, "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
rescue ArgumentError # ArgumentError "#{label_name} label not found"
# use default event router if <label @FLUENT_LOG> is missing in configuration
Expand All @@ -139,7 +148,7 @@ def configure(conf)

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn :worker0, "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
end
end
Expand Down Expand Up @@ -200,10 +209,6 @@ def log_event_loop
end

def run
# if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests
# so it's (almost) a single worker, worker_id=0
worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i

begin
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
start
Expand Down Expand Up @@ -260,6 +265,12 @@ def start
def shutdown
@root_agent.shutdown
end

def worker_id
# if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests
# so it's (almost) a single worker, worker_id=0
(ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
end
end

Engine = EngineClass.new
Expand Down
20 changes: 19 additions & 1 deletion lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,26 @@ def initialize(log:, system_config: SystemConfig.new)
attr_reader :labels

def configure(conf)
# initialize <worker> elements
conf.elements(name: 'worker').each do |e|
target_worker_id_str = e.arg
raise ConfigError, "Missing worker id on <worker> directive" if target_worker_id_str.empty?
target_worker_id = target_worker_id_str.to_i
raise ConfigError, "worker#{target_worker_id} specified by <worker> directive doesn't exist" if target_worker_id > (Fluent::Engine.system_config.workers - 1)
e.elements.each do |elem|
raise "<worker> directive can't contain <#{elem.name}> directive" unless ['source', 'match', 'filter', 'label'].include?(elem.name)
elem.set_target_worker_id(target_worker_id)
end
conf += e
end
conf.elements.delete_if{|e| e.name == 'worker'}

error_label_config = nil

# initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output.
label_configs = {}
conf.elements(name: 'label').each { |e|
next if e.has_target? && !e.is_target?(@worker_id)
name = e.arg
raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?

Expand All @@ -90,6 +105,7 @@ def configure(conf)
log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
else
conf.elements(name: 'source').each { |e|
next e.has_target? && !e.is_target?(@worker_id)
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <source> directive" unless type
add_source(type, e)
Expand Down Expand Up @@ -231,13 +247,15 @@ def suppress_interval(interval_time)
end

def add_source(type, conf)
log.info :worker0, "adding source", type: type
log_type = conf.is_target?(@worker_id) ? :default : :worker0
log.info log_type, "adding source", type: type

input = Plugin.new_input(type)
# <source> emits events to the top-level event router (RootAgent#event_router).
# Input#configure overwrites event_router to a label's event_router if it has `@label` parameter.
# See also 'fluentd/plugin/input.rb'
input.context_router = @event_router
input.system_config_override(workers: 1) if conf.is_target?(@worker_id)
input.configure(conf)
@inputs << input

Expand Down

0 comments on commit 295dd08

Please sign in to comment.