Skip to content

Commit

Permalink
[SPARK-48668][SQL] Support ALTER NAMESPACE ... UNSET PROPERTIES in v2
Browse files Browse the repository at this point in the history
  • Loading branch information
panbingkun committed Jun 20, 2024
1 parent 714699b commit 12f8f71
Show file tree
Hide file tree
Showing 15 changed files with 388 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4207,7 +4207,7 @@
},
"UNSET_NONEXISTENT_PROPERTIES" : {
"message" : [
"Attempted to unset non-existent properties [<properties>] in table <table>."
"Attempted to unset non-existent properties [<properties>] in relation <relationId>."
],
"sqlState" : "42K0J"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ statement
(WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace
| ALTER namespace identifierReference
SET (DBPROPERTIES | PROPERTIES) propertyList #setNamespaceProperties
| ALTER namespace identifierReference
UNSET (DBPROPERTIES | PROPERTIES) (IF EXISTS)? propertyList #unsetNamespaceProperties
| ALTER namespace identifierReference
SET locationSpec #setNamespaceLocation
| DROP namespace (IF EXISTS)? identifierReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4662,6 +4662,26 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
}
}

/**
 * Parse [[UnsetNamespaceProperties]] commands.
 *
 * For example:
 * {{{
 *   ALTER (DATABASE|SCHEMA|NAMESPACE) database
 *     UNSET (DBPROPERTIES | PROPERTIES) [IF EXISTS] ('comment', 'key');
 * }}}
 *
 * The resulting key list keeps the order in which the keys were written in the statement, with
 * repeated keys collapsed to their first occurrence, so that the plan and any later error that
 * lists the keys are deterministic.
 *
 * @param ctx the parse tree node of the `UNSET (DBPROPERTIES | PROPERTIES)` statement
 * @return an [[UnsetNamespaceProperties]] plan over the still unresolved namespace
 */
override def visitUnsetNamespaceProperties(
    ctx: UnsetNamespacePropertiesContext): LogicalPlan = withOrigin(ctx) {
  val requestedKeys = visitPropertyKeys(ctx.propertyList)
  // `cleanNamespaceProperties` works on a Map, so the keys are paired with a dummy value just
  // to run them through it. Its result is only used as a filter: a Map does not guarantee
  // iteration order, so taking `.keys.toSeq` from it could reorder a longer key list.
  val allowedKeys = cleanNamespaceProperties(requestedKeys.map(_ -> "").toMap, ctx).keySet
  val cleanedProperties = requestedKeys.distinct.filter(key => allowedKeys.contains(key))
  // The EXISTS token is only present when the optional `IF EXISTS` clause was written.
  val ifExists = ctx.EXISTS != null
  UnsetNamespaceProperties(
    withIdentClause(ctx.identifierReference, UnresolvedNamespace(_)),
    cleanedProperties,
    ifExists)
}

/**
* Create an [[SetTableLocation]] command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,23 @@ case class SetNamespaceProperties(
copy(namespace = newChild)
}

/**
 * Logical plan node for the ALTER NAMESPACE UNSET PROPERTIES command.
 *
 * It represents statements of the form:
 * {{{
 *   ALTER (DATABASE|SCHEMA|NAMESPACE) ... UNSET (DBPROPERTIES|PROPERTIES) [IF EXISTS] ...;
 * }}}
 *
 * @param namespace the plan of the namespace to alter; it is the only child of this command
 * @param propertyKeys the keys of the properties to unset
 * @param ifExists the flag corresponding to the optional `IF EXISTS` clause
 */
case class UnsetNamespaceProperties(
    namespace: LogicalPlan,
    propertyKeys: Seq[String],
    ifExists: Boolean)
  extends UnaryCommand {

  override def child: LogicalPlan = namespace

  override protected def withNewChildInternal(
      newChild: LogicalPlan): UnsetNamespaceProperties = {
    copy(namespace = newChild)
  }
}

/**
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2659,12 +2659,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
}

/**
 * Builds the [[AnalysisException]] with error class `UNSET_NONEXISTENT_PROPERTIES`.
 *
 * Every entry of `properties` is quoted with `toSQLId` and the quoted names are joined with
 * ", " to form the `properties` message parameter.
 *
 * NOTE(review): this span comes from a rendered diff, so two variants of the second parameter
 * and of the relation message parameter appear back to back below (`table` and `nameParts`,
 * `"table"` and `"relationId"`). Presumably the first of each pair is the removed line and the
 * second the added one -- confirm against the actual file before relying on either.
 */
def unsetNonExistentPropertiesError(
properties: Seq[String], table: TableIdentifier): Throwable = {
properties: Seq[String], nameParts: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "UNSET_NONEXISTENT_PROPERTIES",
messageParameters = Map(
"properties" -> properties.map(toSQLId).mkString(", "),
"table" -> toSQLId(table.nameParts))
"relationId" -> toSQLId(nameParts))
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ case class AlterTableUnsetPropertiesCommand(
&& key != TableCatalog.PROP_COMMENT)
if (nonexistentKeys.nonEmpty) {
throw QueryCompilationErrors.unsetNonExistentPropertiesError(
nonexistentKeys, table.identifier)
nonexistentKeys, table.identifier.nameParts)
}
}
// If comment is in the table property, we reset it to None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{NamespaceChange, SupportsNamespaces}
import org.apache.spark.sql.errors.QueryCompilationErrors

/**
 * Physical plan node that removes properties from a namespace.
 *
 * Unless `ifExists` is set, the namespace metadata is loaded first and the command fails if any
 * of the requested keys is not contained in it. Afterwards one
 * `NamespaceChange.removeProperty` change per key is passed to the catalog.
 *
 * @param catalog the catalog holding the namespace
 * @param namespace the name parts of the namespace to alter
 * @param propKeys the keys of the properties to remove
 * @param ifExists when true, the check for missing keys is skipped
 */
case class AlterNamespaceUnsetPropertiesExec(
    catalog: SupportsNamespaces,
    namespace: Seq[String],
    propKeys: Seq[String],
    ifExists: Boolean)
  extends LeafV2CommandExec {

  override def output: Seq[Attribute] = Seq.empty

  override protected def run(): Seq[InternalRow] = {
    if (!ifExists) {
      failOnMissingKeys()
    }
    val removals = propKeys.map(key => NamespaceChange.removeProperty(key))
    catalog.alterNamespace(namespace.toArray, removals: _*)
    Seq.empty
  }

  /** Throws if at least one of `propKeys` is absent from the loaded namespace metadata. */
  private def failOnMissingKeys(): Unit = {
    val metadata = catalog.loadNamespaceMetadata(namespace.toArray)
    val missingKeys = propKeys.filterNot(key => metadata.containsKey(key))
    if (missingKeys.nonEmpty) {
      throw QueryCompilationErrors.unsetNonExistentPropertiesError(missingKeys, namespace)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case SetNamespaceProperties(ResolvedNamespace(catalog, ns, _), properties) =>
AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil

case UnsetNamespaceProperties(ResolvedNamespace(catalog, ns, _), keys, ifExists) =>
AlterNamespaceUnsetPropertiesExec(catalog.asNamespaceCatalog, ns, keys, ifExists) :: Nil

case SetNamespaceLocation(ResolvedNamespace(catalog, ns, _), location) =>
if (StringUtils.isEmpty(location)) {
throw QueryExecutionErrors.invalidEmptyLocationError(location)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command

import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedNamespace}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.plans.logical.UnsetNamespaceProperties

/**
 * Parser tests for `ALTER (DATABASE|SCHEMA|NAMESPACE) ... UNSET (PROPERTIES|DBPROPERTIES)`.
 */
class AlterNamespaceUnsetPropertiesParserSuite extends AnalysisTest {

  test("unset namespace properties") {
    // Expected plan for the namespace `a.b.c` used by every statement below.
    def expectedPlan(keys: Seq[String], ifExists: Boolean): UnsetNamespaceProperties =
      UnsetNamespaceProperties(UnresolvedNamespace(Seq("a", "b", "c")), keys, ifExists)

    // Every namespace keyword is combined with every properties keyword.
    for {
      nsToken <- Seq("DATABASE", "SCHEMA", "NAMESPACE")
      propToken <- Seq("PROPERTIES", "DBPROPERTIES")
    } {
      val withoutIfExists =
        parsePlan(s"ALTER $nsToken a.b.c UNSET $propToken ('a', 'b', 'c')")
      comparePlans(withoutIfExists, expectedPlan(Seq("a", "b", "c"), ifExists = false))

      val withIfExists =
        parsePlan(s"ALTER $nsToken a.b.c UNSET $propToken IF EXISTS ('a', 'b', 'c')")
      comparePlans(withIfExists, expectedPlan(Seq("a", "b", "c"), ifExists = true))

      val singleKey = parsePlan(s"ALTER $nsToken a.b.c UNSET $propToken ('a')")
      comparePlans(singleKey, expectedPlan(Seq("a"), ifExists = false))
    }
  }

  test("property values must not be set") {
    val sql = "ALTER NAMESPACE my_db UNSET PROPERTIES('key_without_value', 'key_with_value'='x')"
    val expectedParameters =
      Map("message" -> "Values should not be specified for key(s): [key_with_value]")
    // The reported fragment is the whole statement, so it ends at the last character (index 80).
    val expectedContext = ExpectedContext(fragment = sql, start = 0, stop = sql.length - 1)
    checkError(
      exception = parseException(parsePlan)(sql),
      errorClass = "_LEGACY_ERROR_TEMP_0035",
      parameters = expectedParameters,
      context = expectedContext)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces}
import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
import org.apache.spark.sql.internal.SQLConf

/**
 * Base suite with the unified tests for the `ALTER NAMESPACE ... UNSET PROPERTIES` command that
 * are checked against both V1 and V2 table catalogs. Tests that cannot run for every supported
 * catalog are placed in the more specific suites:
 *
 *   - V2 table catalog tests:
 *     `org.apache.spark.sql.execution.command.v2.AlterNamespaceUnsetPropertiesSuite`
 *   - V1 table catalog tests:
 *     `org.apache.spark.sql.execution.command.v1.AlterNamespaceUnsetPropertiesSuiteBase`
 *     - V1 In-Memory catalog:
 *       `org.apache.spark.sql.execution.command.v1.AlterNamespaceUnsetPropertiesSuite`
 *     - V1 Hive External catalog:
 *       `org.apache.spark.sql.hive.execution.command.AlterNamespaceUnsetPropertiesSuite`
 */
trait AlterNamespaceUnsetPropertiesSuiteBase extends QueryTest with DDLCommandTestUtils {
  override val command = "ALTER NAMESPACE ... UNSET PROPERTIES"

  /** Name of the namespace (without the catalog prefix) used by the tests. */
  protected def namespace: String

  /**
   * Runs `DESCRIBE NAMESPACE EXTENDED` and returns the value column of the single row whose key
   * starts with `Properties`.
   */
  protected def getProperties(namespace: String): String = {
    val described = sql(s"DESCRIBE NAMESPACE EXTENDED $namespace").toDF("key", "value")
    val propertyRows = described.where("key like 'Properties%'").collect()
    assert(propertyRows.length == 1)
    propertyRows.head.getString(1)
  }

  test("namespace does not exist") {
    val ns = "not_exist"
    checkError(
      exception = intercept[AnalysisException] {
        sql(s"ALTER NAMESPACE $catalog.$ns UNSET PROPERTIES ('d')")
      },
      errorClass = "SCHEMA_NOT_FOUND",
      parameters = Map("schemaName" -> s"`$ns`"))
  }

  test("basic test") {
    val ns = s"$catalog.$namespace"
    withNamespace(ns) {
      sql(s"CREATE NAMESPACE $ns")
      assert(getProperties(ns) === "")

      sql(s"ALTER NAMESPACE $ns SET PROPERTIES ('a'='a', 'b'='b', 'c'='c')")
      assert(getProperties(ns) === "((a,a), (b,b), (c,c))")

      sql(s"ALTER NAMESPACE $ns UNSET PROPERTIES ('b')")
      assert(getProperties(ns) === "((a,a), (c,c))")

      // Unsetting the already removed key fails without IF EXISTS ...
      val secondUnset = intercept[AnalysisException] {
        sql(s"ALTER NAMESPACE $ns UNSET PROPERTIES ('b')")
      }
      checkError(
        exception = secondUnset,
        errorClass = "UNSET_NONEXISTENT_PROPERTIES",
        parameters = Map("properties" -> "`b`", "relationId" -> toSQLId(namespace)))

      // ... and is accepted with it.
      sql(s"ALTER NAMESPACE $ns UNSET PROPERTIES IF EXISTS ('b')")
    }
  }

  test("test reserved properties") {
    import SupportsNamespaces._
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
    val ns = s"$catalog.$namespace"
    // All reserved namespace properties except the comment.
    val reservedKeys = CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT)

    withSQLConf(SQLConf.LEGACY_PROPERTY_NON_RESERVED.key -> "false") {
      for (key <- reservedKeys) {
        withNamespace(ns) {
          sql(s"CREATE NAMESPACE $ns")
          val sqlText = s"ALTER NAMESPACE $ns UNSET PROPERTIES ('$key')"
          val parseError = intercept[ParseException] {
            sql(sqlText)
          }
          checkErrorMatchPVals(
            exception = parseError,
            errorClass = "UNSUPPORTED_FEATURE.SET_NAMESPACE_PROPERTY",
            parameters = Map(
              "property" -> key,
              "msg" -> ".*"),
            sqlState = None,
            // The fragment is the whole statement, so `stop` is the index of its last
            // character, i.e. 37 + ns.length + key.length.
            context = ExpectedContext(
              fragment = sqlText,
              start = 0,
              stop = sqlText.length - 1)
          )
        }
      }
    }

    withSQLConf(SQLConf.LEGACY_PROPERTY_NON_RESERVED.key -> "true") {
      for (key <- reservedKeys) {
        withNamespace(ns) {
          // The location is given explicitly since a v2 catalog may not set a default one;
          // otherwise `meta.get(key)` further down may return null.
          sql(s"CREATE NAMESPACE $ns LOCATION 'tmp/prop_test'")
          assert(getProperties(ns) === "")
          sql(s"ALTER NAMESPACE $ns UNSET PROPERTIES ('$key')")
          assert(getProperties(ns) === "", s"$key is a reserved namespace property and ignored")
          val namespaceCatalog =
            spark.sessionState.catalogManager.catalog(catalog).asNamespaceCatalog
          val meta = namespaceCatalog.loadNamespaceMetadata(namespace.split('.'))
          assert(!meta.get(key).contains("foo"),
            "reserved properties should not have side effects")
        }
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
errorClass = "UNSET_NONEXISTENT_PROPERTIES",
parameters = Map(
"properties" -> "`test_prop1`, `test_prop2`",
"table" -> "`spark_catalog`.`default`.`test_table`")
"relationId" -> "`spark_catalog`.`default`.`test_table`")
)
}
}
Expand Down Expand Up @@ -1190,7 +1190,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')")
},
errorClass = "UNSET_NONEXISTENT_PROPERTIES",
parameters = Map("properties" -> "`xyz`", "table" -> "`spark_catalog`.`dbx`.`tab1`")
parameters = Map("properties" -> "`xyz`", "relationId" -> "`spark_catalog`.`dbx`.`tab1`")
)
// property to unset does not exist, but "IF EXISTS" is specified
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
Expand Down
Loading

0 comments on commit 12f8f71

Please sign in to comment.