[SPARK-26248][SQL] Infer date type from CSV #23202

Closed · wants to merge 9 commits
@@ -22,16 +22,20 @@
 import scala.util.control.Exception.allCatch

 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.types._

 class CSVInferSchema(val options: CSVOptions) extends Serializable {

   @transient
-  private lazy val timestampParser = TimestampFormatter(
+  private lazy val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.timeZone,
     options.locale)
+  @transient
+  private lazy val dateFormatter = DateFormatter(
+    options.dateFormat,
+    options.locale)

   private val decimalParser = {
     ExprUtils.getDecimalParser(options.locale)
@@ -104,6 +108,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
       compatibleType(typeSoFar, tryParseDecimal(field)).getOrElse(StringType)
     case DoubleType => tryParseDouble(field)
     case TimestampType => tryParseTimestamp(field)
+    case DateType => tryParseDate(field)
Member commented:

The problem here is that it looks a bit odd to try the date type later. IIRC the root cause is related to the date parsing library. Couldn't we try date first if we switch the parsing library? I thought that was in progress.

HyukjinKwon (Member) commented on Dec 3, 2018:

I mean, IIRC, if the pattern is, for instance, yyyy-MM-dd, then both 2010-10-10 and 2018-12-02T21:04:00.123567 are parsed as dates, because the current parsing library only checks whether a prefix of the string matches and ignores the rest.

So, if we try date first, it will work for the default patterns, but with some unusual patterns it wouldn't work again.

I was thinking we could fix this by using DateTimeFormatter, which does an exact match, IIRC.
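The lenient-vs-exact behavior described above can be sketched with the two JVM parsing APIs (a minimal standalone illustration, not code from this PR): the legacy java.text.SimpleDateFormat stops after matching the pattern prefix and silently ignores trailing text, while java.time's DateTimeFormatter, used through LocalDate.parse, rejects input that is not fully consumed.

```scala
import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.Try

val input = "2018-12-02T21:04:00.123567"

// Legacy parser: matches the "yyyy-MM-dd" prefix and ignores the trailing
// time part, so the full timestamp string still "parses" as a date.
val lenient = Try(new SimpleDateFormat("yyyy-MM-dd").parse(input))
assert(lenient.isSuccess)

// java.time parser: fails with DateTimeParseException because the whole
// input was not consumed by the pattern.
val exact = Try(LocalDate.parse(input, DateTimeFormatter.ofPattern("yyyy-MM-dd")))
assert(exact.isFailure)
```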

Member Author commented:

Just in case, I did an exact match here as well, see https://github.com/apache/spark/pull/23202/files#diff-17719da188b2c15129f848f654a0e6feR174. If the date parser didn't consume all of the input (pos.getIndex != field.length), it fails. If I move it up in the type-inferring pipeline, it should work.
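The exact-match check mentioned above can be sketched in isolation (a simplified illustration using java.text APIs; `parsesExactly` is a hypothetical helper, not the PR's actual code): parsing succeeds only if the formatter consumed every character of the field.

```scala
import java.text.{ParsePosition, SimpleDateFormat}

// Hypothetical helper mirroring the check described above: the value only
// counts as a date if the formatter consumed the entire input string.
def parsesExactly(pattern: String, field: String): Boolean = {
  val pos = new ParsePosition(0)
  val parsed = new SimpleDateFormat(pattern).parse(field, pos)
  parsed != null && pos.getIndex == field.length
}

assert(parsesExactly("yyyy-MM-dd", "2010-10-10"))
// A timestamp no longer slips through as a date: parsing stops at 'T',
// so pos.getIndex is 10 while the field is longer than that.
assert(!parsesExactly("yyyy-MM-dd", "2018-12-02T21:04:00.123567"))
```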

Member commented:

I see. Can we try date first above, then? I was wondering if there was a reason not to try date first.

Member Author commented:

Done. Please have a look at the changes.

Member commented:

Another problem is that the order here also matters when types are being merged. For instance, if the date type is inferred first and a timestamp is then found, the timestamp type won't be detected anymore.

Contributor commented:

IIRC we decided to follow the order in partition inference and infer timestamp first?

Member commented:
Here's what I mean:

Seq("2010|10|10", "2010_10_10")
  .toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
spark.read
  .option("inferSchema", "true")
  .option("header", "false")
  .option("dateFormat", "yyyy|MM|dd")
  .option("timestampFormat", "yyyy_MM_dd").csv("/tmp/foo").printSchema()
root
 |-- _c0: string (nullable = true)
Seq("2010_10_10", "2010|10|10")
  .toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
spark.read
  .option("inferSchema", "true")
  .option("header", "false")
  .option("dateFormat", "yyyy|MM|dd")
  .option("timestampFormat", "yyyy_MM_dd").csv("/tmp/foo").printSchema()
root
 |-- _c0: date (nullable = true)
Seq("2010_10_10")
  .toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
spark.read
  .option("inferSchema", "true")
  .option("header", "false")
  .option("timestampFormat", "yyyy_MM_dd").csv("/tmp/foo").printSchema()
root
 |-- _c0: timestamp (nullable = true)
Seq("2010|10|10")
  .toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
spark.read
  .option("inferSchema", "true")
  .option("header", "false")
  .option("dateFormat", "yyyy|MM|dd").csv("/tmp/foo").printSchema()
root
 |-- _c0: date (nullable = true)

Contributor commented:

Ah, I see your point. So the order here determines not only how we infer the type for a single token, but also how we merge types.

This is super weird, as the order has a different meaning depending on the context:

  1. For a single token, the case that appears first has higher priority. Here timestamp is preferred over date.
  2. For type merging, the case that appears last has higher priority. Once a type is inferred as date, we can't go back to timestamp anymore.

If the specified formats of date and timestamp are not compatible, the timestamp and date types should be incompatible and we should fall back to string.
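A hedged sketch of the merge rule proposed here (the `Inferred` hierarchy and `merge` are illustrative names, not Spark's actual `compatibleType`): when one token of a column infers date and another infers timestamp under incompatible user-specified formats, the merged type degrades to string instead of keeping whichever case happened to run last.

```scala
// Illustrative only: a tiny model of column-level type merging where a
// date/timestamp conflict falls back to string.
sealed trait Inferred
case object DateT extends Inferred
case object TimestampT extends Inferred
case object StringT extends Inferred

def merge(soFar: Inferred, next: Inferred): Inferred = (soFar, next) match {
  case (a, b) if a == b => a
  // Incompatible dateFormat/timestampFormat: neither type can represent
  // both tokens, so the column degrades to string.
  case (DateT, TimestampT) | (TimestampT, DateT) => StringT
  case _ => StringT
}

// Both token orders now converge on the same result, avoiding the
// order-dependent behavior demonstrated in the spark-shell examples above.
assert(merge(DateT, TimestampT) == merge(TimestampT, DateT))
assert(merge(DateT, TimestampT) == StringT)
```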

Contributor commented:

Because of this, I'm +1 for reverting. We should think of a better way to do it. Sorry for not realizing the tricky stuff here.

Member commented:

It's okay... sorry for rushing comments. I realised my comments are hard to read now.

     case BooleanType => tryParseBoolean(field)
     case StringType => StringType
     case other: DataType =>

@@ -159,9 +164,16 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   }

   private def tryParseTimestamp(field: String): DataType = {
-    // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt timestampParser.parse(field)).isDefined) {
+    if ((allCatch opt timestampFormatter.parse(field)).isDefined) {
       TimestampType
+    } else {
+      tryParseDate(field)
+    }
+  }
+
+  private def tryParseDate(field: String): DataType = {
+    if ((allCatch opt dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseBoolean(field)
     }
@@ -187,4 +187,22 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {

     Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach(checkDecimalInfer(_, DecimalType(7, 0)))
   }
+
+  test("inferring date type") {
+    var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd"), false, "GMT")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018/12/02") == DateType)
+
+    options = new CSVOptions(Map("dateFormat" -> "MMM yyyy"), false, "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "Dec 2018") == DateType)
+
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
+      columnPruning = false,
+      defaultTimeZoneId = "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018-12-03T11:00:00") == TimestampType)
+    assert(inferSchema.inferField(NullType, "2018-12-03") == DateType)
+  }
 }