Merge pull request #4 from yhuai/spark-5775-yin
Remove runtime pattern matching.
liancheng committed Feb 27, 2015
2 parents ca6e038 + b0b74fb commit cee55cf
Showing 2 changed files with 60 additions and 36 deletions.
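
For orientation before the diff: previously the scan pattern-matched on each row's runtime type inside next(), paying that dispatch on every row; the new code performs one schema check per partition (primitiveRow) and installs a specialized iterator up front. A minimal, self-contained sketch of that hoisting pattern — the Row classes below are simplified stand-ins for illustration, not Spark's actual types:

    object HoistDemo {
      // Simplified stand-ins for Spark's Row / SpecificMutableRow hierarchy.
      sealed trait Row { def values: Array[Any] }
      final class MutableRow(val values: Array[Any]) extends Row {
        def update(i: Int, v: Any): Unit = values(i) = v
      }
      final class ImmutableRow(val values: Array[Any]) extends Row

      // Before: the row's runtime type is re-dispatched on every call.
      def fillPerRow(rows: Iterator[Row], ord: Int, part: Any): Iterator[Row] =
        rows.map {
          case m: MutableRow =>
            m.update(ord, part)
            m
          case r: ImmutableRow =>
            val copy = new MutableRow(r.values.clone())
            copy.update(ord, part)
            copy
        }

      // After: the branch is chosen once per partition, outside the per-row loop.
      def fillHoisted(rows: Iterator[Row], allMutable: Boolean, ord: Int, part: Any): Iterator[Row] =
        if (allMutable) {
          rows.map { r =>
            val m = r.asInstanceOf[MutableRow]
            m.update(ord, part)
            m
          }
        } else {
          rows.map { r =>
            val m = new MutableRow(r.values.clone())
            m.update(ord, part)
            m
          }
        }
    }

The payoff is that the per-row path in fillHoisted contains no type test at all, which is exactly what the one-time primitiveRow flag buys in the diff below.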
@@ -126,6 +126,9 @@ private[sql] case class ParquetTableScan(
conf)

if (requestedPartitionOrdinals.nonEmpty) {
// This check is based on CatalystConverter.createRootConverter.
val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))

baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
val partValue = "([^=]+)=([^=]+)".r
val partValues =
@@ -143,37 +146,46 @@ private[sql] case class ParquetTableScan(
relation.partitioningAttributes
.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))

val mutableRow = new GenericMutableRow(output.size)

new Iterator[Row] {
def hasNext = iter.hasNext
def next() = {
iter.next() match {
case (_, row: SpecificMutableRow) =>
// Parquet will leave partitioning columns empty, so we fill them in here.
var i = 0
while (i < requestedPartitionOrdinals.size) {
row(requestedPartitionOrdinals(i)._2) =
partitionRowValues(requestedPartitionOrdinals(i)._1)
i += 1
}
row

case (_, row: Row) =>
var i = 0
while (i < row.size) {
mutableRow(i) = row(i)
i += 1
}

i = 0
while (i < requestedPartitionOrdinals.size) {
mutableRow(requestedPartitionOrdinals(i)._2) =
partitionRowValues(requestedPartitionOrdinals(i)._1)
i += 1
}

mutableRow
if (primitiveRow) {
new Iterator[Row] {
def hasNext = iter.hasNext
def next() = {
// We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
val row = iter.next()._2.asInstanceOf[SpecificMutableRow]

// Parquet will leave partitioning columns empty, so we fill them in here.
var i = 0
while (i < requestedPartitionOrdinals.size) {
row(requestedPartitionOrdinals(i)._2) =
partitionRowValues(requestedPartitionOrdinals(i)._1)
i += 1
}
row
}
}
} else {
// Create a mutable row since we need to fill in values from partition columns.
val mutableRow = new GenericMutableRow(output.size)
new Iterator[Row] {
def hasNext = iter.hasNext
def next() = {
// We are using CatalystGroupConverter and it returns a GenericRow.
// Since GenericRow is not mutable, we just cast it to a Row.
val row = iter.next()._2.asInstanceOf[Row]

var i = 0
while (i < row.size) {
mutableRow(i) = row(i)
i += 1
}
// Parquet will leave partitioning columns empty, so we fill them in here.
i = 0
while (i < requestedPartitionOrdinals.size) {
mutableRow(requestedPartitionOrdinals(i)._2) =
partitionRowValues(requestedPartitionOrdinals(i)._1)
i += 1
}
mutableRow
}
}
}
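
Two notes on the scan changes above. First, the asInstanceOf casts are safe only as long as the primitiveRow test stays in sync with CatalystConverter.createRootConverter, which (per the comments in the diff) picks CatalystPrimitiveRowConverter exactly when every requested column is primitive. Second, the non-primitive branch returns the same mutableRow instance from every call to next(), so consumers that buffer rows must copy them first. A self-contained illustration of that reuse hazard, using a simplified row class rather than Spark's:

    object RowReuseDemo extends App {
      // Illustration-only mutable row; Spark's GenericMutableRow plays this role.
      final class SharedRow(val values: Array[Any]) {
        def update(i: Int, v: Any): Unit = values(i) = v
        override def toString: String = values.mkString("[", ", ", "]")
      }

      val shared = new SharedRow(new Array[Any](1))
      // Like the scan's iterator, this mutates and returns the same row each time.
      val rows = Iterator(1, 2, 3).map { v => shared.update(0, v); shared }

      // Buffering without copying keeps three references to one object,
      // so every buffered element reflects only the last mutation.
      println(rows.toList.mkString(" "))  // prints: [3] [3] [3]
    }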
@@ -476,16 +476,21 @@ private[sql] case class ParquetRelation2(
// When the data does not include the key and the key is requested, we must fill it in
// based on information from the input split.
if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
// This check is based on CatalystConverter.createRootConverter.
val primitiveRow =
requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))

baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
val partValues = selectedPartitions.collectFirst {
case p if split.getPath.getParent.toString == p.path => p.values
}.get

val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
val mutableRow = new GenericMutableRow(requestedSchema.size)

iterator.map {
case (_, row: SpecificMutableRow) =>
if (primitiveRow) {
iterator.map { pair =>
// We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
val row = pair._2.asInstanceOf[SpecificMutableRow]
var i = 0
while (i < requiredPartOrdinal.size) {
// TODO Avoid boxing cost here!
@@ -494,8 +499,14 @@ private[sql] case class ParquetRelation2(
i += 1
}
row

case (_, row: Row) =>
}
} else {
// Create a mutable row since we need to fill in values from partition columns.
val mutableRow = new GenericMutableRow(requestedSchema.size)
iterator.map { pair =>
// We are using CatalystGroupConverter and it returns a GenericRow.
// Since GenericRow is not mutable, we just cast it to a Row.
val row = pair._2.asInstanceOf[Row]
var i = 0
while (i < row.size) {
// TODO Avoid boxing cost here!
@@ -511,6 +522,7 @@ private[sql] case class ParquetRelation2(
i += 1
}
mutableRow
}
}
}
} else {
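
On the two "TODO Avoid boxing cost here!" comments: the generic update(ordinal, value) setter takes Any, so primitive partition values are boxed on every row. One conventional way to discharge such a TODO — sketched here with hypothetical types, not Spark's actual API — is to resolve a primitive-typed setter closure once per partition column, so the per-row loop never touches Any:

    object BoxingFreeFill {
      // Hypothetical typed row with primitive setters, standing in for Spark's MutableRow.
      final class TypedRow(n: Int) {
        val ints  = new Array[Int](n)
        val longs = new Array[Long](n)
        def setInt(i: Int, v: Int): Unit   = ints(i) = v
        def setLong(i: Int, v: Long): Unit = longs(i) = v
      }

      // Resolve one primitive-typed setter per partition column up front;
      // any boxing happens here, once, rather than on every row.
      // (Only Int and Long columns are handled in this sketch.)
      def partitionFillers(cols: Seq[(Int, Any)]): Array[TypedRow => Unit] =
        cols.map {
          case (ord, v: Int)  => (r: TypedRow) => r.setInt(ord, v)
          case (ord, v: Long) => (r: TypedRow) => r.setLong(ord, v)
        }.toArray

      // Per-row hot loop: direct primitive setter calls, no Any involved.
      def fill(row: TypedRow, fillers: Array[TypedRow => Unit]): Unit = {
        var i = 0
        while (i < fillers.length) {
          fillers(i)(row)
          i += 1
        }
      }
    }

Since the partition values are already computed once per split (partValues above), building such closures there would add no per-row work.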
