This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[OAP-1846][oap-native-sql] add more fallback logic (#7)
* [oap-native-sql] move back test script

* [oap-native-sql] add more fallback and refresh ut

* [oap-native-sql] use function to do build check

* [oap-native-sql] add missing types

* fix on decimal dataset

* add fallback to SMJ and refine
rui-mo authored Jan 26, 2021
1 parent 3ed71f3 commit b47116d
Showing 342 changed files with 13,898 additions and 1,921 deletions.
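
The fallback logic added in this commit follows one pattern throughout: each Columnar*Exec operator runs a buildCheck() at construction time that throws UnsupportedOperationException for unsupported data types or expressions, and ColumnarGuardRule catches that exception and keeps the row-based operator instead. A simplified, self-contained sketch of that pattern (the Plan types and the supported-type set below are illustrative stand-ins, not the actual Spark/OAP classes):

    // Sketch of the guard/fallback pattern; all names here are hypothetical.
    object GuardPatternSketch {
      sealed trait Plan
      case class RowPlan(columnTypes: Seq[String]) extends Plan
      case class ColumnarPlan(columnTypes: Seq[String]) extends Plan {
        buildCheck() // constructor-time check, mirroring buildCheck() in the diffs below
        def buildCheck(): Unit = columnTypes.foreach { t =>
          if (!Set("int", "long", "double", "string").contains(t))
            throw new UnsupportedOperationException(s"$t is not supported")
        }
      }

      // Mirrors ColumnarGuardRule: try to build the columnar operator; if any
      // buildCheck() throws, fall back to the row-based plan.
      def tryColumnar(plan: RowPlan): Plan =
        try ColumnarPlan(plan.columnTypes)
        catch { case _: UnsupportedOperationException => plan }

      def main(args: Array[String]): Unit = {
        println(tryColumnar(RowPlan(Seq("int", "string"))))  // ColumnarPlan(...)
        println(tryColumnar(RowPlan(Seq("int", "decimal")))) // falls back to RowPlan(...)
      }
    }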
@@ -1668,8 +1668,12 @@ final void setNull(int rowId) {

@Override
final void setInt(int rowId, int value) {
BigDecimal v = new BigDecimal(value);
writer.setSafe(rowId, v.setScale(writer.getScale()));
writer.setSafe(rowId, value);
}

@Override
final void setLong(int rowId, long value) {
writer.setSafe(rowId, value);
}

@Override
@@ -1760,6 +1764,11 @@ void setLongs(int rowId, int count, long value) {
}
}

@Override
void setLong(int rowId, long value) {
writer.setSafe(rowId, value);
}

@Override
final void setNull(int rowId) {
writer.setNull(rowId);
2 changes: 1 addition & 1 deletion core/src/main/java/com/intel/oap/vectorized/JniUtils.java
@@ -160,7 +160,7 @@ private static void loadIncludeFromJar(String tmp_dir) throws IOException, Illeg
}
final File folder = new File(path);
copyResourcesToDirectory(urlConnection,
tmp_dir + "/" + "nativesql_include", folder);
tmp_dir + "/" + "nativesql_include", folder);
}
}
}
44 changes: 44 additions & 0 deletions core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.FullOuter
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins._
@@ -57,6 +58,18 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
val columnarPlan = plan match {
case plan: BatchScanExec =>
new ColumnarBatchScanExec(plan.output, plan.scan)
case plan: FileSourceScanExec =>
if (plan.supportsColumnar) {
logWarning(s"FileSourceScanExec ${plan.nodeName} supports columnar, " +
s"may causing columnar conversion exception")
}
plan
case plan: InMemoryTableScanExec =>
if (plan.supportsColumnar) {
logWarning(s"InMemoryTableScanExec ${plan.nodeName} supports columnar, " +
s"may causing columnar conversion exception")
}
plan
case plan: ProjectExec =>
new ColumnarConditionProjectExec(null, plan.projectList, plan.child)
case plan: FilterExec =>
@@ -95,6 +108,36 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
case plan: BroadcastExchangeExec =>
ColumnarBroadcastExchangeExec(plan.mode, plan.child)
case plan: BroadcastHashJoinExec =>
// We need to check if BroadcastExchangeExec can be converted to columnar-based.
// If not, BHJ should also be row-based.
val left = plan.left
left match {
case exec: BroadcastExchangeExec =>
new ColumnarBroadcastExchangeExec(exec.mode, exec.child)
case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec) =>
new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
case BroadcastQueryStageExec(_, plan: ReusedExchangeExec) =>
plan match {
case ReusedExchangeExec(_, b: BroadcastExchangeExec) =>
new ColumnarBroadcastExchangeExec(b.mode, b.child)
case _ =>
}
case _ =>
}
val right = plan.right
right match {
case exec: BroadcastExchangeExec =>
new ColumnarBroadcastExchangeExec(exec.mode, exec.child)
case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec) =>
new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
case BroadcastQueryStageExec(_, plan: ReusedExchangeExec) =>
plan match {
case ReusedExchangeExec(_, b: BroadcastExchangeExec) =>
new ColumnarBroadcastExchangeExec(b.mode, b.child)
case _ =>
}
case _ =>
}
ColumnarBroadcastHashJoinExec(
plan.leftKeys,
plan.rightKeys,
@@ -125,6 +168,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
}
} catch {
case e: UnsupportedOperationException =>
System.out.println(s"Fall back to use row-based operators, error is ${e.getMessage}")
return false
}
return true
30 changes: 19 additions & 11 deletions core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
@@ -33,31 +33,39 @@ import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf

case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
val columnarConf = ColumnarPluginConfig.getConf(conf)
val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getConf(conf)
var isSupportAdaptive: Boolean = true
val testing: Boolean = columnarConf.isTesting

def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
case RowGuard(child: CustomShuffleReaderExec) =>
replaceWithColumnarPlan(child)
case plan: RowGuard =>
val actualPlan = plan.child match {
case p: BroadcastHashJoinExec =>
p.withNewChildren(p.children.map(child =>
child match {
case RowGuard(queryStage: BroadcastQueryStageExec) =>
fallBackBroadcastQueryStage(queryStage)
case queryStage: BroadcastQueryStageExec =>
fallBackBroadcastQueryStage(queryStage)
case other => other
}))
p.withNewChildren(p.children.map {
case RowGuard(queryStage: BroadcastQueryStageExec) =>
fallBackBroadcastQueryStage(queryStage)
case queryStage: BroadcastQueryStageExec =>
fallBackBroadcastQueryStage(queryStage)
case plan: BroadcastExchangeExec =>
// if BroadcastHashJoin is row-based, BroadcastExchange should also be row-based
RowGuard(plan)
case other => other
})
case other =>
other
}
logDebug(s"Columnar Processing for ${actualPlan.getClass} is under RowGuard.")
actualPlan.withNewChildren(actualPlan.children.map(replaceWithColumnarPlan))
case plan: BatchScanExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
new ColumnarBatchScanExec(plan.output, plan.scan)
if (testing) {
// disable ColumnarBatchScanExec according to config
plan
} else {
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
new ColumnarBatchScanExec(plan.output, plan.scan)
}
case plan: ProjectExec =>
val columnarPlan = replaceWithColumnarPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
2 changes: 2 additions & 0 deletions core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
@@ -65,6 +65,8 @@ class ColumnarPluginConfig(conf: SparkConf) {
conf.getBoolean("spark.oap.sql.columnar.shuffle.preferSpill", defaultValue = true)
val columnarShuffleUseCustomizedCompression: Boolean =
conf.getBoolean("spark.oap.sql.columnar.shuffle.customizedCompression", defaultValue = false)
val isTesting: Boolean =
conf.getBoolean("spark.oap.sql.columnar.testing", defaultValue = false)
val numaBindingInfo: ColumnarNumaBindingInfo = {
val enableNumaBinding: Boolean =
conf.getBoolean("spark.oap.sql.columnar.numaBinding", defaultValue = false)
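
For reference, the new spark.oap.sql.columnar.testing entry is read like any other Spark configuration value. A minimal, hypothetical way to enable it when building a session (the application name and session variable are placeholders):

    import org.apache.spark.sql.SparkSession

    // With the flag set, ColumnarPreOverrides above keeps the row-based
    // BatchScanExec instead of replacing it with ColumnarBatchScanExec.
    val spark = SparkSession.builder()
      .appName("columnar-testing-demo")
      .config("spark.oap.sql.columnar.testing", "true")
      .getOrCreate()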
@@ -60,6 +60,31 @@ case class ColumnarConditionProjectExec(
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
"processTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_condproject"))

buildCheck(condition, projectList, child.output)

def buildCheck(condExpr: Expression, projectList: Seq[Expression],
originalInputAttributes: Seq[Attribute]): Unit = {
// check datatype
originalInputAttributes.toList.foreach(attr => {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e : UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarConditionProjector.")
}
})
// check expr
if (condExpr != null) {
ColumnarExpressionConverter.replaceWithColumnarExpression(condExpr)
}
if (projectList != null) {
for (expr <- projectList) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
}
}
}

def isNullIntolerant(expr: Expression): Boolean = expr match {
case e: NullIntolerant => e.children.forall(isNullIntolerant)
case _ => false
@@ -232,6 +257,21 @@ case class ColumnarConditionProjectExec(
case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
// updating nullability to make all the children consistent

buildCheck()

def buildCheck(): Unit = {
for (child <- children) {
for (schema <- child.schema) {
try {
ConverterUtils.checkIfTypeSupported(schema.dataType)
} catch {
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${schema.dataType} is not supported in ColumnarUnionExec")
}
}
}
}
override def supportsColumnar = true
protected override def doExecuteColumnar(): RDD[ColumnarBatch] =
sparkContext.union(children.map(_.executeColumnar()))
@@ -27,7 +27,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan)
extends BatchScanExec(output, scan) {
val tmpDir = ColumnarPluginConfig.getConf(sparkContext.getConf).tmpFile
val tmpDir: String = ColumnarPluginConfig.getConf(sparkContext.getConf).tmpFile
override def supportsColumnar(): Boolean = true
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -96,6 +96,45 @@ case class ColumnarBroadcastHashJoinExec(
case BuildRight => (rkeys, lkeys)
}
}
buildCheck()

def buildCheck(): Unit = {
// build check for condition
val conditionExpr: Expression = condition.orNull
if (conditionExpr != null) {
ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr)
}
// build check types
for (attr <- streamedPlan.output) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarBroadcastHashJoinExec.")
}
}
for (attr <- buildPlan.output) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarBroadcastHashJoinExec.")
}
}
// build check for expr
if (buildKeyExprs != null) {
for (expr <- buildKeyExprs) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
}
}
if (streamedKeyExprs != null) {
for (expr <- streamedKeyExprs) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
}
}
}

override def output: Seq[Attribute] =
if (projectList == null || projectList.isEmpty) super.output
@@ -51,6 +51,24 @@ case class ColumnarExpandExec(

override def supportsColumnar = true

buildCheck()

def buildCheck(): Unit = {
// build check for projection
projections.foreach(proj =>
ColumnarProjection.buildCheck(originalInputAttributes, proj))
//check type
for (attr <- originalInputAttributes) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e : UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarExpandExec.")
}
}
}

protected override def doExecute(): RDD[InternalRow] =
throw new UnsupportedOperationException("doExecute is not supported in ColumnarExpandExec.")

@@ -102,6 +102,8 @@ case class ColumnarHashAggregateExec(
numOutputBatches.set(0)
numInputBatches.set(0)

buildCheck()

val (listJars, signature): (Seq[String], String) =
if (ColumnarPluginConfig
.getConf(sparkConf)
@@ -144,6 +146,50 @@ }
}
listJars.foreach(jar => logInfo(s"Uploaded ${jar}"))

def buildCheck(): Unit = {
// check datatype
for (attr <- child.output) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e : UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarAggregation")
}
}
// check project
for (expr <- aggregateExpressions) {
val internalExpressionList = expr.aggregateFunction.children
ColumnarProjection.buildCheck(child.output, internalExpressionList)
}
ColumnarProjection.buildCheck(child.output, groupingExpressions)
ColumnarProjection.buildCheck(child.output, resultExpressions)
// check aggregate expressions
checkAggregate(aggregateExpressions)
}

def checkAggregate(aggregateExpressions: Seq[AggregateExpression]): Unit = {
for (expr <- aggregateExpressions) {
val mode = expr.mode
val aggregateFunction = expr.aggregateFunction
aggregateFunction match {
case Average(_) | Sum(_) | Count(_) | Max(_) | Min(_) =>
case StddevSamp(_) => mode match {
case Partial | Final =>
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
mode match {
case Partial | PartialMerge | Final =>
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
}
}

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
child.executeColumnar().mapPartitionsWithIndex { (partIndex, iter) =>
ExecutorManager.tryTaskSet(numaBindingInfo)
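
As an illustration of the new aggregate check (assuming a SparkSession named spark and a registered view tbl, both hypothetical): an aggregate function outside the set accepted by checkAggregate() — Average, Sum, Count, Max, Min, or StddevSamp in Partial/Final mode — makes buildCheck() throw, so ColumnarGuardRule keeps the row-based HashAggregateExec for that query.

    // var_samp is not in the supported aggregate list, so the columnar
    // operator's buildCheck() throws and the plan stays row-based.
    spark.sql("SELECT key, var_samp(value) FROM tbl GROUP BY key").show()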
