
[SPARK-25252][SQL] Support arrays of any types by to_json #22226

Closed
wants to merge 13 commits
4 changes: 2 additions & 2 deletions R/pkg/R/functions.R
@@ -1697,8 +1697,8 @@ setMethod("to_date",
})

#' @details
#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType},
#' a \code{mapType} or array of \code{mapType} into a Column of JSON string.
#' \code{to_json}: Converts a column containing a \code{structType}, a \code{mapType}
#' or an array into a Column of JSON string.
Member: does \code{arrayType} work here?

Member Author: I am not sure.

Member: it should. Could we add some tests for this in R?

Member: Let's add one simple Python doctest as well.

Member Author: I added tests for Python and R. Please take a look at them.

Member: \code{arrayType}. It seems to be missing.
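
For context, a minimal Scala sketch of the behavior this doc change describes, mirroring the tests added elsewhere in this PR (assumes a running SparkSession with spark.implicits._ in scope; the R and Python tests exercise the same path):

import org.apache.spark.sql.functions.to_json

// With this PR, to_json accepts arrays of any supported element type,
// not only arrays of structs or arrays of maps.
val df = Seq(Array(1, 2, 3)).toDF("a")
df.select(to_json($"a")).first.getString(0)  // "[1,2,3]"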

#' Resolving the Column can fail if an unsupported type is encountered.
#'
#' @rdname column_collection_functions
Expand Down
6 changes: 2 additions & 4 deletions python/pyspark/sql/functions.py
@@ -2289,12 +2289,10 @@ def from_json(col, schema, options={}):
@since(2.1)
def to_json(col, options={}):
"""
Converts a column containing a :class:`StructType`, :class:`ArrayType` of
:class:`StructType`\\s, a :class:`MapType` or :class:`ArrayType` of :class:`MapType`\\s
Converts a column containing a :class:`StructType`, :class:`ArrayType` or a :class:`MapType`
into a JSON string. Throws an exception, in the case of an unsupported type.

:param col: name of column containing the struct, array of the structs, the map or
array of the maps.
:param col: name of column containing a struct, an array or a map.
:param options: options to control converting. accepts the same options as the json datasource
Member: ditto


>>> from pyspark.sql import Row
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -613,8 +613,7 @@ case class JsonToStructs(
}

/**
* Converts a [[StructType]], [[ArrayType]] of [[StructType]]s, [[MapType]]
* or [[ArrayType]] of [[MapType]]s to a json output string.
* Converts a [[StructType]], [[ArrayType]] or [[MapType]] to a json output string.
Member: not a big deal, but JSON while we are here.

*/
// scalastyle:off line.size.limit
@ExpressionDescription(
@@ -663,12 +662,7 @@ case class StructsToJson(
rowSchema, writer, new JSONOptions(options, timeZoneId.get))

@transient
lazy val rowSchema = child.dataType match {
case st: StructType => st
case ArrayType(st: StructType, _) => st
case mt: MapType => mt
case ArrayType(mt: MapType, _) => mt
}
lazy val rowSchema = child.dataType
Member: nit: rowSchema -> inputSchema. I named this rowSchema because it was always the schema for the row itself. Now, it seems, it can be other types as well.

Member: Let's make it val.

Member Author: I tried to remove lazy and got many errors in tests like:

Invalid call to dataType on unresolved object, tree: 'a
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'a
	at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
	at org.apache.spark.sql.catalyst.expressions.StructsToJson.<init>(jsonExpressions.scala:665)

If you don't mind, I will keep it lazy.
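
For illustration, a self-contained sketch (a toy model, not the actual Catalyst classes) of why lazy matters here: a strict val would call child.dataType inside the constructor, before the analyzer has resolved the child, which is exactly the UnresolvedException above.

// Toy stand-in for an expression whose dataType is only safe after resolution.
case class ToyExpr(resolved: Boolean) {
  def dataType: String =
    if (resolved) "array<int>"
    else throw new UnsupportedOperationException(
      "Invalid call to dataType on unresolved object")
}

case class ToyStructsToJson(child: ToyExpr) {
  // `lazy` defers the call until first access, after analysis has resolved
  // the child; a plain `val` would throw during construction instead.
  lazy val inputSchema: String = child.dataType
}

ToyStructsToJson(ToyExpr(resolved = false))             // ok: nothing evaluated yet
ToyStructsToJson(ToyExpr(resolved = true)).inputSchema  // "array<int>"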


// This converts rows to the JSON output according to the given schema.
@transient
@@ -685,33 +679,29 @@ case class StructsToJson(
(row: Any) =>
Member: child.dataType -> inputSchema

gen.write(row.asInstanceOf[InternalRow])
getAndReset()
case ArrayType(_: StructType, _) =>
case _: ArrayType =>
(arr: Any) =>
gen.write(arr.asInstanceOf[ArrayData])
getAndReset()
case _: MapType =>
(map: Any) =>
gen.write(map.asInstanceOf[MapData])
getAndReset()
case ArrayType(_: MapType, _) =>
(arr: Any) =>
gen.write(arr.asInstanceOf[ArrayData])
getAndReset()
}
}

override def dataType: DataType = StringType

override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
Member: nit: child.dataType -> inputSchema

case _: StructType | ArrayType(_: StructType, _) =>
case _: StructType =>
Member: nit: case st: StructType => and use st instead of rowSchema.asInstanceOf[StructType].

try {
JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType])
TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
case _: MapType | ArrayType(_: MapType, _) =>
case _: MapType =>
Member: nit: case mapType: MapType => and use it below likewise.

// TODO: let `JacksonUtils.verifySchema` verify a `MapType`
try {
val st = StructType(StructField("a", rowSchema.asInstanceOf[MapType]) :: Nil)
@@ -721,6 +711,14 @@
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
case _: ArrayType =>
try {
JacksonUtils.verifyType(prettyName, rowSchema)
TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
case _ => TypeCheckResult.TypeCheckFailure(
s"Input type ${child.dataType.catalogString} must be a struct, array of structs or " +
"a map or array of map.")
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.catalyst.json

import java.io.Writer
import java.nio.charset.StandardCharsets

import com.fasterxml.jackson.core._

@@ -28,7 +27,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._

/**
* `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
* `JackGenerator` can only be initialized with a `StructType`, a `MapType` ot `ArrayType`.
Member: typo: ot

* Once it is initialized with `StructType`, it can be used to write out a struct or an array of
* struct. Once it is initialized with `MapType`, it can be used to write out a map or an array
* of map. An exception will be thrown if trying to write out a struct if it is initialized with
@@ -43,20 +42,22 @@ private[sql] class JacksonGenerator(
// we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
private type ValueWriter = (SpecializedGetters, Int) => Unit

// `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType],
// `JackGenerator` can only be initialized with a `StructType`, a `MapType` or a `ArrayType`.
require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType]
|| dataType.isInstanceOf[ArrayType],
s"JacksonGenerator only supports to be initialized with a ${StructType.simpleString} " +
Member: maybe we need a comma between ${StructType.simpleString} and ${MapType.simpleString}?

s"or ${MapType.simpleString} but got ${dataType.catalogString}")
s"${MapType.simpleString} or ${ArrayType.simpleString} but got ${dataType.catalogString}")

// `ValueWriter`s for all fields of the schema
private lazy val rootFieldWriters: Array[ValueWriter] = dataType match {
case st: StructType => st.map(_.dataType).map(makeWriter).toArray
case _ => throw new UnsupportedOperationException(
s"Initial type ${dataType.catalogString} must be a struct")
s"Initial type ${dataType.catalogString} must be a ${StructType.simpleString}")
}

// `ValueWriter` for array data storing rows of the schema.
private lazy val arrElementWriter: ValueWriter = dataType match {
case at: ArrayType => makeWriter(at.elementType)
case st: StructType =>
Member: Can we do case _: StructType | _: MapType => makeWriter(dataType)?

(arr: SpecializedGetters, i: Int) => {
writeObject(writeFields(arr.getStruct(i, st.length), st, rootFieldWriters))
@@ -65,12 +66,15 @@
(arr: SpecializedGetters, i: Int) => {
writeObject(writeMapData(arr.getMap(i), mt, mapElementWriter))
}
case _ => throw new UnsupportedOperationException(
s"Initial type ${dataType.catalogString} must be " +
s"an ${ArrayType.simpleString}, a ${StructType.simpleString} or a ${MapType.simpleString}")
}

private lazy val mapElementWriter: ValueWriter = dataType match {
case mt: MapType => makeWriter(mt.valueType)
case _ => throw new UnsupportedOperationException(
s"Initial type ${dataType.catalogString} must be a map")
s"Initial type ${dataType.catalogString} must be a ${MapType.simpleString}")
}

private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -32,29 +32,29 @@ object JacksonUtils {
}
}

/**
* Verify if the schema is supported in JSON parsing.
*/
def verifySchema(schema: StructType): Unit = {
def verifyType(name: String, dataType: DataType): Unit = dataType match {
case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>
def verifyType(name: String, dataType: DataType): Unit = dataType match {
Member: private?

Member Author: I extracted it to use it outside.

Member: We can do:

def verifyType(name: String, dataType: DataType): Unit = {
  dataType match {
    case ...
  }
}

to reduce the diff.

case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>

case st: StructType => st.foreach(field => verifyType(field.name, field.dataType))
case st: StructType => st.foreach(field => verifyType(field.name, field.dataType))

case at: ArrayType => verifyType(name, at.elementType)
case at: ArrayType => verifyType(name, at.elementType)

// For MapType, its keys are treated as a string (i.e. calling `toString`) basically when
// generating JSON, so we only care if the values are valid for JSON.
case mt: MapType => verifyType(name, mt.valueType)
// For MapType, its keys are treated as a string (i.e. calling `toString`) basically when
// generating JSON, so we only care if the values are valid for JSON.
case mt: MapType => verifyType(name, mt.valueType)

case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)
case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)

case _ =>
throw new UnsupportedOperationException(
s"Unable to convert column $name of type ${dataType.catalogString} to JSON.")
}
case _ =>
throw new UnsupportedOperationException(
s"Unable to convert column $name of type ${dataType.catalogString} to JSON.")
}

/**
* Verify if the schema is supported in JSON parsing.
*/
def verifySchema(schema: StructType): Unit = {
schema.foreach(field => verifyType(field.name, field.dataType))
}
}
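
To illustrate the recursion above, a small sketch against this internal Catalyst API (shown only for illustration; CalendarIntervalType is just one example of a type the default branch rejects):

import org.apache.spark.sql.catalyst.json.JacksonUtils
import org.apache.spark.sql.types._

// Nested combinations are verified element-wise and pass silently:
JacksonUtils.verifyType("col", ArrayType(MapType(StringType, ArrayType(IntegerType))))

// Unsupported leaf types throw, which checkInputDataTypes turns into a TypeCheckFailure:
// JacksonUtils.verifyType("col", CalendarIntervalType)
//   => UnsupportedOperationException: Unable to convert column col of type ... to JSON.
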
18 changes: 9 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3493,11 +3493,11 @@
def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))

/**
* (Scala-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
* (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or
* a `MapType` into a JSON string with the specified schema.
* Throws an exception, in the case of an unsupported type.
*
* @param e a column containing a struct or array of the structs.
* @param e a column containing a struct, a array or a map.
Member: typo: a array -> an array

* @param options options to control how the struct column is converted into a json string.
* accepts the same options and the json data source.
*
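
A brief usage sketch for this overload (hedged; assumes spark.implicits._), passing one of the JSON datasource options through the options map:

import java.sql.Timestamp
import org.apache.spark.sql.functions.{struct, to_json}

val df = Seq(Tuple1(Timestamp.valueOf("2018-08-26 00:00:00"))).toDF("ts")
df.select(to_json(struct($"ts"), Map("timestampFormat" -> "yyyy/MM/dd")))
  .first.getString(0)  // {"ts":"2018/08/26"}
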
@@ -3509,11 +3509,11 @@
}

/**
* (Java-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
* (Java-specific) Converts a column containing a `StructType`, `ArrayType` or
* a `MapType` into a JSON string with the specified schema.
* Throws an exception, in the case of an unsupported type.
*
* @param e a column containing a struct or array of the structs.
* @param e a column containing a struct, an array or a map.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options and the json data source.
*
@@ -3524,11 +3524,11 @@
to_json(e, options.asScala.toMap)

/**
* Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
* Converts a column containing a `StructType`, `ArrayType` or
* a `MapType` into a JSON string with the specified schema.
* Throws an exception, in the case of an unsupported type.
*
* @param e a column containing a struct or array of the structs.
* @param e a column containing a struct, an array or a map.
*
* @group collection_funcs
* @since 2.1.0
sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -51,3 +51,8 @@ select from_json('[null, {"a":2}]', 'array<struct<a:int>>');

select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>');
select from_json('[{"a": 1}, 2]', 'array<map<string,int>>');

-- to_json - array type
select to_json(array('1', '2', '3'));
select to_json(array(array(1, 2, 3), array(4)));

Member: super nit: add a space after each comma, e.g., select to_json(array('1', '2', '3'));

sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 38
-- Number of queries: 40


-- !query 0
@@ -354,3 +354,19 @@ select from_json('[{"a": 1}, 2]', 'array<map<string,int>>')
struct<jsontostructs([{"a": 1}, 2]):array<map<string,int>>>
-- !query 37 output
NULL


-- !query 38
select to_json(array('1', '2', '3'))
-- !query 38 schema
struct<structstojson(array(1, 2, 3)):string>
-- !query 38 output
["1","2","3"]


-- !query 39
select to_json(array(array(1, 2, 3), array(4)))
-- !query 39 schema
struct<structstojson(array(array(1, 2, 3), array(4))):string>
-- !query 39 output
[[1,2,3],[4]]
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -469,4 +469,53 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {

checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
}

test("to_json - array of primitive type") {
Member: typo: primitive type -> primitive types

val df = Seq(Array(1, 2, 3)).toDF("a")
checkAnswer(df.select(to_json($"a")), Seq(Row("[1,2,3]")))
}

test("roundtrip to_json -> from_json - array of primitive types") {
val arr = Array(1, 2, 3)
val df = Seq(arr).toDF("a")
checkAnswer(df.select(from_json(to_json($"a"), ArrayType(IntegerType))), Row(arr))
}

test("roundtrip from_json -> to_json - array of primitive types") {
val json = "[1,2,3]"
val df = Seq(json).toDF("a")
val schema = new ArrayType(IntegerType, false)

checkAnswer(df.select(to_json(from_json($"a", schema))), Seq(Row(json)))
}

test("roundtrip from_json -> to_json - array of arrays") {
val json = "[[1],[2,3],[4,5,6]]"
val jsonDF = Seq(json).toDF("a")
val schema = new ArrayType(ArrayType(IntegerType, false), false)

checkAnswer(
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}

test("roundtrip from_json -> to_json - array of maps") {
val json = """[{"a":1},{"b":2}]"""
val jsonDF = Seq(json).toDF("a")
val schema = new ArrayType(MapType(StringType, IntegerType, false), false)

checkAnswer(
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}

test("roundtrip from_json -> to_json - array of structs") {
val json = """[{"a":1},{"a":2},{"a":3}]"""
val jsonDF = Seq(json).toDF("a")
val schema = new ArrayType(new StructType().add("a", IntegerType), false)

checkAnswer(
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}
}