Support arrays of any types in to_json
MaxGekk committed Aug 24, 2018
1 parent ab33028 commit fb72726
Showing 7 changed files with 100 additions and 59 deletions.
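The effect of the change, end to end: before this commit, to_json accepted only structs, maps, and arrays of structs or maps; afterwards any array type works. A minimal spark-shell sketch mirroring the new tests in JsonFunctionsSuite (the session and column names are illustrative):

    import org.apache.spark.sql.functions.to_json
    import spark.implicits._  // assumes a running SparkSession named `spark`

    // Arrays of primitives are now valid input, not just structs and maps:
    val df = Seq(Array(1, 2, 3)).toDF("a")
    df.select(to_json($"a")).collect()  // Array(Row("[1,2,3]"))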
4 changes: 2 additions & 2 deletions R/pkg/R/functions.R
@@ -1697,8 +1697,8 @@ setMethod("to_date",
})

#' @details
#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType},
#' a \code{mapType} or array of \code{mapType} into a Column of JSON string.
#' \code{to_json}: Converts a column containing a \code{structType}, a \code{mapType}
#' or an array into a Column of JSON strings.
#' Resolving the Column can fail if an unsupported type is encountered.
#'
#' @rdname column_collection_functions
6 changes: 2 additions & 4 deletions python/pyspark/sql/functions.py
@@ -2289,12 +2289,10 @@ def from_json(col, schema, options={}):
@since(2.1)
def to_json(col, options={}):
"""
Converts a column containing a :class:`StructType`, :class:`ArrayType` of
:class:`StructType`\\s, a :class:`MapType` or :class:`ArrayType` of :class:`MapType`\\s
Converts a column containing a :class:`StructType`, :class:`ArrayType` or a :class:`MapType`
into a JSON string. Throws an exception in the case of an unsupported type.
:param col: name of column containing the struct, array of the structs, the map or
array of the maps.
:param col: name of column containing a struct, an array or a map.
:param options: options to control converting. accepts the same options as the json datasource
>>> from pyspark.sql import Row
@@ -613,8 +613,7 @@ case class JsonToStructs(
}

/**
* Converts a [[StructType]], [[ArrayType]] of [[StructType]]s, [[MapType]]
* or [[ArrayType]] of [[MapType]]s to a json output string.
* Converts a [[StructType]], [[ArrayType]] or [[MapType]] to a JSON output string.
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
@@ -663,12 +662,7 @@ case class StructsToJson(
rowSchema, writer, new JSONOptions(options, timeZoneId.get))

@transient
lazy val rowSchema = child.dataType match {
case st: StructType => st
case ArrayType(st: StructType, _) => st
case mt: MapType => mt
case ArrayType(mt: MapType, _) => mt
}
lazy val rowSchema = child.dataType

// This converts rows to the JSON output according to the given schema.
@transient
@@ -685,33 +679,29 @@
(row: Any) =>
gen.write(row.asInstanceOf[InternalRow])
getAndReset()
case ArrayType(_: StructType, _) =>
case _: ArrayType =>
(arr: Any) =>
gen.write(arr.asInstanceOf[ArrayData])
getAndReset()
case _: MapType =>
(map: Any) =>
gen.write(map.asInstanceOf[MapData])
getAndReset()
case ArrayType(_: MapType, _) =>
(arr: Any) =>
gen.write(arr.asInstanceOf[ArrayData])
getAndReset()
}
}

override def dataType: DataType = StringType

override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
case _: StructType | ArrayType(_: StructType, _) =>
case _: StructType =>
try {
JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType])
TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
case _: MapType | ArrayType(_: MapType, _) =>
case _: MapType =>
// TODO: let `JacksonUtils.verifySchema` verify a `MapType`
try {
val st = StructType(StructField("a", rowSchema.asInstanceOf[MapType]) :: Nil)
@@ -721,6 +711,14 @@
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
case _: ArrayType =>
try {
JacksonUtils.verifyType(prettyName, rowSchema)
TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
case _ => TypeCheckResult.TypeCheckFailure(
  s"Input type ${child.dataType.catalogString} must be a struct, a map or an array.")
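For contrast, a hedged sketch of what the updated checkInputDataTypes still rejects, continuing the session above (error text paraphrased):

    // Atomic, non-composite columns still fail at analysis time:
    val ints = Seq(1, 2, 3).toDF("i")
    // ints.select(to_json($"i"))
    // => AnalysisException: ... must be a struct, a map or an array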
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._

/**
* `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
* `JacksonGenerator` can only be initialized with a `StructType`, a `MapType` or an `ArrayType`.
* Once it is initialized with `StructType`, it can be used to write out a struct or an array of
* structs. Once it is initialized with `MapType`, it can be used to write out a map or an array
* of maps. An exception will be thrown if trying to write out a struct if it is initialized with
* a `MapType`, and vice versa.
@@ -43,10 +43,11 @@ private[sql] class JacksonGenerator(
// we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
private type ValueWriter = (SpecializedGetters, Int) => Unit

// `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType],
// `JacksonGenerator` can only be initialized with a `StructType`, a `MapType` or an `ArrayType`.
require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType]
|| dataType.isInstanceOf[ArrayType],
s"JacksonGenerator only supports to be initialized with a ${StructType.simpleString} " +
s"or ${MapType.simpleString} but got ${dataType.catalogString}")
s"${MapType.simpleString} or ${ArrayType.simpleString} but got ${dataType.catalogString}")

// `ValueWriter`s for all fields of the schema
private lazy val rootFieldWriters: Array[ValueWriter] = dataType match {
@@ -57,14 +58,9 @@

// `ValueWriter` for array data storing rows of the schema.
private lazy val arrElementWriter: ValueWriter = dataType match {
case st: StructType =>
(arr: SpecializedGetters, i: Int) => {
writeObject(writeFields(arr.getStruct(i, st.length), st, rootFieldWriters))
}
case mt: MapType =>
(arr: SpecializedGetters, i: Int) => {
writeObject(writeMapData(arr.getMap(i), mt, mapElementWriter))
}
case at: ArrayType => makeWriter(at.elementType)
case _ => throw new UnsupportedOperationException(
s"Initial type ${dataType.catalogString} must be a array")
}

private lazy val mapElementWriter: ValueWriter = dataType match {
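Since arrElementWriter now recurses through makeWriter, nested arrays serialize as well. A sketch continuing the session above, mirroring the array-of-arrays roundtrip test later in this commit:

    val aa = Seq(Seq(Seq(1), Seq(2, 3))).toDF("aa")
    aa.select(to_json($"aa")).collect()  // Array(Row("[[1],[2,3]]"))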
@@ -32,29 +32,29 @@ object JacksonUtils {
}
}

/**
* Verify if the schema is supported in JSON parsing.
*/
def verifySchema(schema: StructType): Unit = {
def verifyType(name: String, dataType: DataType): Unit = dataType match {
case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>
def verifyType(name: String, dataType: DataType): Unit = dataType match {
case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>

case st: StructType => st.foreach(field => verifyType(field.name, field.dataType))
case st: StructType => st.foreach(field => verifyType(field.name, field.dataType))

case at: ArrayType => verifyType(name, at.elementType)
case at: ArrayType => verifyType(name, at.elementType)

// For MapType, its keys are treated as a string (i.e. calling `toString`) basically when
// generating JSON, so we only care if the values are valid for JSON.
case mt: MapType => verifyType(name, mt.valueType)
// For MapType, its keys are treated as a string (i.e. calling `toString`) basically when
// generating JSON, so we only care if the values are valid for JSON.
case mt: MapType => verifyType(name, mt.valueType)

case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)
case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)

case _ =>
throw new UnsupportedOperationException(
s"Unable to convert column $name of type ${dataType.catalogString} to JSON.")
}
case _ =>
throw new UnsupportedOperationException(
s"Unable to convert column $name of type ${dataType.catalogString} to JSON.")
}

/**
* Verify if the schema is supported in JSON parsing.
*/
def verifySchema(schema: StructType): Unit = {
schema.foreach(field => verifyType(field.name, field.dataType))
}
}
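The note above about map keys deserves a concrete example. A sketch continuing the same session (output shape assumed from JacksonGenerator, which writes keys via toString):

    // Non-string map keys are stringified, so only the value type
    // has to pass verifyType:
    val m = Seq(Map(1 -> "a", 2 -> "b")).toDF("m")
    m.select(to_json($"m")).collect()  // Array(Row("""{"1":"a","2":"b"}"""))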
18 changes: 9 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3493,11 +3493,11 @@ object functions {
def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))

/**
* (Scala-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
* (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or
* a `MapType` into a JSON string with the specified schema.
* Throws an exception in the case of an unsupported type.
*
* @param e a column containing a struct or array of the structs.
* @param e a column containing a struct, an array or a map.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options as the json data source.
*
@@ -3509,11 +3509,11 @@
}

/**
* (Java-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
* (Java-specific) Converts a column containing a `StructType`, `ArrayType` or
* a `MapType` into a JSON string with the specified schema.
* Throws an exception in the case of an unsupported type.
*
* @param e a column containing a struct or array of the structs.
* @param e a column containing a struct, an array or a map.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options as the json data source.
*
@@ -3524,11 +3524,11 @@
to_json(e, options.asScala.toMap)

/**
* Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
* Converts a column containing a `StructType`, `ArrayType` or
* a `MapType` into a JSON string with the specified schema.
* Throws an exception in the case of an unsupported type.
*
* @param e a column containing a struct or array of the structs.
* @param e a column containing a struct, an array or a map.
*
* @group collection_funcs
* @since 2.1.0
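As a usage sketch for the Scala API documented above, arrays may now nest maps (or structs) freely; this mirrors the new roundtrip tests below (encoders and names illustrative, same session as before):

    val nested = Seq(Seq(Map("a" -> 1), Map("b" -> 2))).toDF("arr")
    nested.select(to_json($"arr")).collect()  // Array(Row("""[{"a":1},{"b":2}]"""))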
@@ -469,4 +469,53 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {

checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
}

test("to_json - array of primitive type") {
val df = Seq(Array(1, 2, 3)).toDF("a")
checkAnswer(df.select(to_json($"a")), Seq(Row("[1,2,3]")))
}

test("roundtrip to_json -> from_json - array of primitive types") {
val arr = Array(1, 2, 3)
val df = Seq(arr).toDF("a")
checkAnswer(df.select(from_json(to_json($"a"), ArrayType(IntegerType))), Row(arr))
}

test("roundtrip from_json -> to_json - array of primitive types") {
val json = "[1,2,3]"
val df = Seq(json).toDF("a")
val schema = new ArrayType(IntegerType, false)

checkAnswer(df.select(to_json(from_json($"a", schema))), Seq(Row(json)))
}

test("roundtrip from_json -> to_json - array of arrays") {
val json = "[[1],[2,3],[4,5,6]]"
val jsonDF = Seq(json).toDF("a")
val schema = new ArrayType(ArrayType(IntegerType, false), false)

checkAnswer(
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}

test("roundtrip from_json -> to_json - array of maps") {
val json = """[{"a":1},{"b":2}]"""
val jsonDF = Seq(json).toDF("a")
val schema = new ArrayType(MapType(StringType, IntegerType, false), false)

checkAnswer(
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}

test("roundtrip from_json -> to_json - array of structs") {
val json = """[{"a":1},{"a":2},{"a":3}]"""
val jsonDF = Seq(json).toDF("a")
val schema = new ArrayType(new StructType().add("a", IntegerType), false)

checkAnswer(
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}
}
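The same capability is reachable from SQL, since to_json is a registered function; a hedged sketch assuming the spark-shell session above:

    spark.sql("SELECT to_json(array(1, 2, 3))").collect()  // Array(Row("[1,2,3]"))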
