From a856332061cc7a9347625c4ecc46fb3d1d3cb1f2 Mon Sep 17 00:00:00 2001 From: sunjincheng121 Date: Tue, 3 Sep 2019 13:46:17 +0800 Subject: [PATCH] [hotfix] Encode keys as OUTER for flink key supplier --- .../beam/fn/harness/state/FnApiStateAccessor.java | 6 +++++- .../apache/beam/fn/harness/FnApiDoFnRunnerTest.java | 13 +++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java index b1224bd19792a..4720098a67f7b 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java @@ -104,7 +104,11 @@ public FnApiStateAccessor( ByteString.Output encodedKeyOut = ByteString.newOutput(); try { - ((Coder) keyCoder).encode(((KV) element.getValue()).getKey(), encodedKeyOut); + ((Coder) keyCoder) + .encode( + ((KV) element.getValue()).getKey(), + encodedKeyOut, + Coder.Context.OUTER); } catch (IOException e) { throw new IllegalStateException(e); } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index 088148d70db21..7533f61b9fd5c 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -51,6 +51,7 @@ import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.metrics.Counter; @@ -284,7 +285,7 @@ private StateKey bagUserStateKey(String userStateId, String key) throws IOExcept StateKey.BagUserState.newBuilder() .setPtransformId(TEST_PTRANSFORM_ID) .setUserStateId(userStateId) - .setKey(encode(key)) + .setKey(encodeOuter(key)) .setWindow( ByteString.copyFrom( CoderUtils.encodeToByteArray( @@ -975,9 +976,17 @@ private StateKey multimapSideInputKey(String sideInputId, ByteString key, ByteSt } private ByteString encode(String... values) throws IOException { + return encode(Coder.Context.NESTED, values); + } + + private ByteString encodeOuter(String... values) throws IOException { + return encode(Coder.Context.OUTER, values); + } + + private ByteString encode(Coder.Context context, String... values) throws IOException { ByteString.Output out = ByteString.newOutput(); for (String value : values) { - StringUtf8Coder.of().encode(value, out); + StringUtf8Coder.of().encode(value, out, context); } return out.toByteString(); }