Adding Ability for MSQ to write select results to durable storage. #14527

Merged on Jul 7, 2023 (15 commits)

@cryptoe (Contributor) commented Jul 5, 2023

Description

One of the most requested features in Druid is the ability to download large result sets.
As part of #14416, we added the ability to query MSQ via a query-friendly endpoint. This PR builds on that work and adds the ability for MSQ to write SELECT results to durable storage.

We write the results to the durable storage location <prefix>/results/<queryId> in the Druid frame format. They are exposed to users via
/v2/sql/statements/:queryId/results.

The broker now knows how to read frame files from durable storage and return the results to end users as JSON.

We also write the first 3000 rows of the result to the task report so that clients such as the console can preview results.

Users can enable this by setting the context flag
selectDestination:DURABLE_STORAGE when issuing SELECT queries to MSQ.
Note that durable storage must be configured on the broker, overlord, and ingestion tasks/indexers, as described at https://druid.apache.org/docs/latest/multi-stage-query/reference.html#enable-durable-storage
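
A minimal sketch of how a client might build such a request. It assumes the statements endpoint accepts a JSON body with `query` and `context` fields, and the broker base URL and query id in `main` are placeholders. The class and method names are hypothetical and not part of this PR; only the `selectDestination` flag, its `DURABLE_STORAGE` value, and the results path come from the description above.

```java
import java.net.URI;

// Hypothetical helper, not part of Druid: builds the request body and the results URI.
class SelectToDurableStorageExample
{
  // JSON body for a SELECT whose results should go to durable storage.
  // Assumption: the endpoint takes a body with "query" and "context" fields.
  static String buildRequestBody(String sql)
  {
    // Escapes only backslashes and double quotes; control characters are not handled.
    String escaped = sql.replace("\\", "\\\\").replace("\"", "\\\"");
    return "{\"query\":\"" + escaped + "\","
           + "\"context\":{\"selectDestination\":\"DURABLE_STORAGE\"}}";
  }

  // Results path as named in the PR description; brokerBase is a placeholder.
  static URI resultsUri(String brokerBase, String queryId)
  {
    return URI.create(brokerBase + "/v2/sql/statements/" + queryId + "/results");
  }

  public static void main(String[] args)
  {
    System.out.println(buildRequestBody("SELECT * FROM \"wikipedia\""));
    System.out.println(resultsUri("http://localhost:8082/druid", "query-abc"));
  }
}
```

The body would be sent when submitting the query, and the results URI fetched once the query has finished.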


Key changed/added classes in this PR
  • SqlStatementResource
  • QueryResultsFrameProcessor
  • ControllerImpl
  • WorkerImpl
  • DurableStorageInputChannelFactory
  • DurableStorageOutputChannelFactory

TODO:

  • Add more unit tests

User-facing docs for both #14416 and this PR will be raised once this is merged.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@cryptoe cryptoe added the Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 label Jul 5, 2023
@@ -98,6 +99,9 @@ public void configure(Binder binder)
.addBinding()
.to(DurableStorageCleaner.class);
}
} else {
// bind with nil implementation so that configs are not required during service startups.
binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)).toInstance(NilStorageConnector.getInstance());
Contributor

Is this change just as a convenience for users? Or is this required in some way for this PR?

Contributor Author

Since SqlStatementResource needs a binding for the StorageConnector, brokers would otherwise need StorageConnector configs in order to start.

With this change, we bind a nil implementation so that those configs are not necessary.

Contributor

In that case

  1. We should selectively bind the implementation to the Broker Nodes only. For MSQ stuff residing in the indexers, we have an extensive system in place that determines whether durable storage is enabled.
  2. This still seems slightly out of place, where each method throws a UOE for the user. IMO we should do something like this: Wherever we are using a storage connector in the broker, we should check isDurableStorageEnabled - as we do in the MSQ engine - which would basically check if the connector is a NilStorageConnector and then throw an error which is directed at the ADMIN/OPERATOR of the cluster.
  3. The UOE in the connector's implementation should be a defensive mechanism, and we should ensure that it is never called.
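
A rough sketch of points 2 and 3, using simplified stand-ins rather than the real Druid classes. The single-method `StorageConnector` interface and the `DurableStorageGuard` helper are hypothetical. `UnsupportedOperationException` and `IllegalStateException` stand in for `UOE` and for whatever operator-facing error type the broker would actually use.

```java
// Hypothetical, simplified stand-in for the real StorageConnector interface.
interface StorageConnector
{
  boolean pathExists(String path);
}

// Nil implementation bound when durable storage is not configured.
final class NilStorageConnector implements StorageConnector
{
  private static final NilStorageConnector INSTANCE = new NilStorageConnector();

  static NilStorageConnector getInstance()
  {
    return INSTANCE;
  }

  @Override
  public boolean pathExists(String path)
  {
    // Defensive only: callers are expected to check before reaching this point.
    throw new UnsupportedOperationException("Please configure durable storage.");
  }
}

// Hypothetical guard used by top-level code before touching the connector.
final class DurableStorageGuard
{
  static boolean isDurableStorageEnabled(StorageConnector connector)
  {
    return !(connector instanceof NilStorageConnector);
  }

  // Fails fast with a message aimed at the cluster admin/operator.
  static StorageConnector requireConfigured(StorageConnector connector)
  {
    if (!isDurableStorageEnabled(connector)) {
      throw new IllegalStateException(
          "Durable storage is not configured on this service; ask the cluster admin to enable it."
      );
    }
    return connector;
  }
}
```

With a guard like this at every call site, the exception inside the nil connector should never be reached in practice.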

Contributor Author

Discussed here: #14527 (comment)

@@ -1319,7 +1342,7 @@ private void writeDurableStorageSuccessFileIfNeeded(final int stageNumber)
e,
"Unable to create the success file [%s] at the location [%s]",
DurableStorageUtils.SUCCESS_MARKER_FILENAME,
DurableStorageUtils.getSuccessFilePath(
DurableStorageUtils.getWorkerOutputSuccessFilePath(
Contributor

Should we get the worker output path from the DurableStorageOutputChannelFactory instead?

Contributor Author

Good catch!!

@@ -35,7 +36,7 @@

public class DataSourceMSQDestination implements MSQDestination
Contributor

Is there a reason to not move MSQDestination to org.apache.druid.msq.indexing.destination as well?

@Override
public boolean pathExists(String path)
{
throw new UOE("Please configure durable storage.");
Contributor

We can refactor this into a method and call that method from everywhere

Suggested change
throw new UOE("Please configure durable storage.");
private void throwError()
{
throw new UOE("Please configure durable storage.");
}

Contributor

Also, we should replace UOE with a DruidException, and the category should be defensive, since we don't expect these methods to be called under normal circumstances. The top-level code should detect whether the implementation is a NilStorageConnector and, if so, not call the method.

Contributor Author

> We can refactor this into a method and call that method from everywhere

This seems better. Used a version of this.

> The top-level code should detect if the implementation is a NilStorageConnector, and then not call the method

We already do that in multiple places

  • SqlStatementResource#contextChecks
  • MSQTasks.makeStorageConnector()

Contributor

Please add a Javadoc for this class explaining why it is needed.

return injector.getInstance(Key.get(StorageConnector.class, MultiStageQuery.class));
StorageConnector storageConnector = injector.getInstance(Key.get(StorageConnector.class, MultiStageQuery.class));
if (storageConnector instanceof NilStorageConnector) {
throw new Exception("Storage connector not configured.");
Contributor

Once we selectively bind the NilStorageConnector to Broker nodes only, this check won't be required, right?

Contributor Author

That is also one approach. Since we would need to check this in the broker as well, I thought we could make it consistent in all places.

What I can do is keep this check but also bind the NilStorageConnector only to the broker role.


@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = DataSourceMSQDestination.TYPE, value = DataSourceMSQDestination.class),
@JsonSubTypes.Type(name = TaskReportMSQDestination.TYPE, value = TaskReportMSQDestination.class)
@JsonSubTypes.Type(name = TaskReportMSQDestination.TYPE, value = TaskReportMSQDestination.class),
@JsonSubTypes.Type(name = DurableStorageDestination.TYPE, value = DurableStorageDestination.class)
Contributor

Suggested change
@JsonSubTypes.Type(name = DurableStorageDestination.TYPE, value = DurableStorageDestination.class)
@JsonSubTypes.Type(name = DurableStorageMSQDestination.TYPE, value = DurableStorageMSQDestination.class)

nit: For consistency with the other two names, this class could be renamed

@@ -263,4 +265,9 @@ public static boolean isIngestion(final MSQSpec querySpec)
{
return querySpec.getDestination() instanceof DataSourceMSQDestination;
}

public static boolean writeResultsToDurableStorage(final MSQSpec querySpec)
@LakshSingla (Contributor), Jul 6, 2023

nit: Naming. From the name, the method seems like it writes the results to durable storage and returns whether the write was successful.

Suggested change
public static boolean writeResultsToDurableStorage(final MSQSpec querySpec)
public static boolean isWriteResultsToDurableStorage(final MSQSpec querySpec)

partition.getPartitionNumber(),
i -> {
try {
return outputChannelFactory.openChannel(i);
Contributor

I think there was some discussion to change this to open inside the processor.

Contributor Author

I think this would need a change in both GroupByPostFrameProcessorFactory and the new QueryResultsFrameProcessor.

Since I do not want to accidentally break GroupByPostFrameProcessorFactory so close to the cutoff, we can change it in a follow-up PR.
WDYT?

)
{
// Expecting a single input slice from some prior stage.
final StageInputSlice slice = (StageInputSlice) Iterables.getOnlyElement(inputSlices);
Contributor

nit: This returns a pretty bad exception message when it fails.
While we don't expect it to ever hit, it would be good if we can find out the elements that were present in the input slices (if it fails). Can we manually check and extract the slice instead of this helper method?
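
One way the manual check could look, as a sketch only. The `InputSlices` class and `getOnlySlice` method are hypothetical names, and `IllegalStateException` is a placeholder for whichever exception type the codebase prefers.

```java
import java.util.List;

// Hypothetical helper, not part of Druid.
final class InputSlices
{
  // Returns the single element, or throws with the actual contents in the message
  // so that a failure shows which slices were present.
  static <T> T getOnlySlice(List<T> inputSlices)
  {
    if (inputSlices.size() != 1) {
      throw new IllegalStateException(
          "Expected exactly one input slice but got " + inputSlices.size() + ": " + inputSlices
      );
    }
    return inputSlices.get(0);
  }
}
```

The caller would then cast the returned element to `StageInputSlice` as before.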

}

@Override
public void cleanup() throws IOException
Contributor

This relates to the above point of opening the channel in the factory.

  1. It is fine if we open the channel in the factory and close it in the factory
  2. Or if we open the channel in the processor and close it in the processor

However, currently we are opening the output channel in the processor and closing it in the factory. If the above point is addressed, then this cleanup method is fine. Until then, we should move it to the place where we open the output channel.
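
To illustrate option 2 from the list above, here is a small sketch in which the same object both opens and closes the channel. `TrackedChannel` and `OwningProcessor` are hypothetical stand-ins, not the real frame channel or processor classes.

```java
import java.io.Closeable;
import java.util.function.Supplier;

// Hypothetical stand-in for an output channel; records whether it was closed.
final class TrackedChannel implements Closeable
{
  boolean closed = false;

  @Override
  public void close()
  {
    closed = true;
  }
}

// The processor receives a way to open the channel and owns its whole lifecycle.
final class OwningProcessor
{
  private final Supplier<TrackedChannel> opener;
  private TrackedChannel channel;

  OwningProcessor(Supplier<TrackedChannel> opener)
  {
    this.opener = opener;
  }

  void run()
  {
    // Opened here, by the processor, rather than by the factory.
    channel = opener.get();
  }

  void cleanup()
  {
    // Closed by the same owner; a no-op if run() was never called.
    if (channel != null) {
      channel.close();
    }
  }
}
```

Keeping open and close in one class makes it easier to see that every opened channel is eventually closed.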

Contributor

Please add Javadoc for this and the other new durable storage classes that have been created.

@adarshsanjeev (Contributor) left a comment

The overall approach seems fine to me. Had a few more minor comments.

return Response.ok().build();
}
Optional<List<ColumnNameAndTypes>> signature = SqlStatementResourceHelper.getSignature(msqControllerTask);
if (!signature.isPresent() || MSQControllerTask.isIngestion(msqControllerTask.getQuerySpec())) {
Contributor

Is there a need for both these conditions? Shouldn't signature be present only for selects?

Contributor Author

This is more of a defensive check which does not harm us.

2. Addressing review comments.
@cryptoe cryptoe added this to the 27.0 milestone Jul 7, 2023
@adarshsanjeev (Contributor) left a comment

Thanks for the update! The changes look good to me, modulo some test failures.

@cryptoe cryptoe merged commit afa8c7b into apache:master Jul 7, 2023
@cryptoe (Contributor Author) commented Jul 7, 2023

Thanks @adarshsanjeev and @LakshSingla for the review!!

@vogievetsky vogievetsky added the Needs web console change Backend API changes that would benefit from frontend support in the web console label Jul 18, 2023
sergioferragut pushed a commit to sergioferragut/druid that referenced this pull request Jul 21, 2023
…pache#14527)

@vogievetsky vogievetsky removed the Needs web console change Backend API changes that would benefit from frontend support in the web console label Aug 8, 2023