[SPARK-46968][SQL] Replace `UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql`

### What changes were proposed in this pull request?
In this PR, I propose to replace all occurrences of `UnsupportedOperationException` with `SparkUnsupportedOperationException` in the `sql` code base, and to introduce new legacy error classes with the `_LEGACY_ERROR_TEMP_` prefix.
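For example, a throw site migrates as follows (a minimal before/after sketch of the pattern; the error class and parameter are taken from one of the call sites in the diff below):

```scala
// Before: a plain JDK exception with a hard-coded message.
throw new UnsupportedOperationException(
  s"Failed to create column family with reserved name=$colFamilyName")

// After: a Spark exception bound to an entry in error-classes.json;
// the <colFamilyName> placeholder in the template is filled from the map.
throw new SparkUnsupportedOperationException(
  errorClass = "_LEGACY_ERROR_TEMP_3197",
  messageParameters = Map("colFamilyName" -> colFamilyName))
```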

### Why are the changes needed?
To unify Spark SQL exceptions, and to port Java exceptions onto Spark exceptions with error classes.

### Does this PR introduce _any_ user-facing change?
Yes, it can if the user's code assumes a particular format of `UnsupportedOperationException` messages.
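Code that inspected those messages can match on the stable error class instead (a hedged sketch, not part of this PR; `getErrorClass` comes from the `SparkThrowable` interface, and `store` stands in for an HDFS-backed state store instance):

```scala
import org.apache.spark.SparkUnsupportedOperationException

try {
  store.createColFamilyIfAbsent("myFamily")  // hypothetical call site
} catch {
  // Match on the error class rather than the message text.
  case e: SparkUnsupportedOperationException
      if e.getErrorClass == "_LEGACY_ERROR_TEMP_3193" =>
    // handle: HDFSBackedStateStoreProvider supports only one column family
}
```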

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "core/testOnly *SparkThrowableSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#44937 from MaxGekk/migrate-UnsupportedOperationException-api.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
MaxGekk authored and dongjoon-hyun committed Feb 3, 2024
1 parent 88d4681 commit 6e60b23
Showing 8 changed files with 27 additions and 18 deletions.
common/utils/src/main/resources/error/error-classes.json (10 additions, 0 deletions)
@@ -7489,6 +7489,16 @@
"Datatype not supported <dt>"
]
},
"_LEGACY_ERROR_TEMP_3193" : {
"message" : [
"Creating multiple column families with HDFSBackedStateStoreProvider is not supported"
]
},
"_LEGACY_ERROR_TEMP_3197" : {
"message" : [
"Failed to create column family with reserved name=<colFamilyName>"
]
},
"_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
"message" : [
"<errorMessage>"
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.trees

-import org.apache.spark.{QueryContext, QueryContextType}
+import org.apache.spark.{QueryContext, QueryContextType, SparkUnsupportedOperationException}

/** The class represents error context of a SQL query. */
case class SQLQueryContext(
@@ -131,16 +131,16 @@ case class SQLQueryContext(
originStartIndex.get <= originStopIndex.get
}

-  override def callSite: String = throw new UnsupportedOperationException
+  override def callSite: String = throw SparkUnsupportedOperationException()
}

case class DataFrameQueryContext(stackTrace: Seq[StackTraceElement]) extends QueryContext {
override val contextType = QueryContextType.DataFrame

-  override def objectType: String = throw new UnsupportedOperationException
-  override def objectName: String = throw new UnsupportedOperationException
-  override def startIndex: Int = throw new UnsupportedOperationException
-  override def stopIndex: Int = throw new UnsupportedOperationException
+  override def objectType: String = throw SparkUnsupportedOperationException()
+  override def objectName: String = throw SparkUnsupportedOperationException()
+  override def startIndex: Int = throw SparkUnsupportedOperationException()
+  override def stopIndex: Int = throw SparkUnsupportedOperationException()

override val fragment: String = {
stackTrace.headOption.map { firstElem =>
@@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.util

import scala.util.control.NonFatal

+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.util.SparkClassUtils

@@ -53,6 +54,6 @@

private[sql] object DefaultUDTUtils extends UDTUtils {
override def toRow(value: Any, udt: UserDefinedType[Any]): Any = {
-    throw new UnsupportedOperationException()
+    throw SparkUnsupportedOperationException()
}
}
@@ -37,7 +37,7 @@ import org.apache.spark.unsafe.Platform
* instance that is backed by an on-heap byte array.
*
* Note that this serializer implements only the [[Serializer]] methods that are used during
- * shuffle, so certain [[SerializerInstance]] methods will throw UnsupportedOperationException.
+ * shuffle, so certain [[SerializerInstance]] methods will throw SparkUnsupportedOperationException.
*
* @param numFields the number of fields in the row being serialized.
*/
@@ -178,8 +178,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
* CompactibleFileStreamLog maintains logs by itself, and manual purging might break internal
* state, specifically which latest compaction batch is purged.
*
-   * To simplify the situation, this method just throws UnsupportedOperationException regardless
-   * of given parameter, and let CompactibleFileStreamLog handles purging by itself.
+   * To simplify the situation, this method just throws SparkUnsupportedOperationException
+   * regardless of the given parameter, and lets CompactibleFileStreamLog handle purging by itself.
*/
override def purge(thresholdBatchId: Long): Unit =
throw QueryExecutionErrors.cannotPurgeAsBreakInternalStateError()
@@ -16,8 +16,6 @@
*/
package org.apache.spark.sql.execution.streaming

-import java.io.Serializable
-
import org.apache.commons.lang3.SerializationUtils

import org.apache.spark.internal.Logging
@@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, SparkUnsupportedOperationException}
import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -115,8 +115,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId

override def createColFamilyIfAbsent(colFamilyName: String): Unit = {
throw new UnsupportedOperationException("Creating multiple column families with " +
"HDFSBackedStateStoreProvider is not supported")
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3193")
}

override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = {
@@ -34,7 +34,7 @@ import org.rocksdb.{RocksDB => NativeRocksDB, _}
import org.rocksdb.CompressionType._
import org.rocksdb.TickerType._

-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkUnsupportedOperationException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -252,8 +252,9 @@
*/
def createColFamilyIfAbsent(colFamilyName: String): Unit = {
if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
throw new UnsupportedOperationException("Failed to create column family with reserved " +
s"name=$colFamilyName")
throw new SparkUnsupportedOperationException(
errorClass = "_LEGACY_ERROR_TEMP_3197",
messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
}

if (!checkColFamilyExists(colFamilyName)) {
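After this change, hitting the reserved-name guard above surfaces as a `SparkThrowable` whose message is resolved from the JSON template (a hedged usage sketch; `db` stands in for a `RocksDB` instance, and `StateStore.DEFAULT_COL_FAMILY_NAME` is assumed to be `"default"`):

```scala
try {
  db.createColFamilyIfAbsent(StateStore.DEFAULT_COL_FAMILY_NAME)
} catch {
  case e: SparkUnsupportedOperationException =>
    assert(e.getErrorClass == "_LEGACY_ERROR_TEMP_3197")
    // Message resolved from the template in error-classes.json:
    //   "Failed to create column family with reserved name=default"
    println(e.getMessage)
}
```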
