From 1d341baa409c7a3e6211b8dbedb2f5260835e576 Mon Sep 17 00:00:00 2001 From: Patrick Woody Date: Wed, 30 Nov 2016 18:39:16 -0500 Subject: [PATCH 1/3] Fixes --- .../parquet/VectorizedColumnReader.java | 1 + .../parquet/ParquetFileFormat.scala | 11 +- .../parquet/ParquetFileSplitter.scala | 10 +- .../datasources/parquet/ParquetFilters.scala | 81 ++++++++-- .../parquet/ParquetFilterSuite.scala | 151 +++++++++++++++++- .../parquet/ParquetQuerySuite.scala | 43 +++++ 6 files changed, 275 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 7dea4e44d39e5..eb91d27632889 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -360,6 +360,7 @@ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOExce private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException { // This is where we implement support for the valid type conversions. if (column.dataType() == DataTypes.LongType || + column.dataType() == DataTypes.TimestampType || DecimalType.is64BitDecimalType(column.dataType())) { defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 173ee97ae4a9f..6e9ae8eaec5d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -302,7 +302,7 @@ class ParquetFileFormat val splits = ParquetFileFormat.fileSplits.get(root, new Callable[ParquetFileSplitter] { override def call(): ParquetFileSplitter = - createParquetFileSplits(root, hadoopConf, schema) + createParquetFileSplits(root, hadoopConf, schema, sparkSession) }) root -> splits.buildSplitter(filters) }.toMap @@ -320,9 +320,12 @@ class ParquetFileFormat private def createParquetFileSplits( root: Path, hadoopConf: Configuration, - schema: StructType): ParquetFileSplitter = { + schema: StructType, + sparkSession: SparkSession): ParquetFileSplitter = { getMetadataForPath(root, hadoopConf) - .map(meta => new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema)) + .map { meta => + new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema, sparkSession) + } .getOrElse(ParquetDefaultFileSplitter) } @@ -402,7 +405,7 @@ class ParquetFileFormat } else { None } - + log.debug(s"Pushing converted filters: $pushed") val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala index b17814e952c17..9a6d9e528557d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala @@ -31,6 +31,7 @@ import 
org.apache.parquet.hadoop.metadata.BlockMetaData import org.roaringbitmap.RoaringBitmap import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.util.ThreadUtils @@ -53,10 +54,13 @@ object ParquetDefaultFileSplitter extends ParquetFileSplitter { class ParquetMetadataFileSplitter( val root: Path, val blocks: Seq[BlockMetaData], - val schema: StructType) + val schema: StructType, + val session: SparkSession) extends ParquetFileSplitter with Logging { + private val int96AsTimestamp = session.sessionState.conf.isParquetINT96AsTimestamp + private val referencedFiles = blocks.map(bmd => new Path(root, bmd.getPath)).toSet private val filterSets: Cache[Filter, RoaringBitmap] = @@ -99,7 +103,7 @@ class ParquetMetadataFileSplitter( filters: Seq[Filter], blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = { val predicates = filters.flatMap { - ParquetFilters.createFilter(schema, _) + ParquetFilters.createFilter(schema, _, int96AsTimestamp) } if (predicates.nonEmpty) { // Asynchronously build bitmaps @@ -121,7 +125,7 @@ class ParquetMetadataFileSplitter( .filter(filterSets.getIfPresent(_) == null) .flatMap { filter => val bitmap = new RoaringBitmap - ParquetFilters.createFilter(schema, filter) + ParquetFilters.createFilter(schema, filter, int96AsTimestamp) .map((filter, _, bitmap)) } var i = 0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 28a544c67ba3c..dd06190229842 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -21,6 +21,7 @@ import org.apache.parquet.filter2.predicate._ import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.io.api.Binary +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.sources import org.apache.spark.sql.types._ @@ -49,6 +50,12 @@ private[parquet] object ParquetFilters { (n: String, v: Any) => FilterApi.eq( binaryColumn(n), Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull) + case TimestampType => + (n: String, v: Any) => FilterApi.eq( + longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp])) + case DateType => + (n: String, v: Any) => FilterApi.eq( + intColumn(n), convertDate(v.asInstanceOf[java.sql.Date])) } private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { @@ -70,6 +77,12 @@ private[parquet] object ParquetFilters { (n: String, v: Any) => FilterApi.notEq( binaryColumn(n), Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull) + case TimestampType => + (n: String, v: Any) => FilterApi.notEq( + longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp])) + case DateType => + (n: String, v: Any) => FilterApi.notEq( + intColumn(n), convertDate(v.asInstanceOf[java.sql.Date])) } private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = { @@ -88,6 +101,12 @@ private[parquet] object ParquetFilters { case BinaryType => (n: String, v: Any) => FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])) + case TimestampType => + (n: String, v: Any) => FilterApi.lt( + longColumn(n), 
convertTimestamp(v.asInstanceOf[java.sql.Timestamp])) + case DateType => + (n: String, v: Any) => FilterApi.lt( + intColumn(n), convertDate(v.asInstanceOf[java.sql.Date])) } private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { @@ -106,6 +125,12 @@ private[parquet] object ParquetFilters { case BinaryType => (n: String, v: Any) => FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])) + case TimestampType => + (n: String, v: Any) => FilterApi.ltEq( + longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp])) + case DateType => + (n: String, v: Any) => FilterApi.ltEq( + intColumn(n), convertDate(v.asInstanceOf[java.sql.Date])) } private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = { @@ -124,6 +149,12 @@ private[parquet] object ParquetFilters { case BinaryType => (n: String, v: Any) => FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])) + case TimestampType => + (n: String, v: Any) => FilterApi.gt( + longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp])) + case DateType => + (n: String, v: Any) => FilterApi.gt( + intColumn(n), convertDate(v.asInstanceOf[java.sql.Date])) } private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { @@ -142,6 +173,28 @@ private[parquet] object ParquetFilters { case BinaryType => (n: String, v: Any) => FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])) + case TimestampType => + (n: String, v: Any) => FilterApi.gtEq( + longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp])) + case DateType => + (n: String, v: Any) => FilterApi.gtEq( + intColumn(n), convertDate(v.asInstanceOf[java.sql.Date])) + } + + private def convertDate(d: java.sql.Date): java.lang.Integer = { + if (d != null) { + DateTimeUtils.fromJavaDate(d).asInstanceOf[java.lang.Integer] + } else { + null + } + } + + private def convertTimestamp(t: java.sql.Timestamp): java.lang.Long = { + if (t != null) { + DateTimeUtils.fromJavaTimestamp(t).asInstanceOf[java.lang.Long] + } else { + null + } } /** @@ -153,17 +206,23 @@ private[parquet] object ParquetFilters { * using such fields, otherwise Parquet library will throw exception (PARQUET-389). * Here we filter out such fields. */ - private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match { - case StructType(fields) => - // Here we don't flatten the fields in the nested schema but just look up through - // root fields. Currently, accessing to nested fields does not push down filters - // and it does not support to create filters for them. - fields.filter { f => - !f.metadata.contains(StructType.metadataKeyForOptionalField) || - !f.metadata.getBoolean(StructType.metadataKeyForOptionalField) - }.map(f => f.name -> f.dataType).toMap - case _ => Map.empty[String, DataType] - } + private def getFieldMap(dataType: DataType, int96AsTimestamp: Boolean): Map[String, DataType] = + dataType match { + case StructType(fields) => + // Here we don't flatten the fields in the nested schema but just look up through + // root fields. Currently, accessing to nested fields does not push down filters + // and it does not support to create filters for them. 
+ // scalastyle:off println + fields.filterNot { f => + val isTs = DataTypes.TimestampType.acceptsType(f.dataType) + + val isOptionalField = f.metadata.contains(StructType.metadataKeyForOptionalField) && + f.metadata.getBoolean(StructType.metadataKeyForOptionalField) + + (isTs && int96AsTimestamp) || isOptionalField + }.map(f => f.name -> f.dataType).toMap + case _ => Map.empty[String, DataType] + } /** * Converts data sources filters to Parquet filter predicates. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index c6f691f7f55ae..ee4078b9930bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.charset.StandardCharsets +import java.sql.{Date, Timestamp} +import java.time.{LocalDate, ZoneId} import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators} import org.apache.parquet.filter2.predicate.FilterApi._ @@ -47,6 +49,34 @@ import org.apache.spark.util.{AccumulatorContext, LongAccumulator} * data type is nullable. */ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext { + private def checkNoFilterPredicate(predicate: Predicate)(implicit df: DataFrame) = { + val output = predicate.collect { case a: Attribute => a }.distinct + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { + val query = df + .select(output.map(e => Column(e)): _*) + .where(Column(predicate)) + + var maybeRelation: Option[HadoopFsRelation] = None + val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { + case PhysicalOperation(_, filters, LogicalRelation(relation: HadoopFsRelation, _, _)) => + maybeRelation = Some(relation) + filters + }.flatten.reduceLeftOption(_ && _) + assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") + + val (_, selectedFilters, _) = + DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) + assert(selectedFilters.nonEmpty, "No filter is pushed down") + + selectedFilters.foreach { pred => + val maybeFilter = ParquetFilters.createFilter(df.schema, pred, + spark.sessionState.conf.isParquetINT96AsTimestamp) + assert(maybeFilter.isEmpty, s"Predicate should not be created for $pred") + } + } + } + } private def checkFilterPredicate( df: DataFrame, @@ -75,7 +105,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex assert(selectedFilters.nonEmpty, "No filter is pushed down") selectedFilters.foreach { pred => - val maybeFilter = ParquetFilters.createFilter(df.schema, pred) + val maybeFilter = ParquetFilters.createFilter(df.schema, pred, + spark.sessionState.conf.isParquetINT96AsTimestamp) assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") // Doesn't bother checking type parameters here (e.g. 
`Eq[Integer]`) maybeFilter.exists(_.getClass === filterClass) @@ -292,6 +323,80 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } + test("filter pushdown - timestamp") { + val baseMillis = System.currentTimeMillis() + def base(): Timestamp = new Timestamp(baseMillis) + + val timestamps = (0 to 3).map { i => + val ts = base() + ts.setNanos(i * 1000) + ts + } + + withSQLConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> "false") { + withParquetDataFrame(timestamps.map(i => Tuple1(Option(i)))) { implicit df => + checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], timestamps.map(Row.apply(_))) + + checkFilterPredicate('_1 === timestamps(0), classOf[Eq[_]], timestamps(0)) + checkFilterPredicate('_1 <=> timestamps(0), classOf[Eq[_]], timestamps(0)) + checkFilterPredicate('_1 =!= timestamps(0), classOf[NotEq[_]], + timestamps.slice(1, 4).map(Row.apply(_))) + + checkFilterPredicate('_1 < timestamps(1), classOf[Lt[_]], timestamps(0)) + checkFilterPredicate('_1 > timestamps(2), classOf[Gt[_]], timestamps(3)) + checkFilterPredicate('_1 <= timestamps(0), classOf[LtEq[_]], timestamps(0)) + checkFilterPredicate('_1 >= timestamps(3), classOf[GtEq[_]], timestamps(3)) + + checkFilterPredicate(Literal(timestamps(0)) === '_1, classOf[Eq[_]], timestamps(0)) + checkFilterPredicate(Literal(timestamps(0)) <=> '_1, classOf[Eq[_]], timestamps(0)) + checkFilterPredicate(Literal(timestamps(1)) > '_1, classOf[Lt[_]], timestamps(0)) + checkFilterPredicate(Literal(timestamps(2)) < '_1, classOf[Gt[_]], timestamps(3)) + checkFilterPredicate(Literal(timestamps(0)) >= '_1, classOf[LtEq[_]], timestamps(0)) + checkFilterPredicate(Literal(timestamps(3)) <= '_1, classOf[GtEq[_]], timestamps(3)) + + checkFilterPredicate(!('_1 < timestamps(3)), classOf[GtEq[_]], timestamps(3)) + checkFilterPredicate('_1 < timestamps(1) || '_1 > timestamps(2), classOf[Operators.Or], + Seq(Row(timestamps(0)), Row(timestamps(3)))) + } + } + } + + test("filter pushdown - date") { + val dates = (0 to 3).map { i => + val millis = LocalDate.of(2016, 1, i + 1) + .atStartOfDay(ZoneId.systemDefault()) + .toInstant + .toEpochMilli + new Date(millis) + } + withParquetDataFrame(dates.map(i => Tuple1(Option(i)))) { implicit df => + checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], dates.map(Row.apply(_))) + + checkFilterPredicate('_1 === dates(0), classOf[Eq[_]], dates(0)) + checkFilterPredicate('_1 <=> dates(0), classOf[Eq[_]], dates(0)) + checkFilterPredicate('_1 =!= dates(0), classOf[NotEq[_]], + dates.slice(1, 4).map(Row.apply(_))) + + checkFilterPredicate('_1 < dates(1), classOf[Lt[_]], dates(0)) + checkFilterPredicate('_1 > dates(2), classOf[Gt[_]], dates(3)) + checkFilterPredicate('_1 <= dates(0), classOf[LtEq[_]], dates(0)) + checkFilterPredicate('_1 >= dates(3), classOf[GtEq[_]], dates(3)) + + checkFilterPredicate(Literal(dates(0)) === '_1, classOf[Eq[_]], dates(0)) + checkFilterPredicate(Literal(dates(0)) <=> '_1, classOf[Eq[_]], dates(0)) + checkFilterPredicate(Literal(dates(1)) > '_1, classOf[Lt[_]], dates(0)) + checkFilterPredicate(Literal(dates(2)) < '_1, classOf[Gt[_]], dates(3)) + checkFilterPredicate(Literal(dates(0)) >= '_1, classOf[LtEq[_]], dates(0)) + checkFilterPredicate(Literal(dates(3)) <= '_1, classOf[GtEq[_]], dates(3)) + + checkFilterPredicate(!('_1 < dates(3)), classOf[GtEq[_]], dates(3)) + checkFilterPredicate('_1 < dates(1) || '_1 > dates(2), 
classOf[Operators.Or], + Seq(Row(dates(0)), Row(dates(3)))) + } + } + test("SPARK-6554: don't push down predicates which reference partition columns") { import testImplicits._ @@ -496,7 +601,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex schema, sources.And( sources.LessThan("a", 10), - sources.GreaterThan("c", 1.5D))) + sources.GreaterThan("c", 1.5D)), + spark.sessionState.conf.isParquetINT96AsTimestamp) } assertResult(None) { @@ -504,7 +610,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex schema, sources.And( sources.LessThan("a", 10), - sources.StringContains("b", "prefix"))) + sources.StringContains("b", "prefix")), + spark.sessionState.conf.isParquetINT96AsTimestamp) } assertResult(None) { @@ -513,7 +620,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.Not( sources.And( sources.GreaterThan("a", 1), - sources.StringContains("b", "prefix")))) + sources.StringContains("b", "prefix"))), + spark.sessionState.conf.isParquetINT96AsTimestamp) } } @@ -581,4 +689,39 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } } + + test("Do not create Timestamp filters when interpreting from INT96") { + val baseMillis = System.currentTimeMillis() + def base(): Timestamp = new Timestamp(baseMillis) + + val timestamps = (0 to 3).map { i => + val ts = base() + ts.setNanos(i * 1000) + ts + } + withParquetDataFrame(timestamps.map(i => Tuple1(Option(i)))) { implicit df => + val schema = df.schema + checkNoFilterPredicate('_1.isNull) + checkNoFilterPredicate('_1.isNotNull) + + checkNoFilterPredicate('_1 === timestamps(0)) + checkNoFilterPredicate('_1 <=> timestamps(0)) + checkNoFilterPredicate('_1 =!= timestamps(0)) + + checkNoFilterPredicate('_1 < timestamps(1)) + checkNoFilterPredicate('_1 > timestamps(2)) + checkNoFilterPredicate('_1 <= timestamps(0)) + checkNoFilterPredicate('_1 >= timestamps(3)) + + checkNoFilterPredicate(Literal(timestamps(0)) === '_1) + checkNoFilterPredicate(Literal(timestamps(0)) <=> '_1) + checkNoFilterPredicate(Literal(timestamps(1)) > '_1) + checkNoFilterPredicate(Literal(timestamps(2)) < '_1) + checkNoFilterPredicate(Literal(timestamps(0)) >= '_1) + checkNoFilterPredicate(Literal(timestamps(3)) <= '_1) + + checkNoFilterPredicate(!('_1 < timestamps(3))) + checkNoFilterPredicate('_1 < timestamps(1) || '_1 > timestamps(2)) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 6b148a16418eb..dfe83fa332bd5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File import java.net.URI +import java.sql.{Date, Timestamp} import com.google.common.collect.{HashMultiset, Multiset} import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path, RawLocalFileSystem} @@ -795,6 +796,48 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } } + + test("Ensure timestamps are filterable") { + withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL", + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> "false") { + withTempPath { path => + implicit val encoder = Encoders.TIMESTAMP + val ts = new 
Timestamp(System.currentTimeMillis()) + spark.createDataset(Seq(ts)).write.parquet(path.getCanonicalPath) + + val df = spark.read.parquet(path.getCanonicalPath) + val columnHit: Column = df.col("value").eqNullSafe(ts) + val filterHit = df.filter(columnHit) + assert(filterHit.rdd.partitions.length == 1 && filterHit.count == 1) + + val newTs = new Timestamp(System.currentTimeMillis()) + val columnMiss: Column = df.col("value").eqNullSafe(newTs) + val filterMiss = df.filter(columnMiss) + assert(filterMiss.rdd.partitions.length == 0 && filterMiss.count == 0) + } + } + } + + test("Ensure dates are filterable") { + withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL", + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> "false") { + withTempPath { path => + implicit val encoder = Encoders.DATE + val date = new Date(2016, 1, 1) + spark.createDataset(Seq(date)).write.parquet(path.getCanonicalPath) + + val df = spark.read.parquet(path.getCanonicalPath) + val columnHit: Column = df.col("value").eqNullSafe(date) + val filterHit = df.filter(columnHit) + assert(filterHit.rdd.partitions.length == 1 && filterHit.count == 1) + + val newDate = new Date(2015, 1, 1) + val columnMiss: Column = df.col("value").eqNullSafe(newDate) + val filterMiss = df.filter(columnMiss) + assert(filterMiss.rdd.partitions.length == 0 && filterMiss.count == 0) + } + } + } } class CountingFileSystem extends RawLocalFileSystem { From 2df57e612afa8ed62a4a84cccb5f0f53dea08bec Mon Sep 17 00:00:00 2001 From: Patrick Woody Date: Thu, 1 Dec 2016 13:46:01 -0500 Subject: [PATCH 2/3] Fix bad rebase --- .../parquet/ParquetFileFormat.scala | 5 +++-- .../datasources/parquet/ParquetFilters.scala | 19 ++++++++++------- .../parquet/ParquetRowConverter.scala | 3 +++ .../parquet/ParquetSchemaConverter.scala | 8 +------ .../parquet/ParquetWriteSupport.scala | 21 ++----------------- 5 files changed, 21 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 6e9ae8eaec5d9..3a76168bedbf1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -385,13 +385,14 @@ class ParquetFileFormat requiredSchema).asInstanceOf[StructType] ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf) + val int96AsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp // Sets flags for `CatalystSchemaConverter` hadoopConf.setBoolean( SQLConf.PARQUET_BINARY_AS_STRING.key, sparkSession.sessionState.conf.isParquetBinaryAsString) hadoopConf.setBoolean( SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + int96AsTimestamp) // Try to push down filters when filter push-down is enabled. val pushed = @@ -400,7 +401,7 @@ class ParquetFileFormat // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` // is used here. 
- .flatMap(ParquetFilters.createFilter(requiredSchema, _)) + .flatMap(ParquetFilters.createFilter(requiredSchema, _, int96AsTimestamp)) .reduceOption(FilterApi.and) } else { None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index dd06190229842..5b26695173de6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -227,8 +227,11 @@ private[parquet] object ParquetFilters { /** * Converts data sources filters to Parquet filter predicates. */ - def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = { - val dataTypeOf = getFieldMap(schema) + def createFilter( + schema: StructType, + predicate: sources.Filter, + int96AsTimestamp: Boolean): Option[FilterPredicate] = { + val dataTypeOf = getFieldMap(schema, int96AsTimestamp) // NOTE: // @@ -280,18 +283,20 @@ private[parquet] object ParquetFilters { // Pushing one side of AND down is only safe to do at the top level. // You can see ParquetRelation's initializeLocalJobFunc method as an example. for { - lhsFilter <- createFilter(schema, lhs) - rhsFilter <- createFilter(schema, rhs) + lhsFilter <- createFilter(schema, lhs, int96AsTimestamp) + rhsFilter <- createFilter(schema, rhs, int96AsTimestamp) } yield FilterApi.and(lhsFilter, rhsFilter) case sources.Or(lhs, rhs) => for { - lhsFilter <- createFilter(schema, lhs) - rhsFilter <- createFilter(schema, rhs) + lhsFilter <- createFilter(schema, lhs, int96AsTimestamp) + rhsFilter <- createFilter(schema, rhs, int96AsTimestamp) } yield FilterApi.or(lhsFilter, rhsFilter) case sources.Not(pred) => - createFilter(schema, pred).map(FilterApi.not).map(LogicalInverseRewriter.rewrite) + createFilter(schema, pred, int96AsTimestamp) + .map(FilterApi.not) + .map(LogicalInverseRewriter.rewrite) case sources.In(name, values) if dataTypeOf.contains(name) => val eq = makeEq.lift(dataTypeOf(name)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 33dcf2f3fd167..56495f487d693 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -252,6 +252,9 @@ private[parquet] class ParquetRowConverter( case StringType => new ParquetStringConverter(updater) + case _: TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 => + new ParquetPrimitiveConverter(updater) + case TimestampType => // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. 
new ParquetPrimitiveConverter(updater) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index b4f36ce3752c0..cc2a6a61d2366 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -368,14 +368,8 @@ private[parquet] class ParquetSchemaConverter( // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store // a timestamp into a `Long`. This design decision is subject to change though, for example, // we may resort to microsecond precision in the future. - // - // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's - // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using) - // hasn't implemented `TIMESTAMP_MICROS` yet. - // - // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that. case TimestampType => - Types.primitive(INT96, repetition).named(field.name) + Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name) case BinaryType => Types.primitive(BINARY, repetition).named(field.name) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index a31d2b9c37e9d..4c2a5e7227420 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.nio.{ByteBuffer, ByteOrder} import java.util import scala.collection.JavaConverters.mapAsJavaMapConverter @@ -32,7 +31,6 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -66,9 +64,6 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions private var writeLegacyParquetFormat: Boolean = _ - // Reusable byte array used to write timestamps as Parquet INT96 values - private val timestampBuffer = new Array[Byte](12) - // Reusable byte array used to write decimal values private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION)) @@ -154,20 +149,8 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes)) case TimestampType => - (row: SpecializedGetters, ordinal: Int) => { - // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it - // Currently we only support timestamps stored as INT96, which is compatible with Hive - // and Impala. However, INT96 is to be deprecated. 
We plan to support `TIMESTAMP_MICROS` - // defined in the parquet-format spec. But up until writing, the most recent parquet-mr - // version (1.8.1) hasn't implemented it yet. - - // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond - // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped. - val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) - val buf = ByteBuffer.wrap(timestampBuffer) - buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) - recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) - } + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addLong(row.getLong(ordinal)) case BinaryType => (row: SpecializedGetters, ordinal: Int) => From 47825bbc62d601a7704f10265db77c5e57155ae3 Mon Sep 17 00:00:00 2001 From: Patrick Woody Date: Thu, 1 Dec 2016 16:26:22 -0500 Subject: [PATCH 3/3] move to info logging --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 3a76168bedbf1..26aa6de602ee3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -406,7 +406,7 @@ class ParquetFileFormat } else { None } - log.debug(s"Pushing converted filters: $pushed") + log.info(s"Pushing converted filters: $pushed") val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
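
The value conversions behind the new Date and Timestamp pushdown cases in patch 1 come from DateTimeUtils.fromJavaDate and DateTimeUtils.fromJavaTimestamp: a java.sql.Date becomes the INT32 day count since the Unix epoch, and a java.sql.Timestamp becomes the INT64 microsecond count that TIMESTAMP_MICROS stores. The standalone sketch below only mirrors that arithmetic for illustration; the object and method names are hypothetical and not Spark APIs.

import java.sql.{Date, Timestamp}

// Illustration only: mirrors the semantics of DateTimeUtils.fromJavaDate /
// fromJavaTimestamp that the pushdown code above delegates to.
object FilterValueSketch {
  // DateType is stored in Parquet as INT32: days since 1970-01-01.
  def dateToDays(d: Date): java.lang.Integer =
    if (d == null) null else Integer.valueOf(d.toLocalDate.toEpochDay.toInt)

  // TimestampType written as INT64 TIMESTAMP_MICROS: microseconds since the epoch.
  // java.sql.Timestamp reports milliseconds via getTime and sub-second nanos via getNanos.
  def timestampToMicros(t: Timestamp): java.lang.Long =
    if (t == null) null
    else java.lang.Long.valueOf(t.getTime * 1000L + (t.getNanos % 1000000L) / 1000L)

  def main(args: Array[String]): Unit = {
    println(dateToDays(Date.valueOf("1970-01-02")))  // 1
    val ts = new Timestamp(1500L)                    // 1.5 s after the epoch
    println(timestampToMicros(ts))                   // 1500000
  }
}

Boxing the results as java.lang.Integer / java.lang.Long, as the patch does, keeps null pushdown values (isNull / isNotNull comparisons) representable.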
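
Patch 1 also threads the session's spark.sql.parquet.int96AsTimestamp flag into ParquetFilters.createFilter, and getFieldMap drops TimestampType fields whenever that flag is set: INT96-backed timestamps are not comparable as longs, so they must not produce row-group or page-level predicates. A condensed sketch of that eligibility rule follows, with simplified types rather than Spark's actual StructType/metadata API.

// Simplified model of the filter added to getFieldMap; the types and field
// names here are illustrative, not Spark's.
sealed trait ColType
case object TimestampCol extends ColType
case object OtherCol extends ColType

final case class Field(name: String, dataType: ColType, optionalMergedField: Boolean)

object PushdownEligibility {
  // A field may receive a Parquet filter unless it is a schema-merge artifact
  // or a timestamp that will be read back from INT96.
  def pushableFields(fields: Seq[Field], int96AsTimestamp: Boolean): Map[String, ColType] =
    fields.filterNot { f =>
      val isInt96Timestamp = f.dataType == TimestampCol && int96AsTimestamp
      isInt96Timestamp || f.optionalMergedField
    }.map(f => f.name -> f.dataType).toMap

  def main(args: Array[String]): Unit = {
    val fields = Seq(Field("ts", TimestampCol, false), Field("id", OtherCol, false))
    println(pushableFields(fields, int96AsTimestamp = true).keySet)   // Set(id)
    println(pushableFields(fields, int96AsTimestamp = false).keySet)  // Set(ts, id)
  }
}

The new checkNoFilterPredicate test in ParquetFilterSuite exercises exactly this path: with INT96 interpretation enabled, every timestamp comparison must fail to yield a Parquet predicate.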
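
Patch 2 switches the physical type written for TimestampType from INT96 to INT64 annotated as TIMESTAMP_MICROS, which is what lets the long-column predicates, the INT64 branch in VectorizedColumnReader, and the plain ParquetPrimitiveConverter case line up. For comparison, the ParquetWriteSupport code removed there encoded each value as a 12-byte INT96 of nanos-of-day plus Julian day. The sketch below restates both layouts using only the JDK, assuming non-negative microsecond values; the helper names are illustrative.

import java.nio.{ByteBuffer, ByteOrder}

// Sketch of the old and new on-disk encodings; 2440588 is the Julian day
// number of 1970-01-01.
object TimestampEncodingSketch {
  private val MicrosPerDay = 24L * 60 * 60 * 1000 * 1000
  private val JulianDayOfEpoch = 2440588

  // Old layout (removed in this series): 12-byte INT96, little-endian
  // nanos-of-day followed by the Julian day number.
  def toInt96(micros: Long): Array[Byte] = {
    val julianDay = (micros / MicrosPerDay).toInt + JulianDayOfEpoch
    val nanosOfDay = (micros % MicrosPerDay) * 1000L
    ByteBuffer.allocate(12)
      .order(ByteOrder.LITTLE_ENDIAN)
      .putLong(nanosOfDay)
      .putInt(julianDay)
      .array()
  }

  // New layout: the microsecond count itself, stored as INT64 TIMESTAMP_MICROS,
  // so row-group min/max statistics compare like ordinary longs.
  def toInt64Micros(micros: Long): Long = micros

  def main(args: Array[String]): Unit = {
    val micros = 1234567890L * 1000000L  // an instant in 2009
    println(toInt96(micros).length)      // 12
    println(toInt64Micros(micros))       // 1234567890000000
  }
}

Because the INT64 values order the same way as the timestamps they encode, the row-group filtering added in ParquetMetadataFileSplitter and the "Ensure timestamps are filterable" test in ParquetQuerySuite can rely on ordinary statistics comparisons, which INT96 never allowed.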