Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SparkParameterComposerCollection #2774

Merged
merged 3 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions async-query-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Following is the list of extension points where the consumer of the library need
- [QueryIdProvider](src/main/java/org/opensearch/sql/spark/dispatcher/QueryIdProvider.java)
- [SessionIdProvider](src/main/java/org/opensearch/sql/spark/execution/session/SessionIdProvider.java)
- [SessionConfigSupplier](src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java)
- [SparkExecutionEngineConfigSupplier](src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplier.java)
- [SparkSubmitParameterModifier](src/main/java/org/opensearch/sql/spark/config/SparkSubmitParameterModifier.java)
- [EMRServerlessClientFactory](src/main/java/org/opensearch/sql/spark/client/EMRServerlessClientFactory.java)
- [SparkExecutionEngineConfigSupplier](src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplier.java)
- [DataSourceSparkParameterComposer](src/main/java/org/opensearch/sql/spark/parameter/DataSourceSparkParameterComposer.java)
- [GeneralSparkParameterComposer](src/main/java/org/opensearch/sql/spark/parameter/GeneralSparkParameterComposer.java)
- [SparkSubmitParameterModifier](src/main/java/org/opensearch/sql/spark/config/SparkSubmitParameterModifier.java) (to be deprecated in favor of GeneralSparkParameterComposer)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package org.opensearch.sql.spark.config;

import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilder;

/**
* Interface for extension point to allow modification of spark submit parameter. modifyParameter
* method is called after the default spark submit parameter is build.
* method is called after the default spark submit parameter is built. To be deprecated in favor of
* {@link org.opensearch.sql.spark.parameter.GeneralSparkParameterComposer}
*/
public interface SparkSubmitParameterModifier {
Copy link
Member

@vamsi-amazon vamsi-amazon Jun 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does the modifier come into the picture?
So we have two concepts: composer and modifier. Is the modifier applied after the spark submit parameters have been built using the composer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking of removing the Modifier once I can verify that the Composer satisfies the use cases.

void modifyParameters(SparkSubmitParameters parameters);
void modifyParameters(SparkSubmitParametersBuilder parametersBuilder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
Expand All @@ -25,6 +24,7 @@
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -37,6 +37,7 @@ public class BatchQueryHandler extends AsyncQueryHandler {
protected final JobExecutionResponseReader jobExecutionResponseReader;
protected final LeaseManager leaseManager;
protected final MetricsService metricsService;
protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -80,12 +81,16 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getAccountId(),
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.builder()
sparkSubmitParametersBuilderProvider
.getSparkSubmitParametersBuilder()
.clusterName(clusterName)
.dataSource(context.getDataSourceMetadata())
.query(dispatchQueryRequest.getQuery())
.build()
.dataSource(
context.getDataSourceMetadata(),
dispatchQueryRequest,
context.getAsyncQueryRequestContext())
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
.acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext())
.toString(),
tags,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
Expand All @@ -32,6 +31,7 @@
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.EmrMetrics;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -46,6 +46,7 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {
private final JobExecutionResponseReader jobExecutionResponseReader;
private final LeaseManager leaseManager;
private final MetricsService metricsService;
protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -112,12 +113,16 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getAccountId(),
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.builder()
sparkSubmitParametersBuilderProvider
.getSparkSubmitParametersBuilder()
.className(FLINT_SESSION_CLASS_NAME)
.clusterName(clusterName)
.dataSource(dataSourceMetadata)
.build()
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier()),
.dataSource(
dataSourceMetadata,
dispatchQueryRequest,
context.getAsyncQueryRequestContext())
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
.acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext()),
tags,
dataSourceMetadata.getResultIndex(),
dataSourceMetadata.getName()),
Expand Down
Loading
Loading