Skip to content

Commit

Permalink
Merge pull request apache#11 from bsidhom/output-receiver-empty
Browse files Browse the repository at this point in the history
Handle empty output set in FlinkExecutableStageFunction
  • Loading branch information
axelmagn authored Mar 13, 2018
2 parents 4c87525 + 5e636c3 commit 8a2d352
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ public void mapPartition(Iterable<WindowedValue<InputT>> input,
// TODO: Support multiple output receivers and redirect them properly.
Map<BeamFnApi.Target, Coder<WindowedValue<?>>> outputCoders =
processBundleDescriptor.getOutputTargetCoders();
BeamFnApi.Target outputTarget = Iterables.getOnlyElement(outputCoders.keySet());
BeamFnApi.Target outputTarget = null;
if (outputCoders.size() > 0) {
outputTarget = Iterables.getOnlyElement(outputCoders.keySet());
}
Coder<?> outputCoder = Iterables.getOnlyElement(outputCoders.values());
SdkHarnessClient.RemoteOutputReceiver<WindowedValue<OutputT>> mainOutputReceiver =
new SdkHarnessClient.RemoteOutputReceiver<WindowedValue<OutputT>>() {
Expand All @@ -131,8 +134,15 @@ public void accept(WindowedValue<OutputT> input) throws Exception {
};
}
};
SdkHarnessClient.ActiveBundle<InputT> bundle = processor.newBundle(
ImmutableMap.of(outputTarget, mainOutputReceiver));
Map<BeamFnApi.Target,
SdkHarnessClient.RemoteOutputReceiver<?>> receiverMap;
if (outputTarget == null) {
receiverMap = ImmutableMap.of(outputTarget, mainOutputReceiver);
} else {
receiverMap = ImmutableMap.of();
}

SdkHarnessClient.ActiveBundle<InputT> bundle = processor.newBundle(receiverMap);
try (CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver = bundle.getInputReceiver()) {
for (WindowedValue<InputT> value : input) {
inputReceiver.accept(value);
Expand Down

0 comments on commit 8a2d352

Please sign in to comment.