[SPARK-44287][SQL][FOLLOWUP] Set partition index correctly #42185

@@ -85,20 +85,16 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
   )

   override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-    val numInputBatches = longMetric("numInputBatches")
-    val evaluatorFactory =
-      new ColumnarToRowEvaluatorFactory(
-        child.output,
-        numOutputRows,
-        numInputBatches)
-
+    val evaluatorFactory = new ColumnarToRowEvaluatorFactory(
+      child.output,
+      longMetric("numOutputRows"),
+      longMetric("numInputBatches"))
     if (conf.usePartitionEvaluator) {
       child.executeColumnar().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.executeColumnar().mapPartitionsInternal { batches =>
+      child.executeColumnar().mapPartitionsWithIndexInternal { (index, batches) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, batches)
+        evaluator.eval(index, batches)
       }
     }
   }
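A note to make the fix concrete (not part of the PR): the bug was that the non-evaluator branch always passed partition index 0 to eval. The sketch below contrasts mapPartitions, which hides the partition index, with the public mapPartitionsWithIndex, the counterpart of the private mapPartitionsWithIndexInternal call adopted above. The object, app, and variable names are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object PartitionIndexSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("partition-index-sketch").getOrCreate()
    val rdd = spark.sparkContext.parallelize(1 to 8, numSlices = 4)

    // mapPartitions exposes no partition index, so code that needs one has to
    // hard-code a placeholder such as 0, which is the mistake the diff above removes.
    val withoutIndex = rdd.mapPartitions(iter => iter.map(v => (0, v)))

    // mapPartitionsWithIndex hands the real partition index to the closure, which is
    // what switching to mapPartitionsWithIndexInternal achieves inside ColumnarToRowExec.
    val withIndex = rdd.mapPartitionsWithIndex((index, iter) => iter.map(v => (index, v)))

    println(withoutIndex.collect().mkString(", ")) // every element tagged with 0
    println(withIndex.collect().mkString(", "))    // elements tagged with their partition, 0 to 3
    spark.stop()
  }
}
```

Run locally, the first RDD tags every element with 0 while the second tags each element with its real partition index, which is exactly the value the evaluator's eval now receives.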
@@ -454,25 +450,20 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
   )

   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val numInputRows = longMetric("numInputRows")
-    val numOutputBatches = longMetric("numOutputBatches")
-    // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
-    // combine with some of the Arrow conversion tools we will need to unify some of the configs.
-    val numRows = conf.columnBatchSize
-    val evaluatorFactory =
-      new RowToColumnarEvaluatorFactory(
-        conf.offHeapColumnVectorEnabled,
-        numRows,
-        schema,
-        numInputRows,
-        numOutputBatches)
-
+    val evaluatorFactory = new RowToColumnarEvaluatorFactory(
+      conf.offHeapColumnVectorEnabled,
+      // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
+      // combine with some of the Arrow conversion tools we will need to unify some of the configs.
+      conf.columnBatchSize,
+      schema,
+      longMetric("numInputRows"),
+      longMetric("numOutputBatches"))
     if (conf.usePartitionEvaluator) {
       child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.execute().mapPartitionsInternal { rowIterator =>
+      child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, rowIterator)
+        evaluator.eval(index, rowIterator)
       }
     }
   }
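Another illustrative note (not from the PR): the comment kept in the diff explains that conf.columnBatchSize is reused as the batch size handed to RowToColumnarEvaluatorFactory. Below is a much-simplified sketch of that chunking, where a "batch" is just a Seq of plain values rather than a ColumnarBatch built from InternalRows, and the helper and object names are hypothetical.

```scala
object RowBatchingSketch extends App {
  // Hypothetical helper mirroring the chunking idea: consume rows lazily and emit them
  // in groups of at most batchSize, the way the factory is configured with
  // conf.columnBatchSize. The real code builds ColumnarBatch instances from InternalRow.
  def toBatches[T](rows: Iterator[T], batchSize: Int): Iterator[Seq[T]] =
    rows.grouped(batchSize).map(_.toSeq)

  // Seven input rows with batchSize = 3 become three batches: [1,2,3], [4,5,6], [7].
  println(toBatches(Iterator(1, 2, 3, 4, 5, 6, 7), batchSize = 3).toList)
}
```

The real factory presumably also updates the numInputRows and numOutputBatches metrics passed to it in the diff above.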
Inline review comment from the contributor (author), attached to the removed Seq(true, false).foreach line in the hunk below: "According to the previous discussion, we don't need to test it as we are already using the evaluator by default, and we can test RDD#mapPartitionsWithEvaluator when we use it by default." (A sketch of such a test follows the diff.)

@@ -279,40 +279,36 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
     }
     withSession(extensions) { session =>
       session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
-      Seq(true, false).foreach { enableEvaluator =>
-        withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
-          assert(session.sessionState.columnarRules.contains(
-            MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
-          import session.sqlContext.implicits._
-          // perform a join to inject a broadcast exchange
-          val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
-          val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
-          val data = left.join(right, $"l1" === $"r1")
-            // repartitioning avoids having the add operation pushed up into the LocalTableScan
-            .repartition(1)
-          val df = data.selectExpr("l2 + r2")
-          // execute the plan so that the final adaptive plan is available when AQE is on
-          df.collect()
-          val found = collectPlanSteps(df.queryExecution.executedPlan).sum
-          // 1 MyBroadcastExchangeExec
-          // 1 MyShuffleExchangeExec
-          // 1 ColumnarToRowExec
-          // 2 ColumnarProjectExec
-          // 1 ReplacedRowToColumnarExec
-          // so 11121 is expected.
-          assert(found == 11121)
-
-          // Verify that we get back the expected, wrong, result
-          val result = df.collect()
-          assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
-          assert(result(1).getLong(0) == 201L)
-          assert(result(2).getLong(0) == 301L)
-
-          withTempPath { path =>
-            val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
-            assert(e.getMessage == "columnar write")
-          }
-        }
-      }
+      assert(session.sessionState.columnarRules.contains(
+        MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
+      import session.sqlContext.implicits._
+      // perform a join to inject a broadcast exchange
+      val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
+      val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
+      val data = left.join(right, $"l1" === $"r1")
+        // repartitioning avoids having the add operation pushed up into the LocalTableScan
+        .repartition(1)
+      val df = data.selectExpr("l2 + r2")
+      // execute the plan so that the final adaptive plan is available when AQE is on
+      df.collect()
+      val found = collectPlanSteps(df.queryExecution.executedPlan).sum
+      // 1 MyBroadcastExchangeExec
+      // 1 MyShuffleExchangeExec
+      // 1 ColumnarToRowExec
+      // 2 ColumnarProjectExec
+      // 1 ReplacedRowToColumnarExec
+      // so 11121 is expected.
+      assert(found == 11121)
+
+      // Verify that we get back the expected, wrong, result
+      val result = df.collect()
+      assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
+      assert(result(1).getLong(0) == 201L)
+      assert(result(2).getLong(0) == 301L)
+
+      withTempPath { path =>
+        val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
+        assert(e.getMessage == "columnar write")
+      }
     }
   }
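The removed loop exercised both values of SQLConf.USE_PARTITION_EVALUATOR; the review comment above argues this is unnecessary and that RDD#mapPartitionsWithEvaluator can be tested directly. Here is a hedged sketch of such a direct test, assuming the PartitionEvaluator / PartitionEvaluatorFactory developer API added in Spark 3.5; the factory and object names are illustrative, not Spark's actual ColumnarToRowEvaluatorFactory.

```scala
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
import org.apache.spark.sql.SparkSession

// Illustrative factory: tags each element with the partition index passed to eval().
class IndexTaggingEvaluatorFactory extends PartitionEvaluatorFactory[Int, (Int, Int)] {
  override def createEvaluator(): PartitionEvaluator[Int, (Int, Int)] =
    new PartitionEvaluator[Int, (Int, Int)] {
      override def eval(partitionIndex: Int, inputs: Iterator[Int]*): Iterator[(Int, Int)] =
        inputs.head.map(value => (partitionIndex, value))
    }
}

object MapPartitionsWithEvaluatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("evaluator-sketch").getOrCreate()
    val rdd = spark.sparkContext.parallelize(1 to 8, numSlices = 4)
    val factory = new IndexTaggingEvaluatorFactory

    // Evaluator path: Spark supplies the real partition index to eval().
    val viaEvaluator = rdd.mapPartitionsWithEvaluator(factory).collect().sorted

    // Non-evaluator path, written the way the fixed operators now do it:
    // forward the index from mapPartitionsWithIndex instead of hard-coding 0.
    val viaIndex = rdd
      .mapPartitionsWithIndex((index, iter) => factory.createEvaluator().eval(index, iter))
      .collect()
      .sorted

    assert(viaEvaluator.sameElements(viaIndex))
    spark.stop()
  }
}
```

The assertion checks that the evaluator path and the index-forwarding fallback path produce the same tagged output, which is the invariant this follow-up restores.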
@@ -127,22 +127,18 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {

   test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") {
     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
-      Seq(true, false).foreach { enable =>
-        withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enable.toString) {
-          withTempPath { path =>
-            spark.range(1).write.parquet(path.getAbsolutePath)
-            val df = spark.read.parquet(path.getAbsolutePath)
-            val columnarToRowExec =
-              df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
-            try {
-              spark.range(1).foreach { _ =>
-                columnarToRowExec.canonicalized
-                ()
-              }
-            } catch {
-              case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
-            }
-          }
-        }
-      }
+      withTempPath { path =>
+        spark.range(1).write.parquet(path.getAbsolutePath)
+        val df = spark.read.parquet(path.getAbsolutePath)
+        val columnarToRowExec =
+          df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
+        try {
+          spark.range(1).foreach { _ =>
+            columnarToRowExec.canonicalized
+            ()
+          }
+        } catch {
+          case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
+        }
+      }
     }