Implement adaptive remote task request size #10013
Conversation
How does this problem manifest itself? Could you please elaborate more on the problem you are trying to solve?
@arhimondr @yansun7 can you fix the merge conflicts, please?
@@ -58,6 +58,11 @@
     private Duration remoteTaskMaxErrorDuration = new Duration(5, TimeUnit.MINUTES);
     private int remoteTaskMaxCallbackThreads = 1000;

+    private boolean enabledAdaptiveTaskRequestSize;
Why isn't it 'true'?
         sources = sources.stream()
                 .map(source -> new TaskSource(
                         source.getPlanNodeId(),
                         source.getSplits().stream()
-                                .filter(scheduledSplit -> scheduledSplit.getSequenceId() > currentMaxAcknowledgedSplit)
+                                .filter(scheduledSplit -> scheduledSplit.getSequenceId() > maxAcknowledgedSplitByPlanNode.getOrDefault(source.getPlanNodeId(), Long.MIN_VALUE))
There could be lots of splits, in which case there will be lots of map accesses. It would be better to do it like the below:
.map(source -> {
    long currentMaxAcknowledgedSplit = maxAcknowledgedSplitByPlanNode.getOrDefault(source.getPlanNodeId(), Long.MIN_VALUE);
    return new TaskSource(
            source.getPlanNodeId(),
            source.getSplits().stream()
                    .filter(scheduledSplit -> scheduledSplit.getSequenceId() > currentMaxAcknowledgedSplit)
                    .collect(Collectors.toSet()),
            // Like splits, noMoreSplitsForLifespan could be pruned so that only new items will be processed.
            // This is not happening here because correctness won't be compromised due to duplicate events for noMoreSplitsForLifespan.
            source.getNoMoreSplitsForLifespan(),
            source.isNoMoreSplits());
})
-                .max()
-                .orElse(maxAcknowledgedSplit);
+        for (TaskSource taskSource : sources) {
+            long maxAcknowledgedSplit = taskSource.getSplits().stream()
When taskSource.getSplits().size() == 0, it can 'continue;'.
            long maxAcknowledgedSplit = taskSource.getSplits().stream()
                    .mapToLong(ScheduledSplit::getSequenceId)
                    .max()
                    .orElse(Long.MIN_VALUE);
If taskSource.getSplits().size() == 0 is already checked, .orElse(Long.MIN_VALUE) is not needed. Instead, just use '.getAsLong()'.
            if (!maxAcknowledgedSplitByPlanNode.containsKey(planNodeId)) {
                maxAcknowledgedSplitByPlanNode.put(planNodeId, Long.MIN_VALUE);
            }
            maxAcknowledgedSplitByPlanNode.computeIfPresent(planNodeId, (key, val) -> maxAcknowledgedSplit > val ? maxAcknowledgedSplit : val);
If taskSource.getSplits().size() == 0 is already checked, it should throw when val >= maxAcknowledgedSplit.
e.g.
maxAcknowledgedSplitByPlanNode.compute(planNodeId, (key, val) -> {
    if (val == null) {
        return maxAcknowledgedSplit;
    }
    if (val >= maxAcknowledgedSplit) {
        throw new IllegalStateException(format("%s - splits are out of order? planNodeId=%s, newMax=%d, currentMax=%d", taskId, planNodeId, maxAcknowledgedSplit, val));
    }
    return maxAcknowledgedSplit;
});
            if (!maxAcknowledgedSplitByPlanNode.containsKey(planNodeId)) {
                maxAcknowledgedSplitByPlanNode.put(planNodeId, Long.MIN_VALUE);
            }
If taskSource.getSplits().size() == 0 is already checked, this is not needed.
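Putting the three suggestions above together (skip empty sources, use getAsLong(), and fail fast on out-of-order splits), the acknowledgement loop might look roughly like the sketch below. Only the types and the maxAcknowledgedSplitByPlanNode field come from the diff; the enclosing method name is made up for illustration.

// A consolidated sketch, not the PR's actual code;
// assumes import static java.lang.String.format.
private void acknowledgeSplits(List<TaskSource> sources)
{
    for (TaskSource taskSource : sources) {
        // Skip sources with no new splits, so the max below always exists.
        if (taskSource.getSplits().isEmpty()) {
            continue;
        }
        PlanNodeId planNodeId = taskSource.getPlanNodeId();
        long maxAcknowledgedSplit = taskSource.getSplits().stream()
                .mapToLong(ScheduledSplit::getSequenceId)
                .max()
                .getAsLong(); // safe: the source is known to be non-empty
        maxAcknowledgedSplitByPlanNode.compute(planNodeId, (key, val) -> {
            if (val == null) {
                return maxAcknowledgedSplit;
            }
            if (val >= maxAcknowledgedSplit) {
                throw new IllegalStateException(format("%s - splits are out of order? planNodeId=%s, newMax=%d, currentMax=%d", taskId, planNodeId, maxAcknowledgedSplit, val));
            }
            return maxAcknowledgedSplit;
        });
    }
}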
            // abandon current request and reschedule update if size of request body exceeds requestSizeLimit
            // and splitBatchSize is updated
            if (taskUpdateRequestJson.length > maxRequestSize && splitBatchSize < oldSplitBatchSize) {
                log.debug("%s - current taskUpdateRequestJson exceeded limit: %d, abandon.", taskId, taskUpdateRequestJson.length);
log.info?
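For context, a hedged sketch of the abandon-and-reschedule flow this hunk appears to implement; computeTaskUpdateRequestJson, guaranteedSplitsPerTask, sendRequest, and scheduleUpdate are stand-ins for the PR's actual helpers, not confirmed names:

// Sketch of the adaptive request-size idea, not the PR's exact code.
private void trySendUpdate()
{
    int oldSplitBatchSize = splitBatchSize;
    byte[] taskUpdateRequestJson = computeTaskUpdateRequestJson(oldSplitBatchSize);
    if (taskUpdateRequestJson.length > maxRequestSize) {
        // Shrink the batch proportionally to the overshoot, but never below
        // the guaranteed minimum number of splits per task.
        splitBatchSize = Math.max(
                guaranteedSplitsPerTask,
                (int) (oldSplitBatchSize * ((double) maxRequestSize / taskUpdateRequestJson.length)));
        if (splitBatchSize < oldSplitBatchSize) {
            // Abandon this oversized request and reschedule with the smaller batch.
            scheduleUpdate();
            return;
        }
    }
    sendRequest(taskUpdateRequestJson);
}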
    {
        int numSplits = 0;
        for (TaskSource taskSource : sources) {
            numSplits = Math.max(numSplits, taskSource.getSplits().size());
It should find max only for partitioned sources.
With colocated joins on bucketed tables, there could be more than one partitioned source. In that case, this logic becomes awkward. Please apply it only when there is exactly one partitioned source.
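A sketch of that guard; TaskSource itself doesn't say whether it is partitioned, so the isPartitionedSource predicate here is hypothetical:

// Sketch only: isPartitionedSource is a hypothetical predicate (e.g. backed by
// the set of partitioned plan node ids known to the task), not part of this diff.
private OptionalInt computeAdaptiveBatchSize(List<TaskSource> sources)
{
    List<TaskSource> partitionedSources = sources.stream()
            .filter(source -> isPartitionedSource(source.getPlanNodeId()))
            .collect(Collectors.toList());
    // Only adapt when there is exactly one partitioned source; colocated joins
    // on bucketed tables can produce several.
    if (partitionedSources.size() != 1) {
        return OptionalInt.empty();
    }
    return OptionalInt.of(partitionedSources.get(0).getSplits().size());
}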
👋 @yansun7 - this PR is inactive and doesn't seem to be under development. If you'd like to continue work on this at any point in the future, feel free to re-open.
Currently HttpRemoteTask sends all pending splits in one taskUpdateRequest, which can sometimes produce a very large request per task on the coordinator.
This change adds the ability to dynamically split a large taskUpdateRequest into several smaller batches.
It introduces four parameters for this feature (an example configuration follows the list):
query.remote-task.enable-adaptive-request-size
query.remote-task.max-request-size
query.remote-task.request-size-headroom
query.remote-task.guaranteed-splits-per-task
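For illustration only, they might be set in the coordinator's config.properties roughly like this; the values shown are placeholders, not the defaults from this PR:

query.remote-task.enable-adaptive-request-size=true
query.remote-task.max-request-size=8MB
query.remote-task.request-size-headroom=2MB
query.remote-task.guaranteed-splits-per-task=3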