Only checks for the partition batch pruning flag once
liancheng committed Sep 10, 2014
1 parent 489f97b commit 8b8552b
Showing 2 changed files with 35 additions and 26 deletions.
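Previously, InMemoryColumnarTableScan evaluated inMemoryPartitionPruningEnabled inside the batch-pruning filter, i.e. once per cached batch. This change hoists the check out of the per-batch path: row extraction is factored into a local cachedBatchesToRows method, and the pruning filter is applied to cachedBatchIterator only when the flag is enabled; otherwise the iterator is scanned as-is.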
@@ -180,6 +180,7 @@ private[sql] case class InMemoryColumnarTableScan(
     }
   }
 
+  // Accumulators used for testing purposes
   val readPartitions = sparkContext.accumulator(0)
   val readBatches = sparkContext.accumulator(0)
 
@@ -211,27 +212,14 @@ private[sql] case class InMemoryColumnarTableScan(
       }
 
       val nextRow = new SpecificMutableRow(requestedColumnDataTypes)
-      val rows = cachedBatchIterator
-        // Skip pruned batches
-        .filter { cachedBatch =>
-          if (inMemoryPartitionPruningEnabled && !partitionFilter(cachedBatch.stats)) {
-            def statsString = relation.partitionStatistics.schema
-              .zip(cachedBatch.stats)
-              .map { case (a, s) => s"${a.name}: $s" }
-              .mkString(", ")
-            logInfo(s"Skipping partition based on stats $statsString")
-            false
-          } else {
-            readBatches += 1
-            true
-          }
-        }
-        // Build column accessors
-        .map { cachedBatch =>
-          requestedColumnIndices.map(cachedBatch.buffers(_)).map(ColumnAccessor(_))
-        }
-        // Extract rows via column accessors
-        .flatMap { columnAccessors =>
+
+      def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]) = {
+        val rows = cacheBatches.flatMap { cachedBatch =>
+          // Build column accessors
+          val columnAccessors =
+            requestedColumnIndices.map(cachedBatch.buffers(_)).map(ColumnAccessor(_))
+
+          // Extract rows via column accessors
           new Iterator[Row] {
             override def next() = {
               var i = 0
@@ -246,11 +234,34 @@ private[sql] case class InMemoryColumnarTableScan(
           }
         }
 
-      if (rows.hasNext) {
-        readPartitions += 1
-      }
+        if (rows.hasNext) {
+          readPartitions += 1
+        }
+
+        rows
+      }
 
-      rows
+      // Do partition batch pruning if enabled
+      val cachedBatchesToScan =
+        if (inMemoryPartitionPruningEnabled) {
+          cachedBatchIterator.filter { cachedBatch =>
+            if (!partitionFilter(cachedBatch.stats)) {
+              def statsString = relation.partitionStatistics.schema
+                .zip(cachedBatch.stats)
+                .map { case (a, s) => s"${a.name}: $s" }
+                .mkString(", ")
+              logInfo(s"Skipping partition based on stats $statsString")
+              false
+            } else {
+              readBatches += 1
+              true
+            }
+          }
+        } else {
+          cachedBatchIterator
+        }
+
+      cachedBatchesToRows(cachedBatchesToScan)
     }
   }
 }
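The pattern here generalizes beyond Spark: when a configuration flag is constant for the lifetime of an iterator, decide once which pipeline to build rather than re-evaluating the flag for every element. Below is a minimal, self-contained sketch of the same idea; the names (pruningEnabled, survivesPruning, toRows) are hypothetical stand-ins for the real flag, pruning predicate, and row extraction, not part of the Spark code:

    object PruneOnceExample {
      // Hypothetical flag standing in for inMemoryPartitionPruningEnabled.
      val pruningEnabled: Boolean = sys.props.get("pruning.enabled").contains("true")

      // Hypothetical predicate standing in for partitionFilter(cachedBatch.stats).
      def survivesPruning(batch: Int): Boolean = batch % 2 == 0

      // Hypothetical row extraction standing in for cachedBatchesToRows.
      def toRows(batches: Iterator[Int]): Iterator[String] =
        batches.flatMap(batch => Iterator(s"row-of-$batch"))

      def main(args: Array[String]): Unit = {
        val batches = Iterator.range(0, 10)

        // The flag is evaluated exactly once, when the pipeline is chosen,
        // rather than once per batch inside the filter closure.
        val batchesToScan =
          if (pruningEnabled) batches.filter(survivesPruning) else batches

        toRows(batchesToScan).foreach(println)
      }
    }

Since iterators are lazy in Scala, building the pipeline up front costs nothing until the first element is pulled; the only behavioral difference from the pre-commit code is that the flag is no longer re-read per batch.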
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import scala.collection.JavaConversions._
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
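The second file's change is pure cleanup: it drops an unused scala.collection.JavaConversions._ wildcard import from the org.apache.spark.sql.hive package.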
