Added origins of pipeline's configuration (e.g. config string, the paths of config files used, module).

closes 9630

Fixes #11130
andsel committed Sep 12, 2019
1 parent 652e1b7 commit 1cbaeeb
Showing 6 changed files with 137 additions and 55 deletions.
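For illustration, the "Starting pipeline" entry in logstash-plain.log now carries a pipeline.sources field. A hedged sketch of the resulting line (the timestamp, pipeline id, and neighbouring key/value pairs are placeholders, not output captured from this commit):

[2019-09-12T10:00:00,000][INFO ][logstash.javapipeline    ][custom_pipeline] Starting pipeline {"pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, "pipeline.sources"=>["config string"]}

A pipeline started with -e reports "config string"; one started with -f reports the path of the config file, as exercised by the new integration spec further down.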
13 changes: 10 additions & 3 deletions logstash-core/lib/logstash/java_pipeline.rb
@@ -109,7 +109,11 @@ def start
@finished_run.make_true
rescue => e
close
logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
pipeline_log_params = default_logging_keys(
:exception => e,
:backtrace => e.backtrace,
"pipeline.sources" => pipeline_source_details)
logger.error("Pipeline aborted due to error", pipeline_log_params)
ensure
@finished_execution.make_true
end
@@ -225,11 +229,14 @@ def start_workers
config_metric.gauge(:graph, ::LogStash::Config::LIRSerializer.serialize(lir))
config_metric.gauge(:cluster_uuids, resolve_cluster_uuids)

@logger.info("Starting pipeline", default_logging_keys(
pipeline_log_params = default_logging_keys(
"pipeline.workers" => pipeline_workers,
"pipeline.batch.size" => batch_size,
"pipeline.batch.delay" => batch_delay,
"pipeline.max_inflight" => max_inflight))
"pipeline.max_inflight" => max_inflight,
"pipeline.sources" => pipeline_source_details)
@logger.info("Starting pipeline", pipeline_log_params)

if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD
@logger.warn("CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})", default_logging_keys)
end
16 changes: 11 additions & 5 deletions logstash-core/lib/logstash/pipeline.rb
@@ -164,10 +164,12 @@ def start
collect_stats
collect_dlq_stats

@logger.info("Starting pipeline", default_logging_keys(
"pipeline.workers" => settings.get("pipeline.workers"),
"pipeline.batch.size" => settings.get("pipeline.batch.size"),
"pipeline.batch.delay" => settings.get("pipeline.batch.delay")))
pipeline_log_params = default_logging_keys(
"pipeline.workers" => settings.get("pipeline.workers"),
"pipeline.batch.size" => settings.get("pipeline.batch.size"),
"pipeline.batch.delay" => settings.get("pipeline.batch.delay"),
"pipeline.sources" => pipeline_source_details)
@logger.info("Starting pipeline", pipeline_log_params)

@finished_execution.make_false
@finished_run.make_false
@@ -180,7 +182,11 @@ def start
@finished_run.make_true
rescue => e
close
@logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
pipeline_log_params = default_logging_keys(
:exception => e,
:backtrace => e.backtrace,
"pipeline.sources" => pipeline_source_details)
@logger.error("Pipeline aborted due to error", pipeline_log_params)
ensure
@finished_execution.make_true
end
@@ -6,7 +6,9 @@
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.commons.codec.binary.Hex;
import org.apache.logging.log4j.LogManager;
@@ -29,6 +31,7 @@
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
import org.logstash.common.DeadLetterQueueFactory;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.ConfigCompiler;
import org.logstash.config.ir.PipelineIR;
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
@@ -364,6 +367,35 @@ public final JRubyWrappedWriteClientExt wrappedWriteClient(final ThreadContext c
.initialize(inputQueueClient, pipelineId.asJavaString(), metric, pluginId);
}

@JRubyMethod(name = "pipeline_source_details", visibility = Visibility.PROTECTED)
@SuppressWarnings("rawtypes")
public RubyArray getPipelineSourceDetails(final ThreadContext context) {
RubyArray res = (RubyArray) pipelineSettings.callMethod(context, "config_parts");
List<RubyString> pipelineSources = new ArrayList<>(res.size());
for (IRubyObject part : res.toJavaArray()) {
SourceWithMetadata sourceWithMetadata = part.toJava(SourceWithMetadata.class);
String protocol = sourceWithMetadata.getProtocol();
switch (protocol) {
case "string":
pipelineSources.add(RubyString.newString(context.runtime, "config string"));
break;
case "file":
pipelineSources.add(RubyString.newString(context.runtime, sourceWithMetadata.getId()));
break;
case "x-pack-metrics":
pipelineSources.add(RubyString.newString(context.runtime, "monitoring pipeline"));
break;
case "x-pack-config-management":
pipelineSources.add(RubyString.newString(context.runtime, "central pipeline management"));
break;
case "module":
pipelineSources.add(RubyString.newString(context.runtime, "module"));
break;
}
}
return RubyArray.newArray(context.runtime, pipelineSources);
}

protected final IRubyObject getSetting(final ThreadContext context, final String name) {
return settings.callMethod(context, "get_value", context.runtime.newString(name));
}
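The new pipeline_source_details method above maps each configuration part's protocol onto a human-readable origin. A rough Ruby sketch of the same mapping, for illustration only (part.protocol and part.id stand in for the getProtocol/getId accessors of SourceWithMetadata; the actual implementation is the Java method shown above):

def pipeline_source_details(config_parts)
  config_parts.map do |part|
    case part.protocol
    when "string"                   then "config string"               # config given with -e
    when "file"                     then part.id                       # path of the config file
    when "x-pack-metrics"           then "monitoring pipeline"
    when "x-pack-config-management" then "central pipeline management"
    when "module"                   then "module"
    end
  end.compact                                                          # unknown protocols are skipped, as in the Java switch
end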
47 changes: 0 additions & 47 deletions qa/integration/specs/pipeline_id_log_spec.rb

This file was deleted.

84 changes: 84 additions & 0 deletions qa/integration/specs/pipeline_log_spec.rb
@@ -0,0 +1,84 @@
require_relative '../framework/fixture'
require_relative '../framework/settings'
require_relative '../services/logstash_service'
require_relative '../framework/helpers'
require "logstash/devutils/rspec/spec_helper"
require "yaml"

describe "Test Logstash Pipeline id" do
before(:all) {
@fixture = Fixture.new(__FILE__)
# used in multiple LS tests
@ls = @fixture.get_service("logstash")
}

after(:all) {
@fixture.teardown
}

before(:each) {
# backup the application settings file -- logstash.yml
FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original")
}

after(:each) {
@ls.teardown
# restore the application settings file -- logstash.yml
FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file)
}

let(:temp_dir) { Stud::Temporary.directory("logstash-pipelinelog-test") }
let(:config) { @fixture.config("root") }
let(:initial_config_file) { config_to_temp_file(@fixture.config("root")) }

it "should write logs with pipeline.id" do
pipeline_name = "custom_pipeline"
settings = {
"path.logs" => temp_dir,
"pipeline.id" => pipeline_name
}
IO.write(@ls.application_settings_file, settings.to_yaml)
@ls.spawn_logstash("-w", "1" , "-e", config)
wait_logstash_process_terminate()
plainlog_file = "#{temp_dir}/logstash-plain.log"
expect(File.exists?(plainlog_file)).to be true
expect(IO.read(plainlog_file) =~ /\[logstash.javapipeline\s*\]\[#{pipeline_name}\]/).to be > 0
end

it "write pipeline config in logs - source:config string" do
pipeline_name = "custom_pipeline"
settings = {
"path.logs" => temp_dir,
"pipeline.id" => pipeline_name
}
IO.write(@ls.application_settings_file, settings.to_yaml)
@ls.spawn_logstash("-w", "1" , "-e", config)
wait_logstash_process_terminate()
plainlog_file = "#{temp_dir}/logstash-plain.log"
expect(File.exists?(plainlog_file)).to be true
expect(IO.read(plainlog_file) =~ /Starting pipeline.*"pipeline.sources"=>\["config string"\]/).to be > 0
end

it "write pipeline config in logs - source:config file" do
pipeline_name = "custom_pipeline"
settings = {
"path.logs" => temp_dir,
"pipeline.id" => pipeline_name
}
IO.write(@ls.application_settings_file, settings.to_yaml)
@ls.spawn_logstash("-w", "1", "-f", "#{initial_config_file}")
wait_logstash_process_terminate()
plainlog_file = "#{temp_dir}/logstash-plain.log"
expect(File.exists?(plainlog_file)).to be true
expect(IO.read(plainlog_file) =~ /Starting pipeline.*"pipeline.sources"=>\["#{initial_config_file}"\]/).to be > 0
end

@private
def wait_logstash_process_terminate
num_retries = 100
try(num_retries) do
expect(@ls.exited?).to be(true)
end
expect(@ls.exit_code).to be(0)
end
end
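For reference, the log line the last expectation targets would look roughly like the following; the temp path is illustrative (it is whatever config_to_temp_file wrote) and the surrounding fields are elided:

[2019-09-12T10:00:00,000][INFO ][logstash.javapipeline    ][custom_pipeline] Starting pipeline {..., "pipeline.sources"=>["/tmp/logstash-test-config-xxxx.conf"]}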
