diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 36080ec8b7191..75b9b626d7a21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -26,14 +26,24 @@ import org.apache.spark.sql.catalyst.trees
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   self: Product =>
 
+  // TODO: handle overflow?
   /**
-   * Estimates of various statistics. The default estimation logic simply sums up the corresponding
-   * statistic produced by the children. To override this behavior, override `statistics` and
-   * assign it a overriden version of `Statistics`.
+   * Estimates of various statistics. The default estimation logic simply multiplies the
+   * corresponding statistic produced by the children. To override this behavior, override
+   * `statistics` and assign it an overridden version of `Statistics`.
    */
   case class Statistics(
-    numTuples: Long = childrenStats.map(_.numTuples).sum,
-    sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum
+    /**
+     * Number of output tuples. For leaf operators this defaults to 1; otherwise it is set to the
+     * product of the children's `numTuples`.
+     */
+    numTuples: Long = childrenStats.map(_.numTuples).product,
+
+    /**
+     * Physical size in bytes. For leaf operators this defaults to 1; otherwise it is set to the
+     * product of the children's `sizeInBytes`.
+     */
+    sizeInBytes: Long = childrenStats.map(_.sizeInBytes).product
   )
   lazy val statistics: Statistics = new Statistics
   lazy val childrenStats = children.map(_.statistics)
@@ -104,6 +114,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
 abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
   self: Product =>
 
+  override lazy val statistics = Statistics(numTuples = 1L, sizeInBytes = 1L)
+
   // Leaf nodes by definition cannot reference any input attributes.
   override def references = Set.empty
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 55b3c99fb399c..5ed5a89b1e2a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -85,12 +85,12 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     // If this is wrapping around ExistingRdd and no reasonable estimation logic is implemented,
     // return a default value.
     sizeInBytes = {
-      val defaultSum = childrenStats.map(_.sizeInBytes).sum
+      val naiveVal = childrenStats.map(_.sizeInBytes).product
       alreadyPlanned match {
         // TODO: Instead of returning a default value here, find a way to return a meaningful
         // size estimate for RDDs. See PR 1238 for more discussions.
-        case e: ExistingRdd if defaultSum == 0 => statsDefaultSizeInBytes
-        case _ => defaultSum
+        case e: ExistingRdd if naiveVal == 1L => statsDefaultSizeInBytes
+        case _ => naiveVal
       }
     }
   )
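The switch from sum-based to product-based defaults is what makes 1L (rather than 0) the natural "no estimate" sentinel in SparkLogicalPlan above: a leaf with no estimation logic reports the multiplicative identity, so it never zeroes out an ancestor's product. A standalone sketch of that composition, using hypothetical names (StatisticsSketch, KnownLeaf, UnknownLeaf, Join) that are not Spark classes:

object StatisticsSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    // Inner nodes default to the product of their children's estimates,
    // mirroring the new `Statistics` defaults.
    def numTuples: Long = children.map(_.numTuples).product
    def sizeInBytes: Long = children.map(_.sizeInBytes).product
  }

  // A leaf with known statistics, e.g. a scanned table.
  case class KnownLeaf(tuples: Long, bytes: Long) extends Plan {
    def children: Seq[Plan] = Nil
    override def numTuples: Long = tuples
    override def sizeInBytes: Long = bytes
  }

  // A leaf with no estimation logic, mirroring the LeafNode override:
  // both statistics are 1, the multiplicative identity.
  case object UnknownLeaf extends Plan {
    def children: Seq[Plan] = Nil
    override def numTuples: Long = 1L
    override def sizeInBytes: Long = 1L
  }

  // A binary operator whose worst-case output is the cartesian product.
  case class Join(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
  }

  def main(args: Array[String]): Unit = {
    val plan = Join(KnownLeaf(tuples = 100L, bytes = 4000L), UnknownLeaf)
    println(plan.numTuples)   // 100 -- the unknown side contributes the identity, 1
    println(plan.sizeInBytes) // 4000
  }
}

A side note: Seq.empty[Long].product is already 1 in Scala, so the explicit LeafNode override in the patch documents the convention more than it changes behavior.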
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 8962f3a07b9e0..ad3753982715f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -58,7 +58,7 @@ private[sql] case class ParquetRelation(
     sizeInBytes = {
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job())))
-      fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent?
+      math.max(fs.getContentSummary(hdfsPath).getLength, 1L) // TODO: in bytes or system-dependent?
     }
   )
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 909fb4de4a421..49b1301b55fc4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -274,7 +274,7 @@ private[hive] case class MetastoreRelation
   // TODO: check if this estimate is valid for tables after partition pruning.
   // Size getters adapted from SizeBasedBigTableSelectorForAutoSMJ.java in Hive (version 0.13).
   override val sizeInBytes: Long =
-    maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path)
+    math.max(maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path), 1L)
 
   private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = {
     val res = try { Some(size.toLong) } catch { case _: Exception => None }
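Both math.max(..., 1L) clamps follow from the same identity-element reasoning: under multiplicative propagation, a single leaf reporting 0 bytes (an empty directory, or a missing or unparsable totalSize property) would zero out the size estimate of every ancestor in the plan. A minimal sketch of the guard, where clampedSize is a hypothetical helper rather than Spark API:

object SizeClampSketch {
  // Fall back to 1 (the multiplicative identity) when no positive size is
  // known, as the ParquetRelation and MetastoreRelation changes above do.
  def clampedSize(rawSize: Option[Long]): Long =
    math.max(rawSize.getOrElse(0L), 1L)

  def main(args: Array[String]): Unit = {
    val sizes = Seq(Some(0L), None, Some(123456L)).map(clampedSize)
    println(sizes)         // List(1, 1, 123456)
    println(sizes.product) // 123456 -- unknown/empty inputs stay neutral
    println(Seq(0L, 1L, 123456L).product) // without the clamp, one 0 gives 0
  }
}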