Cleanup RocksDB file tracking for previously uploaded files if files were deleted from local directory
sahnib committed Feb 14, 2024
1 parent 8603ed5 commit 271be4a
Showing 2 changed files with 119 additions and 13 deletions.
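
In short, the change makes RocksDBFileManager drop entries from its localFilesToDfsFiles mapping whenever the corresponding SST file has disappeared from the local working directory (for example after a RocksDB compaction), both when saving a checkpoint to DFS and when loading immutable files from DFS. Below is a minimal standalone sketch of that cleanup idea; the object name, the plain ConcurrentHashMap, and the println logging are illustrative stand-ins rather than the actual Spark classes, and only the filtering logic mirrors the diff.

import java.io.File
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object SstMappingCleanupSketch {
  // Stand-in for the manager's localFilesToDfsFiles map:
  // local SST file name -> name of the copy previously uploaded to DFS.
  private val localFilesToDfsFiles = new ConcurrentHashMap[String, String]()

  // Drop mappings whose local SST file no longer exists on disk, so a later
  // overwrite of an older version cannot silently reuse a stale upload.
  def removeLocallyDeletedSstFiles(localFiles: Seq[File]): Unit = {
    val currentLocalFiles = localFiles.map(_.getName).toSet
    val mappingsToClean = localFilesToDfsFiles.asScala.keys
      .filterNot(currentLocalFiles.contains)
      .toSeq
    mappingsToClean.foreach { name =>
      println(s"cleaning $name from the localFilesToDfsFiles map")
      localFilesToDfsFiles.remove(name)
    }
  }

  def main(args: Array[String]): Unit = {
    localFilesToDfsFiles.put("000009.sst", "dfs-uuid-1.sst")
    localFilesToDfsFiles.put("000010.sst", "dfs-uuid-2.sst")
    // Suppose compaction deleted 000009.sst locally; only 000010.sst remains.
    removeLocallyDeletedSstFiles(Seq(new File("/tmp/state/000010.sst")))
    assert(!localFilesToDfsFiles.containsKey("000009.sst"))
    assert(localFilesToDfsFiles.containsKey("000010.sst"))
  }
}
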
@@ -518,16 +518,12 @@ class RocksDBFileManager(
s" DFS for version $version. $filesReused files reused without copying.")
versionToRocksDBFiles.put(version, immutableFiles)

// clean up deleted SST files from the localFilesToDfsFiles Map
val currentLocalFiles = localFiles.map(_.getName).toSet
val mappingsToClean = localFilesToDfsFiles.asScala
.keys
.filterNot(currentLocalFiles.contains)

mappingsToClean.foreach { f =>
logInfo(s"cleaning $f from the localFilesToDfsFiles map")
localFilesToDfsFiles.remove(f)
}
// Cleanup locally deleted files from the localFilesToDfsFiles map
// Locally, SST files can be deleted due to RocksDB compaction. These files need
// to be removed from the localFilesToDfsFiles map to ensure that if an older version
// regenerates them and overwrites the version.zip, SST files from the conflicting
// version (previously committed) are not reused.
removeLocallyDeletedSSTFilesFromDfsMapping(localFiles)

saveCheckpointMetrics = RocksDBFileManagerMetrics(
bytesCopied = bytesCopied,
@@ -545,8 +541,18 @@ class RocksDBFileManager(
private def loadImmutableFilesFromDfs(
immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap

val localImmutableFiles = listRocksDBFiles(localDir)._1

// Cleanup locally deleted files from the localFilesToDfsFiles map
// Locally, SST files can be deleted due to RocksDB compaction. These files need
// to be removed from the localFilesToDfsFiles map to ensure that if an older version
// regenerates them and overwrites the version.zip, SST files from the conflicting
// version (previously committed) are not reused.
removeLocallyDeletedSSTFilesFromDfsMapping(localImmutableFiles)

// Delete unnecessary local immutable files
listRocksDBFiles(localDir)._1
localImmutableFiles
.foreach { existingFile =>
val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
@@ -603,6 +609,19 @@ class RocksDBFileManager(
filesReused = filesReused)
}

private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
// clean up deleted SST files from the localFilesToDfsFiles Map
val currentLocalFiles = localFiles.map(_.getName).toSet
val mappingsToClean = localFilesToDfsFiles.asScala
.keys
.filterNot(currentLocalFiles.contains)

mappingsToClean.foreach { f =>
logInfo(s"cleaning $f from the localFilesToDfsFiles map")
localFilesToDfsFiles.remove(f)
}
}

/** Get the SST files required for a version from the version zip file in DFS */
private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = {
Utils.deleteRecursively(localTempDir)
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
import java.io._
import java.nio.charset.Charset

import scala.collection.mutable
import scala.language.implicitConversions

import org.apache.commons.io.FileUtils
@@ -1863,6 +1864,88 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}

test("ensure local files deleted on filesystem" +
" are cleaned from dfs file mapping") {
def getSSTFiles(dir: File): Set[File] = {
val sstFiles = new mutable.HashSet[File]()
dir.listFiles().foreach { f =>
if (f.isDirectory) {
sstFiles ++= getSSTFiles(f)
} else {
if (f.getName.endsWith(".sst")) {
sstFiles.add(f)
}
}
}
sstFiles.toSet
}

def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
dir.listFiles().foreach { f =>
if (f.isDirectory) {
filterAndDeleteSSTFiles(f, filesToKeep)
} else {
if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
logInfo(s"deleting ${f.getAbsolutePath} from local directory")
f.delete()
}
}
}
}

withTempDir { dir =>
withTempDir { localDir =>
val sqlConf = new SQLConf()
val dbConf = RocksDBConf(StateStoreConf(sqlConf))
logInfo(s"config set to ${dbConf.compactOnCommit}")
val hadoopConf = new Configuration()
val remoteDir = dir.getCanonicalPath
withDB(remoteDir = remoteDir,
conf = dbConf,
hadoopConf = hadoopConf,
localDir = localDir) { db =>
db.load(0)
db.put("a", "1")
db.put("b", "1")
db.commit()
db.doMaintenance()

// find all SST files written in version 1
val sstFiles = getSSTFiles(localDir)

// make more commits; this would generate more SST files and write
// them to remoteDir
for (version <- 1 to 10) {
db.load(version)
db.put("c", "1")
db.put("d", "1")
db.commit()
db.doMaintenance()
}

// clean the SST files committed after version 1 from the local
// filesystem. This simulates a process like compaction, where multiple
// L0 SST files can be merged into a single L1 file
filterAndDeleteSSTFiles(localDir, sstFiles)

// reload version 2 and overwrite the commit for version 3; this should not
// reuse any locally deleted files, as they should have been removed from the mapping
db.load(2)
db.put("e", "1")
db.put("f", "1")
db.commit()
db.doMaintenance()

// clean local state
db.load(0)

// reload version 3, should be successful
db.load(3)
}
}
}
}

private def sqlConf = SQLConf.get.clone()

private def dbConf = RocksDBConf(StateStoreConf(sqlConf))
@@ -1872,12 +1955,16 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
version: Int = 0,
conf: RocksDBConf = dbConf,
hadoopConf: Configuration = new Configuration(),
useColumnFamilies: Boolean = false)(
useColumnFamilies: Boolean = false,
localDir: File = Utils.createTempDir())(
func: RocksDB => T): T = {
var db: RocksDB = null
try {
db = new RocksDB(
remoteDir, conf = conf, hadoopConf = hadoopConf,
remoteDir,
conf = conf,
localRootDir = localDir,
hadoopConf = hadoopConf,
loggingId = s"[Thread-${Thread.currentThread.getId}]",
useColumnFamilies = useColumnFamilies
)
