Skip to content

Commit

Permalink
Rename variables to improve readability
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Nov 8, 2022
1 parent c1f1942 commit 1ee0cf7
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class PhasedExecutionSchedule
*/
private final List<PlanFragmentId> sortedFragments = new ArrayList<>();
private final Map<PlanFragmentId, StageExecution> stagesByFragmentId;
private final Set<StageExecution> activeStages = new LinkedHashSet<>();
private final Set<StageExecution> schedulingStages = new LinkedHashSet<>();
private final DynamicFilterService dynamicFilterService;

/**
Expand Down Expand Up @@ -145,7 +145,7 @@ public StagesScheduleResult getStagesToSchedule()
// notifications from previously started stages are not lost
Optional<ListenableFuture<Void>> rescheduleFuture = getRescheduleFuture();
schedule();
return new StagesScheduleResult(activeStages, rescheduleFuture);
return new StagesScheduleResult(schedulingStages, rescheduleFuture);
}

@Override
Expand All @@ -165,7 +165,7 @@ synchronized Optional<ListenableFuture<Void>> getRescheduleFuture()
void schedule()
{
ImmutableSet.Builder<PlanFragmentId> fragmentsToExecute = ImmutableSet.builder();
fragmentsToExecute.addAll(removeCompletedStages());
fragmentsToExecute.addAll(removeScheduledStages());
fragmentsToExecute.addAll(unblockStagesWithFullOutputBuffer());
selectForExecution(fragmentsToExecute.build());
}
Expand All @@ -183,25 +183,25 @@ DirectedGraph<PlanFragmentId, FragmentsEdge> getFragmentDependency()
}

@VisibleForTesting
Set<StageExecution> getActiveStages()
Set<StageExecution> getSchedulingStages()
{
return activeStages;
return schedulingStages;
}

private Set<PlanFragmentId> removeCompletedStages()
private Set<PlanFragmentId> removeScheduledStages()
{
// iterate over all stages, not only active ones; stages which are not yet active could have already been aborted
Set<StageExecution> completedStages = stagesByFragmentId.values().stream()
.filter(this::isStageCompleted)
// iterate over all stages, not only scheduling ones; stages which are not yet scheduling could have already been aborted
Set<StageExecution> scheduledStages = stagesByFragmentId.values().stream()
.filter(this::isStageScheduled)
.collect(toImmutableSet());
// remove scheduled stages outside of Java stream to prevent concurrent modification
log.debug("completedStages: %s", completedStages);
return completedStages.stream()
.flatMap(stage -> removeCompletedStage(stage).stream())
log.debug("scheduledStages: %s", scheduledStages);
return scheduledStages.stream()
.flatMap(stage -> removeScheduledStage(stage).stream())
.collect(toImmutableSet());
}

private Set<PlanFragmentId> removeCompletedStage(StageExecution stage)
private Set<PlanFragmentId> removeScheduledStage(StageExecution stage)
{
// start all stages that depend on the scheduled stage
PlanFragmentId fragmentId = stage.getFragment().getId();
Expand All @@ -215,7 +215,7 @@ private Set<PlanFragmentId> removeCompletedStage(StageExecution stage)
.filter(dependentFragmentId -> fragmentDependency.inDegreeOf(dependentFragmentId) == 1)
.collect(toImmutableSet());
fragmentDependency.removeVertex(fragmentId);
activeStages.remove(stage);
schedulingStages.remove(stage);
return fragmentsToExecute;
}

Expand Down Expand Up @@ -248,16 +248,16 @@ private void selectForExecution(Set<PlanFragmentId> fragmentIds)

private void selectForExecution(StageExecution stage)
{
if (isStageCompleted(stage)) {
if (isStageScheduled(stage)) {
// don't start already scheduled stages (can happen when non-lazy stage is selected for
// execution and stage is started immediately even with dependencies)
return;
}

if (activeStages.add(stage) && fragmentDependency.outDegreeOf(stage.getFragment().getId()) > 0) {
if (schedulingStages.add(stage) && fragmentDependency.outDegreeOf(stage.getFragment().getId()) > 0) {
// if there are any dependent stages then reschedule when stage is scheduled
stage.addStateChangeListener(state -> {
if (isStageCompleted(stage)) {
if (isStageScheduled(stage)) {
notifyReschedule(stage);
}
});
Expand All @@ -276,7 +276,7 @@ private void notifyReschedule(StageExecution stage)
rescheduleFuture.set(null);
}

private boolean isStageCompleted(StageExecution stage)
private boolean isStageScheduled(StageExecution stage)
{
State state = stage.getState();
return state == SCHEDULED || state == RUNNING || state == FLUSHING || state.isDone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ public void testPartitionedJoin()
assertThat(dependencies.edgeSet()).containsExactlyInAnyOrder(new FragmentsEdge(buildFragment.getId(), probeFragment.getId()));

// build and join stage should start immediately
assertThat(getActiveFragments(schedule)).containsExactly(buildFragment.getId(), joinFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(buildFragment.getId(), joinFragment.getId());

// probe stage should start after build stage is completed
ListenableFuture<Void> rescheduleFuture = schedule.getRescheduleFuture().orElseThrow();
assertThat(rescheduleFuture).isNotDone();
buildStage.setState(FLUSHING);
assertThat(rescheduleFuture).isDone();
schedule.schedule();
assertThat(getActiveFragments(schedule)).containsExactly(joinFragment.getId(), probeFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(joinFragment.getId(), probeFragment.getId());

// make sure scheduler finishes
rescheduleFuture = schedule.getRescheduleFuture().orElseThrow();
Expand All @@ -103,7 +103,7 @@ public void testPartitionedJoin()
assertThat(rescheduleFuture).isNotDone();
joinStage.setState(FINISHED);
schedule.schedule();
assertThat(getActiveFragments(schedule)).isEmpty();
assertThat(getSchedulingFragments(schedule)).isEmpty();
assertThat(schedule.isFinished()).isTrue();
}

Expand All @@ -124,12 +124,12 @@ public void testBroadcastSourceJoin()
assertThat(dependencies.edgeSet()).containsExactlyInAnyOrder(new FragmentsEdge(buildFragment.getId(), joinSourceFragment.getId()));

// build stage should start immediately
assertThat(getActiveFragments(schedule)).containsExactly(buildFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(buildFragment.getId());

// join stage should start after build stage buffer is full
buildStage.setAnyTaskBlocked(true);
schedule.schedule();
assertThat(getActiveFragments(schedule)).containsExactly(buildFragment.getId(), joinSourceFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(buildFragment.getId(), joinSourceFragment.getId());
}

@Test
Expand All @@ -151,7 +151,7 @@ public void testAggregation()
// aggregation and source stage should start immediately, join stage should wait for build stage to complete
DirectedGraph<PlanFragmentId, FragmentsEdge> dependencies = schedule.getFragmentDependency();
assertThat(dependencies.edgeSet()).containsExactly(new FragmentsEdge(buildFragment.getId(), joinFragment.getId()));
assertThat(getActiveFragments(schedule)).containsExactly(buildFragment.getId(), sourceFragment.getId(), aggregationFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(buildFragment.getId(), sourceFragment.getId(), aggregationFragment.getId());
}

@Test
Expand All @@ -173,9 +173,9 @@ public void testDependentStageAbortedBeforeStarted()
// aggregation and source stage should start immediately, join stage should wait for build stage to complete
DirectedGraph<PlanFragmentId, FragmentsEdge> dependencies = schedule.getFragmentDependency();
assertThat(dependencies.edgeSet()).containsExactly(new FragmentsEdge(buildFragment.getId(), joinFragment.getId()));
assertThat(getActiveFragments(schedule)).containsExactly(buildFragment.getId(), sourceFragment.getId(), aggregationFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(buildFragment.getId(), sourceFragment.getId(), aggregationFragment.getId());

// abort non-active join stage
// abort non-scheduling join stage
joinStage.setState(ABORTED);

// dependencies finish
Expand Down Expand Up @@ -210,17 +210,17 @@ public void testStageWithBroadcastAndPartitionedJoin()
new FragmentsEdge(broadcastBuildFragment.getId(), probeFragment.getId()),
new FragmentsEdge(partitionedBuildFragment.getId(), probeFragment.getId()),
new FragmentsEdge(broadcastBuildFragment.getId(), joinFragment.getId()));
assertThat(getActiveFragments(schedule)).containsExactly(partitionedBuildFragment.getId(), broadcastBuildFragment.getId(), joinFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(partitionedBuildFragment.getId(), broadcastBuildFragment.getId(), joinFragment.getId());

// completing single build dependency shouldn't cause probe stage to start
broadcastBuildStage.setState(FLUSHING);
schedule.schedule();
assertThat(getActiveFragments(schedule)).containsExactly(partitionedBuildFragment.getId(), joinFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(partitionedBuildFragment.getId(), joinFragment.getId());

// completing all build dependencies should cause probe stage to start
partitionedBuildStage.setState(FLUSHING);
schedule.schedule();
assertThat(getActiveFragments(schedule)).containsExactly(joinFragment.getId(), probeFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(joinFragment.getId(), probeFragment.getId());
}

@Test
Expand All @@ -246,28 +246,28 @@ public void testSourceStageBroadcastJoinWithPartitionedJoinBuildSide()
new FragmentsEdge(nestedJoinFragment.getId(), joinSourceFragment.getId()),
new FragmentsEdge(nestedJoinBuildFragment.getId(), nestedJoinProbeFragment.getId()),
new FragmentsEdge(nestedJoinProbeFragment.getId(), joinSourceFragment.getId()));
assertThat(getActiveFragments(schedule)).containsExactly(nestedJoinBuildFragment.getId(), nestedJoinFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(nestedJoinBuildFragment.getId(), nestedJoinFragment.getId());

// Mark nestedJoinFragment and nestedJoinBuildFragment as scheduled.
// joinSourceFragment still has dependency on nestedJoinProbeFragment
nestedJoinStage.setState(SCHEDULED);
nestedJoinBuildStage.setState(FINISHED);
schedule.schedule();
assertThat(getActiveFragments(schedule)).containsExactly(nestedJoinProbeFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(nestedJoinProbeFragment.getId());

// mark nestedJoinFragment buffer as full, now joinSourceFragment is forced to be scheduled
nestedJoinStage.setAnyTaskBlocked(true);
schedule.schedule();
assertThat(getActiveFragments(schedule)).containsExactly(nestedJoinProbeFragment.getId(), joinSourceFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(nestedJoinProbeFragment.getId(), joinSourceFragment.getId());

nestedJoinProbeStage.setState(FINISHED);
schedule.schedule();
assertThat(getActiveFragments(schedule)).containsExactly(joinSourceFragment.getId());
assertThat(getSchedulingFragments(schedule)).containsExactly(joinSourceFragment.getId());
}

private Set<PlanFragmentId> getActiveFragments(PhasedExecutionSchedule schedule)
private Set<PlanFragmentId> getSchedulingFragments(PhasedExecutionSchedule schedule)
{
return schedule.getActiveStages().stream()
return schedule.getSchedulingStages().stream()
.map(stage -> stage.getFragment().getId())
.collect(toImmutableSet());
}
Expand Down

0 comments on commit 1ee0cf7

Please sign in to comment.