Skip to content

Commit

Permalink
Merge pull request #839: [proxima-beam-tools] explicitly add coder fo…
Browse files Browse the repository at this point in the history
…r joins
  • Loading branch information
je-ik authored Oct 16, 2023
2 parents 7a45266 + 89d7f90 commit e10f1d5
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ private PairCoder(Coder<K> keyCoder, Coder<V> valueCoder) {

@Override
public void encode(Pair<K, V> value, OutputStream outStream) throws IOException {

keyCoder.encode(value.getFirst(), outStream);
valueCoder.encode(value.getSecond(), outStream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,11 @@ public <K, RIGHT> WindowedStream<Pair<T, RIGHT>> join(
name == null
? Join.innerJoin(leftKv, rightKv)
: Join.innerJoin(name + ".join", leftKv, rightKv);
return joined.apply(
MapElements.into(resultCoder.getEncodedTypeDescriptor())
.via(kv -> Pair.of(kv.getValue().getKey(), kv.getValue().getValue())));
return joined
.apply(
MapElements.into(resultCoder.getEncodedTypeDescriptor())
.via(kv -> Pair.of(kv.getValue().getKey(), kv.getValue().getValue())))
.setCoder(resultCoder);
});
}

Expand Down

0 comments on commit e10f1d5

Please sign in to comment.