diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala index adef573f1..3089986d8 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala @@ -91,6 +91,8 @@ case class ColumnarBroadcastHashJoinExec( } } + var supportCodegen = true + buildCheck() // A method in ShuffledJoin of spark3.2. @@ -111,6 +113,7 @@ case class ColumnarBroadcastHashJoinExec( ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr) val supportCodegen = columnarConditionExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null) + this.supportCodegen = this.supportCodegen && supportCodegen // Columnar BHJ with condition only has codegen version of implementation. if (!supportCodegen) { throw new UnsupportedOperationException( @@ -141,12 +144,30 @@ case class ColumnarBroadcastHashJoinExec( // build check for expr if (buildKeyExprs != null) { for (expr <- buildKeyExprs) { - ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + val columnarBuildKeyExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + val supportCodegen = + columnarBuildKeyExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null) + this.supportCodegen = this.supportCodegen && supportCodegen + // Fall back the join who has join condition, but does not support codegen. + if (condition.isDefined && !supportCodegen) { + throw new UnsupportedOperationException("Fall back due to codegen is" + + " not supported for " + columnarBuildKeyExpr) + } + } } if (streamedKeyExprs != null) { for (expr <- streamedKeyExprs) { - ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + val columnarStreamedKeyExpr = + ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + val supportCodegen = + columnarStreamedKeyExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null) + this.supportCodegen = this.supportCodegen && supportCodegen + // Fall back the join who has join condition, but does not support codegen. + if (condition.isDefined && !supportCodegen) { + throw new UnsupportedOperationException("Fall back due to codegen is" + + " not supported for " + columnarStreamedKeyExpr) + } } } } @@ -273,7 +294,9 @@ case class ColumnarBroadcastHashJoinExec( override def getChild: SparkPlan = streamedPlan - override def supportColumnarCodegen: Boolean = true + override def supportColumnarCodegen: Boolean = { + this.supportCodegen + } val output_skip_alias = if (projectList == null || projectList.isEmpty) super.output diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala index 35a3de30f..216510b25 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala @@ -80,6 +80,8 @@ case class ColumnarShuffledHashJoinExec( "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"), "joinTime" -> SQLMetrics.createTimingMetric(sparkContext, "join time")) + var supportCodegen = true + buildCheck() // For spark 3.2. @@ -129,7 +131,15 @@ case class ColumnarShuffledHashJoinExec( // build check for condition val conditionExpr: Expression = condition.orNull if (conditionExpr != null) { - ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr) + val columnarConditionExpr = + ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr) + val supportCodegen = + columnarConditionExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null) + this.supportCodegen = this.supportCodegen && supportCodegen + if (!supportCodegen) { + throw new UnsupportedOperationException( + "Condition expression is not fully supporting codegen!") + } } // build check types for (attr <- streamedPlan.output) { @@ -153,12 +163,29 @@ case class ColumnarShuffledHashJoinExec( // build check for expr if (buildKeyExprs != null) { for (expr <- buildKeyExprs) { - ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + val columnarBuildKeyExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + val supportCodegen = + columnarBuildKeyExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null) + this.supportCodegen = this.supportCodegen && supportCodegen + // Fall back the join who has join condition, but does not support codegen. + if (condition.isDefined && !supportCodegen) { + throw new UnsupportedOperationException("Fall back due to codegen is" + + " not supported for " + columnarBuildKeyExpr) + } } } if (streamedKeyExprs != null) { for (expr <- streamedKeyExprs) { - ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + val columnarStreamedKeyExpr = + ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + val supportCodegen = + columnarStreamedKeyExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null) + this.supportCodegen = this.supportCodegen && supportCodegen + // Fall back the join who has join condition, but does not support codegen. + if (condition.isDefined && !supportCodegen) { + throw new UnsupportedOperationException("Fall back due to codegen is" + + " not supported for " + columnarStreamedKeyExpr) + } } } } @@ -234,7 +261,9 @@ case class ColumnarShuffledHashJoinExec( .prepareHashBuildFunction(buildKeyExprs, buildPlan.output, builder_type)) } - override def supportColumnarCodegen: Boolean = true + override def supportColumnarCodegen: Boolean = { + this.supportCodegen + } val output_skip_alias = if (projectList == null || projectList.isEmpty) super.output @@ -286,7 +315,7 @@ case class ColumnarShuffledHashJoinExec( } override def doExecuteColumnar(): RDD[ColumnarBatch] = { -// we will use previous codegen join to handle joins with condition + // we will use previous codegen join to handle joins with condition if (condition.isDefined) { return getCodeGenIterator } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala index a6f2290fa..19229bcf9 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala @@ -235,6 +235,7 @@ case class ColumnarSortMergeJoinExec( Seq(streamedPlan.executeColumnar()) } + // Only has codegen implementation. override def supportColumnarCodegen: Boolean = true val output_skip_alias =