From cdccabbb6843a7553eec4698c84d14f39518f62f Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Sat, 28 Oct 2023 03:23:15 -0700 Subject: [PATCH] Insert pause, to spread the load on the temporal server, before launch of each child workflow that may have direct leaves of its own --- .../AbstractNestingExecWorkflowImpl.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java index 046b9a4a33d..425d6284c37 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.util.nesting.workflow; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -40,6 +41,8 @@ /** Core skeleton of {@link NestingExecWorkflow}: realizing classes need only define {@link #launchAsyncActivity} */ @Slf4j public abstract class AbstractNestingExecWorkflowImpl implements NestingExecWorkflow { + public static final int NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT = 10; + public static final int MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT = 100; @Override public int performWorkload( @@ -74,6 +77,12 @@ public int performWorkload( final WorkflowAddr childAddr = addr.createChild(nextChildId); final NestingExecWorkflow child = createChildWorkflow(childAddr); if (!workload.isIndexKnownToExceed(childStartIndex)) { // best-effort short-circuiting + // IMPORTANT: insert pause before launch of each child workflow that may have direct leaves of its own. periodic pauses spread the load on the + // temporal server, to avoid a sustained burst from submitting potentially very many async activities over the full hierarchical elaboration + final int numDirectLeavesChildMayHave = maxBranchesPerTree - subTreeChildMaxSubTreesPerTree; + if (numDirectLeavesChildMayHave > 0) { + Workflow.sleep(calcPauseDurationBeforeCreatingSubTree(numDirectLeavesChildMayHave)); + } childSubTrees.add( Async.function(child::performWorkload, childAddr, workload, childStartIndex, maxBranchesPerTree, maxSubTreesPerTree, Optional.of(subTreeChildMaxSubTreesPerTree))); @@ -106,6 +115,14 @@ protected NestingExecWorkflow createChildWorkflow(final WorkflowAddr return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts); } + /** @return how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave` */ + protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) { + // (only pause when an appreciable number of leaves) + return numDirectLeavesChildMayHave > MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT + ? Duration.ofSeconds(NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT) + : Duration.ZERO; + } + /** * "right-tilt" sub-tree's grandchildren, so final child gets all grandchildren (vs. constant grandchildren/child) * i.e. NOT!: