Skip to content

Commit

Permalink
Add windmill APIs proto and windmill legacy worker implementations fo…
Browse files Browse the repository at this point in the history
…r multimap state
  • Loading branch information
zhengbuqian committed Feb 28, 2023
1 parent 7db76df commit 230a03f
Show file tree
Hide file tree
Showing 6 changed files with 2,856 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2443,10 +2443,10 @@ static boolean useStreamingEngine(DataflowPipelineOptions options) {

static void verifyDoFnSupported(
DoFn<?, ?> fn, boolean streaming, DataflowPipelineOptions options) {
if (DoFnSignatures.usesMultimapState(fn)) {
if (!streaming && DoFnSignatures.usesMultimapState(fn)) {
throw new UnsupportedOperationException(
String.format(
"%s does not currently support %s",
"%s does not currently support %s in batch mode",
DataflowRunner.class.getSimpleName(), MultimapState.class.getSimpleName()));
}
if (streaming && DoFnSignatures.requiresTimeSortedInput(fn)) {
Expand All @@ -2458,6 +2458,13 @@ static void verifyDoFnSupported(

boolean streamingEngine = useStreamingEngine(options);
boolean isUnifiedWorker = useUnifiedWorker(options);

if (DoFnSignatures.usesMultimapState(fn) && isUnifiedWorker) {
throw new UnsupportedOperationException(
String.format(
"%s does not currently support %s running using streaming on unified worker",
DataflowRunner.class.getSimpleName(), MultimapState.class.getSimpleName()));
}
if (DoFnSignatures.usesSetState(fn)) {
if (streaming && (isUnifiedWorker || streamingEngine)) {
throw new UnsupportedOperationException(
Expand Down
Loading

0 comments on commit 230a03f

Please sign in to comment.