diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb
index 4616ee79251..7ebd6b9715b 100644
--- a/logstash-core/lib/logstash/java_pipeline.rb
+++ b/logstash-core/lib/logstash/java_pipeline.rb
@@ -109,7 +109,11 @@ def start
         @finished_run.make_true
       rescue => e
         close
-        logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
+        pipeline_log_params = default_logging_keys(
+          :exception => e,
+          :backtrace => e.backtrace,
+          "pipeline.sources" => pipeline_source_details)
+        logger.error("Pipeline aborted due to error", pipeline_log_params)
       ensure
         @finished_execution.make_true
       end
@@ -225,11 +229,14 @@ def start_workers
      config_metric.gauge(:graph, ::LogStash::Config::LIRSerializer.serialize(lir))
      config_metric.gauge(:cluster_uuids, resolve_cluster_uuids)
 
-      @logger.info("Starting pipeline", default_logging_keys(
+      pipeline_log_params = default_logging_keys(
        "pipeline.workers" => pipeline_workers,
        "pipeline.batch.size" => batch_size,
        "pipeline.batch.delay" => batch_delay,
-        "pipeline.max_inflight" => max_inflight))
+        "pipeline.max_inflight" => max_inflight,
+        "pipeline.sources" => pipeline_source_details)
+      @logger.info("Starting pipeline", pipeline_log_params)
+
      if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD
        @logger.warn("CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})", default_logging_keys)
      end
diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb
index e9eb3d889f3..b6b818ea53e 100644
--- a/logstash-core/lib/logstash/pipeline.rb
+++ b/logstash-core/lib/logstash/pipeline.rb
@@ -164,10 +164,12 @@ def start
    collect_stats
    collect_dlq_stats
 
-    @logger.info("Starting pipeline", default_logging_keys(
-      "pipeline.workers" => settings.get("pipeline.workers"),
-      "pipeline.batch.size" => settings.get("pipeline.batch.size"),
-      "pipeline.batch.delay" => settings.get("pipeline.batch.delay")))
+    pipeline_log_params = default_logging_keys(
+      "pipeline.workers" => settings.get("pipeline.workers"),
+      "pipeline.batch.size" => settings.get("pipeline.batch.size"),
+      "pipeline.batch.delay" => settings.get("pipeline.batch.delay"),
+      "pipeline.sources" => pipeline_source_details)
+    @logger.info("Starting pipeline", pipeline_log_params)
 
    @finished_execution.make_false
    @finished_run.make_false
@@ -180,7 +182,11 @@ def start
        @finished_run.make_true
      rescue => e
        close
-        @logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
+        pipeline_log_params = default_logging_keys(
+          :exception => e,
+          :backtrace => e.backtrace,
+          "pipeline.sources" => pipeline_source_details)
+        @logger.error("Pipeline aborted due to error", pipeline_log_params)
      ensure
        @finished_execution.make_true
      end
diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java
index 2c6cd93f616..b726ff4a84c 100644
--- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java
+++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java
@@ -6,7 +6,9 @@
 import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.logging.log4j.LogManager;
@@ -29,6 +31,7 @@
 import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
 import org.logstash.common.DeadLetterQueueFactory;
 import org.logstash.common.IncompleteSourceWithMetadataException;
+import org.logstash.common.SourceWithMetadata;
 import org.logstash.config.ir.ConfigCompiler;
 import org.logstash.config.ir.PipelineIR;
 import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
@@ -364,6 +367,35 @@ public final JRubyWrappedWriteClientExt wrappedWriteClient(final ThreadContext c
            .initialize(inputQueueClient, pipelineId.asJavaString(), metric, pluginId);
    }
 
+    @JRubyMethod(name = "pipeline_source_details", visibility = Visibility.PROTECTED)
+    @SuppressWarnings("rawtypes")
+    public RubyArray getPipelineSourceDetails(final ThreadContext context) {
+        RubyArray res = (RubyArray) pipelineSettings.callMethod(context, "config_parts");
+        List pipelineSources = new ArrayList<>(res.size());
+        for (IRubyObject part : res.toJavaArray()) {
+            SourceWithMetadata sourceWithMetadata = part.toJava(SourceWithMetadata.class);
+            String protocol = sourceWithMetadata.getProtocol();
+            switch (protocol) {
+                case "string":
+                    pipelineSources.add(RubyString.newString(context.runtime, "config string"));
+                    break;
+                case "file":
+                    pipelineSources.add(RubyString.newString(context.runtime, sourceWithMetadata.getId()));
+                    break;
+                case "x-pack-metrics":
+                    pipelineSources.add(RubyString.newString(context.runtime, "monitoring pipeline"));
+                    break;
+                case "x-pack-config-management":
+                    pipelineSources.add(RubyString.newString(context.runtime, "central pipeline management"));
+                    break;
+                case "module":
+                    pipelineSources.add(RubyString.newString(context.runtime, "module"));
+                    break;
+            }
+        }
+        return RubyArray.newArray(context.runtime, pipelineSources);
+    }
+
    protected final IRubyObject getSetting(final ThreadContext context, final String name) {
        return settings.callMethod(context, "get_value", context.runtime.newString(name));
    }
diff --git a/qa/integration/fixtures/pipeline_id_log_spec.yml b/qa/integration/fixtures/pipeline_log_spec.yml
similarity index 100%
rename from qa/integration/fixtures/pipeline_id_log_spec.yml
rename to qa/integration/fixtures/pipeline_log_spec.yml
diff --git a/qa/integration/specs/pipeline_id_log_spec.rb b/qa/integration/specs/pipeline_id_log_spec.rb
deleted file mode 100644
index b9da009e08f..00000000000
--- a/qa/integration/specs/pipeline_id_log_spec.rb
+++ /dev/null
@@ -1,47 +0,0 @@
-require_relative '../framework/fixture'
-require_relative '../framework/settings'
-require_relative '../services/logstash_service'
-require_relative '../framework/helpers'
-require "logstash/devutils/rspec/spec_helper"
-require "yaml"
-
-describe "Test Logstash Pipeline id" do
-  before(:all) {
-    @fixture = Fixture.new(__FILE__)
-    # used in multiple LS tests
-    @ls = @fixture.get_service("logstash")
-  }
-
-  after(:all) {
-    @fixture.teardown
-  }
-
-  before(:each) {
-    # backup the application settings file -- logstash.yml
-    FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original")
-  }
-
-  after(:each) {
-    @ls.teardown
-    # restore the application settings file -- logstash.yml
-    FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file)
-  }
-
-  let(:temp_dir) { Stud::Temporary.directory("logstash-pipelinelog-test") }
-  let(:config) { @fixture.config("root") }
-
-  it "should write logs with pipeline.id" do
-    pipeline_name = "custom_pipeline"
-    settings = {
-      "path.logs" => temp_dir,
-      "pipeline.id" => pipeline_name
-    }
-    IO.write(@ls.application_settings_file, settings.to_yaml)
-    @ls.spawn_logstash("-w", "1" , "-e", config)
-    @ls.wait_for_logstash
-    sleep 2 until @ls.exited?
-    plainlog_file = "#{temp_dir}/logstash-plain.log"
-    expect(File.exists?(plainlog_file)).to be true
-    expect(IO.read(plainlog_file) =~ /\[logstash.javapipeline\s*\]\[#{pipeline_name}\]/).to be > 0
-  end
-end
diff --git a/qa/integration/specs/pipeline_log_spec.rb b/qa/integration/specs/pipeline_log_spec.rb
new file mode 100644
index 00000000000..a8da84eed30
--- /dev/null
+++ b/qa/integration/specs/pipeline_log_spec.rb
@@ -0,0 +1,84 @@
+require_relative '../framework/fixture'
+require_relative '../framework/settings'
+require_relative '../services/logstash_service'
+require_relative '../framework/helpers'
+require "logstash/devutils/rspec/spec_helper"
+require "yaml"
+
+describe "Test Logstash Pipeline id" do
+  before(:all) {
+    @fixture = Fixture.new(__FILE__)
+    # used in multiple LS tests
+    @ls = @fixture.get_service("logstash")
+  }
+
+  after(:all) {
+    @fixture.teardown
+  }
+
+  before(:each) {
+    # backup the application settings file -- logstash.yml
+    FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original")
+  }
+
+  after(:each) {
+    @ls.teardown
+    # restore the application settings file -- logstash.yml
+    FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file)
+  }
+
+  let(:temp_dir) { Stud::Temporary.directory("logstash-pipelinelog-test") }
+  let(:config) { @fixture.config("root") }
+  let(:initial_config_file) { config_to_temp_file(@fixture.config("root")) }
+
+  it "should write logs with pipeline.id" do
+    pipeline_name = "custom_pipeline"
+    settings = {
+      "path.logs" => temp_dir,
+      "pipeline.id" => pipeline_name
+    }
+    IO.write(@ls.application_settings_file, settings.to_yaml)
+    @ls.spawn_logstash("-w", "1" , "-e", config)
+    wait_logstash_process_terminate()
+    plainlog_file = "#{temp_dir}/logstash-plain.log"
+    expect(File.exists?(plainlog_file)).to be true
+    expect(IO.read(plainlog_file) =~ /\[logstash.javapipeline\s*\]\[#{pipeline_name}\]/).to be > 0
+  end
+
+  it "write pipeline config in logs - source:config string" do
+    pipeline_name = "custom_pipeline"
+    settings = {
+      "path.logs" => temp_dir,
+      "pipeline.id" => pipeline_name
+    }
+    IO.write(@ls.application_settings_file, settings.to_yaml)
+    @ls.spawn_logstash("-w", "1" , "-e", config)
+    wait_logstash_process_terminate()
+    plainlog_file = "#{temp_dir}/logstash-plain.log"
+    expect(File.exists?(plainlog_file)).to be true
+    expect(IO.read(plainlog_file) =~ /Starting pipeline.*"pipeline.sources"=>\["config string"\]/).to be > 0
+  end
+
+  it "write pipeline config in logs - source:config file" do
+    pipeline_name = "custom_pipeline"
+    settings = {
+      "path.logs" => temp_dir,
+      "pipeline.id" => pipeline_name
+    }
+    IO.write(@ls.application_settings_file, settings.to_yaml)
+    @ls.spawn_logstash("-w", "1", "-f", "#{initial_config_file}")
+    wait_logstash_process_terminate()
+    plainlog_file = "#{temp_dir}/logstash-plain.log"
+    expect(File.exists?(plainlog_file)).to be true
+    expect(IO.read(plainlog_file) =~ /Starting pipeline.*"pipeline.sources"=>\["#{initial_config_file}"\]/).to be > 0
+  end
+
+  @private
+  def wait_logstash_process_terminate
+    num_retries = 100
+    try(num_retries) do
+      expect(@ls.exited?).to be(true)
+    end
+    expect(@ls.exit_code).to be(0)
+  end
+end
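
Note (not part of the patch): the new pipeline_source_details / getPipelineSourceDetails method boils down to a protocol-to-label mapping over the pipeline settings' config_parts, and the resulting labels are what the "pipeline.sources" field in the startup and abort log entries carries. The Ruby sketch below restates that mapping in standalone form; SourcePart is a hypothetical stand-in for org.logstash.common.SourceWithMetadata, and the real implementation (above) builds a RubyArray of RubyStrings rather than plain Ruby values.

# Minimal standalone sketch of the protocol-to-label mapping implemented in
# AbstractPipelineExt above. SourcePart is hypothetical; the real input comes
# from pipeline_settings.config_parts (SourceWithMetadata objects).
SourcePart = Struct.new(:protocol, :id)

def pipeline_source_details(config_parts)
  config_parts.map { |part|
    case part.protocol
    when "string"                   then "config string"    # config given via -e
    when "file"                     then part.id            # id holds the file path
    when "x-pack-metrics"           then "monitoring pipeline"
    when "x-pack-config-management" then "central pipeline management"
    when "module"                   then "module"
    end                     # unmatched protocols yield nil...
  }.compact                 # ...and are dropped, as in the Java switch
end

parts = [SourcePart.new("string", nil),
         SourcePart.new("file", "/etc/logstash/conf.d/main.conf")]
p pipeline_source_details(parts)
# => ["config string", "/etc/logstash/conf.d/main.conf"]

Read this way, the expectations in the new spec follow directly: -e exercises the "string" branch ("pipeline.sources"=>["config string"]) and -f the "file" branch (the config file path).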