Skip to content

Commit

Permalink
Add check for manual refresh on incremental refresh index
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen committed Jul 5, 2023
1 parent 21c57e9 commit 1f8dc3f
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 0 deletions.
4 changes: 4 additions & 0 deletions flint/docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ ON <object>
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))

REFRESH SKIPPING INDEX ON <object>

DESCRIBE SKIPPING INDEX ON <object>

DROP SKIPPING INDEX ON <object>
Expand All @@ -148,6 +150,8 @@ CREATE SKIPPING INDEX ON alb_logs
)
WHERE time > '2023-04-01 00:00:00'

REFRESH SKIPPING INDEX ON alb_logs

DESCRIBE SKIPPING INDEX ON alb_logs

DROP SKIPPING INDEX ON alb_logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ class FlintSpark(val spark: SparkSession) {
}

mode match {
case FULL if isIncrementalRefreshing(indexName) =>
throw new IllegalStateException(
s"Index $indexName is incremental refreshing and cannot be manual refreshed")
case FULL =>
writeFlintIndex(
spark.read
Expand Down Expand Up @@ -145,6 +148,9 @@ class FlintSpark(val spark: SparkSession) {
}
}

private def isIncrementalRefreshing(indexName: String): Boolean =
spark.streams.active.exists(_.name == indexName)

// TODO: Remove all parsing logic below once Flint spec finalized and FlintMetadata strong typed
private def getSourceTableName(index: FlintSparkIndex): String = {
val json = parse(index.metadata().getContent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,19 @@ class FlintSparkSkippingIndexITSuite
indexData should have size 2
}

test("should fail to manual refresh an incremental refreshing index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()
flint.refreshIndex(testIndex, INCREMENTAL)

assertThrows[IllegalStateException] {
flint.refreshIndex(testIndex, FULL)
}
}

test("can have only 1 skipping index on a table") {
flint
.skippingIndex()
Expand Down

0 comments on commit 1f8dc3f

Please sign in to comment.