Skip to content

Commit

Permalink
[Multimap] Added windmill implementations for multimap state.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengbuqian committed Dec 1, 2022
1 parent 065167b commit add1c3f
Show file tree
Hide file tree
Showing 5 changed files with 2,121 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.MultimapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
Expand Down Expand Up @@ -2430,6 +2431,13 @@ static void verifyDoFnSupported(

boolean streamingEngine = useStreamingEngine(options);
boolean isUnifiedWorker = useUnifiedWorker(options);

if (DoFnSignatures.usesMultimapState(fn) && isUnifiedWorker) {
throw new UnsupportedOperationException(
String.format(
"%s does not currently support %s running using streaming on unified worker",
DataflowRunner.class.getSimpleName(), MultimapState.class.getSimpleName()));
}
if (DoFnSignatures.usesSetState(fn)) {
if (streaming && (isUnifiedWorker || streamingEngine)) {
throw new UnsupportedOperationException(
Expand Down
Loading

0 comments on commit add1c3f

Please sign in to comment.