Skip to content

Commit

Permalink
[SPARK-1368] Optimized HiveTableScan
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed May 28, 2014
1 parent 82eadc3 commit eb62fd3
Showing 1 changed file with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,27 @@ case class HiveTableScan(
(_: Any, partitionKeys: Array[String]) => {
val value = partitionKeys(ordinal)
val dataType = relation.partitionKeys(ordinal).dataType
castFromString(value, dataType)
unwrapHiveData(castFromString(value, dataType))
}
} else {
val ref = objectInspector.getAllStructFieldRefs
.find(_.getFieldName == a.name)
.getOrElse(sys.error(s"Can't find attribute $a"))
(row: Any, _: Array[String]) => {
val data = objectInspector.getStructFieldData(row, ref)
unwrapData(data, ref.getFieldObjectInspector)
unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
}
}
}
}

private def unwrapHiveData(value: Any) = value match {
case maybeNull: String if maybeNull.toLowerCase == "null" => null
case varchar: HiveVarchar => varchar.getValue
case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
case other => other
}

private def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}
Expand Down Expand Up @@ -143,20 +150,34 @@ case class HiveTableScan(
}

def execute() = {
inputRdd.map { row =>
val values = row match {
case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
attributeFunctions.map(_(deserializedRow, partitionKeys))
case deserializedRow: AnyRef =>
attributeFunctions.map(_(deserializedRow, Array.empty))
inputRdd.mapPartitions { iterator =>
if (iterator.isEmpty) {
Iterator.empty
} else {
val mutableRow = new GenericMutableRow(attributes.length)
val buffered = iterator.buffered

(buffered.head match {
case Array(_, _) =>
buffered.map { case Array(deserializedRow, partitionKeys: Array[String]) =>
(deserializedRow, partitionKeys)
}

case _ =>
buffered.map { deserializedRow =>
(deserializedRow, Array.empty[String])
}
}).map { case (deserializedRow, partitionKeys: Array[String]) =>
var i = 0

while (i < attributes.length) {
mutableRow(i) = attributeFunctions(i)(deserializedRow, partitionKeys)
i += 1
}

mutableRow: Row
}
}
buildRow(values.map {
case n: String if n.toLowerCase == "null" => null
case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => varchar.getValue
case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
BigDecimal(decimal.bigDecimalValue)
case other => other
})
}
}

Expand Down

0 comments on commit eb62fd3

Please sign in to comment.