diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index f576667f44109..364e19d1f2e8e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -21,12 +21,15 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.collect.Tuple; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import java.util.stream.Collectors; /** @@ -41,16 +44,33 @@ public class CompoundProcessor implements Processor { private final boolean ignoreFailure; private final List processors; private final List onFailureProcessors; + private final List> processorsWithMetrics; + private final LongSupplier relativeTimeProvider; + + CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) { + this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider); + } public CompoundProcessor(Processor... processor) { this(false, Arrays.asList(processor), Collections.emptyList()); } public CompoundProcessor(boolean ignoreFailure, List processors, List onFailureProcessors) { + this(ignoreFailure, processors, onFailureProcessors, System::nanoTime); + } + CompoundProcessor(boolean ignoreFailure, List processors, List onFailureProcessors, + LongSupplier relativeTimeProvider) { super(); this.ignoreFailure = ignoreFailure; this.processors = processors; this.onFailureProcessors = onFailureProcessors; + this.relativeTimeProvider = relativeTimeProvider; + this.processorsWithMetrics = new ArrayList<>(processors.size()); + processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric()))); + } + + List> getProcessorsWithMetrics() { + return processorsWithMetrics; } public boolean isIgnoreFailure() { @@ -95,12 +115,17 @@ public String getTag() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - for (Processor processor : processors) { + for (Tuple processorWithMetric : processorsWithMetrics) { + Processor processor = processorWithMetric.v1(); + IngestMetric metric = processorWithMetric.v2(); + long startTimeInNanos = relativeTimeProvider.getAsLong(); try { + metric.preIngest(); if (processor.execute(ingestDocument) == null) { return null; } } catch (Exception e) { + metric.ingestFailed(); if (ignoreFailure) { continue; } @@ -113,11 +138,15 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { executeOnFailure(ingestDocument, compoundProcessorException); break; } + } finally { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metric.postIngest(ingestTimeInMillis); } } return ingestDocument; } + void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { try { putFailureMetadata(ingestDocument, exception); diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index b6f6612344a39..9078dc86c1b07 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -28,6 +28,8 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import org.elasticsearch.script.IngestConditionalScript; import org.elasticsearch.script.Script; @@ -42,12 +44,20 @@ public class ConditionalProcessor extends AbstractProcessor { private final ScriptService scriptService; private final Processor processor; + private final IngestMetric metric; + private final LongSupplier relativeTimeProvider; ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) { + this(tag, script, scriptService, processor, System::nanoTime); + } + + ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) { super(tag); this.condition = script; this.scriptService = scriptService; this.processor = processor; + this.metric = new IngestMetric(); + this.relativeTimeProvider = relativeTimeProvider; } @Override @@ -55,11 +65,30 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { IngestConditionalScript script = scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { - return processor.execute(ingestDocument); + // Only record metric if the script evaluates to true + long startTimeInNanos = relativeTimeProvider.getAsLong(); + try { + metric.preIngest(); + return processor.execute(ingestDocument); + } catch (Exception e) { + metric.ingestFailed(); + throw e; + } finally { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metric.postIngest(ingestTimeInMillis); + } } return ingestDocument; } + Processor getProcessor() { + return processor; + } + + IngestMetric getMetric() { + return metric; + } + @Override public String getType() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 5bc24a367da33..083cb362ca394 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -19,20 +19,6 @@ package org.elasticsearch.ingest; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Collectors; - import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -50,6 +36,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -62,6 +49,19 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + /** * Holder class for several ingest related services. */ @@ -263,11 +263,59 @@ public void applyClusterState(final ClusterChangedEvent event) { Pipeline originalPipeline = originalPipelines.get(id); if (originalPipeline != null) { pipeline.getMetrics().add(originalPipeline.getMetrics()); + List> oldPerProcessMetrics = new ArrayList<>(); + List> newPerProcessMetrics = new ArrayList<>(); + getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics); + getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics); + //Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since + //the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and + //consistent id's per processor and/or semantic equals for each processor will be needed. + if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) { + Iterator> oldMetricsIterator = oldPerProcessMetrics.iterator(); + for (Tuple compositeMetric : newPerProcessMetrics) { + String type = compositeMetric.v1().getType(); + IngestMetric metric = compositeMetric.v2(); + if (oldMetricsIterator.hasNext()) { + Tuple oldCompositeMetric = oldMetricsIterator.next(); + String oldType = oldCompositeMetric.v1().getType(); + IngestMetric oldMetric = oldCompositeMetric.v2(); + if (type.equals(oldType)) { + metric.add(oldMetric); + } + } + } + } } }); } } + /** + * Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as + * wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric. + * @param compoundProcessor The compound processor to start walking the non-failure processors + * @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples. + * @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor + */ + private static List> getProcessorMetrics(CompoundProcessor compoundProcessor, + List> processorMetrics) { + //only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure + for (Tuple processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) { + Processor processor = processorWithMetric.v1(); + IngestMetric metric = processorWithMetric.v2(); + if (processor instanceof CompoundProcessor) { + getProcessorMetrics((CompoundProcessor) processor, processorMetrics); + } else { + //Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true. + if (processor instanceof ConditionalProcessor) { + metric = ((ConditionalProcessor) processor).getMetric(); + } + processorMetrics.add(new Tuple<>(processor, metric)); + } + } + return processorMetrics; + } + private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; @@ -372,11 +420,42 @@ protected void doRun() { } public IngestStats stats() { + IngestStats.Builder statsBuilder = new IngestStats.Builder(); + statsBuilder.addTotalMetrics(totalMetrics); + pipelines.forEach((id, pipeline) -> { + CompoundProcessor rootProcessor = pipeline.getCompoundProcessor(); + statsBuilder.addPipelineMetrics(id, pipeline.getMetrics()); + List> processorMetrics = new ArrayList<>(); + getProcessorMetrics(rootProcessor, processorMetrics); + processorMetrics.forEach(t -> { + Processor processor = t.v1(); + IngestMetric processorMetric = t.v2(); + statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric); + }); + }); + return statsBuilder.build(); + } - Map statsPerPipeline = - pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats())); + //package private for testing + static String getProcessorName(Processor processor){ + // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name + if(processor instanceof ConditionalProcessor){ + processor = ((ConditionalProcessor) processor).getProcessor(); + } + StringBuilder sb = new StringBuilder(5); + sb.append(processor.getType()); - return new IngestStats(totalMetrics.createStats(), statsPerPipeline); + if(processor instanceof PipelineProcessor){ + String pipelineName = ((PipelineProcessor) processor).getPipelineName(); + sb.append(":"); + sb.append(pipelineName); + } + String tag = processor.getTag(); + if(tag != null && !tag.isEmpty()){ + sb.append(":"); + sb.append(tag); + } + return sb.toString(); } private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index c4c1520fd19d4..7a24cf3ee895e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -27,17 +28,28 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class IngestStats implements Writeable, ToXContentFragment { private final Stats totalStats; - private final Map statsPerPipeline; + private final List pipelineStats; + private final Map> processorStats; - public IngestStats(Stats totalStats, Map statsPerPipeline) { + /** + * @param totalStats - The total stats for Ingest. This is the logically the sum of all pipeline stats, + * and pipeline stats are logically the sum of the processor stats. + * @param pipelineStats - The stats for a given ingest pipeline. + * @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier. + */ + public IngestStats(Stats totalStats, List pipelineStats, Map> processorStats) { this.totalStats = totalStats; - this.statsPerPipeline = statsPerPipeline; + this.pipelineStats = pipelineStats; + this.processorStats = processorStats; } /** @@ -46,37 +58,47 @@ public IngestStats(Stats totalStats, Map statsPerPipeline) { public IngestStats(StreamInput in) throws IOException { this.totalStats = new Stats(in); int size = in.readVInt(); - this.statsPerPipeline = new HashMap<>(size); + this.pipelineStats = new ArrayList<>(size); + this.processorStats = new HashMap<>(size); for (int i = 0; i < size; i++) { - statsPerPipeline.put(in.readString(), new Stats(in)); + String pipelineId = in.readString(); + Stats pipelineStat = new Stats(in); + this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat)); + if (in.getVersion().onOrAfter(Version.V_6_5_0)) { + int processorsSize = in.readVInt(); + List processorStatsPerPipeline = new ArrayList<>(processorsSize); + for (int j = 0; j < processorsSize; j++) { + String processorName = in.readString(); + Stats processorStat = new Stats(in); + processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat)); + } + this.processorStats.put(pipelineId, processorStatsPerPipeline); + } } } @Override public void writeTo(StreamOutput out) throws IOException { totalStats.writeTo(out); - out.writeVInt(statsPerPipeline.size()); - for (Map.Entry entry : statsPerPipeline.entrySet()) { - out.writeString(entry.getKey()); - entry.getValue().writeTo(out); + out.writeVInt(pipelineStats.size()); + for (PipelineStat pipelineStat : pipelineStats) { + out.writeString(pipelineStat.getPipelineId()); + pipelineStat.getStats().writeTo(out); + if (out.getVersion().onOrAfter(Version.V_6_5_0)) { + List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); + if (processorStatsForPipeline == null) { + out.writeVInt(0); + } else { + out.writeVInt(processorStatsForPipeline.size()); + for (ProcessorStat processorStat : processorStatsForPipeline) { + out.writeString(processorStat.getName()); + processorStat.getStats().writeTo(out); + } + } + } } } - - /** - * @return The accumulated stats for all pipelines - */ - public Stats getTotalStats() { - return totalStats; - } - - /** - * @return The stats on a per pipeline basis - */ - public Map getStatsPerPipeline() { - return statsPerPipeline; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("ingest"); @@ -84,9 +106,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws totalStats.toXContent(builder, params); builder.endObject(); builder.startObject("pipelines"); - for (Map.Entry entry : statsPerPipeline.entrySet()) { - builder.startObject(entry.getKey()); - entry.getValue().toXContent(builder, params); + for (PipelineStat pipelineStat : pipelineStats) { + builder.startObject(pipelineStat.getPipelineId()); + pipelineStat.getStats().toXContent(builder, params); + List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); + builder.startArray("processors"); + if (processorStatsForPipeline != null) { + for (ProcessorStat processorStat : processorStatsForPipeline) { + builder.startObject(); + builder.startObject(processorStat.getName()); + processorStat.getStats().toXContent(builder, params); + builder.endObject(); + builder.endObject(); + } + } + builder.endArray(); builder.endObject(); } builder.endObject(); @@ -94,6 +128,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public Stats getTotalStats() { + return totalStats; + } + + public List getPipelineStats() { + return pipelineStats; + } + + public Map> getProcessorStats() { + return processorStats; + } + public static class Stats implements Writeable, ToXContentFragment { private final long ingestCount; @@ -134,7 +180,6 @@ public long getIngestCount() { } /** - * * @return The total time spent of ingest preprocessing in millis. */ public long getIngestTimeInMillis() { @@ -164,4 +209,77 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } } + + /** + * Easy conversion from scoped {@link IngestMetric} objects to a serializable Stats objects + */ + static class Builder { + private Stats totalStats; + private List pipelineStats = new ArrayList<>(); + private Map> processorStats = new HashMap<>(); + + + Builder addTotalMetrics(IngestMetric totalMetric) { + this.totalStats = totalMetric.createStats(); + return this; + } + + Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) { + this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats())); + return this; + } + + Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) { + this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>()) + .add(new ProcessorStat(processorName, metric.createStats())); + return this; + } + + IngestStats build() { + return new IngestStats(totalStats, Collections.unmodifiableList(pipelineStats), + Collections.unmodifiableMap(processorStats)); + } + } + + /** + * Container for pipeline stats. + */ + public static class PipelineStat { + private final String pipelineId; + private final Stats stats; + + public PipelineStat(String pipelineId, Stats stats) { + this.pipelineId = pipelineId; + this.stats = stats; + } + + public String getPipelineId() { + return pipelineId; + } + + public Stats getStats() { + return stats; + } + } + + /** + * Container for processor stats. + */ + public static class ProcessorStat { + private final String name; + private final Stats stats; + + public ProcessorStat(String name, Stats stats) { + this.name = name; + this.stats = stats; + } + + public String getName() { + return name; + } + + public Stats getStats() { + return stats; + } + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 8d5f6d6ff7c54..fc5311be5cbde 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -22,11 +22,12 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; -import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import org.elasticsearch.script.ScriptService; @@ -47,20 +48,21 @@ public final class Pipeline { private final Integer version; private final CompoundProcessor compoundProcessor; private final IngestMetric metrics; - private final Clock clock; + private final LongSupplier relativeTimeProvider; public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) { - this(id, description, version, compoundProcessor, Clock.systemUTC()); + this(id, description, version, compoundProcessor, System::nanoTime); } //package private for testing - Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) { + Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, + LongSupplier relativeTimeProvider) { this.id = id; this.description = description; this.compoundProcessor = compoundProcessor; this.version = version; this.metrics = new IngestMetric(); - this.clock = clock; + this.relativeTimeProvider = relativeTimeProvider; } public static Pipeline create(String id, Map config, @@ -89,7 +91,7 @@ public static Pipeline create(String id, Map config, * Modifies the data of a document to be indexed based on the processor this pipeline holds */ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - long startTimeInMillis = clock.millis(); + long startTimeInNanos = relativeTimeProvider.getAsLong(); try { metrics.preIngest(); return compoundProcessor.execute(ingestDocument); @@ -97,7 +99,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { metrics.ingestFailed(); throw e; } finally { - long ingestTimeInMillis = clock.millis() - startTimeInMillis; + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); metrics.postIngest(ingestTimeInMillis); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 918ff6b8aefee..16324e8dee6c7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -53,6 +53,10 @@ public String getType() { return TYPE; } + String getPipelineName() { + return pipelineName; + } + public static final class Factory implements Processor.Factory { private final IngestService ingestService; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 253f5d2b19dc3..f5669c727d881 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -54,7 +54,6 @@ import static java.util.Collections.emptySet; public class NodeStatsTests extends ESTestCase { - public void testSerialization() throws IOException { NodeStats nodeStats = createNodeStats(); try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -272,14 +271,29 @@ public void testSerialization() throws IOException { assertEquals(totalStats.getIngestCurrent(), deserializedIngestStats.getTotalStats().getIngestCurrent()); assertEquals(totalStats.getIngestFailedCount(), deserializedIngestStats.getTotalStats().getIngestFailedCount()); assertEquals(totalStats.getIngestTimeInMillis(), deserializedIngestStats.getTotalStats().getIngestTimeInMillis()); - assertEquals(ingestStats.getStatsPerPipeline().size(), deserializedIngestStats.getStatsPerPipeline().size()); - for (Map.Entry entry : ingestStats.getStatsPerPipeline().entrySet()) { - IngestStats.Stats stats = entry.getValue(); - IngestStats.Stats deserializedStats = deserializedIngestStats.getStatsPerPipeline().get(entry.getKey()); - assertEquals(stats.getIngestFailedCount(), deserializedStats.getIngestFailedCount()); - assertEquals(stats.getIngestTimeInMillis(), deserializedStats.getIngestTimeInMillis()); - assertEquals(stats.getIngestCurrent(), deserializedStats.getIngestCurrent()); - assertEquals(stats.getIngestCount(), deserializedStats.getIngestCount()); + assertEquals(ingestStats.getPipelineStats().size(), deserializedIngestStats.getPipelineStats().size()); + for (IngestStats.PipelineStat pipelineStat : ingestStats.getPipelineStats()) { + String pipelineId = pipelineStat.getPipelineId(); + IngestStats.Stats deserializedPipelineStats = + getPipelineStats(deserializedIngestStats.getPipelineStats(), pipelineId); + assertEquals(pipelineStat.getStats().getIngestFailedCount(), deserializedPipelineStats.getIngestFailedCount()); + assertEquals(pipelineStat.getStats().getIngestTimeInMillis(), deserializedPipelineStats.getIngestTimeInMillis()); + assertEquals(pipelineStat.getStats().getIngestCurrent(), deserializedPipelineStats.getIngestCurrent()); + assertEquals(pipelineStat.getStats().getIngestCount(), deserializedPipelineStats.getIngestCount()); + List processorStats = ingestStats.getProcessorStats().get(pipelineId); + //intentionally validating identical order + Iterator it = deserializedIngestStats.getProcessorStats().get(pipelineId).iterator(); + for (IngestStats.ProcessorStat processorStat : processorStats) { + IngestStats.ProcessorStat deserializedProcessorStat = it.next(); + assertEquals(processorStat.getStats().getIngestFailedCount(), + deserializedProcessorStat.getStats().getIngestFailedCount()); + assertEquals(processorStat.getStats().getIngestTimeInMillis(), + deserializedProcessorStat.getStats().getIngestTimeInMillis()); + assertEquals(processorStat.getStats().getIngestCurrent(), + deserializedProcessorStat.getStats().getIngestCurrent()); + assertEquals(processorStat.getStats().getIngestCount(), deserializedProcessorStat.getStats().getIngestCount()); + } + assertFalse(it.hasNext()); } } AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats(); @@ -429,14 +443,24 @@ private static NodeStats createNodeStats() { if (frequently()) { IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + int numPipelines = randomIntBetween(0, 10); + int numProcessors = randomIntBetween(0, 10); + List ingestPipelineStats = new ArrayList<>(numPipelines); + Map> ingestProcessorStats = new HashMap<>(numPipelines); + for (int i = 0; i < numPipelines; i++) { + String pipelineId = randomAlphaOfLengthBetween(3, 10); + ingestPipelineStats.add(new IngestStats.PipelineStat(pipelineId, new IngestStats.Stats + (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()))); - int numStatsPerPipeline = randomIntBetween(0, 10); - Map statsPerPipeline = new HashMap<>(); - for (int i = 0; i < numStatsPerPipeline; i++) { - statsPerPipeline.put(randomAlphaOfLengthBetween(3, 10), new IngestStats.Stats(randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); + List processorPerPipeline = new ArrayList<>(numProcessors); + for (int j =0; j < numProcessors;j++) { + IngestStats.Stats processorStats = new IngestStats.Stats + (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats)); + } + ingestProcessorStats.put(pipelineId,processorPerPipeline); } - ingestStats = new IngestStats(totalStats, statsPerPipeline); + ingestStats = new IngestStats(totalStats, ingestPipelineStats, ingestProcessorStats); } AdaptiveSelectionStats adaptiveSelectionStats = null; if (frequently()) { @@ -465,4 +489,8 @@ private static NodeStats createNodeStats() { fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, ingestStats, adaptiveSelectionStats); } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index aaede49a36d57..dabcae533a0bf 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -27,11 +27,17 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class CompoundProcessorTests extends ESTestCase { private IngestDocument ingestDocument; @@ -49,18 +55,29 @@ public void testEmpty() throws Exception { } public void testSingleProcessor() throws Exception { - TestProcessor processor = new TestProcessor(ingestDocument -> {}); - CompoundProcessor compoundProcessor = new CompoundProcessor(processor); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); + TestProcessor processor = new TestProcessor(ingestDocument ->{ + assertStats(0, ingestDocument.getFieldValue("compoundProcessor", CompoundProcessor.class), 1, 0, 0, 0); + }); + CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor); + ingestDocument.setFieldValue("compoundProcessor", compoundProcessor); //ugly hack to assert current count = 1 assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); + assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); compoundProcessor.execute(ingestDocument); + verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 0, 1); + } public void testSingleProcessorWithException() throws Exception { TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); - CompoundProcessor compoundProcessor = new CompoundProcessor(processor); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor); assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); @@ -71,15 +88,22 @@ public void testSingleProcessorWithException() throws Exception { assertThat(e.getRootCause().getMessage(), equalTo("error")); } assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); + } public void testIgnoreFailure() throws Exception { TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");}); - CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList()); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = + new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); assertThat(processor2.getInvokedCounter(), equalTo(1)); + assertStats(1, compoundProcessor, 0, 1, 0, 0); assertThat(ingestDocument.getFieldValue("field", String.class), equalTo("value")); } @@ -93,11 +117,15 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), - Collections.singletonList(processor2)); + Collections.singletonList(processor2), relativeTimeProvider); compoundProcessor.execute(ingestDocument); + verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 1); assertThat(processor2.getInvokedCounter(), equalTo(1)); } @@ -118,14 +146,17 @@ public void testSingleProcessorWithNestedFailures() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail), - Collections.singletonList(lastProcessor)); + Collections.singletonList(lastProcessor), relativeTimeProvider); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), - Collections.singletonList(compoundOnFailProcessor)); + Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception { @@ -137,21 +168,24 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); - CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor); + CompoundProcessor failCompoundProcessor = new CompoundProcessor(relativeTimeProvider, firstProcessor); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor)); + Collections.singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFail() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor failProcessor = - new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); assertThat(ingestMetadata.entrySet(), hasSize(3)); @@ -160,21 +194,24 @@ public void testCompoundProcessorExceptionFail() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), - Collections.singletonList(failProcessor)); + Collections.singletonList(failProcessor), relativeTimeProvider); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor)); + Collections.singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor failProcessor = - new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); assertThat(ingestMetadata.entrySet(), hasSize(3)); @@ -183,27 +220,44 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), - Collections.singletonList(new CompoundProcessor(failProcessor))); + Collections.singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor))); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor)); + Collections.singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testBreakOnFailure() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");}); TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");}); TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor), - Collections.singletonList(onFailureProcessor)); + Collections.singletonList(onFailureProcessor), relativeTimeProvider); pipeline.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(0)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); + assertStats(pipeline, 1, 1, 0); + } + + private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) { + assertStats(0, compoundProcessor, 0L, count, failed, time); + } + private void assertStats(int processor, CompoundProcessor compoundProcessor, long current, long count, long failed, long time) { + IngestStats.Stats stats = compoundProcessor.getProcessorsWithMetrics().get(processor).v2().createStats(); + assertThat(stats.getIngestCount(), equalTo(count)); + assertThat(stats.getIngestCurrent(), equalTo(current)); + assertThat(stats.getIngestFailedCount(), equalTo(failed)); + assertThat(stats.getIngestTimeInMillis(), equalTo(time)); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index c7d4dfa4e68cd..c5548ae559400 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -33,12 +33,18 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.LongSupplier; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.core.Is.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ConditionalProcessorTests extends ESTestCase { @@ -60,6 +66,8 @@ public void testChecksCondition() throws Exception { new HashMap<>(ScriptModule.CORE_CONTEXTS) ); Map document = new HashMap<>(); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1), 0L, TimeUnit.MILLISECONDS.toNanos(2)); ConditionalProcessor processor = new ConditionalProcessor( randomAlphaOfLength(10), new Script( @@ -67,7 +75,10 @@ public void testChecksCondition() throws Exception { scriptName, Collections.emptyMap()), scriptService, new Processor() { @Override - public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(final IngestDocument ingestDocument){ + if(ingestDocument.hasField("error")){ + throw new RuntimeException("error"); + } ingestDocument.setFieldValue("foo", "bar"); return ingestDocument; } @@ -81,20 +92,37 @@ public String getType() { public String getTag() { return null; } - }); + }, relativeTimeProvider); + //false, never call processor never increments metrics + String falseValue = "falsy"; IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue(conditionalField, falseValue); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); + assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); + assertStats(processor, 0, 0, 0); + + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue(conditionalField, falseValue); + ingestDocument.setFieldValue("error", true); + processor.execute(ingestDocument); + assertStats(processor, 0, 0, 0); + + //true, always call processor and increments metrics + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, trueValue); processor.execute(ingestDocument); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); + assertStats(processor, 1, 0, 1); - String falseValue = "falsy"; ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); - ingestDocument.setFieldValue(conditionalField, falseValue); - processor.execute(ingestDocument); - assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); - assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); + ingestDocument.setFieldValue(conditionalField, trueValue); + ingestDocument.setFieldValue("error", true); + IngestDocument finalIngestDocument = ingestDocument; + expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument)); + assertStats(processor, 2, 1, 2); } @SuppressWarnings("unchecked") @@ -141,5 +169,14 @@ private static void assertMutatingCtxThrows(Consumer> mutati Exception e = expectedException.get(); assertThat(e, instanceOf(UnsupportedOperationException.class)); assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); + assertStats(processor, 0, 0, 0); + } + + private static void assertStats(ConditionalProcessor conditionalProcessor, long count, long failed, long time) { + IngestStats.Stats stats = conditionalProcessor.getMetric().createStats(); + assertThat(stats.getIngestCount(), equalTo(count)); + assertThat(stats.getIngestCurrent(), equalTo(0L)); + assertThat(stats.getIngestFailedCount(), equalTo(failed)); + assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 2e66c958c752b..3598b4ea3a685 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -63,6 +63,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -746,16 +747,23 @@ public void testBulkRequestExecution() { verify(completionHandler, times(1)).accept(null); } - public void testStats() { + public void testStats() throws Exception { final Processor processor = mock(Processor.class); - IngestService ingestService = createWithProcessors(Collections.singletonMap( - "mock", (factories, tag, config) -> processor)); + final Processor processorFailure = mock(Processor.class); + when(processor.getType()).thenReturn("mock"); + when(processor.getTag()).thenReturn("mockTag"); + when(processorFailure.getType()).thenReturn("failure-mock"); + //avoid returning null and dropping the document + when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); + when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error")); + Map map = new HashMap<>(2); + map.put("mock", (factories, tag, config) -> processor); + map.put("failure-mock", (factories, tag, config) -> processorFailure); + IngestService ingestService = createWithProcessors(map); + final IngestStats initialStats = ingestService.stats(); - assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); - assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); + assertThat(initialStats.getPipelineStats().size(), equalTo(0)); + assertStats(initialStats.getTotalStats(), 0, 0, 0); PutPipelineRequest putRequest = new PutPipelineRequest("_id1", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); @@ -769,7 +777,6 @@ public void testStats() { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); @@ -778,18 +785,33 @@ public void testStats() { indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); - assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); - assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); + assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); + + afterFirstRequestStats.getProcessorStats().get("_id1").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); + afterFirstRequestStats.getProcessorStats().get("_id2").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); + + //total + assertStats(afterFirstRequestStats.getTotalStats(), 1, 0 ,0); + //pipeline + assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id1", 1, 0, 0); + assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id2", 0, 0, 0); + //processor + assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0); + indexRequest.setPipeline("_id2"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterSecondRequestStats = ingestService.stats(); - assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); - assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); + assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); + //total + assertStats(afterSecondRequestStats.getTotalStats(), 2, 0 ,0); + //pipeline + assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id1", 1, 0, 0); + assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id2", 1, 0, 0); + //processor + assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0); //update cluster state and ensure that new stats are added to old stats putRequest = new PutPipelineRequest("_id1", @@ -800,13 +822,66 @@ public void testStats() { indexRequest.setPipeline("_id1"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterThirdRequestStats = ingestService.stats(); - assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L)); - assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); - assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L)); + assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); + //total + assertStats(afterThirdRequestStats.getTotalStats(), 3, 0 ,0); + //pipeline + assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id1", 2, 0, 0); + assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id2", 1, 0, 0); + //The number of processors for the "id1" pipeline changed, so the per-processor metrics are not carried forward. This is + //due to the parallel array's used to identify which metrics to carry forward. With out unique ids or semantic equals for each + //processor, parallel arrays are the best option for of carrying forward metrics between pipeline changes. However, in some cases, + //like this one it may not readily obvious why the metrics were not carried forward. + assertProcessorStats(0, afterThirdRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(1, afterThirdRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(0, afterThirdRequestStats, "_id2", 1, 0, 0); + + //test a failure, and that the processor stats are added from the old stats + putRequest = new PutPipelineRequest("_id1", + new BytesArray("{\"processors\": [{\"failure-mock\" : { \"on_failure\": [{\"mock\" : {}}]}}, {\"mock\" : {}}]}"), + XContentType.JSON); + previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + indexRequest.setPipeline("_id1"); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final IngestStats afterForthRequestStats = ingestService.stats(); + assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); + //total + assertStats(afterForthRequestStats.getTotalStats(), 4, 0 ,0); + //pipeline + assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id1", 3, 0, 0); + assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id2", 1, 0, 0); + //processor + assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); //not carried forward since type changed + assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); //carried forward and added from old stats + assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0); + } + public void testStatName(){ + Processor processor = mock(Processor.class); + String name = randomAlphaOfLength(10); + when(processor.getType()).thenReturn(name); + assertThat(IngestService.getProcessorName(processor), equalTo(name)); + String tag = randomAlphaOfLength(10); + when(processor.getTag()).thenReturn(tag); + assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag)); + + ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class); + when(conditionalProcessor.getProcessor()).thenReturn(processor); + assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag)); + + PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class); + String pipelineName = randomAlphaOfLength(10); + when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName); + name = PipelineProcessor.TYPE; + when(pipelineProcessor.getType()).thenReturn(name); + assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName)); + when(pipelineProcessor.getTag()).thenReturn(tag); + assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName + ":" + tag)); } + public void testExecuteWithDrop() { Map factories = new HashMap<>(); factories.put("drop", new DropProcessor.Factory()); @@ -935,4 +1010,23 @@ public boolean matches(Object o) { return false; } } + + private void assertProcessorStats(int processor, IngestStats stats, String pipelineId, long count, long failed, long time) { + assertStats(stats.getProcessorStats().get(pipelineId).get(processor).getStats(), count, failed, time); + } + + private void assertPipelineStats(List pipelineStats, String pipelineId, long count, long failed, long time) { + assertStats(getPipelineStats(pipelineStats, pipelineId), count, failed, time); + } + + private void assertStats(IngestStats.Stats stats, long count, long failed, long time) { + assertThat(stats.getIngestCount(), equalTo(count)); + assertThat(stats.getIngestCurrent(), equalTo(0L)); + assertThat(stats.getIngestFailedCount(), equalTo(failed)); + assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); + } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java index 9974dd568a8c7..04bfcbb92b8e9 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java @@ -19,44 +19,70 @@ package org.elasticsearch.ingest; +import org.elasticsearch.Version; +import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.util.Collections; +import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class IngestStatsTests extends ESTestCase { public void testSerialization() throws IOException { - IngestStats.Stats total = new IngestStats.Stats(5, 10, 20, 30); - IngestStats.Stats foo = new IngestStats.Stats(50, 100, 200, 300); - IngestStats ingestStats = new IngestStats(total, Collections.singletonMap("foo", foo)); - IngestStats serialize = serialize(ingestStats); - assertNotSame(serialize, ingestStats); - assertNotSame(serialize.getTotalStats(), total); - assertEquals(total.getIngestCount(), serialize.getTotalStats().getIngestCount()); - assertEquals(total.getIngestFailedCount(), serialize.getTotalStats().getIngestFailedCount()); - assertEquals(total.getIngestTimeInMillis(), serialize.getTotalStats().getIngestTimeInMillis()); - assertEquals(total.getIngestCurrent(), serialize.getTotalStats().getIngestCurrent()); - - assertEquals(ingestStats.getStatsPerPipeline().size(), 1); - assertTrue(ingestStats.getStatsPerPipeline().containsKey("foo")); - - Map left = ingestStats.getStatsPerPipeline(); - Map right = serialize.getStatsPerPipeline(); - - assertEquals(right.size(), 1); - assertTrue(right.containsKey("foo")); - assertEquals(left.size(), 1); - assertTrue(left.containsKey("foo")); - IngestStats.Stats leftStats = left.get("foo"); - IngestStats.Stats rightStats = right.get("foo"); - assertEquals(leftStats.getIngestCount(), rightStats.getIngestCount()); - assertEquals(leftStats.getIngestFailedCount(), rightStats.getIngestFailedCount()); - assertEquals(leftStats.getIngestTimeInMillis(), rightStats.getIngestTimeInMillis()); - assertEquals(leftStats.getIngestCurrent(), rightStats.getIngestCurrent()); + IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); + List pipelineStats = createPipelineStats(); + Map> processorStats = createProcessorStats(pipelineStats); + IngestStats ingestStats = new IngestStats(totalStats, pipelineStats, processorStats); + IngestStats serializedStats = serialize(ingestStats); + assertIngestStats(ingestStats, serializedStats, true); + } + + public void testReadLegacyStream() throws IOException { + IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); + List pipelineStats = createPipelineStats(); + + //legacy output logic + BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(VersionUtils.getPreviousVersion(Version.V_6_5_0)); + totalStats.writeTo(out); + out.writeVInt(pipelineStats.size()); + for (IngestStats.PipelineStat pipelineStat : pipelineStats) { + out.writeString(pipelineStat.getPipelineId()); + pipelineStat.getStats().writeTo(out); + } + + StreamInput in = out.bytes().streamInput(); + in.setVersion(VersionUtils.getPreviousVersion(Version.V_6_5_0)); + IngestStats serializedStats = new IngestStats(in); + IngestStats expectedStats = new IngestStats(totalStats, pipelineStats, Collections.emptyMap()); + assertIngestStats(expectedStats, serializedStats, false); + } + + private List createPipelineStats() { + IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3)); + IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297)); + IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0)); + return Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList()); + } + + private Map> createProcessorStats(List pipelineStats){ + assert(pipelineStats.size() >= 2); + IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1)); + IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2)); + IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297)); + //pipeline1 -> processor1,processor2; pipeline2 -> processor3 + return MapBuilder.>newMapBuilder() + .put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList())) + .put(pipelineStats.get(1).getPipelineId(), Collections.singletonList(processor3Stat)) + .map(); } private IngestStats serialize(IngestStats stats) throws IOException { @@ -65,4 +91,48 @@ private IngestStats serialize(IngestStats stats) throws IOException { StreamInput in = out.bytes().streamInput(); return new IngestStats(in); } + + private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors){ + assertNotSame(ingestStats, serializedStats); + assertNotSame(ingestStats.getTotalStats(), serializedStats.getTotalStats()); + assertNotSame(ingestStats.getPipelineStats(), serializedStats.getPipelineStats()); + assertNotSame(ingestStats.getProcessorStats(), serializedStats.getProcessorStats()); + + assertStats(ingestStats.getTotalStats(), serializedStats.getTotalStats()); + assertEquals(ingestStats.getPipelineStats().size(), serializedStats.getPipelineStats().size()); + + for (IngestStats.PipelineStat serializedPipelineStat : serializedStats.getPipelineStats()) { + assertStats(getPipelineStats(ingestStats.getPipelineStats(), serializedPipelineStat.getPipelineId()), + serializedPipelineStat.getStats()); + List serializedProcessorStats = + serializedStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); + List processorStat = ingestStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); + if(expectProcessors) { + if (processorStat != null) { + Iterator it = processorStat.iterator(); + //intentionally enforcing the identical ordering + for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) { + IngestStats.ProcessorStat ps = it.next(); + assertEquals(ps.getName(), serializedProcessorStat.getName()); + assertStats(ps.getStats(), serializedProcessorStat.getStats()); + } + assertFalse(it.hasNext()); + } + }else{ + //pre 6.5 did not serialize any processor stats + assertNull(serializedProcessorStats); + } + } + + } + private void assertStats(IngestStats.Stats fromObject, IngestStats.Stats fromStream) { + assertEquals(fromObject.getIngestCount(), fromStream.getIngestCount()); + assertEquals(fromObject.getIngestFailedCount(), fromStream.getIngestFailedCount()); + assertEquals(fromObject.getIngestTimeInMillis(), fromStream.getIngestTimeInMillis()); + assertEquals(fromObject.getIngestCurrent(), fromStream.getIngestCurrent()); + } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 018ded346d4fc..eea0f03fa647f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -21,12 +21,13 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.test.ESTestCase; -import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.Mockito.mock; @@ -143,15 +144,15 @@ public void testPipelineProcessorWithPipelineChain() throws Exception { pipeline2ProcessorConfig.put("pipeline", pipeline3Id); PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig); - Clock clock = mock(Clock.class); - when(clock.millis()).thenReturn(0L).thenReturn(0L); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); Pipeline pipeline1 = new Pipeline( - pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock + pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider ); String key1 = randomAlphaOfLength(10); - clock = mock(Clock.class); - when(clock.millis()).thenReturn(0L).thenReturn(3L); + relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(3)); Pipeline pipeline2 = new Pipeline( pipeline2Id, null, null, new CompoundProcessor(true, Arrays.asList( @@ -160,15 +161,15 @@ pipeline2Id, null, null, new CompoundProcessor(true, }), pipeline2Processor), Collections.emptyList()), - clock + relativeTimeProvider ); - clock = mock(Clock.class); - when(clock.millis()).thenReturn(0L).thenReturn(2L); + relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2)); Pipeline pipeline3 = new Pipeline( pipeline3Id, null, null, new CompoundProcessor( new TestProcessor(ingestDocument -> { throw new RuntimeException("error"); - })), clock + })), relativeTimeProvider ); when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1); when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2);