From 8d5c964d2ea913c49e41692734c22f161f2214ec Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Mon, 26 Jun 2023 15:36:49 -0700
Subject: [PATCH 1/4] Add empty impl and IT for desc skipping index

Signed-off-by: Chen Dai
---
 .../src/main/antlr4/FlintSparkSqlExtensions.g4  |  7 ++++++-
 .../src/main/antlr4/SparkSqlBase.g4             |  2 ++
 .../spark/sql/FlintSparkSqlAstBuilder.scala     | 12 ++++++++----
 .../flint/spark/FlintSparkSqlSuite.scala        | 18 +++++++++++++++++-
 4 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index 383b9d63ba..75f71f3ead 100644
--- a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -19,7 +19,12 @@ statement
     ;
 
 skippingIndexStatement
-    : dropSkippingIndexStatement
+    : describeSkippingIndexStatement
+    | dropSkippingIndexStatement
+    ;
+
+describeSkippingIndexStatement
+    : (DESC | DESCRIBE) SKIPPING INDEX ON tableName=multipartIdentifier
     ;
 
 dropSkippingIndexStatement
diff --git a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index e0c579e6f6..6836b7cacd 100644
--- a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -113,6 +113,8 @@ SKIPPING : 'SKIPPING';
 
 SEMICOLON: ';';
 
+DESC: 'DESC';
+DESCRIBE: 'DESCRIBE';
 DOT: '.';
 DROP: 'DROP';
 INDEX: 'INDEX';
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
index aef18051ba..4a4e1bb8ef 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -6,7 +6,7 @@
 package org.opensearch.flint.spark.sql
 
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.DropSkippingIndexStatementContext
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext}
 
 import org.apache.spark.sql.catalyst.plans.logical.Command
 
@@ -15,15 +15,19 @@ import org.apache.spark.sql.catalyst.plans.logical.Command
  */
 class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] {
 
-  override def visitDropSkippingIndexStatement(
-      ctx: DropSkippingIndexStatementContext): Command = {
+  override def visitDescribeSkippingIndexStatement(
+      ctx: DescribeSkippingIndexStatementContext): Command =
+    FlintSparkSqlCommand { flint =>
+      Seq.empty
+    }
+
+  override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
     FlintSparkSqlCommand { flint =>
       val tableName = ctx.tableName.getText // TODO: handle schema name
       val indexName = getSkippingIndexName(tableName)
       flint.deleteIndex(indexName)
       Seq.empty
     }
-  }
 
   override def aggregateResult(aggregate: Command, nextResult: Command): Command =
     if (nextResult != null) nextResult else aggregate;
diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
index fffe1fe295..2b2f000568 100644
--- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
+++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
@@ -48,13 +48,29 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
       |""".stripMargin)
   }
 
-  test("drop skipping index") {
+  protected override def beforeEach(): Unit = {
+    super.beforeEach()
+
     flint
       .skippingIndex()
       .onTable(testTable)
       .addPartitions("year")
       .create()
 
+  }
+
+  protected override def afterEach(): Unit = {
+    super.afterEach()
+
+    flint.deleteIndex(testIndex)
+  }
+
+  test("describe skipping index") {
+    val result = sql(s"DESC SKIPPING INDEX ON $testTable")
+    checkAnswer(result, Seq())
+  }
+
+  test("drop skipping index") {
     sql(s"DROP SKIPPING INDEX ON $testTable")
 
     flint.describeIndex(testIndex) shouldBe empty

From 719a953b0abfad09bc493e7255d955f1e5125dc2 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 27 Jun 2023 14:28:41 -0700
Subject: [PATCH 2/4] Implement describe index and IT

Signed-off-by: Chen Dai
---
 .../spark/sql/FlintSparkSqlAstBuilder.scala     | 25 +++++++++++++++++++------
 .../spark/sql/FlintSparkSqlCommand.scala        |  8 +++++-
 .../flint/spark/FlintSparkSqlSuite.scala        | 12 +++++++--
 3 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
index 4a4e1bb8ef..c8175857fa 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -5,10 +5,14 @@
 
 package org.opensearch.flint.spark.sql
 
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext}
 
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Command
+import org.apache.spark.sql.types.StringType
 
 /**
  * Flint Spark AST builder that builds Spark command for Flint index statement.
@@ -16,13 +20,24 @@ import org.apache.spark.sql.catalyst.plans.logical.Command
 class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] {
 
   override def visitDescribeSkippingIndexStatement(
-      ctx: DescribeSkippingIndexStatementContext): Command =
-    FlintSparkSqlCommand { flint =>
-      Seq.empty
+      ctx: DescribeSkippingIndexStatementContext): Command = {
+    val outputSchema = Seq(
+      AttributeReference("indexed_col_name", StringType, nullable = false)(),
+      AttributeReference("data_type", StringType, nullable = false)())
+
+    FlintSparkSqlCommand(outputSchema) { flint =>
+      val indexName = getSkippingIndexName(ctx.tableName.getText)
+      flint
+        .describeIndex(indexName)
+        .map { case index: FlintSparkSkippingIndex =>
+          index.indexedColumns.map(strategy => Row(strategy.columnName, strategy.columnType))
+        }
+        .getOrElse(Seq.empty)
     }
+  }
 
   override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
-    FlintSparkSqlCommand { flint =>
+    FlintSparkSqlCommand() { flint =>
       val tableName = ctx.tableName.getText // TODO: handle schema name
       val indexName = getSkippingIndexName(tableName)
       flint.deleteIndex(indexName)
@@ -30,5 +45,5 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command
   }
 
   override def aggregateResult(aggregate: Command, nextResult: Command): Command =
-    if (nextResult != null) nextResult else aggregate;
+    if (nextResult != null) nextResult else aggregate
 }
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala
index ca39a293c0..19b3698681 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql
 import org.opensearch.flint.spark.FlintSpark
 
 import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 
 /**
@@ -19,7 +20,12 @@ import org.apache.spark.sql.execution.command.LeafRunnableCommand
  * @param block
  *   code block that triggers Flint core API
  */
-case class FlintSparkSqlCommand(block: FlintSpark => Seq[Row]) extends LeafRunnableCommand {
+case class FlintSparkSqlCommand(override val output: Seq[Attribute] = Seq.empty)(
+    block: FlintSpark => Seq[Row])
+    extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = block(new FlintSpark(sparkSession))
+
+  // Lazy arguments are required to specify here
+  override protected def otherCopyArgs: Seq[AnyRef] = block :: Nil
 }
diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
index 2b2f000568..32af577db7 100644
--- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
+++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
@@ -12,7 +12,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIn
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.FlintSuite
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT}
 
 class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite {
@@ -55,6 +55,7 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
       .skippingIndex()
       .onTable(testTable)
       .addPartitions("year")
+      .addValueSet("name")
       .create()
 
   }
@@ -67,7 +68,14 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
 
   test("describe skipping index") {
     val result = sql(s"DESC SKIPPING INDEX ON $testTable")
-    checkAnswer(result, Seq())
+    checkAnswer(result, Seq(Row("year", "int"), Row("name", "string")))
+  }
+
+  test("should return empty if no skipping index to describe") {
+    flint.deleteIndex(testIndex)
+
+    val result = sql(s"DESC SKIPPING INDEX ON $testTable")
+    checkAnswer(result, Seq.empty)
   }
 
   test("drop skipping index") {

From becdf5fcc69603e1f79a3b1b7f283c85a6cbdbdb Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 27 Jun 2023 15:04:29 -0700
Subject: [PATCH 3/4] Add skip type in desc output

Signed-off-by: Chen Dai
---
 .../flint/spark/sql/FlintSparkSqlAstBuilder.scala |  6 ++++--
 .../flint/spark/FlintSparkSqlSuite.scala          | 14 +++++++++-----
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
index c8175857fa..4560b60b65 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -23,14 +23,16 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command
       ctx: DescribeSkippingIndexStatementContext): Command = {
     val outputSchema = Seq(
       AttributeReference("indexed_col_name", StringType, nullable = false)(),
-      AttributeReference("data_type", StringType, nullable = false)())
+      AttributeReference("data_type", StringType, nullable = false)(),
+      AttributeReference("skip_type", StringType, nullable = false)())
 
     FlintSparkSqlCommand(outputSchema) { flint =>
       val indexName = getSkippingIndexName(ctx.tableName.getText)
       flint
         .describeIndex(indexName)
         .map { case index: FlintSparkSkippingIndex =>
-          index.indexedColumns.map(strategy => Row(strategy.columnName, strategy.columnType))
+          index.indexedColumns.map(strategy =>
+            Row(strategy.columnName, strategy.columnType, strategy.kind.toString))
         }
         .getOrElse(Seq.empty)
     }
diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
index 32af577db7..8f9721888f 100644
--- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
+++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
@@ -30,11 +30,11 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-
     sql(s"""
       | CREATE TABLE $testTable
       | (
-      |   name STRING
+      |   name STRING,
+      |   age INT
       | )
       | USING CSV
       | OPTIONS (
@@ -50,25 +50,29 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
 
   protected override def beforeEach(): Unit = {
     super.beforeEach()
-
     flint
       .skippingIndex()
      .onTable(testTable)
       .addPartitions("year")
       .addValueSet("name")
+      .addMinMax("age")
       .create()
 
   }
 
   protected override def afterEach(): Unit = {
     super.afterEach()
-
     flint.deleteIndex(testIndex)
   }
 
   test("describe skipping index") {
     val result = sql(s"DESC SKIPPING INDEX ON $testTable")
-    checkAnswer(result, Seq(Row("year", "int"), Row("name", "string")))
+    checkAnswer(
+      result,
+      Seq(
+        Row("year", "int", "Partition"),
+        Row("name", "string", "ValuesSet"),
+        Row("age", "int", "MinMax")))
   }
 
   test("should return empty if no skipping index to describe") {

From 807a1be4ed2210d373dd299a4deb18d2e9368b07 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Fri, 30 Jun 2023 14:30:41 -0700
Subject: [PATCH 4/4] Rename test table to avoid conflict

Signed-off-by: Chen Dai
---
 .../scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
index 8f9721888f..f45be95325 100644
--- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
+++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
@@ -25,7 +25,7 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
   }
 
   /** Test table and index name */
-  private val testTable = "test"
+  private val testTable = "flint_sql_test"
   private val testIndex = getSkippingIndexName(testTable)
 
   override def beforeAll(): Unit = {
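
Usage note (not part of the patches above): a minimal sketch of how the new statement behaves once these commits are applied, assuming a SparkSession `spark` that already has the Flint SQL extension enabled and the Flint host/port configured as in the integration test; the table name is illustrative and mirrors the suite's setup.

    import org.opensearch.flint.spark.FlintSpark

    // Assumed setup: `spark` already has the Flint SQL extension and OpenSearch host/port configured.
    val testTable = "flint_sql_test"

    // Create a skipping index the same way the suite's beforeEach() does.
    new FlintSpark(spark)
      .skippingIndex()
      .onTable(testTable)
      .addPartitions("year")
      .addValueSet("name")
      .addMinMax("age")
      .create()

    // DESC SKIPPING INDEX returns one row per indexed column, using the output schema
    // defined in FlintSparkSqlAstBuilder: indexed_col_name, data_type, skip_type.
    // Expected rows here: (year, int, Partition), (name, string, ValuesSet), (age, int, MinMax).
    spark.sql(s"DESC SKIPPING INDEX ON $testTable").show()

    // Dropping the index makes the same DESC statement return no rows.
    spark.sql(s"DROP SKIPPING INDEX ON $testTable")
    spark.sql(s"DESC SKIPPING INDEX ON $testTable").count() // 0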