Skip to content

Commit

Permalink
address more comments from Anish
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 14, 2024
1 parent 1870b35 commit fe9cea1
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,12 @@ object StateSourceOptions extends DataSourceOptions {
throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID)
}

// both snapshotPartitionId and snapshotStartBatchId are required at the same time, because
// each partition may have different checkpoint status
if (snapshotPartitionId.isDefined && snapshotStartBatchId.isEmpty) {
throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_START_BATCH_ID.toString)
throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_START_BATCH_ID)
} else if (snapshotPartitionId.isEmpty && snapshotStartBatchId.isDefined) {
throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_PARTITION_ID.toString)
throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_PARTITION_ID)
}

StateSourceOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
import org.apache.spark.sql.execution.streaming.state.{StateStoreConf, StateStoreSnapshotPartitionNotFound}
import org.apache.spark.sql.execution.streaming.state.{StateStoreConf, StateStoreErrors}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

Expand Down Expand Up @@ -82,14 +82,14 @@ class StateScan(
s"No continuous partitions in state: ${partitionNums.mkString("Array(", ", ", ")")}")

sourceOptions.snapshotPartitionId match {
case None => partitionNums.map {
pn => new StateStoreInputPartition(pn, queryId, sourceOptions)
case None => partitionNums.map { pn =>
new StateStoreInputPartition(pn, queryId, sourceOptions)
}.toArray
case Some(snapshotPartitionId) =>
if (partitionNums.contains(snapshotPartitionId)) {
Array(new StateStoreInputPartition(snapshotPartitionId, queryId, sourceOptions))
} else {
throw new StateStoreSnapshotPartitionNotFound(
throw StateStoreErrors.stateStoreSnapshotPartitionNotFound(
snapshotPartitionId, sourceOptions.operatorId,
sourceOptions.stateCheckpointLocation.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
readSnapshotFile(startVersion)
}
if (startVersionMap.isEmpty) {
throw new StateStoreSnapshotFileNotFound(snapshotFile(startVersion).toString, toString())
throw StateStoreErrors.stateStoreSnapshotFileNotFound(
snapshotFile(startVersion).toString, toString())
}
synchronized { putStateIntoStateCacheMap(startVersion, startVersionMap.get) }

Expand Down Expand Up @@ -772,7 +773,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}

/**
* try to read the snapshot file of the given version.
* try to read the snapshot file of the given version.
* If the snapshot file is not available, return [[None]].
*
* @param version the version of the snapshot file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ object StateStoreErrors {
newValueSchema: String): StateStoreValueSchemaNotCompatible = {
new StateStoreValueSchemaNotCompatible(storedValueSchema, newValueSchema)
}

def stateStoreSnapshotFileNotFound(fileToRead: String, clazz: String):
StateStoreSnapshotFileNotFound = {
new StateStoreSnapshotFileNotFound(fileToRead, clazz)
}

def stateStoreSnapshotPartitionNotFound(
snapshotPartitionId: Long, operatorId: Int, checkpointLocation: String):
StateStoreSnapshotPartitionNotFound = {
new StateStoreSnapshotPartitionNotFound(snapshotPartitionId, operatorId, checkpointLocation)
}
}

class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,7 @@ class SymmetricHashJoinStateManager(
useMultipleValuesPerKey = false)
if (snapshotStartVersion.isDefined) {
stateStoreProvider.getStore(snapshotStartVersion.get, stateInfo.get.storeVersion)
}
else {
} else {
stateStoreProvider.getStore(stateInfo.get.storeVersion)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase {
}
}

test("ERROR: snapshotStartBatchId specified to negative") {
test("ERROR: snapshotStartBatchId specified as a negative value") {
withTempDir { tempDir =>
val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
spark.read.format("statestore")
Expand All @@ -210,7 +210,7 @@ class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase {
}
}

test("ERROR: snapshotPartitionId specified to negative") {
test("ERROR: snapshotPartitionId specified as a negative value") {
withTempDir { tempDir =>
val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
spark.read.format("statestore")
Expand Down Expand Up @@ -257,7 +257,6 @@ class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase {
val exc = intercept[StateDataSourceInvalidOptionValue] {
spark.read.format("statestore")
// trick to bypass getting the last committed batch before validating operator ID
.option(StateSourceOptions.BATCH_ID, 0)
.option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, startBatchId)
.option(StateSourceOptions.BATCH_ID, endBatchId)
.load(tempDir.getAbsolutePath)
Expand Down

0 comments on commit fe9cea1

Please sign in to comment.