Add min max skipping index #1726

Merged (3 commits) on Jun 20, 2023
1 change: 1 addition & 0 deletions flint/docs/index.md
@@ -255,6 +255,7 @@ flint.skippingIndex()
.filterBy("time > 2023-04-01 00:00:00")
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addMinMax("request_processing_time")
.addBloomFilter("client_ip")
.create()
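
A min-max entry stores the lower and upper bound of the indexed column for each source file, so an equality predicate on that column can skip any file whose range excludes the constant. A minimal sketch of the kind of query this index helps (table name assumed for illustration, and an active SparkSession `spark` is assumed):

```scala
// Files whose [min, max] range of request_processing_time does not contain 100
// can be skipped instead of scanned.
spark.sql("SELECT * FROM alb_logs WHERE request_processing_time = 100")
```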

FlintSpark.scala
@@ -15,7 +15,8 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, Ref
import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{Partition, ValuesSet}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, Partition, ValuesSet}
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

@@ -173,6 +174,8 @@ class FlintSpark(val spark: SparkSession) {
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
case ValuesSet =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
case MinMax =>
MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
throw new IllegalStateException(s"Unknown skipping strategy: $other")
}
@@ -257,6 +260,22 @@ object FlintSpark {
this
}

/**
* Add min max skipping indexed column.
*
* @param colName
* indexed column name
* @return
* index builder
*/
def addMinMax(colName: String): IndexBuilder = {
val col = findColumn(colName)
indexedColumns = indexedColumns :+ MinMaxSkippingStrategy(
columnName = col.name,
columnType = col.dataType)
this
}

/**
* Create index.
*/
FlintSparkSkippingIndex.scala
@@ -71,13 +71,13 @@ class FlintSparkSkippingIndex(
}

private def getSchema: String = {
val indexFieldTypes = indexedColumns.map { indexCol =>
val columnName = indexCol.columnName
// Data type INT from catalog is not recognized by Spark DataType.fromJson()
val columnType = if (indexCol.columnType == "int") "integer" else indexCol.columnType
val sparkType = DataType.fromJson("\"" + columnType + "\"")
StructField(columnName, sparkType, nullable = false)
}
val indexFieldTypes =
indexedColumns.flatMap(_.outputSchema()).map { case (colName, colType) =>
// Data type INT from catalog is not recognized by Spark DataType.fromJson()
val columnType = if (colType == "int") "integer" else colType
val sparkType = DataType.fromJson("\"" + columnType + "\"")
StructField(colName, sparkType, nullable = false)
}

val allFieldTypes =
indexFieldTypes :+ StructField(FILE_PATH_COLUMN, StringType, nullable = false)
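
Each skipping strategy now contributes its own index columns through outputSchema(), so one indexed source column can expand into several index fields. A minimal sketch of what this yields for a min-max column, using the strategy added below in this PR (column names taken from that file):

```scala
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy

val minMax = MinMaxSkippingStrategy(columnName = "age", columnType = "int")

// Two index columns per indexed source column; getSchema later maps the
// catalog type "int" to Spark's IntegerType when building StructFields.
assert(minMax.outputSchema() == Map("MinMax_age_0" -> "int", "MinMax_age_1" -> "int"))
```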
FlintSparkSkippingStrategy.scala
@@ -65,7 +65,7 @@ object FlintSparkSkippingStrategy {
type SkippingKind = Value

// Use Value[s]Set because ValueSet already exists in Enumeration
val Partition, ValuesSet = Value
val Partition, ValuesSet, MinMax = Value
}

/** json4s doesn't serialize Enum by default */
MinMaxSkippingStrategy.scala (new file)
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.skipping.minmax

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, SkippingKind}

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Max, Min}
import org.apache.spark.sql.functions.col

/**
* Skipping strategy based on min-max boundary of column values.
*/
case class MinMaxSkippingStrategy(
override val kind: SkippingKind = MinMax,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {

/** Column name in Flint index data. */
private def minColName = s"MinMax_${columnName}_0"
private def maxColName = s"MinMax_${columnName}_1"

override def outputSchema(): Map[String, String] =
Map(minColName -> columnType, maxColName -> columnType)

override def getAggregators: Seq[AggregateFunction] =
Seq(Min(col(columnName).expr), Max(col(columnName).expr))

override def rewritePredicate(predicate: Predicate): Option[Predicate] =
predicate.collect { case EqualTo(AttributeReference(`columnName`, _, _, _), value: Literal) =>
rewriteTo(col(minColName) <= value && col(maxColName) >= value)
}.headOption

// Convert a column to predicate
private def rewriteTo(col: Column): Predicate = col.expr.asInstanceOf[Predicate]
}
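
A minimal sketch of the rewrite above (assuming the Spark catalyst and Flint classes are on the classpath): an equality filter on the indexed column becomes a range check over the two generated index columns, while anything else is left untouched for other strategies.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.{IntegerType, StringType}

import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy

val strategy = MinMaxSkippingStrategy(columnName = "age", columnType = "int")

// age = 25  =>  Some((MinMax_age_0 <= 25) AND (MinMax_age_1 >= 25)) as a catalyst Predicate
val ageEquals25 = EqualTo(AttributeReference("age", IntegerType)(), Literal(25))
val rewritten = strategy.rewritePredicate(ageEquals25)

// Predicates on other columns (or non-equality predicates) are not rewritten.
val nameEquals = EqualTo(AttributeReference("name", StringType)(), Literal("Hello"))
assert(strategy.rewritePredicate(nameEquals).isEmpty)
```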
FlintSparkSkippingIndexSuite.scala
@@ -51,6 +51,7 @@ class FlintSparkSkippingIndexSuite
| CREATE TABLE $testTable
| (
| name STRING,
| age INT,
| address STRING
| )
| USING CSV
@@ -67,13 +68,13 @@
sql(s"""
| INSERT INTO $testTable
| PARTITION (year=2023, month=4)
| VALUES ('Hello', 'Seattle')
| VALUES ('Hello', 30, 'Seattle')
| """.stripMargin)

sql(s"""
| INSERT INTO $testTable
| PARTITION (year=2023, month=5)
| VALUES ('World', 'Portland')
| VALUES ('World', 25, 'Portland')
| """.stripMargin)
}

@@ -96,6 +97,7 @@
.onTable(testTable)
.addPartitions("year", "month")
.addValueSet("address")
.addMinMax("age")
.create()

val indexName = s"flint_${testTable}_skipping_index"
@@ -119,6 +121,11 @@
| "kind": "ValuesSet",
| "columnName": "address",
| "columnType": "string"
| },
| {
| "kind": "MinMax",
| "columnName": "age",
| "columnType": "int"
| }],
| "source": "$testTable"
| },
@@ -132,6 +139,12 @@
| "address": {
| "type": "keyword"
| },
| "MinMax_age_0": {
| "type": "integer"
| },
| "MinMax_age_1" : {
| "type": "integer"
| },
| "file_path": {
| "type": "keyword"
| }
@@ -268,6 +281,26 @@
hasIndexFilter(col("address") === "Seattle"))
}

test("can build min max skipping index and rewrite applicable query") {
flint
.skippingIndex()
.onTable(testTable)
.addMinMax("age")
.create()
flint.refreshIndex(testIndex, FULL)

val query = sql(s"""
| SELECT name
| FROM $testTable
| WHERE age = 25
|""".stripMargin)

checkAnswer(query, Row("World"))
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25))
}

test("should return empty if describe index not exist") {
flint.describeIndex("non-exist") shouldBe empty
}
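
For context, the index refresh is what populates these rows: source data is grouped per file and each strategy's aggregators are applied, so a min-max column contributes one min and one max value per source file. A conceptual sketch of the equivalent aggregation (names assumed; this is not the actual Flint refresh code):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, input_file_name, max, min}

// Conceptual only: one output row per source file, carrying the file path
// plus the min and max of the indexed "age" column.
def buildMinMaxIndexData(sourceDf: DataFrame): DataFrame =
  sourceDf
    .groupBy(input_file_name().as("file_path"))
    .agg(min(col("age")).as("MinMax_age_0"), max(col("age")).as("MinMax_age_1"))
```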