Skip to content

Commit

Permalink
avoid canonicalization if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Oct 16, 2019
1 parent efd6045 commit fe7526e
Showing 1 changed file with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,27 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
if (!conf.exchangeReuseEnabled) {
return plan
}
val exchanges = mutable.HashMap[SparkPlan, Exchange]()
// To avoid costly canonicalization of an exchange:
// - we use its schema first to check if it can be replaced to a reused exchange at all
// - we insert an exchange into the map of canonicalized plans only when at least 2 exchange
// have the same schema
val exchanges = mutable.Map[StructType, (Exchange, mutable.Map[SparkPlan, Exchange])]()

def reuse(plan: SparkPlan): SparkPlan = plan.transform {
case exchange: Exchange =>
val newExchange = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
if (newExchange.ne(exchange)) {
ReusedExchangeExec(exchange.output, newExchange)
val (firstSameSchemaExchange, sameResultExchanges) =
exchanges.getOrElseUpdate(exchange.schema, (exchange, mutable.Map()))
if (firstSameSchemaExchange.ne(exchange)) {
if (sameResultExchanges.isEmpty) {
sameResultExchanges += firstSameSchemaExchange.canonicalized -> firstSameSchemaExchange
}
val sameResultExchange =
sameResultExchanges.getOrElseUpdate(exchange.canonicalized, exchange)
if (sameResultExchange.ne(exchange)) {
ReusedExchangeExec(exchange.output, sameResultExchange)
} else {
exchange
}
} else {
exchange
}
Expand Down

0 comments on commit fe7526e

Please sign in to comment.