Skip to content

Commit

Permalink
avoid canonicalization if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Oct 15, 2019
1 parent efd6045 commit 8ee1921
Showing 1 changed file with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,28 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
if (!conf.exchangeReuseEnabled) {
return plan
}
// To avoid costly canonicalization of an exchange:
// - we use its schema first to check to check if it can be replaced to a reused exchange at all
// - we insert an exchange into the map of canonicalized plans only when at least 2 exchange
// have the same schema
val exchangeSchemas = mutable.HashMap[StructType, (Exchange, Boolean)]()
val exchanges = mutable.HashMap[SparkPlan, Exchange]()

def reuse(plan: SparkPlan): SparkPlan = plan.transform {
case exchange: Exchange =>
val newExchange = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
if (newExchange.ne(exchange)) {
ReusedExchangeExec(exchange.output, newExchange)
val (sameSchemaExchange, onlyOne) =
exchangeSchemas.getOrElseUpdate(exchange.schema, (exchange, true))
if (sameSchemaExchange.ne(exchange)) {
if (onlyOne) {
exchangeSchemas += sameSchemaExchange.schema -> (sameSchemaExchange, false)
exchanges += sameSchemaExchange.canonicalized -> sameSchemaExchange
}
val sameResultExchange = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
if (sameResultExchange.ne(exchange)) {
ReusedExchangeExec(exchange.output, sameResultExchange)
} else {
exchange
}
} else {
exchange
}
Expand Down

0 comments on commit 8ee1921

Please sign in to comment.