Patch for delta-io#1032. Updates to the ANTLR grammar and parser to support SHOW CREATE TABLE, implementation of the command, and tests. Added missing property constants to DeltaTableV2, consistent with CatalogTable.
zpappa committed Jul 6, 2022
1 parent 5d3d73f commit 7949d7e
Showing 5 changed files with 334 additions and 0 deletions.
3 changes: 3 additions & 0 deletions core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -75,6 +75,7 @@ statement
: VACUUM (path=STRING | table=qualifiedName)
(RETAIN number HOURS)? (DRY RUN)? #vacuumTable
| (DESC | DESCRIBE) DETAIL (path=STRING | table=qualifiedName) #describeDeltaDetail
| SHOW CREATE TABLE table=qualifiedName #showCreateTable
| GENERATE modeName=identifier FOR TABLE table=qualifiedName #generate
| (DESC | DESCRIBE) HISTORY (path=STRING | table=qualifiedName)
(LIMIT limit=INTEGER_VALUE)? #describeDeltaHistory
@@ -176,6 +177,7 @@ COMMA: ',';
COMMENT: 'COMMENT';
CONSTRAINT: 'CONSTRAINT';
CONVERT: 'CONVERT';
CREATE: 'CREATE';
DELTA: 'DELTA';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
@@ -201,6 +203,7 @@ RESTORE: 'RESTORE';
RETAIN: 'RETAIN';
RIGHT_PAREN: ')';
RUN: 'RUN';
SHOW: 'SHOW';
SYSTEM_TIME: 'SYSTEM_TIME';
SYSTEM_VERSION: 'SYSTEM_VERSION';
TO: 'TO';
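Together, the new showCreateTable alternative and the SHOW and CREATE tokens let the Delta SQL extension parse the statement itself rather than delegating it to Spark. A minimal usage sketch, assuming the Delta extension and catalog are registered on the session (table name is illustrative):

import org.apache.spark.sql.SparkSession

// Illustrative setup; assumes the Delta Lake jars are on the classpath.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

spark.sql("CREATE TABLE events (id LONG) USING delta")
// The statement matched by the new #showCreateTable grammar alternative.
spark.sql("SHOW CREATE TABLE events").show(truncate = false)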
4 changes: 4 additions & 0 deletions core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -201,6 +201,10 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
Option(ctx.table).map(visitTableIdentifier))
}

override def visitShowCreateTable(
ctx: DeltaSqlBaseParser.ShowCreateTableContext): LogicalPlan = withOrigin(ctx) {
ShowCreateTableCommand(Option(ctx.table).map(visitTableIdentifier))
}

override def visitDescribeDeltaHistory(
ctx: DescribeDeltaHistoryContext): LogicalPlan = withOrigin(ctx) {
DescribeDeltaHistoryCommand(
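For context, a hedged sketch of exercising the new visitor at the parser level; a null delegate suffices here because statements the Delta grammar matches never reach the delegate parser (the table name is illustrative):

import io.delta.sql.parser.DeltaSqlParser
import org.apache.spark.sql.delta.commands.ShowCreateTableCommand

val parser = new DeltaSqlParser(null)
val plan = parser.parsePlan("SHOW CREATE TABLE delta_tbl")
// visitShowCreateTable should have produced the new command node.
assert(plan.isInstanceOf[ShowCreateTableCommand])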
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -289,3 +289,10 @@ private class WriteIntoDeltaBuilder(
}
}
}

object DeltaTableV2 {
val PROP_LOCATION = "location"
val PROP_TYPE = "Type"
val PROP_DELTA_MIN_READER_VERSION = "delta.minReaderVersion"
val PROP_DELTA_MIN_WRITER_VERSION = "delta.minWriterVersion"
}
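These constants name keys that DeltaTableV2.properties() can expose; a small sketch of reading them back, assuming `spark` is a Delta-enabled SparkSession and the path points at an existing Delta table (properties() returns a java.util.Map, so absent keys come back as null):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.catalog.DeltaTableV2

// Illustrative path; wrap the possibly-null lookups in Option.
val table = DeltaTableV2(spark, new Path("/tmp/delta/events"))
val props = table.properties()
val location = Option(props.get(DeltaTableV2.PROP_LOCATION))
val tableType = Option(props.get(DeltaTableV2.PROP_TYPE))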
146 changes: 146 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/commands/ShowCreateTableCommand.scala
@@ -0,0 +1,146 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.commands

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.catalog.DeltaTableV2

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, escapeSingleQuotedString}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table, TableCatalog}
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.unsafe.types.UTF8String

/**
* A command for generating the CREATE TABLE DDL of a Delta table.
*/
case class ShowCreateTableCommand(table: Option[TableIdentifier])
extends LeafRunnableCommand with DeltaCommand {

case class ShowCreateTableOutput(ddl: String)

override val output: Seq[Attribute] =
ExpressionEncoder[ShowCreateTableOutput]().schema.toAttributes

override def run(sparkSession: SparkSession): Seq[Row] = {
val builder = new mutable.StringBuilder
val md = sparkSession.sessionState.catalog.getTableMetadata(table.get)
val dt = DeltaTableV2(sparkSession, new Path(md.location), Option(md))
showCreateTable(dt, builder)
Seq(Row(UTF8String.fromString(builder.toString)))
}

private def showCreateTable(table: Table, builder: mutable.StringBuilder): Unit = {
builder ++= s"CREATE TABLE ${table.name()} "

showTableDataColumns(table, builder)
showTableUsing(table, builder)

val tableOptions = table.properties.asScala
.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
case (k, v) => k.drop(TableCatalog.OPTION_PREFIX.length) -> v
}.toMap
showTableOptions(builder, tableOptions)
showTablePartitioning(table, builder)
showTableComment(table, builder)
showTableLocation(table, builder)
showTableProperties(table, builder, tableOptions)
}

private def showTableDataColumns(table: Table, builder: mutable.StringBuilder): Unit = {
val columns = CharVarcharUtils.getRawSchema(table.schema()).fields.map(_.toDDL)
builder ++= concatByMultiLines(columns)
}

private def showTableUsing(table: Table, builder: mutable.StringBuilder): Unit = {
Option(table.properties.get(TableCatalog.PROP_PROVIDER))
.map("USING " + escapeSingleQuotedString(_) + "\n")
.foreach(builder.append)
}

private def showTableOptions(
builder: mutable.StringBuilder,
tableOptions: Map[String, String]): Unit = {
if (tableOptions.nonEmpty) {
val props = conf.redactOptions(tableOptions).toSeq.sortBy(_._1).map {
case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}
builder ++= "OPTIONS "
builder ++= concatByMultiLines(props)
}
}

private def showTablePartitioning(table: Table, builder: mutable.StringBuilder): Unit = {
val transforms = table.partitioning.map(_.describe())
if (transforms.nonEmpty) {
builder ++= s"PARTITIONED BY ${transforms.mkString("(", ", ", ")")}\n"
}
}

private def showTableLocation(table: Table, builder: mutable.StringBuilder): Unit = {
val tableTypeOpt = Option(table.properties().get(DeltaTableV2.PROP_TYPE))
// Only generate a LOCATION clause if the table is not managed.
if (tableTypeOpt.forall(_.equalsIgnoreCase("EXTERNAL"))) {
Option(table.properties.get(DeltaTableV2.PROP_LOCATION))
.map("LOCATION '" + escapeSingleQuotedString(_) + "'\n")
.foreach(builder.append)
}
}

private def showTableProperties(
table: Table,
builder: mutable.StringBuilder,
tableOptions: Map[String, String]): Unit = {

val showProps = table.properties.asScala
.filterKeys(key => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key)
&& !key.startsWith(TableCatalog.OPTION_PREFIX)
&& !tableOptions.contains(key))
if (showProps.nonEmpty) {
val props = showProps.toSeq.sortBy(_._1).map {
case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}

builder ++= "TBLPROPERTIES "
builder ++= concatByMultiLines(props)
}
}

private def showTableComment(table: Table, builder: mutable.StringBuilder): Unit = {
Option(table.properties.get(TableCatalog.PROP_COMMENT))
.map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
.foreach(builder.append)
}

private def concatByMultiLines(iter: Iterable[String]): String = {
iter.mkString("(\n ", ",\n ", ")\n")
}
}
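End to end the command returns a single string row; a hedged sketch of reading it back, assuming a Delta-enabled SparkSession `spark` and an existing Delta table named events (exact values depend on the table):

val ddl = spark.sql("SHOW CREATE TABLE events").head().getString(0)
// showCreateTable appends clauses in this order: column list, USING,
// OPTIONS, PARTITIONED BY, COMMENT, LOCATION (external tables only), TBLPROPERTIES.
println(ddl)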
174 changes: 174 additions & 0 deletions core/src/test/scala/org/apache/spark/sql/delta/DeltaShowCreateTableSuite.scala
@@ -0,0 +1,174 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.test.SharedSparkSession

import java.io.File
import scala.reflect.io.Directory

class DeltaShowCreateTableSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {

test(testName = "Test DDL Output for External Table SHOW CREATE TABLE") {
withTempDir { foo =>
val table = "external_table"
val fooPath = foo.getCanonicalPath
sql(s"CREATE TABLE `$table` (id LONG) USING delta LOCATION '$fooPath'")
val ddl = getShowCreateTable(table)
assert(ddl.contains("CREATE TABLE"))
assert(ddl.contains("'external' = 'true'"))
assert(ddl.contains(s"LOCATION 'file:$fooPath'"))
assert(ddl.contains("USING delta"))
}
}

test(testName = "Test DDL Output for Managed Table SHOW CREATE TABLE") {
val table = "managed_table"
sql(s"CREATE TABLE `$table` (id LONG) USING delta")
val ddl = getShowCreateTable(table)
assert(ddl.contains("CREATE TABLE"))
assert(!ddl.contains("'external' = 'false'"))
assert(ddl.contains("'Type' = 'MANAGED'"))
assert(ddl.contains("USING delta"))
deleteTableAndData(table)
}

test(testName = "Test Recreate table using DDL SHOW CREATE TABLE") {
val table = "managed_table"
sql(s"CREATE TABLE `$table` (id LONG) USING delta")
val ddl = getShowCreateTable(table)
deleteTableAndData(table)
sql(ddl)
deleteTableAndData(table)
}

test(testName = "Test DDL Idempotency SHOW CREATE TABLE") {
val table = "managed_table"
sql(s"CREATE TABLE `$table` (id LONG) USING delta")
val ddl = getShowCreateTable(table)
deleteTableAndData(table)
sql(ddl)
val ddl1 = getShowCreateTable(table)
deleteTableAndData(table)
assert(ddl1 == ddl) // the regenerated DDL matches the original
}

test(testName = "Test DDL Comment SHOW CREATE TABLE") {
val table = "managed_table"
val comment = "This is a random comment on the table"
sql(s"CREATE TABLE `$table` (id LONG) USING delta COMMENT '$comment'")
val ddl = getShowCreateTable(table)
deleteTableAndData(table)
assert(ddl.contains(s"COMMENT '$comment'"))
}

test(testName = "Test DDL Partition SHOW CREATE TABLE") {
val table = "managed_table"
sql(
s"""CREATE TABLE $table (id INT, name STRING, age INT) USING delta
| PARTITIONED BY (age)""".stripMargin)
val ddl = getShowCreateTable(table)
assert(ddl.contains("PARTITIONED BY (age)"))
deleteTableAndData(table)
}

test(testName = "Test DDL Random Table Property SHOW CREATE TABLE") {
val table = "managed_table"
sql(
s"""CREATE TABLE $table (id INT, name STRING, age INT) USING delta
| PARTITIONED BY (age) TBLPROPERTIES
| ('foo'='bar','bar'='foo')""".stripMargin)
val ddl = getShowCreateTable(table)
deleteTableAndData(table)
assert(ddl.contains("PARTITIONED BY (age)"))
assert(ddl.contains("TBLPROPERTIES"))
assert(ddl.contains("'foo' = 'bar'"))
assert(ddl.contains("'bar' = 'foo'"))
}

test(testName = "Test DDL with full variations SHOW CREATE TABLE") {
withTempDir { foo =>
val table = "some_external_table"
val fooPath = foo.getCanonicalPath
sql(
s"""CREATE TABLE $table (id INT COMMENT "some id", name STRING, age INT) USING delta
| LOCATION "$fooPath"
| PARTITIONED BY (age)
| TBLPROPERTIES
| ('foo'='bar','bar'='foo')
| COMMENT "some comment"""".stripMargin)
val ddl = getShowCreateTable(table)
deleteTableAndData(table)
assert(ddl.contains("CREATE TABLE"))
assert(ddl.contains("`id` INT COMMENT 'some id'"))
assert(ddl.contains("'Type' = 'EXTERNAL'"))
assert(ddl.contains(s"LOCATION 'file:$fooPath'"))
assert(ddl.contains("USING delta"))
assert(ddl.contains("PARTITIONED BY (age)"))
assert(ddl.contains("TBLPROPERTIES"))
assert(ddl.contains("'foo' = 'bar'"))
assert(ddl.contains("'bar' = 'foo'"))
}
}

test(testName = "Test DDL with full variations Recreate from DDL SHOW CREATE TABLE") {
withTempDir { foo =>
val table = "some_external_table"
val fooPath = foo.getCanonicalPath
sql(
s"""CREATE TABLE $table (id INT COMMENT "some id", name STRING, age INT) USING delta
| LOCATION "$fooPath"
| PARTITIONED BY (age)
| TBLPROPERTIES
| ('foo'='bar','bar'='foo')
| COMMENT "some comment"""".stripMargin)
val ddl = getShowCreateTable(table)
deleteTableAndData(table)
sql(ddl)
val ddl1 = getShowCreateTable(table)
assert(ddl.equals(ddl1))
}
}

/**
* Delete managed table and ensure that the disk cleanup occurs
* right away so the table can be immediately recreated
* @param tableName name of the table
*/
def deleteTableAndData(tableName: String): Unit = {
val ddf = sql(s"DESCRIBE FORMATTED $tableName")
val location = ddf.collect.toSeq
.filter(_.get(0).toString == "Location").head.get(1).toString
sql(s"DROP TABLE `$tableName`")
// Strip the "file:" URI scheme so the path can be handed to java.io.File.
val directory = new Directory(new File(location.stripPrefix("file:")))
directory.deleteRecursively()
}

/**
* Helper method to get the show create table output as a string
* @param tableName name of the table
* @return ddl of the table
*/
def getShowCreateTable(tableName: String): String = {
val df = sql(s"SHOW CREATE TABLE $tableName")
val rows = df.collectAsList()
rows.get(0).get(0).toString
}
}
