From 22b9c0a243aa9b8809a0bf3f2e550fb268e34aa3 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 11 Jul 2023 18:51:40 -0700 Subject: [PATCH] Add Flint create and refresh index SQL support (#1800) * Add create and refresh index Signed-off-by: Chen Dai * Rename skipping kind to match its name in SQL parser Signed-off-by: Chen Dai * Support auto refresh property in WITH clause Signed-off-by: Chen Dai * Update doc Signed-off-by: Chen Dai * Add check for manual refresh on incremental refresh index Signed-off-by: Chen Dai * Add limitation to doc Signed-off-by: Chen Dai * Fix IT failure Signed-off-by: Chen Dai --------- Signed-off-by: Chen Dai --- flint/docs/index.md | 29 ++-- .../main/antlr4/FlintSparkSqlExtensions.g4 | 22 ++- .../src/main/antlr4/SparkSqlBase.g4 | 62 ++++++- .../opensearch/flint/spark/FlintSpark.scala | 15 +- .../skipping/FlintSparkSkippingStrategy.scala | 2 +- .../minmax/MinMaxSkippingStrategy.scala | 4 +- .../partition/PartitionSkippingStrategy.scala | 4 +- .../valueset/ValueSetSkippingStrategy.scala | 4 +- .../spark/sql/FlintSparkSqlAstBuilder.scala | 55 +++++- .../FlintSparkSkippingIndexITSuite.scala | 26 ++- .../flint/spark/FlintSparkSqlITSuite.scala | 156 ++++++++++++++++++ .../flint/spark/FlintSparkSqlSuite.scala | 90 ---------- 12 files changed, 346 insertions(+), 123 deletions(-) create mode 100644 flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlITSuite.scala delete mode 100644 flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala diff --git a/flint/docs/index.md b/flint/docs/index.md index 0a6453d999..27c68bcc74 100644 --- a/flint/docs/index.md +++ b/flint/docs/index.md @@ -17,11 +17,11 @@ A Flint index is ... Please see the following example in which Index Building Logic and Query Rewrite Logic column shows the basic idea behind each skipping index implementation. 
-| Skipping Index | Create Index Statement (TBD) | Index Building Logic | Query Rewrite Logic | -|----------------|-------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Skipping Index | Create Index Statement | Index Building Logic | Query Rewrite Logic | +|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Partition | CREATE SKIPPING INDEX
ON alb_logs
FOR COLUMNS (
  year PARTITION,
  month PARTITION,
  day PARTITION,
  hour PARTITION
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  FIRST(year) AS year,
  FIRST(month) AS month,
  FIRST(day) AS day,
  FIRST(hour) AS hour,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE year = 2023 AND month = 4
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE year = 2023 AND month = 4
)
WHERE year = 2023 AND month = 4 | -| ValueSet | CREATE SKIPPING INDEX
ON alb_logs
FOR COLUMNS (
  elb_status_code VALUE_LIST
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  COLLECT_SET(elb_status_code) AS elb_status_code,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE elb_status_code = 404
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE ARRAY_CONTAINS(elb_status_code, 404)
)
WHERE elb_status_code = 404 | -| Min-Max | CREATE SKIPPING INDEX
ON alb_logs
FOR COLUMNS (
  request_processing_time MIN_MAX
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  MIN(request_processing_time) AS request_processing_time_min,
  MAX(request_processing_time) AS request_processing_time_max,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE request_processing_time = 100
=>
SELECT *
FROM alb_logs (input_files =
SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE request_processing_time_min <= 100
    AND 100 <= request_processing_time_max
)
WHERE request_processing_time = 100 +| ValueSet | CREATE SKIPPING INDEX
ON alb_logs
(<br>
  elb_status_code VALUE_SET
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  COLLECT_SET(elb_status_code) AS elb_status_code,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE elb_status_code = 404
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE ARRAY_CONTAINS(elb_status_code, 404)
)
WHERE elb_status_code = 404 | +| MinMax | CREATE SKIPPING INDEX
ON alb_logs
(<br>
  request_processing_time MIN_MAX
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  MIN(request_processing_time) AS request_processing_time_min,
  MAX(request_processing_time) AS request_processing_time_max,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE request_processing_time = 100
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path<br>
  FROM flint_alb_logs_skipping_index
  WHERE request_processing_time_min <= 100
    AND 100 <= request_processing_time_max
)
WHERE request_processing_time = 100 | ### Flint Index Specification @@ -122,8 +122,11 @@ DDL statement: ```sql CREATE SKIPPING INDEX ON -FOR COLUMNS ( column [, ...] ) +( column [, ...] ) WHERE +WITH (auto_refresh = (true|false)) + +REFRESH SKIPPING INDEX ON DESCRIBE SKIPPING INDEX ON @@ -135,23 +138,20 @@ DROP SKIPPING INDEX ON Skipping index type: ```sql - ::= { , , } - - ::= BLOOM_FILTER( bitCount, numOfHashFunctions ) #TBD - ::= MIN_MAX - ::= VALUE_SET + ::= { PARTITION, VALUE_SET, MIN_MAX } ``` Example: ```sql CREATE SKIPPING INDEX ON alb_logs -FOR COLUMNS ( - client_ip BLOOM_FILTER, +( elb_status_code VALUE_SET ) WHERE time > '2023-04-01 00:00:00' +REFRESH SKIPPING INDEX ON alb_logs + DESCRIBE SKIPPING INDEX ON alb_logs DROP SKIPPING INDEX ON alb_logs @@ -258,7 +258,6 @@ flint.skippingIndex() .addPartitions("year", "month", "day") .addValueSet("elb_status_code") .addMinMax("request_processing_time") - .addBloomFilter("client_ip") .create() flint.refresh("flint_alb_logs_skipping_index", FULL) @@ -302,3 +301,7 @@ val df = new SQLContext(sc).read ## Benchmarks TODO + +## Limitations + +A manual refresh on a table whose skipping index is being auto-refreshed will be rejected. Note that this check relies on the incremental refresh job actively running in the same Spark cluster, where it can be detected when the manual refresh is requested. diff --git a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index 75f71f3ead..0ee976cb75 100644 --- a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -19,10 +19,22 @@ statement ; skippingIndexStatement - : describeSkippingIndexStatement + : createSkippingIndexStatement + | refreshSkippingIndexStatement + | describeSkippingIndexStatement | dropSkippingIndexStatement ; +createSkippingIndexStatement + : CREATE SKIPPING INDEX ON tableName=multipartIdentifier + LEFT_PAREN indexColTypeList RIGHT_PAREN + (WITH LEFT_PAREN propertyList RIGHT_PAREN)? + ; + +refreshSkippingIndexStatement + : REFRESH SKIPPING INDEX ON tableName=multipartIdentifier + ; + describeSkippingIndexStatement : (DESC | DESCRIBE) SKIPPING INDEX ON tableName=multipartIdentifier ; @@ -30,3 +42,11 @@ describeSkippingIndexStatement dropSkippingIndexStatement : DROP SKIPPING INDEX ON tableName=multipartIdentifier ; + +indexColTypeList + : indexColType (COMMA indexColType)* + ; + +indexColType + : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX) + ; \ No newline at end of file diff --git a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 6836b7cacd..a777cc59fe 100644 --- a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -85,6 +85,31 @@ grammar SparkSqlBase; } +propertyList + : property (COMMA property)* + ; + +property + : key=propertyKey (EQ? value=propertyValue)?
+ ; + +propertyKey + : identifier (DOT identifier)* + | STRING + ; + +propertyValue + : INTEGER_VALUE + | DECIMAL_VALUE + | booleanValue + | STRING + ; + +booleanValue + : TRUE | FALSE + ; + + multipartIdentifier : parts+=identifier (DOT parts+=identifier)* ; @@ -106,20 +131,46 @@ nonReserved // Flint lexical tokens -SKIPPING : 'SKIPPING'; +MIN_MAX: 'MIN_MAX'; +SKIPPING: 'SKIPPING'; +VALUE_SET: 'VALUE_SET'; // Spark lexical tokens SEMICOLON: ';'; +LEFT_PAREN: '('; +RIGHT_PAREN: ')'; +COMMA: ','; +DOT: '.'; + + +CREATE: 'CREATE'; DESC: 'DESC'; DESCRIBE: 'DESCRIBE'; -DOT: '.'; DROP: 'DROP'; +FALSE: 'FALSE'; INDEX: 'INDEX'; -MINUS: '-'; ON: 'ON'; +PARTITION: 'PARTITION'; +REFRESH: 'REFRESH'; +STRING: 'STRING'; +TRUE: 'TRUE'; +WITH: 'WITH'; + + +EQ : '=' | '=='; +MINUS: '-'; + + +INTEGER_VALUE + : DIGIT+ + ; + +DECIMAL_VALUE + : DECIMAL_DIGITS {isValidDecimal()}? + ; IDENTIFIER : (LETTER | DIGIT | '_')+ @@ -129,6 +180,11 @@ BACKQUOTED_IDENTIFIER : '`' ( ~'`' | '``' )* '`' ; +fragment DECIMAL_DIGITS + : DIGIT+ '.' DIGIT* + | '.' DIGIT+ + ; + fragment DIGIT : [0-9] ; diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 635cbae50a..b9305f9b17 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -18,7 +18,7 @@ import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer} -import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, Partition, ValuesSet} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET} import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy @@ -101,6 +101,9 @@ class FlintSpark(val spark: SparkSession) { } mode match { + case FULL if isIncrementalRefreshing(indexName) => + throw new IllegalStateException( + s"Index $indexName is incremental refreshing and cannot be manual refreshed") case FULL => writeFlintIndex( spark.read @@ -113,6 +116,7 @@ class FlintSpark(val spark: SparkSession) { val job = spark.readStream .table(tableName) .writeStream + .queryName(indexName) .outputMode(Append()) .foreachBatch { (batchDF: DataFrame, _: Long) => writeFlintIndex(batchDF) @@ -156,6 +160,9 @@ class FlintSpark(val spark: SparkSession) { } } + private def isIncrementalRefreshing(indexName: String): Boolean = + spark.streams.active.exists(_.name == indexName) + // TODO: Remove all parsing logic below once Flint spec finalized and FlintMetadata strong typed private def getSourceTableName(index: FlintSparkIndex): String = { val json = parse(index.metadata().getContent) @@ -182,11 +189,11 @@ class FlintSpark(val spark: SparkSession) { val columnType = (colInfo \ "columnType").extract[String] skippingKind match { - case Partition => + case PARTITION => PartitionSkippingStrategy(columnName = columnName, columnType = columnType) - case ValuesSet => + case VALUE_SET 
=> ValueSetSkippingStrategy(columnName = columnName, columnType = columnType) - case MinMax => + case MIN_MAX => MinMaxSkippingStrategy(columnName = columnName, columnType = columnType) case other => throw new IllegalStateException(s"Unknown skipping strategy: $other") diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala index 678a01bed5..61721481de 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala @@ -65,7 +65,7 @@ object FlintSparkSkippingStrategy { type SkippingKind = Value // Use Value[s]Set because ValueSet already exists in Enumeration - val Partition, ValuesSet, MinMax = Value + val PARTITION, VALUE_SET, MIN_MAX = Value } /** json4s doesn't serialize Enum by default */ diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/minmax/MinMaxSkippingStrategy.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/minmax/MinMaxSkippingStrategy.scala index d141104cbd..779148c205 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/minmax/MinMaxSkippingStrategy.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/minmax/MinMaxSkippingStrategy.scala @@ -6,7 +6,7 @@ package org.opensearch.flint.spark.skipping.minmax import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy -import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, SkippingKind} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, SkippingKind} import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate} @@ -17,7 +17,7 @@ import org.apache.spark.sql.functions.col * Skipping strategy based on min-max boundary of column values. 
*/ case class MinMaxSkippingStrategy( - override val kind: SkippingKind = MinMax, + override val kind: SkippingKind = MIN_MAX, override val columnName: String, override val columnType: String) extends FlintSparkSkippingStrategy { diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/partition/PartitionSkippingStrategy.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/partition/PartitionSkippingStrategy.scala index c3ed33bb34..8be602b212 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/partition/PartitionSkippingStrategy.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/partition/PartitionSkippingStrategy.scala @@ -6,7 +6,7 @@ package org.opensearch.flint.spark.skipping.partition import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy -import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{Partition, SkippingKind} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{PARTITION, SkippingKind} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, First} @@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col * Skipping strategy for partitioned columns of source table. */ case class PartitionSkippingStrategy( - override val kind: SkippingKind = Partition, + override val kind: SkippingKind = PARTITION, override val columnName: String, override val columnType: String) extends FlintSparkSkippingStrategy { diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala index 94c6c4989d..389ddc7dc0 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala @@ -6,7 +6,7 @@ package org.opensearch.flint.spark.skipping.valueset import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy -import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, ValuesSet} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, CollectSet} @@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col * Skipping strategy based on unique column value set. 
*/ case class ValueSetSkippingStrategy( - override val kind: SkippingKind = ValuesSet, + override val kind: SkippingKind = VALUE_SET, override val columnName: String, override val columnType: String) extends FlintSparkSkippingStrategy { diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index 4560b60b65..64d57dd5bf 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -5,9 +5,12 @@ package org.opensearch.flint.spark.sql +import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -19,6 +22,41 @@ import org.apache.spark.sql.types.StringType */ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] { + override def visitCreateSkippingIndexStatement( + ctx: CreateSkippingIndexStatementContext): Command = + FlintSparkSqlCommand() { flint => + // Create skipping index + val indexBuilder = flint + .skippingIndex() + .onTable(ctx.tableName.getText) + + ctx.indexColTypeList().indexColType().forEach { colTypeCtx => + val colName = colTypeCtx.identifier().getText + val skipType = SkippingKind.withName(colTypeCtx.skipType.getText) + skipType match { + case PARTITION => indexBuilder.addPartitions(colName) + case VALUE_SET => indexBuilder.addValueSet(colName) + case MIN_MAX => indexBuilder.addMinMax(colName) + } + } + indexBuilder.create() + + // Trigger auto refresh if enabled + if (isAutoRefreshEnabled(ctx.propertyList())) { + val indexName = getSkippingIndexName(ctx.tableName.getText) + flint.refreshIndex(indexName, RefreshMode.INCREMENTAL) + } + Seq.empty + } + + override def visitRefreshSkippingIndexStatement( + ctx: RefreshSkippingIndexStatementContext): Command = + FlintSparkSqlCommand() { flint => + val indexName = getSkippingIndexName(ctx.tableName.getText) + flint.refreshIndex(indexName, RefreshMode.FULL) + Seq.empty + } + override def visitDescribeSkippingIndexStatement( ctx: DescribeSkippingIndexStatementContext): Command = { val outputSchema = Seq( @@ -46,6 +84,21 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command Seq.empty } + private def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = { + if (ctx == null) { + false + } else { + ctx + .property() + .forEach(p => { + if (p.key.getText == "auto_refresh") { + return p.value.getText.toBoolean + } + }) + false + } + } + override def aggregateResult(aggregate: Command, nextResult: Command): Command = if (nextResult != null) nextResult else aggregate } diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala 
b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 068100e814..2da0f0d4de 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -108,22 +108,22 @@ class FlintSparkSkippingIndexITSuite | "kind": "skipping", | "indexedColumns": [ | { - | "kind": "Partition", + | "kind": "PARTITION", | "columnName": "year", | "columnType": "int" | }, | { - | "kind": "Partition", + | "kind": "PARTITION", | "columnName": "month", | "columnType": "int" | }, | { - | "kind": "ValuesSet", + | "kind": "VALUE_SET", | "columnName": "address", | "columnType": "string" | }, | { - | "kind": "MinMax", + | "kind": "MIN_MAX", | "columnName": "age", | "columnType": "int" | }], @@ -201,6 +201,24 @@ class FlintSparkSkippingIndexITSuite indexData should have size 2 } + test("should fail to manual refresh an incremental refreshing index") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + val jobId = flint.refreshIndex(testIndex, INCREMENTAL) + val job = spark.streams.get(jobId.get) + failAfter(streamingTimeout) { + job.processAllAvailable() + } + + assertThrows[IllegalStateException] { + flint.refreshIndex(testIndex, FULL) + } + } + test("can have only 1 skipping index on a table") { flint .skippingIndex() diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlITSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlITSuite.scala new file mode 100644 index 0000000000..4f5ca98fa8 --- /dev/null +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlITSuite.scala @@ -0,0 +1,156 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import scala.Option.empty + +import org.opensearch.flint.OpenSearchSuite +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper + +import org.apache.spark.FlintSuite +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE +import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY} +import org.apache.spark.sql.streaming.StreamTest + +class FlintSparkSqlITSuite + extends QueryTest + with FlintSuite + with OpenSearchSuite + with StreamTest { + + /** Flint Spark high level API for assertion */ + private lazy val flint: FlintSpark = new FlintSpark(spark) + + /** Test table and index name */ + private val testTable = "flint_sql_test" + private val testIndex = getSkippingIndexName(testTable) + + override def beforeAll(): Unit = { + super.beforeAll() + + // Configure for FlintSpark explicit created above and the one behind Flint SQL + setFlintSparkConf(HOST_ENDPOINT, openSearchHost) + setFlintSparkConf(HOST_PORT, openSearchPort) + setFlintSparkConf(REFRESH_POLICY, true) + + // Create test table + sql(s""" + | CREATE TABLE $testTable + | ( + | name STRING, + | age INT + | ) + | USING CSV + | OPTIONS ( + | header 'false', + | delimiter '\t' + | ) + | PARTITIONED BY ( + | year INT, + | month INT + | ) + |""".stripMargin) + + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 30) + | """.stripMargin) + } 
+ + protected override def beforeEach(): Unit = { + super.beforeEach() + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year") + .addValueSet("name") + .addMinMax("age") + .create() + } + + protected override def afterEach(): Unit = { + super.afterEach() + flint.deleteIndex(testIndex) + + // Stop all streaming jobs if any + spark.streams.active.foreach { job => + job.stop() + job.awaitTermination() + } + } + + test("create skipping index with auto refresh") { + flint.deleteIndex(testIndex) + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( + | year PARTITION, + | name VALUE_SET, + | age MIN_MAX + | ) + | WITH (auto_refresh = true) + | """.stripMargin) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testIndex) + job shouldBe defined + failAfter(streamingTimeout) { + job.get.processAllAvailable() + } + + val indexData = spark.read.format(FLINT_DATASOURCE).load(testIndex) + flint.describeIndex(testIndex) shouldBe defined + indexData.count() shouldBe 1 + } + + test("create skipping index with manual refresh") { + flint.deleteIndex(testIndex) + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( + | year PARTITION, + | name VALUE_SET, + | age MIN_MAX + | ) + | """.stripMargin) + + val indexData = spark.read.format(FLINT_DATASOURCE).load(testIndex) + + flint.describeIndex(testIndex) shouldBe defined + indexData.count() shouldBe 0 + + sql(s"REFRESH SKIPPING INDEX ON $testTable") + indexData.count() shouldBe 1 + } + + test("describe skipping index") { + val result = sql(s"DESC SKIPPING INDEX ON $testTable") + + checkAnswer( + result, + Seq( + Row("year", "int", "PARTITION"), + Row("name", "string", "VALUE_SET"), + Row("age", "int", "MIN_MAX"))) + } + + test("should return empty if no skipping index to describe") { + flint.deleteIndex(testIndex) + + val result = sql(s"DESC SKIPPING INDEX ON $testTable") + checkAnswer(result, Seq.empty) + } + + test("drop skipping index") { + sql(s"DROP SKIPPING INDEX ON $testTable") + + flint.describeIndex(testIndex) shouldBe empty + } +} diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala deleted file mode 100644 index f45be95325..0000000000 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark - -import scala.Option.empty - -import org.opensearch.flint.OpenSearchSuite -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper - -import org.apache.spark.FlintSuite -import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT} - -class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite { - - /** Flint Spark high level API for assertion */ - private lazy val flint: FlintSpark = { - setFlintSparkConf(HOST_ENDPOINT, openSearchHost) - setFlintSparkConf(HOST_PORT, openSearchPort) - new FlintSpark(spark) - } - - /** Test table and index name */ - private val testTable = "flint_sql_test" - private val testIndex = getSkippingIndexName(testTable) - - override def beforeAll(): Unit = { - super.beforeAll() - sql(s""" - | CREATE TABLE $testTable - | ( - | name STRING, - | age INT - | ) 
- | USING CSV - | OPTIONS ( - | header 'false', - | delimiter '\t' - | ) - | PARTITIONED BY ( - | year INT, - | month INT - | ) - |""".stripMargin) - } - - protected override def beforeEach(): Unit = { - super.beforeEach() - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year") - .addValueSet("name") - .addMinMax("age") - .create() - } - - protected override def afterEach(): Unit = { - super.afterEach() - flint.deleteIndex(testIndex) - } - - test("describe skipping index") { - val result = sql(s"DESC SKIPPING INDEX ON $testTable") - - checkAnswer( - result, - Seq( - Row("year", "int", "Partition"), - Row("name", "string", "ValuesSet"), - Row("age", "int", "MinMax"))) - } - - test("should return empty if no skipping index to describe") { - flint.deleteIndex(testIndex) - - val result = sql(s"DESC SKIPPING INDEX ON $testTable") - checkAnswer(result, Seq.empty) - } - - test("drop skipping index") { - sql(s"DROP SKIPPING INDEX ON $testTable") - - flint.describeIndex(testIndex) shouldBe empty - } -}
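
For reference, a minimal end-to-end sketch of the skipping index SQL that this patch enables, following the grammar and docs changes above; the `alb_logs` table and the indexed columns are illustrative only.

```sql
-- Option 1: create with auto refresh; an incremental streaming job keeps the index in sync
CREATE SKIPPING INDEX ON alb_logs
(
  year PARTITION,
  elb_status_code VALUE_SET,
  request_processing_time MIN_MAX
)
WITH (auto_refresh = true)

-- Option 2: create without the WITH clause, then populate the index on demand;
-- a manual full refresh is rejected while an incremental refresh job is running
REFRESH SKIPPING INDEX ON alb_logs

-- Inspect the indexed columns and their skip types, then remove the index
DESC SKIPPING INDEX ON alb_logs

DROP SKIPPING INDEX ON alb_logs
```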