Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-285] ColumnarWindow: Support Date input in MAX/MIN (#286)
Browse files Browse the repository at this point in the history
Closes #285
  • Loading branch information
zhztheplayer authored May 8, 2021
1 parent 56bcb73 commit b6c267a
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 224 deletions.
56 changes: 8 additions & 48 deletions arrow-data-source/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,6 @@
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand All @@ -128,44 +108,24 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</exclusion>
</exclusions>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</exclusion>
</exclusions>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Expand Down
28 changes: 1 addition & 27 deletions native-sql-engine/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,50 +48,24 @@
<!-- Prevent our dummy JAR from being included in Spark distributions or uploaded to YARN -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
plan.isSkewJoin)
case plan: WindowExec =>
if (!enableColumnarWindow) return false
val window = ColumnarWindowExec.create(
val window = ColumnarWindowExec.createWithOptimizations(
plan.windowExpression,
plan.partitionSpec,
plan.orderSpec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,36 +221,17 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
}

case plan: WindowExec =>
if (columnarConf.enableColumnarWindow) {
val sortRemoved = plan.child match {
case sort: SortExec => // remove ordering requirements
replaceWithColumnarPlan(sort.child)
case _ =>
replaceWithColumnarPlan(plan.child)
}
// disable CoalesceBatchesExec to reduce Netty direct memory usage
val coalesceBatchRemoved = sortRemoved match {
case s: CoalesceBatchesExec =>
s.child
case _ => sortRemoved
}
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
try {
val window = ColumnarWindowExec.create(
plan.windowExpression,
plan.partitionSpec,
plan.orderSpec,
coalesceBatchRemoved)
return window
} catch {
case _: Throwable =>
logInfo("Columnar Window: Falling back to regular Window...")
}
try {
ColumnarWindowExec.createWithOptimizations(
plan.windowExpression,
plan.partitionSpec,
plan.orderSpec,
replaceWithColumnarPlan(plan.child))
} catch {
case _: Throwable =>
logInfo("Columnar Window: Falling back to regular Window...")
plan
}
logDebug(s"Columnar Processing for ${plan.getClass} is not currently supported.")
val children = plan.children.map(replaceWithColumnarPlan)
plan.withNewChildren(children)

case p =>
val children = plan.children.map(replaceWithColumnarPlan)
logDebug(s"Columnar Processing for ${p.getClass} is currently not supported.")
Expand Down
Loading

0 comments on commit b6c267a

Please sign in to comment.