This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[OAP-1846][oap-native-sql] add more fallback logic #7

Merged: 6 commits, Jan 26, 2021
@@ -1668,8 +1668,12 @@ final void setNull(int rowId) {

@Override
final void setInt(int rowId, int value) {
BigDecimal v = new BigDecimal(value);
writer.setSafe(rowId, v.setScale(writer.getScale()));
writer.setSafe(rowId, value);
}

@Override
final void setLong(int rowId, long value) {
writer.setSafe(rowId, value);
}

@Override
@@ -1760,6 +1764,11 @@ void setLongs(int rowId, int count, long value) {
}
}

@Override
void setLong(int rowId, long value) {
writer.setSafe(rowId, value);
}

@Override
final void setNull(int rowId) {
writer.setNull(rowId);
2 changes: 1 addition & 1 deletion core/src/main/java/com/intel/oap/vectorized/JniUtils.java
@@ -160,7 +160,7 @@ private static void loadIncludeFromJar(String tmp_dir) throws IOException, Illeg
}
final File folder = new File(path);
copyResourcesToDirectory(urlConnection,
tmp_dir + "/" + "nativesql_include", folder);
tmp_dir + "/" + "nativesql_include", folder);
}
}
}
44 changes: 44 additions & 0 deletions core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins._
@@ -56,6 +57,18 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
val columnarPlan = plan match {
case plan: BatchScanExec =>
new ColumnarBatchScanExec(plan.output, plan.scan)
case plan: FileSourceScanExec =>
if (plan.supportsColumnar) {
logWarning(s"FileSourceScanExec ${plan.nodeName} supports columnar, " +
s"may causing columnar conversion exception")
}
plan
case plan: InMemoryTableScanExec =>
if (plan.supportsColumnar) {
logWarning(s"InMemoryTableScanExec ${plan.nodeName} supports columnar, " +
s"may causing columnar conversion exception")
}
plan
case plan: ProjectExec =>
new ColumnarConditionProjectExec(null, plan.projectList, plan.child)
case plan: FilterExec =>
@@ -94,6 +107,36 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
case plan: BroadcastExchangeExec =>
ColumnarBroadcastExchangeExec(plan.mode, plan.child)
case plan: BroadcastHashJoinExec =>
// We need to check if BroadcastExchangeExec can be converted to columnar-based.
// If not, BHJ should also be row-based.
val left = plan.left
left match {
case exec: BroadcastExchangeExec =>
new ColumnarBroadcastExchangeExec(exec.mode, exec.child)
case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec) =>
new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
case BroadcastQueryStageExec(_, plan: ReusedExchangeExec) =>
plan match {
case ReusedExchangeExec(_, b: BroadcastExchangeExec) =>
new ColumnarBroadcastExchangeExec(b.mode, b.child)
case _ =>
}
case _ =>
}
val right = plan.right
right match {
case exec: BroadcastExchangeExec =>
new ColumnarBroadcastExchangeExec(exec.mode, exec.child)
case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec) =>
new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
case BroadcastQueryStageExec(_, plan: ReusedExchangeExec) =>
plan match {
case ReusedExchangeExec(_, b: BroadcastExchangeExec) =>
new ColumnarBroadcastExchangeExec(b.mode, b.child)
case _ =>
}
case _ =>
}
ColumnarBroadcastHashJoinExec(
plan.leftKeys,
plan.rightKeys,
@@ -124,6 +167,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
}
} catch {
case e: UnsupportedOperationException =>
System.out.println(s"Fall back to use row-based operators, error is ${e.getMessage}")
return false
}
return true
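
The fallback mechanism these new cases rely on is driven by exceptions: every Columnar*Exec operator now validates its inputs at construction time (the buildCheck calls added later in this PR) and throws UnsupportedOperationException when the native backend cannot handle a data type, expression, or aggregate, and the catch block above turns that into a row-based fallback. Below is a minimal, hedged sketch of that pattern; the class and method names (GuardRuleSketch, tryConvert) are illustrative, not the exact ones in ColumnarGuardRule, and imports for the project's own RowGuard and ColumnarConditionProjectExec classes are omitted because their package paths are not shown in this diff.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}

// Illustrative sketch of the guard/fallback pattern, not the literal rule body.
case class GuardRuleSketch(conf: SparkConf) extends Rule[SparkPlan] {

  // Try to build the columnar counterpart of one node. Construction runs the
  // operator's buildCheck(), so an unsupported type or expression throws here.
  private def tryConvert(plan: SparkPlan): Boolean =
    try {
      plan match {
        case p: ProjectExec =>
          new ColumnarConditionProjectExec(null, p.projectList, p.child)
        case _ => // nodes without a columnar counterpart pass through unchanged
      }
      true
    } catch {
      case e: UnsupportedOperationException =>
        logWarning(s"Falling back to row-based operators: ${e.getMessage}")
        false
    }

  // Nodes that fail the check are wrapped in RowGuard so that the later
  // ColumnarPreOverrides pass leaves them (and, for broadcast joins, their
  // exchanges) as row-based operators.
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case p if !tryConvert(p) => RowGuard(p)
  }
}
```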
30 changes: 19 additions & 11 deletions core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
@@ -33,31 +33,39 @@ import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf

case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
val columnarConf = ColumnarPluginConfig.getConf(conf)
val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getConf(conf)
var isSupportAdaptive: Boolean = true
val testing: Boolean = columnarConf.isTesting

def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
case RowGuard(child: CustomShuffleReaderExec) =>
replaceWithColumnarPlan(child)
case plan: RowGuard =>
val actualPlan = plan.child match {
case p: BroadcastHashJoinExec =>
p.withNewChildren(p.children.map(child =>
child match {
case RowGuard(queryStage: BroadcastQueryStageExec) =>
fallBackBroadcastQueryStage(queryStage)
case queryStage: BroadcastQueryStageExec =>
fallBackBroadcastQueryStage(queryStage)
case other => other
}))
p.withNewChildren(p.children.map {
case RowGuard(queryStage: BroadcastQueryStageExec) =>
fallBackBroadcastQueryStage(queryStage)
case queryStage: BroadcastQueryStageExec =>
fallBackBroadcastQueryStage(queryStage)
case plan: BroadcastExchangeExec =>
// if BroadcastHashJoin is row-based, BroadcastExchange should also be row-based
RowGuard(plan)
case other => other
})
case other =>
other
}
logDebug(s"Columnar Processing for ${actualPlan.getClass} is under RowGuard.")
actualPlan.withNewChildren(actualPlan.children.map(replaceWithColumnarPlan))
case plan: BatchScanExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
new ColumnarBatchScanExec(plan.output, plan.scan)
if (testing) {
// disable ColumnarBatchScanExec according to config
plan
} else {
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
new ColumnarBatchScanExec(plan.output, plan.scan)
}
case plan: ProjectExec =>
val columnarPlan = replaceWithColumnarPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
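
Design note on the RowGuard(plan) wrap added for BroadcastExchangeExec above: when a BroadcastHashJoinExec stays row-based it consumes broadcast rows, so letting its BroadcastExchangeExec child be replaced by a columnar exchange would hand it columnar batches and trigger exactly the kind of columnar conversion exception the guard rule warns about. Wrapping the exchange in RowGuard keeps the whole broadcast side row-based.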
2 changes: 2 additions & 0 deletions core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
@@ -65,6 +65,8 @@ class ColumnarPluginConfig(conf: SparkConf) {
conf.getBoolean("spark.oap.sql.columnar.shuffle.preferSpill", defaultValue = true)
val columnarShuffleUseCustomizedCompression: Boolean =
conf.getBoolean("spark.oap.sql.columnar.shuffle.customizedCompression", defaultValue = false)
val isTesting: Boolean =
conf.getBoolean("spark.oap.sql.columnar.testing", defaultValue = false)
val numaBindingInfo: ColumnarNumaBindingInfo = {
val enableNumaBinding: Boolean =
conf.getBoolean("spark.oap.sql.columnar.numaBinding", defaultValue = false)
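
For context, a hedged usage sketch of the new flag: it is read from SparkConf like any other OAP option, so setting spark.oap.sql.columnar.testing=true makes ColumnarPreOverrides keep BatchScanExec row-based, as shown in the ColumnarPlugin.scala change above. Only the standard Spark configuration API is used here; the application name is an arbitrary placeholder.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: enable the new testing flag so BatchScanExec is not
// replaced by ColumnarBatchScanExec (see the `if (testing)` branch above).
val spark = SparkSession.builder()
  .appName("oap-columnar-testing-flag")
  .config("spark.oap.sql.columnar.testing", "true")
  .getOrCreate()
```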
@@ -60,6 +60,31 @@ case class ColumnarConditionProjectExec(
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
"processTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_condproject"))

buildCheck(condition, projectList, child.output)

def buildCheck(condExpr: Expression, projectList: Seq[Expression],
originalInputAttributes: Seq[Attribute]): Unit = {
// check datatype
originalInputAttributes.toList.foreach(attr => {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e : UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarConditionProjector.")
}
})
// check expr
if (condExpr != null) {
ColumnarExpressionConverter.replaceWithColumnarExpression(condExpr)
}
if (projectList != null) {
for (expr <- projectList) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
}
}
}

def isNullIntolerant(expr: Expression): Boolean = expr match {
case e: NullIntolerant => e.children.forall(isNullIntolerant)
case _ => false
@@ -232,6 +257,21 @@ case class ColumnarConditionProjectExec(
case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
// updating nullability to make all the children consistent

buildCheck()

def buildCheck(): Unit = {
for (child <- children) {
for (schema <- child.schema) {
try {
ConverterUtils.checkIfTypeSupported(schema.dataType)
} catch {
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${schema.dataType} is not supported in ColumnarUnionExec")
}
}
}
}
override def supportsColumnar = true
protected override def doExecuteColumnar(): RDD[ColumnarBatch] =
sparkContext.union(children.map(_.executeColumnar()))
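
The buildCheck() methods added to ColumnarConditionProjectExec and ColumnarUnionExec above, and to the join, expand, and aggregate operators below, all follow one pattern: validate every input data type and every expression eagerly at construction time, so an unsupported plan fails inside ColumnarGuardRule instead of at execution time. A condensed, hedged sketch of that shared shape follows; the helper name buildCheckSketch is illustrative, and imports for the project's own ConverterUtils and ColumnarExpressionConverter helpers are omitted because their package paths are not shown in this diff.

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

// Condensed sketch of the shared buildCheck() shape used by the operators in this PR.
def buildCheckSketch(attributes: Seq[Attribute], expressions: Seq[Expression]): Unit = {
  // 1. Reject any input attribute whose data type the native backend cannot handle.
  attributes.foreach { attr =>
    try {
      ConverterUtils.checkIfTypeSupported(attr.dataType)
    } catch {
      case _: UnsupportedOperationException =>
        throw new UnsupportedOperationException(
          s"${attr.dataType} is not supported in this columnar operator.")
    }
  }
  // 2. Reject any expression that cannot be converted to its columnar form;
  //    the conversion itself throws UnsupportedOperationException on failure.
  expressions.foreach(expr =>
    ColumnarExpressionConverter.replaceWithColumnarExpression(expr))
}
```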
@@ -27,7 +27,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan)
extends BatchScanExec(output, scan) {
val tmpDir = ColumnarPluginConfig.getConf(sparkContext.getConf).tmpFile
val tmpDir: String = ColumnarPluginConfig.getConf(sparkContext.getConf).tmpFile
override def supportsColumnar(): Boolean = true
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -96,6 +96,45 @@ case class ColumnarBroadcastHashJoinExec(
case BuildRight => (rkeys, lkeys)
}
}
buildCheck()

def buildCheck(): Unit = {
// build check for condition
val conditionExpr: Expression = condition.orNull
if (conditionExpr != null) {
ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr)
}
// build check types
for (attr <- streamedPlan.output) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarBroadcastHashJoinExec.")
}
}
for (attr <- buildPlan.output) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarBroadcastHashJoinExec.")
}
}
// build check for expr
if (buildKeyExprs != null) {
for (expr <- buildKeyExprs) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
}
}
if (streamedKeyExprs != null) {
for (expr <- streamedKeyExprs) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
}
}
}

override def output: Seq[Attribute] =
if (projectList == null || projectList.isEmpty) super.output
@@ -51,6 +51,24 @@ case class ColumnarExpandExec(

override def supportsColumnar = true

buildCheck()

def buildCheck(): Unit = {
// build check for projection
projections.foreach(proj =>
ColumnarProjection.buildCheck(originalInputAttributes, proj))
// check datatype
for (attr <- originalInputAttributes) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e : UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarExpandExec.")
}
}
}

protected override def doExecute(): RDD[InternalRow] =
throw new UnsupportedOperationException("doExecute is not supported in ColumnarExpandExec.")

@@ -102,6 +102,8 @@ case class ColumnarHashAggregateExec(
numOutputBatches.set(0)
numInputBatches.set(0)

buildCheck()

val (listJars, signature): (Seq[String], String) =
if (ColumnarPluginConfig
.getConf(sparkConf)
@@ -144,6 +146,50 @@ }
}
listJars.foreach(jar => logInfo(s"Uploaded ${jar}"))

def buildCheck(): Unit = {
// check datatype
for (attr <- child.output) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e : UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarAggregation")
}
}
// check project
for (expr <- aggregateExpressions) {
val internalExpressionList = expr.aggregateFunction.children
ColumnarProjection.buildCheck(child.output, internalExpressionList)
}
ColumnarProjection.buildCheck(child.output, groupingExpressions)
ColumnarProjection.buildCheck(child.output, resultExpressions)
// check aggregate expressions
checkAggregate(aggregateExpressions)
}

def checkAggregate(aggregateExpressions: Seq[AggregateExpression]): Unit = {
for (expr <- aggregateExpressions) {
val mode = expr.mode
val aggregateFunction = expr.aggregateFunction
aggregateFunction match {
case Average(_) | Sum(_) | Count(_) | Max(_) | Min(_) =>
case StddevSamp(_) => mode match {
case Partial | Final =>
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
mode match {
case Partial | PartialMerge | Final =>
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
}
}

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
child.executeColumnar().mapPartitionsWithIndex { (partIndex, iter) =>
ExecutorManager.tryTaskSet(numaBindingInfo)
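
Finally, to restate the aggregate whitelist encoded by checkAggregate() above: only Average, Sum, Count, Max and Min in Partial, PartialMerge or Final mode, plus StddevSamp in Partial or Final mode, stay on the columnar path; any other aggregate function or mode makes the whole HashAggregateExec fall back to row-based execution. A hedged sketch as a standalone predicate follows; the helper name isSupportedAggregate is illustrative, and the collect_list mention in the comment is only an example of a function outside the whitelist.

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate._

// Illustrative predicate mirroring checkAggregate(): true means the aggregate
// expression survives the columnar path, false means row-based fallback.
def isSupportedAggregate(expr: AggregateExpression): Boolean = {
  val functionOk = expr.aggregateFunction match {
    case _: Average | _: Sum | _: Count | _: Max | _: Min => true
    // StddevSamp is additionally restricted to Partial/Final modes.
    case _: StddevSamp => expr.mode == Partial || expr.mode == Final
    case _ => false // e.g. collect_list and other functions fall back
  }
  val modeOk = expr.mode match {
    case Partial | PartialMerge | Final => true
    case _ => false
  }
  functionOk && modeOk
}
```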