Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement adaptive remote task request size #15721

Merged
merged 1 commit into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ public final class SystemSessionProperties
public static final String ADAPTIVE_PARTIAL_AGGREGATION_ENABLED = "adaptive_partial_aggregation_enabled";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS = "adaptive_partial_aggregation_min_rows";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold";
public static final String REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED = "remote_task_adaptive_update_request_size_enabled";
public static final String REMOTE_TASK_MAX_REQUEST_SIZE = "remote_task_max_request_size";
public static final String REMOTE_TASK_REQUEST_SIZE_HEADROOM = "remote_task_request_size_headroom";
public static final String REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST = "remote_task_guaranteed_splits_per_request";
public static final String JOIN_PARTITIONED_BUILD_MIN_ROW_COUNT = "join_partitioned_build_min_row_count";
public static final String USE_EXACT_PARTITIONING = "use_exact_partitioning";
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
Expand Down Expand Up @@ -860,6 +864,26 @@ public SystemSessionProperties(
"Ratio between aggregation output and input rows above which partial aggregation might be adaptively turned off",
optimizerConfig.getAdaptivePartialAggregationUniqueRowsRatioThreshold(),
false),
booleanProperty(
REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED,
"Experimental: Enable adaptive adjustment for size of remote task update request",
queryManagerConfig.isEnabledAdaptiveTaskRequestSize(),
false),
dataSizeProperty(
REMOTE_TASK_MAX_REQUEST_SIZE,
"Experimental: Max size of remote task update request",
queryManagerConfig.getMaxRemoteTaskRequestSize(),
false),
dataSizeProperty(
REMOTE_TASK_REQUEST_SIZE_HEADROOM,
"Experimental: Headroom for size of remote task update request",
queryManagerConfig.getRemoteTaskRequestSizeHeadroom(),
false),
integerProperty(
REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST,
"Guaranteed splits per remote task request",
queryManagerConfig.getRemoteTaskGuaranteedSplitPerTask(),
false),
longProperty(
JOIN_PARTITIONED_BUILD_MIN_ROW_COUNT,
"Minimum number of join build side rows required to use partitioned join lookup",
Expand Down Expand Up @@ -1554,6 +1578,26 @@ public static double getAdaptivePartialAggregationUniqueRowsRatioThreshold(Sessi
return session.getSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD, Double.class);
}

public static boolean isRemoteTaskAdaptiveUpdateRequestSizeEnabled(Session session)
{
return session.getSystemProperty(REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED, Boolean.class);
}

public static DataSize getMaxRemoteTaskRequestSize(Session session)
{
return session.getSystemProperty(REMOTE_TASK_MAX_REQUEST_SIZE, DataSize.class);
}

public static DataSize getRemoteTaskRequestSizeHeadroom(Session session)
{
return session.getSystemProperty(REMOTE_TASK_REQUEST_SIZE_HEADROOM, DataSize.class);
}

public static int getRemoteTaskGuaranteedSplitsPerRequest(Session session)
{
return session.getSystemProperty(REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST, Integer.class);
}

public static long getJoinPartitionedBuildMinRowCount(Session session)
{
return session.getSystemProperty(JOIN_PARTITIONED_BUILD_MIN_ROW_COUNT, Long.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public class QueryManagerConfig
private int maxTasksWaitingForExecutionPerQuery = 10;
private int maxTasksWaitingForNodePerStage = 5;

private boolean enabledAdaptiveTaskRequestSize = true;
private DataSize maxRemoteTaskRequestSize = DataSize.of(8, DataSize.Unit.MEGABYTE);
private DataSize remoteTaskRequestSizeHeadroom = DataSize.of(2, DataSize.Unit.MEGABYTE);
private int remoteTaskGuaranteedSplitPerTask = 3;

private DataSize faultTolerantExecutionTargetTaskInputSize = DataSize.of(4, GIGABYTE);

private int faultTolerantExecutionMinTaskSplitCount = 16;
Expand Down Expand Up @@ -553,6 +558,57 @@ public QueryManagerConfig setMaxTasksWaitingForNodePerStage(int maxTasksWaitingF
return this;
}

public boolean isEnabledAdaptiveTaskRequestSize()
{
return enabledAdaptiveTaskRequestSize;
}

@Config("query.remote-task.enable-adaptive-request-size")
public QueryManagerConfig setEnabledAdaptiveTaskRequestSize(boolean enabledAdaptiveTaskRequestSize)
{
this.enabledAdaptiveTaskRequestSize = enabledAdaptiveTaskRequestSize;
return this;
}

@NotNull
public DataSize getMaxRemoteTaskRequestSize()
{
return maxRemoteTaskRequestSize;
}

@Config("query.remote-task.max-request-size")
public QueryManagerConfig setMaxRemoteTaskRequestSize(DataSize maxRemoteTaskRequestSize)
{
this.maxRemoteTaskRequestSize = maxRemoteTaskRequestSize;
return this;
}

@NotNull
public DataSize getRemoteTaskRequestSizeHeadroom()
{
return remoteTaskRequestSizeHeadroom;
}

@Config("query.remote-task.request-size-headroom")
public QueryManagerConfig setRemoteTaskRequestSizeHeadroom(DataSize remoteTaskRequestSizeHeadroom)
{
this.remoteTaskRequestSizeHeadroom = remoteTaskRequestSizeHeadroom;
return this;
}

@Min(1)
public int getRemoteTaskGuaranteedSplitPerTask()
{
return remoteTaskGuaranteedSplitPerTask;
}

@Config("query.remote-task.guaranteed-splits-per-task")
public QueryManagerConfig setRemoteTaskGuaranteedSplitPerTask(int remoteTaskGuaranteedSplitPerTask)
{
this.remoteTaskGuaranteedSplitPerTask = remoteTaskGuaranteedSplitPerTask;
return this;
}

@NotNull
public DataSize getFaultTolerantExecutionTargetTaskInputSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -63,7 +64,6 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static io.trino.SystemSessionProperties.getInitialSplitsPerNode;
Expand All @@ -76,7 +76,6 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;

public class SqlTaskExecution
{
Expand All @@ -97,7 +96,7 @@ public class SqlTaskExecution
private final Map<PlanNodeId, DriverSplitRunnerFactory> driverRunnerFactoriesWithRemoteSource;

@GuardedBy("this")
private long maxAcknowledgedSplit = Long.MIN_VALUE;
private final Map<PlanNodeId, Long> maxAcknowledgedSplitByPlanNode = new HashMap<>();

@GuardedBy("this")
private final List<PlanNodeId> sourceStartOrder;
Expand Down Expand Up @@ -234,40 +233,48 @@ public void addSplitAssignments(List<SplitAssignment> splitAssignments)
private synchronized Set<PlanNodeId> updateSplitAssignments(List<SplitAssignment> splitAssignments)
{
ImmutableSet.Builder<PlanNodeId> updatedUnpartitionedSources = ImmutableSet.builder();
List<SplitAssignment> unacknowledgedSplitAssignment = new ArrayList<>(splitAssignments.size());

// first remove any split that was already acknowledged
long currentMaxAcknowledgedSplit = this.maxAcknowledgedSplit;
splitAssignments = splitAssignments.stream()
.map(assignment -> new SplitAssignment(
assignment.getPlanNodeId(),
assignment.getSplits().stream()
.filter(scheduledSplit -> scheduledSplit.getSequenceId() > currentMaxAcknowledgedSplit)
.collect(toImmutableSet()),
assignment.isNoMoreSplits()))
// drop assignments containing no unacknowledged splits
// the noMoreSplits signal acknowledgement is not tracked but it is okay to deliver it more than once
.filter(assignment -> !assignment.getSplits().isEmpty() || assignment.isNoMoreSplits())
.collect(toList());

// update task with new assignments
for (SplitAssignment assignment : splitAssignments) {
if (driverRunnerFactoriesWithSplitLifeCycle.containsKey(assignment.getPlanNodeId())) {
schedulePartitionedSource(assignment);
for (SplitAssignment splitAssignment : splitAssignments) {
// drop assignments containing no unacknowledged splits
// the noMoreSplits signal acknowledgement is not tracked but it is okay to deliver it more than once
if (!splitAssignment.getSplits().isEmpty() || splitAssignment.isNoMoreSplits()) {
PlanNodeId planNodeId = splitAssignment.getPlanNodeId();
long currentMaxAcknowledgedSplit = maxAcknowledgedSplitByPlanNode.getOrDefault(planNodeId, Long.MIN_VALUE);
long maxAcknowledgedSplit = currentMaxAcknowledgedSplit;
ImmutableSet.Builder builder = ImmutableSet.builder();
for (ScheduledSplit split : splitAssignment.getSplits()) {
long sequenceId = split.getSequenceId();
// previously acknowledged splits can be included in source
if (sequenceId > currentMaxAcknowledgedSplit) {
builder.add(split);
}
if (sequenceId > maxAcknowledgedSplit) {
maxAcknowledgedSplit = sequenceId;
}
}
if (maxAcknowledgedSplit > currentMaxAcknowledgedSplit) {
maxAcknowledgedSplitByPlanNode.put(planNodeId, maxAcknowledgedSplit);
}

unacknowledgedSplitAssignment.add(new SplitAssignment(splitAssignment.getPlanNodeId(), builder.build(), splitAssignment.isNoMoreSplits()));
}
}

// update task with new sources
for (SplitAssignment splitAssignment : unacknowledgedSplitAssignment) {
if (driverRunnerFactoriesWithSplitLifeCycle.containsKey(splitAssignment.getPlanNodeId())) {
schedulePartitionedSource(splitAssignment);
}
else {
// tell existing drivers about the new splits
DriverSplitRunnerFactory factory = driverRunnerFactoriesWithRemoteSource.get(assignment.getPlanNodeId());
factory.enqueueSplits(assignment.getSplits(), assignment.isNoMoreSplits());
updatedUnpartitionedSources.add(assignment.getPlanNodeId());
DriverSplitRunnerFactory factory = driverRunnerFactoriesWithRemoteSource.get(splitAssignment.getPlanNodeId());
factory.enqueueSplits(splitAssignment.getSplits(), splitAssignment.isNoMoreSplits());
updatedUnpartitionedSources.add(splitAssignment.getPlanNodeId());
}
}

// update maxAcknowledgedSplit
maxAcknowledgedSplit = splitAssignments.stream()
.flatMap(source -> source.getSplits().stream())
.mapToLong(ScheduledSplit::getSequenceId)
.max()
.orElse(maxAcknowledgedSplit);
return updatedUnpartitionedSources.build();
}

Expand Down
Loading