Skip to content

Commit

Permalink
[CARMEL-1463] Support DELETE syntax in Delta
Browse files Browse the repository at this point in the history
  • Loading branch information
LantaoJin committed Oct 30, 2019
1 parent 2ac8d31 commit 5b82307
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 9 deletions.
4 changes: 2 additions & 2 deletions src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.delta.sql

import io.delta.sql.analysis.{DeltaSqlResolution, PreprocessTableUpdate}
import io.delta.sql.analysis.{DeltaSqlResolution, PreprocessTableUpdateDelete}
import io.delta.sql.parser.DeltaSqlParser
import org.apache.spark.sql.SparkSessionExtensions

Expand Down Expand Up @@ -78,7 +78,7 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
}

extensions.injectPostHocResolutionRule { session =>
new PreprocessTableUpdate(session)
new PreprocessTableUpdateDelete(session)
}
}
}
10 changes: 7 additions & 3 deletions src/main/scala/io/delta/sql/analysis/DeltaSqlResolution.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ package io.delta.sql.analysis

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.plans.logical.{Delete, LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.UpdateTableIdentifier
import org.apache.spark.sql.execution.datasources.{DeleteFromStatement, UpdateTableStatement}

class DeltaSqlResolution(spark: SparkSession) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case UpdateTableIdentifier(tableIdentifier, c, values, condition) =>
case UpdateTableStatement(tableIdentifier, c, values, condition) =>
val table = UnresolvedRelation(tableIdentifier)
val columns = c.map(UnresolvedAttribute(_))
UpdateTable(table, columns, values, condition)

case DeleteFromStatement(tableIdentifier, condition) =>
val table = UnresolvedRelation(tableIdentifier)
Delete(table, condition)
}
}
15 changes: 12 additions & 3 deletions src/main/scala/io/delta/sql/analysis/PreprocessTableUpdate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package io.delta.sql.analysis

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.plans.logical.{Delete, LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.commands.UpdateCommand
import org.apache.spark.sql.delta.commands.{DeleteCommand, UpdateCommand}
import org.apache.spark.sql.delta.{DeltaErrors, DeltaFullTable, UpdateExpressionsSupport}
import org.apache.spark.sql.internal.SQLConf

class PreprocessTableUpdate(
class PreprocessTableUpdateDelete(
spark: SparkSession) extends Rule[LogicalPlan] with UpdateExpressionsSupport {

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
Expand All @@ -42,6 +42,15 @@ class PreprocessTableUpdate(
val alignedUpdateExprs = generateUpdateExpressions(
update.child.output, targetColNameParts, update.updateExpressions, conf.resolver)
UpdateCommand(index, update.child, alignedUpdateExprs, update.condition)

case delete: Delete =>
val index = EliminateSubqueryAliases(delete.child) match {
case DeltaFullTable(tahoeFileIndex) =>
tahoeFileIndex
case o =>
throw DeltaErrors.notADeltaSourceException("DELETE", Some(o))
}
DeleteCommand(index, delete.child, delete.condition)
}

override def conf: SQLConf = spark.sessionState.conf
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/io/delta/sql/execution/DeltaSqlStrategy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package io.delta.sql.execution

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.delta.commands.UpdateCommand
import org.apache.spark.sql.delta.commands.{DeleteCommand, UpdateCommand}
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}

case class DeltaSqlStrategy(spark: SparkSession) extends SparkStrategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case u @ UpdateCommand(_, _, _, _) =>
u.run(spark)
Nil
case u @ DeleteCommand(_, _, _) =>
u.run(spark)
Nil
case _ => Nil
}
}

0 comments on commit 5b82307

Please sign in to comment.