Add min max skipping index (#1726)
* Add min max index with IT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Update user doc

Signed-off-by: Chen Dai <daichen@amazon.com>

* Fix output schema issue and IT

Signed-off-by: Chen Dai <daichen@amazon.com>

---------

Signed-off-by: Chen Dai <daichen@amazon.com>
dai-chen committed Jun 20, 2023
1 parent 255f040 commit 65583d6
Showing 6 changed files with 106 additions and 11 deletions.
1 change: 1 addition & 0 deletions flint/docs/index.md
@@ -255,6 +255,7 @@
flint.skippingIndex()
.filterBy("time > 2023-04-01 00:00:00")
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addMinMax("request_processing_time")
.addBloomFilter("client_ip")
.create()
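
A min-max entry stores the lowest and highest value of the indexed column for each source file; at query time an equality predicate on that column is rewritten into a range check on those boundaries, so files whose range excludes the literal are skipped. A minimal sketch of a query this entry can accelerate (the SparkSession and table name are placeholders):

// Files whose min/max range on request_processing_time excludes 100
// are pruned from the scan via the rewritten index filter.
spark.sql("SELECT * FROM alb_logs WHERE request_processing_time = 100")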

FlintSpark.scala
@@ -15,7 +15,8 @@
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{Partition, ValuesSet}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, Partition, ValuesSet}
+import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

@@ -173,6 +174,8 @@ class FlintSpark(val spark: SparkSession) {
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
case ValuesSet =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
case MinMax =>
MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
throw new IllegalStateException(s"Unknown skipping strategy: $other")
}
@@ -256,6 +259,22 @@
this
}

/**
* Add min max skipping indexed column.
*
* @param colName
* indexed column name
* @return
* index builder
*/
def addMinMax(colName: String): IndexBuilder = {
val col = findColumn(colName)
indexedColumns = indexedColumns :+ MinMaxSkippingStrategy(
columnName = col.name,
columnType = col.dataType)
this
}

/**
* Create index.
*/
FlintSparkSkippingIndex.scala
@@ -71,13 +71,13 @@ class FlintSparkSkippingIndex(
}

private def getSchema: String = {
-val indexFieldTypes = indexedColumns.map { indexCol =>
-val columnName = indexCol.columnName
-// Data type INT from catalog is not recognized by Spark DataType.fromJson()
-val columnType = if (indexCol.columnType == "int") "integer" else indexCol.columnType
-val sparkType = DataType.fromJson("\"" + columnType + "\"")
-StructField(columnName, sparkType, nullable = false)
-}
+val indexFieldTypes =
+indexedColumns.flatMap(_.outputSchema()).map { case (colName, colType) =>
+// Data type INT from catalog is not recognized by Spark DataType.fromJson()
+val columnType = if (colType == "int") "integer" else colType
+val sparkType = DataType.fromJson("\"" + columnType + "\"")
+StructField(colName, sparkType, nullable = false)
+}

val allFieldTypes =
indexFieldTypes :+ StructField(FILE_PATH_COLUMN, StringType, nullable = false)
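
The refactored getSchema reads index columns from each strategy's outputSchema() instead of assuming one field per indexed column, which is what lets a min-max entry contribute two fields. A minimal standalone sketch of the resulting schema (assuming Spark on the classpath; the MinMax_age_* names follow the convention of the new strategy below):

import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

// outputSchema() of a min-max strategy on an int column yields two entries.
val outputSchema = Seq("MinMax_age_0" -> "int", "MinMax_age_1" -> "int")
val indexFields = outputSchema.map { case (colName, colType) =>
  // Catalog type INT must become "integer" before DataType.fromJson().
  val json = "\"" + (if (colType == "int") "integer" else colType) + "\""
  StructField(colName, DataType.fromJson(json), nullable = false)
}
// Index data schema: strategy output columns plus the file_path column.
val schema = StructType(indexFields :+ StructField("file_path", StringType, nullable = false))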
FlintSparkSkippingStrategy.scala
@@ -65,7 +65,7 @@ object FlintSparkSkippingStrategy {
type SkippingKind = Value

// Use Value[s]Set because ValueSet already exists in Enumeration
-val Partition, ValuesSet = Value
+val Partition, ValuesSet, MinMax = Value
}

/** json4s doesn't serialize Enum by default */
MinMaxSkippingStrategy.scala (new file)
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.skipping.minmax

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, SkippingKind}

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Max, Min}
import org.apache.spark.sql.functions.col

/**
* Skipping strategy based on min-max boundary of column values.
*/
case class MinMaxSkippingStrategy(
override val kind: SkippingKind = MinMax,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {

/** Column names in Flint index data. */
private def minColName = s"MinMax_${columnName}_0"
private def maxColName = s"MinMax_${columnName}_1"

override def outputSchema(): Map[String, String] =
Map(minColName -> columnType, maxColName -> columnType)

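// Collect the column's min and max per source file while the index is refreshed.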
override def getAggregators: Seq[AggregateFunction] =
Seq(Min(col(columnName).expr), Max(col(columnName).expr))

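// Rewrite an equality predicate on the indexed column into a range check on
// the min/max index columns; any other predicate yields None (no file skipping).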
override def rewritePredicate(predicate: Predicate): Option[Predicate] =
predicate.collect { case EqualTo(AttributeReference(`columnName`, _, _, _), value: Literal) =>
rewriteTo(col(minColName) <= value && col(maxColName) >= value)
}.headOption

// Convert a column to predicate
private def rewriteTo(col: Column): Predicate = col.expr.asInstanceOf[Predicate]
}
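
A minimal sketch (assuming the catalyst classes above on the classpath) of how the strategy behaves for an int column named age, matching the assertions in the test suite below:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.IntegerType

val strategy = MinMaxSkippingStrategy(columnName = "age", columnType = "int")

// Two index data columns, one per boundary.
strategy.outputSchema()  // Map(MinMax_age_0 -> int, MinMax_age_1 -> int)

// Min/Max aggregates computed per source file at refresh time.
strategy.getAggregators  // Seq(min(age), max(age))

// Equality on the source column becomes a range check on the boundaries.
val age = AttributeReference("age", IntegerType)()
strategy.rewritePredicate(EqualTo(age, Literal(25)))
// => Some(MinMax_age_0 <= 25 AND MinMax_age_1 >= 25)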
FlintSparkSkippingIndexSuite.scala
@@ -51,6 +51,7 @@ class FlintSparkSkippingIndexSuite
| CREATE TABLE $testTable
| (
| name STRING,
| age INT,
| address STRING
| )
| USING CSV
@@ -67,13 +68,13 @@
sql(s"""
| INSERT INTO $testTable
| PARTITION (year=2023, month=4)
-| VALUES ('Hello', 'Seattle')
+| VALUES ('Hello', 30, 'Seattle')
| """.stripMargin)

sql(s"""
| INSERT INTO $testTable
| PARTITION (year=2023, month=5)
-| VALUES ('World', 'Portland')
+| VALUES ('World', 25, 'Portland')
| """.stripMargin)
}

@@ -96,6 +97,7 @@
.onTable(testTable)
.addPartitions("year", "month")
.addValueSet("address")
.addMinMax("age")
.create()

val indexName = s"flint_${testTable}_skipping_index"
@@ -119,6 +121,11 @@
| "kind": "ValuesSet",
| "columnName": "address",
| "columnType": "string"
| },
| {
| "kind": "MinMax",
| "columnName": "age",
| "columnType": "int"
| }],
| "source": "$testTable"
| },
@@ -132,6 +139,12 @@
| "address": {
| "type": "keyword"
| },
| "MinMax_age_0": {
| "type": "integer"
| },
| "MinMax_age_1" : {
| "type": "integer"
| },
| "file_path": {
| "type": "keyword"
| }
@@ -268,6 +281,26 @@
hasIndexFilter(col("address") === "Seattle"))
}

test("can build min max skipping index and rewrite applicable query") {
flint
.skippingIndex()
.onTable(testTable)
.addMinMax("age")
.create()
flint.refreshIndex(testIndex, FULL)

val query = sql(s"""
| SELECT name
| FROM $testTable
| WHERE age = 25
|""".stripMargin)

checkAnswer(query, Row("World"))
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25))
}

test("should return empty if describe index not exist") {
flint.describeIndex("non-exist") shouldBe empty
}
