-
Notifications
You must be signed in to change notification settings - Fork 139
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Initial spark application draft Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Remove temp table Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add license header Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add scalastyle-config and update readme Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix datatype for result and schema Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add test Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Simplify code using toJSON.collect.toList Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add example in readme Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix triple quotes issue Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update method name and description Signed-off-by: Rupal Mahajan <maharup@amazon.com> --------- Signed-off-by: Rupal Mahajan <maharup@amazon.com>
- Loading branch information
Showing
8 changed files
with
414 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Compiled output | ||
target/ | ||
project/target/ | ||
|
||
# sbt-specific files | ||
.sbtserver | ||
.sbt/ | ||
.bsp/ | ||
|
||
# Miscellaneous | ||
.DS_Store | ||
*.class | ||
*.log | ||
*.zip |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
# Spark SQL Application | ||
|
||
This application execute sql query and store the result in OpenSearch index in following format | ||
``` | ||
"stepId":"<emr-step-id>", | ||
"schema": "json blob", | ||
"result": "json blob" | ||
``` | ||
|
||
## Prerequisites | ||
|
||
+ Spark 3.3.1 | ||
+ Scala 2.12.15 | ||
+ flint-spark-integration | ||
|
||
## Usage | ||
|
||
To use this application, you can run Spark with Flint extension: | ||
|
||
``` | ||
./bin/spark-submit \ | ||
--class org.opensearch.sql.SQLJob \ | ||
--jars <flint-spark-integration-jar> \ | ||
sql-job.jar \ | ||
<spark-sql-query> \ | ||
<opensearch-index> \ | ||
<opensearch-host> \ | ||
<opensearch-port> \ | ||
<opensearch-scheme> \ | ||
<opensearch-auth> \ | ||
<opensearch-region> \ | ||
``` | ||
|
||
## Result Specifications | ||
|
||
Following example shows how the result is written to OpenSearch index after query execution. | ||
|
||
Let's assume sql query result is | ||
``` | ||
+------+------+ | ||
|Letter|Number| | ||
+------+------+ | ||
|A |1 | | ||
|B |2 | | ||
|C |3 | | ||
+------+------+ | ||
``` | ||
OpenSearch index document will look like | ||
```json | ||
{ | ||
"_index" : ".query_execution_result", | ||
"_id" : "A2WOsYgBMUoqCqlDJHrn", | ||
"_score" : 1.0, | ||
"_source" : { | ||
"result" : [ | ||
"{'Letter':'A','Number':1}", | ||
"{'Letter':'B','Number':2}", | ||
"{'Letter':'C','Number':3}" | ||
], | ||
"schema" : [ | ||
"{'column_name':'Letter','data_type':'string'}", | ||
"{'column_name':'Number','data_type':'integer'}" | ||
], | ||
"stepId" : "s-JZSB1139WIVU" | ||
} | ||
} | ||
``` | ||
|
||
## Build | ||
|
||
To build and run this application with Spark, you can run: | ||
|
||
``` | ||
sbt clean publishLocal | ||
``` | ||
|
||
## Test | ||
|
||
To run tests, you can use: | ||
|
||
``` | ||
sbt test | ||
``` | ||
|
||
## Scalastyle | ||
|
||
To check code with scalastyle, you can run: | ||
|
||
``` | ||
sbt scalastyle | ||
``` | ||
|
||
## Code of Conduct | ||
|
||
This project has adopted an [Open Source Code of Conduct](../CODE_OF_CONDUCT.md). | ||
|
||
## Security | ||
|
||
If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public GitHub issue. | ||
|
||
## License | ||
|
||
See the [LICENSE](../LICENSE.txt) file for our project's licensing. We will ask you to confirm the licensing of your contribution. | ||
|
||
## Copyright | ||
|
||
Copyright OpenSearch Contributors. See [NOTICE](../NOTICE) for details. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
name := "sql-job" | ||
|
||
version := "1.0" | ||
|
||
scalaVersion := "2.12.15" | ||
|
||
val sparkVersion = "3.3.2" | ||
|
||
mainClass := Some("org.opensearch.sql.SQLJob") | ||
|
||
artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) => | ||
"sql-job.jar" | ||
} | ||
|
||
resolvers ++= Seq( | ||
("apache-snapshots" at "http://repository.apache.org/snapshots/").withAllowInsecureProtocol(true) | ||
) | ||
|
||
libraryDependencies ++= Seq( | ||
"org.apache.spark" %% "spark-core" % sparkVersion % "provided", | ||
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided", | ||
"org.scalatest" %% "scalatest" % "3.2.15" % Test | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
sbt.version=1.8.2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
<scalastyle> | ||
<name>Scalastyle standard configuration</name> | ||
<check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="maxFileLength"><![CDATA[800]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="header"><![CDATA[/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="maxLineLength"><![CDATA[160]]></parameter> | ||
<parameter name="tabSize"><![CDATA[4]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="maxParameters"><![CDATA[8]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.file.RegexChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="regex"><![CDATA[println]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="maxTypes"><![CDATA[30]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="maximum"><![CDATA[10]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="singleLineAllowed"><![CDATA[true]]></parameter> | ||
<parameter name="doubleLineAllowed"><![CDATA[false]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="maxLength"><![CDATA[50]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true"> | ||
<parameters> | ||
<parameter name="maxMethods"><![CDATA[30]]></parameter> | ||
</parameters> | ||
</check> | ||
<check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check> | ||
<check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check> | ||
</scalastyle> |
98 changes: 98 additions & 0 deletions
98
spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql | ||
|
||
import org.apache.spark.sql.{DataFrame, SparkSession, Row} | ||
import org.apache.spark.sql.types._ | ||
|
||
/** | ||
* Spark SQL Application entrypoint | ||
* | ||
* @param args(0) | ||
* sql query | ||
* @param args(1) | ||
* opensearch index name | ||
* @param args(2-6) | ||
* opensearch connection values required for flint-integration jar. host, port, scheme, auth, region respectively. | ||
* @return | ||
* write sql query result to given opensearch index | ||
*/ | ||
object SQLJob { | ||
def main(args: Array[String]) { | ||
// Get the SQL query and Opensearch Config from the command line arguments | ||
val query = args(0) | ||
val index = args(1) | ||
val host = args(2) | ||
val port = args(3) | ||
val scheme = args(4) | ||
val auth = args(5) | ||
val region = args(6) | ||
|
||
// Create a SparkSession | ||
val spark = SparkSession.builder().appName("SQLJob").getOrCreate() | ||
|
||
try { | ||
// Execute SQL query | ||
val result: DataFrame = spark.sql(query) | ||
|
||
// Get Data | ||
val data = getFormattedData(result, spark) | ||
|
||
// Write data to OpenSearch index | ||
val aos = Map( | ||
"host" -> host, | ||
"port" -> port, | ||
"scheme" -> scheme, | ||
"auth" -> auth, | ||
"region" -> region) | ||
|
||
data.write | ||
.format("flint") | ||
.options(aos) | ||
.mode("append") | ||
.save(index) | ||
|
||
} finally { | ||
// Stop SparkSession | ||
spark.stop() | ||
} | ||
} | ||
|
||
/** | ||
* Create a new formatted dataframe with json result, json schema and EMR_STEP_ID. | ||
* | ||
* @param result | ||
* sql query result dataframe | ||
* @param spark | ||
* spark session | ||
* @return | ||
* dataframe with result, schema and emr step id | ||
*/ | ||
def getFormattedData(result: DataFrame, spark: SparkSession): DataFrame = { | ||
// Create the schema dataframe | ||
val schemaRows = result.schema.fields.map { field => | ||
Row(field.name, field.dataType.typeName) | ||
} | ||
val resultSchema = spark.createDataFrame(spark.sparkContext.parallelize(schemaRows), StructType(Seq( | ||
StructField("column_name", StringType, nullable = false), | ||
StructField("data_type", StringType, nullable = false)))) | ||
|
||
// Define the data schema | ||
val schema = StructType(Seq( | ||
StructField("result", ArrayType(StringType, containsNull = true), nullable = true), | ||
StructField("schema", ArrayType(StringType, containsNull = true), nullable = true), | ||
StructField("stepId", StringType, nullable = true))) | ||
|
||
// Create the data rows | ||
val rows = Seq(( | ||
result.toJSON.collect.toList.map(_.replaceAll("\"", "'")), | ||
resultSchema.toJSON.collect.toList.map(_.replaceAll("\"", "'")), | ||
sys.env.getOrElse("EMR_STEP_ID", ""))) | ||
|
||
// Create the DataFrame for data | ||
spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*) | ||
} | ||
} |
Oops, something went wrong.