[SPARK-44940][SQL] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled

### What changes were proposed in this pull request?

The PR improves JSON parsing when `spark.sql.json.enablePartialResults` is enabled:
- Fixes a `ClassCastException` thrown when parsing nested arrays: `org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow`.
- Improves parsing of nested struct fields. For example, `{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}` used to be parsed as `|AAA|NULL    |NULL|NULL|` and is now parsed as `|AAA|[{NULL, }]|id1|XXX|` (see the sketch after this list).
- Improves performance of nested JSON parsing. The initial implementation threw too many exceptions when multiple nested fields failed to parse; with the config disabled this is not a problem, because the entire record is marked as NULL.
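A minimal repro sketch of the second bullet (the schema and field types are hypothetical, chosen so the empty string in `f1` fails to parse; assumes a `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.types._

// The flag this PR optimizes; it is a runtime SQL conf.
spark.conf.set("spark.sql.json.enablePartialResults", "true")

// Hypothetical schema: f1 is a timestamp so that "" cannot be parsed,
// while the sibling fields a1, a3, a4 and f2 remain recoverable.
val schema = StructType(Seq(
  StructField("a1", StringType),
  StructField("a2", ArrayType(StructType(Seq(
    StructField("f1", TimestampType),
    StructField("f2", StringType))))),
  StructField("a3", StringType),
  StructField("a4", StringType)))

import spark.implicits._
val json = """{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}"""
val df = spark.read.schema(schema).json(Seq(json).toDS())

// Before this change the row showed as |AAA|NULL|NULL|NULL|;
// after it shows as |AAA|[{NULL, }]|id1|XXX| (as described above).
df.show()
```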

Internal benchmarks show that, with the flag enabled, performance goes from a slowdown of over 160% to an improvement of 7-8% compared to the master branch. I will create a follow-up ticket to add a benchmark for this regression.

### Why are the changes needed?

Fixes some corner cases in JSON parsing and improves performance when `spark.sql.json.enablePartialResults` is enabled.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added tests to verify that nested structs, maps, and arrays can be partially parsed without affecting subsequent fields in the JSON. I also updated the existing tests that run with `spark.sql.json.enablePartialResults` enabled, since we now parse more data.
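A sketch of the shape of those tests (illustrative only, not the actual test code; assumes a suite extending Spark's `QueryTest` with `SharedSparkSession`, `testImplicits._`, and the `withSQLConf`/`checkAnswer` helpers):

```scala
test("SPARK-44940: bad nested field does not null out sibling fields") {
  withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
    // f1 fails to parse as a timestamp; a1 and a3 must survive.
    val df = spark.read
      .schema("a1 STRING, a2 ARRAY<STRUCT<f1: TIMESTAMP, f2: STRING>>, a3 STRING")
      .json(Seq("""{"a1": "AAA", "a2": [{"f1": "", "f2": "x"}], "a3": "id1"}""").toDS())
    checkAnswer(df, Row("AAA", Seq(Row(null, "x")), "id1"))
  }
}
```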

I added a benchmark to check performance.

Before the change (master, a45a3a3):
```
[info] OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
[info] Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
[info] Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] parse invalid JSON                                 9537           9820         452          0.0      953651.6       1.0X
```

After the change (this PR):
```
OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
parse invalid JSON                                 3100           3106           6          0.0      309967.6       1.0X
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#42667 from sadikovi/SPARK-44940.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
sadikovi authored and HyukjinKwon committed Sep 4, 2023
1 parent cf0a5cb commit 74c1f02
Showing 7 changed files with 384 additions and 94 deletions.
`JacksonParser.scala`:

```diff
@@ -420,17 +420,17 @@ class JacksonParser(
     case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
       dataType match {
         case FloatType | DoubleType | TimestampType | DateType =>
-          throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+          throw EmptyJsonFieldValueException(dataType)
         case _ => null
       }

     case VALUE_STRING if parser.getTextLength < 1 =>
-      throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+      throw EmptyJsonFieldValueException(dataType)

     case token =>
       // We cannot parse this token based on the given data type. So, we throw a
       // RuntimeException and this exception will be caught by `parse` method.
-      throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType)
+      throw CannotParseJSONFieldException(parser.getCurrentName, parser.getText, token, dataType)
   }

@@ -459,6 +459,11 @@
         bitmask(index) = false
       } catch {
         case e: SparkUpgradeException => throw e
+        case err: PartialValueException if enablePartialResults =>
+          badRecordException = badRecordException.orElse(Some(err.cause))
+          row.update(index, err.partialResult)
+          skipRow = structFilters.skipRow(row, index)
+          bitmask(index) = false
         case NonFatal(e) if isRoot || enablePartialResults =>
           badRecordException = badRecordException.orElse(Some(e))
           parser.skipChildren()

@@ -508,7 +513,7 @@
     if (badRecordException.isEmpty) {
       mapData
     } else {
-      throw PartialResultException(InternalRow(mapData), badRecordException.get)
+      throw PartialMapDataResultException(mapData, badRecordException.get)
     }
   }

@@ -543,10 +548,21 @@
       throw PartialResultArrayException(arrayData.toArray[InternalRow](schema),
         badRecordException.get)
     } else {
-      throw PartialResultException(InternalRow(arrayData), badRecordException.get)
+      throw PartialArrayDataResultException(arrayData, badRecordException.get)
     }
   }

+  /**
+   * Converts the non-stacktrace exceptions to user-friendly QueryExecutionErrors.
+   */
+  private def convertCauseForPartialResult(err: Throwable): Throwable = err match {
+    case CannotParseJSONFieldException(fieldName, fieldValue, jsonType, dataType) =>
+      QueryExecutionErrors.cannotParseJSONFieldError(fieldName, fieldValue, jsonType, dataType)
+    case EmptyJsonFieldValueException(dataType) =>
+      QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+    case _ => err
+  }

@@ -589,12 +605,25 @@
       throw BadRecordException(
         record = () => recordLiteral(record),
         partialResults = () => Array(row),
-        cause)
+        convertCauseForPartialResult(cause))
     case PartialResultArrayException(rows, cause) =>
       throw BadRecordException(
         record = () => recordLiteral(record),
         partialResults = () => rows,
         cause)
+    // These exceptions should never be thrown outside of JacksonParser.
+    // They are used for the control flow in the parser. We add them here for completeness
+    // since they also indicate a bad record.
+    case PartialArrayDataResultException(arrayData, cause) =>
+      throw BadRecordException(
+        record = () => recordLiteral(record),
+        partialResults = () => Array(InternalRow(arrayData)),
+        convertCauseForPartialResult(cause))
+    case PartialMapDataResultException(mapData, cause) =>
+      throw BadRecordException(
+        record = () => recordLiteral(record),
+        partialResults = () => Array(InternalRow(mapData)),
+        convertCauseForPartialResult(cause))
   }
 }
```
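For context, these `BadRecordException`s surface through Spark's standard bad-record handling, so in PERMISSIVE mode a reader keeps the recovered fields and stores the raw record in the corrupt-record column. A hedged usage sketch (standard Spark read API, not code from this diff; assumes `spark` and `import spark.implicits._`):

```scala
// PERMISSIVE mode keeps the partial row and stores the raw JSON in the
// corrupt-record column; with partial results enabled, "a" is preserved
// even though "b" fails to parse from the empty string.
val df = spark.read
  .schema("a STRING, b TIMESTAMP, _corrupt_record STRING")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(Seq("""{"a": "x", "b": ""}""").toDS())
```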
In `org.apache.spark.sql.catalyst.util` (the partial-result exception hierarchy):

```diff
@@ -17,19 +17,44 @@

 package org.apache.spark.sql.catalyst.util

+import com.fasterxml.jackson.core.JsonToken
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.unsafe.types.UTF8String

+abstract class PartialValueException(val cause: Throwable) extends Exception(cause) {
+  def partialResult: Serializable
+  override def getStackTrace(): Array[StackTraceElement] = cause.getStackTrace()
+  override def fillInStackTrace(): Throwable = this
+}
+
 /**
- * Exception thrown when the underlying parser returns a partial result of parsing.
+ * Exception thrown when the underlying parser returns a partial result of parsing an object/row.
  * @param partialResult the partial result of parsing a bad record.
  * @param cause the actual exception about why the parser cannot return full result.
  */
 case class PartialResultException(
-    partialResult: InternalRow,
-    cause: Throwable)
-  extends Exception(cause)
+    override val partialResult: InternalRow,
+    override val cause: Throwable) extends PartialValueException(cause)
+
+/**
+ * Exception thrown when the underlying parser returns a partial array result.
+ * @param partialResult the partial array result.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialArrayDataResultException(
+    override val partialResult: ArrayData,
+    override val cause: Throwable) extends PartialValueException(cause)
+
+/**
+ * Exception thrown when the underlying parser returns a partial map result.
+ * @param partialResult the partial map result.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialMapDataResultException(
+    override val partialResult: MapData,
+    override val cause: Throwable) extends PartialValueException(cause)

 /**
  * Exception thrown when the underlying parser returns partial result list of parsing.

@@ -65,3 +90,25 @@ case class StringAsDataTypeException(
     fieldName: String,
     fieldValue: String,
     dataType: DataType) extends RuntimeException()
+
+/**
+ * No-stacktrace equivalent of `QueryExecutionErrors.cannotParseJSONFieldError`.
+ * Used for code control flow in the parser without overhead of creating a full exception.
+ */
+case class CannotParseJSONFieldException(
+    fieldName: String,
+    fieldValue: String,
+    jsonType: JsonToken,
+    dataType: DataType) extends RuntimeException() {
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+  override def fillInStackTrace(): Throwable = this
+}
+
+/**
+ * No-stacktrace equivalent of `QueryExecutionErrors.emptyJsonFieldValueError`.
+ * Used for code control flow in the parser without overhead of creating a full exception.
+ */
+case class EmptyJsonFieldValueException(dataType: DataType) extends RuntimeException() {
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+  override def fillInStackTrace(): Throwable = this
+}
```
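The `fillInStackTrace` override above is where the speedup comes from: capturing the stack trace dominates the cost of constructing a JVM exception, so a stackless exception is cheap enough to use for control flow in a hot parsing loop. A standalone sketch of the pattern (not from this patch; the class and timing harness are illustrative):

```scala
// Standalone demo of the no-stacktrace pattern: overriding fillInStackTrace
// to return `this` skips the expensive stack capture on construction.
object StacklessExceptionDemo {
  final case class ControlFlowSignal(message: String) extends RuntimeException(message) {
    override def fillInStackTrace(): Throwable = this
  }

  def main(args: Array[String]): Unit = {
    def time(label: String)(body: => Unit): Unit = {
      val t0 = System.nanoTime()
      body
      println(f"$label: ${(System.nanoTime() - t0) / 1e6}%.1f ms")
    }
    // Throw and catch many exceptions, with and without stack capture.
    time("regular exceptions") {
      (1 to 100000).foreach { i =>
        try throw new RuntimeException(s"field $i")
        catch { case _: RuntimeException => }
      }
    }
    time("stackless exceptions") {
      (1 to 100000).foreach { i =>
        try throw ControlFlowSignal(s"field $i")
        catch { case _: ControlFlowSignal => }
      }
    }
  }
}
```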
In `QueryExecutionErrors`:

```diff
@@ -1279,11 +1279,19 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionErrors

   def cannotParseJSONFieldError(parser: JsonParser, jsonType: JsonToken, dataType: DataType)
     : SparkRuntimeException = {
+    cannotParseJSONFieldError(parser.getCurrentName, parser.getText, jsonType, dataType)
+  }
+
+  def cannotParseJSONFieldError(
+      fieldName: String,
+      fieldValue: String,
+      jsonType: JsonToken,
+      dataType: DataType): SparkRuntimeException = {
     new SparkRuntimeException(
       errorClass = "CANNOT_PARSE_JSON_FIELD",
       messageParameters = Map(
-        "fieldName" -> toSQLValue(parser.getCurrentName, StringType),
-        "fieldValue" -> parser.getText,
+        "fieldName" -> toSQLValue(fieldName, StringType),
+        "fieldValue" -> fieldValue,
         "jsonType" -> jsonType.toString(),
         "dataType" -> toSQLType(dataType)))
   }
```