-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability for MSQ tasks to query realtime tasks #15024
Conversation
...-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
Fixed
Show fixed
Hide fixed
.../multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java
Fixed
Show fixed
Hide fixed
...ery/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
Fixed
Show fixed
Hide fixed
...ti-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
Fixed
Show fixed
Hide fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed the overall design. Left some comments.
Please add UT's for this.
...s-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
Outdated
Show resolved
Hide resolved
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Outdated
Show resolved
Hide resolved
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Outdated
Show resolved
Hide resolved
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Outdated
Show resolved
Hide resolved
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Show resolved
Hide resolved
...ore/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java
Outdated
Show resolved
Hide resolved
...multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProviderImpl.java
Outdated
Show resolved
Hide resolved
...ulti-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
Show resolved
Hide resolved
...re/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
Show resolved
Hide resolved
.../multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since numRetriesOnMissingSegments is not added in the documentation, perhaps we should leave it out of the release notes as well.
...ulti-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Classes left are
BaseLeafFrameProcesor
DataServerClient
LoadedSegmentDataProvider
GroupingEngine
...ore/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java
Outdated
Show resolved
Hide resolved
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentSource.java
Show resolved
Hide resolved
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Outdated
Show resolved
Hide resolved
.../multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java
Outdated
Show resolved
Hide resolved
.../multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
Outdated
Show resolved
Hide resolved
...-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
Show resolved
Hide resolved
...ore/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java
Outdated
Show resolved
Hide resolved
...ore/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/discovery/DataServerClient.java
Outdated
Show resolved
Hide resolved
@Override | ||
public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response, TrafficCop trafficCop) | ||
{ | ||
log.debug("Received response status[%s] for queryId[%s]", response.getStatus(), queryId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we check for timeout?
Shoudnt traffic cop be used for backpressure?
We can do these things in a follow up PR as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about metrics like bytes scanned and stuff ?
server/src/main/java/org/apache/druid/discovery/DataServerResponseHandler.java
Show resolved
Hide resolved
StandardRetryPolicy.noRetries() | ||
); | ||
this.objectMapper = objectMapper; | ||
this.queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make a pool for this. For each segment you are creating a new executor service which seems wasteful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who closes this executor service.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved this to a common threadpool and added a wait to close it.
try { | ||
statusSequencePair = RetryUtils.retry( | ||
() -> { | ||
Sequence<QueryType> sequence = dataServerClient.run(preparedQuery, responseContext, queryResultType, closer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happen's if sequence is interrupted or closed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be thrown outside this class, which would cause the query to fail
...ore/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java
Outdated
Show resolved
Hide resolved
...ore/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java
Show resolved
Hide resolved
} else { | ||
Boolean wasHandedOff = checkSegmentHandoff(coordinatorClient, dataSource, segmentDescriptor); | ||
if (Boolean.TRUE.equals(wasHandedOff)) { | ||
log.debug("Segment[%s] was handed off.", segmentDescriptor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we log the counts in the stage somewhere ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add this as part of the counters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments.
LGTM overall.
statusSequencePair = RetryUtils.retry( | ||
() -> { | ||
ServiceLocation serviceLocation = Iterables.getOnlyElement(fixedSetServiceLocator.locate().get().getLocations()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Use collectionUtils.getOnlyElement() so that a nicer error message is thrown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
...ore/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java
Outdated
Show resolved
Hide resolved
*/ | ||
public enum SegmentSource | ||
{ | ||
NONE(ImmutableSet.of()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIt: Lets add that None also includes deep storage segments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comments here
...ti-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
Show resolved
Hide resolved
...multi-stage-query/src/test/java/org/apache/druid/msq/exec/LoadedSegmentDataProviderTest.java
Show resolved
Hide resolved
...ns-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/discovery/DataServerClient.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change LGTM.
This PR aims to add the capabilities to: 1. Fetch the realtime segment metadata from the coordinator server view, 2. Adds the ability for workers to query indexers, similar to how brokers do the same for native queries.
This PR aims to add the capabilities to: 1. Fetch the realtime segment metadata from the coordinator server view, 2. Adds the ability for workers to query indexers, similar to how brokers do the same for native queries.
Currently, MSQ is unable to include realtime results from realtime queries.
This problem is present both in the controller, which only fetches metadata from the metadata store so that only published segments would be read and the worker, which reads segments from deep storage and lacks the ability to communicate with data servers.
This PR aims to add the capabilities to:
Release Notes
includeSegmentSource
which can be eitherNONE
(default) orREALTIME
. If this parameter is set toREALTIME
, MSQ will include real time segments in the query results.This PR has: