From 8405c9b57a1f4f94f8bb4d8ed5cb11e071edeb7e Mon Sep 17 00:00:00 2001
From: Bhuwan Sahni
Date: Thu, 17 Oct 2024 15:26:30 +0900
Subject: [PATCH] [SPARK-49770][SS][ROCKSDB HARDENING] Improve RocksDB SST file mapping management, and fix issue with reloading same version with existing snapshot

### What changes were proposed in this pull request?

Currently, if a version X is loaded while there is an existing snapshot for version X, Spark reuses the SST files from that snapshot, resulting in a VersionID mismatch error. This PR fixes the issue and simplifies RocksDB state management.

The change eliminates most of the shared state between the task thread and the maintenance thread, simplifying the implementation. With this change, the task thread is solely responsible for keeping track of the local-file-to-DFS-file mapping; the maintenance thread never accesses this mapping. The DFS file names are generated at `commit()`, and the generated snapshot, together with the mapping containing the new SST files (with their generated DFS names), is handed over to the maintenance thread. The latest generated snapshot is appended to a ConcurrentLinkedQueue (named snapshotsToUploadQueue). The maintenance thread polls from snapshotsToUploadQueue repeatedly until it is empty. For the last snapshot polled out of the queue, the maintenance thread uploads the new SST files; it also removes all skipped snapshot objects from local disk.

### Why are the changes needed?

These changes fix an issue where Spark Streaming fails with a RocksDB VersionIdMismatch error if there is an existing snapshot (not yet uploaded) for the RocksDB version being loaded. In this scenario, SST files from the existing snapshot are reused, resulting in a VersionId mismatch error.

In short, these changes:
1. Remove the shared fileMapping between the RocksDB maintenance thread and the task thread, simplifying the file mapping logic. The file mapping is only modified from the task thread after this change.
2. Fix the issue where RocksDB SST files from a current snapshot (with the same version) are reused, resulting in a RocksDB VersionId mismatch.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

1. All existing test cases pass.
2. Added the new test cases suggested in https://github.com/apache/spark/pull/47850/files and ensured they pass with these changes.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47875 from sahnib/master.
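For illustration only (not part of the patch): a minimal sketch of the commit-to-maintenance handoff described above, assuming a hypothetical `SnapshotSketch` stand-in for the real `RocksDBSnapshot`; the actual implementation lives in `RocksDB.scala` in the diff below.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical, simplified stand-in for the snapshot object handed from commit()
// to the maintenance thread in this PR.
case class SnapshotSketch(version: Long, checkpointDir: String) {
  def close(): Unit = println(s"deleting local checkpoint dir $checkpointDir")
}

class SnapshotHandoffSketch {
  // Task thread is the only producer; maintenance thread is the only consumer.
  private val snapshotsToUploadQueue = new ConcurrentLinkedQueue[SnapshotSketch]()

  // Called from the task thread at commit(): hand the snapshot over for async upload.
  def onCommit(snapshot: SnapshotSketch): Unit = snapshotsToUploadQueue.offer(snapshot)

  // Called from the maintenance thread: drain the queue, upload only the most
  // recent snapshot, and delete the local files of the snapshots that were skipped.
  def doMaintenance(upload: SnapshotSketch => Unit): Unit = {
    var mostRecent: Option[SnapshotSketch] = None
    var next = snapshotsToUploadQueue.poll()
    while (next != null) {
      mostRecent.foreach(_.close()) // skipped snapshot: clean up its local checkpoint
      mostRecent = Some(next)
      next = snapshotsToUploadQueue.poll()
    }
    mostRecent.foreach { s =>
      upload(s) // only the newest snapshot is uploaded to DFS
      s.close()
    }
  }
}
```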
Lead-authored-by: Bhuwan Sahni Co-authored-by: micheal-o Co-authored-by: Bhuwan Sahni Signed-off-by: Jungtaek Lim --- .../execution/streaming/state/RocksDB.scala | 336 ++++++++++++------ .../streaming/state/RocksDBFileManager.scala | 195 ++++------ .../streaming/state/RocksDBSuite.scala | 155 +++++--- 3 files changed, 405 insertions(+), 281 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index c7f8434e5345b..99f8e7b8f36e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.streaming.state import java.io.File import java.util.Locale -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.Set +import java.util.UUID +import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, TimeUnit} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import javax.annotation.concurrent.GuardedBy import scala.collection.{mutable, Map} -import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters.{ConcurrentMapHasAsScala, MapHasAsJava} import scala.ref.WeakReference import scala.util.Try @@ -51,7 +52,6 @@ case object RollbackStore extends RocksDBOpType("rollback_store") case object CloseStore extends RocksDBOpType("close_store") case object ReportStoreMetrics extends RocksDBOpType("report_store_metrics") case object StoreTaskCompletionListener extends RocksDBOpType("store_task_completion_listener") -case object StoreMaintenance extends RocksDBOpType("store_maintenance") /** * Class representing a RocksDB instance that checkpoints version of data to DFS. @@ -74,21 +74,22 @@ class RocksDB( loggingId: String = "", useColumnFamilies: Boolean = false) extends Logging { + import RocksDB._ + case class RocksDBSnapshot( checkpointDir: File, version: Long, numKeys: Long, - capturedFileMappings: RocksDBFileMappings, columnFamilyMapping: Map[String, Short], - maxColumnFamilyId: Short) { + maxColumnFamilyId: Short, + dfsFileSuffix: String, + fileMapping: Map[String, RocksDBSnapshotFile]) { def close(): Unit = { silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version") } } - @volatile private var latestSnapshot: Option[RocksDBSnapshot] = None @volatile private var lastSnapshotVersion = 0L - private val oldSnapshots = new ListBuffer[RocksDBSnapshot] RocksDBLoader.loadLibrary() @@ -150,6 +151,7 @@ class RocksDB( hadoopConf, conf.compressionCodec, loggingId = loggingId) private val byteArrayPair = new ByteArrayPair() private val commitLatencyMs = new mutable.HashMap[String, Long]() + private val acquireLock = new Object @volatile private var db: NativeRocksDB = _ @@ -249,6 +251,16 @@ class RocksDB( } } + // Mapping of local SST files to DFS files for file reuse. + // This mapping should only be updated using the Task thread - at version load and commit time. + // If same mapping instance is updated from different threads, + // it will result in undefined behavior (and most likely incorrect mapping state). 
+ @GuardedBy("acquireLock") + private val rocksDBFileMapping: RocksDBFileMapping = new RocksDBFileMapping() + + // We send snapshots that needs to be uploaded by the maintenance thread to this queue + private val snapshotsToUploadQueue = new ConcurrentLinkedQueue[RocksDBSnapshot]() + /** * Load the given version of data in a native RocksDB instance. * Note that this will copy all the necessary file from DFS to local disk as needed, @@ -262,12 +274,15 @@ class RocksDB( try { if (loadedVersion != version) { closeDB(ignoreException = false) - // deep copy is needed to avoid race condition - // between maintenance and task threads - fileManager.copyFileMapping() val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version) - val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir) + rocksDBFileMapping.currentVersion = latestSnapshotVersion + val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, + workingDir, rocksDBFileMapping) loadedVersion = latestSnapshotVersion + + // reset the last snapshot version to the latest available snapshot version + lastSnapshotVersion = latestSnapshotVersion + // Initialize maxVersion upon successful load from DFS fileManager.setMaxSeenVersion(version) @@ -279,21 +294,7 @@ class RocksDB( metadata.maxColumnFamilyId.foreach { maxId => maxColumnFamilyId.set(maxId) } - // reset last snapshot version - if (lastSnapshotVersion > latestSnapshotVersion) { - // discard any newer snapshots - synchronized { - if (latestSnapshot.isDefined) { - oldSnapshots += latestSnapshot.get - latestSnapshot = None - } - } - } - - // reset the last snapshot version to the latest available snapshot version - lastSnapshotVersion = latestSnapshotVersion openDB() - numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) { // we don't track the total number of rows - discard the number being track -1L @@ -369,15 +370,11 @@ class RocksDB( */ private def replayFromCheckpoint(snapshotVersion: Long, endVersion: Long): Any = { closeDB() - val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, workingDir) + rocksDBFileMapping.currentVersion = snapshotVersion + val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, + workingDir, rocksDBFileMapping) loadedVersion = snapshotVersion - - // reset last snapshot version - if (lastSnapshotVersion > snapshotVersion) { - // discard any newer snapshots - lastSnapshotVersion = 0L - latestSnapshot = None - } + lastSnapshotVersion = snapshotVersion openDB() numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) { @@ -583,12 +580,13 @@ class RocksDB( def commit(): Long = { val newVersion = loadedVersion + 1 try { - logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}") var compactTimeMs = 0L var flushTimeMs = 0L var checkpointTimeMs = 0L + var snapshot: Option[RocksDBSnapshot] = None + if (shouldCreateSnapshot() || shouldForceSnapshot.get()) { // Need to flush the change to disk before creating a checkpoint // because rocksdb wal is disabled. @@ -620,19 +618,9 @@ class RocksDB( // inside the uploadSnapshot() called below. // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously // during state store maintenance. 
- synchronized { - if (latestSnapshot.isDefined) { - oldSnapshots += latestSnapshot.get - } - latestSnapshot = Some( - RocksDBSnapshot(checkpointDir, - newVersion, - numKeysOnWritingVersion, - fileManager.captureFileMapReference(), - colFamilyNameToIdMap.asScala.toMap, - maxColumnFamilyId.get().toShort)) - lastSnapshotVersion = newVersion - } + snapshot = Some(createSnapshot(checkpointDir, newVersion, + colFamilyNameToIdMap.asScala.toMap, maxColumnFamilyId.get().toShort)) + lastSnapshotVersion = newVersion } } @@ -642,8 +630,16 @@ class RocksDB( // If we have changed the columnFamilyId mapping, we have set a new // snapshot and need to upload this to the DFS even if changelog checkpointing // is enabled. + var isUploaded = false if (shouldForceSnapshot.get()) { - uploadSnapshot() + assert(snapshot.isDefined) + fileManagerMetrics = uploadSnapshot( + snapshot.get, + fileManager, + rocksDBFileMapping.snapshotsPendingUpload, + loggingId + ) + isUploaded = true shouldForceSnapshot.set(false) } @@ -651,12 +647,21 @@ class RocksDB( try { assert(changelogWriter.isDefined) changelogWriter.foreach(_.commit()) + if (!isUploaded) { + snapshot.foreach(snapshotsToUploadQueue.offer) + } } finally { changelogWriter = None } } else { assert(changelogWriter.isEmpty) - uploadSnapshot() + assert(snapshot.isDefined) + fileManagerMetrics = uploadSnapshot( + snapshot.get, + fileManager, + rocksDBFileMapping.snapshotsPendingUpload, + loggingId + ) } } @@ -696,56 +701,6 @@ class RocksDB( } else true } - private def uploadSnapshot(): Unit = { - var oldSnapshotsImmutable: List[RocksDBSnapshot] = Nil - val localCheckpoint = synchronized { - val checkpoint = latestSnapshot - latestSnapshot = None - - // Convert mutable list buffer to immutable to prevent - // race condition with commit where old snapshot is added - oldSnapshotsImmutable = oldSnapshots.toList - oldSnapshots.clear() - - checkpoint - } - localCheckpoint match { - case Some( - RocksDBSnapshot( - localDir, - version, - numKeys, - capturedFileMappings, - columnFamilyMapping, - maxColumnFamilyId)) => - try { - val uploadTime = timeTakenMs { - fileManager.saveCheckpointToDfs( - localDir, - version, - numKeys, - capturedFileMappings, - Some(columnFamilyMapping.toMap), - Some(maxColumnFamilyId) - ) - fileManagerMetrics = fileManager.latestSaveCheckpointMetrics - } - logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of version " + - log"${MDC(LogKeys.VERSION_NUM, version)}," + - log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms") - } finally { - localCheckpoint.foreach(_.close()) - - // Clean up old latestSnapshots - for (snapshot <- oldSnapshotsImmutable) { - snapshot.close() - } - - } - case _ => - } - } - /** * Drop uncommitted changes, and roll back to previous version. */ @@ -762,7 +717,26 @@ class RocksDB( def doMaintenance(): Unit = { if (enableChangelogCheckpointing) { - uploadSnapshot() + + var mostRecentSnapshot: Option[RocksDBSnapshot] = None + var snapshot = snapshotsToUploadQueue.poll() + + // We only want to upload the most recent snapshot and skip the previous ones. 
+ while (snapshot != null) { + logDebug(s"RocksDB Maintenance - polled snapshot ${snapshot.version}") + mostRecentSnapshot.foreach(_.close()) + mostRecentSnapshot = Some(snapshot) + snapshot = snapshotsToUploadQueue.poll() + } + + if (mostRecentSnapshot.isDefined) { + fileManagerMetrics = uploadSnapshot( + mostRecentSnapshot.get, + fileManager, + rocksDBFileMapping.snapshotsPendingUpload, + loggingId + ) + } } val cleanupTime = timeTakenMs { fileManager.deleteOldVersions(conf.minVersionsToRetain, conf.minVersionsToDelete) @@ -782,10 +756,13 @@ class RocksDB( flushOptions.close() rocksDbOptions.close() dbLogger.close() - synchronized { - latestSnapshot.foreach(_.close()) - latestSnapshot = None + + var snapshot = snapshotsToUploadQueue.poll() + while (snapshot != null) { + snapshot.close() + snapshot = snapshotsToUploadQueue.poll() } + silentDeleteRecursively(localRootDir, "closing RocksDB") } catch { case e: Exception => @@ -884,6 +861,18 @@ class RocksDB( rocksDBMetricsOpt } + private def createSnapshot( + checkpointDir: File, + version: Long, + columnFamilyMapping: Map[String, Short], + maxColumnFamilyId: Short): RocksDBSnapshot = { + val (dfsFileSuffix, immutableFileMapping) = rocksDBFileMapping.createSnapshotFileMapping( + fileManager, checkpointDir, version) + + RocksDBSnapshot(checkpointDir, version, numKeysOnWritingVersion, + columnFamilyMapping, maxColumnFamilyId, dfsFileSuffix, immutableFileMapping) + } + /** * Function to acquire RocksDB instance lock that allows for synchronized access to the state * store instance @@ -1005,10 +994,147 @@ class RocksDB( } } + override protected def logName: String = s"${super.logName} $loggingId" +} + +object RocksDB extends Logging { + + /** Upload the snapshot to DFS and remove it from snapshots pending */ + private def uploadSnapshot( + snapshot: RocksDB#RocksDBSnapshot, + fileManager: RocksDBFileManager, + snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo], + loggingId: String): RocksDBFileManagerMetrics = { + var fileManagerMetrics: RocksDBFileManagerMetrics = null + try { + val uploadTime = timeTakenMs { + fileManager.saveCheckpointToDfs(snapshot.checkpointDir, + snapshot.version, snapshot.numKeys, snapshot.fileMapping, + Some(snapshot.columnFamilyMapping), Some(snapshot.maxColumnFamilyId)) + fileManagerMetrics = fileManager.latestSaveCheckpointMetrics + + val snapshotInfo = RocksDBVersionSnapshotInfo(snapshot.version, snapshot.dfsFileSuffix) + // We are only removing the uploaded snapshot info from the pending set, + // to let the file mapping (i.e. query threads) know that the snapshot (i.e. and its files) + // have been uploaded to DFS. We don't touch the file mapping here to avoid corrupting it. + snapshotsPendingUpload.remove(snapshotInfo) + } + logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of version " + + log"${MDC(LogKeys.VERSION_NUM, snapshot.version)}," + + log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms") + } finally { + snapshot.close() + } + + fileManagerMetrics + } + /** Records the duration of running `body` for the next query progress update. */ - protected def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2 + private def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2 +} - override protected def logName: String = s"${super.logName} $loggingId" +// uniquely identifies a Snapshot. 
Multiple snapshots created for same version will +// use a different dfsFilesUUID, and hence will have different RocksDBVersionSnapshotInfo +case class RocksDBVersionSnapshotInfo(version: Long, dfsFilesUUID: String) + +// Encapsulates a RocksDB immutable file, and the information whether it has been previously +// uploaded to DFS. Already uploaded files can be skipped during SST file upload. +case class RocksDBSnapshotFile(immutableFile: RocksDBImmutableFile, isUploaded: Boolean) + +// Encapsulates the mapping of local SST files to DFS files. This mapping prevents +// re-uploading the same SST file multiple times to DFS, saving I/O and reducing snapshot +// upload time. During version load, if a DFS file is already present on local file system, +// it will be reused. +// This mapping should only be updated using the Task thread - at version load and commit time. +// If same mapping instance is updated from different threads, it will result in undefined behavior +// (and most likely incorrect mapping state). +class RocksDBFileMapping { + + // Maps a local SST file to the DFS version and DFS file. + private val localFileMappings: mutable.Map[String, (Long, RocksDBImmutableFile)] = + mutable.HashMap[String, (Long, RocksDBImmutableFile)]() + + // Keeps track of all snapshots which have not been uploaded yet. This prevents Spark + // from reusing SST files which have not been yet persisted to DFS, + val snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo] = ConcurrentHashMap.newKeySet() + + // Current State Store version which has been loaded. + var currentVersion: Long = 0 + + // If the local file (with localFileName) has already been persisted to DFS, returns the + // DFS file, else returns None. + // If the currently mapped DFS file was committed in a newer version (or was generated + // in a version which has not been uploaded to DFS yet), the mapped DFS file is ignored (because + // it cannot be reused in this version). In this scenario, the local mapping to this DFS file + // will be cleared, and function will return None. + def getDfsFile( + fileManager: RocksDBFileManager, + localFileName: String): Option[RocksDBImmutableFile] = { + localFileMappings.get(localFileName).map { case (dfsFileCommitVersion, dfsFile) => + val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile) + val versionSnapshotInfo = RocksDBVersionSnapshotInfo(dfsFileCommitVersion, dfsFileSuffix) + if (dfsFileCommitVersion >= currentVersion || + snapshotsPendingUpload.contains(versionSnapshotInfo)) { + // the mapped dfs file cannot be used, delete from mapping + remove(localFileName) + None + } else { + Some(dfsFile) + } + }.getOrElse(None) + } + + private def mapToDfsFile( + localFileName: String, + dfsFile: RocksDBImmutableFile, + version: Long): Unit = { + localFileMappings.put(localFileName, (version, dfsFile)) + } + + def remove(localFileName: String): Unit = { + localFileMappings.remove(localFileName) + } + + def mapToDfsFileForCurrentVersion(localFileName: String, dfsFile: RocksDBImmutableFile): Unit = { + localFileMappings.put(localFileName, (currentVersion, dfsFile)) + } + + private def syncWithLocalState(localFiles: Seq[File]): Unit = { + val localFileNames = localFiles.map(_.getName).toSet + val deletedFiles = localFileMappings.keys.filterNot(localFileNames.contains) + + deletedFiles.foreach(localFileMappings.remove) + } + + // Generates the DFS file names for local Immutable files in checkpoint directory, and + // returns the mapping from local fileName in checkpoint directory to generated DFS file. 
+ // If the DFS file has been previously uploaded - the snapshot file isUploaded flag is set + // to true. + def createSnapshotFileMapping( + fileManager: RocksDBFileManager, + checkpointDir: File, + version: Long): (String, Map[String, RocksDBSnapshotFile]) = { + val (localImmutableFiles, _) = fileManager.listRocksDBFiles(checkpointDir) + // UUID used to prefix files uploaded to DFS as part of commit + val dfsFilesSuffix = UUID.randomUUID().toString + val snapshotFileMapping = localImmutableFiles.map { f => + val localFileName = f.getName + val existingDfsFile = getDfsFile(fileManager, localFileName) + val dfsFile = existingDfsFile.getOrElse { + val newDfsFileName = fileManager.newDFSFileName(localFileName, dfsFilesSuffix) + val newDfsFile = RocksDBImmutableFile(localFileName, newDfsFileName, sizeBytes = f.length()) + mapToDfsFile(localFileName, newDfsFile, version) + newDfsFile + } + localFileName -> RocksDBSnapshotFile(dfsFile, existingDfsFile.isDefined) + }.toMap + + syncWithLocalState(localImmutableFiles) + + val rocksDBSnapshotInfo = RocksDBVersionSnapshotInfo(version, dfsFilesSuffix) + snapshotsPendingUpload.add(rocksDBSnapshotInfo) + (dfsFilesSuffix, snapshotFileMapping) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 350a5797978b3..e503ea1737c0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -24,8 +24,7 @@ import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.zip.{ZipEntry, ZipOutputStream} -import scala.collection.mutable -import scala.jdk.CollectionConverters._ +import scala.collection.{mutable, Map} import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} @@ -146,33 +145,13 @@ class RocksDBFileManager( private def codec = CompressionCodec.createCodec(sparkConf, codecName) + // This is set when a version is loaded/committed. Hence only set by a task thread. private var maxSeenVersion: Option[Long] = None + // This is set during deletion of old versions. Hence only set by a maintenance thread. 
private var minSeenVersion = 1L @volatile private var rootDirChecked: Boolean = false - @volatile private var fileMappings = RocksDBFileMappings( - new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]], - new ConcurrentHashMap[String, RocksDBImmutableFile] - ) - - /** - * Make a deep copy of versionToRocksDBFiles and localFilesToDfsFiles to avoid - * current task thread from overwriting the file mapping whenever background maintenance - * thread attempts to upload a snapshot - */ - def copyFileMapping() : Unit = { - val newVersionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]] - val newLocalFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile] - - newVersionToRocksDBFiles.putAll(fileMappings.versionToRocksDBFiles) - newLocalFilesToDfsFiles.putAll(fileMappings.localFilesToDfsFiles) - - fileMappings = RocksDBFileMappings(newVersionToRocksDBFiles, newLocalFilesToDfsFiles) - } - - def captureFileMapReference(): RocksDBFileMappings = { - fileMappings - } + private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]] private def getChangelogVersion(useColumnFamilies: Boolean): Short = { val changelogVersion: Short = if (useColumnFamilies) { @@ -249,13 +228,13 @@ class RocksDBFileManager( checkpointDir: File, version: Long, numKeys: Long, - capturedFileMappings: RocksDBFileMappings, + fileMapping: Map[String, RocksDBSnapshotFile], columnFamilyMapping: Option[Map[String, Short]] = None, maxColumnFamilyId: Option[Short] = None): Unit = { logFilesInDir(checkpointDir, log"Saving checkpoint files " + log"for version ${MDC(LogKeys.VERSION_NUM, version)}") val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir) - val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, capturedFileMappings) + val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, fileMapping) val metadata = RocksDBCheckpointMetadata( rocksDBFiles, numKeys, columnFamilyMapping, maxColumnFamilyId) val metadataFile = localMetadataFile(checkpointDir) @@ -286,15 +265,17 @@ class RocksDBFileManager( * ensures that only the exact files generated during checkpointing will be present in the * local directory. */ - def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = { + def loadCheckpointFromDfs( + version: Long, + localDir: File, + rocksDBFileMapping: RocksDBFileMapping): RocksDBCheckpointMetadata = { logInfo(log"Loading checkpoint files for version ${MDC(LogKeys.VERSION_NUM, version)}") // The unique ids of SST files are checked when opening a rocksdb instance. The SST files // in larger versions can't be reused even if they have the same size and name because // they belong to another rocksdb instance. 
- fileMappings.versionToRocksDBFiles.keySet().removeIf(_ >= version) + versionToRocksDBFiles.keySet().removeIf(_ >= version) val metadata = if (version == 0) { if (localDir.exists) Utils.deleteRecursively(localDir) - fileMappings.localFilesToDfsFiles.clear() localDir.mkdirs() RocksDBCheckpointMetadata(Seq.empty, 0) } else { @@ -307,8 +288,8 @@ class RocksDBFileManager( val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile) logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" + log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}") - loadImmutableFilesFromDfs(metadata.immutableFiles, localDir) - fileMappings.versionToRocksDBFiles.put(version, metadata.immutableFiles) + loadImmutableFilesFromDfs(metadata.immutableFiles, localDir, rocksDBFileMapping) + versionToRocksDBFiles.put(version, metadata.immutableFiles) metadataFile.delete() metadata } @@ -342,11 +323,11 @@ class RocksDBFileManager( if (fm.exists(path)) { val files = fm.list(path).map(_.getPath) val changelogFileVersions = files - .filter(onlyChangelogFiles.accept(_)) + .filter(onlyChangelogFiles.accept) .map(_.getName.stripSuffix(".changelog")) .map(_.toLong) val snapshotFileVersions = files - .filter(onlyZipFiles.accept(_)) + .filter(onlyZipFiles.accept) .map(_.getName.stripSuffix(".zip")) .map(_.toLong) val versions = changelogFileVersions ++ snapshotFileVersions @@ -516,9 +497,9 @@ class RocksDBFileManager( // Resolve RocksDB files for all the versions and find the max version each file is used val fileToMaxUsedVersion = new mutable.HashMap[String, Long] sortedSnapshotVersions.foreach { version => - val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse { + val files = Option(versionToRocksDBFiles.get(version)).getOrElse { val newResolvedFiles = getImmutableFilesFromVersionZip(version) - fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles) + versionToRocksDBFiles.put(version, newResolvedFiles) newResolvedFiles } files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) = @@ -565,7 +546,7 @@ class RocksDBFileManager( val versionFile = dfsBatchZipFile(version) try { fm.delete(versionFile) - fileMappings.versionToRocksDBFiles.remove(version) + versionToRocksDBFiles.remove(version) logDebug(s"Deleted version $version") } catch { case e: Exception => @@ -591,7 +572,7 @@ class RocksDBFileManager( private def saveImmutableFilesToDfs( version: Long, localFiles: Seq[File], - capturedFileMappings: RocksDBFileMappings): Seq[RocksDBImmutableFile] = { + fileMappings: Map[String, RocksDBSnapshotFile]): Seq[RocksDBImmutableFile] = { // Get the immutable files used in previous versions, as some of those uploaded files can be // reused for this version logInfo(log"Saving RocksDB files to DFS for ${MDC(LogKeys.VERSION_NUM, version)}") @@ -601,49 +582,36 @@ class RocksDBFileManager( var filesReused = 0L val immutableFiles = localFiles.map { localFile => - val existingDfsFile = - capturedFileMappings.localFilesToDfsFiles.asScala.get(localFile.getName) - if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) { - val dfsFile = existingDfsFile.get - filesReused += 1 + val dfsFileMapping = fileMappings.get(localFile.getName) + assert(dfsFileMapping.isDefined) + val dfsFile = dfsFileMapping.get.immutableFile + val existsInDfs = dfsFileMapping.get.isUploaded + + if (existsInDfs) { logInfo(log"reusing file ${MDC(LogKeys.DFS_FILE, dfsFile)} for " + log"${MDC(LogKeys.FILE_NAME, localFile)}") - RocksDBImmutableFile(localFile.getName, dfsFile.dfsFileName, 
dfsFile.sizeBytes) + filesReused += 1 } else { - val localFileName = localFile.getName - val dfsFileName = newDFSFileName(localFileName) - val dfsFile = dfsFilePath(dfsFileName) // Note: The implementation of copyFromLocalFile() closes the output stream when there is // any exception while copying. So this may generate partial files on DFS. But that is // okay because until the main [version].zip file is written, those partial files are // not going to be used at all. Eventually these files should get cleared. fs.copyFromLocalFile( - new Path(localFile.getAbsoluteFile.toURI), dfsFile) + new Path(localFile.getAbsoluteFile.toURI), dfsFilePath(dfsFile.dfsFileName)) val localFileSize = localFile.length() logInfo(log"Copied ${MDC(LogKeys.FILE_NAME, localFile)} to " + log"${MDC(LogKeys.DFS_FILE, dfsFile)} - ${MDC(LogKeys.NUM_BYTES, localFileSize)} bytes") filesCopied += 1 bytesCopied += localFileSize - - val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize) - capturedFileMappings.localFilesToDfsFiles.put(localFileName, immutableDfsFile) - - immutableDfsFile } + + dfsFile } logInfo(log"Copied ${MDC(LogKeys.NUM_FILES_COPIED, filesCopied)} files " + log"(${MDC(LogKeys.NUM_BYTES, bytesCopied)} bytes) from local to" + log" DFS for version ${MDC(LogKeys.VERSION_NUM, version)}. " + log"${MDC(LogKeys.NUM_FILES_REUSED, filesReused)} files reused without copying.") - capturedFileMappings.versionToRocksDBFiles.put(version, immutableFiles) - - // Cleanup locally deleted files from the localFilesToDfsFiles map - // Locally, SST Files can be deleted due to RocksDB compaction. These files need - // to be removed rom the localFilesToDfsFiles map to ensure that if a older version - // regenerates them and overwrites the version.zip, SST files from the conflicting - // version (previously committed) are not reused. - removeLocallyDeletedSSTFilesFromDfsMapping(localFiles) - + versionToRocksDBFiles.put(version, immutableFiles) saveCheckpointMetrics = RocksDBFileManagerMetrics( bytesCopied = bytesCopied, filesCopied = filesCopied, @@ -658,43 +626,37 @@ class RocksDBFileManager( * necessary and non-existing files are copied from DFS. */ private def loadImmutableFilesFromDfs( - immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = { + immutableFiles: Seq[RocksDBImmutableFile], + localDir: File, + rocksDBFileMapping: RocksDBFileMapping): Unit = { val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap val localImmutableFiles = listRocksDBFiles(localDir)._1 - // Cleanup locally deleted files from the localFilesToDfsFiles map - // Locally, SST Files can be deleted due to RocksDB compaction. These files need - // to be removed rom the localFilesToDfsFiles map to ensure that if a older version - // regenerates them and overwrites the version.zip, SST files from the conflicting - // version (previously committed) are not reused. 
- removeLocallyDeletedSSTFilesFromDfsMapping(localImmutableFiles) - // Delete unnecessary local immutable files - localImmutableFiles - .foreach { existingFile => - val existingFileSize = existingFile.length() - val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName) - val prevDfsFile = fileMappings.localFilesToDfsFiles.asScala.get(existingFile.getName) - val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) { - requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName && - existingFile.length() == requiredFile.get.sizeBytes - } else { - false - } + localImmutableFiles.foreach { existingFile => + val existingFileSize = existingFile.length() + val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName) + val prevDfsFile = rocksDBFileMapping.getDfsFile(this, existingFile.getName) + val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) { + requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName && + existingFile.length() == requiredFile.get.sizeBytes + } else { + false + } - if (!isSameFile) { - existingFile.delete() - fileMappings.localFilesToDfsFiles.remove(existingFile.getName) - logInfo(log"Deleted local file ${MDC(LogKeys.FILE_NAME, existingFile)} " + - log"with size ${MDC(LogKeys.NUM_BYTES, existingFileSize)} mapped" + - log" to previous dfsFile ${MDC(LogKeys.DFS_FILE, prevDfsFile.getOrElse("null"))}") - } else { - logInfo(log"reusing ${MDC(LogKeys.DFS_FILE, prevDfsFile)} present at " + - log"${MDC(LogKeys.EXISTING_FILE, existingFile)} " + - log"for ${MDC(LogKeys.FILE_NAME, requiredFile)}") - } + if (!isSameFile) { + rocksDBFileMapping.remove(existingFile.getName) + existingFile.delete() + logInfo(log"Deleted local file ${MDC(LogKeys.FILE_NAME, existingFile)} " + + log"with size ${MDC(LogKeys.NUM_BYTES, existingFileSize)} mapped" + + log" to previous dfsFile ${MDC(LogKeys.DFS_FILE, prevDfsFile.getOrElse("null"))}") + } else { + logInfo(log"reusing ${MDC(LogKeys.DFS_FILE, prevDfsFile)} present at " + + log"${MDC(LogKeys.EXISTING_FILE, existingFile)} " + + log"for ${MDC(LogKeys.FILE_NAME, requiredFile)}") } + } var filesCopied = 0L var bytesCopied = 0L @@ -717,7 +679,7 @@ class RocksDBFileManager( } filesCopied += 1 bytesCopied += localFileSize - fileMappings.localFilesToDfsFiles.put(localFileName, file) + rocksDBFileMapping.mapToDfsFileForCurrentVersion(localFileName, file) logInfo(log"Copied ${MDC(LogKeys.DFS_FILE, dfsFile)} to " + log"${MDC(LogKeys.FILE_NAME, localFile)} - " + log"${MDC(LogKeys.NUM_BYTES, localFileSize)} bytes") @@ -735,19 +697,6 @@ class RocksDBFileManager( filesReused = filesReused) } - private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = { - // clean up deleted SST files from the localFilesToDfsFiles Map - val currentLocalFiles = localFiles.map(_.getName).toSet - val mappingsToClean = fileMappings.localFilesToDfsFiles.asScala - .keys - .filterNot(currentLocalFiles.contains) - - mappingsToClean.foreach { f => - logInfo(log"cleaning ${MDC(LogKeys.FILE_NAME, f)} from the localFilesToDfsFiles map") - fileMappings.localFilesToDfsFiles.remove(f) - } - } - /** Get the SST files required for a version from the version zip file in DFS */ private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = { Utils.deleteRecursively(localTempDir) @@ -811,6 +760,19 @@ class RocksDBFileManager( s"$baseName-${UUID.randomUUID}.$extension" } + def newDFSFileName(localFileName: String, dfsFileSuffix: String): String = { + val baseName = 
FilenameUtils.getBaseName(localFileName) + val extension = FilenameUtils.getExtension(localFileName) + s"$baseName-$dfsFileSuffix.$extension" + } + + def dfsFileSuffix(immutableFile: RocksDBImmutableFile): String = { + val suffixStart = immutableFile.dfsFileName.indexOf('-') + val suffixEnd = immutableFile.dfsFileName.indexOf('.') + + immutableFile.dfsFileName.substring(suffixStart + 1, suffixEnd) + } + private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip") // We use changelog suffix intentionally so that we can tell the difference from changelog file of // HDFSBackedStateStore which is named version.delta. @@ -841,7 +803,7 @@ class RocksDBFileManager( /** * List all the RocksDB files that need be synced or recovered. */ - private def listRocksDBFiles(localDir: File): (Seq[File], Seq[File]) = { + def listRocksDBFiles(localDir: File): (Seq[File], Seq[File]) = { val topLevelFiles = localDir.listFiles.filter(!_.isDirectory) val archivedLogFiles = Option(new File(localDir, LOG_FILES_LOCAL_SUBDIR).listFiles()) @@ -854,20 +816,6 @@ class RocksDBFileManager( } } -/** - * Track file mappings in RocksDB across local and remote directories - * @param versionToRocksDBFiles Mapping of RocksDB files used across versions for maintenance - * @param localFilesToDfsFiles Mapping of the exact Dfs file used to create a local SST file - * The reason localFilesToDfsFiles is a separate map because versionToRocksDBFiles can contain - * multiple similar SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst - * in v1 and 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID - * compatibility across SST files and RocksDB manifest. - */ - -case class RocksDBFileMappings( - versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]], - localFilesToDfsFiles: ConcurrentHashMap[String, RocksDBImmutableFile]) - /** * Metrics regarding RocksDB file sync between local and DFS. 
*/ @@ -1067,7 +1015,10 @@ object RocksDBImmutableFile { val LOG_FILES_DFS_SUBDIR = "logs" val LOG_FILES_LOCAL_SUBDIR = "archive" - def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = { + def apply( + localFileName: String, + dfsFileName: String, + sizeBytes: Long): RocksDBImmutableFile = { if (isSstFile(localFileName)) { RocksDBSstFile(localFileName, dfsFileName, sizeBytes) } else if (isLogFile(localFileName)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 9fcd2001cce50..8fde216c14411 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -802,10 +802,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared val cpFiles = Seq() generateFiles(verificationDir, cpFiles) assert(!dfsRootDir.exists()) - saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = -1) + val fileMapping = new RocksDBFileMapping + saveCheckpointFiles(fileManager, cpFiles, version = 1, + numKeys = -1, fileMapping) // The dfs root dir is created even with unknown number of keys assert(dfsRootDir.exists()) - loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, Nil, -1) + loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, Nil, -1, fileMapping) } finally { Utils.deleteRecursively(dfsRootDir) } @@ -896,6 +898,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared // that checkpoint the same version of state val fileManager = new RocksDBFileManager( dfsRootDir, Utils.createTempDir(), new Configuration) + val rocksDBFileMapping = new RocksDBFileMapping() val fileManager_ = new RocksDBFileManager( dfsRootDir, Utils.createTempDir(), new Configuration) val sstDir = s"$dfsRootDir/SSTs" @@ -912,7 +915,10 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "archive/00001.log" -> 1000, "archive/00002.log" -> 2000 ) - saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101) + + rocksDBFileMapping.currentVersion = 1 + saveCheckpointFiles(fileManager, cpFiles1, version = 1, + numKeys = 101, rocksDBFileMapping) assert(fileManager.getLatestVersion() === 1) assert(numRemoteSSTFiles == 2) // 2 sst files copied assert(numRemoteLogFiles == 2) @@ -926,7 +932,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "archive/00002.log" -> 1000, "archive/00003.log" -> 2000 ) - saveCheckpointFiles(fileManager_, cpFiles1_, version = 1, numKeys = 101) + saveCheckpointFiles(fileManager_, cpFiles1_, version = 1, + numKeys = 101, new RocksDBFileMapping()) assert(fileManager_.getLatestVersion() === 1) assert(numRemoteSSTFiles == 4) assert(numRemoteLogFiles == 4) @@ -945,7 +952,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "archive/00004.log" -> 1000, "archive/00005.log" -> 2000 ) - saveCheckpointFiles(fileManager_, cpFiles2, version = 2, numKeys = 121) + saveCheckpointFiles(fileManager_, cpFiles2, + version = 2, numKeys = 121, new RocksDBFileMapping()) fileManager_.deleteOldVersions(1) assert(numRemoteSSTFiles <= 4) // delete files recorded in 1.zip assert(numRemoteLogFiles <= 5) // delete files recorded in 1.zip and orphan 00001.log @@ -959,7 +967,8 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared "archive/00006.log" -> 1000, "archive/00007.log" -> 2000 ) - saveCheckpointFiles(fileManager_, cpFiles3, version = 3, numKeys = 131) + saveCheckpointFiles(fileManager_, cpFiles3, + version = 3, numKeys = 131, new RocksDBFileMapping()) assert(fileManager_.getLatestVersion() === 3) fileManager_.deleteOldVersions(1) assert(numRemoteSSTFiles == 1) @@ -996,7 +1005,9 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "archive/00001.log" -> 1000, "archive/00002.log" -> 2000 ) - saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101) + val rocksDBFileMapping = new RocksDBFileMapping() + saveCheckpointFiles(fileManager, cpFiles1, + version = 1, numKeys = 101, rocksDBFileMapping) fileManager.deleteOldVersions(1) // Should not delete orphan files even when they are older than all existing files // when there is only 1 version. @@ -1013,7 +1024,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "archive/00003.log" -> 1000, "archive/00004.log" -> 2000 ) - saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 101) + saveCheckpointFiles(fileManager, cpFiles2, + version = 2, numKeys = 101, rocksDBFileMapping) assert(numRemoteSSTFiles == 5) assert(numRemoteLogFiles == 5) fileManager.deleteOldVersions(1) @@ -1034,13 +1046,14 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared def numRemoteSSTFiles: Int = listFiles(sstDir).length val logDir = s"$dfsRootDir/logs" def numRemoteLogFiles: Int = listFiles(logDir).length + val fileMapping = new RocksDBFileMapping // Verify behavior before any saved checkpoints assert(fileManager.getLatestVersion() === 0) // Try to load incorrect versions intercept[FileNotFoundException] { - fileManager.loadCheckpointFromDfs(1, Utils.createTempDir()) + fileManager.loadCheckpointFromDfs(1, Utils.createTempDir(), fileMapping) } // Save a version of checkpoint files @@ -1052,7 +1065,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "archive/00001.log" -> 1000, "archive/00002.log" -> 2000 ) - saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101) + saveCheckpointFiles(fileManager, cpFiles1, + version = 1, numKeys = 101, fileMapping) assert(fileManager.getLatestVersion() === 1) assert(numRemoteSSTFiles == 2) // 2 sst files copied assert(numRemoteLogFiles == 2) // 2 log files copied @@ -1067,12 +1081,16 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "00005.log" -> 101, "archive/00007.log" -> 101 )) - loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1, 101) + + // as we are loading version 1 again, the previously committed 1,zip and + // SST files would not be reused. + loadAndVerifyCheckpointFiles(fileManager, verificationDir, + version = 1, cpFiles1, 101, fileMapping) // Save SAME version again with different checkpoint files and load back again to verify // whether files were overwritten. 
val cpFiles1_ = Seq( - "sst-file1.sst" -> 10, // same SST file as before, this should get reused + "sst-file1.sst" -> 10, // same SST file as before, but will be uploaded again "sst-file2.sst" -> 25, // new SST file with same name as before, but different length "sst-file3.sst" -> 30, // new SST file "other-file1" -> 100, // same non-SST file as before, should not get copied @@ -1082,33 +1100,51 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "archive/00002.log" -> 2500, // new log file with same name as before, but different length "archive/00003.log" -> 3000 // new log file ) - saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001) - assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files - assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files - loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001) + + // upload version 1 again, new checkpoint will be created and SST files from + // previously committed version 1 will not be reused. + saveCheckpointFiles(fileManager, cpFiles1_, + version = 1, numKeys = 1001, fileMapping) + assert(numRemoteSSTFiles === 5, "shouldn't reuse old version 1 SST files" + + " while uploading version 1 again") // 2 old + 3 new SST files + assert(numRemoteLogFiles === 5, "shouldn't reuse old version 1 log files" + + " while uploading version 1 again") // 2 old + 3 new log files + + // verify checkpoint state is correct + loadAndVerifyCheckpointFiles(fileManager, verificationDir, + version = 1, cpFiles1_, 1001, fileMapping) // Save another version and verify val cpFiles2 = Seq( - "sst-file4.sst" -> 40, + "sst-file1.sst" -> 10, // same SST file as version 1, should be reused + "sst-file2.sst" -> 25, // same SST file as version 1, should be reused + "sst-file3.sst" -> 30, // same SST file as version 1, should be reused + "sst-file4.sst" -> 40, // new sst file, should be uploaded "other-file4" -> 400, "archive/00004.log" -> 4000 ) - saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501) - assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files - assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files - loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501) + + saveCheckpointFiles(fileManager, cpFiles2, + version = 2, numKeys = 1501, fileMapping) + assert(numRemoteSSTFiles === 6) // 1 new file over earlier 5 files + assert(numRemoteLogFiles === 6) // 1 new file over earlier 6 files + loadAndVerifyCheckpointFiles(fileManager, verificationDir, + version = 2, cpFiles2, 1501, fileMapping) // Loading an older version should work - loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001) + loadAndVerifyCheckpointFiles(fileManager, verificationDir, + version = 1, cpFiles1_, 1001, fileMapping) // Loading incorrect version should fail intercept[FileNotFoundException] { - loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 3, Nil, 1001) + loadAndVerifyCheckpointFiles(fileManager, verificationDir, + version = 3, Nil, 1001, fileMapping) } // Loading 0 should delete all files require(verificationDir.list().length > 0) - loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 0, Nil, 0) + loadAndVerifyCheckpointFiles(fileManager, verificationDir, + version = 0, Nil, 0, fileMapping) } } @@ -1125,7 +1161,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared 
val cpFiles = Seq("sst-file1.sst" -> 10, "sst-file2.sst" -> 20, "other-file1" -> 100) CreateAtomicTestManager.shouldFailInCreateAtomic = true intercept[IOException] { - saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = 101) + saveCheckpointFiles(fileManager, cpFiles, + version = 1, numKeys = 101, new RocksDBFileMapping()) } assert(CreateAtomicTestManager.cancelCalledInCreateAtomic) } @@ -1779,37 +1816,39 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "validate successful RocksDB load when metadata file is not overwritten") { val fmClass = "org.apache.spark.sql.execution.streaming.state." + "NoOverwriteFileSystemBasedCheckpointFileManager" - withTempDir { dir => - val conf = dbConf.copy(minDeltasForSnapshot = 0) // create snapshot every commit - val hadoopConf = new Configuration() - hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass) - - val remoteDir = dir.getCanonicalPath - withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db => - db.load(0) - db.put("a", "1") - db.commit() + Seq(Some(fmClass), None).foreach { fm => + withTempDir { dir => + val conf = dbConf.copy(minDeltasForSnapshot = 0) // create snapshot every commit + val hadoopConf = new Configuration() + fm.foreach(value => + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, value)) + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db => + db.load(0) + db.put("a", "1") + db.commit() - // load previous version, and recreate the snapshot - db.load(0) - db.put("a", "1") + // load previous version, will recreate snapshot on commit + db.load(0) + db.put("a", "1") - // do not upload version 1 snapshot created previously - db.doMaintenance() - assert(snapshotVersionsPresent(remoteDir) == Seq.empty) + // upload version 1 snapshot created previously + db.doMaintenance() + assert(snapshotVersionsPresent(remoteDir) == Seq(1)) - db.commit() // create snapshot again + db.commit() // create snapshot again - // load version 1 - should succeed - withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db => - } + // load version 1 - should succeed + withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db => + } - // upload recently created snapshot - db.doMaintenance() - assert(snapshotVersionsPresent(remoteDir) == Seq(1)) + // upload recently created snapshot + db.doMaintenance() + assert(snapshotVersionsPresent(remoteDir) == Seq(1)) - // load version 1 again - should succeed - withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db => + // load version 1 again - should succeed + withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db => + } } } } @@ -2241,14 +2280,20 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared fileManager: RocksDBFileManager, fileToLengths: Seq[(String, Int)], version: Int, - numKeys: Int): Unit = { + numKeys: Int, + fileMapping: RocksDBFileMapping): Unit = { val checkpointDir = Utils.createTempDir().getAbsolutePath // local dir to create checkpoints generateFiles(checkpointDir, fileToLengths) + fileMapping.currentVersion = version - 1 + val (dfsFileSuffix, immutableFileMapping) = fileMapping.createSnapshotFileMapping( + fileManager, checkpointDir, version) fileManager.saveCheckpointToDfs( checkpointDir, version, numKeys, - fileManager.captureFileMapReference()) + immutableFileMapping) + val snapshotInfo = RocksDBVersionSnapshotInfo(version, dfsFileSuffix) + 
fileMapping.snapshotsPendingUpload.remove(snapshotInfo) } def loadAndVerifyCheckpointFiles( @@ -2256,8 +2301,10 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared verificationDir: String, version: Int, expectedFiles: Seq[(String, Int)], - expectedNumKeys: Int): Unit = { - val metadata = fileManager.loadCheckpointFromDfs(version, verificationDir) + expectedNumKeys: Int, + fileMapping: RocksDBFileMapping): Unit = { + val metadata = fileManager.loadCheckpointFromDfs(version, + verificationDir, fileMapping) val filesAndLengths = listFiles(verificationDir).map(f => f.getName -> f.length).toSet ++ listFiles(verificationDir + "/archive").map(f => s"archive/${f.getName}" -> f.length()).toSet