Skip to content

Commit

Permalink
[CARMEL-1440] Enable Delta Lake SQL on Carmel Spark-2.3
Browse files Browse the repository at this point in the history
  • Loading branch information
LantaoJin committed Oct 28, 2019
1 parent 3c9f685 commit 2ac8d31
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 11 deletions.
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ name := "delta-core"

organization := "io.delta"

crossScalaVersions := Seq("2.12.8", "2.11.12")
crossScalaVersions := Seq("2.11.12", "2.12.8")

scalaVersion := crossScalaVersions.value.head

sparkVersion := "2.4.2"
sparkVersion := "2.3.0"

unmanagedBase := baseDirectory.value / "lib"

libraryDependencies ++= Seq(
// Adding test classifier seems to break transitive resolution of the core dependencies
Expand Down
9 changes: 8 additions & 1 deletion src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.delta.sql

import io.delta.sql.analysis.{DeltaSqlResolution, PreprocessTableUpdate}
import io.delta.sql.parser.DeltaSqlParser

import org.apache.spark.sql.SparkSessionExtensions

/**
Expand Down Expand Up @@ -73,5 +73,12 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
extensions.injectParser { (session, parser) =>
new DeltaSqlParser(parser)
}
extensions.injectResolutionRule { session =>
new DeltaSqlResolution(session)
}

extensions.injectPostHocResolutionRule { session =>
new PreprocessTableUpdate(session)
}
}
}
33 changes: 33 additions & 0 deletions src/main/scala/io/delta/sql/analysis/DeltaSqlResolution.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019 Databricks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sql.analysis

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.UpdateTableIdentifier

class DeltaSqlResolution(spark: SparkSession) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case UpdateTableIdentifier(tableIdentifier, c, values, condition) =>
val table = UnresolvedRelation(tableIdentifier)
val columns = c.map(UnresolvedAttribute(_))
UpdateTable(table, columns, values, condition)
}
}
48 changes: 48 additions & 0 deletions src/main/scala/io/delta/sql/analysis/PreprocessTableUpdate.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019 Databricks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sql.analysis

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.commands.UpdateCommand
import org.apache.spark.sql.delta.{DeltaErrors, DeltaFullTable, UpdateExpressionsSupport}
import org.apache.spark.sql.internal.SQLConf

class PreprocessTableUpdate(
spark: SparkSession) extends Rule[LogicalPlan] with UpdateExpressionsSupport {

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
case update: UpdateTable =>
val index = EliminateSubqueryAliases(update.child) match {
case DeltaFullTable(tahoeFileIndex) =>
tahoeFileIndex
case o =>
throw DeltaErrors.notADeltaSourceException("UPDATE", Some(o))
}

val targetColNameParts =
update.updateColumns.map{col => new UnresolvedAttribute(col.name.split("\\.")).nameParts}

val alignedUpdateExprs = generateUpdateExpressions(
update.child.output, targetColNameParts, update.updateExpressions, conf.resolver)
UpdateCommand(index, update.child, alignedUpdateExprs, update.condition)
}

override def conf: SQLConf = spark.sessionState.conf
}
31 changes: 31 additions & 0 deletions src/main/scala/io/delta/sql/execution/DeltaSqlStrategy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2019 Databricks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sql.execution

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.delta.commands.UpdateCommand
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}

case class DeltaSqlStrategy(spark: SparkSession) extends SparkStrategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case u @ UpdateCommand(_, _, _, _) =>
u.run(spark)
Nil
case _ => Nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import org.apache.spark.sql.delta.schema.Invariants.{ArbitraryExpression, NotNul
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Expression, NonSQLExpression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{Block, CodegenContext, ExprCode, JavaCode, TrueLiteral}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{DataType, NullType}

/** An expression that validates a specific invariant on a column, before writing into Delta. */
Expand Down Expand Up @@ -55,10 +54,10 @@ case class CheckDeltaInvariant(
null
}

private def generateNotNullCode(ctx: CodegenContext): Block = {
private def generateNotNullCode(ctx: CodegenContext): String = {
val childGen = child.genCode(ctx)
val invariantField = ctx.addReferenceObj("errMsg", invariant)
code"""${childGen.code}
"""${childGen.code}
|
|if (${childGen.isNull}) {
| throw org.apache.spark.sql.delta.schema.InvariantViolationException.apply(
Expand All @@ -67,15 +66,15 @@ case class CheckDeltaInvariant(
""".stripMargin
}

private def generateExpressionValidationCode(expr: Expression, ctx: CodegenContext): Block = {
private def generateExpressionValidationCode(expr: Expression, ctx: CodegenContext): String = {
val resolvedExpr = expr.transform {
case _: UnresolvedAttribute => child
}
val elementValue = child.genCode(ctx)
val childGen = resolvedExpr.genCode(ctx)
val invariantField = ctx.addReferenceObj("errMsg", invariant)
val eValue = ctx.freshName("elementResult")
code"""${elementValue.code}
"""${elementValue.code}
|${childGen.code}
|
|if (${childGen.isNull} || ${childGen.value} == false) {
Expand All @@ -94,6 +93,6 @@ case class CheckDeltaInvariant(
case NotNull => generateNotNullCode(ctx)
case ArbitraryExpression(expr) => generateExpressionValidationCode(expr, ctx)
}
ev.copy(code = code, isNull = TrueLiteral, value = JavaCode.literal("null", NullType))
ev.copy(code = code, isNull = "true", value = null)
}
}
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.4.1-SNAPSHOT"
version in ThisBuild := "0.4.0.carmel0.1-SNAPSHOT"

0 comments on commit 2ac8d31

Please sign in to comment.