From 50b71d5cacba740001ee9581a3338cf87270b9d2 Mon Sep 17 00:00:00 2001
From: Dilnaz Amanzholova
Date: Wed, 26 Jun 2024 21:17:41 +0200
Subject: [PATCH] Add Histogram combiner (#31379)

Co-authored-by: Dilnaz Amanzholova
---
 build.gradle.kts                            |   1 +
 sdks/java/extensions/combiners/build.gradle |  33 +
 .../sdk/extensions/combiners/Histogram.java | 569 ++++++++++++++++++
 .../extensions/combiners/package-info.java  |  19 +
 .../extensions/combiners/HistogramTest.java | 403 +++++++++++++
 settings.gradle.kts                         |   2 +
 6 files changed, 1027 insertions(+)
 create mode 100644 sdks/java/extensions/combiners/build.gradle
 create mode 100644 sdks/java/extensions/combiners/src/main/java/org/apache/beam/sdk/extensions/combiners/Histogram.java
 create mode 100644 sdks/java/extensions/combiners/src/main/java/org/apache/beam/sdk/extensions/combiners/package-info.java
 create mode 100644 sdks/java/extensions/combiners/src/test/java/org/apache/beam/sdk/extensions/combiners/HistogramTest.java

diff --git a/build.gradle.kts b/build.gradle.kts
index d1efb45b74ee3..e6295384b7533 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -281,6 +281,7 @@ tasks.register("javaPreCommit") {
   dependsOn(":sdks:java:expansion-service:app:build")
   dependsOn(":sdks:java:extensions:arrow:build")
   dependsOn(":sdks:java:extensions:avro:build")
+  dependsOn(":sdks:java:extensions:combiners:build")
   dependsOn(":sdks:java:extensions:euphoria:build")
   dependsOn(":sdks:java:extensions:google-cloud-platform-core:build")
   dependsOn(":sdks:java:extensions:jackson:build")
diff --git a/sdks/java/extensions/combiners/build.gradle b/sdks/java/extensions/combiners/build.gradle
new file mode 100644
index 0000000000000..a3ad39624cd36
--- /dev/null
+++ b/sdks/java/extensions/combiners/build.gradle
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.extensions.combiners', + exportJavadoc: false, +) + +description = "Apache Beam :: SDKs :: Java :: Extensions :: Combiners" + +dependencies { + implementation library.java.vendored_guava_32_1_2_jre + implementation library.java.commons_lang3 + implementation project(path: ":sdks:java:core", configuration: "shadow") + testImplementation library.java.junit + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") +} \ No newline at end of file diff --git a/sdks/java/extensions/combiners/src/main/java/org/apache/beam/sdk/extensions/combiners/Histogram.java b/sdks/java/extensions/combiners/src/main/java/org/apache/beam/sdk/extensions/combiners/Histogram.java new file mode 100644 index 0000000000000..e4468a535fba1 --- /dev/null +++ b/sdks/java/extensions/combiners/src/main/java/org/apache/beam/sdk/extensions/combiners/Histogram.java @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.combiners; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.VarInt; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; +import org.apache.commons.lang3.ArrayUtils; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A histogram transform with a combiner that efficiently constructs linear, exponential or explicit + * histograms from large datasets of input data. Bucket bounds can be specified using the {@link + * BucketBounds} class. 
+ */
+public class Histogram {
+
+  private Histogram() {
+    // do not instantiate
+  }
+
+  /**
+   * Returns a {@code PTransform} that takes a {@code PCollection<T>} and returns a {@code
+   * PCollection<List<Long>>} with a single element per window. The values of this list represent
+   * the number of elements within each bucket of a histogram, as defined by {@link BucketBounds}.
+   * The first and the last elements of the list are the numbers of elements in the underflow and
+   * overflow buckets.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<Double> pc = ...;
+   * PCollection<List<Long>> bucketCounts =
+   *     pc.apply(Histogram.globally(BucketBounds.linear(1.0, 2.0, 100)));
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input {@code PCollection}
+   * @param bucketBounds the instance of the {@link BucketBounds} class with the desired
+   *     parameters of the histogram.
+   */
+  public static <T extends Number> Combine.Globally<T, List<Long>> globally(
+      BucketBounds bucketBounds) {
+    return Combine.globally(HistogramCombineFn.create(bucketBounds));
+  }
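+  // Illustration of the output layout of globally() above (comment added for exposition; the
+  // sample values are assumptions, not taken from this PR): with
+  // BucketBounds.explicit(Arrays.asList(1.0, 2.0, 4.0)) and the default lower-bound-inclusive
+  // behavior, the returned list holds bounds.size() + 1 = 4 counts:
+  //   index 0 -> (-inf, 1.0)  underflow
+  //   index 1 -> [1.0, 2.0)
+  //   index 2 -> [2.0, 4.0)
+  //   index 3 -> [4.0, +inf)  overflow
+  // so the inputs {0.5, 1.0, 3.0, 9.0} produce [1, 1, 1, 1].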
+  /**
+   * Returns a {@code PTransform} that takes a {@code PCollection<KV<K, V>>} and returns a {@code
+   * PCollection<KV<K, List<Long>>>} that contains an output element mapping each distinct key in
+   * the input {@code PCollection} to a {@code List<Long>}. The values of this list represent the
+   * number of elements within each bucket of a histogram, as defined by {@link BucketBounds}. The
+   * first and the last elements of the list are the numbers of elements in the underflow and
+   * overflow buckets.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<KV<String, Double>> pc = ...;
+   * PCollection<KV<String, List<Long>>> bucketCounts =
+   *     pc.apply(Histogram.perKey(BucketBounds.linear(1.0, 2.0, 100)));
+   * }</pre>
+   *
+   * @param <K> the type of the keys in the input and output {@code PCollection}s
+   * @param <V> the type of the values in the input {@code PCollection}
+   * @param bucketBounds the instance of the {@link BucketBounds} class with the desired
+   *     parameters of the histogram.
+   */
+  public static <K, V extends Number> Combine.PerKey<K, V, List<Long>> perKey(
+      BucketBounds bucketBounds) {
+    return Combine.perKey(HistogramCombineFn.create(bucketBounds));
+  }
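+  // Usage sketch (comment added for exposition; the windowing choice is an assumption, not part
+  // of this transform): because globally() emits one list per window, per-minute histograms
+  // follow the usual Beam windowing pattern, e.g.
+  //   pc.apply(Window.<Double>into(FixedWindows.of(Duration.standardMinutes(1))))
+  //     .apply(Histogram.globally(BucketBounds.exponential(1.0, 2.0, 10)));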
+  /**
+   * Defines the bounds for histogram buckets.
+   *
+   * <p>Use the provided static factory methods to create new instances of {@link BucketBounds}.
+   */
+  @AutoValue
+  public abstract static class BucketBounds {
+
+    // Package-private because users should use static factory methods to instantiate new
+    // instances.
+    BucketBounds() {}
+
+    public abstract List<Double> getBounds();
+
+    public abstract BoundsInclusivity getBoundsInclusivity();
+
+    /**
+     * Static factory method for defining the bounds of an exponential histogram and calculating
+     * the bounds based on the parameters.
+     *
+     * <p>For BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE, the list that the
+     * HistogramCombineFn combiner returns contains the number of elements in the following
+     * buckets:
+     *
+     * <pre>
+     * 0-th: (-inf, scale) - underflow bucket
+     * 1-st: [scale, scale * growthFactor)
+     * 2-nd: [scale * growthFactor, scale * growthFactor^2)
+     * ...
+     * i-th: [scale * growthFactor^(i-1), scale * growthFactor^i)
+     * ...
+     * numBoundedBuckets: [scale * growthFactor^(numBoundedBuckets-1), scale *
+     * growthFactor^numBoundedBuckets)
+     * numBoundedBuckets + 1: [scale * growthFactor^numBoundedBuckets, +inf) - overflow bucket.
+     * </pre>
+     *
+     * <p>For BoundsInclusivity.LOWER_BOUND_EXCLUSIVE_UPPER_BOUND_INCLUSIVE, the list that the
+     * HistogramCombineFn combiner returns contains the number of elements in the following
+     * buckets:
+     *
+     * <pre>
+     * 0-th: (-inf, scale] - underflow bucket
+     * 1-st: (scale, scale * growthFactor]
+     * 2-nd: (scale * growthFactor, scale * growthFactor^2]
+     * ...
+     * i-th: (scale * growthFactor^(i-1), scale * growthFactor^i]
+     * ...
+     * numBoundedBuckets: (scale * growthFactor^(numBoundedBuckets-1), scale *
+     * growthFactor^numBoundedBuckets]
+     * numBoundedBuckets + 1: (scale * growthFactor^numBoundedBuckets), +inf) - overflow bucket.
+     * 
+     * </pre>
+     *
+     * @param scale the value of the lower bound for the first bounded bucket.
+     * @param growthFactor the value by which the bucket bounds are exponentially increased.
+     * @param numBoundedBuckets integer determining the total number of bounded buckets within
+     *     the histogram.
+     * @param boundsInclusivity enum value which defines whether the lower or the upper bounds
+     *     are inclusive.
+     */
+    public static BucketBounds exponential(
+        double scale,
+        double growthFactor,
+        int numBoundedBuckets,
+        BoundsInclusivity boundsInclusivity) {
+      checkArgument(scale > 0.0, "scale should be positive.");
+      checkArgument(growthFactor > 1.0, "growth factor should be greater than 1.0.");
+      checkArgument(
+          numBoundedBuckets > 0, "number of bounded buckets should be greater than zero.");
+      checkArgument(
+          numBoundedBuckets <= Integer.MAX_VALUE - 2,
+          "number of bounded buckets should be less than max value of integer.");
+
+      ImmutableList.Builder<Double> boundsCalculated = new ImmutableList.Builder<>();
+      // The number of bounds is equal to numBoundedBuckets + 1.
+      for (int i = 0; i <= numBoundedBuckets; i++) {
+        double bound = scale * Math.pow(growthFactor, i);
+        if (Double.isInfinite(bound)) {
+          throw new IllegalArgumentException("the bound has overflown the double type.");
+        }
+        boundsCalculated.add(bound);
+      }
+
+      return new AutoValue_Histogram_BucketBounds(boundsCalculated.build(), boundsInclusivity);
+    }
+
+    /**
+     * Like {@link #exponential(double, double, int, BoundsInclusivity)}, but sets the
+     * BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE value for the
+     * boundsInclusivity parameter.
+     */
+    public static BucketBounds exponential(
+        double scale, double growthFactor, int numBoundedBuckets) {
+      return exponential(
+          scale,
+          growthFactor,
+          numBoundedBuckets,
+          BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE);
+    }
+
+    /**
+     * Static factory method for defining the bounds of a linear histogram and calculating the
+     * bounds based on the parameters.
+     *
+     * @param offset the value of the lower bound for the first bounded bucket.
+     * @param width the bucket width.
+     * @param numBoundedBuckets integer determining the total number of bounded buckets within
+     *     the histogram.
+     * @param boundsInclusivity enum value which defines whether the lower or the upper bounds
+     *     are inclusive.
+     */
+    public static BucketBounds linear(
+        double offset, double width, int numBoundedBuckets, BoundsInclusivity boundsInclusivity) {
+      checkArgument(width > 0.0, "width of buckets should be positive.");
+      checkArgument(numBoundedBuckets > 0, "number of bounded buckets should be more than zero.");
+      checkArgument(
+          numBoundedBuckets <= Integer.MAX_VALUE - 2,
+          "number of bounded buckets should be less than max value of integer.");
+
+      ImmutableList.Builder<Double> boundsCalculated = new ImmutableList.Builder<>();
+      // The number of bounds is equal to numBoundedBuckets + 1.
+      for (int i = 0; i <= numBoundedBuckets; i++) {
+        double bound = offset + i * width;
+        if (Double.isInfinite(bound)) {
+          throw new IllegalArgumentException("the bound has overflown the double type.");
+        }
+        boundsCalculated.add(bound);
+      }
+
+      return new AutoValue_Histogram_BucketBounds(boundsCalculated.build(), boundsInclusivity);
+    }
+
+    /**
+     * Like {@link #linear(double, double, int, BoundsInclusivity)}, but sets the
+     * BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE value for the
+     * boundsInclusivity parameter.
+     */
+    public static BucketBounds linear(double offset, double width, int numBoundedBuckets) {
+      return linear(
+          offset,
+          width,
+          numBoundedBuckets,
+          BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE);
+    }
+
+    /**
+     * Static factory method for defining the bounds of an explicit histogram.
+     *
+     * @param bounds the list of explicit bucket bounds.
+     * @param boundsInclusivity enum value which defines whether the lower or the upper bounds
+     *     are inclusive.
+     */
+    public static BucketBounds explicit(List<Double> bounds, BoundsInclusivity boundsInclusivity) {
+      checkNotNull(bounds, "the bounds list should not be null.");
+      checkArgument(bounds.size() > 0, "the bounds list should not be empty.");
+
+      for (int i = 1; i < bounds.size(); i++) {
+        if (bounds.get(i - 1) >= bounds.get(i)) {
+          throw new IllegalArgumentException(
+              "bounds should be in ascending order without duplicates.");
+        }
+      }
+
+      return new AutoValue_Histogram_BucketBounds(ImmutableList.copyOf(bounds), boundsInclusivity);
+    }
+
+    /**
+     * Like {@link #explicit(List, BoundsInclusivity)}, but sets the
+     * BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE value for the
+     * boundsInclusivity parameter.
+     */
+    public static BucketBounds explicit(List<Double> bounds) {
+      return explicit(bounds, BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE);
+    }
+  }
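+  // Worked example (comment added for exposition; the numbers follow from the factory methods
+  // above and match the unit tests): BucketBounds.linear(1.0, 2.0, 4) yields the bounds
+  // [1.0, 3.0, 5.0, 7.0, 9.0], and BucketBounds.exponential(1.0, 2.0, 4) yields
+  // [1.0, 2.0, 4.0, 8.0, 16.0]; each implies an extra underflow and overflow bucket around them.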
+  /**
+   * Combiner for calculating histograms.
+   *
+   * <p>The HistogramCombineFn class can be used with the Group transform to aggregate the input
+   * values in a KV pair.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<Row> pc = ...;
+   * PCollection<Row> rows =
+   *    pc.apply(Group.byFieldNames("dimension1", "dimension2").aggregateField("value",
+   *      HistogramCombineFn.create(BucketBounds.linear(1.0, 2.0, 160)),
+   *      Field.of("bucketCounts", FieldType.array(FieldType.INT64))));
+   * }</pre>
+   */
+  public static final class HistogramCombineFn<T extends Number>
+      extends Combine.CombineFn<T, HistogramAccumulator, List<Long>> {
+
+    private final double[] bounds;
+    private final BoundsInclusivity boundsInclusivity;
+
+    private HistogramCombineFn(double[] bounds, BoundsInclusivity boundsInclusivity) {
+      this.bounds = bounds;
+      this.boundsInclusivity = boundsInclusivity;
+    }
+
+    /**
+     * Returns a histogram combiner with the given {@link BucketBounds}.
+     *
+     * @param bucketBounds the instance of the {@link BucketBounds} class with the desired
+     *     parameters of the histogram.
+     */
+    public static <T extends Number> HistogramCombineFn<T> create(BucketBounds bucketBounds) {
+      return new HistogramCombineFn<>(
+          ArrayUtils.toPrimitive(bucketBounds.getBounds().toArray(new Double[0])),
+          bucketBounds.getBoundsInclusivity());
+    }
+
+    @Override
+    public HistogramAccumulator createAccumulator() {
+      return new HistogramAccumulator(bounds.length + 1);
+    }
+
+    @Override
+    public HistogramAccumulator addInput(HistogramAccumulator accumulator, T input)
+        throws IllegalArgumentException {
+      if (input == null) {
+        throw new NullPointerException("input should not be null.");
+      }
+
+      Double inputDoubleValue = ((Number) input).doubleValue();
+      if (inputDoubleValue.isNaN() || inputDoubleValue.isInfinite()) {
+        throw new IllegalArgumentException("input should not be NaN or infinite.");
+      }
+      int index = Arrays.binarySearch(bounds, inputDoubleValue);
+      if (index < 0) {
+        accumulator.counts[-index - 1]++;
+      } else {
+        // The value lies exactly on a bound, so it is assigned based on the bound inclusivity.
+        if (boundsInclusivity == BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE) {
+          accumulator.counts[index + 1]++;
+        } else {
+          accumulator.counts[index]++;
+        }
+      }
+      return accumulator;
+    }
+
+    @Override
+    public HistogramAccumulator mergeAccumulators(Iterable<HistogramAccumulator> accumulators) {
+      Iterator<HistogramAccumulator> iter = accumulators.iterator();
+      if (!iter.hasNext()) {
+        return createAccumulator();
+      }
+
+      HistogramAccumulator merged = iter.next();
+      int countsLength = merged.counts.length;
+      while (iter.hasNext()) {
+        HistogramAccumulator histogramAccumulator = iter.next();
+        checkArgument(
+            countsLength == histogramAccumulator.counts.length,
+            "number of buckets in the merging accumulators should be the same.");
+        for (int i = 0; i < countsLength; ++i) {
+          merged.counts[i] += histogramAccumulator.counts[i];
+        }
+      }
+      return merged;
+    }
+
+    @Override
+    public List<Long> extractOutput(HistogramAccumulator accumulator) throws NullPointerException {
+      checkNotNull(accumulator, "can not output from a null histogram.");
+      return Longs.asList(accumulator.counts);
+    }
+
+    @Override
+    public Coder<HistogramAccumulator> getAccumulatorCoder(
+        CoderRegistry registry, Coder<T> inputCoder) {
+      return new HistogramAccumulatorCoder();
+    }
+
+    @Override
+    public Coder<List<Long>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(VarLongCoder.of());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("numBuckets", bounds.length + 1).withLabel("Number of buckets"));
+    }
+  }
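+  // How addInput above assigns a bucket (comment added for exposition; the sample bounds are an
+  // assumption): for a value strictly between two bounds, Arrays.binarySearch returns
+  // -(insertionPoint) - 1, so -index - 1 is already the bucket index; a value exactly on a bound
+  // is shifted up or down according to the BoundsInclusivity. E.g. with bounds [1.0, 2.0, 4.0]:
+  // 3.0 -> bucket 2, while 2.0 -> bucket 2 when lower bounds are inclusive and bucket 1 when
+  // upper bounds are inclusive.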
+  /** Accumulator of the Histogram combiner. */
+  static final class HistogramAccumulator {
+
+    private long[] counts;
+
+    public HistogramAccumulator(int numBuckets) {
+      checkArgument(
+          numBuckets > 2,
+          "number of buckets should be greater than two - underflow bucket and overflow bucket.");
+      this.counts = new long[numBuckets];
+    }
+
+    @Override
+    public boolean equals(@Nullable Object object) {
+      if (object instanceof HistogramAccumulator) {
+        HistogramAccumulator other = (HistogramAccumulator) object;
+        return Arrays.equals(counts, other.counts);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(Arrays.hashCode(counts));
+    }
+  }
+
+  /** Coder for {@link HistogramAccumulator}. */
+  static final class HistogramAccumulatorCoder extends CustomCoder<HistogramAccumulator> {
+
+    private static final VarLongCoder LONG_CODER = VarLongCoder.of();
+    private static final VarIntCoder INT_CODER = VarIntCoder.of();
+
+    /**
+     * Indicates which encoding method is used: either only the non-empty buckets are encoded
+     * together with their indices, or all buckets are encoded sequentially.
+     */
+    private enum CoderType {
+      NON_EMPTY_BUCKETS_CODER,
+      ALL_BUCKETS_CODER
+    }
+
+    /** Encoded size of 0 in bytes. */
+    private static final long ENCODED_ZERO_SIZE = VarInt.getLength(0);
+
+    @Override
+    public void encode(HistogramAccumulator value, OutputStream outStream) throws IOException {
+      checkNotNull(value, "can not encode a null histogram.");
+
+      int numEmptyBucketsAtTheEnd = 0;
+      for (int i = value.counts.length - 1; i >= 0; i--) {
+        if (value.counts[i] != 0) {
+          break;
+        }
+        numEmptyBucketsAtTheEnd++;
+      }
+
+      if (shouldEncodeNonEmptyBucketsOnly(value, numEmptyBucketsAtTheEnd)) {
+        // As there are two different encoding methods, the first byte indicates the coder used.
+        outStream.write(CoderType.NON_EMPTY_BUCKETS_CODER.ordinal());
+        encodeNonEmptyBuckets(value, numEmptyBucketsAtTheEnd, outStream);
+      } else {
+        outStream.write(CoderType.ALL_BUCKETS_CODER.ordinal());
+        encodeAllBuckets(value, numEmptyBucketsAtTheEnd, outStream);
+      }
+    }
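+    // Sizing intuition for shouldEncodeNonEmptyBucketsOnly below (comment added for exposition;
+    // the sample counts are assumptions): a sparse accumulator such as [0, 0, 7, 0, 0, 0] is
+    // cheaper as a single (index, count) pair, while a dense one such as [1, 1, 1, 1, 1, 1] is
+    // cheaper encoded sequentially, because the per-bucket indices would outweigh the zero
+    // counts they replace.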
+    /**
+     * Estimates the size of the accumulator and returns whether encoding only the non-empty
+     * buckets with their indices produces a smaller accumulator.
+     *
+     * <p>To check the difference in sizes between the two encoding methods, we do not add the
+     * size of the encoded non-empty bucket counts, because it is the same for both methods and
+     * cancels out. So the comparison is between the sizes of the encoded indices and the sizes
+     * of the intermediate zero counts (not the trailing zeros).
+     */
+    private boolean shouldEncodeNonEmptyBucketsOnly(
+        HistogramAccumulator value, int numEmptyBucketsAtTheEnd) {
+      int numBuckets = value.counts.length;
+      long nonEmptyBucketIndicesEncodedSize = 0;
+      long numZeroBuckets = 0;
+
+      for (int j = 0; j < numBuckets - numEmptyBucketsAtTheEnd; j++) {
+        if (value.counts[j] != 0) {
+          nonEmptyBucketIndicesEncodedSize += VarInt.getLength(j);
+        } else {
+          numZeroBuckets++;
+        }
+      }
+
+      return nonEmptyBucketIndicesEncodedSize < numZeroBuckets * ENCODED_ZERO_SIZE;
+    }
+
+    private void encodeNonEmptyBuckets(
+        HistogramAccumulator value, int numEmptyBucketsAtTheEnd, OutputStream outStream)
+        throws IOException {
+      int numBuckets = value.counts.length;
+      INT_CODER.encode(numBuckets, outStream);
+
+      List<Integer> indices = new ArrayList<>();
+      List<Long> counts = new ArrayList<>();
+
+      for (int i = 0; i < numBuckets - numEmptyBucketsAtTheEnd; i++) {
+        if (value.counts[i] != 0) {
+          indices.add(i);
+          counts.add(value.counts[i]);
+        }
+      }
+
+      INT_CODER.encode(indices.size(), outStream);
+      for (int i = 0; i < indices.size(); i++) {
+        INT_CODER.encode(indices.get(i), outStream);
+        LONG_CODER.encode(counts.get(i), outStream);
+      }
+    }
+
+    private void encodeAllBuckets(
+        HistogramAccumulator value, int numEmptyBucketsAtTheEnd, OutputStream outStream)
+        throws IOException {
+      int numBuckets = value.counts.length;
+      INT_CODER.encode(numBuckets, outStream);
+
+      INT_CODER.encode(numBuckets - numEmptyBucketsAtTheEnd, outStream);
+      for (int i = 0; i < numBuckets - numEmptyBucketsAtTheEnd; i++) {
+        LONG_CODER.encode(value.counts[i], outStream);
+      }
+    }
+
+    @Override
+    public HistogramAccumulator decode(InputStream inStream) throws IOException {
+      int coder = inStream.read();
+      int numBuckets = INT_CODER.decode(inStream);
+
+      if (coder == CoderType.NON_EMPTY_BUCKETS_CODER.ordinal()) {
+        return decodeNonEmptyBuckets(numBuckets, inStream);
+      } else if (coder == CoderType.ALL_BUCKETS_CODER.ordinal()) {
+        return decodeAllBuckets(numBuckets, inStream);
+      } else {
+        throw new CoderException("wrong decoding sequence found.");
+      }
+    }
+
+    private HistogramAccumulator decodeNonEmptyBuckets(int numBuckets, InputStream inStream)
+        throws IOException {
+      HistogramAccumulator histogramAccumulator = new HistogramAccumulator(numBuckets);
+      int numNonEmptyBuckets = INT_CODER.decode(inStream);
+
+      for (int i = 0; i < numNonEmptyBuckets; i++) {
+        histogramAccumulator.counts[INT_CODER.decode(inStream)] = LONG_CODER.decode(inStream);
+      }
+
+      return histogramAccumulator;
+    }
+
+    private HistogramAccumulator decodeAllBuckets(int numBuckets, InputStream inStream)
+        throws IOException {
+      HistogramAccumulator histogramAccumulator = new HistogramAccumulator(numBuckets);
+      int numEncodedBuckets = INT_CODER.decode(inStream);
+      for (int i = 0; i < numEncodedBuckets; i++) {
+        histogramAccumulator.counts[i] = LONG_CODER.decode(inStream);
+      }
+      return histogramAccumulator;
+    }
+  }
+
+  /**
+   * Enum for setting whether the lower (and upper) bounds of bucket intervals are inclusive or
+   * exclusive.
+ */ + public enum BoundsInclusivity { + LOWER_BOUND_EXCLUSIVE_UPPER_BOUND_INCLUSIVE, + LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE + } +} diff --git a/sdks/java/extensions/combiners/src/main/java/org/apache/beam/sdk/extensions/combiners/package-info.java b/sdks/java/extensions/combiners/src/main/java/org/apache/beam/sdk/extensions/combiners/package-info.java new file mode 100644 index 0000000000000..1603fe48da13d --- /dev/null +++ b/sdks/java/extensions/combiners/src/main/java/org/apache/beam/sdk/extensions/combiners/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Provides a transform and combiner for constructing histograms. */ +package org.apache.beam.sdk.extensions.combiners; diff --git a/sdks/java/extensions/combiners/src/test/java/org/apache/beam/sdk/extensions/combiners/HistogramTest.java b/sdks/java/extensions/combiners/src/test/java/org/apache/beam/sdk/extensions/combiners/HistogramTest.java new file mode 100644 index 0000000000000..323622b88b6d9 --- /dev/null +++ b/sdks/java/extensions/combiners/src/test/java/org/apache/beam/sdk/extensions/combiners/HistogramTest.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.extensions.combiners;
+
+import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.combiners.Histogram.BoundsInclusivity;
+import org.apache.beam.sdk.extensions.combiners.Histogram.BucketBounds;
+import org.apache.beam.sdk.extensions.combiners.Histogram.HistogramAccumulator;
+import org.apache.beam.sdk.extensions.combiners.Histogram.HistogramAccumulatorCoder;
+import org.apache.beam.sdk.extensions.combiners.Histogram.HistogramCombineFn;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Doubles;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link Histogram}. */
+public class HistogramTest {
+
+  @RunWith(JUnit4.class)
+  @SuppressWarnings("unchecked")
+  public static class HistogramTransformTest {
+
+    private static final BucketBounds BUCKET_BOUNDS = BucketBounds.exponential(1.0, 2.0, 4);
+    private static final int INPUT_SIZE = 20;
+
+    static final List<KV<String, Double>> TABLE =
+        Arrays.asList(
+            KV.of("a", 1.0),
+            KV.of("a", 2.0),
+            KV.of("a", 3.0),
+            KV.of("a", 5.0),
+            KV.of("a", 4.5),
+            KV.of("a", 10.0),
+            KV.of("b", 1.0),
+            KV.of("b", 5.0),
+            KV.of("b", 5.0),
+            KV.of("b", 20.0));
+
+    @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testHistogramGlobally() {
+      PCollection<Integer> input = generateInputPCollection(testPipeline, INPUT_SIZE);
+      PCollection<List<Long>> counts = input.apply(Histogram.globally(BUCKET_BOUNDS));
+
+      PAssert.that(counts).containsInAnyOrder(Longs.asList(1, 1, 2, 4, 8, 4));
+
+      testPipeline.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testHistogramPerKey() {
+      PCollection<KV<String, Double>> input =
+          testPipeline.apply(
+              Create.of(TABLE).withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of())));
+      PCollection<KV<String, List<Long>>> counts = input.apply(Histogram.perKey(BUCKET_BOUNDS));
+
+      PAssert.that(counts)
+          .containsInAnyOrder(
+              KV.of("a", Longs.asList(0, 1, 2, 2, 1, 0)),
+              KV.of("b", Longs.asList(0, 1, 0, 2, 0, 1)));
+
+      testPipeline.run();
+    }
+
+    private PCollection<Integer> generateInputPCollection(Pipeline p, int size) {
+      return p.apply(
+          "CreateInputValuesUpTo(" + size + ")",
+          Create.of(IntStream.range(0, size).boxed().collect(Collectors.toList())));
+    }
+  }
+
+  /** Tests for {@link BucketBounds}. */
+  @RunWith(JUnit4.class)
+  public static class BucketBoundsTest {
+
+    @Test
+    public void testExponentialBucketBoundsWithoutSpecifiedInclusivity() {
+      BucketBounds bucketBounds = BucketBounds.exponential(1.0, 2.0, 4);
+
+      assertEquals(Arrays.asList(1.0, 2.0, 4.0, 8.0, 16.0), bucketBounds.getBounds());
+      assertEquals(
+          BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE,
+          bucketBounds.getBoundsInclusivity());
+    }
+
+    @Test
+    public void testExponentialBucketBoundsThrowsIllegalArgumentExceptionForScale() {
+      assertThrows(IllegalArgumentException.class, () -> BucketBounds.exponential(-0.4, 2.0, 4));
+    }
+
+    @Test
+    public void testExponentialBucketBoundsThrowsIllegalArgumentExceptionForGrowthFactor() {
+      assertThrows(IllegalArgumentException.class, () -> BucketBounds.exponential(1.0, 0.5, 4));
+    }
+
+    @Test
+    public void
+        testExponentialBucketBoundsThrowsIllegalArgumentExceptionForZeroNumberOfBoundedBuckets() {
+      assertThrows(IllegalArgumentException.class, () -> BucketBounds.exponential(1.0, 2.0, 0));
+    }
+
+    @Test
+    public void
+        testExponentialBucketBoundsThrowsIllegalArgumentExceptionForMaxIntNumberOfBoundedBuckets() {
+      assertThrows(
+          IllegalArgumentException.class,
+          () -> BucketBounds.exponential(1.0, 2.0, Integer.MAX_VALUE));
+    }
+
+    @Test
+    public void testExponentialBucketBoundsThrowsIllegalArgumentExceptionForOverflownDoubleBound() {
+      // As Double.MAX_VALUE is about 1.797 * 10^308, 10^310 is used here to overflow the double
+      // range.
+      assertThrows(IllegalArgumentException.class, () -> BucketBounds.exponential(1.0, 10.0, 310));
+    }
+
+    @Test
+    public void testLinearBucketBoundsWithoutSpecifiedInclusivity() {
+      BucketBounds bucketBounds = BucketBounds.linear(1.0, 2.0, 4);
+
+      assertEquals(Arrays.asList(1.0, 3.0, 5.0, 7.0, 9.0), bucketBounds.getBounds());
+      assertEquals(
+          BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE,
+          bucketBounds.getBoundsInclusivity());
+    }
+
+    @Test
+    public void testExplicitBucketBoundsWithoutSpecifiedInclusivity() {
+      BucketBounds bucketBounds = BucketBounds.explicit(Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0));
+
+      assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0), bucketBounds.getBounds());
+      assertEquals(
+          BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE,
+          bucketBounds.getBoundsInclusivity());
+    }
+  }
+
+  /** Tests for {@link HistogramAccumulator}. */
+  @RunWith(JUnit4.class)
+  public static class HistogramAccumulatorTest {
+
+    private static final int NUM_BUCKETS = 2;
+
+    @Test
+    public void testNumberOfBucketsThrowsException() {
+      assertThrows(IllegalArgumentException.class, () -> new HistogramAccumulator(NUM_BUCKETS));
+    }
+  }
+  /** Tests for {@link HistogramAccumulatorCoder}. */
+  @RunWith(JUnit4.class)
+  public static class HistogramAccumulatorCoderTest {
+
+    private static final HistogramAccumulatorCoder CODER = new HistogramAccumulatorCoder();
+
+    @Test
+    public void testEncode_NullAccumulator_throwsNullPointerException() {
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+      assertThrows(NullPointerException.class, () -> CODER.encode(null, outputStream));
+    }
+  }
+
+  @RunWith(Parameterized.class)
+  public static class HistogramAccumulatorCoderParameterizedTest {
+
+    private static final int NUM_BOUNDED_BUCKETS = 160;
+    private static final HistogramAccumulatorCoder CODER = new HistogramAccumulatorCoder();
+    private static final HistogramCombineFn<Double> COMBINE_FN =
+        HistogramCombineFn.create(BucketBounds.linear(1.0, 1.0, NUM_BOUNDED_BUCKETS));
+
+    private final List<Double> inputList;
+
+    public HistogramAccumulatorCoderParameterizedTest(List<Double> inputList) {
+      this.inputList = inputList;
+    }
+
+    @Parameters
+    public static Collection<Object[]> data() {
+      List<Double> emptyInput = Doubles.asList();
+      List<Double> oneInput = Doubles.asList(2.0);
+      List<Double> theFirst10Elements = Doubles.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+      List<Double> theMiddle10Elements = Doubles.asList(51, 52, 53, 54, 55, 56, 57, 58, 59, 60);
+      List<Double> theLast10Elements =
+          Doubles.asList(151, 152, 153, 154, 155, 156, 157, 158, 159, 160);
+
+      List<Double> theMiddle80Elements = new ArrayList<>(80);
+      for (int i = 40; i < 120; i++) {
+        theMiddle80Elements.add(i + 0.5);
+      }
+
+      List<Double> fullInput = new ArrayList<>(NUM_BOUNDED_BUCKETS + 2);
+      for (int i = 0; i < NUM_BOUNDED_BUCKETS + 2; i++) {
+        fullInput.add(i + 0.5);
+      }
+
+      return Arrays.asList(
+          new Object[][] {
+            {emptyInput},
+            {oneInput},
+            {theFirst10Elements},
+            {theMiddle10Elements},
+            {theLast10Elements},
+            {theMiddle80Elements},
+            {fullInput},
+          });
+    }
+
+    @Test
+    public void testAccumulatorCorrectlyEncodedAndDecoded() throws IOException {
+      HistogramAccumulator initialAccumulator = COMBINE_FN.createAccumulator();
+      for (Double input : inputList) {
+        COMBINE_FN.addInput(initialAccumulator, input);
+      }
+
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      CODER.encode(initialAccumulator, outputStream);
+
+      ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+      HistogramAccumulator decodedAccumulator = CODER.decode(inputStream);
+
+      assertNotNull(decodedAccumulator);
+      assertEquals(initialAccumulator, decodedAccumulator);
+    }
+  }
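+  /**
+   * Minimal illustrative sketch (not from the original test suite; the inputs and bounds are
+   * assumptions): shows that mergeAccumulators adds bucket counts elementwise.
+   */
+  @RunWith(JUnit4.class)
+  public static class HistogramMergeExampleTest {
+
+    @Test
+    public void testMergeAddsBucketCountsElementwise() {
+      HistogramCombineFn<Double> combineFn =
+          HistogramCombineFn.create(BucketBounds.linear(1.0, 2.0, 4));
+      HistogramAccumulator first = combineFn.addInput(combineFn.createAccumulator(), 1.5);
+      HistogramAccumulator second = combineFn.addInput(combineFn.createAccumulator(), 1.5);
+
+      HistogramAccumulator merged = combineFn.mergeAccumulators(Arrays.asList(first, second));
+
+      // Both inputs fall into the first bounded bucket [1.0, 3.0), so its count is 2 after the
+      // merge.
+      assertEquals(Longs.asList(0L, 2L, 0L, 0L, 0L, 0L), combineFn.extractOutput(merged));
+    }
+  }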
+  /** Tests for {@link HistogramCombineFn}. */
+  @RunWith(JUnit4.class)
+  public static class HistogramCombineFnTest {
+
+    private static final HistogramCombineFn<Double> histogramCombineFn =
+        HistogramCombineFn.create(BucketBounds.linear(1.0, 2.0, 4));
+    private static final HistogramAccumulator histogramAccumulator =
+        histogramCombineFn.createAccumulator();
+
+    @Test
+    public void testAddInput_null_throwsNullPointerException() {
+      assertThrows(
+          NullPointerException.class,
+          () -> histogramCombineFn.addInput(histogramAccumulator, null));
+    }
+
+    @Test
+    public void testAddInput_nan_throwsIllegalArgumentException() {
+      assertThrows(
+          IllegalArgumentException.class,
+          () -> histogramCombineFn.addInput(histogramAccumulator, Double.NaN));
+    }
+
+    @Test
+    public void testAddInput_positiveInfinity_throwsIllegalArgumentException() {
+      assertThrows(
+          IllegalArgumentException.class,
+          () -> histogramCombineFn.addInput(histogramAccumulator, Double.POSITIVE_INFINITY));
+    }
+
+    @Test
+    public void testAddInput_negativeInfinity_throwsIllegalArgumentException() {
+      assertThrows(
+          IllegalArgumentException.class,
+          () -> histogramCombineFn.addInput(histogramAccumulator, Double.NEGATIVE_INFINITY));
+    }
+  }
+
+  @RunWith(Parameterized.class)
+  public static class HistogramCombineFnParameterizedTest {
+
+    private static final List<Double> BOUNDS = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0);
+
+    private final List<Double> inputList;
+    private final BoundsInclusivity boundsInclusivity;
+    private final List<Long> expectedOutput;
+
+    public HistogramCombineFnParameterizedTest(
+        List<Double> inputList, BoundsInclusivity boundsInclusivity, List<Long> expectedOutput) {
+      this.inputList = inputList;
+      this.boundsInclusivity = boundsInclusivity;
+      this.expectedOutput = expectedOutput;
+    }
+
+    @Parameters
+    public static Collection<Object[]> data() {
+      return Arrays.asList(
+          new Object[][] {
+            // Empty input - 0 for all bucket counts.
+            {
+              Doubles.asList(),
+              BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE,
+              Longs.asList(0L, 0L, 0L, 0L, 0L, 0L)
+            },
+            // One input into the first bounded bucket - 1 in the second index only.
+            {
+              Doubles.asList(1.5),
+              BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE,
+              Longs.asList(0L, 1L, 0L, 0L, 0L, 0L)
+            },
+            {
+              Doubles.asList(1.5),
+              BoundsInclusivity.LOWER_BOUND_EXCLUSIVE_UPPER_BOUND_INCLUSIVE,
+              Longs.asList(0L, 1L, 0L, 0L, 0L, 0L)
+            },
+            // One input into the second bounded bucket - 1 in the third index only.
+            {
+              Doubles.asList(2.5),
+              BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE,
+              Longs.asList(0L, 0L, 1L, 0L, 0L, 0L)
+            },
+            {
+              Doubles.asList(2.5),
+              BoundsInclusivity.LOWER_BOUND_EXCLUSIVE_UPPER_BOUND_INCLUSIVE,
+              Longs.asList(0L, 0L, 1L, 0L, 0L, 0L)
+            },
+            // Two inputs into the first bounded bucket - 2 in the second index only.
+            {
+              Doubles.asList(1.5, 1.5),
+              BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE,
+              Longs.asList(0L, 2L, 0L, 0L, 0L, 0L)
+            },
+            {
+              Doubles.asList(1.5, 1.5),
+              BoundsInclusivity.LOWER_BOUND_EXCLUSIVE_UPPER_BOUND_INCLUSIVE,
+              Longs.asList(0L, 2L, 0L, 0L, 0L, 0L)
+            },
+            // Middle values of the buckets, one for each bucket - 1 in every bucket.
+            {
+              Doubles.asList(0.5, 1.5, 2.5, 3.5, 4.5, 5.5),
+              BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE,
+              Longs.asList(1L, 1L, 1L, 1L, 1L, 1L)
+            },
+            {
+              Doubles.asList(0.5, 1.5, 2.5, 3.5, 4.5, 5.5),
+              BoundsInclusivity.LOWER_BOUND_EXCLUSIVE_UPPER_BOUND_INCLUSIVE,
+              Longs.asList(1L, 1L, 1L, 1L, 1L, 1L)
+            },
+            // Values on the bounds of buckets - all 1s except for the first
+            // and the last bucket (based on the bound inclusivity).
+ { + Doubles.asList(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0), + BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE, + Longs.asList(1L, 1L, 1L, 1L, 1L, 2L) + }, + { + Doubles.asList(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0), + BoundsInclusivity.LOWER_BOUND_EXCLUSIVE_UPPER_BOUND_INCLUSIVE, + Longs.asList(2L, 1L, 1L, 1L, 1L, 1L) + } + }); + } + + @Test + public void testBoundsInclusivity() { + testCombineFn( + HistogramCombineFn.create(BucketBounds.explicit(BOUNDS, boundsInclusivity)), + inputList, + expectedOutput); + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index d239499933c05..1f9369c0779fe 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -362,3 +362,5 @@ include("sdks:java:io:iceberg") findProject(":sdks:java:io:iceberg")?.name = "iceberg" include("sdks:java:io:solace") findProject(":sdks:java:io:solace")?.name = "solace" +include("sdks:java:extensions:combiners") +findProject(":sdks:java:extensions:combiners")?.name = "combiners"