Adding Ability for MSQ to write select results to durable storage. #14527

Merged · 15 commits · Jul 7, 2023
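For context on what this PR enables: SELECT results can now be routed to durable storage instead of only the task report, driven by a new select-destination setting in the MSQ query context. Below is a minimal client-side sketch; the literal context keys ("selectDestination", "durableShuffleStorage") and the requirement for a configured storage connector are assumptions inferred from the constants in this diff, not something stated on this page.

// Illustrative sketch only: build a query context that asks MSQ to write SELECT results
// to durable storage. The key strings are assumed values of
// MultiStageQueryContext.CTX_SELECT_DESTINATION and CTX_DURABLE_SHUFFLE_STORAGE.
import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class SelectToDurableStorageExample
{
  public static Map<String, Object> queryContext()
  {
    return ImmutableMap.of(
        "selectDestination", "DURABLE_STORAGE",   // assumed key; value matches MSQSelectDestination.DURABLE_STORAGE.name()
        "durableShuffleStorage", true             // durable storage must also be configured on the cluster
    );
  }
}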
@@ -91,6 +91,13 @@ public Map<Integer, Map<Integer, CounterSnapshots>> copyMap()
return retVal;
}

public Map<Integer, CounterSnapshots> snapshotForStage(int stageNumber)
{
synchronized (snapshotsMap) {
return snapshotsMap.getOrDefault(stageNumber, null);
}
}

private void putAll(final Map<Integer, Map<Integer, CounterSnapshots>> otherMap)
{
synchronized (snapshotsMap) {
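As a usage note for the accessor added above: snapshotForStage returns the per-worker counter snapshots for a single stage (or null if none have been reported yet), so callers can serve stage-level counters without copying the whole tree. A hedged sketch of such a caller, with illustrative HTTP-response wrapping that is not part of this PR, is:

import java.util.Map;

import javax.ws.rs.core.Response;

import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;

// Hypothetical caller of CounterSnapshotsTree#snapshotForStage; the class, method, and
// response handling are illustrative only.
public class StageCountersExample
{
  public static Response stageCounters(final CounterSnapshotsTree countersTree, final int stageNumber)
  {
    final Map<Integer, CounterSnapshots> workerCounters = countersTree.snapshotForStage(stageNumber);
    if (workerCounters == null) {
      // No workers have reported counters for this stage yet.
      return Response.status(Response.Status.NOT_FOUND).build();
    }
    return Response.ok(workerCounters).build();
  }
}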
@@ -19,12 +19,13 @@

package org.apache.druid.msq.counters;

import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessor;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;

/**
* Wrapper around {@link FireDepartmentMetrics} which updates the progress counters while updating its metrics. This
* is necessary as the {@link org.apache.druid.segment.realtime.appenderator.BatchAppenderator} used by the
* {@link org.apache.druid.msq.indexing.SegmentGeneratorFrameProcessor} is not part of the MSQ extension, and hence,
* {@link SegmentGeneratorFrameProcessor} is not part of the MSQ extension, and hence,
* cannot update the counters used in MSQ reports as it persists and pushes segments to deep storage.
*/
public class SegmentGeneratorMetricsWrapper extends FireDepartmentMetrics
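The Javadoc above describes a wrapper pattern: the appenderator only knows about FireDepartmentMetrics, so MSQ wraps it and mirrors metric updates into its own counters. A minimal sketch of that pattern follows; it assumes FireDepartmentMetrics exposes an incrementRowOutputCount(long) hook, and the counter type is hypothetical, so treat it as an illustration rather than the class in this PR.

import org.apache.druid.segment.realtime.FireDepartmentMetrics;

// Sketch of the wrapper pattern only. The overridden method name and the RowCounter type
// are assumptions for illustration; the real SegmentGeneratorMetricsWrapper may differ.
public class CountingMetricsWrapper extends FireDepartmentMetrics
{
  private final RowCounter rowCounter;

  public CountingMetricsWrapper(final RowCounter rowCounter)
  {
    this.rowCounter = rowCounter;
  }

  @Override
  public void incrementRowOutputCount(long numRows)
  {
    super.incrementRowOutputCount(numRows);
    // Keep the MSQ report counters in sync with the appenderator's own metrics.
    rowCounter.add(numRows);
  }

  // Hypothetical counter interface used only for this sketch.
  public interface RowCounter
  {
    void add(long numRows);
  }
}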
@@ -26,6 +26,7 @@
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;

@@ -83,7 +84,7 @@ public String getId()
/**
* Accepts a {@link PartialKeyStatisticsInformation} and updates the controller key statistics information. If all key
* statistics have been gathered, enqueues the task with the {@link WorkerSketchFetcher} to generate partition boundaries.
* This is intended to be called by the {@link org.apache.druid.msq.indexing.ControllerChatHandler}.
* This is intended to be called by the {@link ControllerChatHandler}.
*/
void updatePartialKeyStatisticsInformation(int stageNumber, int workerNumber, Object partialKeyStatisticsInformationObject);

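For orientation, the Javadoc change above points callers at ControllerChatHandler. A hedged sketch of how such a handler could forward a worker's POST to this Controller method is shown below; the resource path, annotations, and the Controller package import are assumptions for illustration, not the actual handler added in this PR.

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.apache.druid.msq.exec.Controller;   // package assumed for this sketch

// Illustrative only: the real ControllerChatHandler lives in org.apache.druid.msq.indexing.client
// and is not shown in this diff.
@Path("/druid/v2/msq/example")
public class ExampleControllerChatHandler
{
  private final Controller controller;

  public ExampleControllerChatHandler(final Controller controller)
  {
    this.controller = controller;
  }

  @POST
  @Path("/partialKeyStatistics/{stageNumber}/{workerNumber}")
  @Consumes(MediaType.APPLICATION_JSON)
  public Response postPartialKeyStatistics(
      @PathParam("stageNumber") final int stageNumber,
      @PathParam("workerNumber") final int workerNumber,
      final Object partialKeyStatisticsInformationObject
  )
  {
    // Delegate to Controller#updatePartialKeyStatisticsInformation, as described in the Javadoc above.
    controller.updatePartialKeyStatisticsInformation(stageNumber, workerNumber, partialKeyStatisticsInformationObject);
    return Response.ok().build();
  }
}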
@@ -54,7 +54,6 @@
import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.key.RowKeyReader;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@@ -86,17 +85,18 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.InputChannelFactory;
import org.apache.druid.msq.indexing.InputChannelsImpl;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQSelectDestination;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.indexing.SegmentGeneratorFrameProcessorFactory;
import org.apache.druid.msq.indexing.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.WorkerCount;
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.FaultsExceededChecker;
@@ -114,6 +114,7 @@
import org.apache.druid.msq.indexing.error.TooManyWarningsFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQStatusReport;
@@ -154,24 +155,23 @@
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
import org.apache.druid.msq.querykit.results.QueryResultFrameProcessorFactory;
import org.apache.druid.msq.querykit.scan.ScanQueryKit;
import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory;
import org.apache.druid.msq.shuffle.input.WorkerInputChannelFactory;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.msq.util.IntervalUtils;
import org.apache.druid.msq.util.MSQFutureUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
import org.apache.druid.msq.util.SqlStatementResourceHelper;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
@@ -186,7 +186,6 @@
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.SqlResults;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@@ -206,7 +205,6 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -600,13 +598,24 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)

ImmutableMap.Builder<String, Object> taskContextOverridesBuilder = ImmutableMap.builder();
taskContextOverridesBuilder
.put(
MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
isDurableStorageEnabled
).put(
MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
maxParseExceptions
);
.put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, isDurableStorageEnabled)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions);

if (!MSQControllerTask.isIngestion(task.getQuerySpec())) {
if (MSQControllerTask.writeResultsToDurableStorage(task.getQuerySpec())) {
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
MSQSelectDestination.DURABLE_STORAGE.name()
);
} else {
// We need not pass TASK_REPORT to the worker, since the worker implementation does nothing with it in that case,
// but we pass it anyway for completeness.
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
MSQSelectDestination.TASK_REPORT.name()
);
}
}
this.workerTaskLauncher = new MSQWorkerTaskLauncher(
id(),
task.getDataSource(),
@@ -683,7 +692,7 @@ private void addToRetryQueue(ControllerQueryKernel kernel, int worker, MSQFault
/**
* Accepts a {@link PartialKeyStatisticsInformation} and updates the controller key statistics information. If all key
* statistics information has been gathered, enqueues the task with the {@link WorkerSketchFetcher} to generate
* partition boundaries. This is intended to be called by the {@link org.apache.druid.msq.indexing.ControllerChatHandler}.
* partition boundaries. This is intended to be called by the {@link ControllerChatHandler}.
*/
@Override
public void updatePartialKeyStatisticsInformation(
@@ -1391,11 +1400,13 @@ private Yielder<Object[]> getFinalResultsYielder(

final InputChannelFactory inputChannelFactory;

if (isDurableStorageEnabled) {
if (isDurableStorageEnabled || MSQControllerTask.writeResultsToDurableStorage(task.getQuerySpec())) {
inputChannelFactory = DurableStorageInputChannelFactory.createStandardImplementation(
id(),
MSQTasks.makeStorageConnector(context.injector()),
closer
MSQTasks.makeStorageConnector(
context.injector()),
closer,
MSQControllerTask.writeResultsToDurableStorage(task.getQuerySpec())
);
} else {
inputChannelFactory = new WorkerInputChannelFactory(netClient, () -> taskIds);
@@ -1431,57 +1442,13 @@ private Yielder<Object[]> getFinalResultsYielder(
}
).collect(Collectors.toList())
).flatMap(
frame -> {
final Cursor cursor = FrameProcessors.makeCursor(
frame,
queryKernel.getStageDefinition(finalStageId).getFrameReader()
);

final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
final ColumnMappings columnMappings = task.getQuerySpec().getColumnMappings();
@SuppressWarnings("rawtypes")
final List<ColumnValueSelector> selectors =
columnMappings.getMappings()
.stream()
.map(
mapping ->
columnSelectorFactory.makeColumnValueSelector(mapping.getQueryColumn())
).collect(Collectors.toList());

final List<SqlTypeName> sqlTypeNames = task.getSqlTypeNames();
Iterable<Object[]> retVal = () -> new Iterator<Object[]>()
{
@Override
public boolean hasNext()
{
return !cursor.isDone();
}

@Override
public Object[] next()
{
final Object[] row = new Object[columnMappings.size()];
for (int i = 0; i < row.length; i++) {
final Object value = selectors.get(i).getObject();
if (sqlTypeNames == null || task.getSqlResultsContext() == null) {
// SQL type unknown, or no SQL results context: pass-through as is.
row[i] = value;
} else {
row[i] = SqlResults.coerce(
context.jsonMapper(),
task.getSqlResultsContext(),
value,
sqlTypeNames.get(i),
columnMappings.getOutputColumnName(i)
);
}
}
cursor.advance();
return row;
}
};
return Sequences.simple(retVal);
}
frame ->
SqlStatementResourceHelper.getResultSequence(
task,
queryDef.getFinalStageDefinition(),
frame,
context.jsonMapper()
)
)
.withBaggage(resultReaderExec::shutdownNow)
);
@@ -1571,6 +1538,10 @@ private static QueryDefinition makeQueryDefinition(
} else if (querySpec.getDestination() instanceof TaskReportMSQDestination) {
shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
queryToPlan = querySpec.getQuery();
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
// we add a final stage which generates one partition per worker.
shuffleSpecFactory = ShuffleSpecFactories.globalSortWithMaxPartitionCount(tuningConfig.getMaxNumWorkers());
queryToPlan = querySpec.getQuery();
} else {
throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
}
@@ -1645,6 +1616,24 @@ private static QueryDefinition makeQueryDefinition(
return builder.build();
} else if (querySpec.getDestination() instanceof TaskReportMSQDestination) {
return queryDef;
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {

// Always attach a new query results stage.
StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
final QueryDefinitionBuilder builder = QueryDefinition.builder();
for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
builder.add(StageDefinition.builder(stageDef));
}

builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.signature(finalShuffleStageDef.getSignature())
.shuffleSpec(null)
.processorFactory(new QueryResultFrameProcessorFactory())
);

return builder.build();
} else {
throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
}
@@ -1754,7 +1743,8 @@ private static boolean isRollupQuery(Query<?> query)

private static boolean isInlineResults(final MSQSpec querySpec)
{
return querySpec.getDestination() instanceof TaskReportMSQDestination;
return querySpec.getDestination() instanceof TaskReportMSQDestination
|| querySpec.getDestination() instanceof DurableStorageMSQDestination;
}

private static boolean isTimeBucketedIngestion(final MSQSpec querySpec)
@@ -2051,7 +2041,12 @@ private static MSQResultsReport makeResultsTaskReport(
);
}

return MSQResultsReport.createReportAndLimitRowsIfNeeded(mappedSignature.build(), sqlTypeNames, resultsYielder, selectDestination);
return MSQResultsReport.createReportAndLimitRowsIfNeeded(
mappedSignature.build(),
sqlTypeNames,
resultsYielder,
selectDestination
);
}

private static MSQStatusReport makeStatusReport(
@@ -2597,7 +2592,8 @@ private void cleanUpEffectivelyFinishedStages()
queryKernel,
(netClient, taskId, workerNumber) -> netClient.postCleanupStage(taskId, stageId),
queryKernel.getWorkerInputsForStage(stageId).workers(),
(ignore1) -> {},
(ignore1) -> {
},
false
);
queryKernel.finishStage(stageId, true);
@@ -45,6 +45,7 @@
import org.apache.druid.msq.statistics.KeyCollectors;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.server.DruidNode;
import org.apache.druid.storage.NilStorageConnector;
import org.apache.druid.storage.StorageConnector;

import javax.annotation.Nullable;
@@ -155,7 +156,11 @@ static String getHostFromSelfNode(@Nullable final DruidNode selfNode)
static StorageConnector makeStorageConnector(final Injector injector)
{
try {
return injector.getInstance(Key.get(StorageConnector.class, MultiStageQuery.class));
StorageConnector storageConnector = injector.getInstance(Key.get(StorageConnector.class, MultiStageQuery.class));
if (storageConnector instanceof NilStorageConnector) {
throw new Exception("Storage connector not configured.");
Review thread on this check:

Contributor (reviewer): Once we selectively bind the NilStorageConnector to Broker nodes only, this check won't be required, right?

Contributor (author): That is also one approach. Since we would need the same check in the Broker as well, I thought we could make it consistent across all places. What I can do is keep this check but also bind the NilStorageConnector only to the Broker role.
}
return storageConnector;
}
catch (Exception e) {
throw new MSQException(new DurableStorageConfigurationFault(e.toString()));
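Related to the NilStorageConnector guard above: a real connector is only bound when durable storage is configured on the cluster, otherwise this method now fails fast with a DurableStorageConfigurationFault. For reference, a hedged example of the runtime properties that enable an S3-backed connector is below; the property names follow the documented MSQ durable-storage settings and should be verified against the Druid release in use.

# Assumed property names for MSQ durable (intermediate) storage; verify against your Druid version.
druid.msq.intermediate.storage.enable=true
druid.msq.intermediate.storage.type=s3
druid.msq.intermediate.storage.bucket=your-bucket
druid.msq.intermediate.storage.prefix=druid-msq-intermediate
druid.msq.intermediate.storage.tempDir=/path/to/local/temp/dir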