Skip to content

Commit

Permalink
Fix SourceDistributionTaskSource
Browse files Browse the repository at this point in the history
When `SplitSource#isFinished` is arbitrary delayed it was possible that
the first check (`boolean includeRemainder = splitSource.isFinished()`)
returns `false`, yet the second check `if (!result.isEmpty() || splitSource.isFinished()) {`
returns `true` and breaks the execution loop without creating a task
with the most recent batch of split included.
  • Loading branch information
arhimondr committed Apr 8, 2022
1 parent 991a59b commit 6831f81
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -714,13 +714,13 @@ public List<TaskDescriptor> getMoreTasks()
List<TaskDescriptor> result = new ArrayList<>();

while (true) {
boolean includeRemainder = splitSource.isFinished();
boolean splitSourceFinished = splitSource.isFinished();

result.addAll(getReadyTasks(
remotelyAccessibleSplitBuffer,
ImmutableList.of(),
new NodeRequirements(catalogRequirement, ImmutableSet.of(), taskMemory),
includeRemainder));
splitSourceFinished));
for (HostAddress remoteHost : locallyAccessibleSplitBuffer.keySet()) {
result.addAll(getReadyTasks(
locallyAccessibleSplitBuffer.get(remoteHost),
Expand All @@ -729,10 +729,10 @@ public List<TaskDescriptor> getMoreTasks()
.map(Map.Entry::getValue)
.collect(toImmutableList()),
new NodeRequirements(catalogRequirement, ImmutableSet.of(remoteHost), taskMemory),
includeRemainder));
splitSourceFinished));
}

if (!result.isEmpty() || splitSource.isFinished()) {
if (!result.isEmpty() || splitSourceFinished) {
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.openjdk.jol.info.ClassLayout;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
Expand All @@ -58,6 +59,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Multimaps.toMultimap;
import static com.google.common.collect.Streams.findLast;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.airlift.units.DataSize.Unit.BYTE;
Expand Down Expand Up @@ -790,19 +792,60 @@ public void testSourceDistributionTaskSourceWithWeights()
assertTrue(taskSource.isFinished());
}

@Test
public void testSourceDistributionTaskSourceLastIncompleteTaskAlwaysCreated()
{
for (int targetSplitsPerTask = 1; targetSplitsPerTask <= 21; targetSplitsPerTask += 5) {
List<Split> splits = new ArrayList<>();
for (int i = 0; i < targetSplitsPerTask + 1 /* to make last task incomplete with only a single split */; i++) {
splits.add(createWeightedSplit(i, STANDARD_WEIGHT));
}
for (int finishDelayIterations = 1; finishDelayIterations < 20; finishDelayIterations++) {
TaskSource taskSource = createSourceDistributionTaskSource(
new TestingSplitSource(CATALOG, splits, finishDelayIterations),
ImmutableListMultimap.of(),
1,
targetSplitsPerTask,
STANDARD_WEIGHT * targetSplitsPerTask,
targetSplitsPerTask);
List<TaskDescriptor> tasks = readAllTasks(taskSource);
assertThat(tasks).hasSize(2);
TaskDescriptor lastTask = findLast(tasks.stream()).orElseThrow();
assertThat(lastTask.getSplits()).hasSize(1);
}
}
}

private static SourceDistributionTaskSource createSourceDistributionTaskSource(
List<Split> splits,
ListMultimap<PlanNodeId, ExchangeSourceHandle> replicatedSources,
int splitBatchSize,
int minSplitsPerTask,
long splitWeightPerTask,
int maxSplitsPerTask)
{
return createSourceDistributionTaskSource(
new TestingSplitSource(CATALOG, splits),
replicatedSources,
splitBatchSize,
minSplitsPerTask,
splitWeightPerTask,
maxSplitsPerTask);
}

private static SourceDistributionTaskSource createSourceDistributionTaskSource(
SplitSource splitSource,
ListMultimap<PlanNodeId, ExchangeSourceHandle> replicatedSources,
int splitBatchSize,
int minSplitsPerTask,
long splitWeightPerTask,
int maxSplitsPerTask)
{
return new SourceDistributionTaskSource(
new QueryId("query"),
PLAN_NODE_1,
new TableExecuteContextManager(),
new TestingSplitSource(CATALOG, splits),
splitSource,
replicatedSources,
splitBatchSize,
getSplitsTime -> {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ public class TestingSplitSource
{
private final CatalogName catalogName;
private final Iterator<Split> splits;
private int finishDelayRemainingIterations;

public TestingSplitSource(CatalogName catalogName, List<Split> splits)
{
this(catalogName, splits, 0);
}

public TestingSplitSource(CatalogName catalogName, List<Split> splits, int finishDelayIterations)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.splits = ImmutableList.copyOf(requireNonNull(splits, "splits is null")).iterator();
this.finishDelayRemainingIterations = finishDelayIterations;
}

@Override
Expand Down Expand Up @@ -70,7 +77,7 @@ public void close()
@Override
public boolean isFinished()
{
return !splits.hasNext();
return !splits.hasNext() && finishDelayRemainingIterations-- <= 0;
}

@Override
Expand Down

0 comments on commit 6831f81

Please sign in to comment.