diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index f86b683a9058b..18c769bed350c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -518,16 +518,12 @@ class RocksDBFileManager(
       s" DFS for version $version. $filesReused files reused without copying.")
     versionToRocksDBFiles.put(version, immutableFiles)
 
-    // clean up deleted SST files from the localFilesToDfsFiles Map
-    val currentLocalFiles = localFiles.map(_.getName).toSet
-    val mappingsToClean = localFilesToDfsFiles.asScala
-      .keys
-      .filterNot(currentLocalFiles.contains)
-
-    mappingsToClean.foreach { f =>
-      logInfo(s"cleaning $f from the localFilesToDfsFiles map")
-      localFilesToDfsFiles.remove(f)
-    }
+    // Clean up locally deleted files from the localFilesToDfsFiles map.
+    // Locally, SST files can be deleted due to RocksDB compaction. These files need
+    // to be removed from the localFilesToDfsFiles map to ensure that if an older version
+    // regenerates them and overwrites the version.zip, SST files from the conflicting
+    // version (previously committed) are not reused.
+    removeLocallyDeletedSSTFilesFromDfsMapping(localFiles)
 
     saveCheckpointMetrics = RocksDBFileManagerMetrics(
       bytesCopied = bytesCopied,
@@ -545,8 +541,18 @@ class RocksDBFileManager(
   private def loadImmutableFilesFromDfs(
      immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
     val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+
+    val localImmutableFiles = listRocksDBFiles(localDir)._1
+
+    // Clean up locally deleted files from the localFilesToDfsFiles map.
+    // Locally, SST files can be deleted due to RocksDB compaction. These files need
+    // to be removed from the localFilesToDfsFiles map to ensure that if an older version
+    // regenerates them and overwrites the version.zip, SST files from the conflicting
+    // version (previously committed) are not reused.
+    removeLocallyDeletedSSTFilesFromDfsMapping(localImmutableFiles)
+
     // Delete unnecessary local immutable files
-    listRocksDBFiles(localDir)._1
+    localImmutableFiles
       .foreach { existingFile =>
         val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
         val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
@@ -603,6 +609,19 @@ class RocksDBFileManager(
       filesReused = filesReused)
   }
 
+  private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
+    // clean up deleted SST files from the localFilesToDfsFiles map
+    val currentLocalFiles = localFiles.map(_.getName).toSet
+    val mappingsToClean = localFilesToDfsFiles.asScala
+      .keys
+      .filterNot(currentLocalFiles.contains)
+
+    mappingsToClean.foreach { f =>
+      logInfo(s"cleaning $f from the localFilesToDfsFiles map")
+      localFilesToDfsFiles.remove(f)
+    }
+  }
+
   /** Get the SST files required for a version from the version zip file in DFS */
   private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = {
     Utils.deleteRecursively(localTempDir)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index d517f5869c480..738c7922664e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.nio.charset.Charset
 
+import scala.collection.mutable
 import scala.language.implicitConversions
 
 import org.apache.commons.io.FileUtils
@@ -1956,6 +1957,88 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  test("ensure local files deleted on filesystem" +
+    " are cleaned from dfs file mapping") {
+    def getSSTFiles(dir: File): Set[File] = {
+      val sstFiles = new mutable.HashSet[File]()
+      dir.listFiles().foreach { f =>
+        if (f.isDirectory) {
+          sstFiles ++= getSSTFiles(f)
+        } else {
+          if (f.getName.endsWith(".sst")) {
+            sstFiles.add(f)
+          }
+        }
+      }
+      sstFiles.toSet
+    }
+
+    def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
+      dir.listFiles().foreach { f =>
+        if (f.isDirectory) {
+          filterAndDeleteSSTFiles(f, filesToKeep)
+        } else {
+          if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
+            logInfo(s"deleting ${f.getAbsolutePath} from local directory")
+            f.delete()
+          }
+        }
+      }
+    }
+
+    withTempDir { dir =>
+      withTempDir { localDir =>
+        val sqlConf = new SQLConf()
+        val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+        logInfo(s"config set to ${dbConf.compactOnCommit}")
+        val hadoopConf = new Configuration()
+        val remoteDir = dir.getCanonicalPath
+        withDB(remoteDir = remoteDir,
+          conf = dbConf,
+          hadoopConf = hadoopConf,
+          localDir = localDir) { db =>
+          db.load(0)
+          db.put("a", "1")
+          db.put("b", "1")
+          db.commit()
+          db.doMaintenance()
+
+          // find all SST files written in version 1
+          val sstFiles = getSSTFiles(localDir)
+
+          // make more commits; these generate more SST files and write
+          // them to remoteDir
+          for (version <- 1 to 10) {
+            db.load(version)
+            db.put("c", "1")
+            db.put("d", "1")
+            db.commit()
+            db.doMaintenance()
+          }
+
+          // clean the SST files committed after version 1 from the local
+          // filesystem. This is similar to what happens during compaction,
+          // where multiple L0 SST files can be merged into a single L1 file
+          filterAndDeleteSSTFiles(localDir, sstFiles)
+
+          // reload version 2 and overwrite the commit for version 3; this should
+          // not reuse any locally deleted files, as they were removed from the mapping
+          db.load(2)
+          db.put("e", "1")
+          db.put("f", "1")
+          db.commit()
+          db.doMaintenance()
+
+          // clean local state
+          db.load(0)
+
+          // reload version 3; this should succeed
+          db.load(3)
+        }
+      }
+    }
+  }
+
   private def sqlConf = SQLConf.get.clone()
 
   private def dbConf = RocksDBConf(StateStoreConf(sqlConf))
@@ -1965,12 +2048,16 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
       version: Int = 0,
       conf: RocksDBConf = dbConf,
       hadoopConf: Configuration = new Configuration(),
-      useColumnFamilies: Boolean = false)(
+      useColumnFamilies: Boolean = false,
+      localDir: File = Utils.createTempDir())(
       func: RocksDB => T): T = {
     var db: RocksDB = null
     try {
       db = new RocksDB(
-        remoteDir, conf = conf, hadoopConf = hadoopConf,
+        remoteDir,
+        conf = conf,
+        localRootDir = localDir,
+        hadoopConf = hadoopConf,
         loggingId = s"[Thread-${Thread.currentThread.getId}]",
         useColumnFamilies = useColumnFamilies
       )