From fb727268b776f3c9b896abfee60e8538aa9e93e4 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 24 Aug 2018 20:57:36 +0200 Subject: [PATCH 01/13] Support arrays of any types in to_json --- R/pkg/R/functions.R | 4 +- python/pyspark/sql/functions.py | 6 +-- .../expressions/jsonExpressions.scala | 28 +++++------ .../sql/catalyst/json/JacksonGenerator.scala | 20 +++----- .../sql/catalyst/json/JacksonUtils.scala | 34 ++++++------- .../org/apache/spark/sql/functions.scala | 18 +++---- .../apache/spark/sql/JsonFunctionsSuite.scala | 49 +++++++++++++++++++ 7 files changed, 100 insertions(+), 59 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 2929a00330c62..17984636a010b 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -1697,8 +1697,8 @@ setMethod("to_date", }) #' @details -#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType}, -#' a \code{mapType} or array of \code{mapType} into a Column of JSON string. +#' \code{to_json}: Converts a column containing a \code{structType}, a \code{mapType} +#' or an array into a Column of JSON string. #' Resolving the Column can fail if an unsupported type is encountered. #' #' @rdname column_collection_functions diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index d58d8d10e5cd3..e488adf5236e1 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2289,12 +2289,10 @@ def from_json(col, schema, options={}): @since(2.1) def to_json(col, options={}): """ - Converts a column containing a :class:`StructType`, :class:`ArrayType` of - :class:`StructType`\\s, a :class:`MapType` or :class:`ArrayType` of :class:`MapType`\\s + Converts a column containing a :class:`StructType`, :class:`ArrayType` or a :class:`MapType` into a JSON string. Throws an exception, in the case of an unsupported type. - :param col: name of column containing the struct, array of the structs, the map or - array of the maps. + :param col: name of column containing a struct, an array or a map. :param options: options to control converting. accepts the same options as the json datasource >>> from pyspark.sql import Row diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 11cc88735a9a3..b95dc3c65e1c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -613,8 +613,7 @@ case class JsonToStructs( } /** - * Converts a [[StructType]], [[ArrayType]] of [[StructType]]s, [[MapType]] - * or [[ArrayType]] of [[MapType]]s to a json output string. + * Converts a [[StructType]], [[ArrayType]] or [[MapType]] to a json output string. */ // scalastyle:off line.size.limit @ExpressionDescription( @@ -663,12 +662,7 @@ case class StructsToJson( rowSchema, writer, new JSONOptions(options, timeZoneId.get)) @transient - lazy val rowSchema = child.dataType match { - case st: StructType => st - case ArrayType(st: StructType, _) => st - case mt: MapType => mt - case ArrayType(mt: MapType, _) => mt - } + lazy val rowSchema = child.dataType // This converts rows to the JSON output according to the given schema. 
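
// For example (an illustrative sketch, not part of the patch; assumes a SparkSession
// named `spark`): with `rowSchema` now taken from the child's type as-is, a bare array
// column of any element type is accepted:
//
//   import org.apache.spark.sql.functions.to_json
//   import spark.implicits._
//   Seq(Array(1, 2, 3)).toDF("a").select(to_json($"a")).first().getString(0)
//
// which returns the JSON string "[1,2,3]", matching the new suite tests below.
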
@transient @@ -685,7 +679,7 @@ case class StructsToJson( (row: Any) => gen.write(row.asInstanceOf[InternalRow]) getAndReset() - case ArrayType(_: StructType, _) => + case _: ArrayType => (arr: Any) => gen.write(arr.asInstanceOf[ArrayData]) getAndReset() @@ -693,17 +687,13 @@ case class StructsToJson( (map: Any) => gen.write(map.asInstanceOf[MapData]) getAndReset() - case ArrayType(_: MapType, _) => - (arr: Any) => - gen.write(arr.asInstanceOf[ArrayData]) - getAndReset() } } override def dataType: DataType = StringType override def checkInputDataTypes(): TypeCheckResult = child.dataType match { - case _: StructType | ArrayType(_: StructType, _) => + case _: StructType => try { JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType]) TypeCheckResult.TypeCheckSuccess @@ -711,7 +701,7 @@ case class StructsToJson( case e: UnsupportedOperationException => TypeCheckResult.TypeCheckFailure(e.getMessage) } - case _: MapType | ArrayType(_: MapType, _) => + case _: MapType => // TODO: let `JacksonUtils.verifySchema` verify a `MapType` try { val st = StructType(StructField("a", rowSchema.asInstanceOf[MapType]) :: Nil) @@ -721,6 +711,14 @@ case class StructsToJson( case e: UnsupportedOperationException => TypeCheckResult.TypeCheckFailure(e.getMessage) } + case _: ArrayType => + try { + JacksonUtils.verifyType(prettyName, rowSchema) + TypeCheckResult.TypeCheckSuccess + } catch { + case e: UnsupportedOperationException => + TypeCheckResult.TypeCheckFailure(e.getMessage) + } case _ => TypeCheckResult.TypeCheckFailure( s"Input type ${child.dataType.catalogString} must be a struct, array of structs or " + "a map or array of map.") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 738947766adda..9b67fa314133d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} import org.apache.spark.sql.types._ /** - * `JackGenerator` can only be initialized with a `StructType` or a `MapType`. + * `JackGenerator` can only be initialized with a `StructType`, a `MapType` ot `ArrayType`. * Once it is initialized with `StructType`, it can be used to write out a struct or an array of * struct. Once it is initialized with `MapType`, it can be used to write out a map or an array * of map. An exception will be thrown if trying to write out a struct if it is initialized with @@ -43,10 +43,11 @@ private[sql] class JacksonGenerator( // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`. private type ValueWriter = (SpecializedGetters, Int) => Unit - // `JackGenerator` can only be initialized with a `StructType` or a `MapType`. - require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType], + // `JackGenerator` can only be initialized with a `StructType`, a `MapType` or a `ArrayType`. 
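+  // For example (an illustrative sketch of the contract enforced below, assuming the
+  // generator's (dataType, writer, options) constructor):
+  //   new JacksonGenerator(ArrayType(IntegerType), writer, options)  // now accepted
+  //   new JacksonGenerator(IntegerType, writer, options)             // still rejected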
+ require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType] + || dataType.isInstanceOf[ArrayType], s"JacksonGenerator only supports to be initialized with a ${StructType.simpleString} " + - s"or ${MapType.simpleString} but got ${dataType.catalogString}") + s"${MapType.simpleString} or ${ArrayType.simpleString} but got ${dataType.catalogString}") // `ValueWriter`s for all fields of the schema private lazy val rootFieldWriters: Array[ValueWriter] = dataType match { @@ -57,14 +58,9 @@ private[sql] class JacksonGenerator( // `ValueWriter` for array data storing rows of the schema. private lazy val arrElementWriter: ValueWriter = dataType match { - case st: StructType => - (arr: SpecializedGetters, i: Int) => { - writeObject(writeFields(arr.getStruct(i, st.length), st, rootFieldWriters)) - } - case mt: MapType => - (arr: SpecializedGetters, i: Int) => { - writeObject(writeMapData(arr.getMap(i), mt, mapElementWriter)) - } + case at: ArrayType => makeWriter(at.elementType) + case _ => throw new UnsupportedOperationException( + s"Initial type ${dataType.catalogString} must be a array") } private lazy val mapElementWriter: ValueWriter = dataType match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala index f26b194e7a7ce..d438bccba911d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala @@ -32,29 +32,29 @@ object JacksonUtils { } } - /** - * Verify if the schema is supported in JSON parsing. - */ - def verifySchema(schema: StructType): Unit = { - def verifyType(name: String, dataType: DataType): Unit = dataType match { - case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | - DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType => + def verifyType(name: String, dataType: DataType): Unit = dataType match { + case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | + DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType => - case st: StructType => st.foreach(field => verifyType(field.name, field.dataType)) + case st: StructType => st.foreach(field => verifyType(field.name, field.dataType)) - case at: ArrayType => verifyType(name, at.elementType) + case at: ArrayType => verifyType(name, at.elementType) - // For MapType, its keys are treated as a string (i.e. calling `toString`) basically when - // generating JSON, so we only care if the values are valid for JSON. - case mt: MapType => verifyType(name, mt.valueType) + // For MapType, its keys are treated as a string (i.e. calling `toString`) basically when + // generating JSON, so we only care if the values are valid for JSON. + case mt: MapType => verifyType(name, mt.valueType) - case udt: UserDefinedType[_] => verifyType(name, udt.sqlType) + case udt: UserDefinedType[_] => verifyType(name, udt.sqlType) - case _ => - throw new UnsupportedOperationException( - s"Unable to convert column $name of type ${dataType.catalogString} to JSON.") - } + case _ => + throw new UnsupportedOperationException( + s"Unable to convert column $name of type ${dataType.catalogString} to JSON.") + } + /** + * Verify if the schema is supported in JSON parsing. 
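+ * For example (an illustrative sketch):
+ * {{{
+ *   verifySchema(StructType(StructField("i", IntegerType) :: Nil))          // passes
+ *   verifySchema(StructType(StructField("i", CalendarIntervalType) :: Nil)) // throws
+ * }}}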
+ */ + def verifySchema(schema: StructType): Unit = { schema.foreach(field => verifyType(field.name, field.dataType)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index c9331883c4799..e91b49e709320 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3493,11 +3493,11 @@ object functions { def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr)) /** - * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s, - * a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema. + * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or + * a `MapType` into a JSON string with the specified schema. * Throws an exception, in the case of an unsupported type. * - * @param e a column containing a struct or array of the structs. + * @param e a column containing a struct, a array or a map. * @param options options to control how the struct column is converted into a json string. * accepts the same options and the json data source. * @@ -3509,11 +3509,11 @@ object functions { } /** - * (Java-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s, - * a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema. + * (Java-specific) Converts a column containing a `StructType`, `ArrayType` or + * a `MapType` into a JSON string with the specified schema. * Throws an exception, in the case of an unsupported type. * - * @param e a column containing a struct or array of the structs. + * @param e a column containing a struct, an array or a map. * @param options options to control how the struct column is converted into a json string. * accepts the same options and the json data source. * @@ -3524,11 +3524,11 @@ object functions { to_json(e, options.asScala.toMap) /** - * Converts a column containing a `StructType`, `ArrayType` of `StructType`s, - * a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema. + * Converts a column containing a `StructType`, `ArrayType` or + * a `MapType` into a JSON string with the specified schema. * Throws an exception, in the case of an unsupported type. * - * @param e a column containing a struct or array of the structs. + * @param e a column containing a struct, an array or a map. 
* * @group collection_funcs * @since 2.1.0 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index f321ab86e9b7f..fbc667cb7fb30 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -469,4 +469,53 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null))) } + + test("to_json - array of primitive type") { + val df = Seq(Array(1, 2, 3)).toDF("a") + checkAnswer(df.select(to_json($"a")), Seq(Row("[1,2,3]"))) + } + + test("roundtrip to_json -> from_json - array of primitive types") { + val arr = Array(1, 2, 3) + val df = Seq(arr).toDF("a") + checkAnswer(df.select(from_json(to_json($"a"), ArrayType(IntegerType))), Row(arr)) + } + + test("roundtrip from_json -> to_json - array of primitive types") { + val json = "[1,2,3]" + val df = Seq(json).toDF("a") + val schema = new ArrayType(IntegerType, false) + + checkAnswer(df.select(to_json(from_json($"a", schema))), Seq(Row(json))) + } + + test("roundtrip from_json -> to_json - array of arrays") { + val json = "[[1],[2,3],[4,5,6]]" + val jsonDF = Seq(json).toDF("a") + val schema = new ArrayType(ArrayType(IntegerType, false), false) + + checkAnswer( + jsonDF.select(to_json(from_json($"a", schema))), + Seq(Row(json))) + } + + test("roundtrip from_json -> to_json - array of maps") { + val json = """[{"a":1},{"b":2}]""" + val jsonDF = Seq(json).toDF("a") + val schema = new ArrayType(MapType(StringType, IntegerType, false), false) + + checkAnswer( + jsonDF.select(to_json(from_json($"a", schema))), + Seq(Row(json))) + } + + test("roundtrip from_json -> to_json - array of structs") { + val json = """[{"a":1},{"a":2},{"a":3}]""" + val jsonDF = Seq(json).toDF("a") + val schema = new ArrayType(new StructType().add("a", IntegerType), false) + + checkAnswer( + jsonDF.select(to_json(from_json($"a", schema))), + Seq(Row(json))) + } } From e9a5f50720d0e6ab301afaa3a4eff801de1240d7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 24 Aug 2018 21:18:19 +0200 Subject: [PATCH 02/13] sql tests for array of primitive type and of arrays --- .../sql-tests/inputs/json-functions.sql | 5 +++++ .../sql-tests/results/json-functions.sql.out | 18 +++++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql index 0cf370c13e8c0..3ad0a07b1e994 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql @@ -51,3 +51,8 @@ select from_json('[null, {"a":2}]', 'array>'); select from_json('[{"a": 1}, {"b":2}]', 'array>'); select from_json('[{"a": 1}, 2]', 'array>'); + +-- to_json - array type +select to_json(array('1','2','3')); +select to_json(array(array(1,2,3),array(4))); + diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index 7444cdbef96e4..c51b90c88c3be 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 38 +-- Number of queries: 40 -- !query 0 @@ -354,3 +354,19 @@ select 
from_json('[{"a": 1}, 2]', 'array>') struct>> -- !query 37 output NULL + + +-- !query 38 +select to_json(array('1','2','3')) +-- !query 38 schema +struct +-- !query 38 output +["1","2","3"] + + +-- !query 39 +select to_json(array(array(1,2,3),array(4))) +-- !query 39 schema +struct +-- !query 39 output +[[1,2,3],[4]] From c1755decf93c503dcb3de957408908177a75cf3e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 25 Aug 2018 00:18:11 +0200 Subject: [PATCH 03/13] Revert the struct and map types for backward compatibility --- .../spark/sql/catalyst/json/JacksonGenerator.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 9b67fa314133d..545540873708f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.json import java.io.Writer -import java.nio.charset.StandardCharsets import com.fasterxml.jackson.core._ @@ -59,8 +58,16 @@ private[sql] class JacksonGenerator( // `ValueWriter` for array data storing rows of the schema. private lazy val arrElementWriter: ValueWriter = dataType match { case at: ArrayType => makeWriter(at.elementType) + case st: StructType => + (arr: SpecializedGetters, i: Int) => { + writeObject(writeFields(arr.getStruct(i, st.length), st, rootFieldWriters)) + } + case mt: MapType => + (arr: SpecializedGetters, i: Int) => { + writeObject(writeMapData(arr.getMap(i), mt, mapElementWriter)) + } case _ => throw new UnsupportedOperationException( - s"Initial type ${dataType.catalogString} must be a array") + s"Initial type ${dataType.catalogString} must be an array, a struct or a map") } private lazy val mapElementWriter: ValueWriter = dataType match { From db9a358fa323b0ec4e863887fa80e78a3d88a5d1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 27 Aug 2018 16:28:49 +0200 Subject: [PATCH 04/13] Changing array to ArrayType.simpleString and so for map and struct --- .../apache/spark/sql/catalyst/json/JacksonGenerator.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 545540873708f..ba8fa547b61fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -52,7 +52,7 @@ private[sql] class JacksonGenerator( private lazy val rootFieldWriters: Array[ValueWriter] = dataType match { case st: StructType => st.map(_.dataType).map(makeWriter).toArray case _ => throw new UnsupportedOperationException( - s"Initial type ${dataType.catalogString} must be a struct") + s"Initial type ${dataType.catalogString} must be a ${StructType.simpleString}") } // `ValueWriter` for array data storing rows of the schema. 
@@ -67,13 +67,14 @@ private[sql] class JacksonGenerator( writeObject(writeMapData(arr.getMap(i), mt, mapElementWriter)) } case _ => throw new UnsupportedOperationException( - s"Initial type ${dataType.catalogString} must be an array, a struct or a map") + s"Initial type ${dataType.catalogString} must be " + + s"an ${ArrayType.simpleString}, a ${StructType.simpleString} or a ${MapType.simpleString}") } private lazy val mapElementWriter: ValueWriter = dataType match { case mt: MapType => makeWriter(mt.valueType) case _ => throw new UnsupportedOperationException( - s"Initial type ${dataType.catalogString} must be a map") + s"Initial type ${dataType.catalogString} must be a ${MapType.simpleString}") } private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) From 906a3013e97f8e1d1f8f7e0e335f7404de47b582 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 27 Aug 2018 16:46:37 +0200 Subject: [PATCH 05/13] Minor changes - adding gaps --- .../src/test/resources/sql-tests/inputs/json-functions.sql | 4 ++-- .../test/resources/sql-tests/results/json-functions.sql.out | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql index 3ad0a07b1e994..0f22c0eeed581 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql @@ -53,6 +53,6 @@ select from_json('[{"a": 1}, {"b":2}]', 'array>'); select from_json('[{"a": 1}, 2]', 'array>'); -- to_json - array type -select to_json(array('1','2','3')); -select to_json(array(array(1,2,3),array(4))); +select to_json(array('1', '2', '3')); +select to_json(array(array(1, 2, 3), array(4))); diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index c51b90c88c3be..ae4fad43e6a3f 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -357,7 +357,7 @@ NULL -- !query 38 -select to_json(array('1','2','3')) +select to_json(array('1', '2', '3')) -- !query 38 schema struct -- !query 38 output @@ -365,7 +365,7 @@ struct -- !query 39 -select to_json(array(array(1,2,3),array(4))) +select to_json(array(array(1, 2, 3), array(4))) -- !query 39 schema struct -- !query 39 output From e91a33cbf5416c93b95ca0bd594a5418ee033f15 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 28 Aug 2018 08:58:41 +0200 Subject: [PATCH 06/13] json -> JSON --- python/pyspark/sql/functions.py | 2 +- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index e488adf5236e1..5c5be6398d54c 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2293,7 +2293,7 @@ def to_json(col, options={}): into a JSON string. Throws an exception, in the case of an unsupported type. :param col: name of column containing a struct, an array or a map. - :param options: options to control converting. accepts the same options as the json datasource + :param options: options to control converting. 
accepts the same options as the json datasource
+    :param options: options to control converting. accepts the same options as the JSON datasource
 
     >>> from pyspark.sql import Row
     >>> from pyspark.sql.types import *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index b95dc3c65e1c2..0ea128709fa98 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -613,11 +613,11 @@ case class JsonToStructs(
 }
 
 /**
- * Converts a [[StructType]], [[ArrayType]] or [[MapType]] to a json output string.
+ * Converts a [[StructType]], [[ArrayType]] or [[MapType]] to a JSON output string.
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(expr[, options]) - Returns a json string with a given struct value",
+  usage = "_FUNC_(expr[, options]) - Returns a JSON string with a given struct value",
   examples = """
     Examples:
       > SELECT _FUNC_(named_struct('a', 1, 'b', 2));

From 2c3a641cb68a9a414f0e0664d60ba57ef121c325 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Tue, 28 Aug 2018 13:30:17 +0200
Subject: [PATCH 07/13] Fix SQL tests: json -> JSON

---
 .../test/resources/sql-tests/results/json-functions.sql.out | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index ae4fad43e6a3f..e550b43e08c28 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -9,7 +9,7 @@ struct
 -- !query 0 output
 Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
 Function: to_json
-Usage: to_json(expr[, options]) - Returns a json string with a given struct value
+Usage: to_json(expr[, options]) - Returns a JSON string with a given struct value
 
 
 -- !query 1
@@ -38,7 +38,7 @@ Extended Usage:
     Since: 2.2.0
 
 Function: to_json
-Usage: to_json(expr[, options]) - Returns a json string with a given struct value
+Usage: to_json(expr[, options]) - Returns a JSON string with a given struct value
 
 
 -- !query 2

From 2f73c9c4d3cef21997522563d1f7dcca58f8e9cd Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 30 Aug 2018 23:38:06 +0200
Subject: [PATCH 08/13] Addressing Bryan's review comments

---
 .../org/apache/spark/sql/catalyst/json/JacksonGenerator.scala | 4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala  | 2 +-
 .../test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala  | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index ba8fa547b61fe..ef351e33162ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._
 
 /**
- * `JackGenerator` can only be initialized with a `StructType`, a `MapType` ot `ArrayType`.
* Once it is initialized with `StructType`, it can be used to write out a struct or an array of * struct. Once it is initialized with `MapType`, it can be used to write out a map or an array * of map. An exception will be thrown if trying to write out a struct if it is initialized with @@ -45,7 +45,7 @@ private[sql] class JacksonGenerator( // `JackGenerator` can only be initialized with a `StructType`, a `MapType` or a `ArrayType`. require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[ArrayType], - s"JacksonGenerator only supports to be initialized with a ${StructType.simpleString} " + + s"JacksonGenerator only supports to be initialized with a ${StructType.simpleString}, " + s"${MapType.simpleString} or ${ArrayType.simpleString} but got ${dataType.catalogString}") // `ValueWriter`s for all fields of the schema diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index e91b49e709320..c4570981a5820 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3497,7 +3497,7 @@ object functions { * a `MapType` into a JSON string with the specified schema. * Throws an exception, in the case of an unsupported type. * - * @param e a column containing a struct, a array or a map. + * @param e a column containing a struct, an array or a map. * @param options options to control how the struct column is converted into a json string. * accepts the same options and the json data source. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index fbc667cb7fb30..fe4bf15fa3921 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -470,7 +470,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null))) } - test("to_json - array of primitive type") { + test("to_json - array of primitive types") { val df = Seq(Array(1, 2, 3)).toDF("a") checkAnswer(df.select(to_json($"a")), Seq(Row("[1,2,3]"))) } From 252c924618a2c4596ffeaf6ca688348ed80011a9 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 1 Sep 2018 11:01:28 +0200 Subject: [PATCH 09/13] Testing arrays in to_json for PySpark --- python/pyspark/sql/functions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 5c5be6398d54c..864780e0be9bd 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2313,6 +2313,10 @@ def to_json(col, options={}): >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_json(df.value).alias("json")).collect() [Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')] + >>> data = [(1, ["Alice", "Bob"])] + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> df.select(to_json(df.value).alias("json")).collect() + [Row(json=u'["Alice","Bob"]')] """ sc = SparkContext._active_spark_context From ce3b17c9d0525c6bc27e1ded066cd7e43b597eb1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 1 Sep 2018 11:47:24 +0200 Subject: [PATCH 10/13] Tests for R --- R/pkg/tests/fulltests/test_sparkSQL.R | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R index e1f3cf339e83f..abc2f5cfd4d43 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1667,6 +1667,15 @@ test_that("column functions", { expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 }))) } + # Test to_json() supports arrays of primitive types and arrays + df <- sql("SELECT array(19, 42, 70) as age") + j <- collect(select(df, alias(to_json(df$age), "json"))) + expect_equal(j[order(j$json), ][1], "[19,42,70]") + + df <- sql("SELECT array(array(1, 2), array(3, 4)) as matrix") + j <- collect(select(df, alias(to_json(df$matrix), "json"))) + expect_equal(j[order(j$json), ][1], "[[1,2],[3,4]]") + # passing option df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}"))) schema2 <- structType(structField("date", "date")) From 7d1cde4d2f5045a5d42ce3111101ec6c9595ab2e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 3 Sep 2018 11:37:28 +0200 Subject: [PATCH 11/13] Minor refactoring to reduce diff --- .../sql/catalyst/json/JacksonUtils.scala | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala index d438bccba911d..2d89c7066d080 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala @@ -32,23 +32,25 @@ object JacksonUtils { } } - def verifyType(name: String, dataType: DataType): Unit = dataType match { - case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | - DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType => + def verifyType(name: String, dataType: DataType): Unit = { + dataType match { + case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | + DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType => - case st: StructType => st.foreach(field => verifyType(field.name, field.dataType)) + case st: StructType => st.foreach(field => verifyType(field.name, field.dataType)) - case at: ArrayType => verifyType(name, at.elementType) + case at: ArrayType => verifyType(name, at.elementType) - // For MapType, its keys are treated as a string (i.e. calling `toString`) basically when - // generating JSON, so we only care if the values are valid for JSON. - case mt: MapType => verifyType(name, mt.valueType) + // For MapType, its keys are treated as a string (i.e. calling `toString`) basically when + // generating JSON, so we only care if the values are valid for JSON. 
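+      // For example (illustrative): to_json(map(1, 'a')) yields {"1":"a"}; the integer
+      // key is rendered via toString as the JSON field name, which is why only the
+      // value type needs to be verified here.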
+ case mt: MapType => verifyType(name, mt.valueType) - case udt: UserDefinedType[_] => verifyType(name, udt.sqlType) + case udt: UserDefinedType[_] => verifyType(name, udt.sqlType) - case _ => - throw new UnsupportedOperationException( - s"Unable to convert column $name of type ${dataType.catalogString} to JSON.") + case _ => + throw new UnsupportedOperationException( + s"Unable to convert column $name of type ${dataType.catalogString} to JSON.") + } } /** From 72d2628323af4e44da1083c99c0d4996c34e4c8c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 3 Sep 2018 13:49:31 +0200 Subject: [PATCH 12/13] Code simplification for maps and structs as elements of arrays --- .../spark/sql/catalyst/json/JacksonGenerator.scala | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index ef351e33162ea..9b86d865622dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -58,14 +58,7 @@ private[sql] class JacksonGenerator( // `ValueWriter` for array data storing rows of the schema. private lazy val arrElementWriter: ValueWriter = dataType match { case at: ArrayType => makeWriter(at.elementType) - case st: StructType => - (arr: SpecializedGetters, i: Int) => { - writeObject(writeFields(arr.getStruct(i, st.length), st, rootFieldWriters)) - } - case mt: MapType => - (arr: SpecializedGetters, i: Int) => { - writeObject(writeMapData(arr.getMap(i), mt, mapElementWriter)) - } + case _: StructType | _: MapType => makeWriter(dataType) case _ => throw new UnsupportedOperationException( s"Initial type ${dataType.catalogString} must be " + s"an ${ArrayType.simpleString}, a ${StructType.simpleString} or a ${MapType.simpleString}") From 90c968772d74c8aadc7a4d0e74e226554b921486 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 4 Sep 2018 13:27:44 +0200 Subject: [PATCH 13/13] Addressing Hyukjin Kwon's review comments --- R/pkg/R/functions.R | 2 +- .../expressions/jsonExpressions.scala | 20 +++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 17984636a010b..7546f13d9b89c 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -1698,7 +1698,7 @@ setMethod("to_date", #' @details #' \code{to_json}: Converts a column containing a \code{structType}, a \code{mapType} -#' or an array into a Column of JSON string. +#' or an \code{arrayType} into a Column of JSON string. #' Resolving the Column can fail if an unsupported type is encountered. 
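#' For example (an illustrative sketch): with \code{df <- sql("SELECT array(1, 2, 3) AS a")},
#' \code{head(select(df, to_json(df$a)))} returns the JSON string \code{"[1,2,3]"}.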
#' #' @rdname column_collection_functions diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 0ea128709fa98..bd9090a07471b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -659,10 +659,10 @@ case class StructsToJson( @transient lazy val gen = new JacksonGenerator( - rowSchema, writer, new JSONOptions(options, timeZoneId.get)) + inputSchema, writer, new JSONOptions(options, timeZoneId.get)) @transient - lazy val rowSchema = child.dataType + lazy val inputSchema = child.dataType // This converts rows to the JSON output according to the given schema. @transient @@ -674,7 +674,7 @@ case class StructsToJson( UTF8String.fromString(json) } - child.dataType match { + inputSchema match { case _: StructType => (row: Any) => gen.write(row.asInstanceOf[InternalRow]) @@ -692,28 +692,28 @@ case class StructsToJson( override def dataType: DataType = StringType - override def checkInputDataTypes(): TypeCheckResult = child.dataType match { - case _: StructType => + override def checkInputDataTypes(): TypeCheckResult = inputSchema match { + case struct: StructType => try { - JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType]) + JacksonUtils.verifySchema(struct) TypeCheckResult.TypeCheckSuccess } catch { case e: UnsupportedOperationException => TypeCheckResult.TypeCheckFailure(e.getMessage) } - case _: MapType => + case map: MapType => // TODO: let `JacksonUtils.verifySchema` verify a `MapType` try { - val st = StructType(StructField("a", rowSchema.asInstanceOf[MapType]) :: Nil) + val st = StructType(StructField("a", map) :: Nil) JacksonUtils.verifySchema(st) TypeCheckResult.TypeCheckSuccess } catch { case e: UnsupportedOperationException => TypeCheckResult.TypeCheckFailure(e.getMessage) } - case _: ArrayType => + case array: ArrayType => try { - JacksonUtils.verifyType(prettyName, rowSchema) + JacksonUtils.verifyType(prettyName, array) TypeCheckResult.TypeCheckSuccess } catch { case e: UnsupportedOperationException =>