Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengruifeng authored and itholic committed Jul 17, 2023
1 parent 85d8d62 commit 12b4e73
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {

if (rel.hasCommon && rel.getCommon.hasPlanId) {
plan.setTagValue(LogicalPlan.PLAN_ID_TAG, rel.getCommon.getPlanId)
// scalastyle:off println

println()
println("Planner get a plan:")
println(s"$plan")
println()
}
plan
}
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ def schema(self, plan: pb2.Plan) -> StructType:
Return schema for given plan.
"""
logger.info(f"Schema for plan: {self._proto_to_string(plan)}")
print(f"Schema for plan: {self._proto_to_string(plan)}")
schema = self._analyze(method="schema", plan=plan).schema
assert schema is not None
# Server side should populate the struct field which is the schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3151,9 +3151,16 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
// Add Window operators.
val withWindow = addWindow(windowExpressions, withProject)

val planId = p.getTagValue(LogicalPlan.PLAN_ID_TAG)

// Finally, generate output columns according to the original projectList.
val finalProjectList = projectList.map(_.toAttribute)
Project(finalProjectList, withWindow)
val newProject = Project(finalProjectList, withWindow)

// retain the plan id used in Spark Connect
planId.foreach(newProject.setTagValue(LogicalPlan.PLAN_ID_TAG, _))

newProject
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ abstract class TypeCoercionBase {
object WidenSetOperationTypes extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperatorsUpWithNewOutput {
val planId = plan.getTagValue(LogicalPlan.PLAN_ID_TAG)

val newPlan = plan resolveOperatorsUpWithNewOutput {
case s @ Except(left, right, isAll) if s.childrenResolved &&
left.output.length == right.output.length && !s.resolved =>
val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
Expand Down Expand Up @@ -290,6 +292,11 @@ abstract class TypeCoercionBase {
s.copy(children = newChildren) -> attrMapping
}
}

// retain the plan id used in Spark Connect
planId.foreach(newPlan.setTagValue(LogicalPlan.PLAN_ID_TAG, _))

newPlan
}

/** Build new children with the widest types for each attribute among all the children */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.{BinaryLike, CurrentOrigin, LeafLike, QuaternaryLike, SQLQueryContext, TernaryLike, TreeNode, UnaryLike}
import org.apache.spark.sql.catalyst.trees.TreePattern.{RUNTIME_REPLACEABLE, TreePattern}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
Expand Down Expand Up @@ -343,8 +344,14 @@ abstract class Expression extends TreeNode[Expression] {

// The simple string of an expression is just its `toString`; `maxFields` is not used here.
override def simpleString(maxFields: Int): String = toString

override def toString: String = prettyName + truncatedString(
flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields)
/**
 * Renders this expression as `prettyName` followed by its flattened arguments in parentheses,
 * passing `SQLConf.get.maxToStringFields` to `truncatedString`. When this node carries a
 * `LogicalPlan.PLAN_ID_TAG` value, ` {planId=<id>}` is appended to the result.
 */
override def toString: String = {
  val rendered = prettyName + truncatedString(
    flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields)
  // Only decorate the string when a plan id tag is present on this node.
  this.getTagValue(LogicalPlan.PLAN_ID_TAG)
    .fold(rendered)(id => s"$rendered {planId=$id}")
}

/**
* Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ abstract class LogicalPlan
}
}

/**
 * Returns the parent implementation's simple string, with ` {planId=<id>}` appended when
 * this plan carries a `LogicalPlan.PLAN_ID_TAG` value.
 *
 * @param maxFields forwarded unchanged to `super.simpleString`
 */
override def simpleString(maxFields: Int): String = {
  val base = super.simpleString(maxFields)
  // Only decorate the string when a plan id tag is present on this node.
  this.getTagValue(LogicalPlan.PLAN_ID_TAG)
    .map(id => s"$base {planId=$id}")
    .getOrElse(base)
}


private[this] lazy val childAttributes = AttributeSeq.fromNormalOutput(children.flatMap(_.output))

private[this] lazy val childMetadataAttributes = AttributeSeq(children.flatMap(_.metadataOutput))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ object RuleExecutor {
}
}

// scalastyle:off println
class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {

private val logLevel = SQLConf.get.planChangeLogLevel
Expand All @@ -63,6 +64,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
""".stripMargin
}

println(message)
logBasedOnLevel(message)
}
}
Expand All @@ -81,6 +83,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
}
}

println(message)
logBasedOnLevel(message)
}
}
Expand All @@ -97,6 +100,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
|Total time of effective runs: $totalTimeEffective seconds
""".stripMargin

println(message)
logBasedOnLevel(message)
}

Expand Down

0 comments on commit 12b4e73

Please sign in to comment.