Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added plugin.name to fish tag log lines #11078

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ name = LogstashPropertiesConfig
appender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n

appender.json_console.type = Console
appender.json_console.name = json_console
Expand All @@ -21,7 +21,7 @@ appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
Expand Down Expand Up @@ -144,7 +144,7 @@ appender.deprecation_rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.deprecation_rolling.policies.time.interval = 1
appender.deprecation_rolling.policies.time.modulate = true
appender.deprecation_rolling.layout.type = PatternLayout
appender.deprecation_rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n
appender.deprecation_rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n
appender.deprecation_rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.deprecation_rolling.policies.size.size = 100MB
appender.deprecation_rolling.strategy.type = DefaultRolloverStrategy
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ def start_input(plugin)
def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
ThreadContext.put("pipeline.id", pipeline_id)
ThreadContext.put("plugin.id", plugin.id)
begin
plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ def start_input(plugin)
def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
ThreadContext.put("pipeline.id", pipeline_id)
ThreadContext.put("plugin.id", plugin.id)
begin
plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e
Expand Down
16 changes: 15 additions & 1 deletion logstash-core/spec/logstash/pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def register
end

def run(queue)
@logger.debug("check log4j fish tagging input plugin")
end

def close
Expand Down Expand Up @@ -73,7 +74,9 @@ class DummyFilter < LogStash::Filters::Base

def register() end

def filter(event) end
def filter(event)
@logger.debug("check log4j fish tagging filter plugin")
end

def threadsafe?() false; end

Expand Down Expand Up @@ -243,6 +246,7 @@ def flush(options)
before do
expect(::LogStash::Pipeline).to receive(:logger).and_return(logger)
allow(logger).to receive(:debug?).and_return(true)
allow_any_instance_of(DummyFilter).to receive(:logger).and_return(logger)
end

it "should not receive a debug message with the compiled code" do
Expand All @@ -266,8 +270,18 @@ def flush(options)
pipeline.filter_func([LogStash::Event.new])
pipeline.close
end

it "should log fish tagging of plugins" do
pipeline_settings_obj.set("config.debug", true)
pipeline = mock_pipeline_from_string(test_config_with_filters, pipeline_settings_obj)
expect(logger).to receive(:debug).with(/filter received/, anything)
expect(logger).to receive(:debug).with(/[dummyfilter]/)
pipeline.filter_func([LogStash::Event.new])
pipeline.close
end
end


context "when there is no command line -w N set" do
it "starts one filter thread" do
msg = "Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<D
final Closure computeSyntax;
if (parents.isEmpty()) {
clearSyntax = Closure.EMPTY;
computeSyntax = Closure.wrap(invokeOutput(fields.add(output), BATCH_ARG));
computeSyntax = Closure.wrap(setPluginIdForLog4j(output),
invokeOutput(fields.add(output), BATCH_ARG),
unsetPluginIdForLog4j());
} else {
final Collection<ValueSyntaxElement> parentFields =
parents.stream().map(fields::add).collect(Collectors.toList());
Expand All @@ -167,8 +169,10 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<D
clearSyntax = clearSyntax(parentFields);
}
final ValueSyntaxElement inputBuffer = fields.add(buffer);
computeSyntax = withInputBuffering(
Closure.wrap(invokeOutput(fields.add(output), inputBuffer), inlineClear),
computeSyntax = withInputBuffering(Closure.wrap(
setPluginIdForLog4j(output),
invokeOutput(fields.add(output), inputBuffer), inlineClear,
unsetPluginIdForLog4j()),
parentFields, inputBuffer
);
}
Expand All @@ -184,12 +188,13 @@ private static Closure filterBody(final ValueSyntaxElement outputBuffer,
final ValueSyntaxElement inputBuffer, final ClassFields fields,
final AbstractFilterDelegatorExt plugin) {
final ValueSyntaxElement filterField = fields.add(plugin);
final Closure body = Closure.wrap(
final Closure body = Closure.wrap(setPluginIdForLog4j(plugin),
buffer(outputBuffer, filterField.call("multiFilter", inputBuffer))
);
if (plugin.hasFlush()) {
body.add(callFilterFlush(fields, outputBuffer, filterField, !plugin.periodicFlush()));
}
body.add(unsetPluginIdForLog4j());
return body;
}

Expand Down Expand Up @@ -286,6 +291,24 @@ private static MethodLevelSyntaxElement callFilterFlush(final ClassFields fields
);
}

private static MethodLevelSyntaxElement unsetPluginIdForLog4j() {
return () -> "org.apache.logging.log4j.ThreadContext.remove(\"plugin.id\")";
}

private static MethodLevelSyntaxElement setPluginIdForLog4j(final AbstractFilterDelegatorExt filterPlugin) {
final IRubyObject pluginId = filterPlugin.getId();
return generateLog4jContextAssignment(pluginId);
}

private static MethodLevelSyntaxElement setPluginIdForLog4j(final AbstractOutputDelegatorExt outputPlugin) {
final IRubyObject pluginId = outputPlugin.getId();
return generateLog4jContextAssignment(pluginId);
}

private static MethodLevelSyntaxElement generateLog4jContextAssignment(IRubyObject pluginId) {
return () -> "org.apache.logging.log4j.ThreadContext.put(\"plugin.id\", \"" + pluginId + "\")";
}

private static MethodLevelSyntaxElement clear(final ValueSyntaxElement field) {
return field.call("clear");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.internal.runtime.methods.DynamicMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.WorkerLoop;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.counter.LongCounter;

import java.util.UUID;

import static org.logstash.RubyUtil.RUBY;

@JRubyClass(name = "FilterDelegator")
public final class FilterDelegatorExt extends AbstractFilterDelegatorExt {

Expand Down Expand Up @@ -44,13 +48,15 @@ public IRubyObject initialize(final ThreadContext context, final IRubyObject fil
}

@VisibleForTesting
public FilterDelegatorExt initForTesting(final IRubyObject filter) {
public FilterDelegatorExt initForTesting(final IRubyObject filter, RubyObject configNameDouble) {
eventMetricOut = LongCounter.DUMMY_COUNTER;
eventMetricIn = LongCounter.DUMMY_COUNTER;
eventMetricTime = LongCounter.DUMMY_COUNTER;
this.filter = filter;
filterMethod = filter.getMetaClass().searchMethod(FILTER_METHOD_NAME);
flushes = filter.respondsTo("flush");
filterClass = configNameDouble.getType();
id = RUBY.newString(UUID.randomUUID().toString());
return this;
}

Expand Down Expand Up @@ -96,8 +102,14 @@ protected IRubyObject getConfigName(final ThreadContext context) {
@Override
@SuppressWarnings({"rawtypes"})
protected RubyArray doMultiFilter(final RubyArray batch) {
return (RubyArray) filterMethod.call(
WorkerLoop.THREAD_CONTEXT.get(), filter, filterClass, FILTER_METHOD_NAME, batch);
final IRubyObject pluginId = this.getId();
org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString());
try {
return (RubyArray) filterMethod.call(
WorkerLoop.THREAD_CONTEXT.get(), filter, filterClass, FILTER_METHOD_NAME, batch);
} finally {
org.apache.logging.log4j.ThreadContext.remove("plugin.id");
}
}

@Override
Expand All @@ -112,6 +124,6 @@ protected boolean getHasFlush() {

@Override
protected boolean getPeriodicFlush() {
return filter.callMethod(RubyUtil.RUBY.getCurrentContext(), "periodic_flush").isTrue();
return filter.callMethod(RUBY.getCurrentContext(), "periodic_flush").isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public IRubyObject start(final ThreadContext context) {
}
Thread t = new Thread(() -> {
org.apache.logging.log4j.ThreadContext.put("pipeline.id", pipeline.pipelineId().toString());
org.apache.logging.log4j.ThreadContext.put("plugin.id", this.getId(context).toString());
input.start(queueWriter::push);
});
t.setName(pipeline.pipelineId().asJavaString() + "_" + input.getName() + "_" + input.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
import org.logstash.instrument.metrics.AbstractMetricExt;

@JRubyClass(name = "OutputDelegator")
public final class OutputDelegatorExt extends AbstractOutputDelegatorExt {
public final class
OutputDelegatorExt extends AbstractOutputDelegatorExt {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -75,9 +76,13 @@ protected IRubyObject getConcurrency(final ThreadContext context) {
@Override
protected void doOutput(final Collection<JrubyEventExtLibrary.RubyEvent> batch) {
try {
final IRubyObject pluginId = this.getId();
org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString());
strategy.multiReceive(WorkerLoop.THREAD_CONTEXT.get(), (IRubyObject) batch);
} catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
} finally {
org.apache.logging.log4j.ThreadContext.remove("plugin.id");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.function.Supplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.After;
Expand Down Expand Up @@ -490,9 +491,10 @@ public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithM
@Override
public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
final RubyObject configNameDouble = org.logstash.config.ir.PluginConfigNameMethodDouble.create(name);
return new FilterDelegatorExt(
RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS)
.initForTesting(setupPlugin(name, filters));
.initForTesting(setupPlugin(name, filters), configNameDouble);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.logstash.config.ir;

import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;

import static org.logstash.RubyUtil.RUBY;

/**
* Fake class to double the Ruby's method config_name()
* */
@JRubyClass(name = "PluginConfigNameMethodDouble")
public class PluginConfigNameMethodDouble extends RubyObject {

private static final long serialVersionUID = 1L;

static final RubyClass RUBY_META_CLASS;
private RubyString filterName;

static {
RUBY_META_CLASS = RUBY.defineClass("PluginConfigNameMethodDouble", RUBY.getObject(),
PluginConfigNameMethodDouble::new);
RUBY_META_CLASS.defineAnnotatedMethods(org.logstash.config.ir.compiler.FakeOutClass.class);
}

PluginConfigNameMethodDouble(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}

static PluginConfigNameMethodDouble create(RubyString filterName) {
final PluginConfigNameMethodDouble instance = new PluginConfigNameMethodDouble(RUBY, RUBY_META_CLASS);
instance.filterName = filterName;
return instance;
}

@JRubyMethod
public IRubyObject name(final ThreadContext context) {
return RUBY.newString("example");
}

@JRubyMethod(name = "config_name")
public IRubyObject configName(final ThreadContext context, final IRubyObject recv) {
return filterName;
}

@JRubyMethod
public IRubyObject initialize(final ThreadContext context, final IRubyObject args) {
return this;
}
}
18 changes: 18 additions & 0 deletions qa/integration/fixtures/plugin_name_log_spec.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
services:
- logstash
config: |-
input {
generator {
count => 4
}
}
filter {
sleep {
id => "sleep_filter_123"
time => 1
}
}
output {
null {}
}
Loading