Add ability for MSQ tasks to query realtime tasks #15024

Merged
merged 41 commits, Oct 9, 2023

Commits
4dc7d68
Add metadata fetch
adarshsanjeev Sep 8, 2023
c2289d1
Add ability to query data servers
adarshsanjeev Sep 22, 2023
5296d1d
Cleanup
adarshsanjeev Sep 22, 2023
80da7b2
Add smile format
adarshsanjeev Sep 24, 2023
e2fe883
Clean
adarshsanjeev Sep 24, 2023
faeec15
Refactor code
adarshsanjeev Sep 25, 2023
0afb4d3
Cover handoff cases
adarshsanjeev Sep 26, 2023
46f84c8
Resolve bug
adarshsanjeev Sep 26, 2023
3183f37
Change handoff from exception to return
adarshsanjeev Sep 27, 2023
2570840
Add smile check
adarshsanjeev Sep 27, 2023
8d7d6a9
Change realtime check
adarshsanjeev Sep 27, 2023
92c6d75
Add logs
adarshsanjeev Sep 28, 2023
6bfa3a6
Fix checkstyles
adarshsanjeev Sep 28, 2023
065b7fd
Change return to yielder
adarshsanjeev Sep 28, 2023
f9a05d0
Fix NPE
adarshsanjeev Sep 29, 2023
3ea2b43
Add unit tests
adarshsanjeev Oct 1, 2023
763fea2
Address review comments
adarshsanjeev Oct 1, 2023
9c9e099
Add unit tests
adarshsanjeev Oct 1, 2023
c4189de
Merge remote-tracking branch 'origin/master' into realtime-msq
adarshsanjeev Oct 1, 2023
d5199d7
Address review comments
adarshsanjeev Oct 1, 2023
53bf349
Delete unwanted test
adarshsanjeev Oct 1, 2023
45bdfa4
Add unit tests
adarshsanjeev Oct 2, 2023
85c4923
Add unit tests
adarshsanjeev Oct 2, 2023
c3222cc
Fix tests
adarshsanjeev Oct 3, 2023
5587fd5
Add documentation
adarshsanjeev Oct 3, 2023
cc3db10
Fix tests
adarshsanjeev Oct 3, 2023
ed5b461
Add tests and remove interface
adarshsanjeev Oct 3, 2023
7fedc60
Add check for case where realtime servers cannot be contacted
adarshsanjeev Oct 4, 2023
e4792f9
Cancel query on interrupt
adarshsanjeev Oct 4, 2023
a10d5a7
Merge remote-tracking branch 'origin/master' into realtime-msq
adarshsanjeev Oct 4, 2023
fb27479
Resolve test failures
adarshsanjeev Oct 4, 2023
705e53b
Merge remote-tracking branch 'origin/master' into realtime-msq
adarshsanjeev Oct 5, 2023
190880a
Fix bug with certain groupby queries
adarshsanjeev Oct 5, 2023
14f0d5b
Merge remote-tracking branch 'origin/master' into realtime-msq
adarshsanjeev Oct 5, 2023
feec36f
Resolve unclean merge
adarshsanjeev Oct 5, 2023
d295b36
Address review comments
adarshsanjeev Oct 7, 2023
69eb9c8
Address review comments
adarshsanjeev Oct 7, 2023
d235618
Address review comments
adarshsanjeev Oct 8, 2023
e573e31
Resolve checkstyle failures
adarshsanjeev Oct 8, 2023
7ddf025
Address review comments
adarshsanjeev Oct 9, 2023
1cfe72a
Add log statement
adarshsanjeev Oct 9, 2023
2 changes: 0 additions & 2 deletions docs/multi-stage-query/known-issues.md
@@ -39,8 +39,6 @@ an [UnknownError](./reference.md#error_UnknownError) with a message including "N

## `SELECT` Statement

- `SELECT` from a Druid datasource does not include unpublished real-time data.

- `GROUPING SETS` and `UNION ALL` are not implemented. Queries using these features return a
[QueryNotSupported](reference.md#error_QueryNotSupported) error.

1 change: 1 addition & 0 deletions docs/multi-stage-query/reference.md
@@ -247,6 +247,7 @@ The following table lists the context parameters for the MSQ task engine:
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of the SELECT query is written. <br />Use `taskReport` (the default) to write results to the task report. <b>This does not scale, since the task report grows very large for large result sets.</b> <br/>Use `durableStorage` to write results to a durable storage location. <b>For large result sets, `durableStorage` is recommended.</b> To configure durable storage, see the [durable storage](#durable-storage) section. | `taskReport` |
| `waitTillSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest query waits for the generated segments to be loaded before exiting; otherwise, it exits without waiting. When this flag is set, the task and live reports contain information about the status of segment loading. This ensures that any queries made after the ingestion exits include results from the ingestion. The drawback is that the controller task stalls until the segments are loaded. | `false` |
| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the sources that are queried for results, in addition to the segments present on deep storage. Can be `NONE` or `REALTIME`. If `NONE`, only non-realtime (published and used) segments are downloaded from deep storage. If `REALTIME`, results from realtime tasks are also included. | `NONE` |
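As a sketch, the new parameter could be passed like any other MSQ context parameter in a SQL task request (the payload shape follows the Druid SQL task API; the datasource and query here are illustrative):

```json
{
  "query": "SELECT channel, COUNT(*) AS cnt FROM wikipedia GROUP BY channel",
  "context": {
    "includeSegmentSource": "REALTIME"
  }
}
```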

## Joins

@@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
@@ -39,6 +40,7 @@
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionSchema;
@@ -140,6 +142,7 @@
import org.apache.druid.msq.input.stage.StageInputSlice;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
import org.apache.druid.msq.input.table.DataSegmentWithLocation;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.input.table.TableInputSpecSlicer;
import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
@@ -187,6 +190,7 @@
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
@@ -1163,14 +1167,73 @@ private QueryKit makeQueryControllerToolKit()

private DataSegmentTimelineView makeDataSegmentTimelineView()
{
final SegmentSource includeSegmentSource = MultiStageQueryContext.getSegmentSources(
task.getQuerySpec()
.getQuery()
.context()
);

final boolean includeRealtime = SegmentSource.shouldQueryRealtimeServers(includeSegmentSource);

return (dataSource, intervals) -> {
final Collection<DataSegment> dataSegments =
final Iterable<ImmutableSegmentLoadInfo> realtimeAndHistoricalSegments;

// Fetch the realtime segments and segments loaded on the historical. Do this first so that we don't miss any
// segment if they get handed off between the two calls. Segments loaded on historicals are deduplicated below,
// since we are only interested in realtime segments for now.
if (includeRealtime) {
realtimeAndHistoricalSegments = context.coordinatorClient().fetchServerViewSegments(dataSource, intervals);
} else {
realtimeAndHistoricalSegments = ImmutableList.of();
}

// Fetch all published, used segments (all non-realtime segments) from the metadata store.
final Collection<DataSegment> publishedUsedSegments =
Contributor: Does it make sense to have DataSegmentWithLocation here? Location can be null for segments which are fetched from deep storage.

Contributor: Can be done in a follow-up PR.
FutureUtils.getUnchecked(context.coordinatorClient().fetchUsedSegments(dataSource, intervals), true);

if (dataSegments.isEmpty()) {
int realtimeCount = 0;

// Deduplicate segments, giving preference to published used segments.
// We do this so that if any segments have been handed off in between the two metadata calls above,
// we directly fetch it from deep storage.
Set<DataSegment> unifiedSegmentView = new HashSet<>(publishedUsedSegments);

// Iterate over the realtime segments and segments loaded on the historical
for (ImmutableSegmentLoadInfo segmentLoadInfo : realtimeAndHistoricalSegments) {
ImmutableSet<DruidServerMetadata> servers = segmentLoadInfo.getServers();
// Filter out only realtime servers. We don't want to query historicals for now, but we can in the future.
// This check can be modified then.
Set<DruidServerMetadata> realtimeServerMetadata
= servers.stream()
.filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes()
.contains(druidServerMetadata.getType())
)
.collect(Collectors.toSet());
if (!realtimeServerMetadata.isEmpty()) {
realtimeCount += 1;
DataSegmentWithLocation dataSegmentWithLocation = new DataSegmentWithLocation(
segmentLoadInfo.getSegment(),
realtimeServerMetadata
);
unifiedSegmentView.add(dataSegmentWithLocation);
} else {
// We don't have any segments of the required segment source, ignore the segment
}
}

if (includeRealtime) {
log.info(
"Fetched total [%d] segments from coordinator: [%d] from metadata store, [%d] from server view",
unifiedSegmentView.size(),
publishedUsedSegments.size(),
realtimeCount
);
}

if (unifiedSegmentView.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(SegmentTimeline.forSegments(dataSegments));
return Optional.of(SegmentTimeline.forSegments(unifiedSegmentView));
}
};
}
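The deduplication above relies on segment identity: a segment handed off between the two metadata calls appears in both lists, and the `HashSet` keeps the published copy that was added first. A minimal self-contained sketch of that preference logic, using simplified hypothetical types rather than Druid's actual `DataSegment` classes:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class SegmentDedupSketch
{
  // Stand-in for a segment: equality is by segment id only, so a realtime
  // copy and a published copy of the same segment collide in a Set.
  static final class Segment
  {
    final String id;
    final String source; // "published" or "realtime"; ignored by equals/hashCode

    Segment(String id, String source)
    {
      this.id = id;
      this.source = source;
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof Segment && ((Segment) o).id.equals(id);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(id);
    }
  }

  // Published segments are inserted first; Set#add (via addAll) is then a
  // no-op for the realtime copy of any segment that was handed off between
  // the server-view fetch and the metadata-store fetch.
  static Set<Segment> unify(List<Segment> published, List<Segment> realtime)
  {
    Set<Segment> unified = new LinkedHashSet<>(published);
    unified.addAll(realtime);
    return unified;
  }

  public static void main(String[] args)
  {
    List<Segment> published = List.of(new Segment("day1", "published"));
    List<Segment> realtime =
        List.of(new Segment("day1", "realtime"), new Segment("day2", "realtime"));
    for (Segment s : unify(published, realtime)) {
      System.out.println(s.id + " -> " + s.source);
    }
    // day1 resolves to the published copy; day2 stays realtime.
  }
}
```

This mirrors why the server view is fetched before the metadata store: a segment can only move from realtime to published between the calls, and the published copy always wins the collision.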