Add value set skipping index (#1708)
* Add value list skipping index

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add skipping kind enum and IT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add more comments for review

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add user doc

Signed-off-by: Chen Dai <daichen@amazon.com>

* Update doc with more details on skipping index

Signed-off-by: Chen Dai <daichen@amazon.com>

* Update doc to fix markdown format

Signed-off-by: Chen Dai <daichen@amazon.com>

* Remove unused imports

Signed-off-by: Chen Dai <daichen@amazon.com>

* Addressed PR comments

Signed-off-by: Chen Dai <daichen@amazon.com>

---------

Signed-off-by: Chen Dai <daichen@amazon.com>
dai-chen authored Jun 13, 2023
1 parent 9badf77 commit 43d3566
Showing 8 changed files with 224 additions and 62 deletions.
33 changes: 19 additions & 14 deletions flint/docs/index.md
@@ -11,12 +11,17 @@ A Flint index is ...
### Feature Highlights

- Skipping Index
- Partition index
- MinMax index
- ValueList index
- BloomFilter index
- Covering Index
- Materialized View
- Partition: skip data scan by maintaining and filtering the partitioned column value per file.
- MinMax: skip data scan by maintaining the lower and upper bound of the indexed column per file.
- ValueSet: skip data scan by building a unique value set of the indexed column per file.

Please see the following example, in which the Index Building Logic and Query Rewrite Logic columns show the basic idea behind each skipping index implementation.

| Skipping Index | Create Index Statement (TBD) | Index Building Logic | Query Rewrite Logic |
|----------------|-------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Partition | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;year PARTITION,<br>&nbsp;&nbsp;month PARTITION,<br>&nbsp;&nbsp;day PARTITION,<br>&nbsp;&nbsp;hour PARTITION<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;FIRST(year) AS year,<br>&nbsp;&nbsp;FIRST(month) AS month,<br>&nbsp;&nbsp;FIRST(day) AS day,<br>&nbsp;&nbsp;FIRST(hour) AS hour,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE year = 2023 AND month = 4<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE year = 2023 AND month = 4<br>)<br>WHERE year = 2023 AND month = 4 |
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;elb_status_code VALUE_LIST<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| Min-Max | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100

### Flint Index Specification

@@ -130,11 +135,11 @@ DROP SKIPPING INDEX ON <object>
Skipping index type:

```sql
<index_type> ::= { <bloom_filter>, <min_max>, <value_list> }
<index_type> ::= { <bloom_filter>, <min_max>, <value_set> }

<bloom_filter> ::= BLOOM_FILTER( bitCount, numOfHashFunctions ) #TBD
<min_max> ::= MIN_MAX
<value_list> ::= VALUE_LIST
<value_set> ::= VALUE_SET
```

Example:
@@ -143,7 +148,7 @@ Example:
CREATE SKIPPING INDEX ON alb_logs
FOR COLUMNS (
client_ip BLOOM_FILTER,
elb_status VALUE_LIST
elb_status_code VALUE_SET
)
WHERE time > '2023-04-01 00:00:00'

@@ -168,12 +173,12 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
"properties": {
"indexedColumns": [
{
"kind": "partition",
"kind": "Partition",
"columnName": "year",
"columnType": "int"
},
{
"kind": "value_list",
"kind": "ValuesSet",
"columnName": "elb_status_code",
"columnType": "int"
}
@@ -248,9 +253,9 @@ val flint = new FlintSpark(spark)
flint.skippingIndex()
.onTable("alb_logs")
.filterBy("time > 2023-04-01 00:00:00")
.addPartitionIndex("year", "month", "day")
.addValueListIndex("elb_status_code")
.addBloomFilterIndex("client_ip")
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addBloomFilter("client_ip")
.create()

flint.refresh("flint_alb_logs_skipping_index", FULL)
@@ -14,7 +14,10 @@ import org.opensearch.flint.spark.FlintSpark._
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{Partition, ValuesSet}
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SaveMode._
@@ -32,7 +35,7 @@ class FlintSpark(val spark: SparkSession) {
private val flintClient: FlintClient = FlintClientBuilder.build(FlintSparkConf(spark.conf))

/** Required by json4s parse function */
implicit val formats: Formats = Serialization.formats(NoTypeHints)
implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer

/**
* Create index builder for creating index with fluent API.
@@ -153,8 +156,6 @@ class FlintSpark(val spark: SparkSession) {
*
*/
private def deserialize(metadata: FlintMetadata): FlintSparkIndex = {
implicit val formats: Formats = Serialization.formats(NoTypeHints)

val meta = parse(metadata.getContent) \ "_meta"
val tableName = (meta \ "source").extract[String]
val indexType = (meta \ "kind").extract[String]
@@ -163,13 +164,15 @@
indexType match {
case SKIPPING_INDEX_TYPE =>
val strategies = indexedColumns.arr.map { colInfo =>
val skippingType = (colInfo \ "kind").extract[String]
val skippingKind = SkippingKind.withName((colInfo \ "kind").extract[String])
val columnName = (colInfo \ "columnName").extract[String]
val columnType = (colInfo \ "columnType").extract[String]

skippingType match {
case "partition" =>
new PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
skippingKind match {
case Partition =>
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
case ValuesSet =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
throw new IllegalStateException(s"Unknown skipping strategy: $other")
}
@@ -227,25 +230,51 @@ object FlintSpark {
* @return
* index builder
*/
def addPartitionIndex(colNames: String*): IndexBuilder = {
def addPartitions(colNames: String*): IndexBuilder = {
require(tableName.nonEmpty, "table name cannot be empty")

colNames
.map(colName =>
allColumns.getOrElse(
colName,
throw new IllegalArgumentException(s"Column $colName does not exist")))
.map(col =>
new PartitionSkippingStrategy(columnName = col.name, columnType = col.dataType))
.foreach(indexedCol => indexedColumns = indexedColumns :+ indexedCol)
.map(findColumn)
.map(col => PartitionSkippingStrategy(columnName = col.name, columnType = col.dataType))
.foreach(addIndexedColumn)
this
}

/**
* Create index.
* Add value set skipping indexed column.
*
* @param colName
* indexed column name
* @return
* index builder
*/
def create(): Unit = {
def addValueSet(colName: String): IndexBuilder = {
require(tableName.nonEmpty, "table name cannot be empty")

val col = findColumn(colName)
addIndexedColumn(
ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
this
}

/**
* Create index.
*/
def create(): Unit = {
flint.createIndex(new FlintSparkSkippingIndex(tableName, indexedColumns))
}

private def findColumn(colName: String): Column =
allColumns.getOrElse(
colName,
throw new IllegalArgumentException(s"Column $colName does not exist"))

private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
require(
indexedColumns.forall(_.columnName != indexedCol.columnName),
s"${indexedCol.columnName} is already indexed")

indexedColumns = indexedColumns :+ indexedCol
}
}
}
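For reference, a minimal usage sketch of the renamed builder methods above is shown below. It mirrors the fluent API example in the updated documentation; `spark` is assumed to be an existing SparkSession and `alb_logs` is the sample table from the docs.

```scala
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode.FULL

// Build a skipping index with the renamed builder methods
val flint = new FlintSpark(spark)

flint.skippingIndex()
  .onTable("alb_logs")
  .filterBy("time > 2023-04-01 00:00:00")
  .addPartitions("year", "month", "day")
  .addValueSet("elb_status_code")
  .create()

// Populate the index data from the source table
flint.refresh("flint_alb_logs_skipping_index", FULL)
```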
@@ -20,7 +20,7 @@ import org.apache.spark.sql.types.StructType
* @param filterByIndex
* pushed down filtering on index data
*/
class FlintSparkSkippingFileIndex(baseFileIndex: FileIndex, filterByIndex: DataFrame)
case class FlintSparkSkippingFileIndex(baseFileIndex: FileIndex, filterByIndex: DataFrame)
extends FileIndex {

override def listFiles(
@@ -32,7 +32,6 @@ class FlintSparkSkippingFileIndex(baseFileIndex: FileIndex, filterByIndex: DataFrame)
.map(_.getString(0))
.toSet

// TODO: figure out if list file call can be avoided
val partitions = baseFileIndex.listFiles(partitionFilters, dataFilters)
partitions
.map(p => p.copy(files = p.files.filter(f => isFileNotSkipped(selectedFiles, f))))
@@ -11,6 +11,7 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
Expand All @@ -30,7 +31,7 @@ class FlintSparkSkippingIndex(
extends FlintSparkIndex {

/** Required by json4s write function */
implicit val formats: Formats = Serialization.formats(NoTypeHints)
implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer

/** Skipping index type */
override val kind: String = SKIPPING_INDEX_TYPE
@@ -5,6 +5,10 @@

package org.opensearch.flint.spark.skipping

import org.json4s.CustomSerializer
import org.json4s.JsonAST.JString
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind

import org.apache.spark.sql.catalyst.expressions.Predicate
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction

@@ -16,7 +20,7 @@ trait FlintSparkSkippingStrategy {
/**
* Skipping strategy kind.
*/
val kind: String
val kind: SkippingKind

/**
* Indexed column name.
@@ -26,7 +30,6 @@ trait FlintSparkSkippingStrategy {
/**
* Indexed column Spark SQL type.
*/
@transient
val columnType: String

/**
@@ -52,3 +55,27 @@ trait FlintSparkSkippingStrategy {
*/
def rewritePredicate(predicate: Predicate): Option[Predicate]
}

object FlintSparkSkippingStrategy {

/**
* Skipping kind enum class.
*/
object SkippingKind extends Enumeration {
type SkippingKind = Value

// Use Value[s]Set because ValueSet already exists in Enumeration
val Partition, ValuesSet = Value
}

/** json4s doesn't serialize Enum by default */
object SkippingKindSerializer
extends CustomSerializer[SkippingKind](_ =>
(
{ case JString(value) =>
SkippingKind.withName(value)
},
{ case kind: SkippingKind =>
JString(kind.toString)
}))
}
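Since json4s has no built-in handling for Scala `Enumeration` values, `SkippingKindSerializer` maps a `SkippingKind` to and from a plain JSON string. A minimal round-trip sketch is shown below; the `IndexedColumn` case class and the demo object are hypothetical, standing in for one `indexedColumns` entry of the Flint metadata.

```scala
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, ValuesSet}

// Hypothetical stand-in for one "indexedColumns" entry in the Flint index metadata
case class IndexedColumn(kind: SkippingKind, columnName: String, columnType: String)

object SkippingKindRoundTrip extends App {

  // Register the custom serializer, mirroring how FlintSpark builds its Formats
  implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer

  // The enum value is written as a plain JSON string:
  // {"kind":"ValuesSet","columnName":"elb_status_code","columnType":"int"}
  val json = Serialization.write(IndexedColumn(ValuesSet, "elb_status_code", "int"))
  println(json)

  // ...and read back into the enum value (ValuesSet)
  val column = Serialization.read[IndexedColumn](json)
  println(column.kind)
}
```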
@@ -6,17 +6,17 @@
package org.opensearch.flint.spark.skipping.partition

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{Partition, SkippingKind}

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, First}
import org.apache.spark.sql.functions.col

/**
* Skipping strategy for partitioned columns of source table.
*/
class PartitionSkippingStrategy(
override val kind: String = "partition",
case class PartitionSkippingStrategy(
override val kind: SkippingKind = Partition,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
@@ -26,14 +26,13 @@ class PartitionSkippingStrategy(
}

override def getAggregators: Seq[AggregateFunction] = {
Seq(First(new Column(columnName).expr, ignoreNulls = true))
Seq(First(col(columnName).expr, ignoreNulls = true))
}

override def rewritePredicate(predicate: Predicate): Option[Predicate] = {
// Column has the same name in the index data, so just rewrite to the same equality
predicate.collect {
case EqualTo(AttributeReference(`columnName`, _, _, _), value: Literal) =>
EqualTo(UnresolvedAttribute(columnName), value)
predicate.collect { case EqualTo(AttributeReference(`columnName`, _, _, _), value: Literal) =>
EqualTo(col(columnName).expr, value)
}.headOption
}
}
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, ValuesSet}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, CollectSet}
import org.apache.spark.sql.functions.col

/**
* Skipping strategy based on unique column value set.
*/
case class ValueSetSkippingStrategy(
override val kind: SkippingKind = ValuesSet,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {

override def outputSchema(): Map[String, String] =
Map(columnName -> columnType)

override def getAggregators: Seq[AggregateFunction] =
Seq(CollectSet(col(columnName).expr))

override def rewritePredicate(predicate: Predicate): Option[Predicate] = {
/*
* This is supposed to be rewritten to ARRAY_CONTAINS(columnName, value).
* However, due to predicate push-down limitations in Spark, we keep the equality as is.
*/
predicate.collect { case EqualTo(AttributeReference(`columnName`, _, _, _), value: Literal) =>
EqualTo(col(columnName).expr, value)
}.headOption
}
}
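To illustrate the rewrite behaviour described in the comment above, the hypothetical snippet below builds the Catalyst predicate by hand and passes it through the strategy; it is a sketch assuming Spark and the Flint classes are on the classpath, not part of the change itself.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.IntegerType

import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

object ValueSetRewriteDemo extends App {

  val strategy = ValueSetSkippingStrategy(columnName = "elb_status_code", columnType = "int")

  // Source table filter as a Catalyst predicate: elb_status_code = 404
  val predicate = EqualTo(AttributeReference("elb_status_code", IntegerType)(), Literal(404))

  // Rewritten against the skipping index: the indexed column keeps the same name,
  // so the equality is preserved instead of becoming ARRAY_CONTAINS(elb_status_code, 404)
  val rewritten = strategy.rewritePredicate(predicate)

  println(rewritten) // e.g. Some(('elb_status_code = 404))
}
```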
