-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-2728] Extension for sketch-based statistics #3686
Conversation
R: @jbonofre |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reviewed the first file to start. Please don't flatten your commits so that it is easier to follow the edits. Will review the next when I have a free moment.
* <br>This does not means relative error in the estimation <b>can't</b> be higher. | ||
* <br>This only means that, on average, the relative error will be | ||
* lower than the desired relative error. | ||
* <br>Nevertheless, the more elements arrive in the stream, the lower the variation will be. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elements arrive in the stream -> elements in the PCollection
* | ||
* <br><b>WARNING : </b> | ||
* <br>This does not means relative error in the estimation <b>can't</b> be higher. | ||
* <br>This only means that, on average, the relative error will be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that, on average, the -> that on average the
* </pre> | ||
* | ||
* <br><b>WARNING : </b> | ||
* <br>This does not means relative error in the estimation <b>can't</b> be higher. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
means -> mean that the
* <p>According to the paper, the mean squared error is bounded by the following formula : | ||
* <pre>b(m) / sqrt(m) | ||
* Where m is the number of buckets used (p = log2(m) ) | ||
* and b(m) < 1.106 for m > 16 ( p > 4). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
( p -> (p
* | ||
* <p>According to the paper, the mean squared error is bounded by the following formula : | ||
* <pre>b(m) / sqrt(m) | ||
* Where m is the number of buckets used (p = log2(m) ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
) ) -> ))
* the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s. | ||
* | ||
* <p>This class uses the HyperLogLog algorithm, and more precisely | ||
* the improved version of google (HyperLogLog+). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
google -> Google
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* in order to estimate the cardinality. | ||
* | ||
* @param p precision value for the normal representation | ||
* @param <InputT> the type of the Input {@code Pcollection}'s elements being combined. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Input {@code Pcollection}'s -> input {@code Pcollection}'s
* <pre>{@code 1.1 / sqrt(2^p)}</pre> | ||
* For instance, the estimation {@code ApproximateDistinct.globally(12)} | ||
* will have a relative error of about 2%. | ||
* <br> Also keep in mind that {@code p} cannot be lower than 4, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<br> Also -> <br>Also
/** | ||
* A {@code PTransform} that takes an input {@code PCollection<KV<K, InputT>>} and returns a | ||
* {@code PCollection<KV<K, HyperLogLogPlus>>} that contains an output element mapping each | ||
* distinct key in the input {@code PCollection} to a structure wrapping an HyperLogLog which |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrapping an HyperLogLog -> wrapping a HyperLogLogPlus
} | ||
|
||
/** | ||
* Do the same as {@link ApproximateDistinct#globally(int)}, but with a default value for p. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with a default value for p -> with a default value of 18 for p
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest flattening all the package namespaces to just org.apache.beam.sdk.extensions.sketching
Having one package namespace will be fine until there are like 30+ different classes.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix indentation/white space in all the files.
* | ||
* <p>The Space-Saving algorithm summarizes the stream by using a doubly linked-list of buckets | ||
* ordered by the frequency value they represent. Each of these buckets contains a linked-list | ||
* of counters, which estimate the {@code count} for an element as well as the maximum |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drop ,
* {@code PTransform}s for finding the k most frequent elements in a {@code PCollection}, or | ||
* the k most frequent values associated with each key in a {@code PCollection} of {@code KV}s. | ||
* | ||
* <p>This class uses the Space-Saving algorithm, introduced in this paper : |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use <a>
tags for links
} | ||
|
||
@Override | ||
public StreamSummary<InputT> createAccumulator() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix indentation
* <p>The {@code capacity} parameter controls the maximum number of elements the sketch | ||
* can contain. Once this capacity is reached, the least frequent element is dropped each | ||
* time an incoming element is not already present in the sketch. | ||
* Each element in the sketch is associated to a counter, that keeps track of the estimate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drop ,
estimate -> estimated
|
||
/** | ||
* A {@code PTransform} that takes an input {@code PCollection<KV<K, InputT>>} and returns a | ||
* {@code PCollection<KV<K, StreamSummary>>} that contains an output element mapping each |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PCollection<KV<K, StreamSummary>> -> PCollection<KV<K, StreamSummary<T>>>
|
||
@Override | ||
public HyperLogLogPlus addInput(HyperLogLogPlus acc, InputT record) { | ||
acc.offer(record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HyperLogLogPlus can only handle a few types like long, string, byte[], ... when it performs the hashing and for all remaining types it uses toString. There is no guarantee that toString will be consistent for "equal" values.
We should use the coder to encode the input value to a byte[] and offer that to HyperLogLogPlus. We should also update the class comment stating that we need to ensure that the coder is deterministic and perform this validation by calling verify deterministic.
Please add a test for a custom object type which is not supported by MurmurHash (which is used within HyperLogLogPlus).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that's a great idea !
I can't have access to the input coder from the combineFn, right ? so I thinkj we should create a method "withCoder" like in the Create transform in order to perform the encodings.
* <p>The {@code capacity} parameter controls the maximum number of elements the sketch | ||
* can contain. Once this capacity is reached, the least frequent element is dropped each | ||
* time an incoming element is not already present in the sketch. | ||
* Each element in the sketch is associated to a counter, that keeps track of the estimate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drop ,
estimate -> estimated
* .apply(SketchFrequencies.<Integer, String>globally(10000)); | ||
* } </pre> | ||
* | ||
* @param capacity the maximum number off distinct elements that the Stream Summary can keep |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
off -> of
} | ||
|
||
/** | ||
* A {@code PTransform} that takes an input {@code PCollection<KV<K, InputT>>} and returns a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PCollection<KV<K, InputT>> -> PCollection<KV<K, T>>
* .apply(SketchFrequencies.<String>perKey(10000)); | ||
* } </pre> | ||
* | ||
* @param capacity the maximum number off distinct elements that the Stream Summary can keep |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
off -> of
@lukecwik Wow thanks for the quick review Luke ! I am gonna work on this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got through 3 out of the 4 main files with plenty of comments. I'm on vacation next week but can continue to review once I get back on August 14th.
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* {@code PTransform}s that records an estimation of the frequency of each element in a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{@code PTransform}s -> {@code PTransform}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are several PTransforms as we can go for globally() of perKey() (and most of the Combiners starts like this)
* {@code PCollection}, or the occurrences of values associated with each key in a | ||
* {@code PCollection} of {@code KV}s. | ||
* | ||
* <p>This class uses the Count-min Sketch structure. The papers and other useful information |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use <a>
href tags for links.
* <p>Example of use | ||
* <pre> {@code PCollection<String> input = ...; | ||
* PCollection<StreamSummary<String>> ssSketch = input | ||
* .apply(SketchFrequencies.<String>perKey(10000)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the example be for KMostFrequent?
* <p>Example of use | ||
* <pre> {@code PCollection<String> input = ...; | ||
* PCollection<StreamSummary<String>> ssSketch = input | ||
* .apply(SketchFrequencies.<String>perKey(10000)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This example is using SketchFrequences instead of
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aléas of copy/paste
* <p>Example of use | ||
* <pre> {@code PCollection<KV<Integer, String>> input = ...; | ||
* PCollection<KV<Integer, StreamSummary<String>>> ssSketch = input | ||
* .apply(SketchFrequencies.<Integer, String>globally(10000)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the example be for KMostFrequent?
* <br>The implementation comes from Apache Spark : | ||
* https://github.com/apache/spark/tree/master/common/sketch | ||
*/ | ||
class SketchFrequencies { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is package private, did you mean to make it that way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it should be public, don't know how I could have forgotten this, thanks.
* more than 0.02% in 99% of the cases. | ||
* | ||
*/ | ||
static class CountMinSketchFn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could support all types instead of just strings by encoding each value to a byte[] and then converting that byte array to a base64 string or something equivalent.
You just want to make sure that the coder is deterministic.
@lukecwik Many thanks to have taken some of your time for reviewing my work before your holidays. For package flattening, I think you're right. The current organization comes from my work on DQ-Talend where there are more classes inside the packages so now this makes sense to flatten them. |
Changes Unknown when pulling 908abba on ArnaudFnr:sketching into ** on apache:master**. |
Changes Unknown when pulling 908abba on ArnaudFnr:sketching into ** on apache:master**. |
Hey - acknowledging that I am aware of this PR, but it'll be a few days before I get to it. Swamped with other things. |
99ea3df
to
49511d3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I started by reviewing only CountMinSketch
because all my comments translate to other sketches too. I would actually suggest splitting these into different PRs - start with only CountMinSketch
. There's a number of high-level API questions to resolve here, and it seems better to first resolve them once and for all using one example, before spending time changing the others.
Put briefly, I suggest the following reorganization:
- Expose only CombineFn's
- Write detailed javadoc only at class level - do not duplicate any nontrivial documentation
So the class would look something like this:
/**
* Implements the CountMinSketch datastructure useful for ....
* ... (talk about the data structure, give references etc) ...
* Use one of the functions forRelativeErrorAndConfidence() or forDimensions() for to create a CombineFn that can be used in the following ways:
* ...(talk about how to use it with Combine.globally or perKey, or with a state cell)...
* Implementation details: ...(if you want, give more details that are less directly relevant to people who just want to estimate some frequencies)...
*/
class CountMinSketchFn<InputT> extends AbstractCombineFn<InputT, CountMinSketch, CountMinSketch>... {
/** ... Configures the sketch to have the given error bounds: with probability ..., queries will be within ... of the true value ... */
CountMinSketchFn forRelativeErrorAndConfidence(double fractionOfTotal, double confidence) { ... }
/** Configures the sketch to have the explicitly specified depth and width. */
CountMinSketchFn forDimensions(int width, int depth) { ... }
}
*/ | ||
package org.apache.beam.sdk.extensions.sketching.frequency; | ||
|
||
import com.clearspring.analytics.stream.frequency.CountMinSketch; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh neat, I implemented this one - small world :)
} | ||
|
||
/** | ||
* A {@code PTransform} that takes an input {@code PCollection<String>} and returns a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The input type should be generic, I think. It's quite valid for this to be any encodable user type, e.g. it could be something like Long
, or even another KV<Something, SomethingElse>
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I am going to work on genericity using coders
* {@code PCollection<CountMinSketch>} whose contents is a Count-min sketch that allows to query | ||
* the number of hits for a specific element in the input {@code PCollection}. | ||
* | ||
* <p>The {@code seed} parameters will be used to randomly generate different hash functions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does it benefit the user that they can provide a seed: can we use some default seed? Generally Beam aims to eliminate all tuning knobs that are not strictly necessary, and this one doesn't even influence correctness or performance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, this is not strictly necessary. A default seed will indeed be the best thing in most use cases. But we could still let the possibility to tune this parameter, what do you think ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would stick with not adding it until there is a usecase that people need it for.
/** | ||
* A {@code PTransform} that takes an input {@code PCollection<String>} and returns a | ||
* {@code PCollection<CountMinSketch>} whose contents is a Count-min sketch that allows to query | ||
* the number of hits for a specific element in the input {@code PCollection}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please say a few words about how to query the number of hits.
* The {@code seed} parameter will be used to generate a and b for each hash function. | ||
* <br>The Count-min sketch size is constant through the process so the memory use is fixed. | ||
* However, the dimensions are directly linked to the accuracy. | ||
* <br>By default, the relative error is set to 1% with 1% probability that the estimation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this returns a Combine.Globally
, it's not possible to change this on the result of this transform, so "by default" is somewhat misleading.
I think error bounds are something that the user should always configure explicitly and there does not exist a reasonable default value - it is entirely application-specific. And the current function signature does not allow configuring them.
There are a few ways out of this:
- Instead of returning a
Combine.Globally
, create a new wrapper transform for this, with builder methods for relevant parameters. It will be rather boilerplate-y. - Just remove this function and expose only
CountMinSketchFn
, and have examples in javadoc about how to use it in a pipeline, mentioning various scenarios:Combine.Globally
,Combine.PerKey
, state cells etc. (not necessary to give code examples for each) - Add more parameters to the current function (after removing
seed
:) )
I prefer 2 - in particular, because we can always add convenience wrappers later if it turns out that life is too hard without them - but I could be convinced otherwise.
|
||
@Override public CountMinSketch mergeAccumulators(Iterable<CountMinSketch> accumulators) { | ||
Iterator<CountMinSketch> it = accumulators.iterator(); | ||
if (!it.hasNext()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can accumulators
be empty?
return accumulator; | ||
} | ||
|
||
@Override public Coder<CountMinSketch> getAccumulatorCoder(CoderRegistry registry, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two functions for returning coders are unnecessary if instead you provide a default coder for CountMinSketch
globally, per https://beam.apache.org/contribute/ptransform-style-guide/#providing-default-coders-for-types.
return CountMinSketch.deserialize(BYTE_ARRAY_CODER.decode(inStream)); | ||
} | ||
|
||
@Override public boolean consistentWithEquals() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why isn't it consistent with equals? I think CountMinSketch
implements equals
and hashCode
deterministically.
} else { | ||
// depth and width as computed in the CountMinSketch constructor from the relative error and | ||
// confidence. | ||
int width = (int) Math.ceil(2 / value.getRelativeError()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is wonky, there's gotta be a better way to get access to depth and width. One easy way that comes to mind is simply send a PR to StreamLib to expose them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why this is wonky? I compute depth and width exactly the same way as in Count-Min Sketch constructors. I can still create a PR for that though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think @jkff is pointing out that it would be best if we didn't duplicate the internal details of StreamLib and instead asked them to expose this functionality.
I wouldn't block merging this in and instead add a TODO to replace it by calling StreamLib's implementation directly when available and link to a JIRA or PR against StreamLib which exposes it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All right I'm going to send a PR to stream-lib and add a Todo in this function then, thanks @lukecwik
* <a>https://github.com/tdunning/t-digest</a> | ||
* However, this version has not been released yet so the issue is still up-to-date. | ||
*/ | ||
public static class SerializableTDigest implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does it need to implement Serializable
?
Upon talking to @lukecwik I change my recommendation: let's still expose the transforms, but make sure that the factory functions for transforms take the configuration parameters. E.g.: CountMinSketches.{globally,perKey,combineFn}For{RelativeErrorAndConfidence,Dimensions}() Might change forRelativeErrorAndConfidence to forErrorBounds for brevity. For example, |
* @param sp the precision of HyperLogLog+' sparse representation | ||
*/ | ||
public ApproximateDistinctFn<InputT> withSparseRepresentation(int sp) { | ||
if (sp < p || sp > 32) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't you mean p < sp || sp > 32
?
* When the sparse representation would require more memory than the normal one, | ||
* it is converted and the normal algorithm applies for the remaining elements. | ||
* | ||
* <p><b>WARNING : </b>Choose sp such that {@code p <= sp <= 32} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<b>WARNING : </b>Choose -> <b>WARNING:</b> Choose
* in order to estimate the cardinality. | ||
* | ||
* @param p precision value for the normal representation | ||
* @param <InputT> the type of the input {@code Pcollection}'s elements being combined. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pcollection -> PCollection
* }</pre> | ||
* | ||
* @param <InputT> type of elements being combined | ||
* @param p number of bits for indexes in the HyperLogLogPlus |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indexes -> indices
} | ||
|
||
/** | ||
* Do the same as {@link ApproximateDistinct#globally(int)}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do the same as -> Same as
|
||
private static final Logger LOG = LoggerFactory.getLogger(ApproximateDistinct.class); | ||
|
||
// do not instantiate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indentation
* By calling this builder, you will not use the sparse representation. | ||
* If you want to, see {@link ApproximateDistinctFn#withSparseRepresentation(int)} | ||
* | ||
* <p>Example of use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example of use -> Example:
|
||
/** | ||
* Returns an {@code ApproximateDistinctFn} combiner with the given precision value p. | ||
* This means that the input elements will be dispatched into 2^p buckets |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2^p -> {@code 2^p}
|
||
/** | ||
* Do the same as {@link ApproximateDistinct#globally(int)}, | ||
* but with a default value of 18 for p. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
18 -> {@code 18}
} | ||
|
||
/** | ||
* Do the same as {@link ApproximateDistinct#globally(int)}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do the same as -> Same as
|
||
@Override | ||
public HyperLogLogPlus addInput(HyperLogLogPlus acc, InputT record) { | ||
acc.offer(record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArnaudFnr wrote:
Yes that's a great idea !
I can't have access to the input coder from the combineFn, right ? so I thinkj we should create a method "withCoder" like in the Create transform in order to perform the encodings.
You should add withCoder
to the CombineFn.
You should also create a PTransform which sets the input coder from the input PCollection if its unset something like:
private static class ApproximateDistinctTransform<InputT> extends PTransform<PCollection<InputT>, PCollection<HyperLogLogPlus>> {
private ApproximateDistinctTransform(p, sp, coder) {
this.p = p;
this.sp = sp;
this.coder = coder;
}
@Override
public PCollection<HyperLogLogPlus> expand(PCollection<InputT> input) {
return input.apply(Combine.globally(new HyperLogLogPlus(p, sp, Objects.firstNonNull(coder, input.getCoder()))));
}
}
You'll also want a keyed PTransform version for perKey as well.
* | ||
* <p>This class uses the HyperLogLog algorithm, and more precisely | ||
* the improved version of google (HyperLogLog+). | ||
* <br>The implementation comes from Addthis' library Stream-lib : https://github.com/addthis/stream-lib |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArnaudFnr wrote:
Yes didn't think about that, thanks.
I would suggest inlining the links within the text like:
<br>The implementation comes from <a href="https://github.com/addthis/stream-lib">Addthis' Stream-lib library</a>.
For example, you could rephrase the section linking the papers as:
The <a href="http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf">original HyperLogLog paper</a> provides details the algorithm. The same authors released a <a href="http://cscubs.cs.uni-bonn.de/2016/proceedings/paper-03.pdf">paper</a> with a clearer view of the algorithm. Google released a <a href="https://research.google.com/pubs/pub40671.html">paper</a> containing a modified version named HyperLogLog+.
* the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s. | ||
* | ||
* <p>This class uses the HyperLogLog algorithm, and more precisely | ||
* the improved version of google (HyperLogLog+). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArnaudFnr wrote:
Done
Done.
} | ||
} | ||
|
||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
nit: fix indentation of lines below
Done.
* @param sp the precision of HyperLogLog+' sparse representation | ||
*/ | ||
public ApproximateDistinctFn<InputT> withSparseRepresentation(int sp) { | ||
return new ApproximateDistinctFn<>(this.p, sp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
Perform validation that p <= sp
Done.
* | ||
* <p>According to the paper, the mean squared error is bounded by the following formula : | ||
* <pre>b(m) / sqrt(m) | ||
* Where m is the number of buckets used (p = log2(m) ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
) ) -> ))
Done.
* | ||
* <br><b>WARNING : </b> | ||
* <br>This does not means relative error in the estimation <b>can't</b> be higher. | ||
* <br>This only means that, on average, the relative error will be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
that, on average, the -> that on average the
Done.
* <br>This only means that, on average, the relative error will be | ||
* lower than the desired relative error. | ||
* <br>Nevertheless, the more elements arrive in the stream, the lower the variation will be. | ||
* <br>Indeed, this is like when you throw a dice thousands or millions of time : |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArnaudFnr wrote:
I think you meant dice* but ok for millions, makes more sense.
I believe using die
is correct since you are throwing a singular die millions of times.
Also, drop the preceding spaces, time : -> time:
* @param <InputT> the type of the Input {@code Pcollection}'s elements being combined. | ||
*/ | ||
public static <InputT> ApproximateDistinctFn<InputT> create(int p) { | ||
return new ApproximateDistinctFn<>(p, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArnaudFnr wrote:
I thought about this too, but the validation is already done in the HyperLogLogPlus constructor.
Is there a specific interest to do it there ?
It is much clearer to the caller that they caused the error if we throw an exception immediately when validating the input parameters instead of when it is caused deeper within the stack.
try { | ||
mergedAccum.addAll(accum); | ||
} catch (CardinalityMergeException e) { | ||
// Should never happen because only HyperLogLogPlus accumulators are instantiated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArnaudFnr wrote:
Should I remove the Logger then ?
The thrown exception will contain all the details and the user will log it so no point in duplicating logging here also.
* <p>The {@code capacity} parameter controls the maximum number of elements the sketch | ||
* can contain. Once this capacity is reached, the least frequent element is dropped each | ||
* time an incoming element is not already present in the sketch. | ||
* Each element in the sketch is associated to a counter, that keeps track of the estimate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
drop ,
estimate -> estimated
Done.
* <p>The {@code capacity} parameter controls the maximum number of elements the sketch | ||
* can contain. Once this capacity is reached, the least frequent element is dropped each | ||
* time an incoming element is not already present in the sketch. | ||
* Each element in the sketch is associated to a counter, that keeps track of the estimate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
drop ,
estimate -> estimated
Done.
|
||
@Override | ||
public StreamSummary<InputT> addInput(StreamSummary<InputT> accumulator, InputT element) { | ||
accumulator.offer(element, 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
element may not implement a correct equals, for example byte[].You'll want to create a wrapper types that implement Externalizable which stores a reference to the element, the element coder, and the structural value for the element. This wrapper type should delegate equals to the structural value and its write/read method should use the coder and element to encode/decode the wrapper with the structural value being ephemeral.
Please add a test for this case by counting 1000 new instances of byte[]{ 0x00 } or something similar.
Ping?
* .apply(SketchFrequencies.<String>perKey(10000)); | ||
* } </pre> | ||
* | ||
* @param capacity the maximum number off distinct elements that the Stream Summary can keep |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
off -> of
Done.
* <p>Example of use | ||
* <pre> {@code PCollection<KV<Integer, String>> input = ...; | ||
* PCollection<KV<Integer, StreamSummary<String>>> ssSketch = input | ||
* .apply(SketchFrequencies.<Integer, String>globally(10000)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
Shouldn't the example be for KMostFrequent?
Done.
* <p>Example of use | ||
* <pre> {@code PCollection<String> input = ...; | ||
* PCollection<StreamSummary<String>> ssSketch = input | ||
* .apply(SketchFrequencies.<String>perKey(10000)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
Shouldn't the example be for KMostFrequent?
Done.
* {@code PTransform}s for finding the k most frequent elements in a {@code PCollection}, or | ||
* the k most frequent values associated with each key in a {@code PCollection} of {@code KV}s. | ||
* | ||
* <p>This class uses the Space-Saving algorithm, introduced in this paper : |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
Use<a>
tags for links
Like before, attempt to inline the text with the link like
<a href="http://my/link">paper</a>
|
||
private static final Logger LOG = LoggerFactory.getLogger(KMostFrequent.class); | ||
|
||
// do not instantiate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
nit: fix comment indent
Done.
|
||
/** | ||
* A {@code PTransform} that takes an input {@code PCollection<KV<K, InputT>>} and returns a | ||
* {@code PCollection<KV<K, StreamSummary>>} that contains an output element mapping each |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
PCollection<KV<K, StreamSummary>> -> PCollection<KV<K, StreamSummary<T>>>
Done.
* | ||
* @param <InputT> the type of the elements being combined | ||
*/ | ||
static class KMostFrequentFn<InputT> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArnaudFnr wrote:
Ok let's go for this
Done.
* The {@code seed} parameter will be used to generate a and b for each hash function. | ||
* <br>The Count-min sketch size is constant through the process so the memory use is fixed. | ||
* However, the dimensions are directly linked to the accuracy. | ||
* <br>By default, the relative error is set to 1% with 1% probability that the estimation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jkff wrote:
Since this returns aCombine.Globally
, it's not possible to change this on the result of this transform, so "by default" is somewhat misleading.I think error bounds are something that the user should always configure explicitly and there does not exist a reasonable default value - it is entirely application-specific. And the current function signature does not allow configuring them.
There are a few ways out of this:
- Instead of returning a
Combine.Globally
, create a new wrapper transform for this, with builder methods for relevant parameters. It will be rather boilerplate-y.- Just remove this function and expose only
CountMinSketchFn
, and have examples in javadoc about how to use it in a pipeline, mentioning various scenarios:Combine.Globally
,Combine.PerKey
, state cells etc. (not necessary to give code examples for each)- Add more parameters to the current function (after removing
seed
:) )I prefer 2 - in particular, because we can always add convenience wrappers later if it turns out that life is too hard without them - but I could be convinced otherwise.
The style guide says that we should provide PTransforms first, and then CombineFns/DoFns second which is counter to suggestion #2.
I believe we should provide both a PTransform and the CombineFn in this case.
* <a>https://github.com/tdunning/t-digest</a> | ||
* However, this version has not been released yet so the issue is still up-to-date. | ||
*/ | ||
public static class SerializableTDigest implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jkff wrote:
Why does it need to implementSerializable
?
Based on the comment, it seems as though version 3.2 was released recently and you should be able to simplify your code here:
https://mvnrepository.com/artifact/com.tdunning/t-digest/3.2
* {@code PCollection}, or the occurrences of values associated with each key in a | ||
* {@code PCollection} of {@code KV}s. | ||
* | ||
* <p>This class uses the Count-min Sketch structure. The papers and other useful information |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lukecwik wrote:
Use<a>
href tags for links.
Like before, inline the link within the text like:
<a href="http://my/link">some description</a>
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* {@code PTransform}s that records an estimation of the frequency of each element in a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArnaudFnr wrote:
There are several PTransforms as we can go for globally() of perKey() (and most of the Combiners starts like this)
in that case records -> record
It's getting difficult to follow all the comment threads. Please post an explicit update on this PR when you'd like me to take another look. I also stand by my suggestion to start with adding just one of these transforms - for the additional reason that it'll make it easier to deal with Github's shall we say questionable code review UI. |
Any update on this PR? |
Hi, I suppose @arnaudfnr is a bit busy with the return to school, let me ping him to confirm if he is going to finish this in the next days/weeks, if it is not the case, I will take from the current state and finish his work. |
Hello, I was indeed quite busy the past month but now I am ready to work again regularly on this PR. I am going to submit only the ApproximateDistinct transform as Eugene proposed. |
@arnaudfnr I think you were going to split this PR into multiple anyway, right? In that case, can the current PR be closed? |
It seems that this PR was already disassembled into smaller PRs - closing. |
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue.mvn clean verify
to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.Extension to compute approximate statistics with the use of probabilistic data structure, or sketches.
For now, 4 sketches are supported :
The sketches are implemented as Beam Combiners, allowing a user to build the sketch dynamically in the Pipeline and then make some dynamic queries and/or store it in a database.