Skip to content

Commit

Permalink
Add Windmill support for MultimapState (#23492)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengbuqian authored Jul 5, 2023
1 parent cb8adf1 commit 2a12765
Show file tree
Hide file tree
Showing 6 changed files with 3,003 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2450,10 +2450,10 @@ static boolean useStreamingEngine(DataflowPipelineOptions options) {

static void verifyDoFnSupported(
DoFn<?, ?> fn, boolean streaming, DataflowPipelineOptions options) {
if (DoFnSignatures.usesMultimapState(fn)) {
if (!streaming && DoFnSignatures.usesMultimapState(fn)) {
throw new UnsupportedOperationException(
String.format(
"%s does not currently support %s",
"%s does not currently support %s in batch mode",
DataflowRunner.class.getSimpleName(), MultimapState.class.getSimpleName()));
}
if (streaming && DoFnSignatures.requiresTimeSortedInput(fn)) {
Expand All @@ -2465,6 +2465,13 @@ static void verifyDoFnSupported(

boolean streamingEngine = useStreamingEngine(options);
boolean isUnifiedWorker = useUnifiedWorker(options);

if (DoFnSignatures.usesMultimapState(fn) && isUnifiedWorker) {
throw new UnsupportedOperationException(
String.format(
"%s does not currently support %s running using streaming on unified worker",
DataflowRunner.class.getSimpleName(), MultimapState.class.getSimpleName()));
}
if (DoFnSignatures.usesSetState(fn)) {
if (streaming && (isUnifiedWorker || streamingEngine)) {
throw new UnsupportedOperationException(
Expand Down
Loading

0 comments on commit 2a12765

Please sign in to comment.