Skip to content

Commit

Permalink
Insert pause, to spread the load on the temporal server, before launc…
Browse files Browse the repository at this point in the history
…h of each child workflow that may have direct leaves of its own
  • Loading branch information
phet committed Oct 28, 2023
1 parent 48e2bf4 commit cdccabb
Showing 1 changed file with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.temporal.util.nesting.workflow;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -40,6 +41,8 @@
/** Core skeleton of {@link NestingExecWorkflow}: realizing classes need only define {@link #launchAsyncActivity} */
@Slf4j
public abstract class AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT> implements NestingExecWorkflow<WORK_ITEM> {
public static final int NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT = 10;
public static final int MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT = 100;

@Override
public int performWorkload(
Expand Down Expand Up @@ -74,6 +77,12 @@ public int performWorkload(
final WorkflowAddr childAddr = addr.createChild(nextChildId);
final NestingExecWorkflow<WORK_ITEM> child = createChildWorkflow(childAddr);
if (!workload.isIndexKnownToExceed(childStartIndex)) { // best-effort short-circuiting
// IMPORTANT: insert pause before launch of each child workflow that may have direct leaves of its own. periodic pauses spread the load on the
// temporal server, to avoid a sustained burst from submitting potentially very many async activities over the full hierarchical elaboration
final int numDirectLeavesChildMayHave = maxBranchesPerTree - subTreeChildMaxSubTreesPerTree;
if (numDirectLeavesChildMayHave > 0) {
Workflow.sleep(calcPauseDurationBeforeCreatingSubTree(numDirectLeavesChildMayHave));
}
childSubTrees.add(
Async.function(child::performWorkload, childAddr, workload, childStartIndex, maxBranchesPerTree,
maxSubTreesPerTree, Optional.of(subTreeChildMaxSubTreesPerTree)));
Expand Down Expand Up @@ -106,6 +115,14 @@ protected NestingExecWorkflow<WORK_ITEM> createChildWorkflow(final WorkflowAddr
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
}

/** @return how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave` */
protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) {
// (only pause when an appreciable number of leaves)
return numDirectLeavesChildMayHave > MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT
? Duration.ofSeconds(NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT)
: Duration.ZERO;
}

/**
* "right-tilt" sub-tree's grandchildren, so final child gets all grandchildren (vs. constant grandchildren/child)
* i.e. NOT!:
Expand Down

0 comments on commit cdccabb

Please sign in to comment.