-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-4294] [BEAM-4360] Join translation and ReduceByKey test suite where moved to org.apache.beam.* package. #1
[BEAM-4294] [BEAM-4360] Join translation and ReduceByKey test suite where moved to org.apache.beam.* package. #1
Conversation
…pplication of windowing in JoinTranslator fixed.
… to yet unsupported features.
…here moved to org.apache.beam.* package. Small imports fixes after rebase to seznam/beam eff3ffd.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall! Good job 👍 Please introduce a check whether RBK can be translated using current translator and we're good to go ;)
|
||
ListDataSink<Pair<Integer, Pair<String, Integer>>> output = ListDataSink.get(); | ||
|
||
BinaryFunctor<Optional<Pair<Integer, String>>, Pair<Integer, Integer>, Pair<String, Integer>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's usually more readable to apply lambda directly or to create named static function and use it by reference
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. Code was changed so the lambda is is used directly.
This case is more specific since the API requires type parameters of BinaryFunctor
to be stated explicitly, otherwise parameter to the output()
method has inferred arguments type which cannot be assigned from ListDataSink<Pair<Integer, Pair<String, Integer>>>
(output
variable).
@@ -48,6 +48,10 @@ | |||
private static <InputT, K, V, OutputT, W extends Window<W>> PCollection<Pair<K, OutputT>> | |||
doTranslate(ReduceByKey<InputT, K, V, OutputT, W> operator, BeamExecutorContext context) { | |||
|
|||
if (operator.getValueComparator() != null) { //TODO Could we even do values sorting ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Preconditions.checkState
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@@ -54,6 +55,7 @@ | |||
// extended operators | |||
translators.put(ReduceByKey.class, new ReduceByKeyTranslator()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to add predicate for reduce by key translator as it can not handle some cases (eg. if it contains value comparator). Predicates also allow us to have more implementations for a single operator (to handle different cases)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reckon it would be worth adding a boolean canTranslate
to the OperatorTranslator
interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New method boolean canTranslate(operator)
added to OperatorTranslator
. Now we have ability to explicitly state when given translator cannot translate given Operator
. We can also add more specialized translators when needed.
PCollection<ValueT> typedInput = (PCollection<ValueT>) inputPCollection; | ||
typedInput.setCoder(valueCoder); | ||
|
||
PCollection<KV<K, ValueT>> leftKvInput = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> kvInput
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
private final UnaryFunction<InputT, K> keyExtractor; | ||
|
||
public InputToKvDoFn(UnaryFunction<InputT, K> leftKeyExtractor) { | ||
this.keyExtractor = leftKeyExtractor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leftKeyExtractor
-> keyExtractor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
… extended to allow more than one translator per operator.
LGTM 👍 Good job. Can we send a pull request directly to |
Fix tests expectations and minor code fix up.
Update forked repository.
This pull requests summaries together all the changes described in:
Plus small imports fixes after rebase to eff3ffd. So please review them first to get understanding of this pull request.
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue../gradlew build
to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.