From 0887e5e87891e8e22f534ca6d0406daf86ec2dad Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 21 Oct 2015 09:02:20 +0800
Subject: [PATCH] [SPARK-11153][SQL] Disables Parquet filter push-down for
 string and binary columns

Due to PARQUET-251, `BINARY` columns in existing Parquet files may be
written with corrupted statistics information. This information is used by
filter push-down optimization. Since Spark 1.5 turns on Parquet filter
push-down by default, we may end up with wrong query results. PARQUET-251
has been fixed in parquet-mr 1.8.1, but Spark 1.5 is still using 1.7.0.

This affects all Spark SQL data types that can be mapped to Parquet
`BINARY`, namely:

- `StringType`
- `BinaryType`
- `DecimalType` (but Spark SQL doesn't support pushing down filters
  involving `DecimalType` columns for now)

To avoid wrong query results, we should disable filter push-down for
columns of `StringType` and `BinaryType` until we upgrade to parquet-mr
1.8. A conf-based workaround for deployments that cannot pick up this
patch yet is sketched after the diff.

Author: Cheng Lian

Closes #9152 from liancheng/spark-11153.workaround-parquet-251.
---
 .../datasources/parquet/ParquetFilters.scala | 27 +++++++++++++++++++
 .../parquet/ParquetFilterSuite.scala         |  6 +++--
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index c6b3fe7900da8..1f0405f4210b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -60,6 +60,8 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
@@ -69,6 +71,7 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -82,6 +85,9 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
@@ -90,6 +96,7 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -101,6 +108,9 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
@@ -108,6 +118,7 @@ private[sql] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -119,6 +130,9 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
@@ -126,6 +140,7 @@ private[sql] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -137,6 +152,9 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
@@ -144,6 +162,7 @@ private[sql] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -155,6 +174,9 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
@@ -162,6 +184,7 @@ private[sql] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -177,6 +200,9 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
+
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
@@ -185,6 +211,7 @@ private[sql] object ParquetFilters {
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
           SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+    */
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 45ad3fde559c0..7f4d36768e597 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -219,7 +219,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("filter pushdown - string") {
+  // See https://issues.apache.org/jira/browse/SPARK-11153
+  ignore("filter pushdown - string") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
       checkFilterPredicate(
@@ -247,7 +248,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("filter pushdown - binary") {
+  // See https://issues.apache.org/jira/browse/SPARK-11153
+  ignore("filter pushdown - binary") {
     implicit class IntToBinary(int: Int) {
       def b: Array[Byte] = int.toString.getBytes("UTF-8")
     }
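
For deployments on affected builds that cannot pick up this patch immediately,
the same corruption can be sidestepped at runtime by disabling Parquet filter
push-down wholesale through the existing `spark.sql.parquet.filterPushdown`
conf, so that predicates are evaluated by Spark itself rather than against
Parquet's (possibly corrupted) statistics. A minimal spark-shell sketch
follows; the table path and column name are hypothetical:

    // Spark 1.5.x spark-shell; `sqlContext` is provided by the shell.
    // With push-down disabled, row groups are no longer skipped based on
    // the min/max statistics that PARQUET-251 may have corrupted.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")

    import sqlContext.implicits._

    // Hypothetical Parquet table with a StringType column `name`.
    val df = sqlContext.read.parquet("/path/to/table")
    df.filter($"name" === "foo").show()  // predicate now evaluated by Spark

The conf-based route trades scan performance for correctness across all
types; the patch above makes the same trade but only for the affected
BINARY-backed types.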