Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-8162] Encode keys as NESTED for flink keyselector #9464

Merged
merged 1 commit into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand Down Expand Up @@ -63,6 +64,17 @@ protected StateInternals createStateInternals() {
}
}

/**
 * Verifies that {@code FlinkStateInternals#getKey()} returns the original key after that key has
 * been encoded with {@code FlinkKeyUtils.encodeKey} and set as the state backend's current key.
 */
@Test
public void testGetKey() throws Exception {
  KeyedStateBackend<ByteBuffer> keyedStateBackend = createStateBackend();
  StringUtf8Coder coder = StringUtf8Coder.of();
  String key = "key";

  // The backend's current key is the encoded form of the key, not the key object itself.
  keyedStateBackend.setCurrentKey(FlinkKeyUtils.encodeKey(key, coder));

  // Parameterized instead of raw so that getKey() is statically typed as String.
  FlinkStateInternals<String> stateInternals =
      new FlinkStateInternals<>(keyedStateBackend, coder);
  assertThat(stateInternals.getKey(), is(key));
}

@Test
public void testWatermarkHoldsPersistence() throws Exception {
KeyedStateBackend<ByteBuffer> keyedStateBackend = createStateBackend();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand Down Expand Up @@ -66,6 +67,17 @@ protected StateInternals createStateInternals() {
}
}

/**
 * Round-trips a key through the state backend: the key is encoded with {@code
 * FlinkKeyUtils.encodeKey}, set as the backend's current key, and must be returned unchanged by
 * {@code FlinkStateInternals#getKey()}.
 */
@Test
public void testGetKey() throws Exception {
  KeyedStateBackend<ByteBuffer> keyedStateBackend = createStateBackend();
  StringUtf8Coder coder = StringUtf8Coder.of();
  String key = "key";

  // Install the encoded key as the backend's current key.
  keyedStateBackend.setCurrentKey(FlinkKeyUtils.encodeKey(key, coder));

  // Use the parameterized type rather than a raw type so the decoded key is typed as String.
  FlinkStateInternals<String> stateInternals =
      new FlinkStateInternals<>(keyedStateBackend, coder);
  assertThat(stateInternals.getKey(), is(key));
}

@Test
public void testWatermarkHoldsPersistence() throws Exception {
KeyedStateBackend<ByteBuffer> keyedStateBackend = createStateBackend();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public static <K> ByteBuffer encodeKey(K key, Coder<K> keyCoder) {
checkNotNull(keyCoder, "Provided coder must not be null");
final byte[] keyBytes;
try {
keyBytes = CoderUtils.encodeToByteArray(keyCoder, key);
// We encode with the NESTED context here to stay consistent with the get-key logic in the
// portability layer. NESTED is currently used everywhere in the Beam portability APIs because
// mixing NESTED and OUTER becomes quite complicated and extremely error-prone.
keyBytes = CoderUtils.encodeToByteArray(keyCoder, key, Coder.Context.NESTED);
} catch (Exception e) {
throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to encode key: %s", key), e);
}
Expand All @@ -59,7 +62,7 @@ static <K> K decodeKey(ByteBuffer byteBuffer, Coder<K> keyCoder) {
@SuppressWarnings("ByteBufferBackingArray")
final byte[] keyBytes = byteBuffer.array();
try {
return CoderUtils.decodeFromByteArray(keyCoder, keyBytes);
return CoderUtils.decodeFromByteArray(keyCoder, keyBytes, Coder.Context.NESTED);
} catch (Exception e) {
throw new RuntimeException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public K getKey() {
keyBytes.get(bytes);
keyBytes.position(keyBytes.position() - bytes.length);
try {
return CoderUtils.decodeFromByteArray(keyCoder, bytes);
return CoderUtils.decodeFromByteArray(keyCoder, bytes, Coder.Context.NESTED);
} catch (CoderException e) {
throw new RuntimeException("Error decoding key.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1631,10 +1631,10 @@ public void finishBundle(FinishBundleContext context) {
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow(KV.of("key2", "c")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "c")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key3", "finishBundle"))));

doFnOperator = doFnOperatorSupplier.get();
Expand All @@ -1652,10 +1652,10 @@ public void finishBundle(FinishBundleContext context) {
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow(KV.of("key2", "c")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "c")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key3", "finishBundle"))));

// repeat to see if elements are evicted
Expand All @@ -1665,10 +1665,10 @@ public void finishBundle(FinishBundleContext context) {
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow(KV.of("key2", "c")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "c")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key3", "finishBundle"))));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ public void testNullKey() {
// Checks the byte layout that FlinkKeyUtils.encodeKey produces for a three-byte ByteString key
// encoded with ByteStringCoder.
@SuppressWarnings("ByteBufferBackingArray")
public void testCoderContext() throws Exception {
byte[] bytes = {1, 1, 1};
// Same payload with a leading 3 — presumably the length prefix written in the nested context.
byte[] encodedBytes = {3, 1, 1, 1};
ByteString key = ByteString.copyFrom(bytes);
ByteStringCoder coder = ByteStringCoder.of();

ByteBuffer encoded = FlinkKeyUtils.encodeKey(key, coder);
// NOTE(review): the assertion directly below and the final assertion expect different arrays
// for the same encoded.array(), so they cannot both pass. This pair looks like the pre-change
// side of a diff — confirm which expectation is intended.
// Ensure outer context is used where no length encoding is used.
assertThat(encoded.array(), is(bytes));
// Ensure nested context is used.
assertThat(encoded.array(), is(encodedBytes));
}
}