Rename suite; add another test; add impl comment
scottsand-db committed Nov 28, 2023
1 parent 09edb43 commit 3059006
Showing 1 changed file with 39 additions and 26 deletions.
@@ -18,7 +18,7 @@ import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SharedSparkSession
 
-class SnapshotManagerSuite extends QueryTest
+class LogReplayMetricsSuite extends QueryTest
   with SharedSparkSession
   with DeltaSQLCommandTest {
 
@@ -58,45 +58,63 @@ class SnapshotManagerSuite extends QueryTest
     )
   }
 
+  private def appendCommit(path: String): Unit =
+    spark.range(10).write.format("delta").mode("append").save(path)
+
   ///////////
   // Tests //
   ///////////
 
-  test("snapshot hint: no hint, no checkpoint, reads all files") {
+  test("no hint, no checkpoint, reads all files") {
     withTempDirAndTableClient { (dir, tc) =>
       val path = dir.getAbsolutePath
 
-      for (_ <- 0 to 9) {
-        spark.range(10).write.format("delta").mode("append").save(path)
-      }
+      for (_ <- 0 to 9) { appendCommit(path) }
 
       val table = Table.forPath(tc, path)
       loadSnapshotAssertMetrics(tc, table, 9L to 0L by -1L, Nil)
     }
   }
 
-  test("snapshot hint: no hint, existing checkpoint, reads all files up to that checkpoint") {
+  test("no hint, existing checkpoint, reads all files up to that checkpoint") {
     withTempDirAndTableClient { (dir, tc) =>
       val path = dir.getAbsolutePath
 
-      for (_ <- 0 to 14) { // create 00.json to 14.json; 10.checkpoint is auto created
-        spark.range(10).write.format("delta").mode("append").save(path)
-      }
+      for (_ <- 0 to 14) { appendCommit(path) }
 
       val table = Table.forPath(tc, path)
       loadSnapshotAssertMetrics(tc, table, 14L to 11L by -1L, Seq(10))
     }
   }
 
-  test("snapshot hint: hint with no new commits, should read no files") {
+  test("no hint, existing checkpoint, newer P & M update, reads up to P & M commit") {
     withTempDirAndTableClient { (dir, tc) =>
       val path = dir.getAbsolutePath
 
-      def appendCommit(): Unit =
-        spark.range(10).write.format("delta").mode("append").save(path)
+      for (_ <- 0 to 12) { appendCommit(path) }
+
+      // v13 changes the protocol (which also updates the metadata)
+      spark.sql(s"""
+        |ALTER TABLE delta.`$path` SET TBLPROPERTIES (
+        |  'delta.minReaderVersion' = '2',
+        |  'delta.minWriterVersion' = '5',
+        |  'delta.columnMapping.mode' = 'name'
+        |)
+        |""".stripMargin)
+
+      for (_ <- 14 to 16) { appendCommit(path) }
+
+      val table = Table.forPath(tc, path)
+      loadSnapshotAssertMetrics(tc, table, 16L to 13L by -1L, Nil)
+    }
+  }
+
+  test("hint with no new commits, should read no files") {
+    withTempDirAndTableClient { (dir, tc) =>
+      val path = dir.getAbsolutePath
 
       for (_ <- 0 to 14) {
-        appendCommit()
+        appendCommit(path)
       }
 
       val table = Table.forPath(tc, path)
@@ -109,14 +127,11 @@ class SnapshotManagerSuite extends QueryTest
     }
   }
 
-  test("snapshot hint: hint with no P or M updates") {
+  test("hint with no P or M updates") {
     withTempDirAndTableClient { (dir, tc) =>
       val path = dir.getAbsolutePath
 
-      def appendCommit(): Unit =
-        spark.range(10).write.format("delta").mode("append").save(path)
-
-      for (_ <- 0 to 14) { appendCommit() }
+      for (_ <- 0 to 14) { appendCommit(path) }
 
       val table = Table.forPath(tc, path)
 
@@ -125,32 +140,29 @@ class SnapshotManagerSuite extends QueryTest
       // A hint is now saved at v14
 
       // Case: only one version change
-      appendCommit() // v15
+      appendCommit(path) // v15
       loadSnapshotAssertMetrics(tc, table, Seq(15), Nil)
 
       // A hint is now saved at v15
 
       // Case: several version changes
-      for (_ <- 16 to 19) { appendCommit() }
+      for (_ <- 16 to 19) { appendCommit(path) }
       loadSnapshotAssertMetrics(tc, table, 19L to 16L by -1L, Nil)
 
       // A hint is now saved at v19
 
       // Case: [delta-io/delta#2262] [Fix me!] Read the entire checkpoint at v20, even if v20.json
       // and v19 hint are available
-      appendCommit() // v20
+      appendCommit(path) // v20
       loadSnapshotAssertMetrics(tc, table, Nil, Seq(20))
     }
   }
 
-  test("snapshot hint: hint with a P or M update") {
+  test("hint with a P or M update") {
     withTempDirAndTableClient { (dir, tc) =>
       val path = dir.getAbsolutePath
 
-      def appendCommit(): Unit =
-        spark.range(10).write.format("delta").mode("append").save(path)
-
-      for (_ <- 0 to 3) { appendCommit() }
+      for (_ <- 0 to 3) { appendCommit(path) }
 
       val table = Table.forPath(tc, path)

@@ -221,6 +233,7 @@ trait FileReadMetrics { self: FileHandler =>
     if (FileNames.isCommitFile(path.getName) || FileNames.isCheckpointFile(path.getName)) {
       val version = FileNames.getFileVersion(path)
 
+      // We may split json/parquet reads, so don't record the same file multiple times
       if (!versionsRead.contains(version)) {
        versionsRead += version
       }
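The two harness helpers this suite leans on, withTempDirAndTableClient and loadSnapshotAssertMetrics, sit in the collapsed regions of the file. Below is a minimal sketch of how they might look, inferred only from the call sites in this diff: withTempDir is the standard Spark test utility available via SharedSparkSession, while buildTableClientWithMetrics, resetMetrics, jsonVersionsRead, and parquetVersionsRead are hypothetical stand-ins for the hidden plumbing.

  // Sketch only: shapes inferred from the call sites above, not the committed code.
  private def withTempDirAndTableClient(f: (File, TableClient) => Unit): Unit =
    withTempDir { dir =>
      // Assumed: a TableClient whose JSON/Parquet file handlers mix in
      // FileReadMetrics, so every commit and checkpoint file read is recorded.
      val tc = buildTableClientWithMetrics() // hypothetical factory
      f(dir, tc)
    }

  private def loadSnapshotAssertMetrics(
      tc: TableClient,
      table: Table,
      expJsonVersionsRead: Seq[Long],
      expParquetVersionsRead: Seq[Long]): Unit = {
    resetMetrics(tc) // hypothetical: clear versionsRead before the measured load
    table.getLatestSnapshot(tc) // triggers log replay over _delta_log
    assert(jsonVersionsRead(tc) === expJsonVersionsRead) // commit (.json) files
    assert(parquetVersionsRead(tc) === expParquetVersionsRead) // checkpoint files
  }

Under that reading, every test follows one pattern: append commits, load the latest snapshot, and assert exactly which _delta_log files were opened. For example, loadSnapshotAssertMetrics(tc, table, 14L to 11L by -1L, Seq(10)) expects commits 14 down to 11 to be read as JSON and the version-10 checkpoint to be read as Parquet.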
