Skip to content

Commit

Permalink
parquet support for primitive date
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-wang committed Mar 22, 2015
1 parent 97e9080 commit faef887
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.parquet

import java.sql.{Date, Timestamp}
import java.sql.Timestamp
import java.util.{TimeZone, Calendar}

import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
Expand Down Expand Up @@ -127,6 +127,12 @@ private[sql] object CatalystConverter {
parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType])
}
}
case DateType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addInt(value: Int): Unit =
parent.updateDate(fieldIndex, value.asInstanceOf[DateType.JvmType])
}
}
case d: DecimalType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addBinary(value: Binary): Unit =
Expand Down Expand Up @@ -193,7 +199,7 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
updateField(fieldIndex, value)

protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
updateField(fieldIndex, new Date(value))
updateField(fieldIndex, value)

protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
updateField(fieldIndex, value)
Expand Down Expand Up @@ -392,7 +398,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
current.setInt(fieldIndex, value)

override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
current.update(fieldIndex, new Date(value))
current.update(fieldIndex, value)

override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
current.setLong(fieldIndex, value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case DoubleType => writer.addDouble(value.asInstanceOf[Double])
case FloatType => writer.addFloat(value.asInstanceOf[Float])
case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
case DateType => writer.addInteger(value.asInstanceOf[java.sql.Date].getTime.toInt)
case DateType => writer.addInteger(value.asInstanceOf[Int])
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
Expand Down Expand Up @@ -359,6 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
case DoubleType => writer.addDouble(record.getDouble(index))
case FloatType => writer.addFloat(record.getFloat(index))
case BooleanType => writer.addBoolean(record.getBoolean(index))
case DateType => writer.addInteger(record.getInt(index))
case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
Expand Down

0 comments on commit faef887

Please sign in to comment.