Avoid dynamic dispatching when unwrapping Hive data.
Conflicts:
	sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
liancheng committed Jun 1, 2014
1 parent 366c0c4 commit c49c70c
Showing 2 changed files with 18 additions and 15 deletions.
sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala

@@ -26,8 +26,7 @@ import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
 import org.apache.hadoop.io.Writable
@@ -95,29 +94,34 @@ case class HiveTableScan(
     attributes.map { a =>
       val ordinal = relation.partitionKeys.indexOf(a)
       if (ordinal >= 0) {
+        val dataType = relation.partitionKeys(ordinal).dataType
         (_: Any, partitionKeys: Array[String]) => {
-          val value = partitionKeys(ordinal)
-          val dataType = relation.partitionKeys(ordinal).dataType
-          unwrapHiveData(castFromString(value, dataType))
+          castFromString(partitionKeys(ordinal), dataType)
         }
       } else {
         val ref = objectInspector.getAllStructFieldRefs
           .find(_.getFieldName == a.name)
           .getOrElse(sys.error(s"Can't find attribute $a"))
+        val fieldObjectInspector = ref.getFieldObjectInspector
+
+        val unwrapHiveData = fieldObjectInspector match {
+          case _: HiveVarcharObjectInspector =>
+            (value: Any) => value.asInstanceOf[HiveVarchar].getValue
+          case _: HiveDecimalObjectInspector =>
+            (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
+          case _ =>
+            identity[Any] _
+        }
+
         (row: Any, _: Array[String]) => {
           val data = objectInspector.getStructFieldData(row, ref)
-          unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
+          val hiveData = unwrapData(data, fieldObjectInspector)
+          if (hiveData != null) unwrapHiveData(hiveData) else null
        }
      }
    }
  }

-  private def unwrapHiveData(value: Any) = value match {
-    case varchar: HiveVarchar => varchar.getValue
-    case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
-    case other => other
-  }
-
   private def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }
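The core of the hunk above is to move type inspection out of the per-row path: instead of pattern matching on every unwrapped value (the old unwrapHiveData helper), the field's ObjectInspector is examined once per column and a specialized closure is built up front, so reading each row is a plain function call. A minimal stand-alone sketch of that pattern, using simplified stand-in types rather than the actual Hive/Spark classes:

// Sketch of the per-column dispatch idea; FakeVarchar/FakeDecimal are
// assumed stand-ins, not the real Hive wrapper classes.
object UnwrapSketch {
  case class FakeVarchar(underlying: String)
  case class FakeDecimal(underlying: java.math.BigDecimal)

  // Old style: the type of every single value is matched as rows are read.
  def unwrapPerValue(value: Any): Any = value match {
    case v: FakeVarchar => v.underlying
    case d: FakeDecimal => BigDecimal(d.underlying)
    case other          => other
  }

  // New style: decide once per column which conversion applies, and return a
  // closure that is applied to each value with no further matching.
  def unwrapperFor(columnType: String): Any => Any = columnType match {
    case "varchar" => (v: Any) => v.asInstanceOf[FakeVarchar].underlying
    case "decimal" => (v: Any) => BigDecimal(v.asInstanceOf[FakeDecimal].underlying)
    case _         => identity[Any]
  }

  def main(args: Array[String]): Unit = {
    val column = Seq(FakeVarchar("a"), FakeVarchar("b"), FakeVarchar("c"))
    val unwrap = unwrapperFor("varchar") // resolved once, outside the row loop
    println(column.map(unwrap))          // List(a, b, c)
  }
}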

@@ -133,15 +133,14 @@ abstract class HiveComparisonTest
     def isSorted(plan: LogicalPlan): Boolean = plan match {
       case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
       case PhysicalOperation(_, _, Sort(_, _)) => true
-      case _ => plan.children.iterator.map(isSorted).exists(_ == true)
+      case _ => plan.children.iterator.exists(isSorted)
     }

     val orderedAnswer = hiveQuery.logical match {
       // Clean out non-deterministic time schema info.
       case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
       case _: ExplainCommand => answer
-      case plan if isSorted(plan) => answer
-      case _ => answer.sorted
+      case plan => if (isSorted(plan)) answer else answer.sorted
     }
     orderedAnswer.map(cleanPaths)
   }
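The HiveComparisonTest tweak keeps the same behavior: on an iterator, map(isSorted).exists(_ == true) and exists(isSorted) return the same result, but the latter drops the intermediate mapping and the redundant == true comparison. A tiny REPL-style illustration, using plain integers in place of logical plans:

// Equivalent predicates; exists applies the predicate directly and stops at
// the first element that satisfies it.
val xs = Seq(1, 2, 3)
val viaMap    = xs.iterator.map(_ > 2).exists(_ == true)
val viaExists = xs.iterator.exists(_ > 2)
assert(viaMap == viaExists) // both true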
