timestamps #72

Merged: 3 commits, Dec 1, 2016
Changes from 2 commits
@@ -360,6 +360,7 @@ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOException
private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
+ column.dataType() == DataTypes.TimestampType ||
DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
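The vectorized reader can reuse its 64-bit read path here because Spark SQL represents TimestampType internally as a Long of microseconds since the Unix epoch, which is also what an INT64 TIMESTAMP_MICROS column stores. A minimal sketch of that representation (the object name and sample value are illustrative, not part of this PR):

import java.sql.Timestamp

import org.apache.spark.sql.catalyst.util.DateTimeUtils

object TimestampAsMicrosSketch {
  def main(args: Array[String]): Unit = {
    val ts = Timestamp.valueOf("2016-12-01 00:00:00")
    // DateTimeUtils.fromJavaTimestamp returns Spark's internal Long representation
    // (microseconds since the epoch), i.e. exactly what an INT64 TIMESTAMP_MICROS
    // column holds on disk.
    val micros: Long = DateTimeUtils.fromJavaTimestamp(ts)
    println(s"$ts -> $micros microseconds since epoch")
  }
}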
@@ -302,7 +302,7 @@ class ParquetFileFormat
val splits = ParquetFileFormat.fileSplits.get(root,
new Callable[ParquetFileSplitter] {
override def call(): ParquetFileSplitter =
- createParquetFileSplits(root, hadoopConf, schema)
+ createParquetFileSplits(root, hadoopConf, schema, sparkSession)
})
root -> splits.buildSplitter(filters)
}.toMap
@@ -320,9 +320,12 @@ class ParquetFileFormat
private def createParquetFileSplits(
root: Path,
hadoopConf: Configuration,
- schema: StructType): ParquetFileSplitter = {
+ schema: StructType,
+ sparkSession: SparkSession): ParquetFileSplitter = {
getMetadataForPath(root, hadoopConf)
- .map(meta => new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema))
+ .map { meta =>
+ new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema, sparkSession)
+ }
.getOrElse(ParquetDefaultFileSplitter)
}

@@ -382,13 +385,14 @@
requiredSchema).asInstanceOf[StructType]
ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)

+ val int96AsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
// Sets flags for `CatalystSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
- sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+ int96AsTimestamp)

// Try to push down filters when filter push-down is enabled.
val pushed =
@@ -397,12 +401,12 @@
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
- .flatMap(ParquetFilters.createFilter(requiredSchema, _))
+ .flatMap(ParquetFilters.createFilter(requiredSchema, _, int96AsTimestamp))
.reduceOption(FilterApi.and)
} else {
None
}

log.debug(s"Pushing converted filters: $pushed")

Review comment on the log.debug line: maybe info? FilterCompat.get logs at that level

val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
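The new int96AsTimestamp value threaded through here matters because, when Spark reads timestamps as legacy INT96 (the Hive/Impala-compatible layout), a timestamp predicate cannot be compared against INT64 row-group statistics, so the flag is consulted before building Parquet filters. A hedged sketch of toggling the setting from a user session; the config key mirrors SQLConf.PARQUET_INT96_AS_TIMESTAMP and the app name is made up:

import org.apache.spark.sql.SparkSession

// Sketch only: with int96AsTimestamp disabled, timestamps are handled as INT64
// microsecond values, the layout that allows the pushdown added in this PR.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("int96-as-timestamp-sketch")
  .config("spark.sql.parquet.int96AsTimestamp", "false")
  .getOrCreate()

println(spark.conf.get("spark.sql.parquet.int96AsTimestamp")) // expected: false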
@@ -31,6 +31,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.internal.Logging
+ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils
@@ -53,10 +54,13 @@ object ParquetDefaultFileSplitter extends ParquetFileSplitter {
class ParquetMetadataFileSplitter(
val root: Path,
val blocks: Seq[BlockMetaData],
- val schema: StructType)
+ val schema: StructType,
+ val session: SparkSession)
extends ParquetFileSplitter
with Logging {

+ private val int96AsTimestamp = session.sessionState.conf.isParquetINT96AsTimestamp
+
private val referencedFiles = blocks.map(bmd => new Path(root, bmd.getPath)).toSet

private val filterSets: Cache[Filter, RoaringBitmap] =
@@ -99,7 +103,7 @@ class ParquetMetadataFileSplitter(
filters: Seq[Filter],
blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
val predicates = filters.flatMap {
- ParquetFilters.createFilter(schema, _)
+ ParquetFilters.createFilter(schema, _, int96AsTimestamp)
}
if (predicates.nonEmpty) {
// Asynchronously build bitmaps
@@ -121,7 +125,7 @@
.filter(filterSets.getIfPresent(_) == null)
.flatMap { filter =>
val bitmap = new RoaringBitmap
- ParquetFilters.createFilter(schema, filter)
+ ParquetFilters.createFilter(schema, filter, int96AsTimestamp)
.map((filter, _, bitmap))
}
var i = 0
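For context on the cache updated above: the splitter keeps one RoaringBitmap per pushed filter, recording which block (row-group) indices survive that filter so later splits can be pruned without re-evaluating statistics. A small standalone illustration of that pattern, not the splitter's actual code; the indices and filter are invented:

import org.roaringbitmap.RoaringBitmap

// Suppose blocks 0 and 3 of a _metadata summary satisfied `event_time >= '2016-12-01'`.
val matchingBlocks = new RoaringBitmap
matchingBlocks.add(0)
matchingBlocks.add(3)

// Splitting can then skip any block index absent from the bitmap.
(0 until 5).foreach { i =>
  println(s"block $i -> ${if (matchingBlocks.contains(i)) "read" else "skip"}")
}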
@@ -21,6 +21,7 @@ import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary

+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

@@ -49,6 +50,12 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ case TimestampType =>
+ (n: String, v: Any) => FilterApi.eq(
+ longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
+ case DateType =>
+ (n: String, v: Any) => FilterApi.eq(
+ intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -70,6 +77,12 @@
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ case TimestampType =>
+ (n: String, v: Any) => FilterApi.notEq(
+ longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
+ case DateType =>
+ (n: String, v: Any) => FilterApi.notEq(
+ intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -88,6 +101,12 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ case TimestampType =>
+ (n: String, v: Any) => FilterApi.lt(
+ longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
+ case DateType =>
+ (n: String, v: Any) => FilterApi.lt(
+ intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -106,6 +125,12 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ case TimestampType =>
+ (n: String, v: Any) => FilterApi.ltEq(
+ longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
+ case DateType =>
+ (n: String, v: Any) => FilterApi.ltEq(
+ intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -124,6 +149,12 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ case TimestampType =>
+ (n: String, v: Any) => FilterApi.gt(
+ longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
+ case DateType =>
+ (n: String, v: Any) => FilterApi.gt(
+ intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -142,6 +173,28 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ case TimestampType =>
+ (n: String, v: Any) => FilterApi.gtEq(
+ longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
+ case DateType =>
+ (n: String, v: Any) => FilterApi.gtEq(
+ intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

+ private def convertDate(d: java.sql.Date): java.lang.Integer = {
+ if (d != null) {
+ DateTimeUtils.fromJavaDate(d).asInstanceOf[java.lang.Integer]
+ } else {
+ null
+ }
+ }
+
+ private def convertTimestamp(t: java.sql.Timestamp): java.lang.Long = {
+ if (t != null) {
+ DateTimeUtils.fromJavaTimestamp(t).asInstanceOf[java.lang.Long]
+ } else {
+ null
+ }
+ }
+
/**
@@ -153,23 +206,32 @@
* using such fields, otherwise Parquet library will throw exception (PARQUET-389).
* Here we filter out such fields.
*/
- private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
- case StructType(fields) =>
- // Here we don't flatten the fields in the nested schema but just look up through
- // root fields. Currently, accessing to nested fields does not push down filters
- // and it does not support to create filters for them.
- fields.filter { f =>
- !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
- !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
- }.map(f => f.name -> f.dataType).toMap
- case _ => Map.empty[String, DataType]
- }
+ private def getFieldMap(dataType: DataType, int96AsTimestamp: Boolean): Map[String, DataType] =
+ dataType match {
+ case StructType(fields) =>
+ // Here we don't flatten the fields in the nested schema but just look up through
+ // root fields. Currently, accessing to nested fields does not push down filters
+ // and it does not support to create filters for them.
+ // scalastyle:off println
+ fields.filterNot { f =>
+ val isTs = DataTypes.TimestampType.acceptsType(f.dataType)
+
+ val isOptionalField = f.metadata.contains(StructType.metadataKeyForOptionalField) &&
+ f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
+
+ (isTs && int96AsTimestamp) || isOptionalField

Review comment on the line above: nit: could be lazy with the boolean evaluations:

fields.filterNot { f =>
  (int96AsTimestamp && DataTypes.TimestampType.acceptsType(f.dataType))
    || (f.metadata.contains(StructType.metadataKeyForOptionalField)
      && f.metadata.getBoolean(StructType.metadataKeyForOptionalField))
}.map(f => f.name -> f.dataType).toMap

+ }.map(f => f.name -> f.dataType).toMap
+ case _ => Map.empty[String, DataType]
+ }

/**
* Converts data sources filters to Parquet filter predicates.
*/
- def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
- val dataTypeOf = getFieldMap(schema)
+ def createFilter(
+ schema: StructType,
+ predicate: sources.Filter,
+ int96AsTimestamp: Boolean): Option[FilterPredicate] = {
+ val dataTypeOf = getFieldMap(schema, int96AsTimestamp)

// NOTE:
//
@@ -221,18 +283,20 @@
// Pushing one side of AND down is only safe to do at the top level.
// You can see ParquetRelation's initializeLocalJobFunc method as an example.
for {
- lhsFilter <- createFilter(schema, lhs)
- rhsFilter <- createFilter(schema, rhs)
+ lhsFilter <- createFilter(schema, lhs, int96AsTimestamp)
+ rhsFilter <- createFilter(schema, rhs, int96AsTimestamp)
} yield FilterApi.and(lhsFilter, rhsFilter)

case sources.Or(lhs, rhs) =>
for {
- lhsFilter <- createFilter(schema, lhs)
- rhsFilter <- createFilter(schema, rhs)
+ lhsFilter <- createFilter(schema, lhs, int96AsTimestamp)
+ rhsFilter <- createFilter(schema, rhs, int96AsTimestamp)
} yield FilterApi.or(lhsFilter, rhsFilter)

case sources.Not(pred) =>
- createFilter(schema, pred).map(FilterApi.not).map(LogicalInverseRewriter.rewrite)
+ createFilter(schema, pred, int96AsTimestamp)
+ .map(FilterApi.not)
+ .map(LogicalInverseRewriter.rewrite)

case sources.In(name, values) if dataTypeOf.contains(name) =>
val eq = makeEq.lift(dataTypeOf(name))
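To make the new TimestampType and DateType cases concrete: after convertTimestamp runs, a pushed-down comparison is just a Parquet long predicate over microseconds. A hedged sketch of that end state built directly with parquet-mr's FilterApi; the column name is hypothetical and this is not how ParquetFilters is invoked internally:

import java.sql.Timestamp

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// "event_time" stands in for an INT64 (TIMESTAMP_MICROS) Parquet column.
val cutoff = Timestamp.valueOf("2016-12-01 00:00:00")
val cutoffMicros: java.lang.Long = DateTimeUtils.fromJavaTimestamp(cutoff)

// Roughly what makeGtEq produces for TimestampType once convertTimestamp has run.
val predicate: FilterPredicate =
  FilterApi.gtEq(FilterApi.longColumn("event_time"), cutoffMicros)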
@@ -252,6 +252,9 @@ private[parquet] class ParquetRowConverter(
case StringType =>
new ParquetStringConverter(updater)

+ case _: TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
+ new ParquetPrimitiveConverter(updater)
+
case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
new ParquetPrimitiveConverter(updater) {
@@ -368,14 +368,8 @@ private[parquet] class ParquetSchemaConverter(
// from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
// a timestamp into a `Long`. This design decision is subject to change though, for example,
// we may resort to microsecond precision in the future.
- //
- // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
- // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
- // hasn't implemented `TIMESTAMP_MICROS` yet.
- //
- // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
case TimestampType =>
- Types.primitive(INT96, repetition).named(field.name)
+ Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)

case BinaryType =>
Types.primitive(BINARY, repetition).named(field.name)
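For readers unfamiliar with the parquet-mr type builder, the one-line change above switches timestamp fields from plain INT96 to INT64 annotated as TIMESTAMP_MICROS. A standalone sketch, under the assumption that the parquet-mr version on the classpath ships OriginalType.TIMESTAMP_MICROS; the field name and repetition are illustrative:

import org.apache.parquet.schema.{OriginalType, Type, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64

// An optional 64-bit column annotated as microsecond timestamps, matching what the
// converter now emits for Spark's TimestampType.
val tsField: Type = Types
  .primitive(INT64, Type.Repetition.OPTIONAL)
  .as(OriginalType.TIMESTAMP_MICROS)
  .named("event_time") // hypothetical field name

println(tsField)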
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.datasources.parquet

- import java.nio.{ByteBuffer, ByteOrder}
import java.util

import scala.collection.JavaConverters.mapAsJavaMapConverter
@@ -32,7 +31,6 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
- import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -66,9 +64,6 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging
// Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
private var writeLegacyParquetFormat: Boolean = _

- // Reusable byte array used to write timestamps as Parquet INT96 values
- private val timestampBuffer = new Array[Byte](12)
-
// Reusable byte array used to write decimal values
private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))

@@ -154,20 +149,8 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging
Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))

case TimestampType =>
- (row: SpecializedGetters, ordinal: Int) => {
- // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
- // Currently we only support timestamps stored as INT96, which is compatible with Hive
- // and Impala. However, INT96 is to be deprecated. We plan to support `TIMESTAMP_MICROS`
- // defined in the parquet-format spec. But up until writing, the most recent parquet-mr
- // version (1.8.1) hasn't implemented it yet.
-
- // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
- // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped.
- val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
- val buf = ByteBuffer.wrap(timestampBuffer)
- buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
- recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
- }
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addLong(row.getLong(ordinal))

case BinaryType =>
(row: SpecializedGetters, ordinal: Int) =>
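Taken together, a hedged end-to-end sketch of what the PR enables: timestamps are written as INT64 microseconds instead of INT96, so a timestamp filter can be evaluated against row-group statistics before any rows are materialized. The session setup, path, and column names below are illustrative assumptions, not part of the change:

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("ts-pushdown-sketch").getOrCreate()
import spark.implicits._

val events = Seq(
  (1L, Timestamp.valueOf("2016-11-30 23:00:00")),
  (2L, Timestamp.valueOf("2016-12-01 12:00:00"))
).toDF("id", "event_time")

// Written as INT64 (TIMESTAMP_MICROS) under this change rather than INT96.
events.write.mode("overwrite").parquet("/tmp/events_int64_ts")

// The >= comparison can now be translated into a Parquet long predicate and checked
// against each row group's min/max before the row group is read.
spark.read.parquet("/tmp/events_int64_ts")
  .filter($"event_time" >= Timestamp.valueOf("2016-12-01 00:00:00"))
  .show()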