[SPARK-26246][SQL] Inferring TimestampType from JSON
## What changes were proposed in this pull request?

The `JsonInferSchema` class is extended to support inferring `TimestampType` from string fields in JSON input:
- If the `prefersDecimal` option is set to `true`, it tries to infer decimal type from the string field.
- If decimal type inference fails or `prefersDecimal` is disabled, `JsonInferSchema` tries to infer `TimestampType`.
- If timestamp type inference also fails, `StringType` is returned as the inferred type (the full fallback chain is sketched below).
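
In isolation, the fallback chain behaves roughly like the sketch below. This is an approximation for illustration only: `inferStringType` and its fixed timestamp pattern are hypothetical stand-ins, while the real code uses Spark's decimal parser and `TimestampFormatter` (see the diff).

```scala
import java.time.format.DateTimeFormatter
import scala.util.Try

// Sketch of the inference order for a JSON string field (not the real API):
// decimal (if preferred) -> timestamp -> string.
def inferStringType(field: String, prefersDecimal: Boolean): String = {
  // Stand-in for options.timestampFormat; the real formatter is configurable.
  val timestampParser = DateTimeFormatter.ISO_OFFSET_DATE_TIME
  val decimalTry = Try(BigDecimal(field)).toOption
  if (prefersDecimal && decimalTry.isDefined) {
    val d = decimalTry.get
    s"decimal(${d.precision},${d.scale})"
  } else if (Try(timestampParser.parse(field)).isSuccess) {
    "timestamp"
  } else {
    "string"
  }
}

// inferStringType("1.23", prefersDecimal = true)                           // "decimal(3,2)"
// inferStringType("2018-12-02T21:04:00.123+01:00", prefersDecimal = false) // "timestamp"
// inferStringType("abc", prefersDecimal = true)                            // "string"
```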

## How was this patch tested?

Added a new test suite, `JsonInferSchemaSuite`, to check date and timestamp type inference from JSON using `JsonInferSchema` directly. A few tests were added to `JsonSuite` to check type merging and roundtrips. These changes were also tested by `JsonSuite`, `JsonExpressionsSuite` and `JsonFunctionsSuite`.

Closes apache#23201 from MaxGekk/json-infer-time.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2 people authored and jackylee-ch committed Feb 18, 2019
1 parent bde0481 commit 82363b8
Showing 3 changed files with 171 additions and 5 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
-import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -37,6 +37,12 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {

private val decimalParser = ExprUtils.getDecimalParser(options.locale)

@transient
private lazy val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)

/**
* Infer the type of a collection of json records in three stages:
* 1. Infer the type of each record
@@ -115,13 +121,19 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
// record fields' types have been combined.
NullType

-      case VALUE_STRING if options.prefersDecimal =>
+      case VALUE_STRING =>
+        val field = parser.getText
         val decimalTry = allCatch opt {
-          val bigDecimal = decimalParser(parser.getText)
+          val bigDecimal = decimalParser(field)
           DecimalType(bigDecimal.precision, bigDecimal.scale)
         }
-        decimalTry.getOrElse(StringType)
-      case VALUE_STRING => StringType
+        if (options.prefersDecimal && decimalTry.isDefined) {
+          decimalTry.get
+        } else if ((allCatch opt timestampFormatter.parse(field)).isDefined) {
+          TimestampType
+        } else {
+          StringType
+        }

case START_OBJECT =>
val builder = Array.newBuilder[StructField]
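
For end users, the visible effect is in schema inference by `spark.read.json`. Below is a hedged example of the expected behavior with this patch applied (the pattern and sample value are borrowed from the tests that follow; it assumes an active `SparkSession` named `spark`):

```scala
import spark.implicits._

val schema = spark.read
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS")
  .json(Seq("""{"a": "2018-12-02T21:04:00.123"}""").toDS)
  .schema
// Expected with this change: StructType(StructField("a", TimestampType, true))
// Before this change, the field was inferred as StringType.
```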
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala (new file)
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.json

import com.fasterxml.jackson.core.JsonFactory

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {

def checkType(options: Map[String, String], json: String, dt: DataType): Unit = {
val jsonOptions = new JSONOptions(options, "UTC", "")
val inferSchema = new JsonInferSchema(jsonOptions)
val factory = new JsonFactory()
jsonOptions.setJacksonOptions(factory)
val parser = CreateJacksonParser.string(factory, json)
parser.nextToken()
val expectedType = StructType(Seq(StructField("a", dt, true)))

assert(inferSchema.inferField(parser) === expectedType)
}

def checkTimestampType(pattern: String, json: String): Unit = {
checkType(Map("timestampFormat" -> pattern), json, TimestampType)
}

test("inferring timestamp type") {
Seq(true, false).foreach { legacyParser =>
withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
checkTimestampType("yyyy", """{"a": "2018"}""")
checkTimestampType("yyyy=MM", """{"a": "2018=12"}""")
checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""")
checkTimestampType(
"yyyy-MM-dd'T'HH:mm:ss.SSS",
"""{"a": "2018-12-02T21:04:00.123"}""")
checkTimestampType(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
"""{"a": "2018-12-02T21:04:00.123567+01:00"}""")
}
}
}

test("prefer decimals over timestamps") {
Seq(true, false).foreach { legacyParser =>
withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
checkType(
options = Map(
"prefersDecimal" -> "true",
"timestampFormat" -> "yyyyMMdd.HHmmssSSS"
),
json = """{"a": "20181202.210400123"}""",
dt = DecimalType(17, 9)
)
}
}
}

test("skip decimal type inferring") {
Seq(true, false).foreach { legacyParser =>
withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
checkType(
options = Map(
"prefersDecimal" -> "false",
"timestampFormat" -> "yyyyMMdd.HHmmssSSS"
),
json = """{"a": "20181202.210400123"}""",
dt = TimestampType
)
}
}
}

test("fallback to string type") {
Seq(true, false).foreach { legacyParser =>
withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
checkType(
options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"),
json = """{"a": "20181202.210400123"}""",
dt = StringType
)
}
}
}
}
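
A note on the expected `DecimalType(17, 9)` in the "prefer decimals over timestamps" test above: precision counts all 17 significant digits of `20181202.210400123`, and scale counts the 9 digits after the point. A quick sanity check with `java.math.BigDecimal` (assuming, as in the default locale path, that decimal parsing follows `BigDecimal` semantics):

```scala
val bd = new java.math.BigDecimal("20181202.210400123")
assert(bd.precision == 17) // 8 digits before the point + 9 after
assert(bd.scale == 9)      // digits after the decimal point
```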
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.StructType.fromDDL
import org.apache.spark.util.Utils

class TestFileFilter extends PathFilter {
@@ -2589,4 +2590,55 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") ::
Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil)
}

test("inferring timestamp type") {
Seq(true, false).foreach { legacyParser =>
withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema

assert(schemaOf(
"""{"a":"2018-12-17T10:11:12.123-01:00"}""",
"""{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp"))

assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""")
=== fromDDL("a string"))
assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""")
=== fromDDL("a string"))

assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""")
=== fromDDL("a timestamp"))
assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""")
=== fromDDL("a timestamp"))
}
}
}

test("roundtrip for timestamp type inferring") {
Seq(true, false).foreach { legacyParser =>
withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
val customSchema = new StructType().add("date", TimestampType)
withTempDir { dir =>
val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
val timestampsWithFormat = spark.read
.option("timestampFormat", "dd/MM/yyyy HH:mm")
.json(datesRecords)
assert(timestampsWithFormat.schema === customSchema)

timestampsWithFormat.write
.format("json")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
.save(timestampsWithFormatPath)

val readBack = spark.read
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
.json(timestampsWithFormatPath)

assert(readBack.schema === customSchema)
checkAnswer(readBack, timestampsWithFormat)
}
}
}
}
}
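
The `datesRecords` fixture comes from the `TestJsonData` trait and is not part of this diff. For the roundtrip above to infer `TimestampType` with the `dd/MM/yyyy HH:mm` format, it presumably looks something like the following (a hypothetical reconstruction, not the actual fixture):

```scala
// Hypothetical stand-in for TestJsonData.datesRecords.
import spark.implicits._
val datesRecords = Seq(
  """{"date": "26/08/2015 18:00"}""",
  """{"date": "27/10/2014 18:30"}""").toDS
```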
