Skip to content

Commit

Permalink
[SPARK-48760][SQL][DOCS][FOLLOWUP] Add CLUSTER BY to doc `sql-ref-syntax-ddl-alter-table.md`
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?
This PR is a follow-up of apache#47156. It aims to:
- add `CLUSTER BY` to doc `sql-ref-syntax-ddl-alter-table.md`
- move parser tests from `o.a.s.s.c.p.DDLParserSuite` to `AlterTableClusterByParserSuite`
- use `checkError` to check exception in `o.a.s.s.e.c.AlterTableClusterBySuiteBase`

### Why are the changes needed?
- Enable the doc `sql-ref-syntax-ddl-alter-table.md` to cover new syntax `ALTER TABLE ... CLUSTER BY ...`.
- Align with other similar tests, e.g. `AlterTableRename*`.

### Does this PR introduce _any_ user-facing change?
Yes. End-users can now look up the explanation of `CLUSTER BY` in the doc `sql-ref-syntax-ddl-alter-table.md`.

### How was this patch tested?
Updated UT.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#47254 from panbingkun/SPARK-48760_FOLLOWUP.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
panbingkun authored and cloud-fan committed Jul 9, 2024
1 parent 43b6718 commit fdbacdf
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 41 deletions.
71 changes: 71 additions & 0 deletions docs/sql-ref-syntax-ddl-alter-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,32 @@ ALTER TABLE table_identifier DROP [ IF EXISTS ] partition_spec [PURGE]
Partition to be dropped. Note that one can use a typed literal (e.g., date'2019-01-02') in the partition spec.

**Syntax:** `PARTITION ( partition_col_name = partition_col_val [ , ... ] )`

#### CLUSTER BY

The `ALTER TABLE ... CLUSTER BY` command can also be used to change or remove the clustering columns of existing tables.

##### Syntax

```sql
-- Changing Clustering Columns
ALTER TABLE table_identifier CLUSTER BY ( col_name [ , ... ] )

-- Removing Clustering Columns
ALTER TABLE table_identifier CLUSTER BY NONE
```

##### Parameters

* **table_identifier**

Specifies a table name, which may be optionally qualified with a database name.

**Syntax:** `[ database_name. ] table_name`

* **col_name**

Specifies the name of the column.

### SET AND UNSET

Expand Down Expand Up @@ -596,6 +622,51 @@ SHOW PARTITIONS StudentInfo;
| age=20|
+---------+

-- CLUSTER BY
DESC Teacher;
+------------------------+---------+-------+
| col_name|data_type|comment|
+------------------------+---------+-------+
| name| string| NULL|
| gender| string| NULL|
| country| string| NULL|
| age| int| NULL|
|# Clustering Information| | |
| # col_name|data_type|comment|
| gender| string| NULL|
+------------------------+---------+-------+

ALTER TABLE Teacher CLUSTER BY (gender, country);

-- After changing clustering columns
DESC Teacher;
+------------------------+---------+-------+
| col_name|data_type|comment|
+------------------------+---------+-------+
| name| string| NULL|
| gender| string| NULL|
| country| string| NULL|
| age| int| NULL|
|# Clustering Information| | |
| # col_name|data_type|comment|
| gender| string| NULL|
| country| string| NULL|
+------------------------+---------+-------+

ALTER TABLE Teacher CLUSTER BY NONE;

-- After removing clustering columns
DESC Teacher;
+------------------------+---------+-------+
| col_name|data_type|comment|
+------------------------+---------+-------+
| name| string| NULL|
| gender| string| NULL|
| country| string| NULL|
| age| int| NULL|
|# Clustering Information| | |
+------------------------+---------+-------+

-- Change the fileformat
ALTER TABLE loc_orc SET fileformat orc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util.Locale

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
Expand Down Expand Up @@ -236,23 +235,6 @@ class DDLParserSuite extends AnalysisTest {
}
}

test("alter table cluster by") {
comparePlans(
parsePlan("ALTER TABLE table_name CLUSTER BY (`a.b`, c.d, none)"),
AlterTableClusterBy(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CLUSTER BY"),
Some(ClusterBySpec(Seq(
FieldReference(Seq("a.b")),
FieldReference(Seq("c", "d")),
FieldReference(Seq("none")))))))

comparePlans(
parsePlan("ALTER TABLE table_name CLUSTER BY NONE"),
AlterTableClusterBy(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CLUSTER BY"),
None))
}

test("create/replace table - with comment") {
val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
builder.build())
AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)

case AlterTableClusterBy(ResolvedTable(catalog, ident, table: V1Table, _), clusterBySpecOpt)
case AlterTableClusterBy(ResolvedTable(catalog, _, table: V1Table, _), clusterBySpecOpt)
if isSessionCatalog(catalog) =>
val prop = clusterBySpecOpt.map { clusterBySpec =>
Map(ClusterBySpec.toProperty(table.schema, clusterBySpec, conf.resolver))
}.getOrElse {
Map(ClusterBySpec.toProperty(table.schema, ClusterBySpec(Nil), conf.resolver))
}
val prop = Map(ClusterBySpec.toProperty(table.schema,
clusterBySpecOpt.getOrElse(ClusterBySpec(Nil)), conf.resolver))
AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false)

case RenameColumn(ResolvedV1TableIdentifier(ident), _, _) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command

import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedTable}
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.plans.logical.AlterTableClusterBy
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.test.SharedSparkSession

/**
 * Parser tests for the `ALTER TABLE ... CLUSTER BY` syntax, checking that both
 * the column-list form and the `NONE` form produce the expected logical plan.
 */
class AlterTableClusterByParserSuite extends AnalysisTest with SharedSparkSession {

  // Command name attached to the unresolved table for error reporting.
  private val commandName = "ALTER TABLE ... CLUSTER BY"

  test("alter table cluster by") {
    // Backquoted `a.b` stays a single column name, unquoted c.d parses as a
    // nested field reference, and `none` inside parentheses is an ordinary
    // column name (not the NONE keyword).
    val expectedWithColumns = AlterTableClusterBy(
      UnresolvedTable(Seq("table_name"), commandName),
      Some(ClusterBySpec(Seq(
        FieldReference(Seq("a.b")),
        FieldReference(Seq("c", "d")),
        FieldReference(Seq("none"))))))
    comparePlans(
      parsePlan("ALTER TABLE table_name CLUSTER BY (`a.b`, c.d, none)"),
      expectedWithColumns)

    // The bare NONE keyword removes clustering: no ClusterBySpec is produced.
    val expectedWithoutColumns =
      AlterTableClusterBy(UnresolvedTable(Seq("table_name"), commandName), None)
    comparePlans(
      parsePlan("ALTER TABLE table_name CLUSTER BY NONE"),
      expectedWithoutColumns)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,45 +45,52 @@ trait AlterTableClusterBySuiteBase extends QueryTest with DDLCommandTestUtils {

test("test basic ALTER TABLE with clustering columns") {
withNamespaceAndTable("ns", "table") { tbl =>
spark.sql(s"CREATE TABLE $tbl (id INT, data STRING) $defaultUsing CLUSTER BY (id, data)")
sql(s"CREATE TABLE $tbl (id INT, data STRING) $defaultUsing CLUSTER BY (id, data)")
validateClusterBy(tbl, Seq("id", "data"))
spark.sql(s"ALTER TABLE $tbl CLUSTER BY (data, id)")
sql(s"ALTER TABLE $tbl CLUSTER BY (data, id)")
validateClusterBy(tbl, Seq("data", "id"))
spark.sql(s"ALTER TABLE $tbl CLUSTER BY NONE")
sql(s"ALTER TABLE $tbl CLUSTER BY NONE")
validateClusterBy(tbl, Seq.empty)
spark.sql(s"ALTER TABLE $tbl CLUSTER BY (id)")
sql(s"ALTER TABLE $tbl CLUSTER BY (id)")
validateClusterBy(tbl, Seq("id"))
}
}

test("test clustering columns with comma") {
withNamespaceAndTable("ns", "table") { tbl =>
spark.sql(s"CREATE TABLE $tbl (`i,d` INT, data STRING) $defaultUsing " +
"CLUSTER BY (`i,d`, data)")
sql(s"CREATE TABLE $tbl (`i,d` INT, data STRING) $defaultUsing CLUSTER BY (`i,d`, data)")
validateClusterBy(tbl, Seq("`i,d`", "data"))
spark.sql(s"ALTER TABLE $tbl CLUSTER BY (data, `i,d`)")
sql(s"ALTER TABLE $tbl CLUSTER BY (data, `i,d`)")
validateClusterBy(tbl, Seq("data", "`i,d`"))
}
}

test("test nested clustering columns") {
withNamespaceAndTable("ns", "table") { tbl =>
spark.sql(s"CREATE TABLE $tbl " +
sql(s"CREATE TABLE $tbl " +
s"($nestedColumnSchema) " +
s"$defaultUsing CLUSTER BY (${nestedClusteringColumns.mkString(",")})")
validateClusterBy(tbl, nestedClusteringColumns)
spark.sql(s"ALTER TABLE $tbl CLUSTER BY (${nestedClusteringColumnsNew.mkString(",")})")
sql(s"ALTER TABLE $tbl CLUSTER BY (${nestedClusteringColumnsNew.mkString(",")})")
validateClusterBy(tbl, nestedClusteringColumnsNew)
}
}

test("clustering columns not defined in schema") {
withNamespaceAndTable("ns", "table") { tbl =>
sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing CLUSTER BY (id)")
val err = intercept[AnalysisException] {
sql(s"ALTER TABLE $tbl CLUSTER BY (unknown)")
}
assert(err.message.contains("Couldn't find column unknown in:"))
checkError(
exception = intercept[AnalysisException] {
sql(s"ALTER TABLE $tbl CLUSTER BY (unknown)")
},
errorClass = "_LEGACY_ERROR_TEMP_3060",
parameters = Map("i" -> "unknown",
"schema" ->
"""root
| |-- id: long (nullable = true)
| |-- data: string (nullable = true)
|""".stripMargin)
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ trait AlterTableClusterBySuiteBase extends command.AlterTableClusterBySuiteBase
override def validateClusterBy(tableName: String, clusteringColumns: Seq[String]): Unit = {
val catalog = spark.sessionState.catalog
val (_, db, t) = parseTableName(tableName)
val table = catalog.getTableMetadata(TableIdentifier.apply(t, Some(db)))
val table = catalog.getTableMetadata(TableIdentifier(t, Some(db)))
assert(table.clusterBySpec === Some(ClusterBySpec(clusteringColumns.map(FieldReference(_)))))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ class AlterTableClusterBySuite extends command.AlterTableClusterBySuiteBase

test("test REPLACE TABLE with clustering columns") {
withNamespaceAndTable("ns", "table") { tbl =>
spark.sql(s"CREATE TABLE $tbl (id INT) $defaultUsing CLUSTER BY (id)")
sql(s"CREATE TABLE $tbl (id INT) $defaultUsing CLUSTER BY (id)")
validateClusterBy(tbl, Seq("id"))

spark.sql(s"REPLACE TABLE $tbl (id INT, id2 INT) $defaultUsing CLUSTER BY (id2)")
sql(s"REPLACE TABLE $tbl (id INT, id2 INT) $defaultUsing CLUSTER BY (id2)")
validateClusterBy(tbl, Seq("id2"))

spark.sql(s"ALTER TABLE $tbl CLUSTER BY (id)")
sql(s"ALTER TABLE $tbl CLUSTER BY (id)")
validateClusterBy(tbl, Seq("id"))
}
}
Expand Down

0 comments on commit fdbacdf

Please sign in to comment.