Skip to content

Commit

Permalink
Revert "Adding percentiles to distribution metric (apache#16)"
Browse files Browse the repository at this point in the history
This reverts commit 9459eef.
  • Loading branch information
MabelYC committed Nov 4, 2021
1 parent 9459eef commit 493bf20
Show file tree
Hide file tree
Showing 23 changed files with 182 additions and 689 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ class BeamModulePlugin implements Plugin<Project> {
commons_io : "commons-io:commons-io:2.6",
commons_lang3 : "org.apache.commons:commons-lang3:3.9",
commons_math3 : "org.apache.commons:commons-math3:3.6.1",
datasketches : "org.apache.datasketches:datasketches-java:3.0.0",
error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version",
flogger_system_backend : "com.google.flogger:flogger-system-backend:0.6",
gax : "com.google.api:gax", // google_cloud_platform_libraries_bom sets version
Expand Down
1 change: 0 additions & 1 deletion runners/core-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ dependencies {
compile project(path: ":model:job-management", configuration: "shadow")
compile project(":runners:core-construction-java")
compile project(":sdks:java:fn-execution")
compile library.java.datasketches
compile library.java.vendored_guava_26_0_jre
compile library.java.joda_time
compile library.java.vendored_grpc_1_36_0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
*/
public class DistributionCell implements Distribution, MetricCell<DistributionData> {

private final DirtyState dirty;
private final AtomicReference<DistributionData> value;
private final DirtyState dirty = new DirtyState();
private final AtomicReference<DistributionData> value =
new AtomicReference<>(DistributionData.EMPTY);
private final MetricName name;

/**
Expand All @@ -44,36 +45,31 @@ public class DistributionCell implements Distribution, MetricCell<DistributionDa
* MetricsContainer}. These constructors are *only* public so runners can instantiate.
*/
public DistributionCell(MetricName name) {
this.dirty = new DirtyState();
this.value = new AtomicReference<>(DistributionData.empty());
this.name = name;
}

public DistributionCell(DistributionMetricKey metricKey) {
this.dirty = new DirtyState();
this.value =
new AtomicReference<>(DistributionData.withPercentiles(metricKey.getPercentiles()));
this.name = metricKey.getMetricName();
}

@Override
public void reset() {
dirty.afterModification();
value.get().reset();
value.set(DistributionData.EMPTY);
}

/** Increment the distribution by the given amount. */
@Override
public void update(long n) {
value.get().update(n);
dirty.afterModification();
update(DistributionData.singleton(n));
}

@Override
public void update(long sum, long count, long min, long max) {
update(DistributionData.create(sum, count, min, max));
}

void update(DistributionData other) {
void update(DistributionData data) {
DistributionData original;
do {
original = value.get();
} while (!value.compareAndSet(original, original.combine(other)));
} while (!value.compareAndSet(original, original.combine(data)));
dirty.afterModification();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,231 +17,48 @@
*/
package org.apache.beam.runners.core.metrics;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.quantiles.DoublesSketchBuilder;
import org.apache.datasketches.quantiles.DoublesUnion;
import org.apache.datasketches.quantiles.DoublesUnionBuilder;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;

/**
* Data describing the the distribution. This should retain enough detail that it can be combined
* with other {@link DistributionData}.
*
* <p>Datasketch library is used to compute percentiles. See {@linktourl
* https://datasketches.apache.org/}.
*
* <p>This is kept distinct from {@link DistributionResult} since this may be extended to include
* data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include
* the approximate value of those quantiles.
*/
public class DistributionData implements Serializable {
// k = 256 should yield an approximate error ε of less than 1%
private static final int SKETCH_SUMMARY_SIZE = 256;

private final Set<Double> percentiles;
private long sum;
private long count;
private long min;
private long max;
private transient Optional<UpdateDoublesSketch> sketch;

/** Creates an instance of DistributionData with custom percentiles. */
public static DistributionData withPercentiles(Set<Double> percentiles) {
return new DistributionData(0L, 0L, Long.MAX_VALUE, Long.MIN_VALUE, percentiles);
}

/** Backward compatible static factory method. */
public static DistributionData empty() {
return new DistributionData(0L, 0L, Long.MAX_VALUE, Long.MIN_VALUE, ImmutableSet.of());
}

/** Static factory method primary used for testing. */
@VisibleForTesting
public static DistributionData create(long sum, long count, long min, long max) {
return new DistributionData(sum, count, min, max, ImmutableSet.of());
}
@AutoValue
public abstract class DistributionData implements Serializable {

private DistributionData(long sum, long count, long min, long max, Set<Double> percentiles) {
this.sum = sum;
this.count = count;
this.min = min;
this.max = max;
this.percentiles = percentiles;
if (!percentiles.isEmpty()) {
final DoublesSketchBuilder doublesSketchBuilder = new DoublesSketchBuilder();
this.sketch = Optional.of(doublesSketchBuilder.setK(SKETCH_SUMMARY_SIZE).build());
} else {
this.sketch = Optional.empty();
}
}
public abstract long sum();

public static DistributionData singleton(long value) {
final DistributionData distributionData = empty();
distributionData.update(value);
return distributionData;
}
public abstract long count();

////////////////////////////////////////////////////////////////////////////////////////////////////////
// Getters
public abstract long min();

public long sum() {
return sum;
}
public abstract long max();

public long count() {
return count;
}

public long min() {
return min;
}

public long max() {
return max;
}
public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE);

/** Gets the percentiles and the percentiles values as a map. */
public Map<Double, Double> percentiles() {
if (!sketch.isPresent() || sketch.get().getN() == 0) {
// if the sketch is not present or is empty, do not compute the percentile
return ImmutableMap.of();
}

double[] quantiles = percentiles.stream().mapToDouble(i -> i / 100).toArray();
double[] quantileResults = sketch.get().getQuantiles(quantiles);

final ImmutableMap.Builder<Double, Double> resultBuilder = ImmutableMap.builder();
for (int k = 0; k < quantiles.length; k++) {
resultBuilder.put(quantiles[k] * 100, quantileResults[k]);
}
return resultBuilder.build();
}

////////////////////////////////////////////////////////////////////////////////////////////////////////

/**
* Updates the distribution with a value. For percentiles, only add the value to the sketch.
* Percentile will be computed prior to calling {@link DistributionCell#getCumulative()} or in
* {@link #extractResult()}.
*
* @param value value to update the distribution with.
*/
public void update(long value) {
++count;
min = Math.min(min, value);
max = Math.max(max, value);
sum += value;
sketch.ifPresent(currSketch -> currSketch.update(value));
public static DistributionData create(long sum, long count, long min, long max) {
return new AutoValue_DistributionData(sum, count, min, max);
}

/** Merges two distributions. */
public DistributionData combine(DistributionData other) {
if (sketch.isPresent()
&& other.sketch.isPresent()
&& sketch.get().getN() > 0
&& other.sketch.get().getN() > 0) {
final DoublesUnion union = new DoublesUnionBuilder().build();
union.update(sketch.get());
union.update(other.sketch.get());
sketch = Optional.of(union.getResult());
} else if (other.sketch.isPresent() && other.sketch.get().getN() > 0) {
sketch = other.sketch;
}
sum += other.sum;
count += other.count;
max = Math.max(max, other.max);
min = Math.min(min, other.min);
return this;
public static DistributionData singleton(long value) {
return create(value, 1, value, value);
}

public DistributionData reset() {
this.sum = 0L;
this.count = 0L;
this.min = Long.MAX_VALUE;
this.max = Long.MIN_VALUE;
if (!this.percentiles.isEmpty()) {
final DoublesSketchBuilder doublesSketchBuilder = new DoublesSketchBuilder();
this.sketch = Optional.of(doublesSketchBuilder.setK(SKETCH_SUMMARY_SIZE).build());
} else {
this.sketch = Optional.empty();
}
return this;
public DistributionData combine(DistributionData value) {
return create(
sum() + value.sum(),
count() + value.count(),
Math.min(value.min(), min()),
Math.max(value.max(), max()));
}

/** Generates DistributionResult from DistributionData. */
public DistributionResult extractResult() {
return DistributionResult.create(sum(), count(), min(), max(), percentiles());
}

@Override
public boolean equals(Object object) {
if (object instanceof DistributionData) {
DistributionData other = (DistributionData) object;
return Objects.equals(max, other.max())
&& Objects.equals(min, other.min())
&& Objects.equals(count, other.count())
&& Objects.equals(sum, other.sum())
&& Objects.equals(percentiles(), other.percentiles());
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(min, max, sum, count, percentiles());
}

@Override
public String toString() {
return "DistributionData{"
+ "sum="
+ sum
+ ", "
+ "count="
+ count
+ ", "
+ "min="
+ min
+ ", "
+ "max="
+ max
+ ", "
+ "percentiles="
+ percentiles()
+ "}";
}

private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
if (sketch.isPresent()) {
byte[] bytes = sketch.get().toByteArray();
out.writeInt(bytes.length);
out.write(bytes);
}
}

@SuppressWarnings("ResultOfMethodCallIgnored")
private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
in.defaultReadObject();
if (!this.percentiles.isEmpty()) {
int len = in.readInt();
byte[] bytes = new byte[len];
in.read(bytes);
this.sketch = Optional.of(UpdateDoublesSketch.heapify(Memory.wrap(bytes)));
} else {
this.sketch = Optional.empty();
}
return DistributionResult.create(sum(), count(), min(), max());
}
}

This file was deleted.

Loading

0 comments on commit 493bf20

Please sign in to comment.