diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 2929a00330c62..7546f13d9b89c 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -1697,8 +1697,8 @@ setMethod("to_date",
           })
 
 #' @details
-#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType},
-#' a \code{mapType} or array of \code{mapType} into a Column of JSON string.
+#' \code{to_json}: Converts a column containing a \code{structType}, a \code{mapType}
+#' or an \code{arrayType} into a Column of JSON string.
 #' Resolving the Column can fail if an unsupported type is encountered.
 #'
 #' @rdname column_collection_functions
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index e1f3cf339e83f..abc2f5cfd4d43 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1667,6 +1667,15 @@ test_that("column functions", {
     expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
   }
 
+  # Test to_json() supports arrays of primitive types and arrays
+  df <- sql("SELECT array(19, 42, 70) as age")
+  j <- collect(select(df, alias(to_json(df$age), "json")))
+  expect_equal(j[order(j$json), ][1], "[19,42,70]")
+
+  df <- sql("SELECT array(array(1, 2), array(3, 4)) as matrix")
+  j <- collect(select(df, alias(to_json(df$matrix), "json")))
+  expect_equal(j[order(j$json), ][1], "[[1,2],[3,4]]")
+
   # passing option
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
   schema2 <- structType(structField("date", "date"))
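For readers cross-checking against Scala, the behaviour the new R assertions pin down can be reproduced in a spark-shell session. This is an illustrative sketch, not part of the patch; it assumes an active SparkSession named `spark`:

    import org.apache.spark.sql.functions.{col, to_json}

    // array of primitives, matching the first R test
    val ages = spark.sql("SELECT array(19, 42, 70) AS age")
    ages.select(to_json(col("age"))).first().getString(0)
    // "[19,42,70]"

    // array of arrays, matching the second R test
    val matrix = spark.sql("SELECT array(array(1, 2), array(3, 4)) AS matrix")
    matrix.select(to_json(col("matrix"))).first().getString(0)
    // "[[1,2],[3,4]]"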
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index d58d8d10e5cd3..864780e0be9bd 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2289,13 +2289,11 @@ def from_json(col, schema, options={}):
 @since(2.1)
 def to_json(col, options={}):
     """
-    Converts a column containing a :class:`StructType`, :class:`ArrayType` of
-    :class:`StructType`\\s, a :class:`MapType` or :class:`ArrayType` of :class:`MapType`\\s
+    Converts a column containing a :class:`StructType`, :class:`ArrayType` or a :class:`MapType`
     into a JSON string. Throws an exception, in the case of an unsupported type.
 
-    :param col: name of column containing the struct, array of the structs, the map or
-        array of the maps.
-    :param options: options to control converting. accepts the same options as the json datasource
+    :param col: name of column containing a struct, an array or a map.
+    :param options: options to control converting. accepts the same options as the JSON datasource
 
     >>> from pyspark.sql import Row
     >>> from pyspark.sql.types import *
@@ -2315,6 +2313,10 @@ def to_json(col, options={}):
     >>> df = spark.createDataFrame(data, ("key", "value"))
     >>> df.select(to_json(df.value).alias("json")).collect()
     [Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
+    >>> data = [(1, ["Alice", "Bob"])]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json=u'["Alice","Bob"]')]
     """
     sc = SparkContext._active_spark_context
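Because the options map is forwarded verbatim to the JSON datasource, any writer option it understands applies to to_json as well. A hedged Scala sketch (spark-shell with `spark.implicits._` in scope; `timestampFormat` is a standard JSON datasource option, and the exact rendering shown is indicative, not taken from this patch):

    import java.sql.Timestamp
    import org.apache.spark.sql.functions.{col, struct, to_json}

    val df = Seq(Timestamp.valueOf("2018-01-01 12:00:00")).toDF("ts")
    // pass a JSON datasource option through to the writer
    df.select(to_json(struct(col("ts")), Map("timestampFormat" -> "yyyy/MM/dd HH:mm")))
      .first().getString(0)
    // {"ts":"2018/01/01 12:00"}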
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 11cc88735a9a3..bd9090a07471b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -613,12 +613,11 @@ case class JsonToStructs(
 }
 
 /**
- * Converts a [[StructType]], [[ArrayType]] of [[StructType]]s, [[MapType]]
- * or [[ArrayType]] of [[MapType]]s to a json output string.
+ * Converts a [[StructType]], [[ArrayType]] or [[MapType]] to a JSON output string.
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(expr[, options]) - Returns a json string with a given struct value",
+  usage = "_FUNC_(expr[, options]) - Returns a JSON string with a given struct value",
   examples = """
     Examples:
       > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
@@ -660,15 +659,10 @@ case class StructsToJson(
 
   @transient
   lazy val gen = new JacksonGenerator(
-    rowSchema, writer, new JSONOptions(options, timeZoneId.get))
+    inputSchema, writer, new JSONOptions(options, timeZoneId.get))
 
   @transient
-  lazy val rowSchema = child.dataType match {
-    case st: StructType => st
-    case ArrayType(st: StructType, _) => st
-    case mt: MapType => mt
-    case ArrayType(mt: MapType, _) => mt
-  }
+  lazy val inputSchema = child.dataType
 
   // This converts rows to the JSON output according to the given schema.
   @transient
@@ -680,12 +674,12 @@ case class StructsToJson(
       UTF8String.fromString(json)
     }
 
-    child.dataType match {
+    inputSchema match {
       case _: StructType =>
         (row: Any) =>
           gen.write(row.asInstanceOf[InternalRow])
           getAndReset()
-      case ArrayType(_: StructType, _) =>
+      case _: ArrayType =>
         (arr: Any) =>
           gen.write(arr.asInstanceOf[ArrayData])
           getAndReset()
@@ -693,34 +687,38 @@ case class StructsToJson(
         (map: Any) =>
           gen.write(map.asInstanceOf[MapData])
           getAndReset()
-      case ArrayType(_: MapType, _) =>
-        (arr: Any) =>
-          gen.write(arr.asInstanceOf[ArrayData])
-          getAndReset()
     }
   }
 
   override def dataType: DataType = StringType
 
-  override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
-    case _: StructType | ArrayType(_: StructType, _) =>
+  override def checkInputDataTypes(): TypeCheckResult = inputSchema match {
+    case struct: StructType =>
       try {
-        JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType])
+        JacksonUtils.verifySchema(struct)
         TypeCheckResult.TypeCheckSuccess
       } catch {
         case e: UnsupportedOperationException =>
           TypeCheckResult.TypeCheckFailure(e.getMessage)
       }
-    case _: MapType | ArrayType(_: MapType, _) =>
+    case map: MapType =>
       // TODO: let `JacksonUtils.verifySchema` verify a `MapType`
       try {
-        val st = StructType(StructField("a", rowSchema.asInstanceOf[MapType]) :: Nil)
+        val st = StructType(StructField("a", map) :: Nil)
         JacksonUtils.verifySchema(st)
         TypeCheckResult.TypeCheckSuccess
       } catch {
         case e: UnsupportedOperationException =>
          TypeCheckResult.TypeCheckFailure(e.getMessage)
       }
+    case array: ArrayType =>
+      try {
+        JacksonUtils.verifyType(prettyName, array)
+        TypeCheckResult.TypeCheckSuccess
+      } catch {
+        case e: UnsupportedOperationException =>
+          TypeCheckResult.TypeCheckFailure(e.getMessage)
+      }
     case _ => TypeCheckResult.TypeCheckFailure(
       s"Input type ${child.dataType.catalogString} must be a struct, array of structs or " +
         "a map or array of map.")
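With `inputSchema` now simply `child.dataType` and a single `ArrayType` branch in both the converter and the type check, any array whose element type passes `JacksonUtils.verifyType` is accepted, regardless of nesting. A hedged spark-shell sketch of the newly admitted inputs (assumes `spark.implicits._` in scope; the expected strings match the round-trip tests later in this patch):

    import org.apache.spark.sql.functions.to_json

    Seq(Seq(1, 2, 3)).toDF("a")                        // array of primitives
      .select(to_json($"a")).first().getString(0)      // "[1,2,3]"

    Seq(Seq(Map("a" -> 1), Map("b" -> 2))).toDF("a")   // array of maps
      .select(to_json($"a")).first().getString(0)      // "[{"a":1},{"b":2}]"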
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index 738947766adda..9b86d865622dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.json
 
 import java.io.Writer
-import java.nio.charset.StandardCharsets
 
 import com.fasterxml.jackson.core._
 
@@ -28,7 +27,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._
 
 /**
- * `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
+ * `JacksonGenerator` can only be initialized with a `StructType`, a `MapType` or an `ArrayType`.
  * Once it is initialized with `StructType`, it can be used to write out a struct or an array of
  * struct. Once it is initialized with `MapType`, it can be used to write out a map or an array
  * of map. An exception will be thrown if trying to write out a struct if it is initialized with
@@ -43,34 +42,32 @@ private[sql] class JacksonGenerator(
   // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
   private type ValueWriter = (SpecializedGetters, Int) => Unit
 
-  // `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
-  require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType],
-    s"JacksonGenerator only supports to be initialized with a ${StructType.simpleString} " +
-    s"or ${MapType.simpleString} but got ${dataType.catalogString}")
+  // `JacksonGenerator` can only be initialized with a `StructType`, a `MapType` or an `ArrayType`.
+  require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType]
+    || dataType.isInstanceOf[ArrayType],
+    s"JacksonGenerator can only be initialized with a ${StructType.simpleString}, " +
+    s"${MapType.simpleString} or ${ArrayType.simpleString} but got ${dataType.catalogString}")
 
   // `ValueWriter`s for all fields of the schema
   private lazy val rootFieldWriters: Array[ValueWriter] = dataType match {
     case st: StructType => st.map(_.dataType).map(makeWriter).toArray
     case _ => throw new UnsupportedOperationException(
-      s"Initial type ${dataType.catalogString} must be a struct")
+      s"Initial type ${dataType.catalogString} must be a ${StructType.simpleString}")
   }
 
   // `ValueWriter` for array data storing rows of the schema.
   private lazy val arrElementWriter: ValueWriter = dataType match {
-    case st: StructType =>
-      (arr: SpecializedGetters, i: Int) => {
-        writeObject(writeFields(arr.getStruct(i, st.length), st, rootFieldWriters))
-      }
-    case mt: MapType =>
-      (arr: SpecializedGetters, i: Int) => {
-        writeObject(writeMapData(arr.getMap(i), mt, mapElementWriter))
-      }
+    case at: ArrayType => makeWriter(at.elementType)
+    case _: StructType | _: MapType => makeWriter(dataType)
+    case _ => throw new UnsupportedOperationException(
+      s"Initial type ${dataType.catalogString} must be " +
+      s"an ${ArrayType.simpleString}, a ${StructType.simpleString} or a ${MapType.simpleString}")
   }
 
   private lazy val mapElementWriter: ValueWriter = dataType match {
     case mt: MapType => makeWriter(mt.valueType)
     case _ => throw new UnsupportedOperationException(
-      s"Initial type ${dataType.catalogString} must be a map")
+      s"Initial type ${dataType.catalogString} must be a ${MapType.simpleString}")
   }
 
   private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index f26b194e7a7ce..2d89c7066d080 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -32,11 +32,8 @@ object JacksonUtils {
     }
   }
 
-  /**
-   * Verify if the schema is supported in JSON parsing.
-   */
-  def verifySchema(schema: StructType): Unit = {
-    def verifyType(name: String, dataType: DataType): Unit = dataType match {
+  def verifyType(name: String, dataType: DataType): Unit = {
+    dataType match {
       case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
            DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>
 
@@ -54,7 +51,12 @@ object JacksonUtils {
       throw new UnsupportedOperationException(
        s"Unable to convert column $name of type ${dataType.catalogString} to JSON.")
     }
+  }
 
+  /**
+   * Verify if the schema is supported in JSON parsing.
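The hunks above elide `verifyType`'s recursive cases (the struct, array and map branches sit between the two hunks). For orientation, the shape of the check is roughly the following; this is a simplified sketch, not the exact Spark source, and the map branch in particular is an assumption:

    import org.apache.spark.sql.types._

    def verify(name: String, dataType: DataType): Unit = dataType match {
      // atomic types the writer can emit directly as JSON scalars
      case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType |
           FloatType | DoubleType | StringType | TimestampType | DateType | BinaryType |
           _: DecimalType =>
      // container types: recurse into whatever they carry
      case st: StructType => st.foreach(f => verify(f.name, f.dataType))
      case at: ArrayType => verify(name, at.elementType)
      case mt: MapType => verify(name, mt.valueType)
      // anything else has no JSON representation
      case _ => throw new UnsupportedOperationException(
        s"Unable to convert column $name of type ${dataType.catalogString} to JSON.")
    }

Exposing `verifyType` on its own is what lets `StructsToJson.checkInputDataTypes` validate a bare `ArrayType` without first wrapping it in a struct.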
+   */
+  def verifySchema(schema: StructType): Unit = {
     schema.foreach(field => verifyType(field.name, field.dataType))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index c9331883c4799..c4570981a5820 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3493,11 +3493,11 @@ object functions {
   def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))
 
   /**
-   * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
-   * a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
+   * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or
+   * a `MapType` into a JSON string with the specified schema.
    * Throws an exception, in the case of an unsupported type.
    *
-   * @param e a column containing a struct or array of the structs.
+   * @param e a column containing a struct, an array or a map.
    * @param options options to control how the struct column is converted into a json string.
    *                accepts the same options and the json data source.
    *
@@ -3509,11 +3509,11 @@ object functions {
   }
 
   /**
-   * (Java-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
-   * a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
+   * (Java-specific) Converts a column containing a `StructType`, `ArrayType` or
+   * a `MapType` into a JSON string with the specified schema.
    * Throws an exception, in the case of an unsupported type.
    *
-   * @param e a column containing a struct or array of the structs.
+   * @param e a column containing a struct, an array or a map.
    * @param options options to control how the struct column is converted into a json string.
    *                accepts the same options and the json data source.
    *
@@ -3524,11 +3524,11 @@ object functions {
     to_json(e, options.asScala.toMap)
 
   /**
-   * Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
-   * a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
+   * Converts a column containing a `StructType`, `ArrayType` or
+   * a `MapType` into a JSON string with the specified schema.
    * Throws an exception, in the case of an unsupported type.
    *
-   * @param e a column containing a struct or array of the structs.
+   * @param e a column containing a struct, an array or a map.
    *
    * @group collection_funcs
   * @since 2.1.0
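All three public overloads resolve to the same `StructsToJson` expression, so arrays are accepted by each of them. A hedged spark-shell sketch exercising all three (assumes `spark.implicits._`; the Java-specific overload is reached via `JavaConverters`):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.functions.{col, to_json}

    val df = Seq(Seq("Alice", "Bob")).toDF("value")

    df.select(to_json(col("value"))).first().getString(0)               // no options
    df.select(to_json(col("value"), Map.empty[String, String]))         // Scala-specific
    df.select(to_json(col("value"), Map.empty[String, String].asJava))  // Java-specific
    // each yields the JSON string ["Alice","Bob"]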
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 0cf370c13e8c0..0f22c0eeed581 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -51,3 +51,8 @@
 select from_json('[null, {"a":2}]', 'array<struct<a:int>>');
 select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>');
 select from_json('[{"a": 1}, 2]', 'array<map<string,int>>');
+
+-- to_json - array type
+select to_json(array('1', '2', '3'));
+select to_json(array(array(1, 2, 3), array(4)));
+
diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index 7444cdbef96e4..e550b43e08c28 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 38
+-- Number of queries: 40
 
 
 -- !query 0
@@ -9,7 +9,7 @@ struct<function_desc:string>
 -- !query 0 output
 Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
 Function: to_json
-Usage: to_json(expr[, options]) - Returns a json string with a given struct value
+Usage: to_json(expr[, options]) - Returns a JSON string with a given struct value
 
 
 -- !query 1
@@ -38,7 +38,7 @@ Extended Usage:
       Since: 2.2.0
 
 Function: to_json
-Usage: to_json(expr[, options]) - Returns a json string with a given struct value
+Usage: to_json(expr[, options]) - Returns a JSON string with a given struct value
 
 
 -- !query 2
@@ -354,3 +354,19 @@ select from_json('[{"a": 1}, 2]', 'array<map<string,int>>')
 struct<jsontostructs([{"a": 1}, 2]):array<map<string,int>>>
 -- !query 37 output
 NULL
+
+
+-- !query 38
+select to_json(array('1', '2', '3'))
+-- !query 38 schema
+struct<structstojson(array(1, 2, 3)):string>
+-- !query 38 output
+["1","2","3"]
+
+
+-- !query 39
+select to_json(array(array(1, 2, 3), array(4)))
+-- !query 39 schema
+struct<structstojson(array(array(1, 2, 3), array(4))):string>
+-- !query 39 output
+[[1,2,3],[4]]
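Driving the same two golden-file queries from Scala should produce identical strings; a hedged illustration, assuming an active SparkSession `spark`:

    spark.sql("select to_json(array('1', '2', '3'))").first().getString(0)
    // ["1","2","3"]
    spark.sql("select to_json(array(array(1, 2, 3), array(4)))").first().getString(0)
    // [[1,2,3],[4]]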
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index f321ab86e9b7f..fe4bf15fa3921 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -469,4 +469,53 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
   }
+
+  test("to_json - array of primitive types") {
+    val df = Seq(Array(1, 2, 3)).toDF("a")
+    checkAnswer(df.select(to_json($"a")), Seq(Row("[1,2,3]")))
+  }
+
+  test("roundtrip to_json -> from_json - array of primitive types") {
+    val arr = Array(1, 2, 3)
+    val df = Seq(arr).toDF("a")
+    checkAnswer(df.select(from_json(to_json($"a"), ArrayType(IntegerType))), Row(arr))
+  }
+
+  test("roundtrip from_json -> to_json - array of primitive types") {
+    val json = "[1,2,3]"
+    val df = Seq(json).toDF("a")
+    val schema = new ArrayType(IntegerType, false)
+
+    checkAnswer(df.select(to_json(from_json($"a", schema))), Seq(Row(json)))
+  }
+
+  test("roundtrip from_json -> to_json - array of arrays") {
+    val json = "[[1],[2,3],[4,5,6]]"
+    val jsonDF = Seq(json).toDF("a")
+    val schema = new ArrayType(ArrayType(IntegerType, false), false)
+
+    checkAnswer(
+      jsonDF.select(to_json(from_json($"a", schema))),
+      Seq(Row(json)))
+  }
+
+  test("roundtrip from_json -> to_json - array of maps") {
+    val json = """[{"a":1},{"b":2}]"""
+    val jsonDF = Seq(json).toDF("a")
+    val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
+
+    checkAnswer(
+      jsonDF.select(to_json(from_json($"a", schema))),
+      Seq(Row(json)))
+  }
+
+  test("roundtrip from_json -> to_json - array of structs") {
+    val json = """[{"a":1},{"a":2},{"a":3}]"""
+    val jsonDF = Seq(json).toDF("a")
+    val schema = new ArrayType(new StructType().add("a", IntegerType), false)
+
+    checkAnswer(
+      jsonDF.select(to_json(from_json($"a", schema))),
+      Seq(Row(json)))
+  }
 }