diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 5ad5d78d3ed17..509f689ac521e 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1694,7 +1694,7 @@ test_that("column functions", {
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
   schema2 <- structType(structField("date", "date"))
   s <- collect(select(df, from_json(df$col, schema2)))
-  expect_equal(s[[1]][[1]], NA)
+  expect_equal(s[[1]][[1]]$date, NA)
   s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
   expect_is(s[[1]][[1]]$date, "Date")
   expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")
diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index b8b9ad8438554..dfa35b88369cb 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -13,6 +13,8 @@ displayTitle: Spark SQL Upgrading Guide

   - In Spark version 2.4 and earlier, the parser of JSON data source treats empty strings as null for some data types such as `IntegerType`. For `FloatType` and `DoubleType`, it fails on empty strings and throws exceptions. Since Spark 3.0, we disallow empty strings and will throw exceptions for data types except for `StringType` and `BinaryType`.

+  - Since Spark 3.0, the `from_json` function supports two modes: `PERMISSIVE` and `FAILFAST`. The mode can be set via the `mode` option, and the default is `PERMISSIVE`. In previous versions, the behavior of `from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`, especially in the processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions, but Spark 3.0 converts it to `Row(null)`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4

   - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
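To make the new migration note concrete, here is an illustrative snippet (not part of the patch itself); the SparkSession setup and object name are assumptions, and the expected results follow the bullet above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

object FromJsonModesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("from_json modes").getOrCreate()
    import spark.implicits._

    val df = Seq("""{"a" 1}""").toDF("value")            // malformed: missing ':' after "a"
    val schema = new StructType().add("a", IntegerType)

    // Spark 3.0 default (PERMISSIVE): the malformed record parses to a row whose field `a` is null,
    // i.e. Row(null) when collected. Spark 2.4 and earlier returned NULL for the whole column instead.
    df.select(from_json($"value", schema, Map.empty[String, String])).show()

    // FAILFAST: the same query fails with an exception when executed.
    // df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).show()

    spark.stop()
  }
}
```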
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 32d7f02f61883..2694e777d8266 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2305,7 +2305,7 @@ def from_json(col, schema, options={}):
     [Row(json=[Row(a=1)])]
     >>> schema = schema_of_json(lit('''{"a": 0}'''))
     >>> df.select(from_json(df.value, schema).alias("json")).collect()
-    [Row(json=Row(a=1))]
+    [Row(json=Row(a=None))]
     >>> data = [(1, '''[1, 2, 3]''')]
     >>> schema = ArrayType(IntegerType())
     >>> df = spark.createDataFrame(data, ("key", "value"))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index b4815b47d1797..e966924293cf7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -554,18 +554,36 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
     case _: StructType =>
-      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
     case _: ArrayType =>
-      (rows: Seq[InternalRow]) => rows.head.getArray(0)
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
     case _: MapType =>
-      (rows: Seq[InternalRow]) => rows.head.getMap(0)
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
   }

-  @transient
-  lazy val parser =
-    new JacksonParser(
-      nullableSchema,
-      new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
+  val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
+
+  @transient lazy val parser = {
+    val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord)
+    val mode = parsedOptions.parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " +
+        s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
+    }
+    val rawParser = new JacksonParser(nullableSchema, parsedOptions, allowArrayAsStructs = false)
+    val createParser = CreateJacksonParser.utf8String _
+
+    val parserSchema = nullableSchema match {
+      case s: StructType => s
+      case other => StructType(StructField("value", other) :: Nil)
+    }
+
+    new FailureSafeParser[UTF8String](
+      input => rawParser.parse(input, createParser, identity[UTF8String]),
+      mode,
+      parserSchema,
+      parsedOptions.columnNameOfCorruptRecord,
+      parsedOptions.multiLine)
+  }

   override def dataType: DataType = nullableSchema

@@ -573,35 +591,7 @@ case class JsonToStructs(
     copy(timeZoneId = Option(timeZoneId))

   override def nullSafeEval(json: Any): Any = {
-    // When input is,
-    //   - `null`: `null`.
-    //   - invalid json: `null`.
-    //   - empty string: `null`.
-    //
-    // When the schema is array,
-    //   - json array: `Array(Row(...), ...)`
-    //   - json object: `Array(Row(...))`
-    //   - empty json array: `Array()`.
-    //   - empty json object: `Array(Row(null))`.
-    //
-    // When the schema is a struct,
-    //   - json object/array with single element: `Row(...)`
-    //   - json array with multiple elements: `null`
-    //   - empty json array: `null`.
-    //   - empty json object: `Row(null)`.
-
-    // We need `null` if the input string is an empty string. `JacksonParser` can
-    // deal with this but produces `Nil`.
-    if (json.toString.trim.isEmpty) return null
-
-    try {
-      converter(parser.parse(
-        json.asInstanceOf[UTF8String],
-        CreateJacksonParser.utf8String,
-        identity[UTF8String]))
-    } catch {
-      case _: BadRecordException => null
-    }
+    converter(parser.parse(json.asInstanceOf[UTF8String]))
   }

   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 918c9e71ad37a..57c7f2faf3107 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -38,7 +38,8 @@ import org.apache.spark.util.Utils
  */
 class JacksonParser(
     schema: DataType,
-    val options: JSONOptions) extends Logging {
+    val options: JSONOptions,
+    allowArrayAsStructs: Boolean) extends Logging {

   import JacksonUtils._
   import com.fasterxml.jackson.core.JsonToken._
@@ -84,7 +85,7 @@ class JacksonParser(
       // List([str_a_1,null])
       // List([str_a_2,null], [null,str_b_3])
       //
-      case START_ARRAY =>
+      case START_ARRAY if allowArrayAsStructs =>
         val array = convertArray(parser, elementConverter)
         // Here, as we support reading top level JSON arrays and take every element
         // in such an array as a row, this case is possible.
@@ -93,6 +94,8 @@ class JacksonParser(
         } else {
           array.toArray[InternalRow](schema).toSeq
         }
+      case START_ARRAY =>
+        throw new RuntimeException("Parsing JSON arrays as structs is forbidden.")
     }
   }
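For readers unfamiliar with the failure-safe wrapping that `JsonToStructs` now delegates to, here is a minimal standalone sketch of the pattern. This is not Spark's `FailureSafeParser`; all names and types below are invented for illustration only.

```scala
object FailureSafeSketch {
  sealed trait Mode
  case object Permissive extends Mode
  case object FailFast extends Mode

  // Stand-in for a bad-record signal: carries the raw text of the record that failed to parse.
  final case class BadRecordException(record: String) extends Exception(record)

  // Wraps a raw parse function so malformed input is handled according to `mode`:
  // PERMISSIVE keeps the row and hands the raw text to `onCorrupt`, FAILFAST fails the query.
  def failureSafe[T](rawParse: String => Seq[T], mode: Mode, onCorrupt: String => T): String => Seq[T] =
    input =>
      try rawParse(input) catch {
        case BadRecordException(record) => mode match {
          case Permissive => Seq(onCorrupt(record))
          case FailFast => throw new RuntimeException(
            "Malformed records are detected in record parsing. Parse Mode: FAILFAST")
        }
      }

  def main(args: Array[String]): Unit = {
    // Toy "parser" that only accepts the well-formed record {"a": 2}.
    val raw: String => Seq[(Option[Int], Option[String])] = {
      case """{"a": 2}""" => Seq((Some(2), None))
      case other => throw BadRecordException(other)
    }
    // In permissive mode the corrupt record lands in the second ("corrupt column") slot.
    val onCorrupt: String => (Option[Int], Option[String]) = rec => (None, Some(rec))
    val parse = failureSafe(raw, Permissive, onCorrupt)
    println(parse("""{"a" 1}"""))  // List((None,Some({"a" 1})))
    println(parse("""{"a": 2}""")) // List((Some(2),None))
  }
}
```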
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index f31b294fe25d4..304642161146b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.expressions

 import java.util.Calendar

-import org.apache.spark.SparkFunSuite
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.plans.PlanTestBase
@@ -409,14 +411,18 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
       JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
-      null
+      InternalRow(null)
     )

-    // Other modes should still return `null`.
-    checkEvaluation(
-      JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
-      null
-    )
+    val exception = intercept[TestFailedException] {
+      checkEvaluation(
+        JsonToStructs(schema, Map("mode" -> FailFastMode.name), Literal(jsonData), gmtId),
+        InternalRow(null)
+      )
+    }.getCause
+    assert(exception.isInstanceOf[SparkException])
+    assert(exception.getMessage.contains(
+      "Malformed records are detected in record parsing. Parse Mode: FAILFAST"))
   }

   test("from_json - input=array, schema=array, output=array") {
@@ -450,21 +456,23 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
   test("from_json - input=array of single object, schema=struct, output=single row") {
     val input = """[{"a": 1}]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
-    val output = InternalRow(1)
+    val output = InternalRow(null)
     checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
   }

-  test("from_json - input=array, schema=struct, output=null") {
+  test("from_json - input=array, schema=struct, output=single row") {
     val input = """[{"a": 1}, {"a": 2}]"""
-    val schema = StructType(StructField("a", IntegerType) :: Nil)
-    val output = null
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
+    val corrupted = "corrupted"
+    val schema = new StructType().add("a", IntegerType).add(corrupted, StringType)
+    val output = InternalRow(null, UTF8String.fromString(input))
+    val options = Map("columnNameOfCorruptRecord" -> corrupted)
+    checkEvaluation(JsonToStructs(schema, options, Literal(input), gmtId), output)
   }

-  test("from_json - input=empty array, schema=struct, output=null") {
+  test("from_json - input=empty array, schema=struct, output=single row with null") {
     val input = """[]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
-    val output = null
+    val output = InternalRow(null)
     checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
   }

@@ -487,7 +495,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
       JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
-      null)
+      InternalRow(null))
   }

   test("from_json with timestamp") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 4f6d8b8a0c34a..95c97e5c9433c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -446,7 +446,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val createParser = CreateJacksonParser.string _

     val parsed = jsonDataset.rdd.mapPartitions { iter =>
-      val rawParser = new JacksonParser(actualSchema, parsedOptions)
+      val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
       val parser = new FailureSafeParser[String](
         input => rawParser.parse(input, createParser, UTF8String.fromString),
         parsedOptions.parseMode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index a9241afba537b..1f7c9d73f19fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -130,7 +130,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }

     (file: PartitionedFile) => {
-      val parser = new JacksonParser(actualSchema, parsedOptions)
+      val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
       JsonDataSource(parsedOptions).readFile(
         broadcastedHadoopConf.value.value,
         file,
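The two data-source call sites above keep `allowArrayAsStructs = true`, so `spark.read.json` continues to expand a top-level JSON array into one row per element; only `from_json` passes `false` and rejects that shape. A small usage sketch, with the session setup assumed:

```scala
import org.apache.spark.sql.SparkSession

object TopLevelJsonArrayExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("top-level JSON arrays").getOrCreate()
    import spark.implicits._

    // A single input line holding a top-level JSON array.
    val ds = Seq("""[{"a": 1}, {"a": 2}]""").toDS()

    // The JSON data source (allowArrayAsStructs = true) yields one row per array element: a = 1 and a = 2.
    spark.read.json(ds).show()

    spark.stop()
  }
}
```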
diff --git a/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out b/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out
index ba9bf76513f97..31ee700a8db95 100644
--- a/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out
@@ -258,4 +258,4 @@ select from_json(a, 'a INT') from t
 -- !query 31 schema
 struct>
 -- !query 31 output
-NULL
+{"a":null}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 5cbf10129a4da..797b274f42cdd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql

 import collection.JavaConverters._

+import org.apache.spark.SparkException
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._

@@ -132,7 +134,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {

     checkAnswer(
       df.select(from_json($"value", schema)),
-      Row(null) :: Nil)
+      Row(Row(null)) :: Nil)
   }

   test("from_json - json doesn't conform to the array type") {
@@ -547,4 +549,33 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
         Map("pretty" -> "true"))),
       Seq(Row(expected)))
   }
+
+  test("from_json invalid json - check modes") {
+    withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
+      val schema = new StructType()
+        .add("a", IntegerType)
+        .add("b", IntegerType)
+        .add("_unparsed", StringType)
+      val badRec = """{"a" 1, "b": 11}"""
+      val df = Seq(badRec, """{"a": 2, "b": 12}""").toDS()
+
+      checkAnswer(
+        df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
+        Row(Row(null, null, badRec)) :: Row(Row(2, 12, null)) :: Nil)
+
+      val exception1 = intercept[SparkException] {
+        df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
+      }.getMessage
+      assert(exception1.contains(
+        "Malformed records are detected in record parsing. Parse Mode: FAILFAST."))
+
+      val exception2 = intercept[SparkException] {
+        df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED")))
+          .collect()
+      }.getMessage
+      assert(exception2.contains(
+        "from_json() doesn't support the DROPMALFORMED mode. " +
+        "Acceptable modes are PERMISSIVE and FAILFAST."))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 43e1a616e363c..06032ded42a53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -67,7 +67,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
     val dummySchema = StructType(Seq.empty)
-    val parser = new JacksonParser(dummySchema, dummyOption)
+    val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)

     Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
       jsonParser.nextToken()
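To round this out, a usage sketch of the behavior exercised by the new `from_json invalid json - check modes` test. It is illustrative rather than part of the patch, and it passes the corrupt-record column name as a `from_json` option instead of the SQL conf the test sets:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

object FromJsonCorruptRecordExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("from_json corrupt records").getOrCreate()
    import spark.implicits._

    val schema = new StructType()
      .add("a", IntegerType)
      .add("b", IntegerType)
      .add("_unparsed", StringType)
    val df = Seq("""{"a" 1, "b": 11}""", """{"a": 2, "b": 12}""").toDF("value")
    val permissive = Map("mode" -> "PERMISSIVE", "columnNameOfCorruptRecord" -> "_unparsed")

    // PERMISSIVE: the malformed record survives as Row(null, null, <raw text>), the good one parses.
    df.select(from_json($"value", schema, permissive)).show(truncate = false)

    // FAILFAST: collecting the same query throws a SparkException mentioning "Parse Mode: FAILFAST".
    // df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()

    spark.stop()
  }
}
```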