-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Decide number of clients basing on average request size of client #15369
Decide number of clients basing on average request size of client #15369
Conversation
5d5fe25
to
b520ecf
Compare
b520ecf
to
c8e680e
Compare
c8e680e
to
77388b6
Compare
.filter(client -> !queuedClients.contains(client) && !completedClients.contains(client)) | ||
.mapToLong(HttpPageBufferClient::getAverageRequestSizeInBytes) | ||
.sum(); | ||
long bytesToBeRequested = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
projectedBytesToBeRequested
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, let it be
@@ -106,7 +108,9 @@ public DirectExchangeClient( | |||
ScheduledExecutorService scheduledExecutor, | |||
LocalMemoryContext memoryContext, | |||
Executor pageBufferClientCallbackExecutor, | |||
TaskFailureListener taskFailureListener) | |||
TaskFailureListener taskFailureListener, | |||
ConcurrentHashMap<URI, HttpPageBufferClient> allClients, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's better to expose package private getter for testing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think so in that case. If I do it in that way I need to make the queuedClients
collection to be concurrent or I need another mechanism of synchronization. I saw in the codebase that we use @VisibleForTesting
constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
visible for testing (package private)
getAllClients()
getQueuedClients()
core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java
Show resolved
Hide resolved
synchronized void updateAverageRequestSize(int successfulRequests, long responseSize) | ||
{ | ||
if (successfulRequests > 0) { | ||
averageRequestSizeInBytes = (averageRequestSizeInBytes * (successfulRequests - 1) + responseSize) / successfulRequests; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in current could this is using doubles for computations:
// AVG_n = AVG_(n-1) * (n-1)/n + VALUE_n / n
averageBytesPerRequest = (long) (1.0 * averageBytesPerRequest * (successfulRequests - 1) / successfulRequests + responseSize / successfulRequests);
I think its more accurate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right
long responseSize = pages.stream().mapToLong(Slice::length).sum(); | ||
synchronized (HttpPageBufferClient.this) { | ||
requestsCompleted.incrementAndGet(); | ||
updateAverageRequestSize(Math.max(0, requestsCompleted.get() - requestsFailed.get()), responseSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can requestsCompleted.get() - requestsFailed.get()
become negative?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should not. It is kind of defensive programming.
core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java
Outdated
Show resolved
Hide resolved
@@ -511,8 +532,8 @@ public void onSuccess(@Nullable StatusResponse result) | |||
future = null; | |||
} | |||
lastUpdate = DateTime.now(); | |||
requestsCompleted.incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
undo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need that. I am doing artihmetic on requestsFailed
and requestsCompleted
. Without synchronization my average would be invalid.
core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java
Outdated
Show resolved
Hide resolved
77388b6
to
94f00d6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm % comments
@@ -106,7 +108,9 @@ public DirectExchangeClient( | |||
ScheduledExecutorService scheduledExecutor, | |||
LocalMemoryContext memoryContext, | |||
Executor pageBufferClientCallbackExecutor, | |||
TaskFailureListener taskFailureListener) | |||
TaskFailureListener taskFailureListener, | |||
ConcurrentHashMap<URI, HttpPageBufferClient> allClients, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
visible for testing (package private)
getAllClients()
getQueuedClients()
core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClient.java
Outdated
Show resolved
Hide resolved
It gives some gain in our TPCDS / TPCH benchmarks:
|
94f00d6
to
189a510
Compare
@sopel39 all comments addressed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small comment
core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClient.java
Show resolved
Hide resolved
Change the way how DirectExchangeClient.scheduleRequestIfNecessary calculates the number of clients to be requested on the exchange phase to use an average request size of specific client instead of aggregated average of all clients.
Tests for a new approach to calculate the number of clients to be requested was added. Beyond that, the test for calculating average size of request was added.
189a510
to
51a7d65
Compare
commit structure repaired |
Description
Let's define the class of queries that are run on skewed data and as a result on some downstream stage there is one (or few nodes) that gather data from empty or nearly empty nodes at upstream stage - i.e. at some upstream stage most nodes are empty (serving no data) or nearly empty (serving little data). We observed that such queries are executed very poorly on Trino.
This PR addresses that issue.
The query listed below is a representant of that class:
Before the change it takes 11,5 minutes to finish that query on 32 nodes r6g.4xlarge cluster. After the change it takes 4 minutes to finish on same cluster.
No regresion in serial and throughput benchmarks.
Release notes
( *) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: