From 160dd82adeb07ef389afb67daadf8a70177bc5a1 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 4 Mar 2016 11:51:07 +0900
Subject: [PATCH 1/8] Support for specifying custom date format for date and timestamp types.

---
.../com/databricks/spark/csv/CsvParser.scala | 12 +-
.../databricks/spark/csv/CsvRelation.scala | 17 ++-
.../databricks/spark/csv/DefaultSource.scala | 7 +-
.../com/databricks/spark/csv/package.scala | 16 ++-
.../spark/csv/util/InferSchema.scala | 110 ++++++++++--------
.../databricks/spark/csv/util/TypeCast.scala | 11 +-
src/test/resources/dates.csv | 4 +
.../com/databricks/spark/csv/CsvSuite.scala | 43 ++++++-
.../spark/csv/util/InferSchemaSuite.scala | 11 ++
.../spark/csv/util/TypeCastSuite.scala | 12 ++
10 files changed, 178 insertions(+), 65 deletions(-)
create mode 100644 src/test/resources/dates.csv

diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala
index 370ab3c..ad04832 100644
--- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala
+++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala
@@ -41,6 +41,7 @@ class CsvParser extends Serializable {
private var inferSchema: Boolean = false
private var codec: String = null
private var nullValue: String = ""
+ private var dateFormat: String = null

def withUseHeader(flag: Boolean): CsvParser = {
this.useHeader = flag
@@ -117,6 +118,11 @@ class CsvParser extends Serializable {
this
}

+ def withDateFormat(dateFormat: String): CsvParser = {
+ this.dateFormat = dateFormat
+ this
+ }
+
/** Returns a Schema RDD for the given CSV path. */
@throws[RuntimeException]
def csvFile(sqlContext: SQLContext, path: String): DataFrame = {
@@ -136,7 +142,8 @@ class CsvParser extends Serializable {
schema,
inferSchema,
codec,
- nullValue)(sqlContext)
+ nullValue,
+ dateFormat)(sqlContext)
sqlContext.baseRelationToDataFrame(relation)
}

@@ -157,7 +164,8 @@ class CsvParser extends Serializable {
schema,
inferSchema,
codec,
- nullValue)(sqlContext)
+ nullValue,
+ dateFormat)(sqlContext)
sqlContext.baseRelationToDataFrame(relation)
}
}
diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
index d6dce1c..6b7efca 100755
--- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
+++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
@@ -16,6 +16,7 @@
package com.databricks.spark.csv

import java.io.IOException
+import java.text.SimpleDateFormat

import scala.collection.JavaConversions._
import scala.util.control.NonFatal
@@ -47,9 +48,13 @@ case class CsvRelation protected[spark] (
userSchema: StructType = null,
inferCsvSchema: Boolean,
codec: String = null,
- nullValue: String = "")(@transient val sqlContext: SQLContext)
+ nullValue: String = "",
+ dateFormat: String = "")(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan with PrunedScan with InsertableRelation {

+ // Share the date format object, as parsing a date pattern is expensive.
+ private val dateFormatter = if (dateFormat != null) new SimpleDateFormat(dateFormat) else null + private val logger = LoggerFactory.getLogger(CsvRelation.getClass) // Parse mode flags @@ -96,6 +101,7 @@ case class CsvRelation protected[spark] ( } override def buildScan: RDD[Row] = { + val simpleDateFormatter = dateFormatter val schemaFields = schema.fields tokenRdd(schemaFields.map(_.name)).flatMap { tokens => @@ -112,7 +118,7 @@ case class CsvRelation protected[spark] ( while (index < schemaFields.length) { val field = schemaFields(index) rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable, - treatEmptyValuesAsNulls, nullValue) + treatEmptyValuesAsNulls, nullValue, simpleDateFormatter) index = index + 1 } Some(Row.fromSeq(rowArray)) @@ -142,6 +148,7 @@ case class CsvRelation protected[spark] ( * both the indices produced by `requiredColumns` and the ones of tokens. */ override def buildScan(requiredColumns: Array[String]): RDD[Row] = { + val simpleDateFormatter = dateFormatter val schemaFields = schema.fields val requiredFields = StructType(requiredColumns.map(schema(_))).fields val shouldTableScan = schemaFields.deep == requiredFields.deep @@ -189,7 +196,8 @@ case class CsvRelation protected[spark] ( field.dataType, field.nullable, treatEmptyValuesAsNulls, - nullValue + nullValue, + simpleDateFormatter ) subIndex = subIndex + 1 } @@ -237,7 +245,8 @@ case class CsvRelation protected[spark] ( firstRow.zipWithIndex.map { case (value, index) => s"C$index"} } if (this.inferCsvSchema) { - InferSchema(tokenRdd(header), header, nullValue) + val simpleDateFormatter = dateFormatter + InferSchema(tokenRdd(header), header, nullValue, simpleDateFormatter) } else { // By default fields are assumed to be StringType val schemaFields = header.map { fieldName => diff --git a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala index 10e377d..a89145b 100755 --- a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala +++ b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala @@ -128,6 +128,8 @@ class DefaultSource val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name()) // TODO validate charset? 
+ val dataFormat = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name()) + val inferSchema = parameters.getOrElse("inferSchema", "false") val inferSchemaFlag = if (inferSchema == "false") { false @@ -138,6 +140,8 @@ class DefaultSource } val nullValue = parameters.getOrElse("nullValue", "") + val dateFormat = parameters.getOrElse("dateFormat", null) + val codec = parameters.getOrElse("codec", null) CsvRelation( @@ -156,7 +160,8 @@ class DefaultSource schema, inferSchemaFlag, codec, - nullValue)(sqlContext) + nullValue, + dateFormat)(sqlContext) } override def createRelation( diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index 4a01f01..108c3dc 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -42,7 +42,9 @@ package object csv { ignoreLeadingWhiteSpace: Boolean = false, ignoreTrailingWhiteSpace: Boolean = false, charset: String = TextFile.DEFAULT_CHARSET.name(), - inferSchema: Boolean = false): DataFrame = { + inferSchema: Boolean = false, + nullValue: String = "", + dateFormat: String = null): DataFrame = { val csvRelation = CsvRelation( () => TextFile.withCharset(sqlContext.sparkContext, filePath, charset), location = Some(filePath), @@ -56,7 +58,9 @@ package object csv { ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace, treatEmptyValuesAsNulls = false, - inferCsvSchema = inferSchema)(sqlContext) + inferCsvSchema = inferSchema, + nullValue = nullValue, + dateFormat = dateFormat)(sqlContext) sqlContext.baseRelationToDataFrame(csvRelation) } @@ -67,7 +71,9 @@ package object csv { ignoreLeadingWhiteSpace: Boolean = false, ignoreTrailingWhiteSpace: Boolean = false, charset: String = TextFile.DEFAULT_CHARSET.name(), - inferSchema: Boolean = false): DataFrame = { + inferSchema: Boolean = false, + nullValue: String = "", + dateFormat: String = null): DataFrame = { val csvRelation = CsvRelation( () => TextFile.withCharset(sqlContext.sparkContext, filePath, charset), location = Some(filePath), @@ -81,7 +87,9 @@ package object csv { ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace, treatEmptyValuesAsNulls = false, - inferCsvSchema = inferSchema)(sqlContext) + inferCsvSchema = inferSchema, + nullValue = nullValue, + dateFormat = dateFormat)(sqlContext) sqlContext.baseRelationToDataFrame(csvRelation) } } diff --git a/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala b/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala index f1736a9..e6c1929 100644 --- a/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala +++ b/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala @@ -16,6 +16,7 @@ package com.databricks.spark.csv.util import java.sql.Timestamp +import java.text.SimpleDateFormat import scala.util.control.Exception._ @@ -34,11 +35,11 @@ private[csv] object InferSchema { def apply( tokenRdd: RDD[Array[String]], header: Array[String], - nullValue: String = ""): StructType = { - + nullValue: String = "", + dateFormatter: SimpleDateFormat = null): StructType = { val startType: Array[DataType] = Array.fill[DataType](header.length)(NullType) val rootTypes: Array[DataType] = tokenRdd.aggregate(startType)( - inferRowType(nullValue), + inferRowType(nullValue, dateFormatter), mergeRowTypes) val structFields = header.zip(rootTypes).map { case (thisHeader, rootType) => @@ -52,11 +53,11 @@ 
private[csv] object InferSchema {
StructType(structFields)
}

- private def inferRowType(nullValue: String)
+ private def inferRowType(nullValue: String, dateFormatter: SimpleDateFormat)
(rowSoFar: Array[DataType], next: Array[String]): Array[DataType] = {
var i = 0
while (i < math.min(rowSoFar.length, next.length)) { // May have columns on right missing.
- rowSoFar(i) = inferField(rowSoFar(i), next(i), nullValue)
+ rowSoFar(i) = inferField(rowSoFar(i), next(i), nullValue, dateFormatter)
i+=1
}
rowSoFar
@@ -76,7 +77,61 @@ private[csv] object InferSchema {
*/
private[csv] def inferField(typeSoFar: DataType,
field: String,
- nullValue: String = ""): DataType = {
+ nullValue: String = "",
+ dateFormatter: SimpleDateFormat = null): DataType = {
+ def tryParseInteger(field: String): DataType = if ((allCatch opt field.toInt).isDefined) {
+ IntegerType
+ } else {
+ tryParseLong(field)
+ }
+
+ def tryParseLong(field: String): DataType = if ((allCatch opt field.toLong).isDefined) {
+ LongType
+ } else {
+ tryParseDouble(field)
+ }
+
+ def tryParseDouble(field: String): DataType = {
+ if ((allCatch opt field.toDouble).isDefined) {
+ DoubleType
+ } else {
+ tryParseTimestamp(field)
+ }
+ }
+
+ def tryParseTimestamp(field: String): DataType = {
+ if (dateFormatter != null) {
+ // This case applies when a custom `dateFormat` is set.
+ if ((allCatch opt dateFormatter.parse(field)).isDefined) {
+ TimestampType
+ } else {
+ tryParseBoolean(field)
+ }
+ } else {
+ // We keep this for backwards compatibility.
+ if ((allCatch opt Timestamp.valueOf(field)).isDefined) {
+ TimestampType
+ } else {
+ tryParseBoolean(field)
+ }
+ }
+ }
+
+ def tryParseBoolean(field: String): DataType = {
+ if ((allCatch opt field.toBoolean).isDefined) {
+ BooleanType
+ } else {
+ stringType()
+ }
+ }
+
+ // Defining a function to return the StringType constant is necessary in order to work around
+ // a Scala compiler issue which leads to runtime incompatibilities with certain Spark versions;
+ // see issue #128 for more details.
+ def stringType(): DataType = {
+ StringType
+ }
+
if (field == null || field.isEmpty || field == nullValue) {
typeSoFar
} else {
@@ -94,49 +149,6 @@ private[csv] object InferSchema {
}
}

- private def tryParseInteger(field: String): DataType = if ((allCatch opt field.toInt).isDefined) {
- IntegerType
- } else {
- tryParseLong(field)
- }
-
- private def tryParseLong(field: String): DataType = if ((allCatch opt field.toLong).isDefined) {
- LongType
- } else {
- tryParseDouble(field)
- }
-
- private def tryParseDouble(field: String): DataType = {
- if ((allCatch opt field.toDouble).isDefined) {
- DoubleType
- } else {
- tryParseTimestamp(field)
- }
- }
-
- def tryParseTimestamp(field: String): DataType = {
- if ((allCatch opt Timestamp.valueOf(field)).isDefined) {
- TimestampType
- } else {
- tryParseBoolean(field)
- }
- }
-
- def tryParseBoolean(field: String): DataType = {
- if ((allCatch opt field.toBoolean).isDefined) {
- BooleanType
- } else {
- stringType()
- }
- }
-
- // Defining a function to return the StringType constant is necessary in order to work around
- // a Scala compiler issue which leads to runtime incompatibilities with certain Spark versions;
- // see issue #128 for more details.
- private def stringType(): DataType = { - StringType - } - /** * Copied from internal Spark api * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]] diff --git a/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala b/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala index edecf97..11d5a07 100644 --- a/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala +++ b/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala @@ -17,7 +17,7 @@ package com.databricks.spark.csv.util import java.math.BigDecimal import java.sql.{Date, Timestamp} -import java.text.NumberFormat +import java.text.{SimpleDateFormat, NumberFormat} import java.util.Locale import org.apache.spark.sql.types._ @@ -44,7 +44,8 @@ object TypeCast { castType: DataType, nullable: Boolean = true, treatEmptyValuesAsNulls: Boolean = false, - nullValue: String = ""): Any = { + nullValue: String = "", + dateFormatter: SimpleDateFormat = null): Any = { // if nullValue is not an empty string, don't require treatEmptyValuesAsNulls // to be set to true val nullValueIsNotEmpty = nullValue != "" @@ -65,9 +66,11 @@ object TypeCast { .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue()) case _: BooleanType => datum.toBoolean case _: DecimalType => new BigDecimal(datum.replaceAll(",", "")) - // TODO(hossein): would be good to support other common timestamp formats + case _: TimestampType if dateFormatter != null => + new Timestamp(dateFormatter.parse(datum).getTime) case _: TimestampType => Timestamp.valueOf(datum) - // TODO(hossein): would be good to support other common date formats + case _: DateType if dateFormatter != null => + new Date(dateFormatter.parse(datum).getTime) case _: DateType => Date.valueOf(datum) case _: StringType => datum case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}") diff --git a/src/test/resources/dates.csv b/src/test/resources/dates.csv new file mode 100644 index 0000000..e7683b6 --- /dev/null +++ b/src/test/resources/dates.csv @@ -0,0 +1,4 @@ +date +26/08/2015 18:00 +27/10/2014 18:30 +28/01/2016 20:00 \ No newline at end of file diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index 4e8ba04..a5f5eb5 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -17,7 +17,8 @@ package com.databricks.spark.csv import java.io.File import java.nio.charset.UnsupportedCharsetException -import java.sql.Timestamp +import java.sql.{Date, Timestamp} +import java.text.SimpleDateFormat import scala.io.Source import com.databricks.spark.csv.util.ParseModes @@ -44,6 +45,7 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { val commentsFile = "src/test/resources/comments.csv" val disableCommentsFile = "src/test/resources/disable_comments.csv" val boolFile = "src/test/resources/bool.csv" + val datesFile = "src/test/resources/dates.csv" private val simpleDatasetFile = "src/test/resources/simple.csv" val numCars = 3 @@ -772,6 +774,45 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { assert(results.toSeq.map(_.toSeq) === expected) } + test("Inferring timestamp types via custom date format") { + val results = sqlContext + .csvFile(datesFile, parserLib = parserLib, inferSchema = true, dateFormat = "dd/MM/yyyy hh:mm") + .select("date") + .collect() + + val dateFormatter = new SimpleDateFormat("dd/MM/yyyy hh:mm") + val expected = + Seq(Seq(new 
Timestamp(dateFormatter.parse("26/08/2015 18:00").getTime)), + Seq(new Timestamp(dateFormatter.parse("27/10/2014 18:30").getTime)), + Seq(new Timestamp(dateFormatter.parse("28/01/2016 20:00").getTime))) + assert(results.toSeq.map(_.toSeq) === expected) + } + + test("Load date types via custom date format") { + val customSchema = new StructType(Array(StructField("date", DateType, true))) + val results = new CsvParser() + .withSchema(customSchema) + .withUseHeader(true) + .withParserLib(parserLib) + .withDateFormat("dd/MM/yyyy hh:mm") + .csvFile(sqlContext, datesFile) + .select("date") + .collect() + + val dateFormatter = new SimpleDateFormat("dd/MM/yyyy hh:mm") + val expected = Seq( + new Date(dateFormatter.parse("26/08/2015 18:00").getTime), + new Date(dateFormatter.parse("27/10/2014 18:30").getTime), + new Date(dateFormatter.parse("28/01/2016 20:00").getTime)) + val dates = results.toSeq.map(_.toSeq.head) + expected.zip(dates).foreach { + case (expectedDate, date) => + // As it truncates the hours, minutes and etc., we only check + // if the dates (days, months and years) are the same via `toString()`. + assert(expectedDate.toString === date.toString) + } + } + test("Setting comment to null disables comment support") { val results: Array[Row] = new CsvParser() .withDelimiter(',') diff --git a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala index 7915c73..9d05195 100644 --- a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala @@ -15,6 +15,8 @@ */ package com.databricks.spark.csv.util +import java.text.SimpleDateFormat + import org.apache.spark.sql.types._ import org.scalatest.FunSuite @@ -55,6 +57,15 @@ class InferSchemaSuite extends FunSuite { assert(InferSchema.inferField(TimestampType, "FALSE") == BooleanType) } + test("Timestamp field types are inferred correctly via custom data format"){ + val formatter = new SimpleDateFormat("yyyy-mm") + assert( + InferSchema.inferField(TimestampType, "2015-08", dateFormatter = formatter) == TimestampType) + formatter.applyPattern("yyyy") + assert( + InferSchema.inferField(TimestampType, "2015", dateFormatter = formatter) == TimestampType) + } + test("Timestamp field types are inferred correctly from other types") { assert(InferSchema.inferField(IntegerType, "2015-08-20 14") == StringType) assert(InferSchema.inferField(DoubleType, "2015-08-20 14:10") == StringType) diff --git a/src/test/scala/com/databricks/spark/csv/util/TypeCastSuite.scala b/src/test/scala/com/databricks/spark/csv/util/TypeCastSuite.scala index b8e6e71..448debf 100644 --- a/src/test/scala/com/databricks/spark/csv/util/TypeCastSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/util/TypeCastSuite.scala @@ -17,6 +17,7 @@ package com.databricks.spark.csv.util import java.math.BigDecimal import java.sql.{Date, Timestamp} +import java.text.SimpleDateFormat import java.util.Locale import org.scalatest.FunSuite @@ -86,6 +87,17 @@ class TypeCastSuite extends FunSuite { val timestamp = "2015-01-01 00:00:00" assert(TypeCast.castTo(timestamp, TimestampType) == Timestamp.valueOf(timestamp)) assert(TypeCast.castTo("2015-01-01", DateType) == Date.valueOf("2015-01-01")) + + val dateFormatter = new SimpleDateFormat("dd/MM/yyyy hh:mm") + val customTimestamp = "31/01/2015 00:00" + // `SimpleDateFormat.parse` returns `java.util.Date`. 
+ // to `java.sql.Date`.
+ val expectedDate = new Date(dateFormatter.parse("31/01/2015 00:00").getTime)
+ val expectedTimestamp = new Timestamp(expectedDate.getTime)
+ assert(TypeCast.castTo(customTimestamp, TimestampType, dateFormatter = dateFormatter)
+ == expectedTimestamp)
+ assert(TypeCast.castTo(customTimestamp, DateType, dateFormatter = dateFormatter) ==
+ expectedDate)
}

test("Float and Double Types are cast correctly with Locale") {

From 920c0053cab70c91cb2372a88b5d6c59faa18137 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 4 Mar 2016 11:51:45 +0900
Subject: [PATCH 2/8] Update readme.md

---
README.md | 1 +
1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index a4e36e1..3ced1ce 100755
--- a/README.md
+++ b/README.md
@@ -56,6 +56,7 @@ When reading files the API accepts several options:
* `inferSchema`: automatically infers column types. It requires one extra pass over the data and is false by default
* `comment`: skip lines beginning with this character. Default is `"#"`. Disable comments by setting this to `null`.
* `nullValue`: specify a string that indicates a null value; any fields matching this string will be set as nulls in the DataFrame
+* `dateFormat`: specify a string that indicates a date format. Custom date formats follow the formats at [`java.text.SimpleDateFormat`](https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html). This applies to both `DateType` and `TimestampType`. By default it is `null`, which means times and dates are parsed by `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.

The package also supports saving simple (non-nested) DataFrame. When writing files the API accepts several options:
* `path`: location of files.

From 54ff60425a779461fe791f15f3666d2bd6a13601 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 4 Mar 2016 11:55:18 +0900
Subject: [PATCH 3/8] Add a newline at the end of the test file and change the default value

---
src/main/scala/com/databricks/spark/csv/CsvRelation.scala | 2 +-
src/test/resources/dates.csv | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
index 6b7efca..3e36931 100755
--- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
+++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
@@ -49,7 +49,7 @@ case class CsvRelation protected[spark] (
inferCsvSchema: Boolean,
codec: String = null,
nullValue: String = "",
- dateFormat: String = "")(@transient val sqlContext: SQLContext)
+ dateFormat: String = null)(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan with PrunedScan with InsertableRelation {

// Share the date format object, as parsing a date pattern is expensive.
diff --git a/src/test/resources/dates.csv b/src/test/resources/dates.csv
index e7683b6..9ee99c3 100644
--- a/src/test/resources/dates.csv
+++ b/src/test/resources/dates.csv
@@ -1,4 +1,4 @@
date
26/08/2015 18:00
27/10/2014 18:30
-28/01/2016 20:00
\ No newline at end of file
+28/01/2016 20:00

From cb58babdf6886bf5f9951d8db8e0f3166a725199 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 4 Mar 2016 12:18:04 +0900
Subject: [PATCH 4/8] Remove some previously added arguments for binary compatibility.
--- src/main/scala/com/databricks/spark/csv/package.scala | 8 ++------ src/test/scala/com/databricks/spark/csv/CsvSuite.scala | 8 ++++++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index 108c3dc..b441dec 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -42,9 +42,7 @@ package object csv { ignoreLeadingWhiteSpace: Boolean = false, ignoreTrailingWhiteSpace: Boolean = false, charset: String = TextFile.DEFAULT_CHARSET.name(), - inferSchema: Boolean = false, - nullValue: String = "", - dateFormat: String = null): DataFrame = { + inferSchema: Boolean = false): DataFrame = { val csvRelation = CsvRelation( () => TextFile.withCharset(sqlContext.sparkContext, filePath, charset), location = Some(filePath), @@ -58,9 +56,7 @@ package object csv { ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace, treatEmptyValuesAsNulls = false, - inferCsvSchema = inferSchema, - nullValue = nullValue, - dateFormat = dateFormat)(sqlContext) + inferCsvSchema = inferSchema)(sqlContext) sqlContext.baseRelationToDataFrame(csvRelation) } diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index a5f5eb5..3d55c84 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -775,8 +775,12 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { } test("Inferring timestamp types via custom date format") { - val results = sqlContext - .csvFile(datesFile, parserLib = parserLib, inferSchema = true, dateFormat = "dd/MM/yyyy hh:mm") + val results = new CsvParser() + .withUseHeader(true) + .withParserLib(parserLib) + .withDateFormat("dd/MM/yyyy hh:mm") + .withInferSchema(true) + .csvFile(sqlContext, datesFile) .select("date") .collect() From 20bd77cf81531882cf704e83dcc28421f36fbc8e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 4 Mar 2016 12:27:03 +0900 Subject: [PATCH 5/8] Remove arguments added --- src/main/scala/com/databricks/spark/csv/package.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index b441dec..ef081c5 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -68,8 +68,7 @@ package object csv { ignoreTrailingWhiteSpace: Boolean = false, charset: String = TextFile.DEFAULT_CHARSET.name(), inferSchema: Boolean = false, - nullValue: String = "", - dateFormat: String = null): DataFrame = { + nullValue: String = ""): DataFrame = { val csvRelation = CsvRelation( () => TextFile.withCharset(sqlContext.sparkContext, filePath, charset), location = Some(filePath), @@ -84,8 +83,7 @@ package object csv { ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace, treatEmptyValuesAsNulls = false, inferCsvSchema = inferSchema, - nullValue = nullValue, - dateFormat = dateFormat)(sqlContext) + nullValue = nullValue)(sqlContext) sqlContext.baseRelationToDataFrame(csvRelation) } } From 6ff750c5242ec0852391032852463eaecef76adc Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 4 Mar 2016 12:28:07 +0900 Subject: [PATCH 6/8] Remove arguments added --- src/main/scala/com/databricks/spark/csv/package.scala | 6 
++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index ef081c5..4a01f01 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -67,8 +67,7 @@ package object csv { ignoreLeadingWhiteSpace: Boolean = false, ignoreTrailingWhiteSpace: Boolean = false, charset: String = TextFile.DEFAULT_CHARSET.name(), - inferSchema: Boolean = false, - nullValue: String = ""): DataFrame = { + inferSchema: Boolean = false): DataFrame = { val csvRelation = CsvRelation( () => TextFile.withCharset(sqlContext.sparkContext, filePath, charset), location = Some(filePath), @@ -82,8 +81,7 @@ package object csv { ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace, treatEmptyValuesAsNulls = false, - inferCsvSchema = inferSchema, - nullValue = nullValue)(sqlContext) + inferCsvSchema = inferSchema)(sqlContext) sqlContext.baseRelationToDataFrame(csvRelation) } } From afaf371902ae96755bf0b7de8e81532fdda1bc47 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 4 Mar 2016 15:04:55 +0900 Subject: [PATCH 7/8] Remove unused variable --- src/main/scala/com/databricks/spark/csv/DefaultSource.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala index a89145b..220c380 100755 --- a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala +++ b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala @@ -128,8 +128,6 @@ class DefaultSource val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name()) // TODO validate charset? - val dataFormat = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name()) - val inferSchema = parameters.getOrElse("inferSchema", "false") val inferSchemaFlag = if (inferSchema == "false") { false From e61cdf7a88738ff4054de231b2b92d07d71b2f8e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 4 Mar 2016 15:28:11 +0900 Subject: [PATCH 8/8] Indentation corrections --- .../databricks/spark/csv/util/InferSchema.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala b/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala index e6c1929..9a52ab0 100644 --- a/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala +++ b/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala @@ -33,10 +33,10 @@ private[csv] object InferSchema { * 3. Replace any null types with string type */ def apply( - tokenRdd: RDD[Array[String]], - header: Array[String], - nullValue: String = "", - dateFormatter: SimpleDateFormat = null): StructType = { + tokenRdd: RDD[Array[String]], + header: Array[String], + nullValue: String = "", + dateFormatter: SimpleDateFormat = null): StructType = { val startType: Array[DataType] = Array.fill[DataType](header.length)(NullType) val rootTypes: Array[DataType] = tokenRdd.aggregate(startType)( inferRowType(nullValue, dateFormatter), @@ -76,9 +76,9 @@ private[csv] object InferSchema { * point checking if it is an Int, as the final type must be Double or higher. 
*/ private[csv] def inferField(typeSoFar: DataType, - field: String, - nullValue: String = "", - dateFormatter: SimpleDateFormat = null): DataType = { + field: String, + nullValue: String = "", + dateFormatter: SimpleDateFormat = null): DataType = { def tryParseInteger(field: String): DataType = if ((allCatch opt field.toInt).isDefined) { IntegerType } else {
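
--
Not part of the patches above: a minimal usage sketch of the `dateFormat` option this series adds, assuming Spark 1.4+ for `sqlContext.read`. The file path reuses the series' own test resource; the `HH` (24-hour) pattern is an assumption on top of the series — its tests use `hh` (12-hour), which only accepts values such as "18:00" because `SimpleDateFormat` parses leniently by default.

    import org.apache.spark.sql.SQLContext
    import com.databricks.spark.csv.CsvParser

    def loadDates(sqlContext: SQLContext) = {
      // Infer TimestampType for columns matching the custom pattern,
      // using the builder API extended by PATCH 1/8.
      val inferred = new CsvParser()
        .withUseHeader(true)
        .withInferSchema(true)
        .withDateFormat("dd/MM/yyyy HH:mm")
        .csvFile(sqlContext, "src/test/resources/dates.csv")

      // The same option passed through the data source API,
      // which DefaultSource reads from its `parameters` map.
      val viaOptions = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("dateFormat", "dd/MM/yyyy HH:mm")
        .load("src/test/resources/dates.csv")

      inferred.printSchema() // the `date` column should be inferred as TimestampType
      viaOptions
    }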