diff --git a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index 383b9d63ba..75f71f3ead 100644
--- a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -19,7 +19,12 @@ statement
     ;
 
 skippingIndexStatement
-    : dropSkippingIndexStatement
+    : describeSkippingIndexStatement
+    | dropSkippingIndexStatement
+    ;
+
+describeSkippingIndexStatement
+    : (DESC | DESCRIBE) SKIPPING INDEX ON tableName=multipartIdentifier
     ;
 
 dropSkippingIndexStatement
diff --git a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index e0c579e6f6..6836b7cacd 100644
--- a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -113,6 +113,8 @@ SKIPPING : 'SKIPPING';
 
 SEMICOLON: ';';
 
+DESC: 'DESC';
+DESCRIBE: 'DESCRIBE';
 DOT: '.';
 DROP: 'DROP';
 INDEX: 'INDEX';
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
index aef18051ba..4560b60b65 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -5,26 +5,47 @@
 
 package org.opensearch.flint.spark.sql
 
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.DropSkippingIndexStatementContext
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext}
 
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Command
+import org.apache.spark.sql.types.StringType
 
 /**
  * Flint Spark AST builder that builds Spark command for Flint index statement.
  */
 class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] {
 
-  override def visitDropSkippingIndexStatement(
-      ctx: DropSkippingIndexStatementContext): Command = {
-    FlintSparkSqlCommand { flint =>
+  override def visitDescribeSkippingIndexStatement(
+      ctx: DescribeSkippingIndexStatementContext): Command = {
+    val outputSchema = Seq(
+      AttributeReference("indexed_col_name", StringType, nullable = false)(),
+      AttributeReference("data_type", StringType, nullable = false)(),
+      AttributeReference("skip_type", StringType, nullable = false)())
+
+    FlintSparkSqlCommand(outputSchema) { flint =>
+      val indexName = getSkippingIndexName(ctx.tableName.getText)
+      flint
+        .describeIndex(indexName)
+        .map { case index: FlintSparkSkippingIndex =>
+          index.indexedColumns.map(strategy =>
+            Row(strategy.columnName, strategy.columnType, strategy.kind.toString))
+        }
+        .getOrElse(Seq.empty)
+    }
+  }
+
+  override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
+    FlintSparkSqlCommand() { flint =>
       val tableName = ctx.tableName.getText // TODO: handle schema name
       val indexName = getSkippingIndexName(tableName)
       flint.deleteIndex(indexName)
       Seq.empty
     }
-  }
 
   override def aggregateResult(aggregate: Command, nextResult: Command): Command =
-    if (nextResult != null) nextResult else aggregate;
+    if (nextResult != null) nextResult else aggregate
 }
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala
index ca39a293c0..19b3698681 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql
 import org.opensearch.flint.spark.FlintSpark
 
 import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 
 /**
@@ -19,7 +20,12 @@ import org.apache.spark.sql.execution.command.LeafRunnableCommand
  * @param block
  *   code block that triggers Flint core API
  */
-case class FlintSparkSqlCommand(block: FlintSpark => Seq[Row]) extends LeafRunnableCommand {
+case class FlintSparkSqlCommand(override val output: Seq[Attribute] = Seq.empty)(
+    block: FlintSpark => Seq[Row])
+    extends LeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = block(new FlintSpark(sparkSession))
+
+  // Arguments in the second (curried) parameter list must be supplied explicitly for makeCopy
+  override protected def otherCopyArgs: Seq[AnyRef] = block :: Nil
 }
diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
index fffe1fe295..f45be95325 100644
--- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
+++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
@@ -12,7 +12,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIn
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.FlintSuite
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT}
 
 class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite {
@@ -25,16 +25,16 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
   }
 
   /** Test table and index name */
-  private val testTable = "test"
+  private val testTable = "flint_sql_test"
   private val testIndex = getSkippingIndexName(testTable)
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-
     sql(s"""
         | CREATE TABLE $testTable
         | (
-        |   name STRING
+        |   name STRING,
+        |   age INT
         | )
         | USING CSV
         | OPTIONS (
@@ -48,13 +48,41 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
         |""".stripMargin)
   }
 
-  test("drop skipping index") {
+  protected override def beforeEach(): Unit = {
+    super.beforeEach()
     flint
       .skippingIndex()
       .onTable(testTable)
       .addPartitions("year")
+      .addValueSet("name")
+      .addMinMax("age")
       .create()
+  }
 
+  protected override def afterEach(): Unit = {
+    super.afterEach()
+    flint.deleteIndex(testIndex)
+  }
+
+  test("describe skipping index") {
+    val result = sql(s"DESC SKIPPING INDEX ON $testTable")
+
+    checkAnswer(
+      result,
+      Seq(
+        Row("year", "int", "Partition"),
+        Row("name", "string", "ValuesSet"),
+        Row("age", "int", "MinMax")))
+  }
+
+  test("should return empty if no skipping index to describe") {
+    flint.deleteIndex(testIndex)
+
+    val result = sql(s"DESC SKIPPING INDEX ON $testTable")
+    checkAnswer(result, Seq.empty)
+  }
+
+  test("drop skipping index") {
     sql(s"DROP SKIPPING INDEX ON $testTable")
 
     flint.describeIndex(testIndex) shouldBe empty
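
For reference, a sketch of what the new statement looks like from a user session once this change is applied. The `alb_logs` table and the extension class wiring are assumptions for illustration, not part of this diff:

```scala
// Hypothetical usage; assumes Flint's SQL extensions are registered on the
// session and that a skipping index already exists on a table `alb_logs`.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
  .getOrCreate()

// DESC and DESCRIBE are interchangeable per the new grammar rule.
spark.sql("DESCRIBE SKIPPING INDEX ON alb_logs").show()
// +----------------+---------+---------+
// |indexed_col_name|data_type|skip_type|
// +----------------+---------+---------+
// |            year|      int|Partition|
// +----------------+---------+---------+
```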
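The two-parameter-list signature of `FlintSparkSqlCommand` is the subtle part of this change: Catalyst's `TreeNode.makeCopy` rebuilds a node by reflectively re-invoking the constructor with the values of its first parameter list only, so the `block` function in the second list would be lost on `copy()`/`transform()` unless it is handed back through `otherCopyArgs`. A minimal self-contained sketch of the pattern, with a hypothetical command name:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command.LeafRunnableCommand

// Hypothetical command demonstrating the curried-argument pattern above.
case class ExampleCommand(override val output: Seq[Attribute] = Seq.empty)(
    body: SparkSession => Seq[Row])
    extends LeafRunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = body(sparkSession)

  // makeCopy only sees the first parameter list; re-supply the second here,
  // otherwise plan copies would drop `body` and fail at construction time.
  override protected def otherCopyArgs: Seq[AnyRef] = body :: Nil
}
```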