[SPARK-41708][SQL][FOLLOWUP] Do not insert columnar to row transition before write command

### What changes were proposed in this pull request?

This is a follow-up of apache#39277. With planned write, the write command requires neither columnar nor row-based execution from its child: it invokes a new API, `executeWrite`, which returns commit messages rather than columnar or row-based data.

This PR updates `ApplyColumnarRulesAndInsertTransitions` to take this case into consideration.
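
As a rough sketch of that flow (illustrative only; `runPlannedWrite` is a made-up helper, not part of this PR), the write command consumes commit messages from `executeWrite`, so it does not care whether its child produces rows or columnar batches:

```scala
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.WriteFilesSpec

// Illustrative sketch only: with planned write, the V1 write command drives its child
// (normally a WriteFilesExec) through executeWrite rather than execute()/executeColumnar(),
// so no row/columnar transition is needed above the child.
def runPlannedWrite(child: SparkPlan, spec: WriteFilesSpec): Seq[WriterCommitMessage] =
  child.executeWrite(spec).collect().toSeq
```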

### Why are the changes needed?

If people replace `WriteFilesExec` with a columnar version, the plan can't be executed, due to an extra columnar-to-row transition inserted between `WriteFilesExec` and the write command.
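
To illustrate, here is a hypothetical plan shape (`CustomColumnarWriteExec` is a made-up name for such a columnar replacement), written as Scala comments:

```scala
// Hypothetical plan shapes, for illustration only.
//
// Before this fix, the rule assumed the write command consumes rows, so it wrapped the
// user's columnar WriteFilesExec replacement in a transition; roughly, the command then
// ends up calling executeWrite on ColumnarToRowExec, which does not implement it, and
// the query fails:
//
//   DataWritingCommandExec
//   +- ColumnarToRowExec            <- extra transition inserted by the old rule
//      +- CustomColumnarWriteExec   <- user-provided columnar version of WriteFilesExec
//         +- Scan ...
//
// After this fix, no transition is inserted and executeWrite reaches the columnar node:
//
//   DataWritingCommandExec
//   +- CustomColumnarWriteExec
//      +- Scan ...
```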

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new test in `SparkSessionExtensionSuite`.

Closes apache#39922 from cloud-fan/write.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 56dd20f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan authored and dongjoon-hyun committed Feb 7, 2023
1 parent 200f7f7 commit 5ce7ffe
Showing 2 changed files with 52 additions and 3 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.V1WriteCommand
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types._
@@ -541,10 +543,19 @@ case class ApplyColumnarRulesAndInsertTransitions(
       // `outputsColumnar` is false but the plan only outputs columnar format, so add a
       // to-row transition here.
       ColumnarToRowExec(insertRowToColumnar(plan))
-    } else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
-      plan.withNewChildren(plan.children.map(insertTransitions(_, outputsColumnar = false)))
-    } else {
+    } else if (plan.isInstanceOf[ColumnarToRowTransition]) {
       plan
+    } else {
+      val outputsColumnar = plan match {
+        // With planned write, the write command invokes child plan's `executeWrite` which is
+        // neither columnar nor row-based.
+        case write: DataWritingCommandExec
+            if write.cmd.isInstanceOf[V1WriteCommand] && conf.plannedWriteEnabled =>
+          write.child.supportsColumnar
+        case _ =>
+          false
+      }
+      plan.withNewChildren(plan.children.map(insertTransitions(_, outputsColumnar)))
     }
   }

sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -26,15 +26,19 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Limit, LocalRelation, LogicalPlan, Statistics, UnresolvedHint}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec, WriteFilesSpec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
@@ -301,6 +305,11 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
       assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
       assert(result(1).getLong(0) == 201L)
       assert(result(2).getLong(0) == 301L)
+
+      withTempPath { path =>
+        val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
+        assert(e.getMessage == "columnar write")
+      }
     }
   }

@@ -790,6 +799,27 @@ class ColumnarProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
     new ColumnarProjectExec(projectList, newChild)
 }
 
+class ColumnarWriteExec(
+    child: SparkPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends WriteFilesExec(
+  child, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) {
+
+  override def supportsColumnar(): Boolean = true
+
+  override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
+    assert(child.supportsColumnar)
+    throw new Exception("columnar write")
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
+    new ColumnarWriteExec(
+      newChild, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
+}
+
 /**
  * A version of add that supports columnar processing for longs. This version is broken
  * on purpose so it adds the numbers plus 1 so that the tests can show that it was replaced.
@@ -897,6 +927,14 @@ case class PreRuleReplaceAddWithBrokenVersion() extends Rule[SparkPlan] {
       new ColumnarProjectExec(plan.projectList.map((exp) =>
         replaceWithColumnarExpression(exp).asInstanceOf[NamedExpression]),
         replaceWithColumnarPlan(plan.child))
+    case write: WriteFilesExec =>
+      new ColumnarWriteExec(
+        replaceWithColumnarPlan(write.child),
+        write.fileFormat,
+        write.partitionColumns,
+        write.bucketSpec,
+        write.options,
+        write.staticPartitions)
     case p =>
       logWarning(s"Columnar processing for ${p.getClass} is not currently supported.")
       p.withNewChildren(p.children.map(replaceWithColumnarPlan))
