Implement adaptive remote task request size #15721
Conversation
This can mitigate concerns mentioned by @pettyjamesm in #15579.
@@ -670,6 +758,21 @@ private synchronized SplitAssignment getSplitAssignment(PlanNodeId planNodeId)
boolean pendingNoMoreSplits = Boolean.TRUE.equals(this.noMoreSplits.get(planNodeId));
boolean noMoreSplits = this.noMoreSplits.containsKey(planNodeId);

// limit the number of splits only for a partitioned source
if (planFragment.getPartitionedSourceNodes().contains(planNodeId) && splitBatchSize < splits.size()) {
This condition was always evaluated as false due to a difference in types.
break;
}
}
if (numSplits <= 0) {
nit. numSplits != 0
int newSplitBatchSize = (int) ((double) numSplits * ((double) expectedSize / requestSize));
newSplitBatchSize = Math.max(guaranteedSplitsPerRequest, Math.min(maxUnacknowledgedSplits, newSplitBatchSize));
if (currentSplitBatchSize != newSplitBatchSize) {
If we exceed the request size, we should always end up with a different number of splits as long as we did not hit `guaranteedSplitsPerRequest`. Doing operations on doubles and rounding by casting, we could end up with an infinite loop, as the size would stay bigger than expected while `newSplitBatchSize` stays the same.
Also, source splits can be only a part of the request, and we don't take this into account when limiting the request size.
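A minimal sketch of one way to avoid the stall, assuming the variables from the diff above (`numSplits`, `expectedSize`, `requestSize`, `guaranteedSplitsPerRequest`, `maxUnacknowledgedSplits`, `currentSplitBatchSize`); it forces a strict decrease whenever the request is still over the target, so truncation cannot leave the batch size unchanged:

```java
// Sketch only: guarantee the batch strictly shrinks while the request is too large.
int proposed = (int) (numSplits * ((double) expectedSize / requestSize));
if (requestSize > expectedSize) {
    // cap one below the current value so double rounding cannot leave it unchanged
    proposed = Math.min(proposed, currentSplitBatchSize - 1);
}
int newSplitBatchSize = Math.max(guaranteedSplitsPerRequest, Math.min(maxUnacknowledgedSplits, proposed));
```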
The part that can increase significantly is `splitAssignments`. `fragment` can increase as well, but it cannot be divided into smaller pieces. That's the reason why there is `requestSizeHeadroom`.
if (taskUpdateRequestJson.length > maxRequestSize && newSplitBatchSize < oldSplitBatchSize) {
    log.info("%s - current taskUpdateRequestJson exceeded limit: %d, oldSplitBatchSize: %d, adjustedSplitBatchSize: %d, reschedule.",
            taskId, taskUpdateRequestJson.length, oldSplitBatchSize, newSplitBatchSize);
    scheduleUpdate();
We should not reschedule if the `splitAssignments` size equals `guaranteedSplitsPerRequest`; such a request should be processed as it is.
If `oldSplitBatchSize == guaranteedSplitsPerRequest`, then `newSplitBatchSize < oldSplitBatchSize` will always be false.
Shouldn't we test here (line 709) `taskUpdateRequestJson.length > (maxRequestSize - requestSizeHeadroom)`?
If we take `requestSize = 6 MB`, `maxRequestSize = 7 MB`, `requestSizeHeadroom = 2 MB`, then in `adjustSplitBatchSize` the expected size will be 5 MB, and the algorithm will trigger limiting of splits, but we will not trigger `scheduleUpdate`. The next request will be adjusted to `newSplitBatchSize`.
If `requestSize = 6 MB` and `maxRequestSize = 7 MB`, since `requestSize < maxRequestSize`, it doesn't need to reschedule. `requestSizeHeadroom` is for calculating the number of splits only when `requestSize > maxRequestSize` (given that a request contains not only splits, but also other data). Let me change the logic to call `adjustSplitBatchSize` only when `requestSize > maxRequestSize`.
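For reference, a small worked example using the numbers from this thread (the 1000-split count is just an assumed value; variable names follow the diff):

```java
long maxRequestSize = 7 * 1024 * 1024;                     // 7 MB limit
long requestSizeHeadroom = 2 * 1024 * 1024;                // 2 MB headroom
long requestSize = 6 * 1024 * 1024;                        // serialized request: 6 MB
long expectedSize = maxRequestSize - requestSizeHeadroom;  // target: 5 MB

int numSplits = 1000;
// requestSize > expectedSize, so the batch shrinks to roughly 5/6 of its size,
int newSplitBatchSize = (int) (numSplits * ((double) expectedSize / requestSize)); // 833
// but requestSize <= maxRequestSize, so no reschedule is triggered;
// the smaller batch only takes effect on the next update.
```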
}
remoteTask.addSplits(splits);

poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID) != null);
How is the adjustment tested here?
Even though `splits` is divided into smaller requests, it will eventually send all 1000 splits. Let me add another `poll` to check whether `splits` is actually divided or not.
Can we test that the request size did not exceed our maximum?
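One possible shape for such a check, assuming the testing resource is extended to record the body size of each task update it receives (`getTaskUpdateRequestSizes()` and `MAX_REQUEST_SIZE` are hypothetical, not existing test APIs):

```java
// Hypothetical: TestingTaskResource records the byte size of every taskUpdateRequest body.
for (long requestSize : testingTaskResource.getTaskUpdateRequestSizes()) {
    assertThat(requestSize).isLessThanOrEqualTo(MAX_REQUEST_SIZE.toBytes());
}
```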
@GuardedBy("this")
private int splitBatchSize;
@GuardedBy("this")
private boolean splitBatchSizeChanged;
Is this variable required, given that its only purpose is logging?
Yes, to prevent repetitive logs. But let me remove it, since the log fits the debug level better.
// and splitBatchSize is updated
if (newSplitBatchSize != oldSplitBatchSize) {
    splitBatchSize = newSplitBatchSize;
    splitBatchSizeChanged = true;
Why is this an object property?
@@ -568,6 +595,12 @@ private synchronized void processTaskUpdate(TaskInfo newValue, List<SplitAssignm
    pendingSourceSplitsWeight -= removedWeight;
    }
}

// increment pendingRequestsCounter by 1 when there are still pending splits
Why is this required in `processTaskUpdate`? This looks more like a workaround for not sending all splits during `sendUpdate`. Why `pendingSplits` instead of `pendingSourceSplitsWeight`?
Btw, did you test how waiting for a task update impacts the latency of sending more splits when we hit the request size limit?
Makes sense. I think `pendingSourceSplitCount` works as well.
Yes, I tested it and there was no noticeable performance impact. Since `sendUpdate` just assigns splits to workers, workers will have a sufficient amount of work (splits) to do. `guaranteedSplitsPerRequest` guarantees that as well.
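A minimal sketch of the idea being discussed, with field names taken from the diff comments (the exact placement inside `processTaskUpdate` is an assumption):

```java
// If the acknowledged update did not carry all pending splits, count another
// pending request so the remaining splits are sent in a follow-up update.
if (pendingSourceSplitCount > 0) {
    pendingRequestsCounter.incrementAndGet();
}
```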
// set this false to prevent repetitive logs
splitBatchSizeChanged = false;
}
splits = splits.stream()
Maybe using a `LinkedHashSet` would be a better approach here?
My intention was to avoid sorting splits by having them already inserted into a set sorted by sequenceId, and just polling the first splits from this set.
I don't understand what you meant (especially "just do poll of first splits from this set"). AFAIK, `splits` does not guarantee the `sequenceId` order.
Some comments
        ImmutableSet.copyOf(newSplits),
        splitAssignment.isNoMoreSplits());
}
unacknowledgedSplitAssignment.add(splitAssignment);
Should it be done under the `if` condition?
// Currently it supports only when there is one partitioned source.
this.enableAdaptiveTaskRequestSize = numOfPartitionedSources == 1 ? isEnableAdaptiveTaskRequestSize(session) : false;
if (numOfPartitionedSources > 1) {
    log.info("%s - There are more than one partitioned sources: numOfPartitionedSources=%s",
debug
int newSplitBatchSize = (int) ((double) numSplits * ((double) expectedSize / requestSize));
newSplitBatchSize = Math.max(guaranteedSplitsPerRequest, Math.min(maxUnacknowledgedSplits, newSplitBatchSize));
if (currentSplitBatchSize != newSplitBatchSize) {
    log.info("%s - Split batch size changed: prevSize=%s, newSize=%s", taskId, numSplits, newSplitBatchSize);
debug
VersionedDynamicFilterDomains dynamicFilterDomains;
Optional<PlanFragment> fragment;
byte[] taskUpdateRequestJson;
synchronized (this) {
We should minimize the scope of synchronization.
`getSplitAssignments` and `adjustSplitBatchSize` should be synchronized together.
Can't we pass `currentSplitBatchSize` to `getSplitAssignments`?
Let me change `splitBatchSize` to `AtomicInteger`, then.
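A minimal sketch of that change, assuming the field names used elsewhere in this diff:

```java
// field: written by adjustSplitBatchSize, read by sendUpdate without
// holding the monitor for the whole serialization step
private final AtomicInteger splitBatchSize;

// in the constructor:
this.splitBatchSize = new AtomicInteger(maxUnacknowledgedSplits);

// reader side: take one consistent snapshot per update
int currentSplitBatchSize = splitBatchSize.get();

// writer side: publish the adjusted value
splitBatchSize.set(newSplitBatchSize);
```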
    splitBatchSizeChanged = true;
}
if (taskUpdateRequestJson.length > maxRequestSize && newSplitBatchSize < oldSplitBatchSize) {
    log.info("%s - current taskUpdateRequestJson exceeded limit: %d, oldSplitBatchSize: %d, adjustedSplitBatchSize: %d, reschedule.",
debug
// limit the number of splits only for a partitioned source
if (planFragment.getPartitionedSourceNodes().contains(planNodeId) && splitBatchSize < splits.size()) {
    if (splitBatchSizeChanged) {
        log.info("%s - Splits are limited by splitBatchSize: splitBatchSize=%s, splits=%s, planNodeId=%s", taskId, splitBatchSize, splits.size(), planNodeId);
debug
this.splitBatchSize = maxUnacknowledgedSplits;
long numOfPartitionedSources = planFragment.getPartitionedSources().size();
// Currently it supports only when there is one partitioned source.
this.enableAdaptiveTaskRequestSize = numOfPartitionedSources == 1 ? isEnableAdaptiveTaskRequestSize(session) : false;
It would be great if it worked for any number / type of sources. Otherwise some queries (such as collocated joins) might still experience the same problem. Would it be possible to have something like `targetSplitsPerSource` and adjust it according to request size?
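Roughly what that could look like (`targetSplitsPerSource` is hypothetical and not part of this PR):

```java
// Divide the adaptive batch budget across all partitioned sources,
// while still honoring the per-request guarantee.
int partitionedSources = planFragment.getPartitionedSources().size();
int targetSplitsPerSource = Math.max(
        guaranteedSplitsPerRequest,
        splitBatchSize / Math.max(1, partitionedSources));
```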
We can, but can we do that as a follow-up to this PR?
It feels like you would need to change quite a bit, but maybe it is more incremental than it seems. Do you think it is worth having two increments?
Yes. I think in most cases there will be one partitioned source (assuming that collocated joins are not common). So, this PR will cover most cases and unblock #15579.
PlanNodeId planNodeId = splitAssignment.getPlanNodeId();
long currentMaxAcknowledgedSplit = maxAcknowledgedSplitByPlanNode.getOrDefault(planNodeId, Long.MIN_VALUE);
long maxAcknowledgedSplit = currentMaxAcknowledgedSplit;
Set<ScheduledSplit> newSplits = new HashSet<>();
Why not use `ImmutableSet.Builder` here?
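For comparison, a sketch of the suggested alternative; the accessor names (`getSplits`, `getSequenceId`) are assumed from the surrounding code:

```java
// Collect only the splits that have not been acknowledged yet, without a mutable HashSet.
ImmutableSet.Builder<ScheduledSplit> newSplits = ImmutableSet.builder();
for (ScheduledSplit split : splitAssignment.getSplits()) {
    if (split.getSequenceId() > currentMaxAcknowledgedSplit) {
        newSplits.add(split);
    }
}
Set<ScheduledSplit> acknowledgedFiltered = newSplits.build();
```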
Optional<PlanFragment> fragment;
byte[] taskUpdateRequestJson;
synchronized (this) {
    int oldSplitBatchSize = splitBatchSize;
nit. `oldSplitBatchSize` -> `currentSplitBatchSize`
@@ -629,6 +692,23 @@ private void sendUpdate()
        dynamicFilterDomains.getDynamicFilterDomains(),
        session.getExchangeEncryptionKey());
byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toJsonBytes(updateRequest);

if (enableAdaptiveTaskRequestSize && taskUpdateRequestJson.length > maxRequestSize) {
Testing `taskUpdateRequestJson.length > maxRequestSize` here disallows increasing `currentSplitBatchSize` when the request size is smaller than `expectedSize` and we didn't exceed `maxUnacknowledgedSplits`. Can't we have `taskUpdateRequestJson.length > (maxRequestSize - headroom)` in the if at line 704?
Also, it would be nice to test the increment behavior.
`taskUpdateRequestJson` is checked against `maxRequestSize`, so to keep things symmetric the condition in `adjustSplitBatchSize` should check `maxRequestSizeInBytes` instead of `maxRequestSizeInBytes - requestSizeHeadroomInBytes`. Let me update it accordingly.
Generally looks good, some small comments
@@ -173,6 +173,10 @@
public static final String ADAPTIVE_PARTIAL_AGGREGATION_ENABLED = "adaptive_partial_aggregation_enabled";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS = "adaptive_partial_aggregation_min_rows";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold";
public static final String ENABLE_ADAPTIVE_REMOTE_TASK_REQUEST_SIZE = "enable_adaptive_remote_task_request_size";
nit: generally we prefer boolean property names that answer an "is" question, e.g. `ADAPTIVE_PARTIAL_AGGREGATION_ENABLED`, `SPILL_ENABLED`. There are some properties that answer "do" (e.g. `ENABLE_DYNAMIC_FILTERING`), but generally that is not preferred. I would recommend calling this property (and the respective config properties / variables) `REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED`.
@@ -173,6 +173,10 @@
public static final String ADAPTIVE_PARTIAL_AGGREGATION_ENABLED = "adaptive_partial_aggregation_enabled";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS = "adaptive_partial_aggregation_min_rows";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold";
public static final String ENABLE_ADAPTIVE_REMOTE_TASK_REQUEST_SIZE = "enable_adaptive_remote_task_request_size";
public static final String MAX_REMOTE_TASK_REQUEST_SIZE = "max_remote_task_request_size";
maybe `REMOTE_TASK_MAX_REQUEST_SIZE` to be consistent with the other two
@@ -171,6 +180,11 @@
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean aborting = new AtomicBoolean(false);

private final int guaranteedSplitsPerRequest;
private final long maxRequestSize;
nit: maxRequestSizeInBytes
@@ -171,6 +180,11 @@
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean aborting = new AtomicBoolean(false);

private final int guaranteedSplitsPerRequest;
private final long maxRequestSize;
private final long requestSizeHeadroom;
nit: requestSizeHeadroomInBytes
private final int guaranteedSplitsPerRequest;
private final long maxRequestSize;
private final long requestSizeHeadroom;
private final boolean enableAdaptiveTaskRequestSize;
nit: adaptiveUpdateRequestSizeEnabled
this.requestSizeHeadroom = getRemoteTaskRequestSizeHeadroom(session).toBytes();
this.splitBatchSize = new AtomicInteger(maxUnacknowledgedSplits);
long numOfPartitionedSources = planFragment.getPartitionedSources().size();
// Currently it supports only when there is one partitioned source.
Please create a TODO issue and link it here
@@ -568,6 +594,12 @@ private synchronized void processTaskUpdate(TaskInfo newValue, List<SplitAssignm
    pendingSourceSplitsWeight -= removedWeight;
    }
}

// increment pendingRequestsCounter by 1 when there are still pending splits
Why wasn't it necessary before?
Before this feature, all pending splits were always added to a request.
*/
private int adjustSplitBatchSize(List<SplitAssignment> splitAssignments, long requestSize, long expectedSize, int currentSplitBatchSize)
{
    if ((requestSize > expectedSize && currentSplitBatchSize > guaranteedSplitsPerRequest) || (requestSize < expectedSize && currentSplitBatchSize < maxUnacknowledgedSplits)) {
Why is the `(requestSize < expectedSize && currentSplitBatchSize < maxUnacknowledgedSplits)` condition necessary? `adjustSplitBatchSize` is only called when `taskUpdateRequestJson.length > maxRequestSize`, right?
That's a good point. I mistakenly moved `taskUpdateRequestJson.length > maxRequestSize` to the upper condition when addressing Kamil's comment. Let me fix it.
LGTM % comments
Can someone please help merge this?
Thanks @JunhyungSong !
Description
Before this change, HttpRemoteTask sends all pending splits in one taskUpdateRequest, sometimes causing a large request for each task on the coordinator.
This change adds the ability to dynamically split a large taskUpdateRequest into several smaller batches.
Introduces 4 parameters for this feature:
query.remote-task.enable-adaptive-request-size: controls enabling/disabling the feature
query.remote-task.max-request-size: the maximum size of a taskUpdateRequest
query.remote-task.request-size-headroom: the expected mean size of a taskUpdateRequest will be (max-request-size - request-size-headroom)
query.remote-task.guaranteed-splits-per-task: the number of splits guaranteed to be sent in a taskUpdateRequest for each task, to prevent deadlock
New revision of #10013.
Additional context and related issues
The problematic situation:
The table has 3884 columns.
The table schema has a "parameters" field containing the whole table schema string (~300k characters).
The size of each split object (in the task update request) is large due to the big strings in the table properties.
If a taskUpdateRequest contains many huge splits like this and is sent to the worker over HTTP, it can crash the coordinator with an out-of-memory error.
So, this PR limits the size of the taskUpdateRequest not only via max_unacknowledged_splits_per_task, but also via max_remote_task_request_size.
Release notes
( ) This is not user-visible or docs only and no release notes are required.
(x) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: