Skip to content

Commit

Permalink
[SPARK-25935][SQL] Allow null rows for bad records from JSON/CSV parsers
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This PR reverts  apache#22938 per discussion in apache#23325

Closes apache#23325

Closes apache#23543 from MaxGekk/return-nulls-from-json-parser.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
MaxGekk authored and jackylee-ch committed Feb 18, 2019
1 parent 140f9c7 commit 6cf3ba1
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 28 deletions.
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1694,7 +1694,7 @@ test_that("column functions", {

# check for unparseable
df <- as.DataFrame(list(list("a" = "")))
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)

# check if array type in string is correctly supported.
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
Expand Down
2 changes: 0 additions & 2 deletions docs/sql-migration-guide-upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ displayTitle: Spark SQL Upgrading Guide

- Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.

- In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings and JSON datasource skips the same independently of its mode if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`.

- The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.

- In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,23 +548,15 @@ case class JsonToStructs(
s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
}

@transient
private lazy val castRow = nullableSchema match {
case _: StructType => (row: InternalRow) => row
case _: ArrayType => (row: InternalRow) => row.getArray(0)
case _: MapType => (row: InternalRow) => row.getMap(0)
}

// This converts parsed rows to the desired output by the given schema.
private def convertRow(rows: Iterator[InternalRow]) = {
if (rows.hasNext) {
val result = rows.next()
// JSON's parser produces one record only.
assert(!rows.hasNext)
castRow(result)
} else {
throw new IllegalArgumentException("Expected one row from JSON parser.")
}
@transient
lazy val converter = nullableSchema match {
case _: StructType =>
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
case _: ArrayType =>
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
case _: MapType =>
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
}

val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
Expand Down Expand Up @@ -600,7 +592,7 @@ case class JsonToStructs(
copy(timeZoneId = Option(timeZoneId))

override def nullSafeEval(json: Any): Any = {
convertRow(parser.parse(json.asInstanceOf[UTF8String]))
converter(parser.parse(json.asInstanceOf[UTF8String]))
}

override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ class JacksonParser(
// a null first token is equivalent to testing for input.trim.isEmpty
// but it works on any token stream and not just strings
parser.nextToken() match {
case null => throw new RuntimeException("Not found any JSON token")
case null => Nil
case _ => rootConverter.apply(parser) match {
case null => throw new RuntimeException("Root converter returned null")
case rows => rows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
InternalRow(null)
null
)
}

Expand Down
10 changes: 10 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Seq(Row("1"), Row("2")))
}

test("SPARK-11226 Skip empty line in json file") {
spark.read
.json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
.createOrReplaceTempView("d")

checkAnswer(
sql("select count(1) from d"),
Seq(Row(3)))
}

test("SPARK-8828 sum should return null if all input values are null") {
checkAnswer(
sql("select sum(a), avg(a) from allNulls"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
Row("str_a_4", "str_b_4", "str_c_4"),
Row(null, null, null))
)
Expand All @@ -1148,7 +1147,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.select($"a", $"b", $"c", $"_unparsed"),
Row(null, null, null, "{") ::
Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
Expand All @@ -1163,7 +1161,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
Row("{") ::
Row("") ::
Row("""{"a":1, b:2}""") ::
Row("""{"a":{, b:3}""") ::
Row("]") :: Nil
Expand All @@ -1185,7 +1182,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.selectExpr("a", "b", "c", "_malformed"),
Row(null, null, null, "{") ::
Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
Expand Down Expand Up @@ -1727,7 +1723,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val path = dir.getCanonicalPath
primitiveFieldAndType
.toDF("value")
.repartition(1)
.write
.option("compression", "GzIp")
.text(path)
Expand Down Expand Up @@ -2428,7 +2423,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}

checkCount(2)
countForMalformedJSON(1, Seq(""))
countForMalformedJSON(0, Seq(""))
}

test("SPARK-25040: empty strings should be disallowed") {
Expand Down

0 comments on commit 6cf3ba1

Please sign in to comment.