[SPARK-48957][SS] Return sub-classified error class on state store load for HDFS and RocksDB providers

### What changes were proposed in this pull request?
Return a sub-classified error class on state store load for the HDFS and RocksDB state store providers.
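In sketch form (the full diffs are below): each provider's load path now rethrows a `SparkException` that already carries a `CANNOT_LOAD_STATE_STORE.*` error class, and only wraps everything else. The helper below is a hypothetical condensation of that guard, not code from this PR:

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.errors.QueryExecutionErrors

// Hypothetical helper mirroring the guard this PR adds to both providers:
// propagate already-classified load errors, wrap everything else.
def classifyLoadError(e: Throwable): Throwable = e match {
  case se: SparkException
      if se.getErrorClass != null && se.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
    se // already sub-classified (e.g. ...UNEXPECTED_VERSION), rethrow as-is
  case other =>
    // everything else still surfaces as CANNOT_LOAD_STATE_STORE.UNCATEGORIZED
    QueryExecutionErrors.cannotLoadStore(other)
}
```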

### Why are the changes needed?
Without this change, all higher-level functions saw every state store load failure as `CANNOT_LOAD_STATE_STORE.UNCATEGORIZED`, even when the underlying exception already carried a more specific error class.
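For illustration, a caller-side sketch based on the updated tests (`provider` stands for either state store provider; `intercept` is ScalaTest's):

```scala
import org.apache.spark.SparkException

// Loading a negative version can never succeed.
val e = intercept[SparkException] {
  provider.getStore(-1)
}
// Before: callers always observed "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED".
// After: the specific sub-class survives to the caller.
assert(e.getErrorClass == "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION")
```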

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Augmented unit tests

```
===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.RocksDBSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true), ForkJoinPool.common...
[info] Run completed in 4 minutes, 12 seconds.
[info] Total number of tests run: 176
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 176, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47431 from anishshri-db/task/SPARK-48957.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
anishshri-db authored and HeartSaVioR committed Jul 23, 2024
1 parent 71f3f1f commit 285489b
Showing 4 changed files with 46 additions and 24 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

```diff
@@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
 import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -282,6 +282,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       newMap
     }
     catch {
+      case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
+        throw e
       case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
     }
   }
```
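One observation on the guard (not part of the diff): `getErrorClass` returns the fully sub-classified name, so a plain `contains` check on the parent class name lets the parent and every `CANNOT_LOAD_STATE_STORE.*` sub-class propagate unchanged:

```scala
// Illustrative values only.
val subClassified = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION"
assert(subClassified.contains("CANNOT_LOAD_STATE_STORE")) // rethrown as-is
```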
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

```diff
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.io.CompressionCodec
@@ -374,6 +374,8 @@ private[sql] class RocksDBStateStoreProvider
       new RocksDBStateStore(version)
     }
     catch {
+      case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
+        throw e
       case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
     }
   }
@@ -387,6 +389,8 @@ private[sql] class RocksDBStateStoreProvider
       new RocksDBStateStore(version)
     }
     catch {
+      case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
+        throw e
       case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
     }
   }
```
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

```diff
@@ -201,16 +201,16 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
     checkError(
       ex,
-      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
-      parameters = Map.empty
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+      parameters = Map("version" -> "-1")
     )
     ex = intercept[SparkException] {
       provider.getReadStore(-1)
     }
     checkError(
       ex,
-      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
-      parameters = Map.empty
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+      parameters = Map("version" -> "-1")
     )
 
     val remoteDir = Utils.createTempDir().toString
```
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

```diff
@@ -363,7 +363,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
         getData(provider, snapshotVersion - 1)
       }
       checkError(
-        e.getCause.asInstanceOf[SparkThrowable],
+        e,
         errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
         parameters = Map(
           "fileToRead" -> s"${provider.stateStoreId.storeCheckpointLocation()}/1.delta",
@@ -1089,14 +1089,12 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       var e = intercept[SparkException] {
         provider.getStore(2)
       }
-      assert(e.getCause.isInstanceOf[SparkException])
-      assert(e.getCause.getMessage.contains("does not exist"))
+      assert(e.getMessage.contains("does not exist"))
 
       e = intercept[SparkException] {
         getData(provider, 2, useColumnFamilies = colFamiliesEnabled)
       }
-      assert(e.getCause.isInstanceOf[SparkException])
-      assert(e.getCause.getMessage.contains("does not exist"))
+      assert(e.getMessage.contains("does not exist"))
 
       // New updates to the reloaded store with new version, and does not change old version
       tryWithProviderResource(newStoreProvider(store.id, colFamiliesEnabled)) { reloadedProvider =>
@@ -1236,19 +1234,37 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
 
   testWithAllCodec(s"getStore with invalid versions") { colFamiliesEnabled =>
     tryWithProviderResource(newStoreProvider(colFamiliesEnabled)) { provider =>
-      def checkInvalidVersion(version: Int): Unit = {
+      def checkInvalidVersion(version: Int, isHDFSBackedStoreProvider: Boolean): Unit = {
         val e = intercept[SparkException] {
           provider.getStore(version)
         }
-        checkError(
-          e,
-          errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
-          parameters = Map.empty
-        )
+        if (version < 0) {
+          checkError(
+            e,
+            errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+            parameters = Map("version" -> version.toString)
+          )
+        } else {
+          if (isHDFSBackedStoreProvider) {
+            checkError(
+              e,
+              errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+              parameters = Map("fileToRead" -> ".*", "clazz" -> ".*"),
+              matchPVals = true
+            )
+          } else {
+            checkError(
+              e,
+              errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+              parameters = Map("fileToRead" -> ".*"),
+              matchPVals = true
+            )
+          }
+        }
       }
 
-      checkInvalidVersion(-1)
-      checkInvalidVersion(1)
+      checkInvalidVersion(-1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
+      checkInvalidVersion(1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
 
       val store = provider.getStore(0)
       put(store, "a", 0, 1)
@@ -1258,8 +1274,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       val store1_ = provider.getStore(1)
       assert(rowPairsToDataSet(store1_.iterator()) === Set(("a", 0) -> 1))
 
-      checkInvalidVersion(-1)
-      checkInvalidVersion(2)
+      checkInvalidVersion(-1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
+      checkInvalidVersion(2, provider.isInstanceOf[HDFSBackedStateStoreProvider])
 
       // Update store version with some data
       val store1 = provider.getStore(1)
@@ -1268,8 +1284,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       assert(store1.commit() === 2)
       assert(rowPairsToDataSet(store1.iterator()) === Set(("a", 0) -> 1, ("b", 0) -> 1))
 
-      checkInvalidVersion(-1)
-      checkInvalidVersion(3)
+      checkInvalidVersion(-1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
+      checkInvalidVersion(3, provider.isInstanceOf[HDFSBackedStateStoreProvider])
     }
   }
 
@@ -1444,7 +1460,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
         storeConf, hadoopConf)
       }
       checkError(
-        e.getCause.asInstanceOf[SparkThrowable],
+        e,
         errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
         parameters = Map(
           "fileToRead" -> s"$dir/0/0/1.delta",
```
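A note on the new assertions in the shared suite: `matchPVals = true` makes `checkError` treat the expected parameter values as regular expressions, which is why `"fileToRead" -> ".*"` can be asserted without knowing the provider's temporary checkpoint path. The two branches exist because the HDFS-backed provider fails on a missing delta file (`CANNOT_READ_DELTA_FILE_NOT_EXISTS`) while the RocksDB provider fails on a missing streaming state file (`CANNOT_READ_STREAMING_STATE_FILE`).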
