Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement adaptive remote task request size #10013

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ public final class SystemSessionProperties
public static final String TIME_ZONE_ID = "time_zone_id";
public static final String LEGACY_CATALOG_ROLES = "legacy_catalog_roles";
public static final String INCREMENTAL_HASH_ARRAY_LOAD_FACTOR_ENABLED = "incremental_hash_array_load_factor_enabled";
public static final String ENABLE_ADAPTIVE_REMOTE_TASK_REQUEST_SIZE = "enable_adaptive_remote_task_request_size";
public static final String MAX_REMOTE_TASK_REQUEST_SIZE = "max_remote_task_request_size";
public static final String REMOTE_TASK_REQUEST_SIZE_HEADROOM = "remote_task_request_size_headroom";
public static final String REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST = "remote_task_guaranteed_splits_per_request";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -666,6 +670,26 @@ public SystemSessionProperties(
INCREMENTAL_HASH_ARRAY_LOAD_FACTOR_ENABLED,
"Use smaller load factor for small hash arrays in order to improve performance",
featuresConfig.isIncrementalHashArrayLoadFactorEnabled(),
false),
booleanProperty(
ENABLE_ADAPTIVE_REMOTE_TASK_REQUEST_SIZE,
"Experimental: Enable adaptive adjustment for size of remote task update request",
queryManagerConfig.isEnabledAdaptiveTaskRequestSize(),
false),
dataSizeProperty(
MAX_REMOTE_TASK_REQUEST_SIZE,
"Experimental: Max size of remote task update request",
queryManagerConfig.getMaxRemoteTaskRequestSize(),
false),
dataSizeProperty(
REMOTE_TASK_REQUEST_SIZE_HEADROOM,
"Experimental: Headroom for size of remote task update request",
queryManagerConfig.getRemoteTaskRequestSizeHeadroom(),
false),
integerProperty(
REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST,
"Guaranteed splits per remote task request",
queryManagerConfig.getRemoteTaskGuaranteedSplitPerTask(),
false));
}

Expand Down Expand Up @@ -1184,4 +1208,24 @@ public static boolean isIncrementalHashArrayLoadFactorEnabled(Session session)
{
return session.getSystemProperty(INCREMENTAL_HASH_ARRAY_LOAD_FACTOR_ENABLED, Boolean.class);
}

/**
 * Reads the {@code enable_adaptive_remote_task_request_size} system session property.
 *
 * @param session session to read the property from
 * @return the boolean value of the property for this session
 */
public static boolean isEnableAdaptiveTaskRequestSize(Session session)
{
    Boolean adaptiveRequestSizeEnabled = session.getSystemProperty(ENABLE_ADAPTIVE_REMOTE_TASK_REQUEST_SIZE, Boolean.class);
    return adaptiveRequestSizeEnabled;
}

/**
 * Reads the {@code max_remote_task_request_size} system session property.
 *
 * @param session session to read the property from
 * @return the configured maximum size of a remote task update request
 */
public static DataSize getMaxRemoteTaskRequestSize(Session session)
{
    DataSize maxRequestSize = session.getSystemProperty(MAX_REMOTE_TASK_REQUEST_SIZE, DataSize.class);
    return maxRequestSize;
}

/**
 * Reads the {@code remote_task_request_size_headroom} system session property.
 *
 * @param session session to read the property from
 * @return the configured headroom for the size of a remote task update request
 */
public static DataSize getRemoteTaskRequestSizeHeadroom(Session session)
{
    DataSize requestSizeHeadroom = session.getSystemProperty(REMOTE_TASK_REQUEST_SIZE_HEADROOM, DataSize.class);
    return requestSizeHeadroom;
}

/**
 * Reads the {@code remote_task_guaranteed_splits_per_request} system session property.
 *
 * @param session session to read the property from
 * @return the configured number of guaranteed splits per remote task request
 */
public static int getRemoteTaskGuaranteedSplitsPerRequest(Session session)
{
    Integer guaranteedSplits = session.getSystemProperty(REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST, Integer.class);
    return guaranteedSplits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public class QueryManagerConfig
private Duration remoteTaskMaxErrorDuration = new Duration(5, TimeUnit.MINUTES);
private int remoteTaskMaxCallbackThreads = 1000;

private boolean enabledAdaptiveTaskRequestSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't it 'true'?

private DataSize maxRemoteTaskRequestSize = DataSize.of(8, DataSize.Unit.MEGABYTE);
private DataSize remoteTaskRequestSizeHeadroom = DataSize.of(2, DataSize.Unit.MEGABYTE);
private int remoteTaskGuaranteedSplitPerTask = 3;

private String queryExecutionPolicy = "all-at-once";
private Duration queryMaxRunTime = new Duration(100, TimeUnit.DAYS);
private Duration queryMaxExecutionTime = new Duration(100, TimeUnit.DAYS);
Expand Down Expand Up @@ -338,6 +343,57 @@ public QueryManagerConfig setRemoteTaskMaxCallbackThreads(int remoteTaskMaxCallb
return this;
}

/**
 * Returns whether adaptive sizing of remote task update requests is enabled.
 * The backing field has no initializer, so the value is {@code false} until the setter is called.
 * It supplies the default for the {@code enable_adaptive_remote_task_request_size} session property.
 */
public boolean isEnabledAdaptiveTaskRequestSize()
{
return enabledAdaptiveTaskRequestSize;
}

/**
 * Enables or disables adaptive sizing of remote task update requests.
 * Bound to the {@code query.remote-task.enable-adaptive-request-size} configuration property.
 *
 * @param enabledAdaptiveTaskRequestSize new value of the flag
 * @return this config object, so setter calls can be chained
 */
@Config("query.remote-task.enable-adaptive-request-size")
public QueryManagerConfig setEnabledAdaptiveTaskRequestSize(boolean enabledAdaptiveTaskRequestSize)
{
this.enabledAdaptiveTaskRequestSize = enabledAdaptiveTaskRequestSize;
return this;
}

/**
 * Returns the maximum size of a remote task update request (field initialized to 8 MB).
 * It supplies the default for the {@code max_remote_task_request_size} session property.
 */
@NotNull
public DataSize getMaxRemoteTaskRequestSize()
{
return maxRemoteTaskRequestSize;
}

/**
 * Sets the maximum size of a remote task update request.
 * Bound to the {@code query.remote-task.max-request-size} configuration property.
 *
 * @param maxRemoteTaskRequestSize new maximum request size; the getter is annotated {@code @NotNull}
 * @return this config object, so setter calls can be chained
 */
@Config("query.remote-task.max-request-size")
public QueryManagerConfig setMaxRemoteTaskRequestSize(DataSize maxRemoteTaskRequestSize)
{
this.maxRemoteTaskRequestSize = maxRemoteTaskRequestSize;
return this;
}

/**
 * Returns the headroom for the size of a remote task update request (field initialized to 2 MB).
 * It supplies the default for the {@code remote_task_request_size_headroom} session property.
 */
@NotNull
public DataSize getRemoteTaskRequestSizeHeadroom()
{
return remoteTaskRequestSizeHeadroom;
}

/**
 * Sets the headroom for the size of a remote task update request.
 * Bound to the {@code query.remote-task.request-size-headroom} configuration property.
 *
 * @param remoteTaskRequestSizeHeadroom new headroom; the getter is annotated {@code @NotNull}
 * @return this config object, so setter calls can be chained
 */
@Config("query.remote-task.request-size-headroom")
public QueryManagerConfig setRemoteTaskRequestSizeHeadroom(DataSize remoteTaskRequestSizeHeadroom)
{
this.remoteTaskRequestSizeHeadroom = remoteTaskRequestSizeHeadroom;
return this;
}

/**
 * Returns the guaranteed number of splits per remote task request (field initialized to 3).
 * The value is annotated {@code @Min(1)} and supplies the default for the
 * {@code remote_task_guaranteed_splits_per_request} session property.
 */
@Min(1)
public int getRemoteTaskGuaranteedSplitPerTask()
{
return remoteTaskGuaranteedSplitPerTask;
}

/**
 * Sets the guaranteed number of splits per remote task request.
 * Bound to the {@code query.remote-task.guaranteed-splits-per-task} configuration property.
 * NOTE(review): the config key says "per-task" while the matching session property says
 * "per_request" — confirm which wording is intended.
 *
 * @param remoteTaskGuaranteedSplitPerTask new value; the getter is annotated {@code @Min(1)}
 * @return this config object, so setter calls can be chained
 */
@Config("query.remote-task.guaranteed-splits-per-task")
public QueryManagerConfig setRemoteTaskGuaranteedSplitPerTask(int remoteTaskGuaranteedSplitPerTask)
{
this.remoteTaskGuaranteedSplitPerTask = remoteTaskGuaranteedSplitPerTask;
return this;
}

@NotNull
public String getQueryExecutionPolicy()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public class SqlTaskExecution
private final ConcurrentMap<PlanNodeId, TaskSource> unpartitionedSources = new ConcurrentHashMap<>();

@GuardedBy("this")
private long maxAcknowledgedSplit = Long.MIN_VALUE;
private final Map<PlanNodeId, Long> maxAcknowledgedSplitByPlanNode = new HashMap<>();

@GuardedBy("this")
private final SchedulingLifespanManager schedulingLifespanManager;
Expand Down Expand Up @@ -325,12 +325,11 @@ private synchronized Map<PlanNodeId, TaskSource> updateSources(List<TaskSource>
Map<PlanNodeId, TaskSource> updatedUnpartitionedSources = new HashMap<>();

// first remove any split that was already acknowledged
long currentMaxAcknowledgedSplit = this.maxAcknowledgedSplit;
sources = sources.stream()
.map(source -> new TaskSource(
source.getPlanNodeId(),
source.getSplits().stream()
.filter(scheduledSplit -> scheduledSplit.getSequenceId() > currentMaxAcknowledgedSplit)
.filter(scheduledSplit -> scheduledSplit.getSequenceId() > maxAcknowledgedSplitByPlanNode.getOrDefault(source.getPlanNodeId(), Long.MIN_VALUE))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be many splits, in which case there will be many map lookups. It would be better to do something like the below:

            .map(source -> {
                long currentMaxAcknowledgedSplit = maxAcknowledgedSplitByPlanNode.getOrDefault(source.getPlanNodeId(), Long.MIN_VALUE);
                return new TaskSource(
                        source.getPlanNodeId(),
                        source.getSplits().stream()
                                .filter(scheduledSplit -> scheduledSplit.getSequenceId() > currentMaxAcknowledgedSplit)
                                .collect(Collectors.toSet()),
                        // Like splits, noMoreSplitsForLifespan could be pruned so that only new items will be processed.
                        // This is not happening here because correctness won't be compromised due to duplicate events for noMoreSplitsForLifespan.
                        source.getNoMoreSplitsForLifespan(),
                        source.isNoMoreSplits());
            })

.collect(Collectors.toSet()),
// Like splits, noMoreSplitsForLifespan could be pruned so that only new items will be processed.
// This is not happening here because correctness won't be compromised due to duplicate events for noMoreSplitsForLifespan.
Expand All @@ -354,11 +353,18 @@ private synchronized Map<PlanNodeId, TaskSource> updateSources(List<TaskSource>
}

// update maxAcknowledgedSplit
maxAcknowledgedSplit = sources.stream()
.flatMap(source -> source.getSplits().stream())
.mapToLong(ScheduledSplit::getSequenceId)
.max()
.orElse(maxAcknowledgedSplit);
for (TaskSource taskSource : sources) {
long maxAcknowledgedSplit = taskSource.getSplits().stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When taskSource.getSplits().size() == 0, it can 'continue;'.

.mapToLong(ScheduledSplit::getSequenceId)
.max()
.orElse(Long.MIN_VALUE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If taskSource.getSplits().size() == 0 is already checked, .orElse(Long.MIN_VALUE) is not needed. Instead of that, just '.getAsLong()'.

PlanNodeId planNodeId = taskSource.getPlanNodeId();

if (!maxAcknowledgedSplitByPlanNode.containsKey(planNodeId)) {
maxAcknowledgedSplitByPlanNode.put(planNodeId, Long.MIN_VALUE);
}
Comment on lines +363 to +365
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If taskSource.getSplits().size() == 0 is already checked, this is not needed.

maxAcknowledgedSplitByPlanNode.computeIfPresent(planNodeId, (key, val) -> maxAcknowledgedSplit > val ? maxAcknowledgedSplit : val);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If taskSource.getSplits().size() == 0 is already checked, it should throw when val >= maxAcknowledgedSplit.

e.g.

            maxAcknowledgedSplitByPlanNode.compute(planNodeId, (key, val) -> {
                if (val == null) {
                    return maxAcknowledgedSplit;
                }
                if (val >= maxAcknowledgedSplit) {
                    throw new IllegalStateException(format("%s - splits are out of order? planNodeId=%s, newMax=%d, currentMax=%d", taskId, planNodeId, maxAcknowledgedSplit, val));
                }
                return maxAcknowledgedSplit;
            });

}
return updatedUnpartitionedSources;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.net.HttpHeaders;
Expand Down Expand Up @@ -64,6 +65,7 @@

import java.net.URI;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -79,6 +81,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -91,7 +94,11 @@
import static io.airlift.http.client.Request.Builder.prepareDelete;
import static io.airlift.http.client.Request.Builder.preparePost;
import static io.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator;
import static io.trino.SystemSessionProperties.getMaxRemoteTaskRequestSize;
import static io.trino.SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask;
import static io.trino.SystemSessionProperties.getRemoteTaskGuaranteedSplitsPerRequest;
import static io.trino.SystemSessionProperties.getRemoteTaskRequestSizeHeadroom;
import static io.trino.SystemSessionProperties.isEnableAdaptiveTaskRequestSize;
import static io.trino.execution.DynamicFiltersCollector.INITIAL_DYNAMIC_FILTERS_VERSION;
import static io.trino.execution.TaskInfo.createInitialTask;
import static io.trino.execution.TaskState.ABORTED;
Expand Down Expand Up @@ -172,6 +179,14 @@ public final class HttpRemoteTask
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean aborting = new AtomicBoolean(false);

@GuardedBy("this")
private int splitBatchSize;

private final int guaranteedSplitsPerRequest;
private final long maxRequestSize;
private final long requestSizeHeadroom;
private final boolean enableAdaptiveTaskRequestSize;

public HttpRemoteTask(
Session session,
TaskId taskId,
Expand Down Expand Up @@ -235,6 +250,12 @@ public HttpRemoteTask(
}
maxUnacknowledgedSplits = getMaxUnacknowledgedSplitsPerTask(session);

this.guaranteedSplitsPerRequest = getRemoteTaskGuaranteedSplitsPerRequest(session);
this.maxRequestSize = getMaxRemoteTaskRequestSize(session).toBytes();
this.requestSizeHeadroom = getRemoteTaskRequestSizeHeadroom(session).toBytes();
this.splitBatchSize = maxUnacknowledgedSplits;
this.enableAdaptiveTaskRequestSize = isEnableAdaptiveTaskRequestSize(session);

int pendingSourceSplitCount = 0;
long pendingSourceSplitsWeight = 0;
for (PlanNodeId planNodeId : planFragment.getPartitionedSources()) {
Expand Down Expand Up @@ -557,6 +578,10 @@ private synchronized void processTaskUpdate(TaskInfo newValue, List<TaskSource>
pendingSourceSplitsWeight -= removedWeight;
}
}
// set needsUpdate to true when there are still pending splits
if (pendingSplits.size() > 0) {
needsUpdate.set(true);
}
// Update node level split tracker before split queue space to ensure it's up to date before waking up the scheduler
partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
updateSplitQueueSpace();
Expand All @@ -580,6 +605,27 @@ private synchronized void triggerUpdate()
sendUpdate();
}

/**
* Adaptively adjust batch size to meet expected request size:
* If requestSize is not equal to expectedSize, this function estimates a new batch size by scaling the number of splits
* in the current request by the ratio of expectedSize to requestSize, bounded by guaranteedSplitsPerRequest and maxUnacknowledgedSplits.
*/
private synchronized void adjustSplitBatchSize(List<TaskSource> sources, long requestSize, long expectedSize)
{
int numSplits = 0;
for (TaskSource taskSource : sources) {
numSplits = Math.max(numSplits, taskSource.getSplits().size());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should find max only for partitioned sources.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With colocated joins on bucketed tables, there could be more than one partitioned source. In that case, this logic becomes awkward. Please apply this logic only when there is exactly one partitioned source.

}
if (requestSize <= 0 || numSplits == 0) {
return;
}
if ((requestSize > expectedSize && splitBatchSize > guaranteedSplitsPerRequest) || (requestSize < expectedSize && splitBatchSize < maxUnacknowledgedSplits)) {
int newSplitBatchSize = (int) (numSplits * ((double) expectedSize / requestSize));
newSplitBatchSize = Math.max(guaranteedSplitsPerRequest, Math.min(maxUnacknowledgedSplits, newSplitBatchSize));
splitBatchSize = newSplitBatchSize;
}
}

private synchronized void sendUpdate()
{
TaskStatus taskStatus = getTaskStatus();
Expand Down Expand Up @@ -614,6 +660,20 @@ private synchronized void sendUpdate()
outputBuffers.get(),
dynamicFilterDomains.getDynamicFilterDomains());
byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toJsonBytes(updateRequest);

if (enableAdaptiveTaskRequestSize) {
int oldSplitBatchSize = splitBatchSize;
// try to adjust batch size to meet expected request size: (maxRequestSize - requestSizeHeadroom)
adjustSplitBatchSize(sources, taskUpdateRequestJson.length, maxRequestSize - requestSizeHeadroom);
// abandon current request and reschedule update if size of request body exceeds maxRequestSize
// and splitBatchSize was reduced by the adjustment above
if (taskUpdateRequestJson.length > maxRequestSize && splitBatchSize < oldSplitBatchSize) {
log.debug("%s - current taskUpdateRequestJson exceeded limit: %d, abandon.", taskId, taskUpdateRequestJson.length);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.info?

scheduleUpdate();
return;
}
}

if (fragment.isPresent()) {
stats.updateWithPlanBytes(taskUpdateRequestJson.length);
}
Expand Down Expand Up @@ -646,21 +706,36 @@ private synchronized void sendUpdate()

private synchronized List<TaskSource> getSources()
{
return Stream.concat(planFragment.getPartitionedSourceNodes().stream(), planFragment.getRemoteSourceNodes().stream())
.filter(Objects::nonNull)
.map(PlanNode::getId)
.map(this::getSource)
.filter(Objects::nonNull)
.collect(toImmutableList());
return Stream.concat(
planFragment.getPartitionedSourceNodes().stream()
.filter(Objects::nonNull)
.map(PlanNode::getId)
.map(planNodeId -> getSource(planNodeId, true)),
planFragment.getRemoteSourceNodes().stream()
.filter(Objects::nonNull)
.map(PlanNode::getId)
.map(planNodeId -> getSource(planNodeId, false))
).filter(Objects::nonNull).collect(toImmutableList());
}

private synchronized TaskSource getSource(PlanNodeId planNodeId)
private synchronized TaskSource getSource(PlanNodeId planNodeId, boolean isPartitionedSource)
{
Set<ScheduledSplit> splits = pendingSplits.get(planNodeId);
boolean pendingNoMoreSplits = Boolean.TRUE.equals(this.noMoreSplits.get(planNodeId));
boolean noMoreSplits = this.noMoreSplits.containsKey(planNodeId);
Set<Lifespan> noMoreSplitsForLifespan = pendingNoMoreSplitsForLifespan.get(planNodeId);

// only apply batchSize to partitioned sources
if (isPartitionedSource && splitBatchSize < splits.size()) {
splits = splits.stream()
.sorted(Comparator.comparingLong(ScheduledSplit::getSequenceId))
.limit(splitBatchSize)
.collect(Collectors.toSet());
// if not last batch, we need to defer setting no more splits
noMoreSplits = false;
noMoreSplitsForLifespan = ImmutableSet.of();
}

TaskSource element = null;
if (!splits.isEmpty() || !noMoreSplitsForLifespan.isEmpty() || pendingNoMoreSplits) {
element = new TaskSource(planNodeId, splits, noMoreSplitsForLifespan, noMoreSplits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ public void testDefaults()
.setQueryMaxCpuTime(new Duration(1_000_000_000, DAYS))
.setQueryMaxScanPhysicalBytes(null)
.setRequiredWorkers(1)
.setRequiredWorkersMaxWait(new Duration(5, MINUTES)));
.setRequiredWorkersMaxWait(new Duration(5, MINUTES))
.setEnabledAdaptiveTaskRequestSize(false)
.setMaxRemoteTaskRequestSize(DataSize.of(8, DataSize.Unit.MEGABYTE))
.setRemoteTaskRequestSizeHeadroom(DataSize.of(2, DataSize.Unit.MEGABYTE))
.setRemoteTaskGuaranteedSplitPerTask(3));
}

@Test
Expand Down Expand Up @@ -87,6 +91,10 @@ public void testExplicitPropertyMappings()
.put("query.max-scan-physical-bytes", "1kB")
.put("query-manager.required-workers", "333")
.put("query-manager.required-workers-max-wait", "33m")
.put("query.remote-task.enable-adaptive-request-size", "true")
.put("query.remote-task.max-request-size", "10MB")
.put("query.remote-task.request-size-headroom", "1MB")
.put("query.remote-task.guaranteed-splits-per-task", "5")
.build();

QueryManagerConfig expected = new QueryManagerConfig()
Expand All @@ -112,7 +120,11 @@ public void testExplicitPropertyMappings()
.setQueryMaxCpuTime(new Duration(2, DAYS))
.setQueryMaxScanPhysicalBytes(DataSize.of(1, KILOBYTE))
.setRequiredWorkers(333)
.setRequiredWorkersMaxWait(new Duration(33, MINUTES));
.setRequiredWorkersMaxWait(new Duration(33, MINUTES))
.setEnabledAdaptiveTaskRequestSize(true)
.setMaxRemoteTaskRequestSize(DataSize.of(10, DataSize.Unit.MEGABYTE))
.setRemoteTaskRequestSizeHeadroom(DataSize.of(1, DataSize.Unit.MEGABYTE))
.setRemoteTaskGuaranteedSplitPerTask(5);

assertFullMapping(properties, expected);
}
Expand Down
Loading