Skip to content

Commit

Permalink
Add pipeline.id to log lines fixes #8290, #10521
Browse files Browse the repository at this point in the history
Fixes #11075
  • Loading branch information
andsel authored and jsvd committed Aug 29, 2019
1 parent 6d140f0 commit a3ac21d
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 6 deletions.
3 changes: 3 additions & 0 deletions config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,6 @@

# Entropy source for randomness
-Djava.security.egd=file:/dev/urandom

# Copy the logging context from parent threads to children
-Dlog4j2.isThreadContextMapInheritable=true
4 changes: 2 additions & 2 deletions config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ name = LogstashPropertiesConfig
appender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n

appender.json_console.type = Console
appender.json_console.name = json_console
Expand All @@ -21,7 +21,7 @@ appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %-.10000m%n
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %-.10000m%n
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
Expand Down
3 changes: 2 additions & 1 deletion logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)

def execute
@thread = Thread.current # this var is implicitly used by Stud.stop?
LogStash::Util.set_thread_name("Agent thread")
logger.debug("Starting agent")

transition_to_running
Expand Down Expand Up @@ -307,7 +308,7 @@ def converge_state(pipeline_actions)

pipeline_actions.map do |action|
Thread.new(action, converge_result) do |action, converge_result|
java.lang.Thread.currentThread().setName("Converge #{action}");
LogStash::Util.set_thread_name("Converge #{action}")
# We execute every task we need to converge the current state of pipelines
# for every task we will record the action result, that will help us
# the results of all the task will determine if the converge was successful or not
Expand Down
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
require "logstash/compiler"
require "logstash/config/lir_serializer"

java_import org.apache.logging.log4j.ThreadContext

module LogStash; class JavaPipeline < JavaBasePipeline
include LogStash::Util::Loggable
attr_reader \
Expand Down Expand Up @@ -102,6 +104,7 @@ def start
@thread = Thread.new do
begin
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
ThreadContext.put("pipeline.id", pipeline_id)
run
@finished_run.make_true
rescue => e
Expand Down Expand Up @@ -236,6 +239,7 @@ def start_workers
pipeline_workers.times do |t|
thread = Thread.new do
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
ThreadContext.put("pipeline.id", pipeline_id)
org.logstash.execution.WorkerLoop.new(
lir_execution, filter_queue_client, @events_filtered, @events_consumed,
@flushRequested, @flushing, @shutdownRequested, @drain_queue).run
Expand Down Expand Up @@ -305,6 +309,7 @@ def start_input(plugin)

def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
ThreadContext.put("pipeline.id", pipeline_id)
begin
plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e
Expand Down
11 changes: 9 additions & 2 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
require "logstash/filter_delegator"
require "logstash/compiler"

java_import org.apache.logging.log4j.ThreadContext

module LogStash; class BasePipeline < AbstractPipeline
include LogStash::Util::Loggable

Expand Down Expand Up @@ -172,7 +174,8 @@ def start

@thread = Thread.new do
begin
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
LogStash::Util.set_thread_name("[#{pipeline_id}]-manager")
ThreadContext.put("pipeline.id", pipeline_id)
run
@finished_run.make_true
rescue => e
Expand Down Expand Up @@ -300,7 +303,8 @@ def start_workers

pipeline_workers.times do |t|
thread = Thread.new(batch_size, batch_delay, self) do |_b_size, _b_delay, _pipeline|
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
LogStash::Util::set_thread_name("[#{pipeline_id}]>worker#{t}")
ThreadContext.put("pipeline.id", pipeline_id)
_pipeline.worker_loop(_b_size, _b_delay)
end
@worker_threads << thread
Expand Down Expand Up @@ -430,6 +434,7 @@ def start_input(plugin)

def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
ThreadContext.put("pipeline.id", pipeline_id)
begin
plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e
Expand Down Expand Up @@ -535,6 +540,8 @@ def start_flusher
raise "Attempted to start flusher on a stopped pipeline!" if stopped?

@flusher_thread = Thread.new do
LogStash::Util.set_thread_name("[#{pipeline_id}]-flusher-thread")
ThreadContext.put("pipeline.id", pipeline_id)
while Stud.stoppable_sleep(5, 0.1) { stopped? }
flush
break if stopped?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ public IRubyObject start(final ThreadContext context) {
} else {
queueWriter = qw;
}
Thread t = new Thread(() -> input.start(queueWriter::push));
Thread t = new Thread(() -> {
org.apache.logging.log4j.ThreadContext.put("pipeline.id", pipeline.pipelineId().toString());
input.start(queueWriter::push);
});
t.setName(pipeline.pipelineId().asJavaString() + "_" + input.getName() + "_" + input.getId());
t.start();
return JavaObject.wrap(context.getRuntime(), t);
Expand Down
12 changes: 12 additions & 0 deletions qa/integration/fixtures/pipeline_id_log_spec.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
services:
- logstash
config: |-
input {
generator {
count => 4
}
}
output {
null {}
}
47 changes: 47 additions & 0 deletions qa/integration/specs/pipeline_id_log_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
require_relative '../framework/fixture'
require_relative '../framework/settings'
require_relative '../services/logstash_service'
require_relative '../framework/helpers'
require "logstash/devutils/rspec/spec_helper"
require "yaml"

describe "Test Logstash Pipeline id" do
before(:all) {
@fixture = Fixture.new(__FILE__)
# used in multiple LS tests
@ls = @fixture.get_service("logstash")
}

after(:all) {
@fixture.teardown
}

before(:each) {
# backup the application settings file -- logstash.yml
FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original")
}

after(:each) {
@ls.teardown
# restore the application settings file -- logstash.yml
FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file)
}

let(:temp_dir) { Stud::Temporary.directory("logstash-pipelinelog-test") }
let(:config) { @fixture.config("root") }

it "should write logs with pipeline.id" do
pipeline_name = "custom_pipeline"
settings = {
"path.logs" => temp_dir,
"pipeline.id" => pipeline_name
}
IO.write(@ls.application_settings_file, settings.to_yaml)
@ls.spawn_logstash("-w", "1" , "-e", config)
@ls.wait_for_logstash
sleep 2 until @ls.exited?
plainlog_file = "#{temp_dir}/logstash-plain.log"
expect(File.exists?(plainlog_file)).to be true
expect(IO.read(plainlog_file) =~ /\[logstash.javapipeline\s*\]\[#{pipeline_name}\]/).to be > 0
end
end

0 comments on commit a3ac21d

Please sign in to comment.