diff --git a/logstash-core/lib/logstash/codecs/delegator.rb b/logstash-core/lib/logstash/codecs/delegator.rb index 2cbb440c5b9..7d477c0b800 100644 --- a/logstash-core/lib/logstash/codecs/delegator.rb +++ b/logstash-core/lib/logstash/codecs/delegator.rb @@ -1,7 +1,9 @@ module LogStash::Codecs class Delegator < SimpleDelegator - def initialize(obj) + def initialize(obj, parent_plugin_name) super(obj) + @parent_plugin_name = parent_plugin_name + @parent_config_reference = nil @encode_metric = LogStash::Instrument::NamespacedNullMetric.new @decode_metric = LogStash::Instrument::NamespacedNullMetric.new end @@ -14,6 +16,8 @@ def metric=(metric) __getobj__.metric = metric __getobj__.metric.gauge(:name, __getobj__.class.config_name) + __getobj__.metric.gauge(:"parent-config-ref", @parent_config_reference) + __getobj__.metric.gauge(:"parent-name", @parent_plugin_name) @encode_metric = __getobj__.metric.namespace(:encode) @encode_metric.counter(:writes_in) @@ -25,6 +29,10 @@ def metric=(metric) @decode_metric.report_time(:duration_in_millis, 0) end + def parent_config_reference=(parent_config_reference) + @parent_config_reference = parent_config_reference + end + def encode(event) @encode_metric.increment(:writes_in) @encode_metric.time(:duration_in_millis) do diff --git a/logstash-core/lib/logstash/compiler.rb b/logstash-core/lib/logstash/compiler.rb index 6cd68b280e0..685c5e17294 100644 --- a/logstash-core/lib/logstash/compiler.rb +++ b/logstash-core/lib/logstash/compiler.rb @@ -6,9 +6,10 @@ module LogStash; class Compiler include ::LogStash::Util::Loggable - def self.compile_sources(sources_with_metadata, support_escapes) + def self.compile_sources(pipeline_config, support_escapes) + sources_with_metadata = [org.logstash.common.SourceWithMetadata.new("str", "pipeline", 0, 0, pipeline_config.config_string)] graph_sections = sources_with_metadata.map do |swm| - self.compile_graph(swm, support_escapes) + self.compile_graph(pipeline_config, swm, support_escapes) end input_graph = Graph.combine(*graph_sections.map {|s| s[:input] }).graph @@ -29,7 +30,7 @@ def self.compile_sources(sources_with_metadata, support_escapes) PipelineIR.new(input_graph, filter_graph, output_graph, original_source) end - def self.compile_imperative(source_with_metadata, support_escapes) + def self.compile_imperative(pipeline_config, source_with_metadata, support_escapes) if !source_with_metadata.is_a?(org.logstash.common.SourceWithMetadata) raise ArgumentError, "Expected 'org.logstash.common.SourceWithMetadata', got #{source_with_metadata.class}" end @@ -41,11 +42,23 @@ def self.compile_imperative(source_with_metadata, support_escapes) raise ConfigurationError, grammar.failure_reason end + plugins = config.recursive_select(LogStashCompilerLSCLGrammar::LogStash::Compiler::LSCL::AST::Plugin) + added_source_line_column(pipeline_config, plugins) + config.process_escape_sequences = support_escapes config.compile(source_with_metadata) end - def self.compile_graph(source_with_metadata, support_escapes) - Hash[compile_imperative(source_with_metadata, support_escapes).map {|section,icompiled| [section, icompiled.toGraph]}] + def self.compile_graph(pipeline_config, source_with_metadata, support_escapes) + Hash[compile_imperative(pipeline_config, source_with_metadata, support_escapes).map {|section,icompiled| [section, icompiled.toGraph]}] end + + def self.added_source_line_column(pipeline_config, plugins) + plugins.each do |plugin| + remapped_line = pipeline_config.lookup_source_and_line(plugin.line_and_column[0]) + plugin.source_file = remapped_line[0] + plugin.source_line = remapped_line[1] + end + end + end; end diff --git a/logstash-core/lib/logstash/compiler/lscl.rb b/logstash-core/lib/logstash/compiler/lscl.rb index d658bf25788..9ae60521ebb 100644 --- a/logstash-core/lib/logstash/compiler/lscl.rb +++ b/logstash-core/lib/logstash/compiler/lscl.rb @@ -72,8 +72,11 @@ def expr class Plugins < Node; end class Plugin < Node + + attr_accessor :source_file, :source_line + def expr - jdsl.iPlugin(source_meta, plugin_type_enum, self.plugin_name, self.expr_attributes) + jdsl.iPlugin(source_meta, plugin_type_enum, self.plugin_name, source_file, source_line, self.expr_attributes) end def plugin_type_enum @@ -95,7 +98,11 @@ def plugin_name def expr_attributes # Turn attributes into a hash map - self.attributes.recursive_select(Attribute).map(&:expr).map {|k,v| + self.attributes.recursive_select(Attribute).map {|attribute| + attribute.source_file = source_file + attribute.source_line = source_line + attribute + }.map(&:expr).map {|k,v| if v.java_kind_of?(Java::OrgLogstashConfigIrExpression::ValueExpression) [k, v.get] else @@ -130,7 +137,12 @@ def expr end class Attribute < Node + attr_accessor :source_file, :source_line def expr + if value.is_a?(Plugin) + value.source_file = source_file + value.source_line = source_line + end [name.text_value, value.expr] end end diff --git a/logstash-core/lib/logstash/config/config_ast.rb b/logstash-core/lib/logstash/config/config_ast.rb index 1d0019616b5..19234846e5b 100644 --- a/logstash-core/lib/logstash/config/config_ast.rb +++ b/logstash-core/lib/logstash/config/config_ast.rb @@ -213,6 +213,8 @@ def generate_variables class Plugins < Node; end class Plugin < Node + attr_accessor :source_file, :source_line + def plugin_type if recursive_select_parent(Plugin).any? return "codec" @@ -231,14 +233,18 @@ def variable_name def compile_initializer # If any parent is a Plugin, this must be a codec. - if attributes.elements.nil? - return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{source_meta.line}, #{source_meta.column})" << (plugin_type == "codec" ? "" : "\n") + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{source_meta.line}, #{source_meta.column}, #{source_file.inspect}, #{source_line.inspect})" << (plugin_type == "codec" ? "" : "\n") else - settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?) + attrs = attributes.recursive_select(Attribute) + attrs.each do |attribute| + attribute.source_file = source_file + attribute.source_line = source_line + end + settings = attrs.collect(&:compile).reject(&:empty?) attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})" - return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{source_meta.line}, #{source_meta.column}, #{attributes_code})" << (plugin_type == "codec" ? "" : "\n") + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{source_meta.line}, #{source_meta.column}, #{source_file.inspect}, #{source_line.inspect}, #{attributes_code})" << (plugin_type == "codec" ? "" : "\n") end end @@ -255,7 +261,7 @@ def compile when "codec" settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?) attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})" - return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{source_meta.line}, #{source_meta.column}, #{attributes_code})" + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{source_meta.line}, #{source_meta.column}, #{source_file.inspect}, #{source_line.inspect}, #{attributes_code})" end end @@ -309,7 +315,12 @@ def compile end end class Attribute < Node + attr_accessor :source_file, :source_line def compile + if value.is_a?(Plugin) + value.source_file = source_file + value.source_line = source_line + end return %Q(#{name.compile} => #{value.compile}) end end diff --git a/logstash-core/lib/logstash/config/mixin.rb b/logstash-core/lib/logstash/config/mixin.rb index 621ef9e0c59..9fae506a072 100644 --- a/logstash-core/lib/logstash/config/mixin.rb +++ b/logstash-core/lib/logstash/config/mixin.rb @@ -411,7 +411,9 @@ def validate_value(value, validator) case validator when :codec if value.first.is_a?(String) - value = LogStash::Codecs::Delegator.new LogStash::Plugin.lookup("codec", value.first).new + parent_plugin_name = self.config_name + codec = LogStash::Plugin.lookup("codec", value.first).new + value = LogStash::Codecs::Delegator.new(codec, parent_plugin_name) return true, value else value = value.first diff --git a/logstash-core/lib/logstash/config/pipeline_config.rb b/logstash-core/lib/logstash/config/pipeline_config.rb index d93b444b4e5..0df3c24535b 100644 --- a/logstash-core/lib/logstash/config/pipeline_config.rb +++ b/logstash-core/lib/logstash/config/pipeline_config.rb @@ -1,7 +1,10 @@ # encoding: utf-8 require "digest" +java_import org.logstash.config.ir.ConfigSourceSegment + module LogStash module Config + class PipelineConfig include LogStash::Util::Loggable @@ -44,5 +47,27 @@ def display_debug_information logger.debug("Merged config") logger.debug("\n\n#{config_string}") end + + def lookup_source_and_line(merged_line_number) + res = source_map.find { |source_segment| source_segment.contains(merged_line_number) } + if res == nil + raise IndexError + end + [res.getSource(), res.rebase(merged_line_number)] + end + + private + def source_map + @source_map ||= begin + offset = 0 + source_map = [] + config_parts.each do |config_part| + source_segment = ConfigSourceSegment.new config_part.id, offset, config_part.getLinesCount() + source_map << source_segment + offset += source_segment.getLength() + end + source_map.freeze + end + end end end end diff --git a/logstash-core/lib/logstash/inputs/base.rb b/logstash-core/lib/logstash/inputs/base.rb index 50878ffff57..8c62341b393 100644 --- a/logstash-core/lib/logstash/inputs/base.rb +++ b/logstash-core/lib/logstash/inputs/base.rb @@ -99,6 +99,15 @@ def clone cloned end + #used only in tests + def codec_attribute + @codec + end + + def codec + params["codec"] + end + def metric=(metric) super # Hack to create a new metric namespace using 'plugins' as the root diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index b6b818ea53e..c762ea28a90 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -38,6 +38,9 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil) parsed_config = grammar.parse(config_str) raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil? + plugins = parsed_config.recursive_select(LogStash::Config::AST::Plugin) + added_source_line_column(pipeline_config, plugins) + parsed_config.process_escape_sequences = settings.get_value("config.support_escapes") config_code = parsed_config.compile @@ -68,9 +71,16 @@ def non_reloadable_plugins private + def added_source_line_column(pipeline_config, plugins) + plugins.each do |plugin| + remapped_line = pipeline_config.lookup_source_and_line(plugin.line_and_column[0]) + plugin.source_file = remapped_line[0] + plugin.source_line = remapped_line[1] + end + end - def plugin(plugin_type, name, line, column, *args) - @plugin_factory.plugin(plugin_type, name, line, column, *args) + def plugin(plugin_type, name, line, column, source_file, source_line, *args) + @plugin_factory.plugin(plugin_type, name, line, column, source_file, source_line, *args) end def default_logging_keys(other_keys = {}) diff --git a/logstash-core/spec/logstash/codecs/delegator_spec.rb b/logstash-core/spec/logstash/codecs/delegator_spec.rb index c5702ee94d9..70e897ac9d7 100644 --- a/logstash-core/spec/logstash/codecs/delegator_spec.rb +++ b/logstash-core/spec/logstash/codecs/delegator_spec.rb @@ -23,7 +23,7 @@ def decode(e) let(:codec) { LogStash::Codecs::MockCodec.new } subject do - delegator = described_class.new(codec) + delegator = described_class.new(codec, "MockCodec") delegator.metric = metric.namespace([:stats, :pipelines, :main, :plugins, :codecs, :my_id]) delegator end diff --git a/logstash-core/spec/logstash/compiler/compiler_spec.rb b/logstash-core/spec/logstash/compiler/compiler_spec.rb index 8e614332fe8..d5cc8768f2c 100644 --- a/logstash-core/spec/logstash/compiler/compiler_spec.rb +++ b/logstash-core/spec/logstash/compiler/compiler_spec.rb @@ -1,6 +1,7 @@ require "spec_helper" require "logstash/compiler" require "support/helpers" +require "logstash/config/pipeline_config" java_import Java::OrgLogstashConfigIr::DSL describe LogStash::Compiler do @@ -50,8 +51,9 @@ def rand_meta org.logstash.common.SourceWithMetadata.new("#{source_protocol}_#{idx}", "#{source_id}_#{idx}", 0, 0, source) end end + let(:pipeline_config) {LogStash::Config::PipelineConfig.new LogStash::Config::Source::Local, "test_pipeline", sources_with_metadata, LogStash::SETTINGS} - subject(:pipeline) { described_class.compile_sources(sources_with_metadata, false) } + subject(:pipeline) { described_class.compile_sources(pipeline_config, false) } it "should generate a hash" do expect(pipeline.unique_hash).to be_a(String) @@ -64,20 +66,6 @@ def rand_meta it "should provide the original source" do expect(pipeline.original_source).to eq(sources.join("\n")) end - - describe "applying protocol and id metadata" do - it "should apply the correct source metadata to all components" do - # TODO: seems to be a jruby regression we cannot currently call each on a stream - pipeline.get_plugin_vertices.each do |pv| - name_idx = pv.plugin_definition.name.split("_").last - source_protocol_idx = pv.source_with_metadata.protocol.split("_").last - source_id_idx = pv.source_with_metadata.id.split("_").last - - expect(name_idx).to eq(source_protocol_idx) - expect(name_idx).to eq(source_id_idx) - end - end - end end describe "complex configs" do @@ -104,7 +92,8 @@ def rand_meta describe "compiling imperative" do let(:source_id) { "fake_sourcefile" } let(:source_with_metadata) { org.logstash.common.SourceWithMetadata.new(source_protocol, source_id, 0, 0, source) } - subject(:compiled) { described_class.compile_imperative(source_with_metadata, settings.get_value("config.support_escapes")) } + let(:pipeline_config) {LogStash::Config::PipelineConfig.new LogStash::Config::Source::Local, "test_pipeline", [source_with_metadata], LogStash::SETTINGS} + subject(:compiled) { described_class.compile_imperative(pipeline_config, source_with_metadata, settings.get_value("config.support_escapes")) } context "when config.support_escapes" do let(:parser) { LogStashCompilerLSCLGrammarParser.new } diff --git a/logstash-core/spec/logstash/config/pipeline_config_spec.rb b/logstash-core/spec/logstash/config/pipeline_config_spec.rb index d7ed1c56524..18777988191 100644 --- a/logstash-core/spec/logstash/config/pipeline_config_spec.rb +++ b/logstash-core/spec/logstash/config/pipeline_config_spec.rb @@ -72,4 +72,58 @@ end end end + + describe "source and line remapping" do + context "when pipeline is constructed from single file single line" do + let (:pipeline_conf_string) { 'input { generator1 }' } + subject { described_class.new(source, pipeline_id, [org.logstash.common.SourceWithMetadata.new("file", "/tmp/1", 0, 0, pipeline_conf_string)], settings) } + it "return the same line of the queried" do + expect(subject.lookup_source_and_line(1)[1]).to eq(1) + end + end + + context "when pipeline is constructed from single file" do + let (:pipeline_conf_string) { 'input { + generator1 + }' } + subject { described_class.new(source, pipeline_id, [org.logstash.common.SourceWithMetadata.new("file", "/tmp/1", 0, 0, pipeline_conf_string)], settings) } + + it "return the same line of the queried" do + expect(subject.lookup_source_and_line(1)[1]).to eq(1) + expect(subject.lookup_source_and_line(2)[1]).to eq(2) + end + + it "throw exception if line is out of bound" do + expect { subject.lookup_source_and_line(100) }.to raise_exception(IndexError) + end + end + + context "when pipeline is constructed from multiple files" do + let (:pipeline_conf_string_part1) { 'input { + generator1 + }' } + let (:pipeline_conf_string_part2) { 'output { + stdout + }' } + let(:merged_config_parts) do + [ + org.logstash.common.SourceWithMetadata.new("file", "/tmp/input", 0, 0, pipeline_conf_string_part1), + org.logstash.common.SourceWithMetadata.new("file", "/tmp/output", 0, 0, pipeline_conf_string_part2) + ] + end + subject { described_class.new(source, pipeline_id, merged_config_parts, settings) } + + it "return the line of first segment" do + expect(subject.lookup_source_and_line(2)).to eq(["/tmp/input", 2]) + end + + it "return the line of second segment" do + expect(subject.lookup_source_and_line(4)).to eq(["/tmp/output", 1]) + end + + it "throw exception if line is out of bound" do + expect { subject.lookup_source_and_line(100) }.to raise_exception(IndexError) + end + end + end end diff --git a/logstash-core/spec/logstash/filter_delegator_spec.rb b/logstash-core/spec/logstash/filter_delegator_spec.rb index 36d7605b25b..1d31d230469 100644 --- a/logstash-core/spec/logstash/filter_delegator_spec.rb +++ b/logstash-core/spec/logstash/filter_delegator_spec.rb @@ -11,7 +11,7 @@ let(:filter_id) { "my-filter" } let(:config) do - { "host" => "127.0.0.1", "id" => filter_id } + { "host" => "127.0.0.1", "id" => filter_id, "config-ref" => "S: L: 14" } end let(:collector) {LogStash::Instrument::Collector.new} let(:metric) { LogStash::Instrument::Metric.new(collector).namespace(:null) } diff --git a/logstash-core/spec/logstash/inputs/base_spec.rb b/logstash-core/spec/logstash/inputs/base_spec.rb index a16e7ae4d26..7f533b12ccc 100644 --- a/logstash-core/spec/logstash/inputs/base_spec.rb +++ b/logstash-core/spec/logstash/inputs/base_spec.rb @@ -95,7 +95,7 @@ def register; end end it "should clone the codec when cloned" do - expect(input.codec).not_to eq(cloned.codec) + expect(input.codec_attribute).not_to eq(cloned.codec_attribute) end it "should preserve codec params" do diff --git a/logstash-core/spec/logstash/java_filter_delegator_spec.rb b/logstash-core/spec/logstash/java_filter_delegator_spec.rb index c9d24b49259..e0ff24c2f54 100644 --- a/logstash-core/spec/logstash/java_filter_delegator_spec.rb +++ b/logstash-core/spec/logstash/java_filter_delegator_spec.rb @@ -17,7 +17,7 @@ def increment(_) let(:filter_id) { "my_filter" } let(:config) do - { "host" => "127.0.0.1", "id" => filter_id } + { "host" => "127.0.0.1", "id" => filter_id, "config-ref" => "S:, L: 1"} end let(:metric) { LogStash::Instrument::NamespacedMetric.new( diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index 12dd306e7e5..3e0e7681e4f 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -61,6 +61,13 @@ def encode(event) def close end + + def codec + self + end + + def parent_config_reference=(code_ref) + end end class DummyOutputMore < ::LogStash::Outputs::DummyOutput diff --git a/logstash-core/src/main/java/org/logstash/common/SourceWithMetadata.java b/logstash-core/src/main/java/org/logstash/common/SourceWithMetadata.java index 9a76ef5e533..33a766ad535 100644 --- a/logstash-core/src/main/java/org/logstash/common/SourceWithMetadata.java +++ b/logstash-core/src/main/java/org/logstash/common/SourceWithMetadata.java @@ -39,6 +39,10 @@ public String getText() { return text; } + public int getLinesCount() { + return text.split("\\n").length; + } + private static final Pattern emptyString = Pattern.compile("^\\s*$"); public SourceWithMetadata(String protocol, String id, Integer line, Integer column, String text) throws IncompleteSourceWithMetadataException { diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index 612865c8f57..4ccbcefbe49 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -131,9 +131,11 @@ private Map setupOutputs(ConfigVariableExpan outs.forEach(v -> { final PluginDefinition def = v.getPluginDefinition(); final SourceWithMetadata source = v.getSourceWithMetadata(); + final String sourceFile = v.getSourceFile(); + final int sourceLine = v.getSourceLine(); res.put(v.getId(), pluginFactory.buildOutput( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), - RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve) + RubyUtil.RUBY.newFixnum(source.getColumn()), sourceFile, sourceLine, convertArgs(def), convertJavaArgs(def, cve) )); }); return res; @@ -149,9 +151,11 @@ private Map setupFilters(ConfigVariableExpan for (final PluginVertex vertex : filterPlugins) { final PluginDefinition def = vertex.getPluginDefinition(); final SourceWithMetadata source = vertex.getSourceWithMetadata(); + final String sourceFile = vertex.getSourceFile(); + final int sourceLine = vertex.getSourceLine(); res.put(vertex.getId(), pluginFactory.buildFilter( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), - RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve) + RubyUtil.RUBY.newFixnum(source.getColumn()), sourceFile, sourceLine, convertArgs(def), convertJavaArgs(def, cve) )); } return res; @@ -166,9 +170,11 @@ private Collection setupInputs(ConfigVariableExpander cve) { vertices.forEach(v -> { final PluginDefinition def = v.getPluginDefinition(); final SourceWithMetadata source = v.getSourceWithMetadata(); + final String sourceFile = v.getSourceFile(); + final int sourceLine = v.getSourceLine(); IRubyObject o = pluginFactory.buildInput( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), - RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve)); + RubyUtil.RUBY.newFixnum(source.getColumn()), sourceFile, sourceLine, convertArgs(def), convertJavaArgs(def, cve)); nodes.add(o); }); return nodes; @@ -188,9 +194,13 @@ private RubyHash convertArgs(final PluginDefinition def) { final String key = entry.getKey(); final Object toput; if (value instanceof PluginStatement) { - final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition(); + PluginStatement pluginStatement = (PluginStatement) value; + final String sourceFile = pluginStatement.getSourceFile(); + final int sourceLine = pluginStatement.getSourceLine(); + final PluginDefinition codec = pluginStatement.getPluginDefinition(); toput = pluginFactory.buildCodec( RubyUtil.RUBY.newString(codec.getName()), + sourceFile, sourceLine, Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()), codec.getArguments() ); @@ -216,10 +226,14 @@ private Map convertJavaArgs(final PluginDefinition def, ConfigVa final String key = entry.getKey(); final IRubyObject toput; if (value instanceof PluginStatement) { - final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition(); + PluginStatement pluginStatement = (PluginStatement) value; + final String sourceFile = pluginStatement.getSourceFile(); + final int sourceLine = pluginStatement.getSourceLine(); + final PluginDefinition codec = pluginStatement.getPluginDefinition(); Map codecArgs = expandConfigVariables(cve, codec.getArguments()); toput = pluginFactory.buildCodec( RubyUtil.RUBY.newString(codec.getName()), + sourceFile, sourceLine, Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()), codecArgs ); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/ConfigCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/ConfigCompiler.java index 668563fc6bb..98b6a62f7b8 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/ConfigCompiler.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/ConfigCompiler.java @@ -17,27 +17,18 @@ private ConfigCompiler() { } /** - * @param config Logstash Config String + * @param pipelineConfig Logstash single pipeline's Config * @param supportEscapes The value of the setting {@code config.support_escapes} * @return Compiled {@link PipelineIR} - * @throws IncompleteSourceWithMetadataException On Broken Configuration */ - public static PipelineIR configToPipelineIR(final String config, final boolean supportEscapes) - throws IncompleteSourceWithMetadataException { + public static PipelineIR configToPipelineIR(final IRubyObject pipelineConfig, final boolean supportEscapes) { final IRubyObject compiler = RubyUtil.RUBY.executeScript( "require 'logstash/compiler'\nLogStash::Compiler", "" ); final IRubyObject code = compiler.callMethod(RubyUtil.RUBY.getCurrentContext(), "compile_sources", - new IRubyObject[]{ - RubyUtil.RUBY.newArray( - JavaUtil.convertJavaToRuby( - RubyUtil.RUBY, - new SourceWithMetadata("str", "pipeline", 0, 0, config) - ) - ), - RubyUtil.RUBY.newBoolean(supportEscapes) + new IRubyObject[]{pipelineConfig, RubyUtil.RUBY.newBoolean(supportEscapes) } ); return code.toJava(PipelineIR.class); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/ConfigSourceSegment.java b/logstash-core/src/main/java/org/logstash/config/ir/ConfigSourceSegment.java new file mode 100644 index 00000000000..6b5898b0cc7 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/config/ir/ConfigSourceSegment.java @@ -0,0 +1,36 @@ +package org.logstash.config.ir; + +public final class ConfigSourceSegment { + + private final String source; + private final int offset; + private final int length; + + public ConfigSourceSegment(String source, int offset, int length) { + this.source = source; + this.offset = offset; + this.length = length; + } + + public String getSource() { + return source; + } + + public int getLength() { + return length; + } + + public boolean contains(int lineNumber) { + int rebased_line_number = lineNumber - this.offset; + return 1 <= rebased_line_number && rebased_line_number <= this.length; + } + + public int rebase(int lineNumber) { + return lineNumber - this.offset; + } + + @Override + public String toString() { + return "ConfigSourceSegment{source='" + source + "', offset=" + offset + ", length=" + length + '}'; + } +} diff --git a/logstash-core/src/main/java/org/logstash/config/ir/DSL.java b/logstash-core/src/main/java/org/logstash/config/ir/DSL.java index be5905a729e..744f7cb4bde 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/DSL.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/DSL.java @@ -5,6 +5,8 @@ import java.util.Map; import java.util.UUID; +import org.jruby.RubyInteger; +import org.jruby.RubyString; import org.logstash.common.IncompleteSourceWithMetadataException; import org.logstash.common.SourceWithMetadata; import org.logstash.config.ir.expression.BooleanExpression; @@ -230,6 +232,11 @@ public static NoopStatement noop() { return new NoopStatement(null); } + public static PluginStatement iPlugin(SourceWithMetadata meta, PluginDefinition.Type pluginType, String pluginName, + RubyString sourceFile, RubyInteger sourceLine, Map pluginArguments) { + return new PluginStatement(meta, new PluginDefinition(pluginType, pluginName, pluginArguments), sourceFile, sourceLine); + } + public static PluginStatement iPlugin(SourceWithMetadata meta, PluginDefinition.Type pluginType, String pluginName, Map pluginArguments) { return new PluginStatement(meta, new PluginDefinition(pluginType, pluginName, pluginArguments)); } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java index 46bc7b1c3ca..94892587ef0 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java @@ -39,7 +39,7 @@ public AbstractFilterDelegatorExt(final Ruby runtime, final RubyClass metaClass) super(runtime, metaClass); } - protected void initMetrics(final String id, final AbstractNamespacedMetricExt namespacedMetric) { + protected void initMetrics(final String id, final AbstractNamespacedMetricExt namespacedMetric, String codeRef) { final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); this.id = RubyString.newString(context.runtime, id); synchronized(namespacedMetric.getMetric()) { @@ -48,6 +48,9 @@ protected void initMetrics(final String id, final AbstractNamespacedMetricExt na eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY); eventMetricTime = LongCounter.fromRubyBase(metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY); namespacedMetric.gauge(context, MetricKeys.NAME_KEY, configName(context)); + if (codeRef != null) { + namespacedMetric.gauge(context, MetricKeys.CONFIG_REF_KEY, RubyUtil.RUBY.newString(codeRef)); + } } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java index 2df5c678c13..797f07c5ffd 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java @@ -104,7 +104,7 @@ public IRubyObject multiReceive(final IRubyObject events) { return this; } - protected void initMetrics(final String id, final AbstractMetricExt metric) { + protected void initMetrics(final String id, final AbstractMetricExt metric, final String codeRef) { this.metric = metric; final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); this.id = RubyString.newString(context.runtime, id); @@ -112,6 +112,9 @@ protected void initMetrics(final String id, final AbstractMetricExt metric) { namespacedMetric = metric.namespace(context, context.runtime.newSymbol(id)); metricEvents = namespacedMetric.namespace(context, MetricKeys.EVENTS_KEY); namespacedMetric.gauge(context, MetricKeys.NAME_KEY, configName(context)); + if (codeRef != null) { + namespacedMetric.gauge(context, MetricKeys.CONFIG_REF_KEY, RubyUtil.RUBY.newString(codeRef)); + } eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY); eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY); eventMetricTime = LongCounter.fromRubyBase(metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java index 6a39c9e062c..85fcef0935f 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java @@ -32,13 +32,14 @@ public final class FilterDelegatorExt extends AbstractFilterDelegatorExt { private boolean flushes; @JRubyMethod(name="initialize") - public IRubyObject initialize(final ThreadContext context, final IRubyObject filter, final IRubyObject id) { + public IRubyObject initialize(final ThreadContext context, final IRubyObject filter, final IRubyObject id, + final RubyString configRef) { this.id = (RubyString) id; this.filter = filter; filterClass = filter.getSingletonClass().getRealClass(); filterMethod = filterClass.searchMethod(FILTER_METHOD_NAME); final AbstractNamespacedMetricExt namespacedMetric = (AbstractNamespacedMetricExt) filter.callMethod(context, "metric"); - initMetrics(this.id.asJavaString(), namespacedMetric); + initMetrics(this.id.asJavaString(), namespacedMetric, configRef.asJavaString()); flushes = filter.respondsTo("flush"); return this; } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaCodecDelegator.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaCodecDelegator.java index afa91d5389b..a308feb688c 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaCodecDelegator.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaCodecDelegator.java @@ -41,13 +41,16 @@ public class JavaCodecDelegator implements Codec { protected final CounterMetric decodeMetricTime; - public JavaCodecDelegator(final Context context, final Codec codec) { + public JavaCodecDelegator(final Context context, final Codec codec, final String codeRef) { this.codec = codec; final NamespacedMetric metric = context.getMetric(codec); synchronized(metric.root()) { metric.gauge(MetricKeys.NAME_KEY.asJavaString(), codec.getName()); + if (codeRef != null) { + metric.gauge(MetricKeys.CONFIG_REF_KEY.asJavaString(), RubyUtil.RUBY.newString(codeRef)); + } final NamespacedMetric encodeMetric = metric.namespace(ENCODE_KEY); encodeMetricIn = encodeMetric.counter(IN_KEY); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java index 0e4638ec4cf..7865f12b82a 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java @@ -42,13 +42,14 @@ public JavaFilterDelegatorExt(final Ruby runtime, final RubyClass metaClass) { public static JavaFilterDelegatorExt create(final String configName, final String id, final AbstractNamespacedMetricExt metric, - final Filter filter, final Map pluginArgs) { + final Filter filter, final Map pluginArgs, + final String configReference) { final JavaFilterDelegatorExt instance = new JavaFilterDelegatorExt(RubyUtil.RUBY, RubyUtil.JAVA_FILTER_DELEGATOR_CLASS); instance.configName = RubyUtil.RUBY.newString(configName); AbstractNamespacedMetricExt scopedMetric = metric.namespace(RubyUtil.RUBY.getCurrentContext(), RubyUtil.RUBY.newSymbol(filter.getId())); - instance.initMetrics(id, scopedMetric); + instance.initMetrics(id, scopedMetric, configReference); instance.filter = filter; instance.initializeFilterMatchListener(pluginArgs); return instance; diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java index e91367f8cd5..0b03b371406 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java @@ -38,11 +38,12 @@ public JavaInputDelegatorExt(Ruby runtime, RubyClass metaClass) { public static JavaInputDelegatorExt create(final JavaBasePipelineExt pipeline, final AbstractNamespacedMetricExt metric, final Input input, - final Map pluginArgs) { + final Map pluginArgs, String configRef) { final JavaInputDelegatorExt instance = new JavaInputDelegatorExt(RubyUtil.RUBY, RubyUtil.JAVA_INPUT_DELEGATOR_CLASS); AbstractNamespacedMetricExt scopedMetric = metric.namespace(RubyUtil.RUBY.getCurrentContext(), RubyUtil.RUBY.newSymbol(input.getId())); scopedMetric.gauge(RubyUtil.RUBY.getCurrentContext(), MetricKeys.NAME_KEY, RubyUtil.RUBY.newString(input.getName())); + scopedMetric.gauge(RubyUtil.RUBY.getCurrentContext(), MetricKeys.CONFIG_REF_KEY, RubyUtil.RUBY.newString(configRef)); instance.setMetric(RubyUtil.RUBY.getCurrentContext(), scopedMetric); instance.input = input; instance.pipeline = pipeline; diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaOutputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaOutputDelegatorExt.java index 3edcc2bb447..f6a5660a7c5 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaOutputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaOutputDelegatorExt.java @@ -42,11 +42,11 @@ public JavaOutputDelegatorExt(final Ruby runtime, final RubyClass metaClass) { public static JavaOutputDelegatorExt create(final String configName, final String id, final AbstractMetricExt metric, final Consumer> outputFunction, - final Runnable closeAction, final Runnable registerAction) { + final Runnable closeAction, final Runnable registerAction, String configReference) { final JavaOutputDelegatorExt instance = new JavaOutputDelegatorExt(RubyUtil.RUBY, RubyUtil.JAVA_OUTPUT_DELEGATOR_CLASS); instance.configName = RubyUtil.RUBY.newString(configName); - instance.initMetrics(id, metric); + instance.initMetrics(id, metric, configReference); instance.outputFunction = outputFunction; instance.closeAction = closeAction; instance.registerAction = registerAction; @@ -55,11 +55,11 @@ public static JavaOutputDelegatorExt create(final String configName, final Strin public static JavaOutputDelegatorExt create(final String configName, final String id, final AbstractMetricExt metric, - final Output output) { + final Output output, String configReference) { final JavaOutputDelegatorExt instance = new JavaOutputDelegatorExt(RubyUtil.RUBY, RubyUtil.JAVA_OUTPUT_DELEGATOR_CLASS); instance.configName = RubyUtil.RUBY.newString(configName); - instance.initMetrics(id, metric); + instance.initMetrics(id, metric, configReference); instance.output = output; instance.outputFunction = instance::outputRubyEvents; instance.closeAction = instance::outputClose; diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java index 2e6c5516c11..dad62846f6c 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java @@ -11,6 +11,7 @@ import org.jruby.runtime.Block; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; import org.logstash.execution.ExecutionContextExt; import org.logstash.execution.WorkerLoop; import org.logstash.ext.JrubyEventExtLibrary; @@ -39,14 +40,17 @@ public OutputDelegatorExt initialize(final ThreadContext context, final RubyHash final ExecutionContextExt executionContext, final OutputStrategyExt.OutputStrategyRegistryExt strategyRegistry) { this.outputClass = outputClass; - initMetrics( - args.op_aref(context, RubyString.newString(context.runtime, "id")).asJavaString(), - metric - ); + + String id = args.op_aref(context, RubyString.newString(context.runtime, "id")).asJavaString(); + RubyString configRefKey = RubyString.newString(context.runtime, "config-ref"); + String configRef = args.op_aref(context, configRefKey).asJavaString(); + initMetrics(id, metric, configRef); + + args.remove(configRefKey); strategy = (OutputStrategyExt.AbstractOutputStrategyExt) strategyRegistry.classFor( context, concurrency(context) ).newInstance( - context, new IRubyObject[]{outputClass, namespacedMetric, executionContext, args}, + context, new IRubyObject[]{outputClass, namespacedMetric, executionContext, RubyUtil.RUBY.newString(configRef), args}, Block.NULL_BLOCK ); return this; diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java index 4f90b86a823..c245f213fe6 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java @@ -3,12 +3,8 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.stream.Collectors; -import org.jruby.Ruby; -import org.jruby.RubyArray; -import org.jruby.RubyClass; -import org.jruby.RubyFixnum; -import org.jruby.RubyHash; -import org.jruby.RubyObject; + +import org.jruby.*; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.internal.runtime.methods.DynamicMethod; @@ -154,9 +150,9 @@ public LegacyOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); } - @JRubyMethod(required = 4) + @JRubyMethod(required = 5) public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) { - final RubyHash pluginArgs = (RubyHash) args[3]; + final RubyHash pluginArgs = (RubyHash) args[4]; workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers")); if (workerCount.isNil()) { workerCount = RubyFixnum.one(context.runtime); @@ -169,6 +165,9 @@ public IRubyObject initialize(final ThreadContext context, final IRubyObject[] a // Calling "new" here manually to allow mocking the ctor in RSpec Tests final IRubyObject output = outputClass.callMethod(context, "new", pluginArgs); initOutputCallsite(outputClass); + // WARNING: order is important since metric= create gauges with data assigned from parent_config_reference= + IRubyObject codec = output.callMethod(context, "codec"); + codec.callMethod(context, "parent_config_reference=", args[3]); output.callMethod(context, "metric=", args[1]); output.callMethod(context, "execution_context=", args[2]); workers.append(output); @@ -225,12 +224,23 @@ protected SimpleAbstractOutputStrategyExt(final Ruby runtime, final RubyClass me super(runtime, metaClass); } - @JRubyMethod(required = 4) + @JRubyMethod(required = 5) public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) { final RubyClass outputClass = (RubyClass) args[0]; // Calling "new" here manually to allow mocking the ctor in RSpec Tests - output = args[0].callMethod(context, "new", args[3]); + output = args[0].callMethod(context, "new", args[4]); initOutputCallsite(outputClass); + + final IRubyObject codecDelegatorClass = RubyUtil.RUBY.executeScript( + "require 'logstash/codecs/delegator'\nLogStash::Codecs::Delegator", + ""); + // WARNING: order is important since metric= create gauges with data assigned from parent_config_reference= + IRubyObject codec = output.callMethod(context, "codec"); + if (codec instanceof RubyBasicObject) { + if (((RubyBasicObject) codec).instance_of_p(context, codecDelegatorClass).isTrue()) { + codec.callMethod(context, "parent_config_reference=", args[3]); + } + } output.callMethod(context, "metric=", args[1]); output.callMethod(context, "execution_context=", args[2]); return this; diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java index 304a720f423..516f56b936b 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java @@ -42,27 +42,31 @@ public Filter buildFilter(final String name, final String id, final Configuratio @Override public IRubyObject buildInput(final RubyString name, final RubyInteger line, final RubyInteger column, + final String sourceFile, final int sourceLine, final IRubyObject args, Map pluginArgs) { - return rubyFactory.buildInput(name, line, column, args, pluginArgs); + return rubyFactory.buildInput(name, line, column, sourceFile, sourceLine, args, pluginArgs); } @Override public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args, + final RubyInteger column, final String sourceFile, + final int sourceLine, final IRubyObject args, final Map pluginArgs) { - return rubyFactory.buildOutput(name, line, column, args, pluginArgs); + return rubyFactory.buildOutput(name, line, column, sourceFile, sourceLine, args, pluginArgs); } @Override public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args, + final RubyInteger column, final String sourceFile, + final int sourceLine, final IRubyObject args, final Map pluginArgs) { - return rubyFactory.buildFilter(name, line, column, args, pluginArgs); + return rubyFactory.buildFilter(name, line, column, sourceFile, sourceLine, args, pluginArgs); } @Override - public IRubyObject buildCodec(final RubyString name, final IRubyObject args, Map pluginArgs) { - return rubyFactory.buildCodec(name, args, pluginArgs); + public IRubyObject buildCodec(final RubyString name, final String sourceFile, final int sourceLine, + final IRubyObject args, Map pluginArgs) { + return rubyFactory.buildCodec(name, sourceFile, sourceLine, args, pluginArgs); } @Override diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java index d840597294a..abd89816e1d 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java @@ -21,16 +21,16 @@ private RubyIntegration() { */ public interface PluginFactory { - IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column, + IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column, String sourceFile, int sourceLine, IRubyObject args, Map pluginArgs); - AbstractOutputDelegatorExt buildOutput(RubyString name, RubyInteger line, RubyInteger column, + AbstractOutputDelegatorExt buildOutput(RubyString name, RubyInteger line, RubyInteger column, String sourceFile, int sourceLine, IRubyObject args, Map pluginArgs); - AbstractFilterDelegatorExt buildFilter(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args, + AbstractFilterDelegatorExt buildFilter(RubyString name, RubyInteger line, RubyInteger column, String sourceFile, int sourceLine, IRubyObject args, Map pluginArgs); - IRubyObject buildCodec(RubyString name, IRubyObject args, Map pluginArgs); + IRubyObject buildCodec(RubyString name, String sourceFile, int sourceLine, IRubyObject args, Map pluginArgs); Codec buildDefaultCodec(String codecName); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/PluginVertex.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/PluginVertex.java index 4187c0ae942..ced17eeb1e2 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/graph/PluginVertex.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/PluginVertex.java @@ -6,16 +6,23 @@ public class PluginVertex extends Vertex { private final PluginDefinition pluginDefinition; + private final String sourceFile; + private final int sourceLine; public PluginDefinition getPluginDefinition() { return pluginDefinition; } - public PluginVertex(SourceWithMetadata meta, PluginDefinition pluginDefinition) { + this(meta, pluginDefinition, null, -1); + } + + public PluginVertex(SourceWithMetadata meta, PluginDefinition pluginDefinition, String sourceFile, int sourceLine) { // We know that if the ID value exists it will be as a string super(meta, (String) pluginDefinition.getArguments().get("id")); this.pluginDefinition = pluginDefinition; + this.sourceFile = sourceFile; + this.sourceLine = sourceLine; } public String toString() { @@ -24,7 +31,7 @@ public String toString() { @Override public PluginVertex copy() { - return new PluginVertex(this.getSourceWithMetadata(), pluginDefinition); + return new PluginVertex(this.getSourceWithMetadata(), pluginDefinition, sourceFile, sourceLine); } @Override @@ -39,4 +46,12 @@ public boolean sourceComponentEquals(SourceComponent other) { } return false; } + + public String getSourceFile() { + return sourceFile; + } + + public int getSourceLine() { + return sourceLine; + } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/imperative/PluginStatement.java b/logstash-core/src/main/java/org/logstash/config/ir/imperative/PluginStatement.java index 934ab536d43..9abb0ef31fc 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/imperative/PluginStatement.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/imperative/PluginStatement.java @@ -1,5 +1,7 @@ package org.logstash.config.ir.imperative; +import org.jruby.RubyInteger; +import org.jruby.RubyString; import org.logstash.config.ir.SourceComponent; import org.logstash.config.ir.InvalidIRException; import org.logstash.config.ir.PluginDefinition; @@ -10,12 +12,36 @@ public class PluginStatement extends Statement { private final PluginDefinition pluginDefinition; + private String sourceFile; + private int sourceLine; public PluginStatement(SourceWithMetadata meta, PluginDefinition pluginDefinition) { super(meta); this.pluginDefinition = pluginDefinition; } + public PluginStatement(SourceWithMetadata meta, PluginDefinition pluginDefinition, RubyString sourceFile, RubyInteger sourceLine) { + this(meta, pluginDefinition); + if (sourceFile == null) { + this.sourceFile = ""; + } else { + this.sourceFile = sourceFile.asJavaString(); + } + if (sourceLine == null) { + this.sourceLine = -1; + } else { + this.sourceLine = sourceLine.getIntValue(); + } + } + + public String getSourceFile() { + return sourceFile; + } + + public int getSourceLine() { + return sourceLine; + } + @Override public boolean sourceComponentEquals(SourceComponent sourceComponent) { if (sourceComponent == null) return false; @@ -34,7 +60,7 @@ public String toString(int indent) { @Override public Graph toGraph() throws InvalidIRException { - Vertex pluginVertex = new PluginVertex(getSourceWithMetadata(), pluginDefinition); + Vertex pluginVertex = new PluginVertex(getSourceWithMetadata(), pluginDefinition, sourceFile, sourceLine); Graph g = Graph.empty(); g.addVertex(pluginVertex); return g; diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index b726ff4a84c..dd4054ce4b0 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -153,9 +153,8 @@ public final AbstractPipelineExt initialize(final ThreadContext context, ); } } - lir = ConfigCompiler.configToPipelineIR( - configString.asJavaString(), - getSetting(context, "config.support_escapes").isTrue() + lir = ConfigCompiler.configToPipelineIR(pipelineSettings, + getSetting(context, "config.support_escapes").isTrue() ); return this; } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java index 501d9425560..03bba52156d 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java @@ -13,6 +13,8 @@ private MetricKeys() { public static final RubySymbol NAME_KEY = RubyUtil.RUBY.newSymbol("name"); + public static final RubySymbol CONFIG_REF_KEY = RubyUtil.RUBY.newSymbol("config-ref"); + public static final RubySymbol EVENTS_KEY = RubyUtil.RUBY.newSymbol("events"); public static final RubySymbol OUT_KEY = RubyUtil.RUBY.newSymbol("out"); diff --git a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java index f570fe0d6d9..ffc841f3774 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java @@ -7,14 +7,7 @@ import co.elastic.logstash.api.Filter; import co.elastic.logstash.api.Input; import co.elastic.logstash.api.Output; -import org.jruby.Ruby; -import org.jruby.RubyArray; -import org.jruby.RubyBasicObject; -import org.jruby.RubyClass; -import org.jruby.RubyHash; -import org.jruby.RubyInteger; -import org.jruby.RubyString; -import org.jruby.RubySymbol; +import org.jruby.*; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.javasupport.JavaUtil; @@ -78,15 +71,17 @@ public static final class Plugins extends RubyBasicObject public static IRubyObject filterDelegator(final ThreadContext context, final IRubyObject recv, final IRubyObject[] args) { final RubyHash arguments = (RubyHash) args[2]; - final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments); final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY); + RubyString configRefKey = RubyString.newString(context.runtime, "config-ref"); + RubyString configRefValue = (RubyString) arguments.remove(configRefKey); + final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments); filterInstance.callMethod( context, "metric=", ((AbstractMetricExt) args[3]).namespace(context, id.intern()) ); filterInstance.callMethod(context, "execution_context=", args[4]); return new FilterDelegatorExt(context.runtime, RubyUtil.FILTER_DELEGATOR_CLASS) - .initialize(context, filterInstance, id); + .initialize(context, filterInstance, id, configRefValue); } public Plugins(final Ruby runtime, final RubyClass metaClass) { @@ -116,60 +111,67 @@ public PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryEx @SuppressWarnings("unchecked") @Override public IRubyObject buildInput(final RubyString name, final RubyInteger line, final RubyInteger column, + final String sourceFile, final int sourceLine, final IRubyObject args, Map pluginArgs) { return plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT, - name.asJavaString(), line.getIntValue(), column.getIntValue(), + name.asJavaString(), line.getIntValue(), column.getIntValue(), sourceFile, sourceLine, (Map) args, pluginArgs ); } - @JRubyMethod(required = 4) + @JRubyMethod(required = 6) public IRubyObject buildInput(final ThreadContext context, final IRubyObject[] args) { return buildInput( (RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), - args[3], null + args[3].asJavaString(), args[4].convertToInteger().getIntValue(), + args[5], null ); } @SuppressWarnings("unchecked") @Override public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args, + final RubyInteger column, final String sourceFile, + final int sourceLine,final IRubyObject args, Map pluginArgs) { return (AbstractOutputDelegatorExt) plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT, - name.asJavaString(), line.getIntValue(), column.getIntValue(), + name.asJavaString(), line.getIntValue(), column.getIntValue(), sourceFile, sourceLine, (Map) args, pluginArgs ); } - @JRubyMethod(required = 4) + @JRubyMethod(required = 6) public AbstractOutputDelegatorExt buildOutput(final ThreadContext context, final IRubyObject[] args) { return buildOutput( - (RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), args[3], null + (RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), args[3].asJavaString(), + args[4].convertToInteger().getIntValue(), + args[5], null ); } @SuppressWarnings("unchecked") @Override public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args, + final RubyInteger column, final String sourceFile, + final int sourceLine, final IRubyObject args, Map pluginArgs) { return (AbstractFilterDelegatorExt) plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER, - name.asJavaString(), line.getIntValue(), column.getIntValue(), + name.asJavaString(), line.getIntValue(), column.getIntValue(), sourceFile, sourceLine, (Map) args, pluginArgs ); } @SuppressWarnings("unchecked") @Override - public IRubyObject buildCodec(final RubyString name, final IRubyObject args, Map pluginArgs) { + public IRubyObject buildCodec(final RubyString name, final String sourceFile, final int sourceLine, + final IRubyObject args, Map pluginArgs) { return plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, - name.asJavaString(), 0, 0, (Map) args, pluginArgs + name.asJavaString(), 0, 0, sourceFile, sourceLine, (Map) args, pluginArgs ); } @@ -177,27 +179,33 @@ public IRubyObject buildCodec(final RubyString name, final IRubyObject args, Map public Codec buildDefaultCodec(String codecName) { return (Codec) JavaUtil.unwrapJavaValue(plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, - codecName, 0, 0, Collections.emptyMap(), Collections.emptyMap() + codecName, 0, 0, null, 0, Collections.emptyMap(), Collections.emptyMap() )); } @SuppressWarnings("unchecked") - @JRubyMethod(required = 4, optional = 1) + @JRubyMethod(required = 6, optional = 1) public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) { + String sourceFile = args[4].convertToString().asJavaString(); + int sourceLine = args[5].convertToInteger().getIntValue(); + return plugin( context, PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)), args[1].asJavaString(), args[2].convertToInteger().getIntValue(), args[3].convertToInteger().getIntValue(), - args.length > 4 ? (Map) args[4] : new HashMap<>(), + sourceFile, + sourceLine, + args.length > 6 ? (Map) args[6] : new HashMap<>(), null ); } @SuppressWarnings("unchecked") private IRubyObject plugin(final ThreadContext context, final PluginLookup.PluginType type, final String name, - final int line, final int column, final Map args, + final int line, final int column, final String sourceFile, final int sourceLine, + final Map args, Map pluginArgs) { final String id; final PluginLookup.PluginClass pluginClass = PluginLookup.lookup(type, name); @@ -228,6 +236,13 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi pluginsById.add(id); final AbstractNamespacedMetricExt typeScopedMetric = metrics.create(context, type.rubyLabel()); + final String configReference; + if (sourceFile == null) { + configReference = "S: , L:" + line + ", C:" + column; + } else { + configReference = "S: " + sourceFile + ", L:" + sourceLine + ", C:" + column; + } + if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) { final Map newArgs = new HashMap<>(args); @@ -239,6 +254,7 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi final RubyHash rubyArgs = RubyHash.newHash(context.runtime); rubyArgs.putAll(newArgs); if (type == PluginLookup.PluginType.OUTPUT) { + rubyArgs.put("config-ref", configReference); return new OutputDelegatorExt(context.runtime, RubyUtil.RUBY_OUTPUT_DELEGATOR_CLASS).initialize( context, new IRubyObject[]{ @@ -248,6 +264,7 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi } ); } else if (type == PluginLookup.PluginType.FILTER) { + rubyArgs.putAll(Collections.singletonMap("config-ref", configReference)); return filterDelegator( context, null, new IRubyObject[]{ @@ -258,8 +275,21 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs); final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id)); scopedMetric.gauge(context, MetricKeys.NAME_KEY, pluginInstance.callMethod(context, "config_name")); - pluginInstance.callMethod(context, "metric=", scopedMetric); - pluginInstance.callMethod(context, "execution_context=", executionCntx); + scopedMetric.gauge(context, MetricKeys.CONFIG_REF_KEY, RubyUtil.RUBY.newString(configReference)); + + if (type == PluginLookup.PluginType.INPUT) { + final IRubyObject codecDelegatorClass = RubyUtil.RUBY.executeScript( + "require 'logstash/codecs/delegator'\nLogStash::Codecs::Delegator", + ""); + + // WARNING: order is important since metric= create gauges with data assigned from parent_config_reference= + IRubyObject codec = pluginInstance.callMethod(context, "codec"); + if (codec.getType().instance_of_p(context, codecDelegatorClass).isTrue()) { + codec.callMethod(context, "parent_config_reference=", RubyUtil.RUBY.newString(configReference)); + } + pluginInstance.callMethod(context, "metric=", scopedMetric); + pluginInstance.callMethod(context, "execution_context=", executionCntx); + } return pluginInstance; } } else { @@ -287,7 +317,7 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi } if (output != null) { - return JavaOutputDelegatorExt.create(name, id, typeScopedMetric, output); + return JavaOutputDelegatorExt.create(name, id, typeScopedMetric, output, configReference); } else { throw new IllegalStateException("Unable to instantiate output: " + pluginClass); } @@ -309,7 +339,7 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi } if (filter != null) { - return JavaFilterDelegatorExt.create(name, id, typeScopedMetric, filter, pluginArgs); + return JavaFilterDelegatorExt.create(name, id, typeScopedMetric, filter, pluginArgs, configReference); } else { throw new IllegalStateException("Unable to instantiate filter: " + pluginClass); } @@ -331,7 +361,7 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi } if (input != null) { - return JavaInputDelegatorExt.create((JavaBasePipelineExt) executionContext.pipeline, typeScopedMetric, input, pluginArgs); + return JavaInputDelegatorExt.create((JavaBasePipelineExt) executionContext.pipeline, typeScopedMetric, input, pluginArgs, configReference); } else { throw new IllegalStateException("Unable to instantiate input: " + pluginClass); } @@ -344,7 +374,7 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi final Context pluginContext = executionContext.toContext(type, metrics.getRoot(context)); final Codec codec = ctor.newInstance(config, pluginContext); PluginUtil.validateConfig(codec, config); - return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, new JavaCodecDelegator(pluginContext, codec)); + return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, new JavaCodecDelegator(pluginContext, codec, configReference)); } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) { if (ex instanceof InvocationTargetException && ex.getCause() != null) { throw new IllegalStateException((ex).getCause()); diff --git a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java index 70642c10621..c8129bfb894 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java @@ -14,8 +14,10 @@ import java.util.function.Supplier; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; +import org.jruby.RubyArray; import org.jruby.RubyInteger; import org.jruby.RubyString; +import org.jruby.javasupport.JavaUtil; import org.jruby.runtime.builtin.IRubyObject; import org.junit.After; import org.junit.Before; @@ -25,6 +27,7 @@ import org.logstash.Event; import org.logstash.RubyUtil; import org.logstash.common.IncompleteSourceWithMetadataException; +import org.logstash.common.SourceWithMetadata; import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt; import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt; import org.logstash.config.ir.compiler.FilterDelegatorExt; @@ -94,10 +97,35 @@ public void afterEach() { EVENT_SINKS.remove(runId); } + static IRubyObject createRubyPipelineConfig(String configString) throws IncompleteSourceWithMetadataException { + final IRubyObject pipelineConfigClass = RubyUtil.RUBY.executeScript( + "require 'logstash/config/pipeline_config'\nLogStash::Config::PipelineConfig", + "" + ); + + @SuppressWarnings("unchecked") + RubyArray configParts = (RubyArray) RubyUtil.RUBY.newArray( + JavaUtil.convertJavaToRuby( + RubyUtil.RUBY, + new SourceWithMetadata("str", "pipeline", 0, 0, configString) + ) + ); + + final IRubyObject pipelineConfig = + pipelineConfigClass.callMethod(RubyUtil.RUBY.getCurrentContext(), "new", + new IRubyObject[] { + RubyUtil.RUBY.getNil(), /*source*/ + RubyUtil.RUBY.newString("main"), /*pipeline_id*/ + configParts, + RubyUtil.RUBY.getNil(), /*settings*/ + }); + return pipelineConfig; + } + @Test public void buildsTrivialPipeline() throws Exception { final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR( - "input {mockinput{}} output{mockoutput{}}", false + createRubyPipelineConfig("input {mockinput{}} output{mockoutput{}}"), false ); final JrubyEventExtLibrary.RubyEvent testEvent = JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event()); @@ -116,7 +144,7 @@ public void buildsTrivialPipeline() throws Exception { @Test public void buildsStraightPipeline() throws Exception { final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR( - "input {mockinput{}} filter { mockfilter {} mockfilter {} mockfilter {}} output{mockoutput{}}", + createRubyPipelineConfig("input {mockinput{}} filter { mockfilter {} mockfilter {} mockfilter {}} output{mockoutput{}}"), false ); final JrubyEventExtLibrary.RubyEvent testEvent = @@ -137,14 +165,14 @@ public void buildsStraightPipeline() throws Exception { @Test public void buildsForkedPipeline() throws Exception { final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR( - "input {mockinput{}} filter { " + + createRubyPipelineConfig("input {mockinput{}} filter { " + "if [foo] != \"bar\" { " + "mockfilter {} " + "mockaddfilter {} " + "if [foo] != \"bar\" { " + "mockfilter {} " + "}} " + - "} output {mockoutput{} }", + "} output {mockoutput{} }"), false ); final JrubyEventExtLibrary.RubyEvent testEvent = @@ -268,9 +296,9 @@ private void verifyRegex(String operator, int expectedEvents) new CompiledPipeline( ConfigCompiler.configToPipelineIR( - "input {mockinput{}} output { " + + createRubyPipelineConfig("input {mockinput{}} output { " + String.format("if \"z\" %s /z/ { ", operator) + - " mockoutput{} } }", + " mockoutput{} } }"), false ), new CompiledPipelineTest.MockPluginFactory( @@ -289,7 +317,7 @@ private void verifyRegex(String operator, int expectedEvents) @Test public void equalityCheckOnCompositeField() throws Exception { final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR( - "input {mockinput{}} filter { if 4 == [list] { mockaddfilter {} } if 5 == [map] { mockaddfilter {} } } output {mockoutput{} }", + createRubyPipelineConfig("input {mockinput{}} filter { if 4 == [list] { mockaddfilter {} } if 5 == [map] { mockaddfilter {} } } output {mockoutput{} }"), false ); final Collection s = new ArrayList<>(); @@ -320,7 +348,7 @@ public void equalityCheckOnCompositeField() throws Exception { @Test public void conditionalWithNullField() throws Exception { final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR( - "input {mockinput{}} filter { if [foo] == [bar] { mockaddfilter {} } } output {mockoutput{} }", + createRubyPipelineConfig("input {mockinput{}} filter { if [foo] == [bar] { mockaddfilter {} } } output {mockoutput{} }"), false ); final JrubyEventExtLibrary.RubyEvent testEvent = @@ -344,7 +372,7 @@ public void conditionalWithNullField() throws Exception { @Test public void conditionalNestedMetaFieldPipeline() throws Exception { final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR( - "input {mockinput{}} filter { if [@metadata][foo][bar] { mockaddfilter {} } } output {mockoutput{} }", + createRubyPipelineConfig("input {mockinput{}} filter { if [@metadata][foo][bar] { mockaddfilter {} } } output {mockoutput{} }"), false ); final JrubyEventExtLibrary.RubyEvent testEvent = @@ -369,7 +397,7 @@ public void conditionalNestedMetaFieldPipeline() throws Exception { @Test public void moreThan255Parents() throws Exception { final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR( - "input {mockinput{}} filter { " + + createRubyPipelineConfig("input {mockinput{}} filter { " + "if [foo] != \"bar\" { " + "mockfilter {} " + "mockaddfilter {} " + @@ -377,7 +405,7 @@ public void moreThan255Parents() throws Exception { "mockfilter {} " + Strings.repeat("} else if [foo] != \"bar\" {" + "mockfilter {} ", 300) + " } } " + - "} output {mockoutput{} }", + "} output {mockoutput{} }"), false ); final JrubyEventExtLibrary.RubyEvent testEvent = @@ -426,11 +454,11 @@ private void verifyComparison(final boolean expected, final String conditional, new CompiledPipeline( ConfigCompiler.configToPipelineIR( - "input {mockinput{}} filter { " + + createRubyPipelineConfig("input {mockinput{}} filter { " + String.format("if %s { ", conditional) + " mockaddfilter {} " + "} " + - "} output {mockoutput{} }", + "} output {mockoutput{} }"), false ), new CompiledPipelineTest.MockPluginFactory( @@ -467,29 +495,32 @@ static final class MockPluginFactory implements PluginFactory { private final Map>>> outputs; MockPluginFactory(final Map> inputs, - final Map> filters, - final Map>>> outputs - ) { + final Map> filters, + final Map>>> outputs) { this.inputs = inputs; this.filters = filters; this.outputs = outputs; } @Override - public IRubyObject buildInput(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args, Map pluginArgs) { + public IRubyObject buildInput(final RubyString name, final RubyInteger line, final RubyInteger column, + final String sourceFile, final int sourceLine, final IRubyObject args, + Map pluginArgs) { return setupPlugin(name, inputs); } @Override public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args, Map pluginArgs) { + final RubyInteger column, final String sourceFile, final int sourceLine, final IRubyObject args, + Map pluginArgs) + { return PipelineTestUtil.buildOutput(setupPlugin(name, outputs)); } @Override public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line, - final RubyInteger column, final IRubyObject args, + final RubyInteger column, final String sourceFile, + final int sourceLine, final IRubyObject args, Map pluginArgs) { return new FilterDelegatorExt( RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS) @@ -497,7 +528,8 @@ public AbstractFilterDelegatorExt buildFilter(final RubyString name, final RubyI } @Override - public IRubyObject buildCodec(final RubyString name, final IRubyObject args, Map pluginArgs) { + public IRubyObject buildCodec(final RubyString name, final String sourceFile, final int sourceLine, + final IRubyObject args, Map pluginArgs) { throw new IllegalStateException("No codec setup expected in this test."); } diff --git a/logstash-core/src/test/java/org/logstash/config/ir/ConfigCompilerTest.java b/logstash-core/src/test/java/org/logstash/config/ir/ConfigCompilerTest.java index 1314dd0ec8f..2e8cb91db0d 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/ConfigCompilerTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/ConfigCompilerTest.java @@ -8,13 +8,14 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.logstash.config.ir.CompiledPipelineTest.createRubyPipelineConfig; public class ConfigCompilerTest extends RubyEnvTestCase { @Test public void testConfigToPipelineIR() throws Exception { final PipelineIR pipelineIR = - ConfigCompiler.configToPipelineIR("input {stdin{}} output{stdout{}}", false); + ConfigCompiler.configToPipelineIR(createRubyPipelineConfig("input {stdin{}} output{stdout{}}"), false); assertThat(pipelineIR.getOutputPluginVertices().size(), is(1)); assertThat(pipelineIR.getFilterPluginVertices().size(), is(0)); } @@ -62,6 +63,6 @@ public void testComplexConfigToPipelineIR() throws Exception { private static String graphHash(final String config) throws IncompleteSourceWithMetadataException { - return ConfigCompiler.configToPipelineIR(config, false).uniqueHash(); + return ConfigCompiler.configToPipelineIR(createRubyPipelineConfig(config), false).uniqueHash(); } } diff --git a/logstash-core/src/test/java/org/logstash/config/ir/EventConditionTest.java b/logstash-core/src/test/java/org/logstash/config/ir/EventConditionTest.java index 0d47a4a1d18..59fd9890665 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/EventConditionTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/EventConditionTest.java @@ -19,6 +19,7 @@ import java.util.function.Supplier; import static org.logstash.config.ir.CompiledPipelineTest.IDENTITY_FILTER; +import static org.logstash.config.ir.CompiledPipelineTest.createRubyPipelineConfig; import static org.logstash.ext.JrubyEventExtLibrary.RubyEvent; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -55,12 +56,12 @@ public void afterEach() { @SuppressWarnings("rawtypes") public void testInclusionWithFieldInField() throws Exception { final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR( - "input {mockinput{}} filter { " + + createRubyPipelineConfig("input {mockinput{}} filter { " + "mockfilter {} } " + "output { " + " if [left] in [right] { " + " mockoutput{}" + - " } }", + " } }"), false ); @@ -136,12 +137,12 @@ public void testConditionWithConstantEmptyStringValue() throws Exception { private void testConditionWithConstantValue(String condition, int expectedMatches) throws Exception { final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR( - "input {mockinput{}} filter { " + + createRubyPipelineConfig("input {mockinput{}} filter { " + "mockfilter {} } " + "output { " + " if " + condition + " { " + " mockoutput{}" + - " } }", + " } }"), false ); diff --git a/logstash-core/src/test/java/org/logstash/config/ir/PipelineTestUtil.java b/logstash-core/src/test/java/org/logstash/config/ir/PipelineTestUtil.java index e5142ab1ec5..c33f9d6547f 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/PipelineTestUtil.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/PipelineTestUtil.java @@ -17,7 +17,7 @@ public static AbstractOutputDelegatorExt buildOutput( final Consumer> consumer) { return JavaOutputDelegatorExt.create( "someClassName", "someId", NullMetricExt.create(), consumer, () -> {}, - () -> {} + () -> {}, "L:test, C:test" ); } } diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/FakeOutClass.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/FakeOutClass.java index 1d8e00f0116..cb02d714f55 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/FakeOutClass.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/FakeOutClass.java @@ -7,6 +7,7 @@ import org.jruby.anno.JRubyMethod; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; import static org.logstash.RubyUtil.RUBY; @@ -60,6 +61,19 @@ public IRubyObject register() { return this; } + @JRubyMethod(name = "codec") + public IRubyObject codec() { + final IRubyObject codecDelegatorClass = RubyUtil.RUBY.executeScript( + "require 'logstash/codecs/delegator'\nLogStash::Codecs::Delegator", + "" + ); + final IRubyObject codecDelegator = + codecDelegatorClass.callMethod(RubyUtil.RUBY.getCurrentContext(), "new", + new IRubyObject[]{RubyUtil.RUBY.newString("Fake Codec Object"), null} + ); + return codecDelegator; + } + @JRubyMethod(name = "metric=") public IRubyObject metric(final IRubyObject args) { this.metricArgs = args; diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.java index 7da86c699d0..300b41a3d98 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.java @@ -198,7 +198,7 @@ private long getMetricLongValue(final String type, final String symbolName) { } private JavaCodecDelegator constructCodecDelegator() { - return new JavaCodecDelegator(new ContextImpl(null, this.getInstance()), codec); + return new JavaCodecDelegator(new ContextImpl(null, this.getInstance()), codec, null); } private abstract class AbstractCodec implements Codec { diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java index f2ecf3957a2..b5912945499 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java @@ -41,6 +41,7 @@ public void setup() { pluginArgs = RubyHash.newHash(RUBY); pluginArgs.put("id", "foo"); pluginArgs.put("arg1", "val1"); + pluginArgs.put("config-ref", ""); } @Override @@ -114,6 +115,7 @@ public void closesOutputPlugin() { @Test public void singleConcurrencyStrategyIsDefault() { + pluginArgs.put("config-ref", ""); OutputDelegatorExt outputDelegator = constructOutputDelegator(); IRubyObject concurrency = outputDelegator.concurrency(RUBY.getCurrentContext()); assertEquals(RUBY.newSymbol("single"), concurrency); @@ -128,6 +130,7 @@ public void outputStrategyTests() { }; for (StrategyPair pair : outputStrategies) { + pluginArgs.put("config-ref", ""); FakeOutClass.setOutStrategy(RUBY.getCurrentContext(), null, pair.symbol); OutputDelegatorExt outputDelegator = constructOutputDelegator(); @@ -153,6 +156,7 @@ public void outputStrategyMethodDelegationTests() { }; final ThreadContext context = RUBY.getCurrentContext(); for (RubySymbol symbol : outputStrategies) { + pluginArgs.put("config-ref", ""); FakeOutClass.create().initialize(context); FakeOutClass.setOutStrategy(RUBY.getCurrentContext(), null, symbol); OutputDelegatorExt outputDelegator = constructOutputDelegator(); diff --git a/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java b/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java index 0c78ac865e4..d79fb92464f 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java +++ b/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java @@ -15,22 +15,22 @@ public class TestPluginFactory implements RubyIntegration.PluginFactory { @Override - public IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args, Map pluginArgs) { + public IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column, String sourceFile, int sourceLine, IRubyObject args, Map pluginArgs) { return null; } @Override - public AbstractOutputDelegatorExt buildOutput(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args, Map pluginArgs) { + public AbstractOutputDelegatorExt buildOutput(RubyString name, RubyInteger line, RubyInteger column, String sourceFile, int sourceLine, IRubyObject args, Map pluginArgs) { return null; } @Override - public AbstractFilterDelegatorExt buildFilter(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args, Map pluginArgs) { + public AbstractFilterDelegatorExt buildFilter(RubyString name, RubyInteger line, RubyInteger column, String sourceFile, int sourceLine, IRubyObject args, Map pluginArgs) { return null; } @Override - public IRubyObject buildCodec(RubyString name, IRubyObject args, Map pluginArgs) { + public IRubyObject buildCodec(RubyString name, String sourceFile, int sourceLine, IRubyObject args, Map pluginArgs) { return null; } diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index 0a3270988e6..64105860fb9 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -48,7 +48,7 @@ end end - it 'can retrieve dlq stats' do + it "can retrieve dlq stats" do logstash_service = @fixture.get_service("logstash") logstash_service.start_with_stdin logstash_service.wait_for_logstash @@ -142,6 +142,89 @@ end end + it "can retrieve pipeline metrics stats - config string" do + logstash_service = @fixture.get_service("logstash") + logstash_service.start_with_stdin + logstash_service.wait_for_logstash + + Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do + # event_stats can fail if the stats subsystem isn't ready + result = logstash_service.monitoring_api.pipeline_stats("main") rescue nil + puts "<><><> #{result} <><><>" + expect(result).not_to be_nil + + # we use fetch here since we want failed fetches to raise an exception + # and trigger the retry block + inputs_stats = result.fetch("plugins").fetch("inputs")[0] + config_ref = inputs_stats.fetch("config-ref") + puts ">>> inputs_stats: #{inputs_stats} <<<" + expect(config_ref).to eq("S: config_string, L:1, C:8") + end + end + + describe "multifile pipelines" do + + let!(:settings_dir) { Stud::Temporary.directory("logstash-splitted-pipeline-config-test") } + + it "can retrieve pipeline metrics stats - multiple files" do + IO.write(settings_dir + "/pipeline_1_piece.conf", """ + input { + stdin { + codec => json { + charset => \"UTF-8\" + } + } + } + + filter { + sleep { + time => 1 + } + } + """) + + IO.write(settings_dir + "/pipeline_2_piece.conf", """ + output { + stdout { + codec => rubydebug + } + } + """) + + logstash_service = @fixture.get_service("logstash") + logstash_service.spawn_logstash("--path.config", settings_dir) + logstash_service.wait_for_logstash + + Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do + # event_stats can fail if the stats subsystem isn't ready + result = logstash_service.monitoring_api.pipeline_stats("main") rescue nil + expect(result).not_to be_nil + + inputs_stats = result.fetch("plugins").fetch("inputs")[0] + config_ref = inputs_stats.fetch("config-ref") + expect_source_ref(config_ref, "pipeline_1_piece.conf", 3, 8) + + input_codec_stats = result.fetch("plugins").fetch("codecs").select { |c| c["name"] == "json"}.first + expect(input_codec_stats).not_to be_nil + config_ref = input_codec_stats.fetch("config-ref") + expect_source_ref(config_ref, "pipeline_1_piece.conf", 3, 0) + + filters_stats = result.fetch("plugins").fetch("filters")[0] + config_ref = filters_stats.fetch("config-ref") + expect_source_ref(config_ref, "pipeline_1_piece.conf", 11, 8) + + outputs_stats = result.fetch("plugins").fetch("outputs")[0] + config_ref = outputs_stats.fetch("config-ref") + expect_source_ref(config_ref, "pipeline_2_piece.conf", 3, 9) + + output_codec_stats = result.fetch("plugins").fetch("codecs").select { |c| c["name"] == "rubydebug"}.first + expect(output_codec_stats).not_to be_nil + config_ref = output_codec_stats.fetch("parent-config-ref") + expect_source_ref(config_ref, "pipeline_2_piece.conf", 3, 9) + end + end + end + private def logging_get_assert(logstash_service, logstash_level, slowlog_level) @@ -159,4 +242,8 @@ def logging_put_assert(result) expect(result["acknowledged"]).to be(true) end + def expect_source_ref(config_ref, filename, expected_line, expected_column) + expect(config_ref).to match("S: \/tmp\/logstash-splitted-pipeline-config-test.*\/#{filename}, L:#{expected_line}, C:#{expected_column}") + end + end diff --git a/x-pack/spec/monitoring/inputs/metrics/state_event/lir_serializer_spec.rb b/x-pack/spec/monitoring/inputs/metrics/state_event/lir_serializer_spec.rb index b4c2c64dd53..f1b2aabd6fe 100644 --- a/x-pack/spec/monitoring/inputs/metrics/state_event/lir_serializer_spec.rb +++ b/x-pack/spec/monitoring/inputs/metrics/state_event/lir_serializer_spec.rb @@ -4,6 +4,7 @@ require "spec_helper" require "logstash/environment" +require "logstash/config/pipeline_config" describe ::LogStash::Config::LIRSerializer do let(:config) do @@ -21,8 +22,10 @@ [org.logstash.common.SourceWithMetadata.new("string", "spec", config)] end + let(:pipeline_config) { LogStash::Config::PipelineConfig.new("x-pack_lir_serializer_test", "lir", config_source_with_metadata, LogStash::SETTINGS) } + let(:lir_pipeline) do - ::LogStash::Compiler.compile_sources(config_source_with_metadata, LogStash::SETTINGS) + ::LogStash::Compiler.compile_sources(pipeline_config, LogStash::SETTINGS) end describe "#serialize" do