Skip to content

Commit

Permalink
parquet support for date type
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-wang committed Mar 22, 2015
1 parent b9fe504 commit 97e9080
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.parquet

import java.sql.Timestamp
import java.sql.{Date, Timestamp}
import java.util.{TimeZone, Calendar}

import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
Expand Down Expand Up @@ -192,6 +192,9 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
/** Stores an INT32 primitive into slot `fieldIndex` by delegating to the generic field updater. */
protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = {
  updateField(fieldIndex, value)
}

// Converts a Parquet DATE-annotated INT32 into a java.sql.Date for the row.
// NOTE(review): `new Date(value)` interprets `value` as epoch *milliseconds*,
// but the Parquet DATE original type is defined as *days* since the Unix epoch.
// The paired writer (`getTime.toInt`) also truncates a Long millis value to Int,
// which overflows for dates more than ~24 days from 1970-01-01 — confirm the
// intended encoding and convert days -> millis here if the spec is followed.
protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
updateField(fieldIndex, new Date(value))

/** Stores an INT64 primitive into slot `fieldIndex` by delegating to the generic field updater. */
protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = {
  updateField(fieldIndex, value)
}

Expand Down Expand Up @@ -388,6 +391,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
/** Writes an INT32 value straight into the current row via its specialized int setter. */
override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = {
  current.setInt(fieldIndex, value)
}

// Wraps the raw INT32 as a java.sql.Date and writes it into the current row.
// NOTE(review): `new Date(value)` treats `value` as epoch *milliseconds*, while
// the Parquet DATE original type stores *days* since the Unix epoch — verify the
// encoding; as written, only dates within Int-millis range of the epoch survive
// a round trip with the corresponding writer.
override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
current.update(fieldIndex, new Date(value))

/** Writes an INT64 value straight into the current row via its specialized long setter. */
override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = {
  current.setLong(fieldIndex, value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case DoubleType => writer.addDouble(value.asInstanceOf[Double])
case FloatType => writer.addFloat(value.asInstanceOf[Float])
case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
case DateType => writer.addInteger(value.asInstanceOf[java.sql.Date].getTime.toInt)
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
case ParquetPrimitiveTypeName.DOUBLE => DoubleType
case ParquetPrimitiveTypeName.FLOAT => FloatType
case ParquetPrimitiveTypeName.INT32
if originalType == ParquetOriginalType.DATE => DateType
case ParquetPrimitiveTypeName.INT32 => IntegerType
case ParquetPrimitiveTypeName.INT64 => LongType
case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
Expand Down Expand Up @@ -222,6 +224,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
// There is no type for Byte or Short so we promote them to INT32.
case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
case DateType => Some(ParquetTypeInfo(
ParquetPrimitiveTypeName.INT32, Some(ParquetOriginalType.DATE)))
case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
case DecimalType.Fixed(precision, scale) if precision <= 18 =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
|}
""".stripMargin)

testSchema[(Byte, Short, Int, Long)](
testSchema[(Byte, Short, Int, Long, java.sql.Date)](
"logical integral types",
"""
|message root {
| required int32 _1 (INT_8);
| required int32 _2 (INT_16);
| required int32 _3 (INT_32);
| required int64 _4 (INT_64);
| optional int32 _5 (DATE);
|}
""".stripMargin)

Expand Down

0 comments on commit 97e9080

Please sign in to comment.