Skip to content

Commit

Permalink
Refine combineFns constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaudfnr committed Aug 12, 2017
1 parent 908abba commit 99ea3df
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ private ApproximateDistinctFn(int p, int sp) {
* @param <InputT> the type of the input {@code PCollection}'s elements being combined.
*/
public static <InputT> ApproximateDistinctFn<InputT> create(int p) {
  // Only values strictly below 4 are rejected, so 4 itself is a valid precision.
  // The message states the bound as inclusive and reports the offending value.
  if (p < 4) {
    throw new IllegalArgumentException(
        "p must be greater than or equal to 4, but was " + p);
  }
  // The second constructor argument (sp) is passed as 0 here.
  // NOTE(review): presumably 0 means "no sparse representation" -- confirm.
  return new ApproximateDistinctFn<>(p, 0);
}

Expand Down Expand Up @@ -217,6 +220,9 @@ public static <InputT> ApproximateDistinctFn<InputT> create(int p) {
* @param sp the precision of HyperLogLog+'s sparse representation
*/
public ApproximateDistinctFn<InputT> withSparseRepresentation(int sp) {
  // Both bounds are inclusive: sp == p and sp == 32 pass this check, so the
  // message says "or equal to" and reports the values involved.
  if (sp < p || sp > 32) {
    throw new IllegalArgumentException(
        "sp should be greater than or equal to p (" + p + ")"
            + " and lower than or equal to 32, but was " + sp);
  }
  // Return a new instance carrying the same p and the requested sp.
  return new ApproximateDistinctFn<>(this.p, sp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private KMostFrequent() {
* @param <T> the type of the elements in the input {@code PCollection}
*/
public static <T> Combine.Globally<T, StreamSummary<T>> globally(int capacity) {
return Combine.<T, StreamSummary<T>>globally(new KMostFrequentFn<T>(capacity));
return Combine.<T, StreamSummary<T>>globally(KMostFrequentFn.<T>create(capacity));
}

/**
Expand Down Expand Up @@ -104,7 +104,7 @@ public static <K, T> Combine.PerKey<K, T, StreamSummary<T>> perKey(int capacity)
if (capacity < 1) {
throw new IllegalArgumentException("The capacity must be strictly positive");
}
return Combine.<K, T, StreamSummary<T>>perKey(new KMostFrequentFn<T>(capacity));
return Combine.<K, T, StreamSummary<T>>perKey(KMostFrequentFn.<T>create(capacity));
}

/**
Expand All @@ -121,7 +121,7 @@ public static <K, T> Combine.PerKey<K, T, StreamSummary<T>> perKey(int capacity)
*
* @param <T> the type of the elements being combined
*/
static class KMostFrequentFn<T>
public static class KMostFrequentFn<T>
extends Combine.CombineFn<T, StreamSummary<T>, StreamSummary<T>> {

private int capacity;
Expand All @@ -130,6 +130,13 @@ private KMostFrequentFn(int capacity) {
this.capacity = capacity;
}

/**
 * Returns a {@code KMostFrequentFn} built with the given capacity.
 *
 * @param capacity the value handed to the new instance; must be at least 1
 * @param <T> the type of the elements being combined
 * @throws IllegalArgumentException if {@code capacity} is zero or negative
 */
public static <T> KMostFrequentFn<T> create(int capacity) {
  final boolean positive = capacity >= 1;
  if (!positive) {
    throw new IllegalArgumentException("Capacity must be greater than 0.");
  }
  return new KMostFrequentFn<T>(capacity);
}

@Override
public StreamSummary<T> createAccumulator() {
return new StreamSummary<>(this.capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public static <K> Combine.PerKey<K, String, CountMinSketch> perKey(int seed) {
* more than 0.02% in 99% of the cases.
*
*/
static class CountMinSketchFn
public static class CountMinSketchFn
extends Combine.CombineFn<String, CountMinSketch, CountMinSketch> {

private final int depth;
Expand Down Expand Up @@ -225,6 +225,9 @@ public static CountMinSketchFn create(int seed) {
* @param depth Number of lines, i.e. number of hash functions
*/
public CountMinSketchFn withDimensions(int width, int depth) {
  // Reject the request unless both dimensions are strictly positive.
  final boolean dimensionsValid = width > 0 && depth > 0;
  if (!dimensionsValid) {
    throw new IllegalArgumentException("depth and width must be positive.");
  }
  // Build a new instance with the requested dimensions and this instance's seed.
  final CountMinSketchFn resized = new CountMinSketchFn(width, depth, this.seed);
  return resized;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private TDigestQuantiles() {
* {@code 3 / compression} on quantiles.
*/
public static Combine.Globally<Double, SerializableTDigest> globally(int compression) {
return Combine.<Double, SerializableTDigest>globally(new QuantileFn(compression));
return Combine.<Double, SerializableTDigest>globally(TDigestQuantilesFn.create(compression));
}

/**
Expand All @@ -92,23 +92,30 @@ public static Combine.Globally<Double, SerializableTDigest> globally(int compres
* @param <K> the type of the keys
*/
public static <K> Combine.PerKey<K, Double, SerializableTDigest> perKey(int compression) {
return Combine.<K, Double, SerializableTDigest>perKey(new QuantileFn(compression));
return Combine.<K, Double, SerializableTDigest>perKey(TDigestQuantilesFn.create(compression));
}

/**
* A {@code Combine.CombineFn} that computes the {@link SerializableTDigest} structure
* of an {@code Iterable} of Doubles, useful as an argument to {@link Combine#globally} or
* {@link Combine#perKey}.
*/
static class QuantileFn
public static class TDigestQuantilesFn
extends Combine.CombineFn<Double, SerializableTDigest, SerializableTDigest> {

private final int compression;

public QuantileFn(int compression) {
private TDigestQuantilesFn(int compression) {
this.compression = compression;
}

/**
 * Returns a {@code TDigestQuantilesFn} built with the given compression factor.
 *
 * @param compression the value handed to the new instance; must be greater than 0
 * @throws IllegalArgumentException if {@code compression} is zero or negative
 */
public static TDigestQuantilesFn create(int compression) {
  // Guard clause first: fail fast on an invalid factor, then construct.
  if (compression <= 0) {
    throw new IllegalArgumentException("Compression factor should be greater than 0.");
  }
  return new TDigestQuantilesFn(compression);
}

/** Returns a new {@link SerializableTDigest} constructed with this function's compression. */
@Override
public SerializableTDigest createAccumulator() {
  final SerializableTDigest emptyDigest = new SerializableTDigest(compression);
  return emptyDigest;
}
Expand Down

0 comments on commit 99ea3df

Please sign in to comment.