[SPARK-26246][SQL] Inferring TimestampType from JSON #23201

Closed · wants to merge 18 commits
JsonInferSchema.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
-import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -37,6 +37,12 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {

   private val decimalParser = ExprUtils.getDecimalParser(options.locale)
 
+  @transient
+  private lazy val timestampFormatter = TimestampFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+
   /**
    * Infer the type of a collection of json records in three stages:
    *   1. Infer the type of each record
@@ -115,13 +121,19 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
         // record fields' types have been combined.
         NullType
 
-      case VALUE_STRING if options.prefersDecimal =>
+      case VALUE_STRING =>
+        val field = parser.getText
         val decimalTry = allCatch opt {
Member:

Yea, I think this was a mistake. Previously, if prefersDecimal was false (the default), it wouldn't try decimal casting; now the decimal conversion looks like it is always attempted. @bersprockets, can you open a PR to fix it? I think we can just make it lazy.

MaxGekk (Member, Author), Jan 25, 2019:

We shouldn't. The opt method takes its body by name: def opt[U >: T](body: => U): Option[U]. It should not attempt the inference if options.prefersDecimal is false.

... or has prefersDecimal become true by default?

MaxGekk (Member, Author):

@HyukjinKwon What's the problem here? Could you give more context (JIRA, PR), please?

Member:

Here, SPARK-26711. You're cc'ed there as well :).

Member:

The problem here is that the decimal conversion is always attempted, even when prefersDecimal is false. Previously prefersDecimal was checked first, so no decimal parse was attempted. This looks like it causes a performance regression.

I mean:

scala> if (prefersDecimal) allCatch opt { println("im expensive"); true }
res0: Any = ()

scala> allCatch opt { println("im expensive"); true }
im expensive
res1: Option[Boolean] = Some(true)

Member:

And making it lazy will save us by short-circuiting. I wanted to open a PR right away, but wanted to let him open it, since this is what I investigated at SPARK-26711.
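
For concreteness, here is a minimal sketch of the "make it lazy" idea applied to the new VALUE_STRING branch (my illustration, not the merged fix): a lazy val defers the decimal parse until first use, and && short-circuits, so nothing is parsed when prefersDecimal is false.

case VALUE_STRING =>
  val field = parser.getText
  // lazy: the parse below runs only if decimalTry is actually forced
  lazy val decimalTry = allCatch opt {
    val bigDecimal = decimalParser(field)
    DecimalType(bigDecimal.precision, bigDecimal.scale)
  }
  if (options.prefersDecimal && decimalTry.isDefined) {
    decimalTry.get  // decimalTry is forced only when prefersDecimal is true
  } else if ((allCatch opt timestampFormatter.parse(field)).isDefined) {
    TimestampType
  } else {
    StringType
  }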

Member:

re: #23201 (comment)

That's being called by name within opt, I believe.
-          val bigDecimal = decimalParser(parser.getText)
+          val bigDecimal = decimalParser(field)
           DecimalType(bigDecimal.precision, bigDecimal.scale)
         }
-        decimalTry.getOrElse(StringType)
-      case VALUE_STRING => StringType
+        if (options.prefersDecimal && decimalTry.isDefined) {
+          decimalTry.get
+        } else if ((allCatch opt timestampFormatter.parse(field)).isDefined) {
+          TimestampType
+        } else {
+          StringType
+        }

       case START_OBJECT =>
         val builder = Array.newBuilder[StructField]
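
As a quick end-to-end illustration of the new inference (a hypothetical spark-shell session; the input string mirrors the assertions added to JsonSuite further below, relying on the default timestampFormat):

import spark.implicits._

val df = spark.read.json(Seq("""{"a": "2018-12-17T10:11:12.123-01:00"}""").toDS)
df.printSchema()
// root
//  |-- a: timestamp (nullable = true)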
JsonInferSchemaSuite.scala (new file)
@@ -0,0 +1,102 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.json

import com.fasterxml.jackson.core.JsonFactory

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {

  def checkType(options: Map[String, String], json: String, dt: DataType): Unit = {
    val jsonOptions = new JSONOptions(options, "UTC", "")
    val inferSchema = new JsonInferSchema(jsonOptions)
    val factory = new JsonFactory()
    jsonOptions.setJacksonOptions(factory)
    val parser = CreateJacksonParser.string(factory, json)
    parser.nextToken()
    val expectedType = StructType(Seq(StructField("a", dt, true)))

    assert(inferSchema.inferField(parser) === expectedType)
  }

  def checkTimestampType(pattern: String, json: String): Unit = {
    checkType(Map("timestampFormat" -> pattern), json, TimestampType)
  }

  test("inferring timestamp type") {
    Seq(true, false).foreach { legacyParser =>
      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
        checkTimestampType("yyyy", """{"a": "2018"}""")
        checkTimestampType("yyyy=MM", """{"a": "2018=12"}""")
        checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""")
        checkTimestampType(
          "yyyy-MM-dd'T'HH:mm:ss.SSS",
          """{"a": "2018-12-02T21:04:00.123"}""")
        checkTimestampType(
          "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
          """{"a": "2018-12-02T21:04:00.123567+01:00"}""")
      }
    }
  }

  test("prefer decimals over timestamps") {
    Seq(true, false).foreach { legacyParser =>
      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
        checkType(
          options = Map(
            "prefersDecimal" -> "true",
            "timestampFormat" -> "yyyyMMdd.HHmmssSSS"
          ),
          json = """{"a": "20181202.210400123"}""",
          dt = DecimalType(17, 9)
        )
      }
    }
  }

  test("skip decimal type inferring") {
    Seq(true, false).foreach { legacyParser =>
      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
        checkType(
          options = Map(
            "prefersDecimal" -> "false",
            "timestampFormat" -> "yyyyMMdd.HHmmssSSS"
          ),
          json = """{"a": "20181202.210400123"}""",
          dt = TimestampType
        )
      }
    }
  }

  test("fallback to string type") {
    Seq(true, false).foreach { legacyParser =>
      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
        checkType(
          options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"),
          json = """{"a": "20181202.210400123"}""",
          dt = StringType
        )
      }
    }
  }
}
JsonSuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.StructType.fromDDL
 import org.apache.spark.util.Utils
 
 class TestFileFilter extends PathFilter {
@@ -2589,4 +2590,55 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") ::
       Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil)
   }
+
+  test("inferring timestamp type") {
+    Seq(true, false).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+        def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema
+
+        assert(schemaOf(
+          """{"a":"2018-12-17T10:11:12.123-01:00"}""",
+          """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp"))
+
+        assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""")
+          === fromDDL("a string"))
+        assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""")
+          === fromDDL("a string"))
+
+        assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""")
+          === fromDDL("a timestamp"))
+        assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""")
+          === fromDDL("a timestamp"))
+      }
+    }
+  }
+
+  test("roundtrip for timestamp type inferring") {
+    Seq(true, false).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+        val customSchema = new StructType().add("date", TimestampType)
+        withTempDir { dir =>
+          val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+          val timestampsWithFormat = spark.read
+            .option("timestampFormat", "dd/MM/yyyy HH:mm")
+            .json(datesRecords)
+          assert(timestampsWithFormat.schema === customSchema)
+
+          timestampsWithFormat.write
+            .format("json")
+            .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
+            .option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
+            .save(timestampsWithFormatPath)
+
+          val readBack = spark.read
+            .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
+            .option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
+            .json(timestampsWithFormatPath)
+
+          assert(readBack.schema === customSchema)
+          checkAnswer(readBack, timestampsWithFormat)
+        }
+      }
+    }
+  }
 }