[SPARK-26450][SQL] Avoid rebuilding map of schema for every column in projection #23392
Conversation
Test build #100481 has finished for PR 23392 at commit
retest this please
Test build #100483 has finished for PR 23392 at commit
Getting java.lang.NoClassDefFoundError: javax/jdo/JDOException when trying to instantiate HiveMetaStoreClient during HiveClientSuites. Common error... so I will try again.
retest this please
Test build #100484 has finished for PR 23392 at commit
retest this please
Test build #100490 has finished for PR 23392 at commit
retest this please
Test build #100503 has finished for PR 23392 at commit
retest this please
Test build #100507 has finished for PR 23392 at commit
- protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] =
-   in.map(BindReferences.bindReference(_, inputSchema))
+ protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
+   lazy val inputSchemaAttrSeq: AttributeSeq = inputSchema
Why the lazy val? Are you optimizing for the case where in is empty?
Yes, that is the reason. For example, the query df.count, where df is a DataFrame from a CSV datasource, calls GenerateUnsafeProjection.bind with an empty list of expressions. However, the map inside the AttributeSeq object is not built until someone accesses exprIdToOrdinal, so maybe it is overkill.
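A minimal, self-contained sketch of the idea under discussion (toy types and names, not Spark's actual classes): the lazy val defers wrapping the schema, and the lookup map itself is built at most once, and only if an expression is actually bound.

```scala
// Toy illustration only -- Attribute, AttributeSeq, and bind are simplified
// stand-ins for the Spark internals discussed above.
object LazyBindSketch {
  final case class Attribute(name: String)

  final class AttributeSeq(attrs: Seq[Attribute]) {
    // Built on first access only, mirroring the lazy exprIdToOrdinal map.
    lazy val nameToOrdinal: Map[String, Int] =
      attrs.iterator.map(_.name).zipWithIndex.toMap
  }

  def bind(in: Seq[String], inputSchema: Seq[Attribute]): Seq[Int] = {
    // The wrapper is only constructed if `in` is non-empty.
    lazy val schemaAttrSeq = new AttributeSeq(inputSchema)
    in.map(name => schemaAttrSeq.nameToOrdinal(name))
  }

  def main(args: Array[String]): Unit = {
    val schema = Seq(Attribute("id"), Attribute("value"))
    println(bind(Seq("value", "id"), schema)) // List(1, 0)
    println(bind(Nil, schema))                // List() -- nothing is built
  }
}
```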
What about changing the signature of bind instead? This would also help ensure that we don't miss this fix in other parts of the code, IMHO.
@mgaido91 Do you mean change it to this?:
bind(in: Seq[Expression], inputSchema: AttributeSeq): Seq[Expression]
yes
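For readers following along, a hedged sketch of why this signature change is painless at call sites (same toy types as the earlier sketch; in Spark, AttributeSeq is, to my understanding, an implicit class over Seq[Attribute], so existing callers passing a plain Seq still compile): the conversion, and therefore the map construction, happens once per bind call instead of once per expression.

```scala
// Toy sketch (hypothetical names): the implicit class gives Seq[Attribute]
// an automatic conversion to AttributeSeq at the call site.
object SignatureSketch {
  final case class Attribute(name: String)

  implicit class AttributeSeq(val attrs: Seq[Attribute]) {
    lazy val nameToOrdinal: Map[String, Int] =
      attrs.iterator.map(_.name).zipWithIndex.toMap
  }

  // The parameter is now AttributeSeq; callers holding a Seq[Attribute]
  // are converted implicitly, exactly once per call.
  def bind(in: Seq[String], inputSchema: AttributeSeq): Seq[Int] =
    in.map(inputSchema.nameToOrdinal)

  def main(args: Array[String]): Unit = {
    val schema = Seq(Attribute("a"), Attribute("b"))
    println(bind(Seq("b", "a"), schema)) // List(1, 0)
  }
}
```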
+1 on something that eliminates this issue wholesale.
@bersprockets looks pretty good. Are there any other places where we should apply this? If there are, we should consider introducing a helper function.
@hvanhovell There are other places that use this pattern: InterpretedProjection, for instance. However, I don't know if any are in a "hot" path.
@hvanhovell For a helper function, if needed, I was thinking object BindReferences would be a good place for it (named toBoundExprs).
@@ -89,7 +89,8 @@ package object expressions {
   * A helper function to bind given expressions to an input schema.
   */
  def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
ditto
I am working to replace the dozen or so cases of this pattern.
Test build #100632 has finished for PR 23392 at commit
exprs.map(BindReferences.bindReference(_, inputSchema))
def toBoundExprs[A <: Expression](
    exprs: Seq[A],
    inputSchema: Seq[Attribute]): Seq[A] = {
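One thing worth spelling out about the new signature (a toy sketch with hypothetical types; the actual reference binding is elided): the type parameter A <: Expression means the helper returns the caller's concrete expression subtype, e.g. binding a Seq[SortOrder] yields a Seq[SortOrder] rather than a widened Seq[Expression].

```scala
// Toy types only -- Expression and SortOrder stand in for Catalyst's.
object GenericReturnSketch {
  trait Expression
  final case class SortOrder(child: String) extends Expression

  // A <: Expression preserves the element type through the transformation;
  // the binding step itself is elided in this sketch.
  def toBoundExprs[A <: Expression](exprs: Seq[A]): Seq[A] =
    exprs.map(e => e)

  def main(args: Array[String]): Unit = {
    val bound: Seq[SortOrder] = toBoundExprs(Seq(SortOrder("id")))
    println(bound) // List(SortOrder(id))
  }
}
```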
Why not just change this to AttributeSeq?
Sometimes this function is called with a zero-length exprs (df.count, for example). I am attempting to avoid constructing the AttributeSeq in that case, because AttributeSeq's constructor eagerly builds a data structure based on the attributes (private val qualified3Part).
How expensive is it to build those structures on empty data? We could consider caching an empty attribute seq and using that when there are no attributes.
I was thinking that just because exprs is empty, that would not necessarily mean inputSchema is empty. But experiments seem to indicate that inputSchema is also empty. So building the AttributeSeq would be extremely low-cost. I guess there is no reason to lazily build it.
@@ -393,7 +393,8 @@ case class SortMergeJoinExec(
      input: Seq[Attribute]): Seq[ExprCode] = {
    ctx.INPUT_ROW = row
    ctx.currentVars = null
-   keys.map(BindReferences.bindReference(_, input).genCode(ctx))
+   val inputAttributeSeq: AttributeSeq = input
+   keys.map(BindReferences.bindReference(_, inputAttributeSeq).genCode(ctx))
Why not toBoundExprs(keys, input).map(_.genCode(ctx))?
It should be...
LGTM, just one style comment
@@ -88,7 +88,9 @@ package object expressions {
  /**
   * A helper function to bind given expressions to an input schema.
   */
- def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
+ def toBoundExprs[A <: Expression](
+     exprs: Seq[A],
nit: indent
Test build #100645 has finished for PR 23392 at commit
Test build #100661 has finished for PR 23392 at commit
retest this please
@mgaido91 @hvanhovell I am looking at a small oddity with one of the benchmark cases (Orc 60 cols, 50M rows) that got introduced sometime after the initial commit. So if anyone felt inclined to merge this, please hold off for now. If there are more review comments, that's good.
Test build #100662 has finished for PR 23392 at commit
@bersprockets did you audit the entire codebase for this pattern? From a cursory search I could also see that …
@@ -88,7 +88,9 @@ package object expressions {
  /**
   * A helper function to bind given expressions to an input schema.
   */
- def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
+ def toBoundExprs[A <: Expression](
I would like to minimize the chance that future changes suffer from the same issue. In order to do that, we should provide the API in a logical place; it does not make a whole lot of sense to me that I need to look in package.scala to find a more performant version of BindReferences.bindReference(..) for a seq. Can we move this function to BindReferences and name it bindReferences?
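Pulling the thread together, the helper being asked for would look roughly like this inside object BindReferences (a sketch assembled from the diffs in this thread, not a verbatim quote of the merged code; the surrounding object and doc comment are assumed):

```scala
object BindReferences {
  // ... the existing bindReference[A <: Expression](expression: A,
  // input: AttributeSeq): A lives here already ...

  /** A helper function to bind given expressions to an input schema. */
  def bindReferences[A <: Expression](
      expressions: Seq[A],
      input: AttributeSeq): Seq[A] = {
    // `input` is converted (and its lookup map built) once at the call site;
    // every expression in the sequence then reuses the same AttributeSeq.
    expressions.map(BindReferences.bindReference(_, input))
  }
}
```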
Ahh.. good catch. My search did not include a parameter name, e.g. the x in x => BindReferences.bindReference(x, list).
- val aggResults = functions.map(_.evaluateExpression).map { e =>
-   BindReferences.bindReference(e, aggregateBufferAttributes).genCode(ctx)
- }
+ val aggResults = bindReferences(functions.map(_.evaluateExpression),
nit: move functions.map(_.evaluateExpression) to the next line
- val aggResults = declFunctions.map(_.evaluateExpression).map { e =>
-   BindReferences.bindReference(e, aggregateBufferAttributes).genCode(ctx)
- }
+ val aggResults = bindReferences(declFunctions.map(_.evaluateExpression),
nit: ditto
- val resultVars = resultExpressions.map { e =>
-   BindReferences.bindReference(e, inputAttrs).genCode(ctx)
- }
+ val resultVars = bindReferences[Expression](resultExpressions,
ditto
- val resultVars = resultExpressions.map { e =>
-   BindReferences.bindReference(e, inputAttrs).genCode(ctx)
- }
+ val resultVars = bindReferences[Expression](resultExpressions,
ditto
- val eval = resultExpressions.map{ e =>
-   BindReferences.bindReference(e, groupingAttributes).genCode(ctx)
- }
+ val eval = bindReferences[Expression](resultExpressions,
ditto
- .map(BindReferences.bindReference(_, outputSpec.outputColumns))
+ val orderingExpr = bindReferences(
+   requiredOrdering.map(SortOrder(_, Ascending)),
+   outputSpec.outputColumns)
nit: on previous line
- BindReferences.bindReference(e.input, inputAttrs)
- }
+ val boundExpressions = bindReferences(
+   Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map(_.input),
Seq.fill(ordinal)(NoOp) ++ bindReferences(...)
Note: There are two remaining places that use the pattern collectionLikeThing.map(x => BindReferences.bindReference(x, list)). Those places are HiveTableScanExec (…).
Test build #100810 has finished for PR 23392 at commit
Test build #100813 has finished for PR 23392 at commit
Test build #101148 has finished for PR 23392 at commit
LGTM
Merging to master. Thanks!
Closes apache#23392 from bersprockets/norebuild. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
What changes were proposed in this pull request?
When creating some unsafe projections, Spark rebuilds the map of schema attributes once for each expression in the projection. Some file format readers create one unsafe projection per input file, others create one per task. ProjectExec also creates one unsafe projection per task. As a result, for wide queries on wide tables, Spark might build the map of schema attributes hundreds of thousands of times.
This PR changes two functions to reuse the same AttributeSeq instance when creating BoundReference objects for each expression in the projection. This avoids the repeated rebuilding of the map of schema attributes.
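To make the mechanism concrete, here is a hedged before/after sketch of the binding pattern the description refers to (identifier names are illustrative; the real call sites appear in the review discussion above):

```scala
// Before: each bindReference call implicitly wraps inputSchema in a fresh
// AttributeSeq, rebuilding the attribute lookup map once per expression.
exprs.map(BindReferences.bindReference(_, inputSchema))

// After: convert once, then reuse the same AttributeSeq (and its map)
// for every expression in the projection.
val inputSchemaAttrSeq: AttributeSeq = inputSchema
exprs.map(BindReferences.bindReference(_, inputSchemaAttrSeq))
```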
Benchmarks
The time saved by this PR depends on the size of the schema, the size of the projection, the number of input files (or number of file splits), the number of tasks, and the file format. I chose a couple of example cases.
In the following tests, I ran the query

    select * from table where id1 = 1

Matching rows are about 0.2% of the table.
Orc table 6000 columns, 500K rows, 34 input files

baseline | pr | improvement
----|----|----
1.772306 min | 1.487267 min | 16.082943%
Orc table 6000 columns, 500K rows, 17 input files

baseline | pr | improvement
----|----|----
1.656400 min | 1.423550 min | 14.057595%
Orc table 60 columns, 50M rows, 34 input files

baseline | pr | improvement
----|----|----
0.299878 min | 0.290339 min | 3.180926%
Parquet table 6000 columns, 500K rows, 34 input files

baseline | pr | improvement
----|----|----
1.478306 min | 1.373728 min | 7.074165%
Note: The Parquet reader does not create an unsafe projection. However, the filter operation in the query causes the planner to add a ProjectExec, which does create an unsafe projection for each task. So these results have nothing to do with Parquet itself.
Parquet table 60 columns, 50M rows, 34 input files

baseline | pr | improvement
----|----|----
0.245006 min | 0.242200 min | 1.145099%
CSV table 6000 columns, 500K rows, 34 input files

baseline | pr | improvement
----|----|----
2.390117 min | 2.182778 min | 8.674844%
CSV table 60 columns, 50M rows, 34 input files

baseline | pr | improvement
----|----|----
1.520911 min | 1.510211 min | 0.703526%
How was this patch tested?
SQL unit tests
Python core and SQL test