forked from opensearch-project/sql
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add EMR client to spark connector (opensearch-project#1790)
* Create Spark Connector Signed-off-by: Vamsi Manohar <reddyvam@amazon.com> * Add spark client and engine Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Remove vars Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Spark connector draft Signed-off-by: Rupal Mahajan <maharup@amazon.com> * nit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix checkstyle errors Signed-off-by: Rupal Mahajan <maharup@amazon.com> * nit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix license header Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add spark storage test Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update comments Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix checkstyle in comments Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update tests Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add emr client Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Set default values for flint args Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Validate emr auth type Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add default constants for flint Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update unit tests Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Address PR comments Signed-off-by: Rupal Mahajan <maharup@amazon.com> * tests draft Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Refactor class name Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Rename classes and update tests Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update scan operator test Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Address PR comment Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix Connection pool shut down issue Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update emr unit tests Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update doc and tests Signed-off-by: Rupal Mahajan <maharup@amazon.com> * nit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update EMR clinet impl tests Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Address PR comments Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Make spark & flint jars configurable Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Address comments Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add spark application id in logs Signed-off-by: Rupal Mahajan <maharup@amazon.com> * nit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Delete result when not required Signed-off-by: Rupal Mahajan <maharup@amazon.com> --------- Signed-off-by: Vamsi Manohar <reddyvam@amazon.com> Signed-off-by: Rupal Mahajan <maharup@amazon.com> Co-authored-by: Vamsi Manohar <reddyvam@amazon.com> Signed-off-by: Mitchell Gale <Mitchell.Gale@improving.com>
- Loading branch information
1 parent
ad35c2f
commit 576e7ea
Showing
30 changed files
with
1,559 additions
and
44 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
.. highlight:: sh | ||
|
||
==================== | ||
Spark Connector | ||
==================== | ||
|
||
.. rubric:: Table of contents | ||
|
||
.. contents:: | ||
:local: | ||
:depth: 1 | ||
|
||
|
||
Introduction | ||
============ | ||
|
||
This page covers spark connector properties for dataSource configuration | ||
and the nuances associated with spark connector. | ||
|
||
|
||
Spark Connector Properties in DataSource Configuration | ||
======================================================== | ||
Spark Connector Properties. | ||
|
||
* ``spark.connector`` [Required]. | ||
* This parameters provides the spark client information for connection. | ||
* ``spark.sql.application`` [Optional]. | ||
* This parameters provides the spark sql application jar. Default value is ``s3://spark-datasource/sql-job.jar``. | ||
* ``emr.cluster`` [Required]. | ||
* This parameters provides the emr cluster id information. | ||
* ``emr.auth.type`` [Required] | ||
* This parameters provides the authentication type information. | ||
* Spark emr connector currently supports ``awssigv4`` authentication mechanism and following parameters are required. | ||
* ``emr.auth.region``, ``emr.auth.access_key`` and ``emr.auth.secret_key`` | ||
* ``spark.datasource.flint.*`` [Optional] | ||
* This parameters provides the Opensearch domain host information for flint integration. | ||
* ``spark.datasource.flint.integration`` [Optional] | ||
* Default value for integration jar is ``s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar``. | ||
* ``spark.datasource.flint.host`` [Optional] | ||
* Default value for host is ``localhost``. | ||
* ``spark.datasource.flint.port`` [Optional] | ||
* Default value for port is ``9200``. | ||
* ``spark.datasource.flint.scheme`` [Optional] | ||
* Default value for scheme is ``http``. | ||
* ``spark.datasource.flint.auth`` [Optional] | ||
* Default value for auth is ``false``. | ||
* ``spark.datasource.flint.region`` [Optional] | ||
* Default value for auth is ``us-west-2``. | ||
|
||
Example spark dataSource configuration | ||
======================================== | ||
|
||
AWSSigV4 Auth:: | ||
|
||
[{ | ||
"name" : "my_spark", | ||
"connector": "spark", | ||
"properties" : { | ||
"spark.connector": "emr", | ||
"emr.cluster" : "{{clusterId}}", | ||
"emr.auth.type" : "awssigv4", | ||
"emr.auth.region" : "us-east-1", | ||
"emr.auth.access_key" : "{{accessKey}}" | ||
"emr.auth.secret_key" : "{{secretKey}}" | ||
"spark.datasource.flint.host" : "{{opensearchHost}}", | ||
"spark.datasource.flint.port" : "{{opensearchPort}}", | ||
"spark.datasource.flint.scheme" : "{{opensearchScheme}}", | ||
"spark.datasource.flint.auth" : "{{opensearchAuth}}", | ||
"spark.datasource.flint.region" : "{{opensearchRegion}}", | ||
} | ||
}] | ||
|
||
|
||
Spark SQL Support | ||
================== | ||
|
||
`sql` Function | ||
---------------------------- | ||
Spark connector offers `sql` function. This function can be used to run spark sql query. | ||
The function takes spark sql query as input. Argument should be either passed by name or positionArguments should be either passed by name or position. | ||
`source=my_spark.sql('select 1')` | ||
or | ||
`source=my_spark.sql(query='select 1')` | ||
Example:: | ||
|
||
> source=my_spark.sql('select 1') | ||
+---+ | ||
| 1 | | ||
|---+ | ||
| 1 | | ||
+---+ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
121 changes: 121 additions & 0 deletions
121
spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.client; | ||
|
||
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_INDEX_NAME; | ||
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR; | ||
|
||
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; | ||
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure; | ||
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest; | ||
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult; | ||
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest; | ||
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig; | ||
import com.amazonaws.services.elasticmapreduce.model.StepConfig; | ||
import com.amazonaws.services.elasticmapreduce.model.StepStatus; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import java.io.IOException; | ||
import lombok.SneakyThrows; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.json.JSONObject; | ||
import org.opensearch.sql.spark.helper.FlintHelper; | ||
import org.opensearch.sql.spark.response.SparkResponse; | ||
|
||
public class EmrClientImpl implements SparkClient { | ||
private final AmazonElasticMapReduce emr; | ||
private final String emrCluster; | ||
private final FlintHelper flint; | ||
private final String sparkApplicationJar; | ||
private static final Logger logger = LogManager.getLogger(EmrClientImpl.class); | ||
private SparkResponse sparkResponse; | ||
|
||
/** | ||
* Constructor for EMR Client Implementation. | ||
* | ||
* @param emr EMR helper | ||
* @param flint Opensearch args for flint integration jar | ||
* @param sparkResponse Response object to help with retrieving results from Opensearch index | ||
*/ | ||
public EmrClientImpl(AmazonElasticMapReduce emr, String emrCluster, FlintHelper flint, | ||
SparkResponse sparkResponse, String sparkApplicationJar) { | ||
this.emr = emr; | ||
this.emrCluster = emrCluster; | ||
this.flint = flint; | ||
this.sparkResponse = sparkResponse; | ||
this.sparkApplicationJar = | ||
sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar; | ||
} | ||
|
||
@Override | ||
public JSONObject sql(String query) throws IOException { | ||
runEmrApplication(query); | ||
return sparkResponse.getResultFromOpensearchIndex(); | ||
} | ||
|
||
@VisibleForTesting | ||
void runEmrApplication(String query) { | ||
|
||
HadoopJarStepConfig stepConfig = new HadoopJarStepConfig() | ||
.withJar("command-runner.jar") | ||
.withArgs("spark-submit", | ||
"--class","org.opensearch.sql.SQLJob", | ||
"--jars", | ||
flint.getFlintIntegrationJar(), | ||
sparkApplicationJar, | ||
query, | ||
SPARK_INDEX_NAME, | ||
flint.getFlintHost(), | ||
flint.getFlintPort(), | ||
flint.getFlintScheme(), | ||
flint.getFlintAuth(), | ||
flint.getFlintRegion() | ||
); | ||
|
||
StepConfig emrstep = new StepConfig() | ||
.withName("Spark Application") | ||
.withActionOnFailure(ActionOnFailure.CONTINUE) | ||
.withHadoopJarStep(stepConfig); | ||
|
||
AddJobFlowStepsRequest request = new AddJobFlowStepsRequest() | ||
.withJobFlowId(emrCluster) | ||
.withSteps(emrstep); | ||
|
||
AddJobFlowStepsResult result = emr.addJobFlowSteps(request); | ||
logger.info("EMR step ID: " + result.getStepIds()); | ||
|
||
String stepId = result.getStepIds().get(0); | ||
DescribeStepRequest stepRequest = new DescribeStepRequest() | ||
.withClusterId(emrCluster) | ||
.withStepId(stepId); | ||
|
||
waitForStepExecution(stepRequest); | ||
sparkResponse.setValue(stepId); | ||
} | ||
|
||
@SneakyThrows | ||
private void waitForStepExecution(DescribeStepRequest stepRequest) { | ||
// Wait for the step to complete | ||
boolean completed = false; | ||
while (!completed) { | ||
// Get the step status | ||
StepStatus statusDetail = emr.describeStep(stepRequest).getStep().getStatus(); | ||
// Check if the step has completed | ||
if (statusDetail.getState().equals("COMPLETED")) { | ||
completed = true; | ||
logger.info("EMR step completed successfully."); | ||
} else if (statusDetail.getState().equals("FAILED") | ||
|| statusDetail.getState().equals("CANCELLED")) { | ||
logger.error("EMR step failed or cancelled."); | ||
throw new RuntimeException("Spark SQL application failed."); | ||
} else { | ||
// Sleep for some time before checking the status again | ||
Thread.sleep(2500); | ||
} | ||
} | ||
} | ||
|
||
} |
20 changes: 20 additions & 0 deletions
20
spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.data.constants; | ||
|
||
public class SparkConstants { | ||
public static final String EMR = "emr"; | ||
public static final String STEP_ID_FIELD = "stepId.keyword"; | ||
public static final String SPARK_SQL_APPLICATION_JAR = "s3://spark-datasource/sql-job.jar"; | ||
public static final String SPARK_INDEX_NAME = ".query_execution_result"; | ||
public static final String FLINT_INTEGRATION_JAR = | ||
"s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar"; | ||
public static final String FLINT_DEFAULT_HOST = "localhost"; | ||
public static final String FLINT_DEFAULT_PORT = "9200"; | ||
public static final String FLINT_DEFAULT_SCHEME = "http"; | ||
public static final String FLINT_DEFAULT_AUTH = "-1"; | ||
public static final String FLINT_DEFAULT_REGION = "us-west-2"; | ||
} |
Oops, something went wrong.