diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index ead8f13a9e05..5cabfdf87a76 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -173,6 +173,10 @@ public final class SystemSessionProperties public static final String ADAPTIVE_PARTIAL_AGGREGATION_ENABLED = "adaptive_partial_aggregation_enabled"; public static final String ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS = "adaptive_partial_aggregation_min_rows"; public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold"; + public static final String REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED = "remote_task_adaptive_update_request_size_enabled"; + public static final String REMOTE_TASK_MAX_REQUEST_SIZE = "remote_task_max_request_size"; + public static final String REMOTE_TASK_REQUEST_SIZE_HEADROOM = "remote_task_request_size_headroom"; + public static final String REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST = "remote_task_guaranteed_splits_per_request"; public static final String JOIN_PARTITIONED_BUILD_MIN_ROW_COUNT = "join_partitioned_build_min_row_count"; public static final String USE_EXACT_PARTITIONING = "use_exact_partitioning"; public static final String FORCE_SPILLING_JOIN = "force_spilling_join"; @@ -860,6 +864,26 @@ public SystemSessionProperties( "Ratio between aggregation output and input rows above which partial aggregation might be adaptively turned off", optimizerConfig.getAdaptivePartialAggregationUniqueRowsRatioThreshold(), false), + booleanProperty( + REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED, + "Experimental: Enable adaptive adjustment for size of remote task update request", + queryManagerConfig.isEnabledAdaptiveTaskRequestSize(), + false), + dataSizeProperty( + REMOTE_TASK_MAX_REQUEST_SIZE, + "Experimental: Max size of remote task update request", + queryManagerConfig.getMaxRemoteTaskRequestSize(), + false), + dataSizeProperty( + REMOTE_TASK_REQUEST_SIZE_HEADROOM, + "Experimental: Headroom for size of remote task update request", + queryManagerConfig.getRemoteTaskRequestSizeHeadroom(), + false), + integerProperty( + REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST, + "Guaranteed splits per remote task request", + queryManagerConfig.getRemoteTaskGuaranteedSplitPerTask(), + false), longProperty( JOIN_PARTITIONED_BUILD_MIN_ROW_COUNT, "Minimum number of join build side rows required to use partitioned join lookup", @@ -1554,6 +1578,26 @@ public static double getAdaptivePartialAggregationUniqueRowsRatioThreshold(Sessi return session.getSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD, Double.class); } + public static boolean isRemoteTaskAdaptiveUpdateRequestSizeEnabled(Session session) + { + return session.getSystemProperty(REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED, Boolean.class); + } + + public static DataSize getMaxRemoteTaskRequestSize(Session session) + { + return session.getSystemProperty(REMOTE_TASK_MAX_REQUEST_SIZE, DataSize.class); + } + + public static DataSize getRemoteTaskRequestSizeHeadroom(Session session) + { + return session.getSystemProperty(REMOTE_TASK_REQUEST_SIZE_HEADROOM, DataSize.class); + } + + public static int getRemoteTaskGuaranteedSplitsPerRequest(Session session) + { + return session.getSystemProperty(REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST, Integer.class); + } + public static long getJoinPartitionedBuildMinRowCount(Session session) { return session.getSystemProperty(JOIN_PARTITIONED_BUILD_MIN_ROW_COUNT, Long.class); diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index e391706a4c45..c6d43ad03021 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -92,6 +92,11 @@ public class QueryManagerConfig private int maxTasksWaitingForExecutionPerQuery = 10; private int maxTasksWaitingForNodePerStage = 5; + private boolean enabledAdaptiveTaskRequestSize = true; + private DataSize maxRemoteTaskRequestSize = DataSize.of(8, DataSize.Unit.MEGABYTE); + private DataSize remoteTaskRequestSizeHeadroom = DataSize.of(2, DataSize.Unit.MEGABYTE); + private int remoteTaskGuaranteedSplitPerTask = 3; + private DataSize faultTolerantExecutionTargetTaskInputSize = DataSize.of(4, GIGABYTE); private int faultTolerantExecutionMinTaskSplitCount = 16; @@ -553,6 +558,57 @@ public QueryManagerConfig setMaxTasksWaitingForNodePerStage(int maxTasksWaitingF return this; } + public boolean isEnabledAdaptiveTaskRequestSize() + { + return enabledAdaptiveTaskRequestSize; + } + + @Config("query.remote-task.enable-adaptive-request-size") + public QueryManagerConfig setEnabledAdaptiveTaskRequestSize(boolean enabledAdaptiveTaskRequestSize) + { + this.enabledAdaptiveTaskRequestSize = enabledAdaptiveTaskRequestSize; + return this; + } + + @NotNull + public DataSize getMaxRemoteTaskRequestSize() + { + return maxRemoteTaskRequestSize; + } + + @Config("query.remote-task.max-request-size") + public QueryManagerConfig setMaxRemoteTaskRequestSize(DataSize maxRemoteTaskRequestSize) + { + this.maxRemoteTaskRequestSize = maxRemoteTaskRequestSize; + return this; + } + + @NotNull + public DataSize getRemoteTaskRequestSizeHeadroom() + { + return remoteTaskRequestSizeHeadroom; + } + + @Config("query.remote-task.request-size-headroom") + public QueryManagerConfig setRemoteTaskRequestSizeHeadroom(DataSize remoteTaskRequestSizeHeadroom) + { + this.remoteTaskRequestSizeHeadroom = remoteTaskRequestSizeHeadroom; + return this; + } + + @Min(1) + public int getRemoteTaskGuaranteedSplitPerTask() + { + return remoteTaskGuaranteedSplitPerTask; + } + + @Config("query.remote-task.guaranteed-splits-per-task") + public QueryManagerConfig setRemoteTaskGuaranteedSplitPerTask(int remoteTaskGuaranteedSplitPerTask) + { + this.remoteTaskGuaranteedSplitPerTask = remoteTaskGuaranteedSplitPerTask; + return this; + } + @NotNull public DataSize getFaultTolerantExecutionTargetTaskInputSize() { diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java index e4ec954b1ce9..537d2ace4a2b 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java @@ -44,6 +44,7 @@ import java.lang.ref.WeakReference; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -63,7 +64,6 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.concat; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static io.trino.SystemSessionProperties.getInitialSplitsPerNode; @@ -76,7 +76,6 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; -import static java.util.stream.Collectors.toList; public class SqlTaskExecution { @@ -97,7 +96,7 @@ public class SqlTaskExecution private final Map driverRunnerFactoriesWithRemoteSource; @GuardedBy("this") - private long maxAcknowledgedSplit = Long.MIN_VALUE; + private final Map maxAcknowledgedSplitByPlanNode = new HashMap<>(); @GuardedBy("this") private final List sourceStartOrder; @@ -234,40 +233,48 @@ public void addSplitAssignments(List splitAssignments) private synchronized Set updateSplitAssignments(List splitAssignments) { ImmutableSet.Builder updatedUnpartitionedSources = ImmutableSet.builder(); + List unacknowledgedSplitAssignment = new ArrayList<>(splitAssignments.size()); // first remove any split that was already acknowledged - long currentMaxAcknowledgedSplit = this.maxAcknowledgedSplit; - splitAssignments = splitAssignments.stream() - .map(assignment -> new SplitAssignment( - assignment.getPlanNodeId(), - assignment.getSplits().stream() - .filter(scheduledSplit -> scheduledSplit.getSequenceId() > currentMaxAcknowledgedSplit) - .collect(toImmutableSet()), - assignment.isNoMoreSplits())) - // drop assignments containing no unacknowledged splits - // the noMoreSplits signal acknowledgement is not tracked but it is okay to deliver it more than once - .filter(assignment -> !assignment.getSplits().isEmpty() || assignment.isNoMoreSplits()) - .collect(toList()); - - // update task with new assignments - for (SplitAssignment assignment : splitAssignments) { - if (driverRunnerFactoriesWithSplitLifeCycle.containsKey(assignment.getPlanNodeId())) { - schedulePartitionedSource(assignment); + for (SplitAssignment splitAssignment : splitAssignments) { + // drop assignments containing no unacknowledged splits + // the noMoreSplits signal acknowledgement is not tracked but it is okay to deliver it more than once + if (!splitAssignment.getSplits().isEmpty() || splitAssignment.isNoMoreSplits()) { + PlanNodeId planNodeId = splitAssignment.getPlanNodeId(); + long currentMaxAcknowledgedSplit = maxAcknowledgedSplitByPlanNode.getOrDefault(planNodeId, Long.MIN_VALUE); + long maxAcknowledgedSplit = currentMaxAcknowledgedSplit; + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (ScheduledSplit split : splitAssignment.getSplits()) { + long sequenceId = split.getSequenceId(); + // previously acknowledged splits can be included in source + if (sequenceId > currentMaxAcknowledgedSplit) { + builder.add(split); + } + if (sequenceId > maxAcknowledgedSplit) { + maxAcknowledgedSplit = sequenceId; + } + } + if (maxAcknowledgedSplit > currentMaxAcknowledgedSplit) { + maxAcknowledgedSplitByPlanNode.put(planNodeId, maxAcknowledgedSplit); + } + + unacknowledgedSplitAssignment.add(new SplitAssignment(splitAssignment.getPlanNodeId(), builder.build(), splitAssignment.isNoMoreSplits())); + } + } + + // update task with new sources + for (SplitAssignment splitAssignment : unacknowledgedSplitAssignment) { + if (driverRunnerFactoriesWithSplitLifeCycle.containsKey(splitAssignment.getPlanNodeId())) { + schedulePartitionedSource(splitAssignment); } else { // tell existing drivers about the new splits - DriverSplitRunnerFactory factory = driverRunnerFactoriesWithRemoteSource.get(assignment.getPlanNodeId()); - factory.enqueueSplits(assignment.getSplits(), assignment.isNoMoreSplits()); - updatedUnpartitionedSources.add(assignment.getPlanNodeId()); + DriverSplitRunnerFactory factory = driverRunnerFactoriesWithRemoteSource.get(splitAssignment.getPlanNodeId()); + factory.enqueueSplits(splitAssignment.getSplits(), splitAssignment.isNoMoreSplits()); + updatedUnpartitionedSources.add(splitAssignment.getPlanNodeId()); } } - // update maxAcknowledgedSplit - maxAcknowledgedSplit = splitAssignments.stream() - .flatMap(source -> source.getSplits().stream()) - .mapToLong(ScheduledSplit::getSequenceId) - .max() - .orElse(maxAcknowledgedSplit); return updatedUnpartitionedSources.build(); } diff --git a/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java b/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java index d98abd50ca66..e20960928606 100644 --- a/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java +++ b/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java @@ -13,6 +13,7 @@ */ package io.trino.server.remotetask; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; @@ -68,7 +69,9 @@ import java.net.URI; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -84,6 +87,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import java.util.stream.Stream; import static com.google.common.base.MoreObjects.toStringHelper; @@ -96,7 +100,11 @@ import static io.airlift.http.client.Request.Builder.prepareDelete; import static io.airlift.http.client.Request.Builder.preparePost; import static io.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator; +import static io.trino.SystemSessionProperties.getMaxRemoteTaskRequestSize; import static io.trino.SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask; +import static io.trino.SystemSessionProperties.getRemoteTaskGuaranteedSplitsPerRequest; +import static io.trino.SystemSessionProperties.getRemoteTaskRequestSizeHeadroom; +import static io.trino.SystemSessionProperties.isRemoteTaskAdaptiveUpdateRequestSizeEnabled; import static io.trino.execution.DynamicFiltersCollector.INITIAL_DYNAMIC_FILTERS_VERSION; import static io.trino.execution.TaskInfo.createInitialTask; import static io.trino.execution.TaskState.ABORTED; @@ -150,6 +158,9 @@ public final class HttpRemoteTask @GuardedBy("this") private OptionalLong whenSplitQueueHasSpaceThreshold = OptionalLong.empty(); + @VisibleForTesting + final AtomicInteger splitBatchSize; + private final boolean summarizeTaskInfo; private final HttpClient httpClient; @@ -171,6 +182,11 @@ public final class HttpRemoteTask private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean aborting = new AtomicBoolean(false); + private final int guaranteedSplitsPerRequest; + private final long maxRequestSizeInBytes; + private final long requestSizeHeadroomInBytes; + private final boolean adaptiveUpdateRequestSizeEnabled; + public HttpRemoteTask( Session session, TaskId taskId, @@ -238,6 +254,19 @@ public HttpRemoteTask( } maxUnacknowledgedSplits = getMaxUnacknowledgedSplitsPerTask(session); + this.guaranteedSplitsPerRequest = getRemoteTaskGuaranteedSplitsPerRequest(session); + this.maxRequestSizeInBytes = getMaxRemoteTaskRequestSize(session).toBytes(); + this.requestSizeHeadroomInBytes = getRemoteTaskRequestSizeHeadroom(session).toBytes(); + this.splitBatchSize = new AtomicInteger(maxUnacknowledgedSplits); + long numOfPartitionedSources = planFragment.getPartitionedSources().size(); + // Currently it supports only when there is one partitioned source. + // TODO. https://github.com/trinodb/trino/issues/15820 + this.adaptiveUpdateRequestSizeEnabled = numOfPartitionedSources == 1 ? isRemoteTaskAdaptiveUpdateRequestSizeEnabled(session) : false; + if (numOfPartitionedSources > 1) { + log.debug("%s - There are more than one partitioned sources: numOfPartitionedSources=%s", + taskId, planFragment.getPartitionedSources().size()); + } + int pendingSourceSplitCount = 0; long pendingSourceSplitsWeight = 0; for (PlanNodeId planNodeId : planFragment.getPartitionedSources()) { @@ -568,6 +597,12 @@ private synchronized void processTaskUpdate(TaskInfo newValue, List 0) { + pendingRequestsCounter.incrementAndGet(); + } + // Update node level split tracker before split queue space to ensure it's up to date before waking up the scheduler partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo()); updateSplitQueueSpace(); @@ -596,6 +631,42 @@ private void triggerUpdate() } } + /** + * Adaptively adjust batch size to meet expected request size: + * If requestSize is not equal to expectedSize, this function will try to estimate and adjust the batch size proportionally based on + * current nums of splits and size of request. + */ + @VisibleForTesting + boolean adjustSplitBatchSize(List splitAssignments, long requestSize, int currentSplitBatchSize) + { + if ((requestSize > maxRequestSizeInBytes && currentSplitBatchSize > guaranteedSplitsPerRequest) || (requestSize < maxRequestSizeInBytes && currentSplitBatchSize < maxUnacknowledgedSplits)) { + int newSplitBatchSize = currentSplitBatchSize; + int numSplits = 0; + for (SplitAssignment splitAssignment : splitAssignments) { + // Adjustment applies only to partitioned sources. + if (planFragment.isPartitionedSources(splitAssignment.getPlanNodeId())) { + numSplits = splitAssignment.getSplits().size(); + break; + } + } + if (numSplits != 0) { + newSplitBatchSize = (int) (numSplits * (maxRequestSizeInBytes - requestSizeHeadroomInBytes) / requestSize); + newSplitBatchSize = Math.max(guaranteedSplitsPerRequest, Math.min(maxUnacknowledgedSplits, newSplitBatchSize)); + } + if (newSplitBatchSize != currentSplitBatchSize) { + log.debug("%s - Split batch size changed: prevSize=%s, newSize=%s", taskId, currentSplitBatchSize, newSplitBatchSize); + splitBatchSize.set(newSplitBatchSize); + } + // abandon current request and reschedule update if size of request body exceeds requestSizeLimit and splitBatchSize is updated + if (newSplitBatchSize < currentSplitBatchSize) { + log.debug("%s - current taskUpdateRequestJson exceeded limit: %d, currentSplitBatchSize: %d, newSplitBatchSize: %d, reschedule.", + taskId, requestSize, currentSplitBatchSize, newSplitBatchSize); + return true; // reschedule needed + } + } + return false; + } + private void sendUpdate() { TaskStatus taskStatus = getTaskStatus(); @@ -615,7 +686,8 @@ private void sendUpdate() return; } - List splitAssignments = getSplitAssignments(); + int currentSplitBatchSize = splitBatchSize.get(); + List splitAssignments = getSplitAssignments(currentSplitBatchSize); VersionedDynamicFilterDomains dynamicFilterDomains = outboundDynamicFiltersCollector.acknowledgeAndGetNewDomains(sentDynamicFiltersVersion.get()); // Workers don't need the embedded JSON representation when the fragment is sent @@ -629,6 +701,13 @@ private void sendUpdate() dynamicFilterDomains.getDynamicFilterDomains(), session.getExchangeEncryptionKey()); byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toJsonBytes(updateRequest); + + // try to adjust batch size to meet expected request size + if (adaptiveUpdateRequestSizeEnabled && adjustSplitBatchSize(splitAssignments, taskUpdateRequestJson.length, currentSplitBatchSize)) { + scheduleUpdate(); + return; + } + if (fragment.isPresent()) { stats.updateWithPlanBytes(taskUpdateRequestJson.length); } @@ -654,22 +733,33 @@ private void sendUpdate() executor); } - private synchronized List getSplitAssignments() + private synchronized List getSplitAssignments(int currentSplitBatchSize) { return Stream.concat(planFragment.getPartitionedSourceNodes().stream(), planFragment.getRemoteSourceNodes().stream()) .filter(Objects::nonNull) .map(PlanNode::getId) - .map(this::getSplitAssignment) + .map(planNodeId -> getSplitAssignment(planNodeId, currentSplitBatchSize)) .filter(Objects::nonNull) .collect(toImmutableList()); } - private synchronized SplitAssignment getSplitAssignment(PlanNodeId planNodeId) + private synchronized SplitAssignment getSplitAssignment(PlanNodeId planNodeId, int currentSplitBatchSize) { Set splits = pendingSplits.get(planNodeId); boolean pendingNoMoreSplits = Boolean.TRUE.equals(this.noMoreSplits.get(planNodeId)); boolean noMoreSplits = this.noMoreSplits.containsKey(planNodeId); + // limit the number of splits only for a partitioned source + if (planFragment.isPartitionedSources(planNodeId) && currentSplitBatchSize < splits.size()) { + log.debug("%s - Splits are limited by splitBatchSize: splitBatchSize=%s, splits=%s, planNodeId=%s", taskId, currentSplitBatchSize, splits.size(), planNodeId); + splits = splits.stream() + .sorted(Comparator.comparingLong(ScheduledSplit::getSequenceId)) + .limit(currentSplitBatchSize) + .collect(Collectors.toCollection(LinkedHashSet::new)); + // if not last batch, we need to defer setting no more splits + noMoreSplits = false; + } + SplitAssignment assignment = null; if (!splits.isEmpty() || pendingNoMoreSplits) { assignment = new SplitAssignment(planNodeId, splits, noMoreSplits); @@ -933,11 +1023,11 @@ public void success(TaskInfo value) currentRequest.set(null); updateStats(); updateErrorTracker.requestSucceeded(); + processTaskUpdate(value, splitAssignments); if (pendingRequestsCounter.addAndGet(-currentPendingRequestsCounter) > 0) { // schedule an update because triggerUpdate was called in the meantime scheduleUpdate(); } - processTaskUpdate(value, splitAssignments); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java index d0d2d5c663b9..e4c5984035ea 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java @@ -72,6 +72,10 @@ public void testDefaults() .setRetryDelayScaleFactor(2.0) .setMaxTasksWaitingForExecutionPerQuery(10) .setMaxTasksWaitingForNodePerStage(5) + .setEnabledAdaptiveTaskRequestSize(true) + .setMaxRemoteTaskRequestSize(DataSize.of(8, DataSize.Unit.MEGABYTE)) + .setRemoteTaskRequestSizeHeadroom(DataSize.of(2, DataSize.Unit.MEGABYTE)) + .setRemoteTaskGuaranteedSplitPerTask(3) .setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(4, GIGABYTE)) .setFaultTolerantExecutionMinTaskSplitCount(16) .setFaultTolerantExecutionTargetTaskSplitCount(64) @@ -119,6 +123,10 @@ public void testExplicitPropertyMappings() .put("retry-delay-scale-factor", "2.3") .put("max-tasks-waiting-for-execution-per-query", "22") .put("max-tasks-waiting-for-node-per-stage", "3") + .put("query.remote-task.enable-adaptive-request-size", "false") + .put("query.remote-task.max-request-size", "10MB") + .put("query.remote-task.request-size-headroom", "1MB") + .put("query.remote-task.guaranteed-splits-per-task", "5") .put("fault-tolerant-execution-target-task-input-size", "222MB") .put("fault-tolerant-execution-min-task-split-count", "2") .put("fault-tolerant-execution-target-task-split-count", "3") @@ -163,6 +171,10 @@ public void testExplicitPropertyMappings() .setRetryDelayScaleFactor(2.3) .setMaxTasksWaitingForExecutionPerQuery(22) .setMaxTasksWaitingForNodePerStage(3) + .setEnabledAdaptiveTaskRequestSize(false) + .setMaxRemoteTaskRequestSize(DataSize.of(10, DataSize.Unit.MEGABYTE)) + .setRemoteTaskRequestSizeHeadroom(DataSize.of(1, DataSize.Unit.MEGABYTE)) + .setRemoteTaskGuaranteedSplitPerTask(5) .setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(222, MEGABYTE)) .setFaultTolerantExecutionMinTaskSplitCount(2) .setFaultTolerantExecutionTargetTaskSplitCount(3) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java index 2725dc696667..a13b653ff266 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java @@ -151,9 +151,23 @@ public void testSimple() assertEquals(taskStateMachine.getState(), RUNNING); // add assignment for pipeline + try { + // add a splitAssignments with a larger split sequence ID and a different plan node ID + PlanNodeId tableScanNodeId = new PlanNodeId("tableScan1"); + sqlTaskExecution.addSplitAssignments(ImmutableList.of(new SplitAssignment( + tableScanNodeId, + ImmutableSet.of(newScheduledSplit(3, tableScanNodeId, 400000, 400)), + false))); + } + catch (NullPointerException e) { + // this is expected since there is no pipeline for this + // the purpose of this splitAssignment is setting maxAcknowledgedSplitByPlanNode in SqlTaskExecution with the larger split sequence ID + } + // the split below shouldn't be skipped even though its sequence ID is smaller than the sequence ID of the previous split because they have different plan node IDs sqlTaskExecution.addSplitAssignments(ImmutableList.of(new SplitAssignment( TABLE_SCAN_NODE_ID, - ImmutableSet.of(newScheduledSplit(0, TABLE_SCAN_NODE_ID, 100000, 123)), + ImmutableSet.of( + newScheduledSplit(0, TABLE_SCAN_NODE_ID, 100000, 123)), false))); // assert that partial task result is produced outputBufferConsumer.consume(123, ASSERT_WAIT_TIMEOUT); diff --git a/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java b/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java index 330a566cb052..fcb8adc14fab 100644 --- a/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java +++ b/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java @@ -13,10 +13,12 @@ */ package io.trino.server.remotetask; +import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Module; @@ -28,6 +30,7 @@ import io.airlift.json.JsonCodec; import io.airlift.json.JsonModule; import io.airlift.units.Duration; +import io.trino.Session; import io.trino.block.BlockJsonSerde; import io.trino.client.NodeVersion; import io.trino.execution.DynamicFilterConfig; @@ -35,6 +38,7 @@ import io.trino.execution.NodeTaskMap; import io.trino.execution.QueryManagerConfig; import io.trino.execution.RemoteTask; +import io.trino.execution.ScheduledSplit; import io.trino.execution.SplitAssignment; import io.trino.execution.StageId; import io.trino.execution.TaskId; @@ -94,6 +98,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -109,12 +114,19 @@ import static com.google.inject.Scopes.SINGLETON; import static io.airlift.json.JsonBinder.jsonBinder; import static io.airlift.json.JsonCodecBinder.jsonCodecBinder; +import static io.airlift.testing.Assertions.assertGreaterThan; import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; +import static io.airlift.testing.Assertions.assertLessThan; import static io.trino.SessionTestUtils.TEST_SESSION; +import static io.trino.SystemSessionProperties.REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED; +import static io.trino.SystemSessionProperties.REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST; +import static io.trino.SystemSessionProperties.REMOTE_TASK_MAX_REQUEST_SIZE; +import static io.trino.SystemSessionProperties.REMOTE_TASK_REQUEST_SIZE_HEADROOM; import static io.trino.execution.DynamicFiltersCollector.INITIAL_DYNAMIC_FILTERS_VERSION; import static io.trino.execution.TaskTestUtils.TABLE_SCAN_NODE_ID; import static io.trino.execution.buffer.PipelinedOutputBuffers.BufferType.BROADCAST; import static io.trino.metadata.MetadataManager.createTestMetadataManager; +import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; import static io.trino.server.InternalHeaders.TRINO_CURRENT_VERSION; import static io.trino.server.InternalHeaders.TRINO_MAX_WAIT; import static io.trino.spi.StandardErrorCode.REMOTE_TASK_ERROR; @@ -122,6 +134,7 @@ import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.sql.planner.TestingPlannerContext.PLANNER_CONTEXT; import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE; +import static io.trino.testing.TestingSession.testSessionBuilder; import static io.trino.testing.assertions.Assert.assertEquals; import static io.trino.testing.assertions.Assert.assertEventually; import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; @@ -131,6 +144,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; public class TestHttpRemoteTask @@ -369,6 +383,82 @@ public void testOutboundDynamicFilters() httpRemoteTaskFactory.stop(); } + @Test(timeOut = 300000) + public void testAdaptiveRemoteTaskRequestSize() + throws Exception + { + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, FailureScenario.NO_FAILURE); + + Session session = testSessionBuilder() + .setCatalog("tpch") + .setSchema(TINY_SCHEMA_NAME) + .setSystemProperty(REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED, "true") + .setSystemProperty(REMOTE_TASK_MAX_REQUEST_SIZE, "100kB") + .setSystemProperty(REMOTE_TASK_REQUEST_SIZE_HEADROOM, "10kB") + .setSystemProperty(REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST, "1") + .build(); + HttpRemoteTaskFactory httpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource); + + RemoteTask remoteTask = createRemoteTask(httpRemoteTaskFactory, ImmutableSet.of(), session); + + testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo()); + remoteTask.start(); + + Multimap splits = HashMultimap.create(); + for (int i = 0; i < 10000; i++) { + splits.put(TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())); + } + remoteTask.addSplits(splits); + + poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID) != null); + poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID).getSplits().size() < 5000); // to check whether the splits are divided or not + poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID).getSplits().size() == 10000); // to check whether all the splits are sent or not + + remoteTask.noMoreSplits(TABLE_SCAN_NODE_ID); + poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID).isNoMoreSplits()); + + remoteTask.cancel(); + poll(() -> remoteTask.getTaskStatus().getState().isDone()); + poll(() -> remoteTask.getTaskInfo().getTaskStatus().getState().isDone()); + + httpRemoteTaskFactory.stop(); + } + + @Test + public void testAdjustSplitBatchSize() + { + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, FailureScenario.NO_FAILURE); + + Session session = testSessionBuilder() + .setCatalog("tpch") + .setSchema(TINY_SCHEMA_NAME) + .setSystemProperty(REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED, "true") + .setSystemProperty(REMOTE_TASK_MAX_REQUEST_SIZE, "100kB") + .setSystemProperty(REMOTE_TASK_REQUEST_SIZE_HEADROOM, "10kB") + .setSystemProperty(REMOTE_TASK_GUARANTEED_SPLITS_PER_REQUEST, "1") + .build(); + HttpRemoteTaskFactory httpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource); + + RemoteTask remoteTask = createRemoteTask(httpRemoteTaskFactory, ImmutableSet.of(), session); + + testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo()); + + Set splits = new HashSet<>(); + for (int i = 0; i < 1000; i++) { + splits.add(new ScheduledSplit(i, TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit()))); + } + + // decrease splitBatchSize + assertTrue(((HttpRemoteTask) remoteTask).adjustSplitBatchSize(ImmutableList.of(new SplitAssignment(TABLE_SCAN_NODE_ID, splits, true)), 1000000, 500)); + assertLessThan(((HttpRemoteTask) remoteTask).splitBatchSize.get(), 250); + + // increase splitBatchSize + assertFalse(((HttpRemoteTask) remoteTask).adjustSplitBatchSize(ImmutableList.of(new SplitAssignment(TABLE_SCAN_NODE_ID, splits, true)), 1000, 100)); + assertGreaterThan(((HttpRemoteTask) remoteTask).splitBatchSize.get(), 250); + } + private void runTest(FailureScenario failureScenario) throws Exception { @@ -412,9 +502,14 @@ private void addSplit(RemoteTask remoteTask, TestingTaskResource testingTaskReso } private RemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory, Set outboundDynamicFilterIds) + { + return createRemoteTask(httpRemoteTaskFactory, outboundDynamicFilterIds, TEST_SESSION); + } + + private RemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory, Set outboundDynamicFilterIds, Session session) { return httpRemoteTaskFactory.createRemoteTask( - TEST_SESSION, + session, new TaskId(new StageId("test", 1), 2, 0), new InternalNode("node-id", URI.create("http://fake.invalid/"), new NodeVersion("version"), false), TaskTestUtils.PLAN_FRAGMENT,