From 5b8230700f0c60c2a3e5dbd7673472008f18dad7 Mon Sep 17 00:00:00 2001 From: lajin Date: Wed, 30 Oct 2019 16:50:03 +0800 Subject: [PATCH] [CARMEL-1463] Support DELETE syntax in Delta --- .../io/delta/sql/DeltaSparkSessionExtension.scala | 4 ++-- .../delta/sql/analysis/DeltaSqlResolution.scala | 10 +++++++--- .../sql/analysis/PreprocessTableUpdate.scala | 15 ++++++++++++--- .../io/delta/sql/execution/DeltaSqlStrategy.scala | 5 ++++- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala b/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala index 5590ee0bc0c..350535f124f 100644 --- a/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala +++ b/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala @@ -16,7 +16,7 @@ package io.delta.sql -import io.delta.sql.analysis.{DeltaSqlResolution, PreprocessTableUpdate} +import io.delta.sql.analysis.{DeltaSqlResolution, PreprocessTableUpdateDelete} import io.delta.sql.parser.DeltaSqlParser import org.apache.spark.sql.SparkSessionExtensions @@ -78,7 +78,7 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) { } extensions.injectPostHocResolutionRule { session => - new PreprocessTableUpdate(session) + new PreprocessTableUpdateDelete(session) } } } diff --git a/src/main/scala/io/delta/sql/analysis/DeltaSqlResolution.scala b/src/main/scala/io/delta/sql/analysis/DeltaSqlResolution.scala index 6808913c7ef..4d7771a61e8 100644 --- a/src/main/scala/io/delta/sql/analysis/DeltaSqlResolution.scala +++ b/src/main/scala/io/delta/sql/analysis/DeltaSqlResolution.scala @@ -18,16 +18,20 @@ package io.delta.sql.analysis import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable} +import org.apache.spark.sql.catalyst.plans.logical.{Delete, LogicalPlan, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.UpdateTableIdentifier +import org.apache.spark.sql.execution.datasources.{DeleteFromStatement, UpdateTableStatement} class DeltaSqlResolution(spark: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case UpdateTableIdentifier(tableIdentifier, c, values, condition) => + case UpdateTableStatement(tableIdentifier, c, values, condition) => val table = UnresolvedRelation(tableIdentifier) val columns = c.map(UnresolvedAttribute(_)) UpdateTable(table, columns, values, condition) + + case DeleteFromStatement(tableIdentifier, condition) => + val table = UnresolvedRelation(tableIdentifier) + Delete(table, condition) } } diff --git a/src/main/scala/io/delta/sql/analysis/PreprocessTableUpdate.scala b/src/main/scala/io/delta/sql/analysis/PreprocessTableUpdate.scala index deda7160041..95644358891 100644 --- a/src/main/scala/io/delta/sql/analysis/PreprocessTableUpdate.scala +++ b/src/main/scala/io/delta/sql/analysis/PreprocessTableUpdate.scala @@ -18,13 +18,13 @@ package io.delta.sql.analysis import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable} +import org.apache.spark.sql.catalyst.plans.logical.{Delete, LogicalPlan, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.delta.commands.UpdateCommand +import org.apache.spark.sql.delta.commands.{DeleteCommand, UpdateCommand} import org.apache.spark.sql.delta.{DeltaErrors, DeltaFullTable, UpdateExpressionsSupport} import org.apache.spark.sql.internal.SQLConf -class PreprocessTableUpdate( +class PreprocessTableUpdateDelete( spark: SparkSession) extends Rule[LogicalPlan] with UpdateExpressionsSupport { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { @@ -42,6 +42,15 @@ class PreprocessTableUpdate( val alignedUpdateExprs = generateUpdateExpressions( update.child.output, targetColNameParts, update.updateExpressions, conf.resolver) UpdateCommand(index, update.child, alignedUpdateExprs, update.condition) + + case delete: Delete => + val index = EliminateSubqueryAliases(delete.child) match { + case DeltaFullTable(tahoeFileIndex) => + tahoeFileIndex + case o => + throw DeltaErrors.notADeltaSourceException("DELETE", Some(o)) + } + DeleteCommand(index, delete.child, delete.condition) } override def conf: SQLConf = spark.sessionState.conf diff --git a/src/main/scala/io/delta/sql/execution/DeltaSqlStrategy.scala b/src/main/scala/io/delta/sql/execution/DeltaSqlStrategy.scala index eafe618c04f..fc22ac45bac 100644 --- a/src/main/scala/io/delta/sql/execution/DeltaSqlStrategy.scala +++ b/src/main/scala/io/delta/sql/execution/DeltaSqlStrategy.scala @@ -18,7 +18,7 @@ package io.delta.sql.execution import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.delta.commands.UpdateCommand +import org.apache.spark.sql.delta.commands.{DeleteCommand, UpdateCommand} import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} case class DeltaSqlStrategy(spark: SparkSession) extends SparkStrategy { @@ -26,6 +26,9 @@ case class DeltaSqlStrategy(spark: SparkSession) extends SparkStrategy { case u @ UpdateCommand(_, _, _, _) => u.run(spark) Nil + case u @ DeleteCommand(_, _, _) => + u.run(spark) + Nil case _ => Nil } }