From 9459eeffc680686522d039376aecd5b4e46a1d83 Mon Sep 17 00:00:00 2001 From: ajo thomas Date: Tue, 2 Nov 2021 09:41:19 -0700 Subject: [PATCH] Adding percentiles to distribution metric (#16) --- .../beam/gradle/BeamModulePlugin.groovy | 1 + runners/core-java/build.gradle | 1 + .../beam/runners/core/ReduceFnRunner.java | 1 - .../core/metrics/DistributionCell.java | 28 ++- .../core/metrics/DistributionData.java | 221 ++++++++++++++++-- .../core/metrics/DistributionMetricKey.java | 38 +++ .../core/metrics/MetricsContainerImpl.java | 71 ++++-- .../core/metrics/DistributionCellTest.java | 90 ++++--- .../metrics/MetricsContainerImplTest.java | 49 ++-- .../metrics/MetricsContainerStepMapTest.java | 137 ++++++++--- .../beam/runners/direct/DirectMetrics.java | 4 +- .../worker/DeltaDistributionCell.java | 11 +- .../runners/jet/metrics/DistributionImpl.java | 9 +- .../runners/jet/metrics/JetMetricResults.java | 3 +- .../samza/metrics/SamzaMetricsContainer.java | 47 +++- .../sdk/metrics/DelegatingDistribution.java | 11 - .../apache/beam/sdk/metrics/Distribution.java | 2 - .../beam/sdk/metrics/DistributionResult.java | 13 +- .../org/apache/beam/sdk/metrics/Metrics.java | 60 ++++- .../beam/sdk/metrics/MetricsContainer.java | 12 + .../apache/beam/sdk/metrics/MetricsTest.java | 54 ++++- .../beam/sdk/io/gcp/firestore/RpcQosImpl.java | 3 - .../firestore/BaseFirestoreV1WriteFnTest.java | 5 - 23 files changed, 689 insertions(+), 182 deletions(-) create mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionMetricKey.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 946bf9739a011..7155a8ff68f11 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -546,6 +546,7 @@ class BeamModulePlugin implements Plugin { commons_io : "commons-io:commons-io:2.6", commons_lang3 : "org.apache.commons:commons-lang3:3.9", commons_math3 : "org.apache.commons:commons-math3:3.6.1", + datasketches : "org.apache.datasketches:datasketches-java:3.0.0", error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version", flogger_system_backend : "com.google.flogger:flogger-system-backend:0.6", gax : "com.google.api:gax", // google_cloud_platform_libraries_bom sets version diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle index e8381a603e3a3..499d0c588dcaf 100644 --- a/runners/core-java/build.gradle +++ b/runners/core-java/build.gradle @@ -38,6 +38,7 @@ dependencies { compile project(path: ":model:job-management", configuration: "shadow") compile project(":runners:core-construction-java") compile project(":sdks:java:fn-execution") + compile library.java.datasketches compile library.java.vendored_guava_26_0_jre compile library.java.joda_time compile library.java.vendored_grpc_1_36_0 diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index bcc556a2a0946..506a8d2ff200b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -32,7 +32,6 @@ import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle; import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory; import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java index 5cfa414db0325..1e44e6b9f9052 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java @@ -34,9 +34,8 @@ */ public class DistributionCell implements Distribution, MetricCell { - private final DirtyState dirty = new DirtyState(); - private final AtomicReference value = - new AtomicReference<>(DistributionData.EMPTY); + private final DirtyState dirty; + private final AtomicReference value; private final MetricName name; /** @@ -45,31 +44,36 @@ public class DistributionCell implements Distribution, MetricCell(DistributionData.empty()); this.name = name; } + public DistributionCell(DistributionMetricKey metricKey) { + this.dirty = new DirtyState(); + this.value = + new AtomicReference<>(DistributionData.withPercentiles(metricKey.getPercentiles())); + this.name = metricKey.getMetricName(); + } + @Override public void reset() { dirty.afterModification(); - value.set(DistributionData.EMPTY); + value.get().reset(); } /** Increment the distribution by the given amount. */ @Override public void update(long n) { - update(DistributionData.singleton(n)); - } - - @Override - public void update(long sum, long count, long min, long max) { - update(DistributionData.create(sum, count, min, max)); + value.get().update(n); + dirty.afterModification(); } - void update(DistributionData data) { + void update(DistributionData other) { DistributionData original; do { original = value.get(); - } while (!value.compareAndSet(original, original.combine(data))); + } while (!value.compareAndSet(original, original.combine(other))); dirty.afterModification(); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java index 085d171eacbb5..adf963b7c2d97 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java @@ -17,48 +17,231 @@ */ package org.apache.beam.runners.core.metrics; -import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.quantiles.DoublesSketchBuilder; +import org.apache.datasketches.quantiles.DoublesUnion; +import org.apache.datasketches.quantiles.DoublesUnionBuilder; +import org.apache.datasketches.quantiles.UpdateDoublesSketch; /** * Data describing the the distribution. This should retain enough detail that it can be combined * with other {@link DistributionData}. * + *

Datasketch library is used to compute percentiles. See {@linktourl + * https://datasketches.apache.org/}. + * *

This is kept distinct from {@link DistributionResult} since this may be extended to include * data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include * the approximate value of those quantiles. */ -@AutoValue -public abstract class DistributionData implements Serializable { - - public abstract long sum(); - - public abstract long count(); +public class DistributionData implements Serializable { + // k = 256 should yield an approximate error ε of less than 1% + private static final int SKETCH_SUMMARY_SIZE = 256; - public abstract long min(); + private final Set percentiles; + private long sum; + private long count; + private long min; + private long max; + private transient Optional sketch; - public abstract long max(); + /** Creates an instance of DistributionData with custom percentiles. */ + public static DistributionData withPercentiles(Set percentiles) { + return new DistributionData(0L, 0L, Long.MAX_VALUE, Long.MIN_VALUE, percentiles); + } - public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE); + /** Backward compatible static factory method. */ + public static DistributionData empty() { + return new DistributionData(0L, 0L, Long.MAX_VALUE, Long.MIN_VALUE, ImmutableSet.of()); + } + /** Static factory method primary used for testing. */ + @VisibleForTesting public static DistributionData create(long sum, long count, long min, long max) { - return new AutoValue_DistributionData(sum, count, min, max); + return new DistributionData(sum, count, min, max, ImmutableSet.of()); + } + + private DistributionData(long sum, long count, long min, long max, Set percentiles) { + this.sum = sum; + this.count = count; + this.min = min; + this.max = max; + this.percentiles = percentiles; + if (!percentiles.isEmpty()) { + final DoublesSketchBuilder doublesSketchBuilder = new DoublesSketchBuilder(); + this.sketch = Optional.of(doublesSketchBuilder.setK(SKETCH_SUMMARY_SIZE).build()); + } else { + this.sketch = Optional.empty(); + } } public static DistributionData singleton(long value) { - return create(value, 1, value, value); + final DistributionData distributionData = empty(); + distributionData.update(value); + return distributionData; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////// + // Getters + + public long sum() { + return sum; + } + + public long count() { + return count; + } + + public long min() { + return min; } - public DistributionData combine(DistributionData value) { - return create( - sum() + value.sum(), - count() + value.count(), - Math.min(value.min(), min()), - Math.max(value.max(), max())); + public long max() { + return max; + } + + /** Gets the percentiles and the percentiles values as a map. */ + public Map percentiles() { + if (!sketch.isPresent() || sketch.get().getN() == 0) { + // if the sketch is not present or is empty, do not compute the percentile + return ImmutableMap.of(); + } + + double[] quantiles = percentiles.stream().mapToDouble(i -> i / 100).toArray(); + double[] quantileResults = sketch.get().getQuantiles(quantiles); + + final ImmutableMap.Builder resultBuilder = ImmutableMap.builder(); + for (int k = 0; k < quantiles.length; k++) { + resultBuilder.put(quantiles[k] * 100, quantileResults[k]); + } + return resultBuilder.build(); } + //////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Updates the distribution with a value. For percentiles, only add the value to the sketch. + * Percentile will be computed prior to calling {@link DistributionCell#getCumulative()} or in + * {@link #extractResult()}. + * + * @param value value to update the distribution with. + */ + public void update(long value) { + ++count; + min = Math.min(min, value); + max = Math.max(max, value); + sum += value; + sketch.ifPresent(currSketch -> currSketch.update(value)); + } + + /** Merges two distributions. */ + public DistributionData combine(DistributionData other) { + if (sketch.isPresent() + && other.sketch.isPresent() + && sketch.get().getN() > 0 + && other.sketch.get().getN() > 0) { + final DoublesUnion union = new DoublesUnionBuilder().build(); + union.update(sketch.get()); + union.update(other.sketch.get()); + sketch = Optional.of(union.getResult()); + } else if (other.sketch.isPresent() && other.sketch.get().getN() > 0) { + sketch = other.sketch; + } + sum += other.sum; + count += other.count; + max = Math.max(max, other.max); + min = Math.min(min, other.min); + return this; + } + + public DistributionData reset() { + this.sum = 0L; + this.count = 0L; + this.min = Long.MAX_VALUE; + this.max = Long.MIN_VALUE; + if (!this.percentiles.isEmpty()) { + final DoublesSketchBuilder doublesSketchBuilder = new DoublesSketchBuilder(); + this.sketch = Optional.of(doublesSketchBuilder.setK(SKETCH_SUMMARY_SIZE).build()); + } else { + this.sketch = Optional.empty(); + } + return this; + } + + /** Generates DistributionResult from DistributionData. */ public DistributionResult extractResult() { - return DistributionResult.create(sum(), count(), min(), max()); + return DistributionResult.create(sum(), count(), min(), max(), percentiles()); + } + + @Override + public boolean equals(Object object) { + if (object instanceof DistributionData) { + DistributionData other = (DistributionData) object; + return Objects.equals(max, other.max()) + && Objects.equals(min, other.min()) + && Objects.equals(count, other.count()) + && Objects.equals(sum, other.sum()) + && Objects.equals(percentiles(), other.percentiles()); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(min, max, sum, count, percentiles()); + } + + @Override + public String toString() { + return "DistributionData{" + + "sum=" + + sum + + ", " + + "count=" + + count + + ", " + + "min=" + + min + + ", " + + "max=" + + max + + ", " + + "percentiles=" + + percentiles() + + "}"; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + if (sketch.isPresent()) { + byte[] bytes = sketch.get().toByteArray(); + out.writeInt(bytes.length); + out.write(bytes); + } + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { + in.defaultReadObject(); + if (!this.percentiles.isEmpty()) { + int len = in.readInt(); + byte[] bytes = new byte[len]; + in.read(bytes); + this.sketch = Optional.of(UpdateDoublesSketch.heapify(Memory.wrap(bytes))); + } else { + this.sketch = Optional.empty(); + } } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionMetricKey.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionMetricKey.java new file mode 100644 index 0000000000000..759d9e4f9f190 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionMetricKey.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.metrics; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Set; +import org.apache.beam.sdk.metrics.MetricName; + +/** + * Value class to represent Metric name and percentiles together. {@link MetricsContainerImpl} uses + * a map of this key and the Distribution Metric. + */ +@AutoValue +public abstract class DistributionMetricKey implements Serializable { + public abstract MetricName getMetricName(); + + public abstract Set getPercentiles(); + + public static DistributionMetricKey create(MetricName metricName, Set percentiles) { + return new AutoValue_DistributionMetricKey(metricName, percentiles); + } +} diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 4d7c37abc3b7e..c425a7d2f2e90 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -38,7 +38,6 @@ import java.util.function.Function; import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; -import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metric; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; @@ -49,6 +48,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +78,7 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer { private MetricsMap counters = new MetricsMap<>(CounterCell::new); - private MetricsMap distributions = + private MetricsMap distributions = new MetricsMap<>(DistributionCell::new); private MetricsMap gauges = new MetricsMap<>(GaugeCell::new); @@ -152,7 +152,16 @@ public CounterCell getCounter(MetricName metricName) { */ @Override public DistributionCell getDistribution(MetricName metricName) { - return distributions.get(metricName); + return distributions.get(DistributionMetricKey.create(metricName, ImmutableSet.of())); + } + + /** + * Return a {@code DistributionCell} named {@code metricName} with a custom set of percentiles. If + * it doesn't exist, create a {@code Metric} with the specified name. + */ + @Override + public DistributionCell getDistribution(MetricName metricName, Set percentiles) { + return distributions.get(DistributionMetricKey.create(metricName, percentiles)); } /** @@ -160,7 +169,7 @@ public DistributionCell getDistribution(MetricName metricName) { * null}. */ public @Nullable DistributionCell tryGetDistribution(MetricName metricName) { - return distributions.tryGet(metricName); + return distributions.tryGet(DistributionMetricKey.create(metricName, ImmutableSet.of())); } /** @@ -210,13 +219,29 @@ ImmutableList> extractUpdates(MetricsMap> extractDistributionUpdates( + MetricsMap cells) { + ImmutableList.Builder> updates = ImmutableList.builder(); + for (Map.Entry cell : cells.entries()) { + if (cell.getValue().getDirty().beforeCommit()) { + updates.add( + MetricUpdate.create( + MetricKey.create(stepName, cell.getKey().getMetricName()), + cell.getValue().getCumulative())); + } + } + return updates.build(); + } + /** * Return the cumulative values for any metrics that have changed since the last time updates were * committed. */ public MetricUpdates getUpdates() { return MetricUpdates.create( - extractUpdates(counters), extractUpdates(distributions), extractUpdates(gauges)); + extractUpdates(counters), + extractDistributionUpdates(distributions), + extractUpdates(gauges)); } /** @return The MonitoringInfo metadata from the metric. */ @@ -361,13 +386,20 @@ private void commitUpdates(MetricsMap> cells } } + private void commitDistributionUpdates( + MetricsMap cells) { + for (DistributionCell cell : cells.values()) { + cell.getDirty().afterCommit(); + } + } + /** * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as * committed. */ public void commitUpdates() { commitUpdates(counters); - commitUpdates(distributions); + commitDistributionUpdates(distributions); commitUpdates(gauges); } @@ -381,6 +413,17 @@ ImmutableList> extractCumulatives(MetricsMap> extractCumulativesForDistributions( + MetricsMap cells) { + ImmutableList.Builder> updates = ImmutableList.builder(); + for (Map.Entry cell : cells.entries()) { + DistributionData update = checkNotNull(cell.getValue().getCumulative()); + updates.add( + MetricUpdate.create(MetricKey.create(stepName, cell.getKey().getMetricName()), update)); + } + return updates.build(); + } + /** * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this * container. @@ -388,7 +431,7 @@ ImmutableList> extractCumulatives(MetricsMap current, - MetricsMap updates) { - for (Map.Entry counter : updates.entries()) { + MetricsMap current, + MetricsMap updates) { + for (Map.Entry counter : updates.entries()) { current.get(counter.getKey()).update(counter.getValue().getCumulative()); } } @@ -523,8 +566,8 @@ public String getCumulativeString(@Nullable Set allowedMetricUrns) { message.append(cell.getValue().getCumulative()); message.append("\n"); } - for (Map.Entry cell : distributions.entries()) { - if (!matchMetric(cell.getKey(), allowedMetricUrns)) { + for (Map.Entry cell : distributions.entries()) { + if (!matchMetric(cell.getKey().getMetricName(), allowedMetricUrns)) { continue; } message.append(cell.getKey().toString()); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java index faa8b5ebbc532..16ab777774c81 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java @@ -17,10 +17,10 @@ */ package org.apache.beam.runners.core.metrics; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; - +import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -29,35 +29,66 @@ /** Tests for {@link DistributionCell}. */ @RunWith(JUnit4.class) public class DistributionCellTest { - private DistributionCell cell = new DistributionCell(MetricName.named("hello", "world")); - @Test public void testDeltaAndCumulative() { + DistributionCell cell = + new DistributionCell( + DistributionMetricKey.create( + MetricName.named("hello", "world"), ImmutableSet.of(95.0D, 99.0D))); cell.update(5); cell.update(7); - assertThat(cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7))); - assertThat( - "getCumulative is idempotent", - cell.getCumulative(), - equalTo(DistributionData.create(12, 2, 5, 7))); - - assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + ImmutableMap percentiles = + ImmutableMap.builder().put(95.0D, 7.0D).put(99.0D, 7.0D).build(); + Assert.assertEquals( + cell.getCumulative().extractResult(), DistributionResult.create(12, 2, 5, 7, percentiles)); + Assert.assertTrue(cell.getDirty().beforeCommit()); cell.getDirty().afterCommit(); - assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + Assert.assertFalse(cell.getDirty().beforeCommit()); cell.update(30); - assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 5, 30))); + ImmutableMap percentiles1 = + ImmutableMap.builder().put(95.0D, 30.0D).put(99.0D, 30.0D).build(); + Assert.assertEquals( + cell.getCumulative().extractResult(), + DistributionResult.create(42, 3, 5, 30, percentiles1)); + } - assertThat( - "Adding a new value made the cell dirty", cell.getDirty().beforeCommit(), equalTo(true)); + @Test + public void testDeltaAndCumulativeWithPercentilesAsEmpty() { + DistributionCell cell = + new DistributionCell( + DistributionMetricKey.create(MetricName.named("hello", "world"), ImmutableSet.of())); + cell.update(5); + cell.update(7); + Assert.assertEquals( + cell.getCumulative().extractResult(), + DistributionResult.create(12, 2, 5, 7, ImmutableMap.of())); + Assert.assertTrue(cell.getDirty().beforeCommit()); + cell.getDirty().afterCommit(); + Assert.assertFalse(cell.getDirty().beforeCommit()); } @Test public void testEquals() { - DistributionCell distributionCell = new DistributionCell(MetricName.named("namespace", "name")); - DistributionCell equal = new DistributionCell(MetricName.named("namespace", "name")); - Assert.assertEquals(distributionCell, equal); - Assert.assertEquals(distributionCell.hashCode(), equal.hashCode()); + DistributionCell cell11 = new DistributionCell(MetricName.named("hello", "world")); + DistributionCell cell21 = new DistributionCell(MetricName.named("hello", "world")); + cell11.update(5); + cell21.update(5); + Assert.assertEquals(cell11, cell21); + Assert.assertEquals(cell11.hashCode(), cell21.hashCode()); + + DistributionCell cell1 = + new DistributionCell( + DistributionMetricKey.create( + MetricName.named("hello", "world"), ImmutableSet.of(95.0D, 99.0D))); + cell1.update(5); + DistributionCell cell2 = + new DistributionCell( + DistributionMetricKey.create( + MetricName.named("hello", "world"), ImmutableSet.of(95.0D, 99.0D))); + cell2.update(5); + Assert.assertEquals(cell1, cell2); + Assert.assertEquals(cell1.hashCode(), cell2.hashCode()); } @Test @@ -72,7 +103,7 @@ public void testNotEquals() { Assert.assertNotEquals(distributionCell.hashCode(), differentDirty.hashCode()); DistributionCell differentValue = new DistributionCell(MetricName.named("namespace", "name")); - differentValue.update(DistributionData.create(1, 1, 1, 1)); + differentValue.update(1); Assert.assertNotEquals(distributionCell, differentValue); Assert.assertNotEquals(distributionCell.hashCode(), differentValue.hashCode()); @@ -84,12 +115,19 @@ public void testNotEquals() { @Test public void testReset() { - DistributionCell distributionCell = new DistributionCell(MetricName.named("namespace", "name")); + DistributionCell distributionCell = + new DistributionCell( + DistributionMetricKey.create( + MetricName.named("namespace", "name"), ImmutableSet.of(95.0D, 99.0D))); distributionCell.update(2); - assertThat(distributionCell.getCumulative(), equalTo(DistributionData.create(2, 1, 2, 2))); - + ImmutableMap percentiles = + ImmutableMap.builder().put(95.0D, 2.0D).put(99.0D, 2.0D).build(); + Assert.assertEquals(distributionCell.getCumulative().percentiles(), percentiles); + Assert.assertEquals( + distributionCell.getCumulative().extractResult(), + DistributionResult.create(2, 1, 2, 2, percentiles)); distributionCell.reset(); - assertThat(distributionCell.getCumulative(), equalTo(DistributionData.EMPTY)); - assertThat(distributionCell.getDirty(), equalTo(new DirtyState())); + Assert.assertEquals(distributionCell.getCumulative(), DistributionData.empty()); + Assert.assertEquals(distributionCell.getDirty(), new DirtyState()); } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java index c20ee5de21933..81e6bf9da6dba 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -34,8 +35,12 @@ import java.util.Map; import java.util.Set; import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; +import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -113,48 +118,54 @@ public void testCounterCumulatives() { @Test public void testDistributionDeltas() { MetricsContainerImpl container = new MetricsContainerImpl("step1"); - DistributionCell c1 = container.getDistribution(MetricName.named("ns", "name1")); + DistributionCell c1 = + container.getDistribution(MetricName.named("ns", "name1"), ImmutableSet.of(95.0D, 99.0D)); DistributionCell c2 = container.getDistribution(MetricName.named("ns", "name2")); assertThat( "Initial update includes initial zero-values", container.getUpdates().distributionUpdates(), containsInAnyOrder( - metricUpdate("name1", DistributionData.EMPTY), - metricUpdate("name2", DistributionData.EMPTY))); - - container.commitUpdates(); - assertThat( - "No updates after commit", container.getUpdates().distributionUpdates(), emptyIterable()); + metricUpdate("name1", DistributionData.empty()), + metricUpdate("name2", DistributionData.empty()))); c1.update(5L); c2.update(4L); + ImmutableMap percentile1 = + ImmutableMap.builder().put(95.0D, 5.0D).put(99.0D, 5.0D).build(); + ImmutableMap percentile2 = ImmutableMap.of(); + + ImmutableList distributionResultsPreCommit = + ImmutableList.copyOf(container.getUpdates().distributionUpdates()).stream() + .map(update -> update.getUpdate().extractResult()) + .collect(ImmutableList.toImmutableList()); - assertThat( - container.getUpdates().distributionUpdates(), - containsInAnyOrder( - metricUpdate("name1", DistributionData.create(5, 1, 5, 5)), - metricUpdate("name2", DistributionData.create(4, 1, 4, 4)))); assertThat( "Updates stay the same without commit", - container.getUpdates().distributionUpdates(), + distributionResultsPreCommit, containsInAnyOrder( - metricUpdate("name1", DistributionData.create(5, 1, 5, 5)), - metricUpdate("name2", DistributionData.create(4, 1, 4, 4)))); + DistributionResult.create(5, 1, 5, 5, percentile1), + DistributionResult.create(4, 1, 4, 4, percentile2))); container.commitUpdates(); assertThat( - "No updatess after commit", container.getUpdates().distributionUpdates(), emptyIterable()); + "No updates after commit", container.getUpdates().distributionUpdates(), emptyIterable()); c1.update(8L); c1.update(4L); + percentile1 = ImmutableMap.builder().put(95.0D, 8.0D).put(99.0D, 8.0D).build(); + ImmutableList distributionResultsPostCommit = + ImmutableList.copyOf(container.getUpdates().distributionUpdates()).stream() + .map(update -> update.getUpdate().extractResult()) + .collect(ImmutableList.toImmutableList()); + assertThat( - container.getUpdates().distributionUpdates(), - contains(metricUpdate("name1", DistributionData.create(17, 3, 4, 8)))); + distributionResultsPostCommit, + contains(DistributionResult.create(17, 3, 4, 8, percentile1))); container.commitUpdates(); DistributionCell dne = container.tryGetDistribution(MetricName.named("ns", "dne")); - assertEquals(dne, null); + assertNull(dne); } @Test diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java index 4718a6f2fed3a..29ff42ca9840f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java @@ -40,6 +40,9 @@ import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.hamcrest.collection.IsIterableWithSize; import org.joda.time.Instant; import org.junit.Assert; @@ -57,15 +60,19 @@ public class MetricsContainerStepMapTest { private static final String STEP1 = "myStep1"; private static final String STEP2 = "myStep2"; private static final String COUNTER_NAME = "myCounter"; - private static final String DISTRIBUTION_NAME = "myDistribution"; + private static final String DISTRIBUTION_NAME1 = "myDistribution1"; + private static final String DISTRIBUTION_NAME2 = "myDistribution2"; private static final String GAUGE_NAME = "myGauge"; private static final long VALUE = 100; private static final Counter counter = Metrics.counter(MetricsContainerStepMapTest.class, COUNTER_NAME); - private static final Distribution distribution = - Metrics.distribution(MetricsContainerStepMapTest.class, DISTRIBUTION_NAME); + private static final Distribution distribution1 = + Metrics.distribution(MetricsContainerStepMapTest.class, DISTRIBUTION_NAME1); + private static final Distribution distribution2 = + Metrics.distribution( + MetricsContainerStepMapTest.class, DISTRIBUTION_NAME2, ImmutableSet.of(90.0D, 99.0D)); private static final Gauge gauge = Metrics.gauge(MetricsContainerStepMapTest.class, GAUGE_NAME); private static final MetricsContainerImpl metricsContainer; @@ -74,8 +81,12 @@ public class MetricsContainerStepMapTest { metricsContainer = new MetricsContainerImpl(null); try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { counter.inc(VALUE); - distribution.update(VALUE); - distribution.update(VALUE * 2); + distribution1.update(VALUE); + distribution1.update(VALUE * 2); + distribution1.update(VALUE * 3); + distribution2.update(VALUE); + distribution2.update(VALUE * 2); + distribution2.update(VALUE * 3); gauge.set(VALUE); } catch (IOException e) { LOG.error(e.getMessage(), e); @@ -92,20 +103,28 @@ public void testAttemptedAccumulatedMetricResults() { attemptedMetrics.update(STEP2, metricsContainer); MetricResults metricResults = asAttemptedOnlyMetricResults(attemptedMetrics); - MetricQueryResults step1res = metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); assertIterableSize(step1res.getCounters(), 1); - assertIterableSize(step1res.getDistributions(), 1); + assertIterableSize(step1res.getDistributions(), 2); assertIterableSize(step1res.getGauges(), 1); + ImmutableMap percentiles1 = ImmutableMap.of(); + ImmutableMap percentiles2 = + ImmutableMap.builder().put(90.0D, 300.0D).put(99.0D, 300.0D).build(); assertCounter(COUNTER_NAME, step1res, STEP1, VALUE, false); assertDistribution( - DISTRIBUTION_NAME, + DISTRIBUTION_NAME1, + step1res, + STEP1, + DistributionResult.create(VALUE * 6, 3, VALUE, VALUE * 3, percentiles1), + false); + assertDistribution( + DISTRIBUTION_NAME2, step1res, STEP1, - DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), + DistributionResult.create(VALUE * 6, 3, VALUE, VALUE * 3, percentiles2), false); assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false); @@ -113,22 +132,28 @@ public void testAttemptedAccumulatedMetricResults() { metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build()); assertIterableSize(step2res.getCounters(), 1); - assertIterableSize(step2res.getDistributions(), 1); + assertIterableSize(step2res.getDistributions(), 2); assertIterableSize(step2res.getGauges(), 1); assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 2, false); assertDistribution( - DISTRIBUTION_NAME, + DISTRIBUTION_NAME1, + step2res, + STEP2, + DistributionResult.create(VALUE * 12, 6, VALUE, VALUE * 3, percentiles1), + false); + assertDistribution( + DISTRIBUTION_NAME2, step2res, STEP2, - DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), + DistributionResult.create(VALUE * 12, 6, VALUE, VALUE * 3, percentiles2), false); assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false); MetricQueryResults allres = metricResults.allMetrics(); assertIterableSize(allres.getCounters(), 2); - assertIterableSize(allres.getDistributions(), 2); + assertIterableSize(allres.getDistributions(), 4); assertIterableSize(allres.getGauges(), 2); } @@ -160,7 +185,7 @@ public void testDistributionCommittedUnsupportedInAttemptedAccumulatedMetricResu thrown.expectMessage("This runner does not currently support committed metrics results."); assertDistribution( - DISTRIBUTION_NAME, step1res, STEP1, DistributionResult.IDENTITY_ELEMENT, true); + DISTRIBUTION_NAME1, step1res, STEP1, DistributionResult.IDENTITY_ELEMENT, true); } @Test @@ -184,8 +209,7 @@ public void testUserMetricDroppedOnUnbounded() { CounterCell c1 = testObject.getUnboundContainer().getCounter(MetricName.named("ns", "name1")); c1.inc(5); - List expected = new ArrayList(); - assertThat(testObject.getMonitoringInfos(), containsInAnyOrder(expected.toArray())); + assertThat(testObject.getMonitoringInfos(), containsInAnyOrder(ImmutableList.of().toArray())); } @Test @@ -205,7 +229,7 @@ public void testUpdateAllUpdatesUnboundedAndBoundedContainers() { MetricsContainerStepMap testObject = new MetricsContainerStepMap(); testObject.updateAll(baseMetricContainerRegistry); - List expected = new ArrayList(); + List expected = new ArrayList<>(); SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder(); builder @@ -218,7 +242,7 @@ public void testUpdateAllUpdatesUnboundedAndBoundedContainers() { expected.add(MonitoringInfoTestUtil.testElementCountMonitoringInfo(14)); - ArrayList actual = new ArrayList(); + ArrayList actual = new ArrayList<>(); for (MonitoringInfo mi : testObject.getMonitoringInfos()) { actual.add(mi); @@ -246,56 +270,83 @@ public void testAttemptedAndCommittedAccumulatedMetricResults() { metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); assertIterableSize(step1res.getCounters(), 1); - assertIterableSize(step1res.getDistributions(), 1); + assertIterableSize(step1res.getDistributions(), 2); assertIterableSize(step1res.getGauges(), 1); assertCounter(COUNTER_NAME, step1res, STEP1, VALUE * 2, false); + ImmutableMap percentiles1 = ImmutableMap.of(); + ImmutableMap percentiles2 = + ImmutableMap.builder().put(90.0D, 300.0D).put(99.0D, 300.0D).build(); assertDistribution( - DISTRIBUTION_NAME, + DISTRIBUTION_NAME1, step1res, STEP1, - DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), + DistributionResult.create(VALUE * 12, 6, VALUE, VALUE * 3, percentiles1), + false); + assertDistribution( + DISTRIBUTION_NAME2, + step1res, + STEP1, + DistributionResult.create(VALUE * 12, 6, VALUE, VALUE * 3, percentiles2), false); assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false); assertCounter(COUNTER_NAME, step1res, STEP1, VALUE, true); assertDistribution( - DISTRIBUTION_NAME, + DISTRIBUTION_NAME1, step1res, STEP1, - DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), + DistributionResult.create(VALUE * 6, 3, VALUE, VALUE * 3, percentiles1), true); + assertDistribution( + DISTRIBUTION_NAME2, + step1res, + STEP1, + DistributionResult.create(VALUE * 12, 6, VALUE, VALUE * 3, percentiles2), + false); assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), true); MetricQueryResults step2res = metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build()); assertIterableSize(step2res.getCounters(), 1); - assertIterableSize(step2res.getDistributions(), 1); + assertIterableSize(step2res.getDistributions(), 2); assertIterableSize(step2res.getGauges(), 1); assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 3, false); assertDistribution( - DISTRIBUTION_NAME, + DISTRIBUTION_NAME1, step2res, STEP2, - DistributionResult.create(VALUE * 9, 6, VALUE, VALUE * 2), + DistributionResult.create(VALUE * 18, 9, VALUE, VALUE * 3, percentiles1), + false); + assertDistribution( + DISTRIBUTION_NAME2, + step2res, + STEP2, + DistributionResult.create(VALUE * 18, 9, VALUE, VALUE * 3, percentiles2), false); assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false); assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 2, true); assertDistribution( - DISTRIBUTION_NAME, + DISTRIBUTION_NAME1, step2res, STEP2, - DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), + DistributionResult.create(VALUE * 12, 6, VALUE, VALUE * 3, percentiles1), + true); + assertDistribution( + DISTRIBUTION_NAME2, + step2res, + STEP2, + DistributionResult.create(VALUE * 12, 6, VALUE, VALUE * 3, percentiles2), true); assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), true); MetricQueryResults allres = metricResults.queryMetrics(MetricsFilter.builder().build()); assertIterableSize(allres.getCounters(), 2); - assertIterableSize(allres.getDistributions(), 2); + assertIterableSize(allres.getDistributions(), 4); assertIterableSize(allres.getGauges(), 2); } @@ -337,21 +388,37 @@ public void testReset() { MetricResults metricResults = asAttemptedOnlyMetricResults(attemptedMetrics); MetricQueryResults allres = metricResults.allMetrics(); + + ImmutableMap percentiles1 = ImmutableMap.of(); + ImmutableMap percentiles2 = + ImmutableMap.builder().put(90.0D, 300.0D).put(99.0D, 300.0D).build(); assertCounter(COUNTER_NAME, allres, STEP1, VALUE, false); assertDistribution( - DISTRIBUTION_NAME, + DISTRIBUTION_NAME1, allres, STEP1, - DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), + DistributionResult.create(VALUE * 6, 3, VALUE, VALUE * 3, percentiles1), + false); + assertDistribution( + DISTRIBUTION_NAME2, + allres, + STEP1, + DistributionResult.create(VALUE * 6, 3, VALUE, VALUE * 3, percentiles2), false); assertGauge(GAUGE_NAME, allres, STEP1, GaugeResult.create(VALUE, Instant.now()), false); assertCounter(COUNTER_NAME, allres, STEP2, VALUE * 2, false); assertDistribution( - DISTRIBUTION_NAME, + DISTRIBUTION_NAME1, + allres, + STEP2, + DistributionResult.create(VALUE * 12, 6, VALUE, VALUE * 3, percentiles1), + false); + assertDistribution( + DISTRIBUTION_NAME2, allres, STEP2, - DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), + DistributionResult.create(VALUE * 12, 6, VALUE, VALUE * 3, percentiles2), false); assertGauge(GAUGE_NAME, allres, STEP2, GaugeResult.create(VALUE, Instant.now()), false); @@ -362,13 +429,13 @@ public void testReset() { // Check that the metrics container for STEP1 is reset assertCounter(COUNTER_NAME, allres, STEP1, 0L, false); assertDistribution( - DISTRIBUTION_NAME, allres, STEP1, DistributionResult.IDENTITY_ELEMENT, false); + DISTRIBUTION_NAME1, allres, STEP1, DistributionResult.IDENTITY_ELEMENT, false); assertGauge(GAUGE_NAME, allres, STEP1, GaugeResult.empty(), false); // Check that the metrics container for STEP2 is reset assertCounter(COUNTER_NAME, allres, STEP2, 0L, false); assertDistribution( - DISTRIBUTION_NAME, allres, STEP2, DistributionResult.IDENTITY_ELEMENT, false); + DISTRIBUTION_NAME1, allres, STEP2, DistributionResult.IDENTITY_ELEMENT, false); assertGauge(GAUGE_NAME, allres, STEP2, GaugeResult.empty(), false); } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index ff4606cf7c6e1..df500e2edba34 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -177,12 +177,12 @@ public Long extract(Long data) { new MetricAggregation() { @Override public DistributionData zero() { - return DistributionData.EMPTY; + return DistributionData.empty(); } @Override public DistributionData combine(Iterable updates) { - DistributionData result = DistributionData.EMPTY; + DistributionData result = DistributionData.empty(); for (DistributionData update : updates) { result = result.combine(update); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java index 6e75d4b9eb371..4c6fc2d978bd6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java @@ -32,7 +32,7 @@ */ public class DeltaDistributionCell implements Distribution, MetricCell { private final AtomicReference value = - new AtomicReference<>(DistributionData.EMPTY); + new AtomicReference<>(DistributionData.empty()); private final MetricName name; public DeltaDistributionCell(MetricName name) { @@ -54,12 +54,7 @@ void update(DistributionData data) { @Override public void reset() { - value.set(DistributionData.EMPTY); - } - - @Override - public void update(long sum, long count, long min, long max) { - update(DistributionData.create(sum, count, min, max)); + value.get().reset(); } @Override @@ -74,7 +69,7 @@ public DistributionData getCumulative() { } public DistributionData getAndReset() { - return value.getAndUpdate(unused -> DistributionData.EMPTY); + return value.getAndUpdate(DistributionData::reset); } @Override diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/DistributionImpl.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/DistributionImpl.java index 89f9b5dda45b8..05d1d2104dc3a 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/DistributionImpl.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/DistributionImpl.java @@ -24,7 +24,7 @@ /** Implementation of {@link Distribution}. */ public class DistributionImpl extends AbstractMetric implements Distribution { - private DistributionData distributionData = DistributionData.EMPTY; + private DistributionData distributionData = DistributionData.empty(); public DistributionImpl(MetricName name) { super(name); @@ -37,12 +37,7 @@ DistributionData getValue() { @Override public void update(long value) { - update(DistributionData.singleton(value)); - } - - @Override - public void update(long sum, long count, long min, long max) { - update(DistributionData.create(sum, count, min, max)); + distributionData.update(value); } private void update(DistributionData update) { diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java index 2b5d5f58c2701..acda16f50cc64 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java @@ -157,7 +157,8 @@ private static class Distributions { void merge(Iterable> updates) { for (MetricUpdate update : updates) { MetricKey key = update.getKey(); - DistributionData oldDistribution = distributions.getOrDefault(key, DistributionData.EMPTY); + DistributionData oldDistribution = + distributions.getOrDefault(key, DistributionData.empty()); DistributionData updatedDistribution = update.getUpdate().combine(oldDistribution); distributions.put(key, updatedDistribution); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java index 745a1f21417d9..6af5abb678959 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java @@ -28,6 +28,7 @@ import org.apache.beam.runners.core.metrics.MetricUpdates; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; +import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; @@ -94,7 +95,8 @@ public void updateMetrics(String stepName) { final GaugeUpdater updateGauge = new GaugeUpdater(); results.getGauges().forEach(updateGauge); - // TODO(BEAM-12614): add distribution metrics to Samza + final DistributionUpdater updateDistribution = new DistributionUpdater(); + results.getDistributions().forEach(updateDistribution); } private class CounterUpdater implements Consumer> { @@ -123,6 +125,49 @@ public void accept(MetricResult metricResult) { } } + private class DistributionUpdater implements Consumer> { + @Override + public void accept(MetricResult metricResult) { + final String metricName = getMetricName(metricResult); + final DistributionResult distributionResult = metricResult.getAttempted(); + setLongGauge(metricName + "Sum", distributionResult.getSum()); + setLongGauge(metricName + "Count", distributionResult.getCount()); + setLongGauge(metricName + "Max", distributionResult.getMax()); + setLongGauge(metricName + "Min", distributionResult.getMin()); + distributionResult + .getPercentiles() + .forEach( + (percentile, percentileValue) -> { + final String percentileMetricName = metricName + getPercentileSuffix(percentile); + @SuppressWarnings("unchecked") + Gauge gauge = (Gauge) getSamzaMetricFor(percentileMetricName); + if (gauge == null) { + gauge = metricsRegistry.newGauge(BEAM_METRICS_GROUP, percentileMetricName, 0.0D); + } + gauge.set(percentileValue); + }); + } + + private void setLongGauge(String metricName, Long value) { + @SuppressWarnings("unchecked") + Gauge gauge = (Gauge) getSamzaMetricFor(metricName); + if (gauge == null) { + gauge = metricsRegistry.newGauge(BEAM_METRICS_GROUP, metricName, 0L); + } + gauge.set(value); + } + + private String getPercentileSuffix(Double value) { + String strValue; + if (value == value.intValue()) { + strValue = String.valueOf(value.intValue()); + } else { + strValue = String.valueOf(value).replace(".", "_"); + } + return "P" + strValue; + } + } + private Metric getSamzaMetricFor(String metricName) { return metricsRegistry.getGroup(BEAM_METRICS_GROUP).get(metricName); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingDistribution.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingDistribution.java index 6cfe98e019316..a17c6788441ec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingDistribution.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingDistribution.java @@ -48,17 +48,6 @@ public void update(long value) { } } - @Override - public void update(long sum, long count, long min, long max) { - MetricsContainer container = - this.processWideContainer - ? MetricsEnvironment.getProcessWideContainer() - : MetricsEnvironment.getCurrentContainer(); - if (container != null) { - container.getDistribution(name).update(sum, count, min, max); - } - } - @Override public MetricName getName() { return name; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java index 4922a04d4b7ab..06cbad571b5d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java @@ -25,6 +25,4 @@ public interface Distribution extends Metric { /** Add an observation to this distribution. */ void update(long value); - - void update(long sum, long count, long min, long max); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java index 4c7a8322f17be..4188902194d4d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java @@ -18,14 +18,15 @@ package org.apache.beam.sdk.metrics; import com.google.auto.value.AutoValue; +import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** The result of a {@link Distribution} metric. */ @Experimental(Kind.METRICS) @AutoValue public abstract class DistributionResult { - public abstract long getSum(); public abstract long getCount(); @@ -34,6 +35,8 @@ public abstract class DistributionResult { public abstract long getMax(); + public abstract Map getPercentiles(); + public double getMean() { return (1.0 * getSum()) / getCount(); } @@ -42,7 +45,13 @@ public double getMean() { public static final DistributionResult IDENTITY_ELEMENT = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE); + public static DistributionResult create( + long sum, long count, long min, long max, Map percentiles) { + return new AutoValue_DistributionResult(sum, count, min, max, percentiles); + } + + /** For backward compatibility. */ public static DistributionResult create(long sum, long count, long min, long max) { - return new AutoValue_DistributionResult(sum, count, min, max); + return create(sum, count, min, max, ImmutableMap.of()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java index aa26aacf6b4ce..260253073a907 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -18,8 +18,11 @@ package org.apache.beam.sdk.metrics; import java.io.Serializable; +import java.util.Set; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; /** * The Metrics is a utility class for producing various kinds of metrics for reporting @@ -80,6 +83,48 @@ public static Distribution distribution(Class namespace, String name) { return new DelegatingDistribution(MetricName.named(namespace, name)); } + /** + * Create a metric that records various statistics about the distribution of reported values. + * + * @param namespace namespace for the distribution + * @param name name of the distribution + * @param percentiles Set of percentiles to be computed in the distribution. If the user wishes to + * compute the 90th and 99th percentile, the set of percentiles would be [90.0D, 99.0D]. + */ + public static Distribution distribution( + Class namespace, String name, Set percentiles) { + validatePercentiles(percentiles); + return new DelegatingDistribution(MetricName.named(namespace, name), percentiles); + } + + /** + * Create a metric that records various statistics about the distribution of reported values. + * + * @param namespace namespace for the distribution + * @param name name of the distribution + * @param percentiles Set of percentiles to be computed in the distribution. If the user wishes to + * compute the 90th and 99th percentile, the set of percentiles would be [90.0D, 99.0D]. + */ + public static Distribution distribution(String namespace, String name, Set percentiles) { + validatePercentiles(percentiles); + return new DelegatingDistribution(MetricName.named(namespace, name), percentiles); + } + + private static void validatePercentiles(Set percentiles) { + Preconditions.checkArgument( + percentiles != null && !percentiles.isEmpty(), + "Percentiles cannot be null or an empty set."); + final ImmutableSet invalidPercentiles = + percentiles.stream() + .filter(perc -> perc < 0.0 || perc > 100.0) + .collect(ImmutableSet.toImmutableSet()); + Preconditions.checkArgument( + invalidPercentiles.isEmpty(), + "User supplied percentiles should be between " + + "0.0 and 100.0. Following invalid percentiles were supplied: " + + invalidPercentiles); + } + /** * Create a metric that can have its new value set, and is aggregated by taking the last reported * value. @@ -101,24 +146,23 @@ public static Gauge gauge(Class namespace, String name) { */ private static class DelegatingDistribution implements Metric, Distribution, Serializable { private final MetricName name; + private final Set percentiles; private DelegatingDistribution(MetricName name) { this.name = name; + this.percentiles = ImmutableSet.of(); } - @Override - public void update(long value) { - MetricsContainer container = MetricsEnvironment.getCurrentContainer(); - if (container != null) { - container.getDistribution(name).update(value); - } + private DelegatingDistribution(MetricName name, Set percentiles) { + this.name = name; + this.percentiles = percentiles; } @Override - public void update(long sum, long count, long min, long max) { + public void update(long value) { MetricsContainer container = MetricsEnvironment.getCurrentContainer(); if (container != null) { - container.getDistribution(name).update(sum, count, min, max); + container.getDistribution(name, percentiles).update(value); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java index f10fccdd44d3e..ec73bd9d11d9c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.metrics; import java.io.Serializable; +import java.util.Set; import org.apache.beam.model.pipeline.v1.MetricsApi; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -42,6 +43,17 @@ public interface MetricsContainer extends Serializable { */ Distribution getDistribution(MetricName metricName); + /** + * Return the {@link Distribution} that should be used for implementing the given {@code + * metricName} in this container. + */ + default Distribution getDistribution(MetricName metricName, Set percentiles) { + if (percentiles.isEmpty()) { + return getDistribution(metricName); + } + throw new RuntimeException("Distribution metric with custom percentiles is not supported yet."); + } + /** * Return the {@link Gauge} that should be used for implementing the given {@code metricName} in * this container. diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 089d679933141..8fb3de05c573f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -43,6 +43,8 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; @@ -56,7 +58,6 @@ /** Tests for {@link Metrics}. */ public class MetricsTest implements Serializable { - private static final String NS = "test"; private static final String NAME = "name"; private static final MetricName METRIC_NAME = MetricName.named(NS, NAME); @@ -103,7 +104,11 @@ public void startBundle() { @SuppressWarnings("unused") @ProcessElement public void processElement(ProcessContext c) { - Distribution values = Metrics.distribution(MetricsTest.class, "input"); + Distribution values = + Metrics.distribution( + MetricsTest.class, + "input", + ImmutableSet.of(25.0D, 90.0D, 95.0D, 99.0D)); count.inc(); values.update(c.element()); @@ -123,7 +128,11 @@ public void finishBundle() { @SuppressWarnings("unused") @ProcessElement public void processElement(ProcessContext c) { - Distribution values = Metrics.distribution(MetricsTest.class, "input"); + Distribution values = + Metrics.distribution( + MetricsTest.class, + "input", + ImmutableSet.of(25.0D, 90.0D, 95.0D, 99.0D)); Gauge gauge = Metrics.gauge(MetricsTest.class, "my-gauge"); Integer element = c.element(); count.inc(); @@ -180,6 +189,30 @@ public void testDistributionWithEmptyName() { Metrics.distribution(NS, ""); } + @Test + public void testDistributionWithEmptyPercentiles() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Percentiles cannot be null or an empty set"); + Metrics.distribution(NS, NAME, ImmutableSet.of()); + } + + @Test + public void testDistributionWithNullPercentiles() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Percentiles cannot be null or an empty set"); + Metrics.distribution(NS, NAME, null); + } + + @Test + public void testDistributionWithInvalidPercentiles() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "User supplied percentiles should be between 0.0 and 100.0. " + + "Following invalid percentiles were supplied: " + + ImmutableSet.of(-1.0D, 100.1D)); + Metrics.distribution(NS, NAME, ImmutableSet.of(-1.0D, 0.0, 1.0, 100.0, 100.1D)); + } + @Test public void testDistributionWithEmptyNamespace() { thrown.expect(IllegalArgumentException.class); @@ -190,6 +223,8 @@ public void testDistributionWithEmptyNamespace() { public void testDistributionToCell() { MetricsContainer mockContainer = Mockito.mock(MetricsContainer.class); Distribution mockDistribution = Mockito.mock(Distribution.class); + when(mockContainer.getDistribution(METRIC_NAME, ImmutableSet.of())) + .thenReturn(mockDistribution); when(mockContainer.getDistribution(METRIC_NAME)).thenReturn(mockDistribution); Distribution distribution = Metrics.distribution(NS, NAME); @@ -416,6 +451,13 @@ private static void assertGaugeMetrics(MetricQueryResults metrics, boolean isCom } private static void assertDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) { + ImmutableMap percentiles = + ImmutableMap.builder() + .put(25.0D, 5.0D) + .put(90.0D, 13.0D) + .put(95.0D, 13.0D) + .put(99.0D, 13.0D) + .build(); assertThat( metrics.getDistributions(), anyOf( @@ -425,14 +467,14 @@ private static void assertDistributionMetrics(MetricQueryResults metrics, boolea NAMESPACE, "input", "MyStep1", - DistributionResult.create(26L, 3L, 5L, 13L), + DistributionResult.create(26L, 3L, 5L, 13L, percentiles), isCommitted)), hasItem( metricsResult( NAMESPACE, "input", "MyStep1-ParMultiDo-Anonymous-", - DistributionResult.create(26L, 3L, 5L, 13L), + DistributionResult.create(26L, 3L, 5L, 13L, percentiles), isCommitted)))); assertThat( @@ -442,7 +484,7 @@ private static void assertDistributionMetrics(MetricQueryResults metrics, boolea NAMESPACE, "input", "MyStep2", - DistributionResult.create(52L, 6L, 5L, 13L), + DistributionResult.create(52L, 6L, 5L, 13L, percentiles), isCommitted))); assertThat( metrics.getDistributions(), diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java index f91f18cd4bd4c..dd30d1327acf8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java @@ -1040,9 +1040,6 @@ private NullDistribution(MetricName name) { @Override public void update(long value) {} - @Override - public void update(long sum, long count, long min, long max) {} - @Override public MetricName getName() { return name; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java index 727fdc07d42ac..b1df17a648113 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java @@ -906,11 +906,6 @@ public void update(long value) { updateInvocations.add(value); } - @Override - public void update(long sum, long count, long min, long max) { - throw new IllegalStateException("not implemented"); - } - @Override public MetricName getName() { return name;