From 1ff4b77a5a77111b3d337bc7f5e324ff99819574 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Sat, 20 Oct 2018 16:01:01 -0500 Subject: [PATCH 1/3] ingest: processor stats (#34202) This change introduces stats per processors. Total, time, failed, current are currently supported. All pipelines will now show all top level processors that belong to it. Failure processors are not displayed, however, the time taken to execute the failure chain is part of the stats for the top level processor. The processor name is the type of the processor, ordered as defined in the pipeline. If a tag for the processor is found, then the tag is appended to the type. Pipeline processors will have the pipeline name appended to the name of the name of the processors (before the tag if one exists). If more then one pipeline is used to process the document, then each pipeline will carry its own stats. The outer most pipeline will also include the inner most pipeline stats. Conditional processors will only included in the stats if the condition evaluates to true. --- .../ingest/CompoundProcessor.java | 31 +++- .../ingest/ConditionalProcessor.java | 31 +++- .../elasticsearch/ingest/IngestService.java | 112 ++++++++++-- .../org/elasticsearch/ingest/IngestStats.java | 169 +++++++++++++++--- .../org/elasticsearch/ingest/Pipeline.java | 16 +- .../ingest/PipelineProcessor.java | 4 + .../cluster/node/stats/NodeStatsTests.java | 58 ++++-- .../ingest/CompoundProcessorTests.java | 86 +++++++-- .../ingest/ConditionalProcessorTests.java | 51 +++++- .../ingest/IngestServiceTests.java | 136 +++++++++++--- .../ingest/IngestStatsTests.java | 83 ++++++--- .../ingest/PipelineProcessorTests.java | 21 +-- 12 files changed, 652 insertions(+), 146 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index e1a413f6aa9bb..3b8281bd471d2 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -20,12 +20,15 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.collect.Tuple; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import java.util.stream.Collectors; /** @@ -40,16 +43,33 @@ public class CompoundProcessor implements Processor { private final boolean ignoreFailure; private final List processors; private final List onFailureProcessors; + private final List> processorsWithMetrics; + private final LongSupplier relativeTimeProvider; + + CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) { + this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider); + } public CompoundProcessor(Processor... processor) { this(false, Arrays.asList(processor), Collections.emptyList()); } public CompoundProcessor(boolean ignoreFailure, List processors, List onFailureProcessors) { + this(ignoreFailure, processors, onFailureProcessors, System::nanoTime); + } + CompoundProcessor(boolean ignoreFailure, List processors, List onFailureProcessors, + LongSupplier relativeTimeProvider) { super(); this.ignoreFailure = ignoreFailure; this.processors = processors; this.onFailureProcessors = onFailureProcessors; + this.relativeTimeProvider = relativeTimeProvider; + this.processorsWithMetrics = new ArrayList<>(processors.size()); + processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric()))); + } + + List> getProcessorsWithMetrics() { + return processorsWithMetrics; } public boolean isIgnoreFailure() { @@ -94,12 +114,17 @@ public String getTag() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - for (Processor processor : processors) { + for (Tuple processorWithMetric : processorsWithMetrics) { + Processor processor = processorWithMetric.v1(); + IngestMetric metric = processorWithMetric.v2(); + long startTimeInNanos = relativeTimeProvider.getAsLong(); try { + metric.preIngest(); if (processor.execute(ingestDocument) == null) { return null; } } catch (Exception e) { + metric.ingestFailed(); if (ignoreFailure) { continue; } @@ -112,11 +137,15 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { executeOnFailure(ingestDocument, compoundProcessorException); break; } + } finally { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metric.postIngest(ingestTimeInMillis); } } return ingestDocument; } + void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { try { putFailureMetadata(ingestDocument, exception); diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index b6f6612344a39..9078dc86c1b07 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -28,6 +28,8 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import org.elasticsearch.script.IngestConditionalScript; import org.elasticsearch.script.Script; @@ -42,12 +44,20 @@ public class ConditionalProcessor extends AbstractProcessor { private final ScriptService scriptService; private final Processor processor; + private final IngestMetric metric; + private final LongSupplier relativeTimeProvider; ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) { + this(tag, script, scriptService, processor, System::nanoTime); + } + + ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) { super(tag); this.condition = script; this.scriptService = scriptService; this.processor = processor; + this.metric = new IngestMetric(); + this.relativeTimeProvider = relativeTimeProvider; } @Override @@ -55,11 +65,30 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { IngestConditionalScript script = scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { - return processor.execute(ingestDocument); + // Only record metric if the script evaluates to true + long startTimeInNanos = relativeTimeProvider.getAsLong(); + try { + metric.preIngest(); + return processor.execute(ingestDocument); + } catch (Exception e) { + metric.ingestFailed(); + throw e; + } finally { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metric.postIngest(ingestTimeInMillis); + } } return ingestDocument; } + Processor getProcessor() { + return processor; + } + + IngestMetric getMetric() { + return metric; + } + @Override public String getType() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 6c46a9b2354f6..705e77028a1ef 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -19,19 +19,6 @@ package org.elasticsearch.ingest; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Collectors; - import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -49,6 +36,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -61,6 +49,19 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + /** * Holder class for several ingest related services. */ @@ -262,11 +263,59 @@ public void applyClusterState(final ClusterChangedEvent event) { Pipeline originalPipeline = originalPipelines.get(id); if (originalPipeline != null) { pipeline.getMetrics().add(originalPipeline.getMetrics()); + List> oldPerProcessMetrics = new ArrayList<>(); + List> newPerProcessMetrics = new ArrayList<>(); + getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics); + getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics); + //Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since + //the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and + //consistent id's per processor and/or semantic equals for each processor will be needed. + if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) { + Iterator> oldMetricsIterator = oldPerProcessMetrics.iterator(); + for (Tuple compositeMetric : newPerProcessMetrics) { + String type = compositeMetric.v1().getType(); + IngestMetric metric = compositeMetric.v2(); + if (oldMetricsIterator.hasNext()) { + Tuple oldCompositeMetric = oldMetricsIterator.next(); + String oldType = oldCompositeMetric.v1().getType(); + IngestMetric oldMetric = oldCompositeMetric.v2(); + if (type.equals(oldType)) { + metric.add(oldMetric); + } + } + } + } } }); } } + /** + * Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as + * wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric. + * @param compoundProcessor The compound processor to start walking the non-failure processors + * @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples. + * @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor + */ + private static List> getProcessorMetrics(CompoundProcessor compoundProcessor, + List> processorMetrics) { + //only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure + for (Tuple processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) { + Processor processor = processorWithMetric.v1(); + IngestMetric metric = processorWithMetric.v2(); + if (processor instanceof CompoundProcessor) { + getProcessorMetrics((CompoundProcessor) processor, processorMetrics); + } else { + //Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true. + if (processor instanceof ConditionalProcessor) { + metric = ((ConditionalProcessor) processor).getMetric(); + } + processorMetrics.add(new Tuple<>(processor, metric)); + } + } + return processorMetrics; + } + private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; @@ -371,11 +420,42 @@ protected void doRun() { } public IngestStats stats() { + IngestStats.Builder statsBuilder = new IngestStats.Builder(); + statsBuilder.addTotalMetrics(totalMetrics); + pipelines.forEach((id, pipeline) -> { + CompoundProcessor rootProcessor = pipeline.getCompoundProcessor(); + statsBuilder.addPipelineMetrics(id, pipeline.getMetrics()); + List> processorMetrics = new ArrayList<>(); + getProcessorMetrics(rootProcessor, processorMetrics); + processorMetrics.forEach(t -> { + Processor processor = t.v1(); + IngestMetric processorMetric = t.v2(); + statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric); + }); + }); + return statsBuilder.build(); + } - Map statsPerPipeline = - pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats())); + //package private for testing + static String getProcessorName(Processor processor){ + // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name + if(processor instanceof ConditionalProcessor){ + processor = ((ConditionalProcessor) processor).getProcessor(); + } + StringBuilder sb = new StringBuilder(5); + sb.append(processor.getType()); - return new IngestStats(totalMetrics.createStats(), statsPerPipeline); + if(processor instanceof PipelineProcessor){ + String pipelineName = ((PipelineProcessor) processor).getPipelineName(); + sb.append(":"); + sb.append(pipelineName); + } + String tag = processor.getTag(); + if(tag != null && !tag.isEmpty()){ + sb.append(":"); + sb.append(tag); + } + return sb.toString(); } private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index c4c1520fd19d4..e3d671bc8b2a0 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -27,17 +27,28 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class IngestStats implements Writeable, ToXContentFragment { private final Stats totalStats; - private final Map statsPerPipeline; + private final List pipelineStats; + private final Map> processorStats; - public IngestStats(Stats totalStats, Map statsPerPipeline) { + /** + * @param totalStats - The total stats for Ingest. This is the logically the sum of all pipeline stats, + * and pipeline stats are logically the sum of the processor stats. + * @param pipelineStats - The stats for a given ingest pipeline. + * @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier. + */ + public IngestStats(Stats totalStats, List pipelineStats, Map> processorStats) { this.totalStats = totalStats; - this.statsPerPipeline = statsPerPipeline; + this.pipelineStats = pipelineStats; + this.processorStats = processorStats; } /** @@ -46,37 +57,43 @@ public IngestStats(Stats totalStats, Map statsPerPipeline) { public IngestStats(StreamInput in) throws IOException { this.totalStats = new Stats(in); int size = in.readVInt(); - this.statsPerPipeline = new HashMap<>(size); + this.pipelineStats = new ArrayList<>(size); + this.processorStats = new HashMap<>(size); for (int i = 0; i < size; i++) { - statsPerPipeline.put(in.readString(), new Stats(in)); + String pipelineId = in.readString(); + Stats pipelineStat = new Stats(in); + this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat)); + int processorsSize = in.readVInt(); + List processorStatsPerPipeline = new ArrayList<>(processorsSize); + for (int j = 0; j < processorsSize; j++) { + String processorName = in.readString(); + Stats processorStat = new Stats(in); + processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat)); + } + this.processorStats.put(pipelineId, processorStatsPerPipeline); } } @Override public void writeTo(StreamOutput out) throws IOException { totalStats.writeTo(out); - out.writeVInt(statsPerPipeline.size()); - for (Map.Entry entry : statsPerPipeline.entrySet()) { - out.writeString(entry.getKey()); - entry.getValue().writeTo(out); + out.writeVInt(pipelineStats.size()); + for (PipelineStat pipelineStat : pipelineStats) { + out.writeString(pipelineStat.getPipelineId()); + pipelineStat.getStats().writeTo(out); + List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); + if(processorStatsForPipeline == null) { + out.writeVInt(0); + }else{ + out.writeVInt(processorStatsForPipeline.size()); + for (ProcessorStat processorStat : processorStatsForPipeline) { + out.writeString(processorStat.getName()); + processorStat.getStats().writeTo(out); + } + } } } - - /** - * @return The accumulated stats for all pipelines - */ - public Stats getTotalStats() { - return totalStats; - } - - /** - * @return The stats on a per pipeline basis - */ - public Map getStatsPerPipeline() { - return statsPerPipeline; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("ingest"); @@ -84,9 +101,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws totalStats.toXContent(builder, params); builder.endObject(); builder.startObject("pipelines"); - for (Map.Entry entry : statsPerPipeline.entrySet()) { - builder.startObject(entry.getKey()); - entry.getValue().toXContent(builder, params); + for (PipelineStat pipelineStat : pipelineStats) { + builder.startObject(pipelineStat.getPipelineId()); + pipelineStat.getStats().toXContent(builder, params); + List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); + builder.startArray("processors"); + if (processorStatsForPipeline != null) { + for (ProcessorStat processorStat : processorStatsForPipeline) { + builder.startObject(); + builder.startObject(processorStat.getName()); + processorStat.getStats().toXContent(builder, params); + builder.endObject(); + builder.endObject(); + } + } + builder.endArray(); builder.endObject(); } builder.endObject(); @@ -94,6 +123,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public Stats getTotalStats() { + return totalStats; + } + + public List getPipelineStats() { + return pipelineStats; + } + + public Map> getProcessorStats() { + return processorStats; + } + public static class Stats implements Writeable, ToXContentFragment { private final long ingestCount; @@ -134,7 +175,6 @@ public long getIngestCount() { } /** - * * @return The total time spent of ingest preprocessing in millis. */ public long getIngestTimeInMillis() { @@ -164,4 +204,77 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } } + + /** + * Easy conversion from scoped {@link IngestMetric} objects to a serializable Stats objects + */ + static class Builder { + private Stats totalStats; + private List pipelineStats = new ArrayList<>(); + private Map> processorStats = new HashMap<>(); + + + Builder addTotalMetrics(IngestMetric totalMetric) { + this.totalStats = totalMetric.createStats(); + return this; + } + + Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) { + this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats())); + return this; + } + + Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) { + this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>()) + .add(new ProcessorStat(processorName, metric.createStats())); + return this; + } + + IngestStats build() { + return new IngestStats(totalStats, Collections.unmodifiableList(pipelineStats), + Collections.unmodifiableMap(processorStats)); + } + } + + /** + * Container for pipeline stats. + */ + public static class PipelineStat { + private final String pipelineId; + private final Stats stats; + + public PipelineStat(String pipelineId, Stats stats) { + this.pipelineId = pipelineId; + this.stats = stats; + } + + public String getPipelineId() { + return pipelineId; + } + + public Stats getStats() { + return stats; + } + } + + /** + * Container for processor stats. + */ + public static class ProcessorStat { + private final String name; + private final Stats stats; + + public ProcessorStat(String name, Stats stats) { + this.name = name; + this.stats = stats; + } + + public String getName() { + return name; + } + + public Stats getStats() { + return stats; + } + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 8d5f6d6ff7c54..fc5311be5cbde 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -22,11 +22,12 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; -import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import org.elasticsearch.script.ScriptService; @@ -47,20 +48,21 @@ public final class Pipeline { private final Integer version; private final CompoundProcessor compoundProcessor; private final IngestMetric metrics; - private final Clock clock; + private final LongSupplier relativeTimeProvider; public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) { - this(id, description, version, compoundProcessor, Clock.systemUTC()); + this(id, description, version, compoundProcessor, System::nanoTime); } //package private for testing - Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) { + Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, + LongSupplier relativeTimeProvider) { this.id = id; this.description = description; this.compoundProcessor = compoundProcessor; this.version = version; this.metrics = new IngestMetric(); - this.clock = clock; + this.relativeTimeProvider = relativeTimeProvider; } public static Pipeline create(String id, Map config, @@ -89,7 +91,7 @@ public static Pipeline create(String id, Map config, * Modifies the data of a document to be indexed based on the processor this pipeline holds */ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - long startTimeInMillis = clock.millis(); + long startTimeInNanos = relativeTimeProvider.getAsLong(); try { metrics.preIngest(); return compoundProcessor.execute(ingestDocument); @@ -97,7 +99,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { metrics.ingestFailed(); throw e; } finally { - long ingestTimeInMillis = clock.millis() - startTimeInMillis; + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); metrics.postIngest(ingestTimeInMillis); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 918ff6b8aefee..16324e8dee6c7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -53,6 +53,10 @@ public String getType() { return TYPE; } + String getPipelineName() { + return pipelineName; + } + public static final class Factory implements Processor.Factory { private final IngestService ingestService; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 3384efcf836c6..8f51fb08dd23f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -53,7 +53,6 @@ import static java.util.Collections.emptySet; public class NodeStatsTests extends ESTestCase { - public void testSerialization() throws IOException { NodeStats nodeStats = createNodeStats(); try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -271,14 +270,29 @@ public void testSerialization() throws IOException { assertEquals(totalStats.getIngestCurrent(), deserializedIngestStats.getTotalStats().getIngestCurrent()); assertEquals(totalStats.getIngestFailedCount(), deserializedIngestStats.getTotalStats().getIngestFailedCount()); assertEquals(totalStats.getIngestTimeInMillis(), deserializedIngestStats.getTotalStats().getIngestTimeInMillis()); - assertEquals(ingestStats.getStatsPerPipeline().size(), deserializedIngestStats.getStatsPerPipeline().size()); - for (Map.Entry entry : ingestStats.getStatsPerPipeline().entrySet()) { - IngestStats.Stats stats = entry.getValue(); - IngestStats.Stats deserializedStats = deserializedIngestStats.getStatsPerPipeline().get(entry.getKey()); - assertEquals(stats.getIngestFailedCount(), deserializedStats.getIngestFailedCount()); - assertEquals(stats.getIngestTimeInMillis(), deserializedStats.getIngestTimeInMillis()); - assertEquals(stats.getIngestCurrent(), deserializedStats.getIngestCurrent()); - assertEquals(stats.getIngestCount(), deserializedStats.getIngestCount()); + assertEquals(ingestStats.getPipelineStats().size(), deserializedIngestStats.getPipelineStats().size()); + for (IngestStats.PipelineStat pipelineStat : ingestStats.getPipelineStats()) { + String pipelineId = pipelineStat.getPipelineId(); + IngestStats.Stats deserializedPipelineStats = + getPipelineStats(deserializedIngestStats.getPipelineStats(), pipelineId); + assertEquals(pipelineStat.getStats().getIngestFailedCount(), deserializedPipelineStats.getIngestFailedCount()); + assertEquals(pipelineStat.getStats().getIngestTimeInMillis(), deserializedPipelineStats.getIngestTimeInMillis()); + assertEquals(pipelineStat.getStats().getIngestCurrent(), deserializedPipelineStats.getIngestCurrent()); + assertEquals(pipelineStat.getStats().getIngestCount(), deserializedPipelineStats.getIngestCount()); + List processorStats = ingestStats.getProcessorStats().get(pipelineId); + //intentionally validating identical order + Iterator it = deserializedIngestStats.getProcessorStats().get(pipelineId).iterator(); + for (IngestStats.ProcessorStat processorStat : processorStats) { + IngestStats.ProcessorStat deserializedProcessorStat = it.next(); + assertEquals(processorStat.getStats().getIngestFailedCount(), + deserializedProcessorStat.getStats().getIngestFailedCount()); + assertEquals(processorStat.getStats().getIngestTimeInMillis(), + deserializedProcessorStat.getStats().getIngestTimeInMillis()); + assertEquals(processorStat.getStats().getIngestCurrent(), + deserializedProcessorStat.getStats().getIngestCurrent()); + assertEquals(processorStat.getStats().getIngestCount(), deserializedProcessorStat.getStats().getIngestCount()); + } + assertFalse(it.hasNext()); } } AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats(); @@ -429,14 +443,24 @@ private static NodeStats createNodeStats() { if (frequently()) { IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + int numPipelines = randomIntBetween(0, 10); + int numProcessors = randomIntBetween(0, 10); + List ingestPipelineStats = new ArrayList<>(numPipelines); + Map> ingestProcessorStats = new HashMap<>(numPipelines); + for (int i = 0; i < numPipelines; i++) { + String pipelineId = randomAlphaOfLengthBetween(3, 10); + ingestPipelineStats.add(new IngestStats.PipelineStat(pipelineId, new IngestStats.Stats + (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()))); - int numStatsPerPipeline = randomIntBetween(0, 10); - Map statsPerPipeline = new HashMap<>(); - for (int i = 0; i < numStatsPerPipeline; i++) { - statsPerPipeline.put(randomAlphaOfLengthBetween(3, 10), new IngestStats.Stats(randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); + List processorPerPipeline = new ArrayList<>(numProcessors); + for (int j =0; j < numProcessors;j++) { + IngestStats.Stats processorStats = new IngestStats.Stats + (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats)); + } + ingestProcessorStats.put(pipelineId,processorPerPipeline); } - ingestStats = new IngestStats(totalStats, statsPerPipeline); + ingestStats = new IngestStats(totalStats, ingestPipelineStats, ingestProcessorStats); } AdaptiveSelectionStats adaptiveSelectionStats = null; if (frequently()) { @@ -465,4 +489,8 @@ private static NodeStats createNodeStats() { fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, ingestStats, adaptiveSelectionStats); } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index aaede49a36d57..dabcae533a0bf 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -27,11 +27,17 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class CompoundProcessorTests extends ESTestCase { private IngestDocument ingestDocument; @@ -49,18 +55,29 @@ public void testEmpty() throws Exception { } public void testSingleProcessor() throws Exception { - TestProcessor processor = new TestProcessor(ingestDocument -> {}); - CompoundProcessor compoundProcessor = new CompoundProcessor(processor); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); + TestProcessor processor = new TestProcessor(ingestDocument ->{ + assertStats(0, ingestDocument.getFieldValue("compoundProcessor", CompoundProcessor.class), 1, 0, 0, 0); + }); + CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor); + ingestDocument.setFieldValue("compoundProcessor", compoundProcessor); //ugly hack to assert current count = 1 assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); + assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); compoundProcessor.execute(ingestDocument); + verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 0, 1); + } public void testSingleProcessorWithException() throws Exception { TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); - CompoundProcessor compoundProcessor = new CompoundProcessor(processor); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor); assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); @@ -71,15 +88,22 @@ public void testSingleProcessorWithException() throws Exception { assertThat(e.getRootCause().getMessage(), equalTo("error")); } assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); + } public void testIgnoreFailure() throws Exception { TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");}); - CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList()); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = + new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); assertThat(processor2.getInvokedCounter(), equalTo(1)); + assertStats(1, compoundProcessor, 0, 1, 0, 0); assertThat(ingestDocument.getFieldValue("field", String.class), equalTo("value")); } @@ -93,11 +117,15 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), - Collections.singletonList(processor2)); + Collections.singletonList(processor2), relativeTimeProvider); compoundProcessor.execute(ingestDocument); + verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 1); assertThat(processor2.getInvokedCounter(), equalTo(1)); } @@ -118,14 +146,17 @@ public void testSingleProcessorWithNestedFailures() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail), - Collections.singletonList(lastProcessor)); + Collections.singletonList(lastProcessor), relativeTimeProvider); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), - Collections.singletonList(compoundOnFailProcessor)); + Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception { @@ -137,21 +168,24 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); - CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor); + CompoundProcessor failCompoundProcessor = new CompoundProcessor(relativeTimeProvider, firstProcessor); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor)); + Collections.singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFail() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor failProcessor = - new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); assertThat(ingestMetadata.entrySet(), hasSize(3)); @@ -160,21 +194,24 @@ public void testCompoundProcessorExceptionFail() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), - Collections.singletonList(failProcessor)); + Collections.singletonList(failProcessor), relativeTimeProvider); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor)); + Collections.singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor failProcessor = - new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); assertThat(ingestMetadata.entrySet(), hasSize(3)); @@ -183,27 +220,44 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), - Collections.singletonList(new CompoundProcessor(failProcessor))); + Collections.singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor))); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor)); + Collections.singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testBreakOnFailure() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");}); TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");}); TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor), - Collections.singletonList(onFailureProcessor)); + Collections.singletonList(onFailureProcessor), relativeTimeProvider); pipeline.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(0)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); + assertStats(pipeline, 1, 1, 0); + } + + private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) { + assertStats(0, compoundProcessor, 0L, count, failed, time); + } + private void assertStats(int processor, CompoundProcessor compoundProcessor, long current, long count, long failed, long time) { + IngestStats.Stats stats = compoundProcessor.getProcessorsWithMetrics().get(processor).v2().createStats(); + assertThat(stats.getIngestCount(), equalTo(count)); + assertThat(stats.getIngestCurrent(), equalTo(current)); + assertThat(stats.getIngestFailedCount(), equalTo(failed)); + assertThat(stats.getIngestTimeInMillis(), equalTo(time)); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index c7d4dfa4e68cd..c5548ae559400 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -33,12 +33,18 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.LongSupplier; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.core.Is.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ConditionalProcessorTests extends ESTestCase { @@ -60,6 +66,8 @@ public void testChecksCondition() throws Exception { new HashMap<>(ScriptModule.CORE_CONTEXTS) ); Map document = new HashMap<>(); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1), 0L, TimeUnit.MILLISECONDS.toNanos(2)); ConditionalProcessor processor = new ConditionalProcessor( randomAlphaOfLength(10), new Script( @@ -67,7 +75,10 @@ public void testChecksCondition() throws Exception { scriptName, Collections.emptyMap()), scriptService, new Processor() { @Override - public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(final IngestDocument ingestDocument){ + if(ingestDocument.hasField("error")){ + throw new RuntimeException("error"); + } ingestDocument.setFieldValue("foo", "bar"); return ingestDocument; } @@ -81,20 +92,37 @@ public String getType() { public String getTag() { return null; } - }); + }, relativeTimeProvider); + //false, never call processor never increments metrics + String falseValue = "falsy"; IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue(conditionalField, falseValue); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); + assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); + assertStats(processor, 0, 0, 0); + + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue(conditionalField, falseValue); + ingestDocument.setFieldValue("error", true); + processor.execute(ingestDocument); + assertStats(processor, 0, 0, 0); + + //true, always call processor and increments metrics + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, trueValue); processor.execute(ingestDocument); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); + assertStats(processor, 1, 0, 1); - String falseValue = "falsy"; ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); - ingestDocument.setFieldValue(conditionalField, falseValue); - processor.execute(ingestDocument); - assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); - assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); + ingestDocument.setFieldValue(conditionalField, trueValue); + ingestDocument.setFieldValue("error", true); + IngestDocument finalIngestDocument = ingestDocument; + expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument)); + assertStats(processor, 2, 1, 2); } @SuppressWarnings("unchecked") @@ -141,5 +169,14 @@ private static void assertMutatingCtxThrows(Consumer> mutati Exception e = expectedException.get(); assertThat(e, instanceOf(UnsupportedOperationException.class)); assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); + assertStats(processor, 0, 0, 0); + } + + private static void assertStats(ConditionalProcessor conditionalProcessor, long count, long failed, long time) { + IngestStats.Stats stats = conditionalProcessor.getMetric().createStats(); + assertThat(stats.getIngestCount(), equalTo(count)); + assertThat(stats.getIngestCurrent(), equalTo(0L)); + assertThat(stats.getIngestFailedCount(), equalTo(failed)); + assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 4de39349dc517..3dde7babb0a96 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -63,6 +63,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -746,16 +747,23 @@ public void testBulkRequestExecution() { verify(completionHandler, times(1)).accept(null); } - public void testStats() { + public void testStats() throws Exception { final Processor processor = mock(Processor.class); - IngestService ingestService = createWithProcessors(Collections.singletonMap( - "mock", (factories, tag, config) -> processor)); + final Processor processorFailure = mock(Processor.class); + when(processor.getType()).thenReturn("mock"); + when(processor.getTag()).thenReturn("mockTag"); + when(processorFailure.getType()).thenReturn("failure-mock"); + //avoid returning null and dropping the document + when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); + when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error")); + Map map = new HashMap<>(2); + map.put("mock", (factories, tag, config) -> processor); + map.put("failure-mock", (factories, tag, config) -> processorFailure); + IngestService ingestService = createWithProcessors(map); + final IngestStats initialStats = ingestService.stats(); - assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); - assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); + assertThat(initialStats.getPipelineStats().size(), equalTo(0)); + assertStats(initialStats.getTotalStats(), 0, 0, 0); PutPipelineRequest putRequest = new PutPipelineRequest("_id1", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); @@ -769,7 +777,6 @@ public void testStats() { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); @@ -778,18 +785,33 @@ public void testStats() { indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); - assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); - assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); + assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); + + afterFirstRequestStats.getProcessorStats().get("_id1").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); + afterFirstRequestStats.getProcessorStats().get("_id2").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); + + //total + assertStats(afterFirstRequestStats.getTotalStats(), 1, 0 ,0); + //pipeline + assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id1", 1, 0, 0); + assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id2", 0, 0, 0); + //processor + assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0); + indexRequest.setPipeline("_id2"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterSecondRequestStats = ingestService.stats(); - assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); - assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); + assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); + //total + assertStats(afterSecondRequestStats.getTotalStats(), 2, 0 ,0); + //pipeline + assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id1", 1, 0, 0); + assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id2", 1, 0, 0); + //processor + assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0); //update cluster state and ensure that new stats are added to old stats putRequest = new PutPipelineRequest("_id1", @@ -800,13 +822,66 @@ public void testStats() { indexRequest.setPipeline("_id1"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterThirdRequestStats = ingestService.stats(); - assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L)); - assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); - assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L)); + assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); + //total + assertStats(afterThirdRequestStats.getTotalStats(), 3, 0 ,0); + //pipeline + assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id1", 2, 0, 0); + assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id2", 1, 0, 0); + //The number of processors for the "id1" pipeline changed, so the per-processor metrics are not carried forward. This is + //due to the parallel array's used to identify which metrics to carry forward. With out unique ids or semantic equals for each + //processor, parallel arrays are the best option for of carrying forward metrics between pipeline changes. However, in some cases, + //like this one it may not readily obvious why the metrics were not carried forward. + assertProcessorStats(0, afterThirdRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(1, afterThirdRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(0, afterThirdRequestStats, "_id2", 1, 0, 0); + + //test a failure, and that the processor stats are added from the old stats + putRequest = new PutPipelineRequest("_id1", + new BytesArray("{\"processors\": [{\"failure-mock\" : { \"on_failure\": [{\"mock\" : {}}]}}, {\"mock\" : {}}]}"), + XContentType.JSON); + previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + indexRequest.setPipeline("_id1"); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final IngestStats afterForthRequestStats = ingestService.stats(); + assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); + //total + assertStats(afterForthRequestStats.getTotalStats(), 4, 0 ,0); + //pipeline + assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id1", 3, 0, 0); + assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id2", 1, 0, 0); + //processor + assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); //not carried forward since type changed + assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); //carried forward and added from old stats + assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0); + } + public void testStatName(){ + Processor processor = mock(Processor.class); + String name = randomAlphaOfLength(10); + when(processor.getType()).thenReturn(name); + assertThat(IngestService.getProcessorName(processor), equalTo(name)); + String tag = randomAlphaOfLength(10); + when(processor.getTag()).thenReturn(tag); + assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag)); + + ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class); + when(conditionalProcessor.getProcessor()).thenReturn(processor); + assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag)); + + PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class); + String pipelineName = randomAlphaOfLength(10); + when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName); + name = PipelineProcessor.TYPE; + when(pipelineProcessor.getType()).thenReturn(name); + assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName)); + when(pipelineProcessor.getTag()).thenReturn(tag); + assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName + ":" + tag)); } + public void testExecuteWithDrop() { Map factories = new HashMap<>(); factories.put("drop", new DropProcessor.Factory()); @@ -935,4 +1010,23 @@ public boolean matches(Object o) { return false; } } + + private void assertProcessorStats(int processor, IngestStats stats, String pipelineId, long count, long failed, long time) { + assertStats(stats.getProcessorStats().get(pipelineId).get(processor).getStats(), count, failed, time); + } + + private void assertPipelineStats(List pipelineStats, String pipelineId, long count, long failed, long time) { + assertStats(getPipelineStats(pipelineStats, pipelineId), count, failed, time); + } + + private void assertStats(IngestStats.Stats stats, long count, long failed, long time) { + assertThat(stats.getIngestCount(), equalTo(count)); + assertThat(stats.getIngestCurrent(), equalTo(0L)); + assertThat(stats.getIngestFailedCount(), equalTo(failed)); + assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); + } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java index 9974dd568a8c7..3d39faf9a7447 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java @@ -19,44 +19,75 @@ package org.elasticsearch.ingest; +import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.util.Collections; +import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + public class IngestStatsTests extends ESTestCase { public void testSerialization() throws IOException { - IngestStats.Stats total = new IngestStats.Stats(5, 10, 20, 30); - IngestStats.Stats foo = new IngestStats.Stats(50, 100, 200, 300); - IngestStats ingestStats = new IngestStats(total, Collections.singletonMap("foo", foo)); - IngestStats serialize = serialize(ingestStats); - assertNotSame(serialize, ingestStats); - assertNotSame(serialize.getTotalStats(), total); - assertEquals(total.getIngestCount(), serialize.getTotalStats().getIngestCount()); - assertEquals(total.getIngestFailedCount(), serialize.getTotalStats().getIngestFailedCount()); - assertEquals(total.getIngestTimeInMillis(), serialize.getTotalStats().getIngestTimeInMillis()); - assertEquals(total.getIngestCurrent(), serialize.getTotalStats().getIngestCurrent()); + //total + IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); + //pipeline + IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3)); + IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297)); + IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0)); + List pipelineStats = + Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList()); + //processor + IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1)); + IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2)); + IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297)); + //pipeline1 -> processor1,processor2; pipeline2 -> processor3 + Map> processorStats = MapBuilder.>newMapBuilder() + .put(pipeline1Stats.getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList())) + .put(pipeline2Stats.getPipelineId(), Collections.singletonList(processor3Stat)) + .map(); + + IngestStats ingestStats = new IngestStats(totalStats,pipelineStats, processorStats); + + IngestStats serializedStats = serialize(ingestStats); + assertNotSame(ingestStats, serializedStats); + assertNotSame(totalStats, serializedStats.getTotalStats()); + assertNotSame(pipelineStats, serializedStats.getPipelineStats()); + assertNotSame(processorStats, serializedStats.getProcessorStats()); - assertEquals(ingestStats.getStatsPerPipeline().size(), 1); - assertTrue(ingestStats.getStatsPerPipeline().containsKey("foo")); + assertStats(totalStats, serializedStats.getTotalStats()); + assertEquals(serializedStats.getPipelineStats().size(), 3); - Map left = ingestStats.getStatsPerPipeline(); - Map right = serialize.getStatsPerPipeline(); + for (IngestStats.PipelineStat serializedPipelineStat : serializedStats.getPipelineStats()) { + assertStats(getPipelineStats(pipelineStats, serializedPipelineStat.getPipelineId()), serializedPipelineStat.getStats()); + List serializedProcessorStats = + serializedStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); + List processorStat = ingestStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); + if(processorStat != null) { + Iterator it = processorStat.iterator(); + //intentionally enforcing the identical ordering + for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) { + IngestStats.ProcessorStat ps = it.next(); + assertEquals(ps.getName(), serializedProcessorStat.getName()); + assertStats(ps.getStats(), serializedProcessorStat.getStats()); + } + assertFalse(it.hasNext()); + } + } + } - assertEquals(right.size(), 1); - assertTrue(right.containsKey("foo")); - assertEquals(left.size(), 1); - assertTrue(left.containsKey("foo")); - IngestStats.Stats leftStats = left.get("foo"); - IngestStats.Stats rightStats = right.get("foo"); - assertEquals(leftStats.getIngestCount(), rightStats.getIngestCount()); - assertEquals(leftStats.getIngestFailedCount(), rightStats.getIngestFailedCount()); - assertEquals(leftStats.getIngestTimeInMillis(), rightStats.getIngestTimeInMillis()); - assertEquals(leftStats.getIngestCurrent(), rightStats.getIngestCurrent()); + private void assertStats(IngestStats.Stats fromObject, IngestStats.Stats fromStream) { + assertEquals(fromObject.getIngestCount(), fromStream.getIngestCount()); + assertEquals(fromObject.getIngestFailedCount(), fromStream.getIngestFailedCount()); + assertEquals(fromObject.getIngestTimeInMillis(), fromStream.getIngestTimeInMillis()); + assertEquals(fromObject.getIngestCurrent(), fromStream.getIngestCurrent()); } private IngestStats serialize(IngestStats stats) throws IOException { @@ -65,4 +96,8 @@ private IngestStats serialize(IngestStats stats) throws IOException { StreamInput in = out.bytes().streamInput(); return new IngestStats(in); } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 018ded346d4fc..eea0f03fa647f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -21,12 +21,13 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.test.ESTestCase; -import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.Mockito.mock; @@ -143,15 +144,15 @@ public void testPipelineProcessorWithPipelineChain() throws Exception { pipeline2ProcessorConfig.put("pipeline", pipeline3Id); PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig); - Clock clock = mock(Clock.class); - when(clock.millis()).thenReturn(0L).thenReturn(0L); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); Pipeline pipeline1 = new Pipeline( - pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock + pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider ); String key1 = randomAlphaOfLength(10); - clock = mock(Clock.class); - when(clock.millis()).thenReturn(0L).thenReturn(3L); + relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(3)); Pipeline pipeline2 = new Pipeline( pipeline2Id, null, null, new CompoundProcessor(true, Arrays.asList( @@ -160,15 +161,15 @@ pipeline2Id, null, null, new CompoundProcessor(true, }), pipeline2Processor), Collections.emptyList()), - clock + relativeTimeProvider ); - clock = mock(Clock.class); - when(clock.millis()).thenReturn(0L).thenReturn(2L); + relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2)); Pipeline pipeline3 = new Pipeline( pipeline3Id, null, null, new CompoundProcessor( new TestProcessor(ingestDocument -> { throw new RuntimeException("error"); - })), clock + })), relativeTimeProvider ); when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1); when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2); From dbc6a77650b6e319732f2c6932b0dc4e863424aa Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Sun, 21 Oct 2018 14:09:47 -0500 Subject: [PATCH 2/3] fix bwc issue --- .../org/elasticsearch/ingest/IngestStats.java | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index e3d671bc8b2a0..f14697353699a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -63,14 +64,16 @@ public IngestStats(StreamInput in) throws IOException { String pipelineId = in.readString(); Stats pipelineStat = new Stats(in); this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat)); - int processorsSize = in.readVInt(); - List processorStatsPerPipeline = new ArrayList<>(processorsSize); - for (int j = 0; j < processorsSize; j++) { - String processorName = in.readString(); - Stats processorStat = new Stats(in); - processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat)); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + int processorsSize = in.readVInt(); + List processorStatsPerPipeline = new ArrayList<>(processorsSize); + for (int j = 0; j < processorsSize; j++) { + String processorName = in.readString(); + Stats processorStat = new Stats(in); + processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat)); + } + this.processorStats.put(pipelineId, processorStatsPerPipeline); } - this.processorStats.put(pipelineId, processorStatsPerPipeline); } } @@ -81,14 +84,16 @@ public void writeTo(StreamOutput out) throws IOException { for (PipelineStat pipelineStat : pipelineStats) { out.writeString(pipelineStat.getPipelineId()); pipelineStat.getStats().writeTo(out); - List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); - if(processorStatsForPipeline == null) { - out.writeVInt(0); - }else{ - out.writeVInt(processorStatsForPipeline.size()); - for (ProcessorStat processorStat : processorStatsForPipeline) { - out.writeString(processorStat.getName()); - processorStat.getStats().writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); + if (processorStatsForPipeline == null) { + out.writeVInt(0); + } else { + out.writeVInt(processorStatsForPipeline.size()); + for (ProcessorStat processorStat : processorStatsForPipeline) { + out.writeString(processorStat.getName()); + processorStat.getStats().writeTo(out); + } } } } From 27c70774c2555a7909db85e8d4a44f52d5db0c94 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Mon, 22 Oct 2018 19:29:10 -0500 Subject: [PATCH 3/3] ingest: disable bwc tests, add bwc for 6.5 with tests --- build.gradle | 4 +- .../org/elasticsearch/ingest/IngestStats.java | 4 +- .../ingest/IngestStatsTests.java | 101 ++++++++++++------ 3 files changed, 72 insertions(+), 37 deletions(-) diff --git a/build.gradle b/build.gradle index a2b79d31bad7e..c228a1f8a621d 100644 --- a/build.gradle +++ b/build.gradle @@ -170,8 +170,8 @@ task verifyVersions { * the enabled state of every bwc task. It should be set back to true * after the backport of the backcompat code is complete. */ -final boolean bwc_tests_enabled = true -final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */ +final boolean bwc_tests_enabled = false +final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/34724" /* place a PR link here when committing bwc changes */ if (bwc_tests_enabled == false) { if (bwc_tests_disabled_issue.isEmpty()) { throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false") diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index f14697353699a..7a24cf3ee895e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -64,7 +64,7 @@ public IngestStats(StreamInput in) throws IOException { String pipelineId = in.readString(); Stats pipelineStat = new Stats(in); this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat)); - if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + if (in.getVersion().onOrAfter(Version.V_6_5_0)) { int processorsSize = in.readVInt(); List processorStatsPerPipeline = new ArrayList<>(processorsSize); for (int j = 0; j < processorsSize; j++) { @@ -84,7 +84,7 @@ public void writeTo(StreamOutput out) throws IOException { for (PipelineStat pipelineStat : pipelineStats) { out.writeString(pipelineStat.getPipelineId()); pipelineStat.getStats().writeTo(out); - if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + if (out.getVersion().onOrAfter(Version.V_6_5_0)) { List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); if (processorStatsForPipeline == null) { out.writeVInt(0); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java index 3d39faf9a7447..04bfcbb92b8e9 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java @@ -19,10 +19,12 @@ package org.elasticsearch.ingest; +import org.elasticsearch.Version; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.util.Collections; @@ -32,57 +34,97 @@ import java.util.stream.Collectors; import java.util.stream.Stream; - public class IngestStatsTests extends ESTestCase { public void testSerialization() throws IOException { - //total IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); - //pipeline + List pipelineStats = createPipelineStats(); + Map> processorStats = createProcessorStats(pipelineStats); + IngestStats ingestStats = new IngestStats(totalStats, pipelineStats, processorStats); + IngestStats serializedStats = serialize(ingestStats); + assertIngestStats(ingestStats, serializedStats, true); + } + + public void testReadLegacyStream() throws IOException { + IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); + List pipelineStats = createPipelineStats(); + + //legacy output logic + BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(VersionUtils.getPreviousVersion(Version.V_6_5_0)); + totalStats.writeTo(out); + out.writeVInt(pipelineStats.size()); + for (IngestStats.PipelineStat pipelineStat : pipelineStats) { + out.writeString(pipelineStat.getPipelineId()); + pipelineStat.getStats().writeTo(out); + } + + StreamInput in = out.bytes().streamInput(); + in.setVersion(VersionUtils.getPreviousVersion(Version.V_6_5_0)); + IngestStats serializedStats = new IngestStats(in); + IngestStats expectedStats = new IngestStats(totalStats, pipelineStats, Collections.emptyMap()); + assertIngestStats(expectedStats, serializedStats, false); + } + + private List createPipelineStats() { IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3)); IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297)); IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0)); - List pipelineStats = - Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList()); - //processor + return Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList()); + } + + private Map> createProcessorStats(List pipelineStats){ + assert(pipelineStats.size() >= 2); IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1)); IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2)); IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297)); //pipeline1 -> processor1,processor2; pipeline2 -> processor3 - Map> processorStats = MapBuilder.>newMapBuilder() - .put(pipeline1Stats.getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList())) - .put(pipeline2Stats.getPipelineId(), Collections.singletonList(processor3Stat)) + return MapBuilder.>newMapBuilder() + .put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList())) + .put(pipelineStats.get(1).getPipelineId(), Collections.singletonList(processor3Stat)) .map(); + } - IngestStats ingestStats = new IngestStats(totalStats,pipelineStats, processorStats); + private IngestStats serialize(IngestStats stats) throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + stats.writeTo(out); + StreamInput in = out.bytes().streamInput(); + return new IngestStats(in); + } - IngestStats serializedStats = serialize(ingestStats); + private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors){ assertNotSame(ingestStats, serializedStats); - assertNotSame(totalStats, serializedStats.getTotalStats()); - assertNotSame(pipelineStats, serializedStats.getPipelineStats()); - assertNotSame(processorStats, serializedStats.getProcessorStats()); + assertNotSame(ingestStats.getTotalStats(), serializedStats.getTotalStats()); + assertNotSame(ingestStats.getPipelineStats(), serializedStats.getPipelineStats()); + assertNotSame(ingestStats.getProcessorStats(), serializedStats.getProcessorStats()); - assertStats(totalStats, serializedStats.getTotalStats()); - assertEquals(serializedStats.getPipelineStats().size(), 3); + assertStats(ingestStats.getTotalStats(), serializedStats.getTotalStats()); + assertEquals(ingestStats.getPipelineStats().size(), serializedStats.getPipelineStats().size()); for (IngestStats.PipelineStat serializedPipelineStat : serializedStats.getPipelineStats()) { - assertStats(getPipelineStats(pipelineStats, serializedPipelineStat.getPipelineId()), serializedPipelineStat.getStats()); + assertStats(getPipelineStats(ingestStats.getPipelineStats(), serializedPipelineStat.getPipelineId()), + serializedPipelineStat.getStats()); List serializedProcessorStats = serializedStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); List processorStat = ingestStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); - if(processorStat != null) { - Iterator it = processorStat.iterator(); - //intentionally enforcing the identical ordering - for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) { - IngestStats.ProcessorStat ps = it.next(); - assertEquals(ps.getName(), serializedProcessorStat.getName()); - assertStats(ps.getStats(), serializedProcessorStat.getStats()); + if(expectProcessors) { + if (processorStat != null) { + Iterator it = processorStat.iterator(); + //intentionally enforcing the identical ordering + for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) { + IngestStats.ProcessorStat ps = it.next(); + assertEquals(ps.getName(), serializedProcessorStat.getName()); + assertStats(ps.getStats(), serializedProcessorStat.getStats()); + } + assertFalse(it.hasNext()); } - assertFalse(it.hasNext()); + }else{ + //pre 6.5 did not serialize any processor stats + assertNull(serializedProcessorStats); } } - } + } private void assertStats(IngestStats.Stats fromObject, IngestStats.Stats fromStream) { assertEquals(fromObject.getIngestCount(), fromStream.getIngestCount()); assertEquals(fromObject.getIngestFailedCount(), fromStream.getIngestFailedCount()); @@ -90,13 +132,6 @@ private void assertStats(IngestStats.Stats fromObject, IngestStats.Stats fromStr assertEquals(fromObject.getIngestCurrent(), fromStream.getIngestCurrent()); } - private IngestStats serialize(IngestStats stats) throws IOException { - BytesStreamOutput out = new BytesStreamOutput(); - stats.writeTo(out); - StreamInput in = out.bytes().streamInput(); - return new IngestStats(in); - } - private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); }