[SPARK-25243][SQL] Use FailureSafeParser in from_json
## What changes were proposed in this pull request?

In this PR, I propose to switch `from_json` to `FailureSafeParser`, to make the function compatible with the `PERMISSIVE` mode by default, and to support the `FAILFAST` mode as well. The `DROPMALFORMED` mode is not supported by `from_json`.
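
As a rough sketch of the proposed user-facing behavior (Scala API; assumes an active `SparkSession` named `spark`, with sample records mirroring the new `JsonFunctionsSuite` test below — not part of the patch itself):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._  // `spark` is the active SparkSession (e.g. in spark-shell)

val schema = new StructType().add("a", IntegerType).add("b", IntegerType)
val df = Seq("""{"a" 1, "b": 11}""", """{"a": 2, "b": 12}""").toDS()  // first record is malformed

// PERMISSIVE (the new default): the malformed record becomes a row of nulls.
df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))).collect()
// Array([[null,null]], [[2,12]])

// FAILFAST: the malformed record makes the query fail with a SparkException.
df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
```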

## How was this patch tested?

It was tested by the existing `JsonSuite`/`CSVSuite`, `JsonFunctionsSuite`, and `JsonExpressionsSuite`, as well as by new tests for `from_json` which check different modes.

Closes apache#22237 from MaxGekk/from_json-failuresafe.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2 people authored and jackylee-ch committed Feb 18, 2019
1 parent 70f834c commit 1378143
Showing 11 changed files with 95 additions and 61 deletions.
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1694,7 +1694,7 @@ test_that("column functions", {
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
schema2 <- structType(structField("date", "date"))
s <- collect(select(df, from_json(df$col, schema2)))
expect_equal(s[[1]][[1]], NA)
expect_equal(s[[1]][[1]]$date, NA)
s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
expect_is(s[[1]][[1]]$date, "Date")
expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")
2 changes: 2 additions & 0 deletions docs/sql-migration-guide-upgrade.md
@@ -13,6 +13,8 @@ displayTitle: Spark SQL Upgrading Guide

- In Spark version 2.4 and earlier, the parser of the JSON data source treats empty strings as null for some data types such as `IntegerType`. For `FloatType` and `DoubleType`, it fails on empty strings and throws exceptions. Since Spark 3.0, we disallow empty strings and will throw exceptions for data types except for `StringType` and `BinaryType`.

- Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially in the processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions, but Spark 3.0 converts it to `Row(null)`.

## Upgrading From Spark SQL 2.3 to 2.4

- In Spark version 2.3 and earlier, the second parameter to the array_contains function is implicitly promoted to the element type of the first array type parameter. This type promotion can be lossy and may cause the `array_contains` function to return a wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some changes in behavior, which are illustrated in the table below.
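
A hedged illustration of the new `from_json` migration note above (default `PERMISSIVE` mode, schema `a INT` as in the entry's own example; assumes an active `SparkSession`):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._  // `spark` is the active SparkSession

val df = Seq("""{"a" 1}""").toDF("value")  // malformed: missing ':' after "a"
df.select(from_json($"value", new StructType().add("a", IntegerType))).collect()
// Spark 2.4 and earlier: Array([null])    -- the whole struct is null
// Spark 3.0:             Array([[null]])  -- Row(null), a struct whose only field is null
```
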
2 changes: 1 addition & 1 deletion python/pyspark/sql/functions.py
@@ -2305,7 +2305,7 @@ def from_json(col, schema, options={}):
[Row(json=[Row(a=1)])]
>>> schema = schema_of_json(lit('''{"a": 0}'''))
>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=Row(a=1))]
[Row(json=Row(a=None))]
>>> data = [(1, '''[1, 2, 3]''')]
>>> schema = ArrayType(IntegerType())
>>> df = spark.createDataFrame(data, ("key", "value"))
@@ -554,54 +554,44 @@ case class JsonToStructs(
@transient
lazy val converter = nullableSchema match {
case _: StructType =>
(rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
case _: ArrayType =>
(rows: Seq[InternalRow]) => rows.head.getArray(0)
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
case _: MapType =>
(rows: Seq[InternalRow]) => rows.head.getMap(0)
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
}

@transient
lazy val parser =
new JacksonParser(
nullableSchema,
new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
@transient lazy val parser = {
val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord)
val mode = parsedOptions.parseMode
if (mode != PermissiveMode && mode != FailFastMode) {
throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " +
s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
}
val rawParser = new JacksonParser(nullableSchema, parsedOptions, allowArrayAsStructs = false)
val createParser = CreateJacksonParser.utf8String _

val parserSchema = nullableSchema match {
case s: StructType => s
case other => StructType(StructField("value", other) :: Nil)
}

new FailureSafeParser[UTF8String](
input => rawParser.parse(input, createParser, identity[UTF8String]),
mode,
parserSchema,
parsedOptions.columnNameOfCorruptRecord,
parsedOptions.multiLine)
}

override def dataType: DataType = nullableSchema

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override def nullSafeEval(json: Any): Any = {
// When input is,
// - `null`: `null`.
// - invalid json: `null`.
// - empty string: `null`.
//
// When the schema is array,
// - json array: `Array(Row(...), ...)`
// - json object: `Array(Row(...))`
// - empty json array: `Array()`.
// - empty json object: `Array(Row(null))`.
//
// When the schema is a struct,
// - json object/array with single element: `Row(...)`
// - json array with multiple elements: `null`
// - empty json array: `null`.
// - empty json object: `Row(null)`.

// We need `null` if the input string is an empty string. `JacksonParser` can
// deal with this but produces `Nil`.
if (json.toString.trim.isEmpty) return null

try {
converter(parser.parse(
json.asInstanceOf[UTF8String],
CreateJacksonParser.utf8String,
identity[UTF8String]))
} catch {
case _: BadRecordException => null
}
converter(parser.parse(json.asInstanceOf[UTF8String]))
}

override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
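
The central change above is that `JsonToStructs` no longer catches `BadRecordException` itself but delegates per-record error handling to `FailureSafeParser`. A simplified, hypothetical sketch of that pattern (`parseSafely` and `toCorruptRow` are illustrative stand-ins, not the actual `FailureSafeParser` source):

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{BadRecordException, FailFastMode, ParseMode, PermissiveMode}

// Wrap a raw parsing function so malformed records are handled according to the parse mode.
def parseSafely[IN](
    rawParse: IN => Iterator[InternalRow],  // e.g. JacksonParser.parse
    mode: ParseMode,
    toCorruptRow: IN => InternalRow         // all-null fields, plus the raw text in the corrupt-record column
  ): IN => Iterator[InternalRow] = { input =>
  try {
    rawParse(input)
  } catch {
    case _: BadRecordException => mode match {
      case PermissiveMode => Iterator(toCorruptRow(input))
      case FailFastMode => throw new SparkException(
        "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")
      case other => throw new UnsupportedOperationException(s"Unsupported mode: ${other.name}")
    }
  }
}
```

In `PERMISSIVE` mode the original text of a malformed record can also be kept by adding the `columnNameOfCorruptRecord` column to the schema, as exercised by the new `JsonFunctionsSuite` test below.
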
@@ -38,7 +38,8 @@ import org.apache.spark.util.Utils
*/
class JacksonParser(
schema: DataType,
val options: JSONOptions) extends Logging {
val options: JSONOptions,
allowArrayAsStructs: Boolean) extends Logging {

import JacksonUtils._
import com.fasterxml.jackson.core.JsonToken._
@@ -84,7 +85,7 @@ class JacksonParser(
// List([str_a_1,null])
// List([str_a_2,null], [null,str_b_3])
//
case START_ARRAY =>
case START_ARRAY if allowArrayAsStructs =>
val array = convertArray(parser, elementConverter)
// Here, as we support reading top level JSON arrays and take every element
// in such an array as a row, this case is possible.
@@ -93,6 +94,8 @@
} else {
array.toArray[InternalRow](schema).toSeq
}
case START_ARRAY =>
throw new RuntimeException("Parsing JSON arrays as structs is forbidden.")
}
}

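
The new `allowArrayAsStructs` flag keeps the existing `DataFrameReader.json` behavior (a top-level JSON array is still read as multiple rows) while rejecting it in `from_json`, which must return a single value per input record. A hedged usage sketch of the resulting `from_json` behavior (default `PERMISSIVE` mode; the expected result matches the updated `JsonExpressionsSuite` tests below):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._  // `spark` is the active SparkSession

val schema = new StructType().add("a", IntegerType)

// A top-level JSON array no longer parses as a struct; the RuntimeException thrown above
// is treated as a bad record, so PERMISSIVE mode yields a struct with null fields.
Seq("""[{"a": 1}, {"a": 2}]""").toDF("value")
  .select(from_json($"value", schema))
  .collect()
// Array([[null]])
```
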
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.expressions

import java.util.Calendar

import org.apache.spark.SparkFunSuite
import org.scalatest.exceptions.TestFailedException

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.plans.PlanTestBase
@@ -409,14 +411,18 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
null
InternalRow(null)
)

// Other modes should still return `null`.
checkEvaluation(
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
null
)
val exception = intercept[TestFailedException] {
checkEvaluation(
JsonToStructs(schema, Map("mode" -> FailFastMode.name), Literal(jsonData), gmtId),
InternalRow(null)
)
}.getCause
assert(exception.isInstanceOf[SparkException])
assert(exception.getMessage.contains(
"Malformed records are detected in record parsing. Parse Mode: FAILFAST"))
}

test("from_json - input=array, schema=array, output=array") {
@@ -450,21 +456,23 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
test("from_json - input=array of single object, schema=struct, output=single row") {
val input = """[{"a": 1}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = InternalRow(1)
val output = InternalRow(null)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}

test("from_json - input=array, schema=struct, output=null") {
test("from_json - input=array, schema=struct, output=single row") {
val input = """[{"a": 1}, {"a": 2}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
val corrupted = "corrupted"
val schema = new StructType().add("a", IntegerType).add(corrupted, StringType)
val output = InternalRow(null, UTF8String.fromString(input))
val options = Map("columnNameOfCorruptRecord" -> corrupted)
checkEvaluation(JsonToStructs(schema, options, Literal(input), gmtId), output)
}

test("from_json - input=empty array, schema=struct, output=null") {
test("from_json - input=empty array, schema=struct, output=single row with null") {
val input = """[]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
val output = InternalRow(null)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}

@@ -487,7 +495,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
null)
InternalRow(null))
}

test("from_json with timestamp") {
@@ -446,7 +446,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

val createParser = CreateJacksonParser.string _
val parsed = jsonDataset.rdd.mapPartitions { iter =>
val rawParser = new JacksonParser(actualSchema, parsedOptions)
val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
val parser = new FailureSafeParser[String](
input => rawParser.parse(input, createParser, UTF8String.fromString),
parsedOptions.parseMode,
@@ -130,7 +130,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
}

(file: PartitionedFile) => {
val parser = new JacksonParser(actualSchema, parsedOptions)
val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
JsonDataSource(parsedOptions).readFile(
broadcastedHadoopConf.value.value,
file,
@@ -258,4 +258,4 @@ select from_json(a, 'a INT') from t
-- !query 31 schema
struct<from_json(a):struct<a:int>>
-- !query 31 output
NULL
{"a":null}
@@ -19,7 +19,9 @@ package org.apache.spark.sql

import collection.JavaConverters._

import org.apache.spark.SparkException
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

@@ -132,7 +134,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {

checkAnswer(
df.select(from_json($"value", schema)),
Row(null) :: Nil)
Row(Row(null)) :: Nil)
}

test("from_json - json doesn't conform to the array type") {
@@ -547,4 +549,33 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Map("pretty" -> "true"))),
Seq(Row(expected)))
}

test("from_json invalid json - check modes") {
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
val schema = new StructType()
.add("a", IntegerType)
.add("b", IntegerType)
.add("_unparsed", StringType)
val badRec = """{"a" 1, "b": 11}"""
val df = Seq(badRec, """{"a": 2, "b": 12}""").toDS()

checkAnswer(
df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
Row(Row(null, null, badRec)) :: Row(Row(2, 12, null)) :: Nil)

val exception1 = intercept[SparkException] {
df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
}.getMessage
assert(exception1.contains(
"Malformed records are detected in record parsing. Parse Mode: FAILFAST."))

val exception2 = intercept[SparkException] {
df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED")))
.collect()
}.getMessage
assert(exception2.contains(
"from_json() doesn't support the DROPMALFORMED mode. " +
"Acceptable modes are PERMISSIVE and FAILFAST."))
}
}
}
@@ -67,7 +67,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
val dummySchema = StructType(Seq.empty)
val parser = new JacksonParser(dummySchema, dummyOption)
val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)

Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
jsonParser.nextToken()
