
Add IT
Signed-off-by: Chen Dai <daichen@amazon.com>
dai-chen committed Jul 7, 2023
1 parent 136de9f commit a710266
Showing 5 changed files with 44 additions and 17 deletions.
1 change: 1 addition & 0 deletions flint/docs/index.md
@@ -219,6 +219,7 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.flint.optimizer.enabled`: default is true.
+- `spark.flint.index.hybridscan.enabled`: default is false.
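
For context, a minimal sketch of setting these options on a plain `SparkSession` (the two `spark.flint.*` keys come from the list above; the app name, master, and everything else in the builder are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: enable the Flint optimizer and the new hybrid scan flag.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("flint-hybrid-scan-demo")
  .config("spark.flint.optimizer.enabled", "true")        // default is true
  .config("spark.flint.index.hybridscan.enabled", "true") // default is false
  .getOrCreate()

// The flag can also be flipped at runtime on an existing session:
spark.conf.set("spark.flint.index.hybridscan.enabled", "false")
```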

#### Data Type Mapping

FlintSparkSkippingFileIndex.scala
@@ -59,12 +59,12 @@ case class FlintSparkSkippingFileIndex(
override def partitionSchema: StructType = baseFileIndex.partitionSchema

/*
- * Left join source partitions and index data to keep unrefreshed source files:
+ * Left join source partitions and index data to keep unknown source files:
* Express the logic in SQL:
* SELECT left.file_path
* FROM partitions AS left
- * LEFT OUTER JOIN indexScan AS right
- * ON left.file_path = right.file_path
+ * LEFT JOIN indexScan AS right
+ * ON left.file_path = right.file_path
* WHERE right.file_path IS NULL
* OR [indexFilter]
*/
@@ -73,7 +73,7 @@ case class FlintSparkSkippingFileIndex(
import sparkSession.implicits._

partitions
- .flatMap(_.files.map(f => f.getPath.toString))
+ .flatMap(_.files.map(f => f.getPath.toUri.toString))
.toDF(FILE_PATH_COLUMN)
.join(indexScan, Seq(FILE_PATH_COLUMN), "left")
.filter(isnull(indexScan(FILE_PATH_COLUMN)) || new Column(indexFilter))
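To make the join comment above concrete, here is a standalone toy sketch (not Flint code: the session setup, the file names, and the omitted index filter are made up) showing that files absent from `indexScan` survive the `isnull` filter, which is what lets hybrid scan pick up files not yet covered by the index. The switch to `getPath.toUri.toString` presumably keeps these paths in the same URI form as the `file_path` values stored in the index.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.isnull

val spark = SparkSession.builder().master("local[1]").appName("hybrid-scan-toy").getOrCreate()
import spark.implicits._

// Files reported by the source table vs. files already covered by index data.
val partitions = Seq("file-1", "file-2", "file-3").toDF("file_path")
val indexScan  = Seq("file-1", "file-2").toDF("file_path") // file-3 not indexed yet

// Same shape as the production logic: left join on file_path, then keep
// files with no index entry (a real skipping filter would be OR-ed in).
val selected = partitions
  .join(indexScan, Seq("file_path"), "left")
  .filter(isnull(indexScan("file_path")))

selected.show() // only file-3 survives, so the unindexed file is still scanned
```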
FlintSuite.scala
@@ -10,6 +10,7 @@ import org.opensearch.flint.spark.FlintSparkExtensions
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.flint.config.FlintConfigEntry
+import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

@@ -34,4 +35,13 @@ trait FlintSuite extends SharedSparkSession {
protected def setFlintSparkConf[T](config: FlintConfigEntry[T], value: Any): Unit = {
spark.conf.set(config.key, value.toString)
}

+  protected def withHybridScanEnabled(block: => Unit): Unit = {
+    setFlintSparkConf(HYBRID_SCAN_ENABLED, "true")
+    try {
+      block
+    } finally {
+      setFlintSparkConf(HYBRID_SCAN_ENABLED, "false")
+    }
+  }
}
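
The helper moves from the individual suite (removed below) into the shared `FlintSuite` trait, so any suite can use it. A hypothetical usage sketch (class name, test name, and body invented); the loan-style `finally` ensures the flag is reset even when an assertion fails:

```scala
// Hypothetical suite extending FlintSuite.
class MyHybridScanSuite extends FlintSuite {
  test("hybrid scan keeps files missing from the index") {
    withHybridScanEnabled {
      // Code here runs with spark.flint.index.hybridscan.enabled = true;
      // the helper resets the flag to false afterwards, even on failure.
    }
  }
}
```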
FlintSparkSkippingFileIndexSuite.scala
@@ -17,7 +17,6 @@ import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate}
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
-import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

@@ -46,7 +45,7 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers {
.shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"), "partition-2" -> Seq("file-3")))
}

test("should skip unrefreshed source files by default") {
test("should skip unknown source files by default") {
assertFlintFileIndex()
.withSourceFiles(Map(partition1))
.withIndexData(
@@ -57,7 +56,7 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers {
.shouldScanSourceFiles(Map("partition-1" -> Seq("file-1")))
}

test("should not skip unrefreshed source files in hybrid-scan mode") {
test("should not skip unknown source files in hybrid-scan mode") {
withHybridScanEnabled {
assertFlintFileIndex()
.withSourceFiles(Map(partition1))
@@ -70,7 +69,7 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers {
}
}

test("should not skip unrefreshed source files of multiple partitions in hybrid-scan mode") {
test("should not skip unknown source files of multiple partitions in hybrid-scan mode") {
withHybridScanEnabled {
assertFlintFileIndex()
.withSourceFiles(Map(partition1, partition2))
@@ -84,15 +83,6 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers {
}
}

-  private def withHybridScanEnabled(block: => Unit): Unit = {
-    setFlintSparkConf(HYBRID_SCAN_ENABLED, "true")
-    try {
-      block
-    } finally {
-      setFlintSparkConf(HYBRID_SCAN_ENABLED, "false")
-    }
-  }

private def assertFlintFileIndex(): AssertionHelper = {
new AssertionHelper
}
FlintSparkSkippingIndexSuite.scala
@@ -301,6 +301,32 @@ class FlintSparkSkippingIndexSuite
hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25))
}

test("should rewrite applicable query to scan latest source files in hybrid scan mode") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("month")
.create()
flint.refreshIndex(testIndex, FULL)

// Generate a new source file which is not in index data
sql(s"""
| INSERT INTO $testTable
| PARTITION (year=2023, month=4)
| VALUES ('Hello', 35, 'Vancouver')
| """.stripMargin)

withHybridScanEnabled {
val query = sql(s"""
| SELECT address
| FROM $testTable
| WHERE month = 4
|""".stripMargin)

checkAnswer(query, Seq(Row("Seattle"), Row("Vancouver")))
}
}

test("should return empty if describe index not exist") {
flint.describeIndex("non-exist") shouldBe empty
}
