From 99ea3dfc06c5af961d07548c752caa8e7c1aadde Mon Sep 17 00:00:00 2001 From: Arnaud Fournier Date: Sat, 12 Aug 2017 11:34:42 +0200 Subject: [PATCH] Refine combineFns constructors --- .../cardinality/ApproximateDistinct.java | 6 ++++++ .../sketching/frequency/KMostFrequent.java | 13 ++++++++++--- .../sketching/frequency/SketchFrequencies.java | 5 ++++- .../sketching/quantiles/TDigestQuantiles.java | 15 +++++++++++---- 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/cardinality/ApproximateDistinct.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/cardinality/ApproximateDistinct.java index c30696000757f..45bfcbc8765d3 100644 --- a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/cardinality/ApproximateDistinct.java +++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/cardinality/ApproximateDistinct.java @@ -190,6 +190,9 @@ private ApproximateDistinctFn(int p, int sp) { * @param the type of the input {@code Pcollection}'s elements being combined. 
*/ public static ApproximateDistinctFn create(int p) { + if (p < 4) { + throw new IllegalArgumentException("p must be greater than or equal to 4"); + } return new ApproximateDistinctFn<>(p, 0); } @@ -217,6 +220,9 @@ public static ApproximateDistinctFn create(int p) { * @param sp the precision of HyperLogLog+' sparse representation */ public ApproximateDistinctFn withSparseRepresentation(int sp) { + if (sp < p || sp > 32) { + throw new IllegalArgumentException("sp must be between p and 32 (inclusive)"); + } return new ApproximateDistinctFn<>(this.p, sp); } diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/frequency/KMostFrequent.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/frequency/KMostFrequent.java index d9e641e1c4e69..beeb5cc66a928 100644 --- a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/frequency/KMostFrequent.java +++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/frequency/KMostFrequent.java @@ -73,7 +73,7 @@ private KMostFrequent() { * @param the type of the elements in the input {@code PCollection} */ public static Combine.Globally> globally(int capacity) { - return Combine.>globally(new KMostFrequentFn(capacity)); + return Combine.>globally(KMostFrequentFn.create(capacity)); } /** @@ -104,7 +104,7 @@ public static Combine.PerKey> perKey(int capacity) if (capacity < 1) { throw new IllegalArgumentException("The capacity must be strictly positive"); } - return Combine.>perKey(new KMostFrequentFn(capacity)); + return Combine.>perKey(KMostFrequentFn.create(capacity)); } /** @@ -121,7 +121,7 @@ public static Combine.PerKey> perKey(int capacity) * * @param the type of the elements being combined */ - static class KMostFrequentFn + public static class KMostFrequentFn extends Combine.CombineFn, StreamSummary> { private int capacity; @@ -130,6 +130,13 @@ private KMostFrequentFn(int capacity) { 
this.capacity = capacity; } + public static KMostFrequentFn create(int capacity) { + if (capacity <= 0) { + throw new IllegalArgumentException("Capacity must be greater than 0."); + } + return new KMostFrequentFn<>(capacity); + } + @Override public StreamSummary createAccumulator() { return new StreamSummary<>(this.capacity); diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/frequency/SketchFrequencies.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/frequency/SketchFrequencies.java index bed5f873bf8d9..91268ad46170c 100644 --- a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/frequency/SketchFrequencies.java +++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/frequency/SketchFrequencies.java @@ -142,7 +142,7 @@ public static Combine.PerKey perKey(int seed) { * more than 0.02% in 99% of the cases. * */ - static class CountMinSketchFn + public static class CountMinSketchFn extends Combine.CombineFn { private final int depth; @@ -225,6 +225,9 @@ public static CountMinSketchFn create(int seed) { * @param depth Number of lines, i.e. 
number of hash functions */ public CountMinSketchFn withDimensions(int width, int depth) { + if (width <= 0 || depth <= 0) { + throw new IllegalArgumentException("depth and width must be positive."); + } return new CountMinSketchFn(width, depth, this.seed); } diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/quantiles/TDigestQuantiles.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/quantiles/TDigestQuantiles.java index b7e7d0f16e818..6dcca660fe7a0 100644 --- a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/quantiles/TDigestQuantiles.java +++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/quantiles/TDigestQuantiles.java @@ -69,7 +69,7 @@ private TDigestQuantiles() { * {@code 3 / compression} on quantiles. */ public static Combine.Globally globally(int compression) { - return Combine.globally(new QuantileFn(compression)); + return Combine.globally(TDigestQuantilesFn.create(compression)); } /** @@ -92,7 +92,7 @@ public static Combine.Globally globally(int compres * @param the type of the keys */ public static Combine.PerKey perKey(int compression) { - return Combine.perKey(new QuantileFn(compression)); + return Combine.perKey(TDigestQuantilesFn.create(compression)); } /** @@ -100,15 +100,22 @@ public static Combine.PerKey perKey(int comp * of an {@code Iterable} of Doubles, useful as an argument to {@link Combine#globally} or * {@link Combine#perKey}. 
*/ - static class QuantileFn + public static class TDigestQuantilesFn extends Combine.CombineFn { private final int compression; - public QuantileFn(int compression) { + private TDigestQuantilesFn(int compression) { this.compression = compression; } + public static TDigestQuantilesFn create(int compression) { + if (compression <= 0) { + throw new IllegalArgumentException("Compression factor should be greater than 0."); + } + return new TDigestQuantilesFn(compression); + } + @Override public SerializableTDigest createAccumulator() { return new SerializableTDigest(compression); }