1.5 performance regression fixes #2869

Closed
wants to merge 1 commit into from
27 changes: 8 additions & 19 deletions lib/logstash/config/config_ast.rb
@@ -81,16 +81,15 @@ def compile
["filter", "output"].each do |type|
# defines @filter_func and @output_func

definitions << "@#{type}_func = lambda do |event, &block|"
definitions << " events = [event]"
definitions << "def #{type}_func(event)"
definitions << " events = [event]" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"

sections.select { |s| s.plugin_type.text_value == type }.each do |s|
definitions << s.compile.split("\n", -1).map { |e| " #{e}" }
end

if type == "filter"
definitions << " events.flatten.each{|e| block.call(e) }"
end
definitions << " events" if type == "filter"
definitions << "end"
end

@@ -211,13 +210,7 @@ def compile
return "start_input(#{variable_name})"
when "filter"
return <<-CODE
events = events.flat_map do |event|
next [] if event.cancelled?

new_events = []
#{variable_name}.filter(event){|new_event| new_events << new_event}
event.cancelled? ? new_events : new_events.unshift(event)
end
events = #{variable_name}.multi_filter(events)
CODE
Contributor:
If the multi_filter method is replacing this generated code, the new method doesn't deal with the cancelled? boolean?

Contributor:
From what I understand (or think I understand), the cancel logic is here to remove the shutdown event from the queue? Am I right?

Contributor:
Also, the new code expects events to be a one-dimensional array, while the old code assumed a multidimensional array (see flat_map). I don't know enough about the background of the code to know if this is something we should worry about.

Contributor:
Split and metrics can emit multiple events ^
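
For context, a hypothetical sketch of how a split-style filter hands extra events back through its block (paraphrased for illustration, not the actual plugin code):

def filter(event)
  # each line becomes its own event; yielded events are collected by the
  # generated filter code (now by multi_filter)
  event["message"].to_s.split("\n").each do |line|
    new_event = event.clone
    new_event["message"] = line
    yield new_event
  end
  # the original aggregate event is cancelled so it is not emitted itself
  event.cancel
end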

Contributor Author:
I already explained some of this in the issue comments: event cancellation is now dealt with in the pipeline code. It is not for the shutdown, but for when a filter marks an event not to be sent to the output queue... think multiline: it will aggregate events and cancel them until the multiline boundary is reached.

flat_map is not required anymore: previously each filter step pushed an array into the events array, but now only individual events are pushed.
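
A minimal sketch of that pipeline-side handling, based on the filterworker change in this PR (simplified):

# in the filterworker loop: cancelled events (e.g. multiline fragments)
# are dropped here, after the whole generated filter chain has run
filter_func(event).each do |e|
  @filter_to_output.push(e) unless e.cancelled?
end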

Contributor Author:
for event cancellation, see bottom of #2869 (comment)

Contributor:
I agree to remove it, if each plugin emits events and not arrays.
We should have a clearer deprecation policy for third-party plugins. cc @suyograo

Contributor Author:
No, plugins always yield only one event; this has not changed, and we do not want to change any API at this point. The proposal for the filter method refactor is in #2872 and we'll do that in 1.6 or 2.0.

The previously generated code, to handle new events emitted by the filter, inserted these new events as an array into the events array (see the generated code example in section 1.5.0-rc2 of #2869 (comment)).
Now the multi_filter method takes care of that logic but adds individual events rather than an array, so flat_map is not required anymore.
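
For illustration, a rough sketch of the difference in the generated per-filter code, reconstructed from the diff above (filter_plugin stands in for the generated variable name; the exact generated code may differ):

# previously generated, one block per filter:
events = events.flat_map do |event|
  next [] if event.cancelled?
  new_events = []
  filter_plugin.filter(event) { |new_event| new_events << new_event }
  event.cancelled? ? new_events : new_events.unshift(event)
end

# generated by this PR:
events = filter_plugin.multi_filter(events)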

Contributor:
Got it.

when "output"
return "#{variable_name}.handle(event)\n"
@@ -287,7 +280,7 @@ class Value < RValue; end

module Unicode
def self.wrap(text)
return "(" + text.inspect + ".force_encoding(Encoding::UTF_8)" + ")"
return "(" + text.force_encoding(Encoding::UTF_8).inspect + ")"
end
end

@@ -364,12 +357,8 @@ def compile
# at the end, events is returned to handle the case where no branch match and no branch code is executed
# so we must make sure to return the current event.

return <<-CODE
events = events.flat_map do |event|
events = [event]
#{super}
end
events
<<-CODE
#{super}
end
CODE
end
33 changes: 19 additions & 14 deletions lib/logstash/event.rb
@@ -16,6 +16,13 @@
class LogStash::ShutdownEvent; end
class LogStash::FlushEvent; end

module LogStash
Member:
I'd add an extra module level here, for example:

module LogStash::Signal
 FLUSH = LogStash::FlushEvent.new
 SHUTDOWN = LogStash::ShutdownEvent.new
end

Then:

@input_to_filter.push(LogStash::Signal::SHUTDOWN)

or

@input_to_filter.push(Signal::SHUTDOWN) since we're already inside the namespace.

Contributor Author:
Good suggestion, but unfortunately we cannot do this without updating all plugins which rely on LogStash::SHUTDOWN; see #2863, which is exactly what happened. I suggest you create a new issue and we can tackle this in >= 1.6. It could be done at the same time as #2872.
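
For illustration, the kind of third-party plugin code that relies on the constant staying where it is (a hypothetical worker loop, not from this PR; some_queue and handle are placeholders):

while true
  event = some_queue.pop
  # a direct comparison against the constant; moving it under
  # LogStash::Signal::SHUTDOWN would break this code
  break if event == LogStash::SHUTDOWN
  handle(event)
end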

Member:
Hmm... around 10 plugins use it: https://github.com/search?p=1&q=user%3Alogstash-plugins+LogStash%3A%3ASHUTDOWN&ref=searchresults&type=Code&utf8=%E2%9C%93 (I couldn't find a more accurate search for this)

Alright, let's postpone this to > 1.5.

FLUSH = LogStash::FlushEvent.new

# LogStash::SHUTDOWN is used by plugins
SHUTDOWN = LogStash::ShutdownEvent.new
end

# the logstash event object.
#
# An event is simply a tuple of (timestamp, data).
@@ -48,25 +55,26 @@ class DeprecatedMethod < StandardError; end
TIMESTAMP_FAILURE_TAG = "_timestampparsefailure"
TIMESTAMP_FAILURE_FIELD = "_@timestamp"

METADATA = "@metadata".freeze
METADATA_BRACKETS = "[#{METADATA}]".freeze

# Floats outside of these upper and lower bounds are forcibly converted
# to scientific notation by Float#to_s
MIN_FLOAT_BEFORE_SCI_NOT = 0.0001
MAX_FLOAT_BEFORE_SCI_NOT = 1000000000000000.0

LOGGER = Cabin::Channel.get(LogStash)

public
def initialize(data = {})
@logger = Cabin::Channel.get(LogStash)
@cancelled = false
@data = data
@accessors = LogStash::Util::Accessors.new(data)
@data[VERSION] ||= VERSION_ONE
@data[TIMESTAMP] = init_timestamp(@data[TIMESTAMP])
ts = @data[TIMESTAMP]
@data[TIMESTAMP] = ts ? init_timestamp(ts) : LogStash::Timestamp.now

@metadata = if @data.include?("@metadata")
@data.delete("@metadata")
else
{}
end
@metadata = @data.delete(METADATA) || {}
@metadata_accessors = LogStash::Util::Accessors.new(@metadata)
end # def initialize

@@ -113,9 +121,6 @@ def ruby_timestamp
raise DeprecatedMethod
end # def unix_timestamp

# field-related access
METADATA = "@metadata".freeze
METADATA_BRACKETS = "[#{METADATA}]".freeze
public
def [](fieldref)
if fieldref.start_with?(METADATA_BRACKETS)
@@ -267,12 +272,12 @@ def tag(value)

def init_timestamp(o)
begin
timestamp = o ? LogStash::Timestamp.coerce(o) : LogStash::Timestamp.now
timestamp = LogStash::Timestamp.coerce(o)
return timestamp if timestamp

@logger.warn("Unrecognized #{TIMESTAMP} value, setting current time to #{TIMESTAMP}, original in #{TIMESTAMP_FAILURE_FIELD}field", :value => o.inspect)
LOGGER.warn("Unrecognized #{TIMESTAMP} value, setting current time to #{TIMESTAMP}, original in #{TIMESTAMP_FAILURE_FIELD}field", :value => o.inspect)
rescue LogStash::TimestampParserError => e
@logger.warn("Error parsing #{TIMESTAMP} string, setting current time to #{TIMESTAMP}, original in #{TIMESTAMP_FAILURE_FIELD} field", :value => o.inspect, :exception => e.message)
LOGGER.warn("Error parsing #{TIMESTAMP} string, setting current time to #{TIMESTAMP}, original in #{TIMESTAMP_FAILURE_FIELD} field", :value => o.inspect, :exception => e.message)
end

@data["tags"] ||= []
@@ -287,7 +292,7 @@ def to_hash_with_metadata
if @metadata.nil?
to_hash
else
to_hash.merge("@metadata" => @metadata)
to_hash.merge(METADATA => @metadata)
end
end

19 changes: 19 additions & 0 deletions lib/logstash/filters/base.rb
@@ -1,5 +1,6 @@
# encoding: utf-8
require "logstash/namespace"
require "logstash/event"
require "logstash/logging"
require "logstash/plugin"
require "logstash/config/mixin"
@@ -145,6 +146,24 @@ def filter(event)
raise "#{self.class}#filter must be overidden"
end # def filter


# in 1.5.0 multi_filter is meant to be used in the generated filter function in LogStash::Config::AST::Plugin only
# and is temporary until we refactor the filter method interface to accept events list and return events list,
# just list in multi_filter see https://github.com/elastic/logstash/issues/2872.
# refactoring the filter method will mean updating all plugins which we want to avoid doing for 1.5.0.
#
# @param events [Array<LogStash::Event>] list of events to filter
# @return [Array<LogStash::Event>] filtered events and any new events generated by the filter
public
def multi_filter(events)
Member:
Maybe apply, filter_array, or something else would be a better name for this method?

Contributor Author:
Hmm, I am not in love with multi_filter either, but I definitely do not like apply and do not really like filter_array... I don't think either better conveys the purpose. Other suggestions?

Member:
If sticking to multi_filter, I'd prefer filter_multi, because we're doing filter on multiple events, not multiple filters.

Contributor Author:
Hmm, we're actually calling filter multiple times, hence multi_filter... no?

This method is only used in the generated filter function and is ephemeral per #2872, which will change the signature to handle processing multiple events. Can we just keep it as-is knowing it will go away soon? :P

Member:
👍

result = []
events.each do |event|
result << event
filter(event){|new_event| result << new_event}
end
result
end

Contributor:
Can you add a comment on this? I am not sure what exactly it does.

Contributor Author:
totally! on it.

Contributor Author:
done!

Contributor:
👍

public
def execute(event, &block)
filter(event, &block)
2 changes: 0 additions & 2 deletions lib/logstash/namespace.rb
@@ -12,6 +12,4 @@ module Web; end
module Util; end
module PluginMixins; end
module PluginManager; end

SHUTDOWN = :shutdown
end # module LogStash
25 changes: 8 additions & 17 deletions lib/logstash/pipeline.rb
@@ -11,8 +11,6 @@

class LogStash::Pipeline

FLUSH_EVENT = LogStash::FlushEvent.new

def initialize(configstr)
@logger = Cabin::Channel.get(LogStash)
grammar = LogStashConfigParser.new
@@ -113,7 +111,7 @@ def wait_inputs

def shutdown_filters
@flusher_lock.synchronize { @flusher_thread.kill }
@input_to_filter.push(LogStash::ShutdownEvent.new)
@input_to_filter.push(LogStash::SHUTDOWN)
end

def wait_filters
@@ -122,7 +120,7 @@

def shutdown_outputs
# nothing, filters will do this
@filter_to_output.push(LogStash::ShutdownEvent.new)
@filter_to_output.push(LogStash::SHUTDOWN)
end

def wait_outputs
@@ -154,7 +152,7 @@ def start_filters
end

@flusher_lock = Mutex.new
@flusher_thread = Thread.new { Stud.interval(5) { @flusher_lock.synchronize { @input_to_filter.push(FLUSH_EVENT) } } }
@flusher_thread = Thread.new { Stud.interval(5) { @flusher_lock.synchronize { @input_to_filter.push(LogStash::FLUSH) } } }
end

def start_outputs
Expand Down Expand Up @@ -203,11 +201,7 @@ def filterworker

case event
when LogStash::Event
# use events array to guarantee ordering of origin vs created events
# where created events are emitted by filters like split or metrics
events = []
filter(event) { |newevent| events << newevent }
events.each { |event| @filter_to_output.push(event) }
filter_func(event).each { |e| @filter_to_output.push(e) unless e.cancelled? }
when LogStash::FlushEvent
# handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
# don't have to deal with thread safety implementing the flush method
@@ -231,8 +225,8 @@ def outputworker

while true
event = @filter_to_output.pop
break if event.is_a?(LogStash::ShutdownEvent)
output(event)
break if event == LogStash::SHUTDOWN
output_func(event)
end # while true

@outputs.each do |output|
@@ -271,12 +265,9 @@ def plugin(plugin_type, name, *args)
return klass.new(*args)
end

# for backward compatibility in devutils for the rspec helpers
def filter(event, &block)
@filter_func.call(event, &block)
end

def output(event)
@output_func.call(event)
filter_func(event).each { |e| block.call(e) }
end

# perform filters flush and yeild flushed event to the passed block
14 changes: 10 additions & 4 deletions spec/core/event_spec.rb
@@ -289,8 +289,7 @@
it "should tag and warn for invalid value" do
ts = LogStash::Timestamp.now
expect(LogStash::Timestamp).to receive(:now).twice.and_return(ts)
expect(Cabin::Channel).to receive(:get).twice.and_return(logger)
expect(logger).to receive(:warn).twice
expect(LogStash::Event::LOGGER).to receive(:warn).twice

event = LogStash::Event.new("@timestamp" => :foo)
expect(event.timestamp.to_i).to eq(ts.to_i)
@@ -306,8 +305,7 @@
it "should tag and warn for invalid string format" do
ts = LogStash::Timestamp.now
expect(LogStash::Timestamp).to receive(:now).and_return(ts)
expect(Cabin::Channel).to receive(:get).and_return(logger)
expect(logger).to receive(:warn)
expect(LogStash::Event::LOGGER).to receive(:warn)

event = LogStash::Event.new("@timestamp" => "foo")
expect(event.timestamp.to_i).to eq(ts.to_i)
@@ -400,7 +398,15 @@
expect(subject["foo"]).to eq("bar")
end
end
end

context "signal events" do
it "should define the shutdown event" do
# the SHUTDOWN and FLUSH constants are part of the plugin API contract
# if they are changed, all plugins must be updated
expect(LogStash::SHUTDOWN).to be_a(LogStash::ShutdownEvent)
expect(LogStash::FLUSH).to be_a(LogStash::FlushEvent)
end
end

end
29 changes: 29 additions & 0 deletions spec/filters/base_spec.rb
@@ -14,6 +14,35 @@ def filter(event)
end
end

describe LogStash::Filters::Base do
subject {LogStash::Filters::Base.new({})}

it "should provide method interfaces to override" do
expect{subject.register}.to raise_error(RuntimeError)
expect{subject.filter(:foo)}.to raise_error(RuntimeError)
end

it "should provide class public API" do
[:register, :filter, :multi_filter, :execute, :threadsafe?, :filter_matched, :filter?, :teardown].each do |method|
expect(subject).to respond_to(method)
end
end

it "should multi_filter without new events" do
allow(subject).to receive(:filter) do |event, &block|
nil
end
expect(subject.multi_filter([:foo])).to eq([:foo])
end

it "should multi_filter with new events" do
allow(subject).to receive(:filter) do |event, &block|
block.call(:bar)
end
expect(subject.multi_filter([:foo])).to eq([:foo, :bar])
end
end

describe LogStash::Filters::NOOP do

describe "adding multiple values to one field" do