[SPARK-48796][SS] Load Column Family Id from RocksDBCheckpointMetadata for VCF when restarting

### What changes were proposed in this pull request?

This PR persists the mapping from `columnFamilyName` to `columnFamilyId` in `RocksDBCheckpointMetadata` and `RocksDBSnapshot`. RocksDB maintains this mapping as internal metadata and restores it on load. `RocksDBStateStoreProvider` can call column family operations as usual, and `RocksDB.scala` translates the name to the virtual column family id.
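
For illustration only, here is a minimal, self-contained sketch of that bookkeeping. The `VirtualCFRegistry` name and shape are hypothetical, not the actual Spark class; the real logic lives in `RocksDB.scala`, as the diff below shows:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified stand-in for the bookkeeping RocksDB.scala now
// performs: each column family name maps to a Short id; the mapping is
// written into the checkpoint metadata and restored when a version is loaded.
class VirtualCFRegistry {
  private val nameToId = new ConcurrentHashMap[String, Short]()
  private val maxId = new AtomicInteger(0)

  // Assign a fresh id on first use (mirrors createColFamilyIfAbsent below).
  def createIfAbsent(name: String): Short = {
    if (!nameToId.containsKey(name)) {
      nameToId.putIfAbsent(name, maxId.incrementAndGet().toShort)
    }
    nameToId.get(name)
  }

  // Rehydrate the mapping captured in a checkpoint (mirrors what load()
  // now does with RocksDBCheckpointMetadata.columnFamilyMapping).
  def restore(mapping: Map[String, Short], maxSeenId: Short): Unit = {
    nameToId.clear()
    mapping.foreach { case (n, id) => nameToId.put(n, id) }
    maxId.set(maxSeenId)
  }
}

object VirtualCFRegistryDemo extends App {
  val reg = new VirtualCFRegistry
  assert(reg.createIfAbsent("default") == 1.toShort)
  assert(reg.createIfAbsent("default") == 1.toShort) // idempotent on repeat calls
}
```
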
### Why are the changes needed?

To enable the use of virtual column families, and the performance benefits that come with them, with the TransformWithState operator.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Amended unit tests
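
One property the amended tests can pin down is the new invariant on `RocksDBCheckpointMetadata`: `columnFamilyMapping` and `maxColumnFamilyId` must be defined together. A hedged sketch of such a check (hypothetical test body, assuming the constructor shown in the diff below and access from the same package):

```scala
import scala.util.Try

// Both new fields set: the require(...) in the constructor passes.
val ok = Try(new RocksDBCheckpointMetadata(
  sstFiles = Seq.empty,
  logFiles = Seq.empty,
  numKeys = 0L,
  columnFamilyMapping = Some(Map("default" -> 0.toShort)),
  maxColumnFamilyId = Some(0.toShort)))
assert(ok.isSuccess)

// Only one of the two set: the require(...) rejects the combination.
val bad = Try(new RocksDBCheckpointMetadata(
  sstFiles = Seq.empty,
  logFiles = Seq.empty,
  numKeys = 0L,
  columnFamilyMapping = Some(Map("default" -> 0.toShort)),
  maxColumnFamilyId = None))
assert(bad.isFailure)
```
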

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47778 from ericm-db/vcf-integration-state-store.

Authored-by: Eric Marnadi <eric.marnadi@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
ericm-db authored and HeartSaVioR committed Aug 20, 2024
1 parent 8fbbcb0 commit e64f620
Showing 6 changed files with 427 additions and 144 deletions.
@@ -19,11 +19,13 @@ package org.apache.spark.sql.execution.streaming.state

import java.io.File
import java.util.Locale
-import java.util.concurrent.TimeUnit
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import javax.annotation.concurrent.GuardedBy

import scala.collection.{mutable, Map}
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters.{ConcurrentMapHasAsScala, MapHasAsJava}
import scala.ref.WeakReference
import scala.util.Try

@@ -76,7 +78,9 @@ class RocksDB(
checkpointDir: File,
version: Long,
numKeys: Long,
-    capturedFileMappings: RocksDBFileMappings) {
capturedFileMappings: RocksDBFileMappings,
columnFamilyMapping: Map[String, Short],
maxColumnFamilyId: Short) {
def close(): Unit = {
silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
}
@@ -166,6 +170,87 @@ class RocksDB(
@GuardedBy("acquireLock")
@volatile private var acquiredThreadInfo: AcquiredThreadInfo = _

// This is accessed and updated only between load and commit
// which means it is implicitly guarded by acquireLock
@GuardedBy("acquireLock")
private val colFamilyNameToIdMap = new ConcurrentHashMap[String, Short]()

@GuardedBy("acquireLock")
private val maxColumnFamilyId: AtomicInteger = new AtomicInteger(-1)

@GuardedBy("acquireLock")
private val shouldForceSnapshot: AtomicBoolean = new AtomicBoolean(false)

/**
* Check whether the column family name is for internal column families.
*
* @param cfName - column family name
* @return - true if the column family is for internal use, false otherwise
*/
private def checkInternalColumnFamilies(cfName: String): Boolean = cfName.charAt(0) == '_'

// Methods to fetch column family mapping for this State Store version
def getColumnFamilyMapping: Map[String, Short] = {
colFamilyNameToIdMap.asScala
}

def getColumnFamilyId(cfName: String): Short = {
colFamilyNameToIdMap.get(cfName)
}

/**
* Create RocksDB column family, if not created already
*/
def createColFamilyIfAbsent(colFamilyName: String): Short = {
if (!checkColFamilyExists(colFamilyName)) {
val newColumnFamilyId = maxColumnFamilyId.incrementAndGet().toShort
colFamilyNameToIdMap.putIfAbsent(colFamilyName, newColumnFamilyId)
shouldForceSnapshot.set(true)
newColumnFamilyId
} else {
colFamilyNameToIdMap.get(colFamilyName)
}
}

/**
* Remove RocksDB column family, if exists
*/
def removeColFamilyIfExists(colFamilyName: String): Boolean = {
if (checkColFamilyExists(colFamilyName)) {
colFamilyNameToIdMap.remove(colFamilyName)
shouldForceSnapshot.set(true)
true
} else {
false
}
}

/**
* Function to check if the column family exists in the state store instance.
*
* @param colFamilyName - name of the column family
* @return - true if the column family exists, false otherwise
*/
def checkColFamilyExists(colFamilyName: String): Boolean = {
colFamilyNameToIdMap.containsKey(colFamilyName)
}

// This method sets the internal column family metadata to
// the default values it should be set to on load
private def setInitialCFInfo(): Unit = {
colFamilyNameToIdMap.clear()
shouldForceSnapshot.set(false)
maxColumnFamilyId.set(0)
}

def getColFamilyCount(isInternal: Boolean): Long = {
if (isInternal) {
colFamilyNameToIdMap.asScala.keys.toSeq.count(checkInternalColumnFamilies)
} else {
      colFamilyNameToIdMap.asScala.keys.toSeq.count(cfName => !checkInternalColumnFamilies(cfName))
}
}

/**
* Load the given version of data in a native RocksDB instance.
* Note that this will copy all the necessary file from DFS to local disk as needed,
@@ -188,6 +273,14 @@ class RocksDB(
// Initialize maxVersion upon successful load from DFS
fileManager.setMaxSeenVersion(version)

setInitialCFInfo()
metadata.columnFamilyMapping.foreach { mapping =>
colFamilyNameToIdMap.putAll(mapping.asJava)
}

metadata.maxColumnFamilyId.foreach { maxId =>
maxColumnFamilyId.set(maxId)
}
// reset last snapshot version
if (lastSnapshotVersion > latestSnapshotVersion) {
// discard any newer snapshots
@@ -496,7 +589,7 @@
var compactTimeMs = 0L
var flushTimeMs = 0L
var checkpointTimeMs = 0L
-    if (shouldCreateSnapshot()) {
if (shouldCreateSnapshot() || shouldForceSnapshot.get()) {
// Need to flush the change to disk before creating a checkpoint
// because rocksdb wal is disabled.
logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}")
@@ -535,7 +628,9 @@
RocksDBSnapshot(checkpointDir,
newVersion,
numKeysOnWritingVersion,
-          fileManager.captureFileMapReference()))
fileManager.captureFileMapReference(),
colFamilyNameToIdMap.asScala.toMap,
maxColumnFamilyId.get().toShort))
lastSnapshotVersion = newVersion
}
}
@@ -544,11 +639,20 @@
logInfo(log"Syncing checkpoint for ${MDC(LogKeys.VERSION_NUM, newVersion)} to DFS")
val fileSyncTimeMs = timeTakenMs {
if (enableChangelogCheckpointing) {
-        try {
-          assert(changelogWriter.isDefined)
-          changelogWriter.foreach(_.commit())
-        } finally {
        // If we have changed the columnFamilyId mapping, we have set a new
        // snapshot and need to upload this to the DFS even if changelog checkpointing
        // is enabled.
        if (shouldForceSnapshot.get()) {
          uploadSnapshot()
          changelogWriter = None
          changelogWriter.foreach(_.abort())
        } else {
          try {
            assert(changelogWriter.isDefined)
            changelogWriter.foreach(_.commit())
          } finally {
            changelogWriter = None
          }
        }
} else {
assert(changelogWriter.isEmpty)
@@ -606,10 +710,24 @@
checkpoint
}
localCheckpoint match {
-      case Some(RocksDBSnapshot(localDir, version, numKeys, capturedFileMappings)) =>
case Some(
RocksDBSnapshot(
localDir,
version,
numKeys,
capturedFileMappings,
columnFamilyMapping,
maxColumnFamilyId)) =>
try {
val uploadTime = timeTakenMs {
-            fileManager.saveCheckpointToDfs(localDir, version, numKeys, capturedFileMappings)
fileManager.saveCheckpointToDfs(
localDir,
version,
numKeys,
capturedFileMappings,
Some(columnFamilyMapping.toMap),
Some(maxColumnFamilyId)
)
fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
}
logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of version " +
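
As an aside (a hypothetical condensation, not code from this commit), the commit-path change above amounts to the following decision rule; `shouldForceSnapshot` is the flag the diff introduces:

```scala
object CommitPathSketch {
  // Hypothetical condensation of the commit path shown above: a changed
  // column family mapping forces a full snapshot upload even when changelog
  // checkpointing is enabled, and the pending changelog write is dropped.
  def finishCommit(
      changelogCheckpointingEnabled: Boolean,
      columnFamilyMappingChanged: Boolean,
      uploadSnapshot: () => Unit,
      commitChangelog: () => Unit,
      abortChangelog: () => Unit): Unit = {
    if (!changelogCheckpointingEnabled) {
      uploadSnapshot() // snapshot checkpointing: every commit uploads a snapshot
    } else if (columnFamilyMappingChanged) {
      // The new mapping is persisted via RocksDBCheckpointMetadata, which only
      // a snapshot carries, so force the upload and abandon the changelog.
      uploadSnapshot()
      abortChangelog()
    } else {
      commitChangelog() // normal changelog-checkpointing path
    }
  }
}
```
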
@@ -249,12 +249,15 @@ class RocksDBFileManager(
checkpointDir: File,
version: Long,
numKeys: Long,
-      capturedFileMappings: RocksDBFileMappings): Unit = {
capturedFileMappings: RocksDBFileMappings,
columnFamilyMapping: Option[Map[String, Short]] = None,
maxColumnFamilyId: Option[Short] = None): Unit = {
logFilesInDir(checkpointDir, log"Saving checkpoint files " +
log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, capturedFileMappings)
-    val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
val metadata = RocksDBCheckpointMetadata(
rocksDBFiles, numKeys, columnFamilyMapping, maxColumnFamilyId)
val metadataFile = localMetadataFile(checkpointDir)
metadata.writeToFile(metadataFile)
logInfo(log"Written metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
@@ -889,11 +892,17 @@ object RocksDBFileManagerMetrics {
case class RocksDBCheckpointMetadata(
sstFiles: Seq[RocksDBSstFile],
logFiles: Seq[RocksDBLogFile],
-    numKeys: Long) {
numKeys: Long,
columnFamilyMapping: Option[Map[String, Short]] = None,
maxColumnFamilyId: Option[Short] = None) {

require(columnFamilyMapping.isDefined == maxColumnFamilyId.isDefined,
"columnFamilyMapping and maxColumnFamilyId must either both be defined or both be None")

import RocksDBCheckpointMetadata._

def json: String = {
-    // We turn this field into a null to avoid write a empty logFiles field in the json.
// We turn this field into a null to avoid write an empty logFiles field in the json.
val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
mapper.writeValueAsString(nullified)
}
@@ -941,11 +950,73 @@ object RocksDBCheckpointMetadata {
}
}

-  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
-    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
-    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
// Apply method for cases without column family information
def apply(
rocksDBFiles: Seq[RocksDBImmutableFile],
numKeys: Long): RocksDBCheckpointMetadata = {
val (sstFiles, logFiles) = rocksDBFiles.partition(_.isInstanceOf[RocksDBSstFile])
new RocksDBCheckpointMetadata(
sstFiles.map(_.asInstanceOf[RocksDBSstFile]),
logFiles.map(_.asInstanceOf[RocksDBLogFile]),
numKeys,
None,
None
)
}

def apply(
rocksDBFiles: Seq[RocksDBImmutableFile],
numKeys: Long,
columnFamilyMapping: Option[Map[String, Short]],
maxColumnFamilyId: Option[Short]): RocksDBCheckpointMetadata = {
val (sstFiles, logFiles) = rocksDBFiles.partition(_.isInstanceOf[RocksDBSstFile])
new RocksDBCheckpointMetadata(
sstFiles.map(_.asInstanceOf[RocksDBSstFile]),
logFiles.map(_.asInstanceOf[RocksDBLogFile]),
numKeys,
columnFamilyMapping,
maxColumnFamilyId
)
}

// Apply method for cases with separate sstFiles and logFiles, without column family information
def apply(
sstFiles: Seq[RocksDBSstFile],
logFiles: Seq[RocksDBLogFile],
numKeys: Long): RocksDBCheckpointMetadata = {
new RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys, None, None)
}

// Apply method for cases with column family information
def apply(
rocksDBFiles: Seq[RocksDBImmutableFile],
numKeys: Long,
columnFamilyMapping: Map[String, Short],
maxColumnFamilyId: Short): RocksDBCheckpointMetadata = {
val (sstFiles, logFiles) = rocksDBFiles.partition(_.isInstanceOf[RocksDBSstFile])
new RocksDBCheckpointMetadata(
sstFiles.map(_.asInstanceOf[RocksDBSstFile]),
logFiles.map(_.asInstanceOf[RocksDBLogFile]),
numKeys,
Some(columnFamilyMapping),
Some(maxColumnFamilyId)
)
}

-    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
// Apply method for cases with separate sstFiles and logFiles, and column family information
def apply(
sstFiles: Seq[RocksDBSstFile],
logFiles: Seq[RocksDBLogFile],
numKeys: Long,
columnFamilyMapping: Map[String, Short],
maxColumnFamilyId: Short): RocksDBCheckpointMetadata = {
new RocksDBCheckpointMetadata(
sstFiles,
logFiles,
numKeys,
Some(columnFamilyMapping),
Some(maxColumnFamilyId)
)
}
}

