From 1d3c5395e7367396de3b58d6f0cafbaab5121b25 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 23 Aug 2019 17:32:45 +0200 Subject: [PATCH] Added plugin.id to fish tag log lines related to plugins --- config/log4j2.properties | 6 +- logstash-core/lib/logstash/java_pipeline.rb | 1 + logstash-core/lib/logstash/pipeline.rb | 1 + logstash-core/spec/logstash/pipeline_spec.rb | 16 +++++- .../config/ir/compiler/DatasetCompiler.java | 31 +++++++++-- .../ir/compiler/FilterDelegatorExt.java | 22 ++++++-- .../ir/compiler/JavaInputDelegatorExt.java | 1 + .../ir/compiler/OutputDelegatorExt.java | 7 ++- .../config/ir/CompiledPipelineTest.java | 4 +- .../ir/PluginConfigNameMethodDouble.java | 55 +++++++++++++++++++ .../fixtures/plugin_name_log_spec.yml | 18 ++++++ qa/integration/specs/plugin_name_log_spec.rb | 55 +++++++++++++++++++ 12 files changed, 202 insertions(+), 15 deletions(-) create mode 100644 logstash-core/src/test/java/org/logstash/config/ir/PluginConfigNameMethodDouble.java create mode 100644 qa/integration/fixtures/plugin_name_log_spec.yml create mode 100644 qa/integration/specs/plugin_name_log_spec.rb diff --git a/config/log4j2.properties b/config/log4j2.properties index 19ec491ba8b..68dd142408f 100644 --- a/config/log4j2.properties +++ b/config/log4j2.properties @@ -4,7 +4,7 @@ name = LogstashPropertiesConfig appender.console.type = Console appender.console.name = plain_console appender.console.layout.type = PatternLayout -appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n +appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n appender.json_console.type = Console appender.json_console.name = json_console @@ -21,7 +21,7 @@ appender.rolling.policies.time.type = TimeBasedTriggeringPolicy appender.rolling.policies.time.interval = 1 appender.rolling.policies.time.modulate = true appender.rolling.layout.type = PatternLayout -appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n +appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n appender.rolling.policies.size.type = SizeBasedTriggeringPolicy appender.rolling.policies.size.size = 100MB appender.rolling.strategy.type = DefaultRolloverStrategy @@ -144,7 +144,7 @@ appender.deprecation_rolling.policies.time.type = TimeBasedTriggeringPolicy appender.deprecation_rolling.policies.time.interval = 1 appender.deprecation_rolling.policies.time.modulate = true appender.deprecation_rolling.layout.type = PatternLayout -appender.deprecation_rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n +appender.deprecation_rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n appender.deprecation_rolling.policies.size.type = SizeBasedTriggeringPolicy appender.deprecation_rolling.policies.size.size = 100MB appender.deprecation_rolling.strategy.type = DefaultRolloverStrategy diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index dbc0a64419d..c05f8bcf1ce 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -324,6 +324,7 @@ def start_input(plugin) def inputworker(plugin) Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}") ThreadContext.put("pipeline.id", pipeline_id) + ThreadContext.put("plugin.id", plugin.id) begin plugin.run(wrapped_write_client(plugin.id.to_sym)) rescue => e diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index f182c49cdf4..2c17918306b 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -445,6 +445,7 @@ def start_input(plugin) def inputworker(plugin) Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}") ThreadContext.put("pipeline.id", pipeline_id) + ThreadContext.put("plugin.id", plugin.id) begin plugin.run(wrapped_write_client(plugin.id.to_sym)) rescue => e diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index 12dd306e7e5..71b07a48427 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -15,6 +15,7 @@ def register end def run(queue) + @logger.debug("check log4j fish tagging input plugin") end def close @@ -73,7 +74,9 @@ class DummyFilter < LogStash::Filters::Base def register() end - def filter(event) end + def filter(event) + @logger.debug("check log4j fish tagging filter plugin") + end def threadsafe?() false; end @@ -243,6 +246,7 @@ def flush(options) before do expect(::LogStash::Pipeline).to receive(:logger).and_return(logger) allow(logger).to receive(:debug?).and_return(true) + allow_any_instance_of(DummyFilter).to receive(:logger).and_return(logger) end it "should not receive a debug message with the compiled code" do @@ -266,8 +270,18 @@ def flush(options) pipeline.filter_func([LogStash::Event.new]) pipeline.close end + + it "should log fish tagging of plugins" do + pipeline_settings_obj.set("config.debug", true) + pipeline = mock_pipeline_from_string(test_config_with_filters, pipeline_settings_obj) + expect(logger).to receive(:debug).with(/filter received/, anything) + expect(logger).to receive(:debug).with(/[dummyfilter]/) + pipeline.filter_func([LogStash::Event.new]) + pipeline.close + end end + context "when there is no command line -w N set" do it "starts one filter thread" do msg = "Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads" diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java index 5cd4b65cd40..d660ac7ceb4 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java @@ -152,7 +152,9 @@ public static ComputeStepSyntaxElement outputDataset(final Collection parentFields = parents.stream().map(fields::add).collect(Collectors.toList()); @@ -167,8 +169,10 @@ public static ComputeStepSyntaxElement outputDataset(final Collection "org.apache.logging.log4j.ThreadContext.remove(\"plugin.id\")"; + } + + private static MethodLevelSyntaxElement setPluginIdForLog4j(final AbstractFilterDelegatorExt filterPlugin) { + final IRubyObject pluginId = filterPlugin.getId(); + return generateLog4jContextAssignment(pluginId); + } + + private static MethodLevelSyntaxElement setPluginIdForLog4j(final AbstractOutputDelegatorExt outputPlugin) { + final IRubyObject pluginId = outputPlugin.getId(); + return generateLog4jContextAssignment(pluginId); + } + + private static MethodLevelSyntaxElement generateLog4jContextAssignment(IRubyObject pluginId) { + return () -> "org.apache.logging.log4j.ThreadContext.put(\"plugin.id\", \"" + pluginId + "\")"; + } + private static MethodLevelSyntaxElement clear(final ValueSyntaxElement field) { return field.call("clear"); } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java index 6a39c9e062c..18a44400feb 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java @@ -5,17 +5,21 @@ import org.jruby.RubyArray; import org.jruby.RubyClass; import org.jruby.RubyHash; +import org.jruby.RubyObject; import org.jruby.RubyString; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.internal.runtime.methods.DynamicMethod; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; -import org.logstash.RubyUtil; import org.logstash.execution.WorkerLoop; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.counter.LongCounter; +import java.util.UUID; + +import static org.logstash.RubyUtil.RUBY; + @JRubyClass(name = "FilterDelegator") public final class FilterDelegatorExt extends AbstractFilterDelegatorExt { @@ -44,13 +48,15 @@ public IRubyObject initialize(final ThreadContext context, final IRubyObject fil } @VisibleForTesting - public FilterDelegatorExt initForTesting(final IRubyObject filter) { + public FilterDelegatorExt initForTesting(final IRubyObject filter, RubyObject configNameDouble) { eventMetricOut = LongCounter.DUMMY_COUNTER; eventMetricIn = LongCounter.DUMMY_COUNTER; eventMetricTime = LongCounter.DUMMY_COUNTER; this.filter = filter; filterMethod = filter.getMetaClass().searchMethod(FILTER_METHOD_NAME); flushes = filter.respondsTo("flush"); + filterClass = configNameDouble.getType(); + id = RUBY.newString(UUID.randomUUID().toString()); return this; } @@ -96,8 +102,14 @@ protected IRubyObject getConfigName(final ThreadContext context) { @Override @SuppressWarnings({"rawtypes"}) protected RubyArray doMultiFilter(final RubyArray batch) { - return (RubyArray) filterMethod.call( - WorkerLoop.THREAD_CONTEXT.get(), filter, filterClass, FILTER_METHOD_NAME, batch); + final IRubyObject pluginId = this.getId(); + org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString()); + try { + return (RubyArray) filterMethod.call( + WorkerLoop.THREAD_CONTEXT.get(), filter, filterClass, FILTER_METHOD_NAME, batch); + } finally { + org.apache.logging.log4j.ThreadContext.remove("plugin.id"); + } } @Override @@ -112,6 +124,6 @@ protected boolean getHasFlush() { @Override protected boolean getPeriodicFlush() { - return filter.callMethod(RubyUtil.RUBY.getCurrentContext(), "periodic_flush").isTrue(); + return filter.callMethod(RUBY.getCurrentContext(), "periodic_flush").isTrue(); } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java index 6134a69d215..fecfc605538 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java @@ -61,6 +61,7 @@ public IRubyObject start(final ThreadContext context) { } Thread t = new Thread(() -> { org.apache.logging.log4j.ThreadContext.put("pipeline.id", pipeline.pipelineId().toString()); + org.apache.logging.log4j.ThreadContext.put("plugin.id", this.getId(context).toString()); input.start(queueWriter::push); }); t.setName(pipeline.pipelineId().asJavaString() + "_" + input.getName() + "_" + input.getId()); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java index 2e6c5516c11..28db76e9cde 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java @@ -17,7 +17,8 @@ import org.logstash.instrument.metrics.AbstractMetricExt; @JRubyClass(name = "OutputDelegator") -public final class OutputDelegatorExt extends AbstractOutputDelegatorExt { +public final class +OutputDelegatorExt extends AbstractOutputDelegatorExt { private static final long serialVersionUID = 1L; @@ -75,9 +76,13 @@ protected IRubyObject getConcurrency(final ThreadContext context) { @Override protected void doOutput(final Collection batch) { try { + final IRubyObject pluginId = this.getId(); + org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString()); strategy.multiReceive(WorkerLoop.THREAD_CONTEXT.get(), (IRubyObject) batch); } catch (final InterruptedException ex) { throw new IllegalStateException(ex); + } finally { + org.apache.logging.log4j.ThreadContext.remove("plugin.id"); } } diff --git a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java index 32f3c2ba4fa..06af6f68485 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java @@ -14,6 +14,7 @@ import java.util.function.Supplier; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; +import org.jruby.RubyObject; import org.jruby.RubyString; import org.jruby.runtime.builtin.IRubyObject; import org.junit.After; @@ -490,9 +491,10 @@ public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithM @Override public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source, final IRubyObject args, Map pluginArgs) { + final RubyObject configNameDouble = org.logstash.config.ir.PluginConfigNameMethodDouble.create(name); return new FilterDelegatorExt( RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS) - .initForTesting(setupPlugin(name, filters)); + .initForTesting(setupPlugin(name, filters), configNameDouble); } @Override diff --git a/logstash-core/src/test/java/org/logstash/config/ir/PluginConfigNameMethodDouble.java b/logstash-core/src/test/java/org/logstash/config/ir/PluginConfigNameMethodDouble.java new file mode 100644 index 00000000000..2e9b2590bcc --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/config/ir/PluginConfigNameMethodDouble.java @@ -0,0 +1,55 @@ +package org.logstash.config.ir; + +import org.jruby.Ruby; +import org.jruby.RubyClass; +import org.jruby.RubyObject; +import org.jruby.RubyString; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; + +import static org.logstash.RubyUtil.RUBY; + +/** + * Fake class to double the Ruby's method config_name() + * */ +@JRubyClass(name = "PluginConfigNameMethodDouble") +public class PluginConfigNameMethodDouble extends RubyObject { + + private static final long serialVersionUID = 1L; + + static final RubyClass RUBY_META_CLASS; + private RubyString filterName; + + static { + RUBY_META_CLASS = RUBY.defineClass("PluginConfigNameMethodDouble", RUBY.getObject(), + PluginConfigNameMethodDouble::new); + RUBY_META_CLASS.defineAnnotatedMethods(org.logstash.config.ir.compiler.FakeOutClass.class); + } + + PluginConfigNameMethodDouble(final Ruby runtime, final RubyClass metaClass) { + super(runtime, metaClass); + } + + static PluginConfigNameMethodDouble create(RubyString filterName) { + final PluginConfigNameMethodDouble instance = new PluginConfigNameMethodDouble(RUBY, RUBY_META_CLASS); + instance.filterName = filterName; + return instance; + } + + @JRubyMethod + public IRubyObject name(final ThreadContext context) { + return RUBY.newString("example"); + } + + @JRubyMethod(name = "config_name") + public IRubyObject configName(final ThreadContext context, final IRubyObject recv) { + return filterName; + } + + @JRubyMethod + public IRubyObject initialize(final ThreadContext context, final IRubyObject args) { + return this; + } +} \ No newline at end of file diff --git a/qa/integration/fixtures/plugin_name_log_spec.yml b/qa/integration/fixtures/plugin_name_log_spec.yml new file mode 100644 index 00000000000..f21a5d8e04f --- /dev/null +++ b/qa/integration/fixtures/plugin_name_log_spec.yml @@ -0,0 +1,18 @@ +--- +services: + - logstash +config: |- + input { + generator { + count => 4 + } + } + filter { + sleep { + id => "sleep_filter_123" + time => 1 + } + } + output { + null {} + } diff --git a/qa/integration/specs/plugin_name_log_spec.rb b/qa/integration/specs/plugin_name_log_spec.rb new file mode 100644 index 00000000000..23e477bf334 --- /dev/null +++ b/qa/integration/specs/plugin_name_log_spec.rb @@ -0,0 +1,55 @@ +require_relative '../framework/fixture' +require_relative '../framework/settings' +require_relative '../services/logstash_service' +require_relative '../framework/helpers' +require "logstash/devutils/rspec/spec_helper" +require "yaml" + +describe "Test Logstash Pipeline id" do + before(:all) { + @fixture = Fixture.new(__FILE__) + # used in multiple LS tests + @ls = @fixture.get_service("logstash") + } + + after(:all) { + @fixture.teardown + } + + before(:each) { + # backup the application settings file -- logstash.yml + FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original") + } + + after(:each) { + @ls.teardown + # restore the application settings file -- logstash.yml + FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file) + } + + let(:temp_dir) { Stud::Temporary.directory("logstash-pipelinelog-test") } + let(:config) { @fixture.config("root") } + + it "should write logs with plugin name" do + settings = { + "path.logs" => temp_dir, + "log.level" => "debug" + } + IO.write(@ls.application_settings_file, settings.to_yaml) + @ls.spawn_logstash("-w", "1" , "-e", config) + wait_logstash_process_terminate() + plainlog_file = "#{temp_dir}/logstash-plain.log" + expect(File.exists?(plainlog_file)).to be true + #We know taht sleep plugin log debug lines + expect(IO.read(plainlog_file) =~ /\[sleep_filter_123\] Sleeping {:delay=>1}/).to be > 0 + end + + @private + def wait_logstash_process_terminate + num_retries = 100 + try(num_retries) do + expect(@ls.exited?).to be(true) + end + expect(@ls.exit_code).to be(0) + end +end