-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use the Pinot Grpc Endpoint for Streaming Server Queries #12332
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
skimmed, leaving some comments about commit boundaries as well (seems too fine-grained)
I'll need to look at the new PageSource in more detail
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java
Outdated
Show resolved
Hide resolved
We can re-order commits as
The below could/should be a single commit adding GRPC support (it was a bit difficult to see the "whole" GRPC streaming feature in current shape). Add PinotConfig options for the grpc client |
For the GRPC streaming is the only retained memory the one consumed by the byte buffers used for retrieving the Grpc responses? Can we account for them in the PageSource? Anything else that would need to be accounted (DataTable?)? |
Reoardered the commits but had to keep re: memory size: the entire payload in bytes from the response is now added to the memory usage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments about commits but looks good overall.
@Praveen2112 Can you please take a look at last commit to see if the PageSource implementation looks correct (in terms of memory accounting, isFinished and getReadTimeNanos).
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, thanks for adding this support @elonazoulay
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
Show resolved
Hide resolved
@@ -58,6 +60,9 @@ | |||
private int maxRowsForBrokerQueries = 50_000; | |||
private boolean aggregationPushdownEnabled = true; | |||
private boolean countDistinctPushdownEnabled = true; | |||
private boolean grpcEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be enabled by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, less impact on the pinot servers and allows for larger result sets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pinot servers will have to enable the endpoint in their configuration though, by default it's not enabled. Can add a note in the documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've turned on the grpc flag on pinot by default recently. I think we can make this enabled after one or two versions.
public Iterator<Server.ServerResponse> submitQuery(String query, String serverHost, List<String> segments) | ||
{ | ||
HostAndPort mappedHostAndPort = pinotHostMapper.getServerGrpcHostAndPort(serverHost, grpcPort); | ||
GrpcQueryClient client = clientCache.computeIfAbsent(mappedHostAndPort, k -> new GrpcQueryClient(mappedHostAndPort.getHost(), mappedHostAndPort.getPort())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lambda should use k
. Also, give it a more meaninful name. At a minimum, key
, but maybe hostAndPort
would make it more obvious and readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who manages the lifecycle of these clients? Do they need to be torn down? Can they go into an "invalid" state (in which case, they'd need to be refreshed)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grpc clients are long lived and use internal threads to manage connections.
The default idle time is 30 minutes for the ManagedChannel to close idle connections.
I added life cycle management and a comment in the code.
- DONE: Manage lifecycle - close grpc clients on shutdown
- DONE: Separate configs and modules for grpc and legacy query clients
- DONE: Remove unused configs (separate commit)
- TODO: extract to separate commits
PinotQueryClient pinotQueryClient, | ||
PinotGrpcClient pinotGrpcClient) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that the grpc setting is global to the server, Instead of passing both, it'd be better for these to implement a common interface and pass the appropriate one depending how the connector was initialized.
68a14b4
to
b068fbb
Compare
public HostAndPort getServerGrpcHostAndPort(String serverHost, int grpcPort) | ||
{ | ||
ServerInstance serverInstance = getServerInstance(serverHost); | ||
return HostAndPort.fromParts(serverInstance.getHostname(), grpcPort); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually ServerInstance
has an API to get grpcPort, so you don't need to pass it.
https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java#L93
This API can just be: public HostAndPort getServerGrpcHostAndPort(String serverHost)
In this case, you don't even need to have the config for grpcPort, just make this best effort try. If the grpc port is -1, you return null here, and the query will use netty query endpoint. Once server has grpc, then the query will use grpc query endpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried this connecting to a real cluster and the server instance returns -1
for the grpc port, it did not work. Is it ok to keep it explicitly specified? lmk what you think @xiangfu0 .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean grpc is enabled but serverInstance gives grpc port -1? If that is the case, please keep this grpc port config as the backup and use the grpc port that > 0 if from the ServerInstance.
So the logic is best effort from ServerInstance if possible or use grpc port if not provided.
public class PinotGrpcServerQueryClientConfig | ||
{ | ||
private int maxRowsPerSplitForSegmentQueries = Integer.MAX_VALUE - 1; | ||
private int grpcPort = 8090; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need this grpcPort, just get it directly if grpc is enabled.
@martint can you review this again? |
{ | ||
Map<String, String> metadata = dataTable.getMetadata(); | ||
List<String> exceptions = new ArrayList<>(); | ||
metadata.forEach((k, v) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use more descriptive names for k
and v
. It's not clear what they represent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, was from legacy code. Also thanks to @xiangfu0 for the commit!
interface Factory | ||
{ | ||
PinotDataFetcher create(ConnectorSession session, String query, PinotSplit split); | ||
} | ||
|
||
interface PinotServerQueryClient | ||
{ | ||
Iterator<PinotDataTableWithSize> queryPinot(ConnectorSession session, String query, String serverHost, List<String> segments); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need both a Factory and a PinotServerQueryClient interface? I'm not sure I understand the relationship between the two. Why do we need different implementations of the Factory, given that the query client is already abstracted to support both underlying protocols?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, I think we just need factory no need for PinotServerQueryClient
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good! Updating.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, apologies, should have removed that before.
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java
Outdated
Show resolved
Hide resolved
TODO: extract to separate commits, lmk if that will make it easier to review |
I feel it's good to keep the PR here for a full complete feature. It's also simpler for future reference. |
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataTableWithSize.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java
Outdated
Show resolved
Hide resolved
...no-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientTlsConfig.java
Outdated
Show resolved
Hide resolved
...no-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientTlsConfig.java
Outdated
Show resolved
Hide resolved
...no-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientTlsConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotLegacyDataFetcher.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for putting this up!
GrpcQueryClient create(HostAndPort hostAndPort); | ||
} | ||
|
||
private static void addIfNotNull(ImmutableMap.Builder<String, Object> propertiesBuilder, String key, Object value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is no longer used
Thanks @martint @elonazoulay to get this PR in! 🥂 |
Description
Add support for querying Pinot via the more efficient GRPC endpoint.
New feature
Pinot connector
Related issues, pull requests, and links
Documentation
(x) Documentation issue #12944 is filed, and can be handled later.
Release notes
(x) Release notes entries required with the following suggested text: