[CARMEL-2255] Add 'CONVERT TO DELTA VACUUM', 'VACUUM AUTO RUN', 'SHOW DELTAS' commands (delta-io#16)
LantaoJin authored and GitHub Enterprise committed Mar 1, 2020
1 parent 8ee137d commit 669ce54
Showing 13 changed files with 323 additions and 112 deletions.
12 changes: 9 additions & 3 deletions src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -73,13 +73,15 @@ singleStatement
// If you add keywords here that should not be reserved, add them to 'nonReserved' list.
statement
: VACUUM (path=STRING | table=qualifiedName)
(RETAIN number HOURS)? (DRY RUN)? #vacuumTable
(RETAIN number HOURS)? ((DRY | AUTO) RUN)? #vacuumTable
| (DESC | DESCRIBE) DETAIL (path=STRING | table=qualifiedName) #describeDeltaDetail
| GENERATE modeName=identifier FOR TABLE table=qualifiedName #generate
| (DESC | DESCRIBE) HISTORY (path=STRING | table=qualifiedName)
(LIMIT limit=INTEGER_VALUE)? #describeDeltaHistory
| CONVERT TO DELTA table=qualifiedName
(PARTITIONED BY '(' colTypeList ')')? #convert
(PARTITIONED BY '(' colTypeList ')')?
(VACUUM (RETAIN number HOURS)?)? #convert
| SHOW DELTAS #showDeltas
| .*? #passThrough
;

@@ -122,10 +124,11 @@ number
// Add keywords here so that people's queries don't break if they have a column name as one of
// these tokens
nonReserved
: VACUUM | RETAIN | HOURS | DRY | RUN
: VACUUM | RETAIN | HOURS | DRY | AUTO | RUN
| CONVERT | TO | DELTA | PARTITIONED | BY
| DESC | DESCRIBE | LIMIT | DETAIL
| GENERATE | FOR | TABLE
| SHOW | DELTAS
;

// Define how the keywords above should appear in a user's SQL statement.
@@ -138,6 +141,7 @@ DESCRIBE: 'DESCRIBE';
DETAIL: 'DETAIL';
GENERATE: 'GENERATE';
DRY: 'DRY';
AUTO: 'AUTO';
HISTORY: 'HISTORY';
HOURS: 'HOURS';
LIMIT: 'LIMIT';
@@ -151,6 +155,8 @@ RETAIN: 'RETAIN';
RUN: 'RUN';
TO: 'TO';
VACUUM: 'VACUUM';
SHOW: 'SHOW';
DELTAS: 'DELTAS';

STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
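
For orientation, a minimal sketch (not part of this commit) of the statement shapes the extended grammar is meant to accept. The table names below are hypothetical, and passing a null delegate is tolerable only because every statement shown is handled by the Delta grammar itself rather than falling through to the passThrough rule.

import io.delta.sql.parser.DeltaSqlParser

object DeltaGrammarSketch {
  def main(args: Array[String]): Unit = {
    // The delegate parser is never consulted for statements the Delta grammar recognizes.
    val parser = new DeltaSqlParser(null)
    Seq(
      "VACUUM db.events RETAIN 168 HOURS DRY RUN",   // existing form
      "VACUUM db.events RETAIN 168 HOURS AUTO RUN",  // new: register for background vacuum instead of running now
      "CONVERT TO DELTA db.events PARTITIONED BY (dt STRING) VACUUM RETAIN 72 HOURS",  // new optional VACUUM clause
      "SHOW DELTAS"                                  // new: list delta tables tracked in the metadata table
    ).foreach(sql => println(s"$sql => ${parser.parsePlan(sql).getClass.getSimpleName}"))
  }
}
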
17 changes: 13 additions & 4 deletions src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -44,12 +44,11 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.commands.DeltaGenerateCommand
import io.delta.sql.parser.DeltaSqlBaseParser._
import io.delta.tables.execution.{DescribeDeltaHistoryCommand, VacuumTableCommand}
import io.delta.tables.execution.{DescribeDeltaHistoryCommand, ShowDeltasCommand, VacuumTableCommand}
import org.antlr.v4.runtime._
import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
import org.antlr.v4.runtime.tree._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -151,7 +150,8 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.number).map(_.getText.toDouble),
ctx.RUN != null)
ctx.DRY != null,
ctx.AUTO != null)
}

override def visitDescribeDeltaDetail(
@@ -176,10 +176,19 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
}

override def visitConvert(ctx: ConvertContext): LogicalPlan = withOrigin(ctx) {
val properties =
Map("vacuum" -> s"${ctx.VACUUM != null}") ++
Option(ctx.number).map(x => ("horizonHours", x.getText))

ConvertToDeltaCommand(
visitTableIdentifier(ctx.table),
Option(ctx.colTypeList).map(colTypeList => StructType(visitColTypeList(colTypeList))),
None)
None,
properties)
}

override def visitShowDeltas(ctx: ShowDeltasContext): AnyRef = withOrigin(ctx) {
ShowDeltasCommand()
}

override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
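
As a rough, spark-shell style illustration of what the visitor methods above construct (the table identifier is hypothetical; the argument order follows the case classes in this diff):

import org.apache.spark.sql.catalyst.TableIdentifier
import io.delta.tables.execution.{ShowDeltasCommand, VacuumTableCommand}

// "VACUUM db.events RETAIN 168 HOURS AUTO RUN" should become:
val vacuumAuto = VacuumTableCommand(
  path = None,
  table = Some(TableIdentifier("events", Some("db"))),
  horizonHours = Some(168.0),
  dryRun = false,  // DRY was not given
  autoRun = true)  // AUTO RUN defers the clean-up to the background service

// "SHOW DELTAS" becomes a parameterless command:
val showDeltas = ShowDeltasCommand()
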
34 changes: 34 additions & 0 deletions src/main/scala/io/delta/tables/execution/ShowDeltasCommand.scala
@@ -0,0 +1,34 @@
/*
* Copyright 2019 Databricks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.tables.execution

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.delta.commands.DeltaCommand
import org.apache.spark.sql.delta.services.DeltaTableMetadata
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.{Encoders, Row, SparkSession}

case class ShowDeltasCommand() extends RunnableCommand with DeltaCommand {
import org.apache.spark.sql.delta.services.DeltaTableMetadata._

override val output: Seq[Attribute] = Encoders.product[DeltaTableMetadata].schema
.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())

override def run(spark: SparkSession): Seq[Row] = {
getRowsFromMetadataTable(spark).toSeq
}
}
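
A hedged usage sketch for the new command: with this repository's SQL extension registered, SHOW DELTAS returns one row per entry in the delta metadata table, with an output schema taken from Encoders.product[DeltaTableMetadata] (that class is not part of this diff, so its exact column names are not repeated here). The session wiring below is an assumption for illustration, not something this commit changes.

import org.apache.spark.sql.SparkSession

// Sketch only: assumes the extension is on the classpath and the delta metadata
// table (DeltaSQLConf.META_TABLE_IDENTIFIER) has already been created.
val spark = SparkSession.builder()
  .appName("show-deltas-sketch")
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .getOrCreate()

// One row per tracked delta table.
spark.sql("SHOW DELTAS").show(truncate = false)
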
42 changes: 40 additions & 2 deletions src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, DeltaTableUtils}
import org.apache.spark.sql.delta.commands.VacuumCommand
import org.apache.spark.sql.delta.services.DeltaTableMetadata
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.types.StringType

@@ -35,7 +37,8 @@ case class VacuumTableCommand(
path: Option[String],
table: Option[TableIdentifier],
horizonHours: Option[Double],
dryRun: Boolean) extends RunnableCommand {
dryRun: Boolean,
autoRun: Boolean = false) extends RunnableCommand {

override val output: Seq[Attribute] =
Seq(AttributeReference("path", StringType, nullable = true)())
@@ -63,6 +66,41 @@ case class VacuumTableCommand(
"VACUUM",
DeltaTableIdentifier(path = Some(pathToVacuum.toString)))
}
VacuumCommand.gc(sparkSession, deltaLog, dryRun, horizonHours).collect()
if (autoRun) {
if (table.isDefined) {
logInfo(s"VACUUM ${table.get.unquotedString} AUTO RUN command won't execute immediately")
updateMetaTable(sparkSession, table.get, pathToVacuum.toString)
Seq.empty[Row]
} else {
throw DeltaErrors.tableOnlySupportedException("VACUUM with AUTO RUN")
}
} else {
VacuumCommand.gc(sparkSession, deltaLog, dryRun, horizonHours).collect()
}
}


private def updateMetaTable(
spark: SparkSession, table: TableIdentifier, pathToVacuum: String): Unit = {
val catalog = spark.sessionState.catalog
val metadata = DeltaTableMetadata(
table.database.getOrElse(catalog.getCurrentDatabase),
table.table,
catalog.getCurrentUser,
pathToVacuum,
vacuum = true,
horizonHours.getOrElse(7 * 24D).toLong)
val res = DeltaTableMetadata.updateMetadataTable(spark, metadata)
if (res) {
logInfo(s"Update ${table.identifier} in delta metadata table")
} else {
logWarning(
s"""
|${DeltaSQLConf.META_TABLE_IDENTIFIER.key} may not be created.
|Skipping storing delta metadata to ${DeltaTableMetadata.deltaMetaTableIdentifier(spark)}.
|This was triggered by the command:
|VACUUM ${table.identifier} (RETAIN number HOURS)? AUTO RUN;
|""".stripMargin)
}
}
}
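
To make the two vacuum modes concrete, a usage sketch follows (it reuses the assumed spark session from the SHOW DELTAS sketch above; the table name is hypothetical). With AUTO RUN the command deletes nothing itself: it updates the table's DeltaTableMetadata entry (defaulting the horizon to 7 * 24 hours when RETAIN is omitted) so a background service can vacuum it later, and the path form is rejected via tableOnlySupportedException.

// Immediate clean-up, unchanged by this commit:
spark.sql("VACUUM db.events RETAIN 168 HOURS")

// New behaviour: record db.events in the delta metadata table and return no rows;
// the actual vacuum is left to the service that reads that table.
spark.sql("VACUUM db.events RETAIN 168 HOURS AUTO RUN")

// Not allowed: AUTO RUN requires a catalog table, not a raw path.
// spark.sql("VACUUM '/data/events' AUTO RUN")  // throws AnalysisException
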
4 changes: 4 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -196,6 +196,10 @@ object DeltaErrors
new AnalysisException(s"Table is not supported in $operation. Please use a path instead.")
}

def tableOnlySupportedException(operation: String): Throwable = {
new AnalysisException(s"Only a table is supported in $operation. Please use a table instead of a path.")
}

def vacuumBasePathMissingException(baseDeltaPath: Path): Throwable = {
new AnalysisException(
s"Please provide the base path ($baseDeltaPath) when Vacuuming Delta tables. " +
12 changes: 7 additions & 5 deletions src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -16,11 +16,11 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.util.JsonUtils

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.commands.ConvertProperties
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StructField, StructType}

@@ -102,11 +102,13 @@ object DeltaOperations {
numFiles: Long,
partitionBy: Seq[String],
collectStats: Boolean,
catalogTable: Option[String]) extends Operation("CONVERT") {
convertProperties: ConvertProperties) extends Operation("CONVERT") {
override val parameters: Map[String, Any] = Map(
"numFiles" -> numFiles,
"partitionedBy" -> JsonUtils.toJson(partitionBy),
"collectStats" -> collectStats) ++ catalogTable.map("catalogTable" -> _)
"collectStats" -> collectStats,
"targetDir" -> convertProperties.targetDir) ++
convertProperties.catalogTable.map("catalogTable" -> _.identifier.unquotedString)
}
/** Recorded when optimizing the table. */
case class Optimize(
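
For illustration only, roughly what the CONVERT operation now records in the commit history's operation parameters; every value below is invented, and the partitionedBy rendering follows JsonUtils.toJson.

// Illustrative operationParameters for CONVERT TO DELTA db.events PARTITIONED BY (dt STRING):
val expectedConvertParameters = Map(
  "numFiles"      -> 1024L,
  "partitionedBy" -> """["dt"]""",
  "collectStats"  -> false,
  "targetDir"     -> "hdfs://nn/warehouse/db.db/events",  // new in this commit
  "catalogTable"  -> "db.events"                          // now the unquoted table identifier
)
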
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.delta.services.DeltaTableMetadata
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetToSparkSchemaConverter}
import org.apache.spark.sql.internal.SQLConf
@@ -66,14 +67,15 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
abstract class ConvertToDeltaCommandBase(
tableIdentifier: TableIdentifier,
partitionSchema: Option[StructType],
deltaPath: Option[String]) extends RunnableCommand with DeltaCommand {
deltaPath: Option[String],
properties: Map[String, String] = Map.empty) extends RunnableCommand with DeltaCommand {

lazy val partitionColNames : Seq[String] = partitionSchema.map(_.fieldNames.toSeq).getOrElse(Nil)
lazy val partitionFields : Seq[StructField] = partitionSchema.map(_.fields.toSeq).getOrElse(Nil)
val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"

override def run(spark: SparkSession): Seq[Row] = {
val convertProperties = getConvertProperties(spark, tableIdentifier)
val convertProperties = getConvertProperties(spark, tableIdentifier, properties)

convertProperties.provider match {
case Some(providerName) => providerName.toLowerCase(Locale.ROOT) match {
@@ -102,14 +104,15 @@ abstract class ConvertToDeltaCommandBase(

protected def getConvertProperties(
spark: SparkSession,
tableIdentifier: TableIdentifier): ConvertProperties = {
tableIdentifier: TableIdentifier,
properties: Map[String, String]): ConvertProperties = {
def convertForPath(tableIdentifier: TableIdentifier): ConvertProperties = {
// convert to delta format.`path`
ConvertProperties(
None,
tableIdentifier.database,
tableIdentifier.table,
Map.empty[String, String])
properties)
}

def convertForTable(tableIdentifier: TableIdentifier): ConvertProperties = {
@@ -255,7 +258,7 @@ abstract class ConvertToDeltaCommandBase(
numFiles,
partitionColNames,
collectStats = false,
None))
convertProperties))
} finally {
fileListResultDf.unpersist()
}
@@ -409,6 +412,7 @@ abstract class ConvertToDeltaCommandBase(
}

logInfo(s"Committed delta #$firstVersion to ${deltaLog.logPath}")
saveToMetaTable(spark, op.convertProperties)

try {
deltaLog.checkpoint()
@@ -527,15 +531,44 @@ abstract class ConvertToDeltaCommandBase(
}
}

protected case class ConvertProperties(
catalogTable: Option[CatalogTable],
provider: Option[String],
targetDir: String,
properties: Map[String, String])
private def saveToMetaTable(spark: SparkSession, convertProperties: ConvertProperties): Unit = {
convertProperties.catalogTable.foreach { table =>
val vacuum = convertProperties.properties.getOrElse("vacuum", "false").toBoolean
val horizonHours = convertProperties.properties.get("horizonHours").map(_.toLong)
val metadata = DeltaTableMetadata(
table.identifier.database.getOrElse(""),
table.identifier.table,
spark.sessionState.catalog.getCurrentUser,
convertProperties.targetDir,
vacuum,
horizonHours.getOrElse(7 * 24))
val res = DeltaTableMetadata.insertIntoMetadataTable(spark, metadata)
if (res) {
logInfo(s"Insert ${table.identifier} into delta metadata table")
} else {
logWarning(
s"""
|${DeltaSQLConf.META_TABLE_IDENTIFIER.key} may not be created.
|Skipping storing delta metadata to ${DeltaTableMetadata.deltaMetaTableIdentifier(spark)}.
|This was triggered by the command:
|CONVERT TO DELTA ${table.identifier} ${if (vacuum) "VACUUM" else ""}
|${if (horizonHours.isDefined) s" RETAIN ${horizonHours.get} HOURS" else ""}
|""".stripMargin)
}
}
}
}

case class ConvertProperties(
catalogTable: Option[CatalogTable],
provider: Option[String],
targetDir: String,
properties: Map[String, String])

case class ConvertToDeltaCommand(
tableIdentifier: TableIdentifier,
partitionSchema: Option[StructType],
deltaPath: Option[String])
extends ConvertToDeltaCommandBase(tableIdentifier, partitionSchema, deltaPath)
deltaPath: Option[String],
properties: Map[String, String] = Map.empty)
extends ConvertToDeltaCommandBase(tableIdentifier, partitionSchema, deltaPath, properties) {
}
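
Putting the CONVERT pieces together, a usage sketch (same assumed spark session as above; identifiers are hypothetical): the parser stores the optional VACUUM/RETAIN clause in the properties map, ConvertToDeltaCommand passes it through ConvertProperties, and after the first delta commit saveToMetaTable inserts a DeltaTableMetadata row, so a table converted with VACUUM is picked up by the background vacuum service.

// Convert a catalog parquet table and, because of the VACUUM clause, register it in the
// delta metadata table with vacuum enabled and a 72-hour retention horizon.
spark.sql(
  """CONVERT TO DELTA db.events
    |PARTITIONED BY (dt STRING)
    |VACUUM RETAIN 72 HOURS""".stripMargin)

// Equivalent properties the parser attaches to ConvertToDeltaCommand:
val props = Map("vacuum" -> "true", "horizonHours" -> "72")
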
@@ -56,7 +56,7 @@ import org.apache.spark.sql.types.BooleanType
* @param condition Condition for a source row to match with a target row
* @param updateClause Info related to matched clauses.
*/
case class UpdateWithJoinCommand(
case class UpdateWithJoinCommand(
@transient source: LogicalPlan,
@transient target: LogicalPlan,
@transient targetFileIndex: TahoeFileIndex,
@@ -30,7 +30,6 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
