Skip to content

Commit

Permalink
Rename skipping kind to match its name in SQL parser
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen committed Jul 5, 2023
1 parent bc8630d commit 8657832
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, Ref
import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, Partition, ValuesSet}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
Expand Down Expand Up @@ -170,11 +170,11 @@ class FlintSpark(val spark: SparkSession) {
val columnType = (colInfo \ "columnType").extract[String]

skippingKind match {
case Partition =>
case PARTITION =>
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
case ValuesSet =>
case VALUE_SET =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
case MinMax =>
case MIN_MAX =>
MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
throw new IllegalStateException(s"Unknown skipping strategy: $other")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ object FlintSparkSkippingStrategy {
type SkippingKind = Value

// SCREAMING_SNAKE_CASE names match the skipping kind keywords in the SQL parser
// (also avoids clashing with ValueSet, which Enumeration already defines)
val Partition, ValuesSet, MinMax = Value
val PARTITION, VALUE_SET, MIN_MAX = Value
}

/** json4s doesn't serialize Enum by default */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.minmax

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, SkippingKind}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, SkippingKind}

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
Expand All @@ -17,7 +17,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy based on min-max boundary of column values.
*/
case class MinMaxSkippingStrategy(
override val kind: SkippingKind = MinMax,
override val kind: SkippingKind = MIN_MAX,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.partition

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{Partition, SkippingKind}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{PARTITION, SkippingKind}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, First}
Expand All @@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy for partitioned columns of source table.
*/
case class PartitionSkippingStrategy(
override val kind: SkippingKind = Partition,
override val kind: SkippingKind = PARTITION,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, ValuesSet}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, CollectSet}
Expand All @@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy based on unique column value set.
*/
case class ValueSetSkippingStrategy(
override val kind: SkippingKind = ValuesSet,
override val kind: SkippingKind = VALUE_SET,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

package org.opensearch.flint.spark.sql

import java.util.Locale

import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind._
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateSkippingIndexStatementContext, DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext, RefreshSkippingIndexStatementContext}

import org.apache.spark.sql.Row
Expand All @@ -31,12 +31,12 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command

ctx.indexColTypeList().indexColType().forEach { colTypeCtx =>
val colName = colTypeCtx.identifier().getText
val skipType = colTypeCtx.skipType.getText.toLowerCase(Locale.ROOT)
val skipType = SkippingKind.withName(colTypeCtx.skipType.getText)

skipType match {
case "partition" => indexBuilder.addPartitions(colName)
case "value_set" => indexBuilder.addValueSet(colName)
case "min_max" => indexBuilder.addMinMax(colName)
case PARTITION => indexBuilder.addPartitions(colName)
case VALUE_SET => indexBuilder.addValueSet(colName)
case MIN_MAX => indexBuilder.addMinMax(colName)
}
}
indexBuilder.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,22 +108,22 @@ class FlintSparkSkippingIndexSuite
| "kind": "skipping",
| "indexedColumns": [
| {
| "kind": "Partition",
| "kind": "PARTITION",
| "columnName": "year",
| "columnType": "int"
| },
| {
| "kind": "Partition",
| "kind": "PARTITION",
| "columnName": "month",
| "columnType": "int"
| },
| {
| "kind": "ValuesSet",
| "kind": "VALUE_SET",
| "columnName": "address",
| "columnType": "string"
| },
| {
| "kind": "MinMax",
| "kind": "MIN_MAX",
| "columnName": "age",
| "columnType": "int"
| }],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}

class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite {
class FlintSparkSqlITSuite extends QueryTest with FlintSuite with OpenSearchSuite {

/** Flint Spark high level API for assertion */
private lazy val flint: FlintSpark = new FlintSpark(spark)
Expand Down Expand Up @@ -101,9 +101,9 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite
checkAnswer(
result,
Seq(
Row("year", "int", "Partition"),
Row("name", "string", "ValuesSet"),
Row("age", "int", "MinMax")))
Row("year", "int", "PARTITION"),
Row("name", "string", "VALUE_SET"),
Row("age", "int", "MIN_MAX")))
}

test("should return empty if no skipping index to describe") {
Expand Down

0 comments on commit 8657832

Please sign in to comment.