
Commit

Merge pull request #1664 from apache/master
GulajavaMinistudio authored Jul 26, 2024
2 parents b4ab952 + 2363aec commit a639575
Showing 265 changed files with 4,821 additions and 2,638 deletions.
3 changes: 2 additions & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4152,7 +4152,8 @@ test_that("catalog APIs, listTables, getTable, listColumns, listFunctions, funct
c <- listColumns("cars")
expect_equal(nrow(c), 2)
expect_equal(colnames(c),
c("name", "description", "dataType", "nullable", "isPartition", "isBucket"))
c("name", "description", "dataType", "nullable", "isPartition", "isBucket",
"isCluster"))
expect_equal(collect(c)[[1]][[1]], "speed")
expect_error(listColumns("zxwtyswklpf", "default"),
"[TABLE_OR_VIEW_NOT_FOUND]*`spark_catalog`.`default`.`zxwtyswklpf`*")
2 changes: 1 addition & 1 deletion build/mvn
@@ -58,7 +58,7 @@ install_app() {
local local_checksum="${local_tarball}.${checksum_suffix}"
local remote_checksum="https://archive.apache.org/dist/${url_path}.${checksum_suffix}"

local curl_opts="--silent --show-error -L"
local curl_opts="--retry 3 --retry-all-errors --silent --show-error -L"
local wget_opts="--no-verbose"

if [ ! -f "$binary" ]; then
32 changes: 32 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -471,6 +471,20 @@
],
"sqlState" : "0A000"
},
"CLUSTERING_COLUMNS_MISMATCH" : {
"message" : [
"Specified clustering does not match that of the existing table <tableName>.",
"Specified clustering columns: [<specifiedClusteringString>].",
"Existing clustering columns: [<existingClusteringString>]."
],
"sqlState" : "42P10"
},
"CLUSTERING_NOT_SUPPORTED" : {
"message" : [
"'<operation>' does not support clustering."
],
"sqlState" : "42000"
},
"CODEC_NOT_AVAILABLE" : {
"message" : [
"The codec <codecName> is not available."
@@ -2941,6 +2955,24 @@
],
"sqlState" : "22029"
},
"INVALID_VARIABLE_DECLARATION" : {
"message" : [
"Invalid variable declaration."
],
"subClass" : {
"NOT_ALLOWED_IN_SCOPE" : {
"message" : [
"Variable <varName> was declared on line <lineNumber>, which is not allowed in this scope."
]
},
"ONLY_AT_BEGINNING" : {
"message" : [
"Variable <varName> can only be declared at the beginning of the compound, but it was declared on line <lineNumber>."
]
}
},
"sqlState" : "42K0M"
},
"INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE" : {
"message" : [
"Variable type must be string type but got <varType>."
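The two clustering error conditions added above are parameterized templates. A rough Scala sketch of how they could surface, assuming clustering is validated analogously to the existing partitioning checks; the session `spark` and the table name `t_clustered` are illustrative, not part of this change:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val df = spark.range(100).selectExpr("id", "id % 10 AS bucket")

// Create a table clustered by `id`.
df.write.format("parquet").clusterBy("id").saveAsTable("t_clustered")

// Appending with different clustering columns would be expected to raise
// CLUSTERING_COLUMNS_MISMATCH (specified: [bucket], existing: [id]).
df.write.format("parquet").clusterBy("bucket").mode("append").saveAsTable("t_clustered")

// An operation that does not support clustering at all would be expected to
// raise CLUSTERING_NOT_SUPPORTED with the operation name filled in.
df.write.clusterBy("id").insertInto("t_clustered")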
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-states.json
@@ -4619,6 +4619,12 @@
"standard": "N",
"usedBy": ["Spark"]
},
"42K0M": {
"description": "Invalid variable declaration.",
"origin": "Spark,",
"standard": "N",
"usedBy": ["Spark"]
},
"42KD0": {
"description": "Ambiguous name reference.",
"origin": "Databricks",
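The new 42K0M state backs the INVALID_VARIABLE_DECLARATION condition added to error-conditions.json above. A hedged sketch of a SQL-scripting compound that the ONLY_AT_BEGINNING sub-class is meant to reject, assuming a session `spark` with SQL scripting available; the variable name is illustrative:

// DECLARE appears after another statement instead of at the start of the
// compound, so this is expected to fail with sqlState 42K0M
// (INVALID_VARIABLE_DECLARATION.ONLY_AT_BEGINNING for variable `x`).
spark.sql(
  """
    |BEGIN
    |  SELECT 1;
    |  DECLARE x INT DEFAULT 0;
    |  SELECT x;
    |END
    |""".stripMargin)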
@@ -396,6 +396,7 @@ private[spark] object LogKeys {
case object MAX_NUM_PARTITIONS extends LogKey
case object MAX_NUM_POSSIBLE_BINS extends LogKey
case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
case object MAX_SEEN_VERSION extends LogKey
case object MAX_SERVICE_NAME_LENGTH extends LogKey
case object MAX_SIZE extends LogKey
case object MAX_SLOTS extends LogKey
@@ -420,9 +421,11 @@
case object MIN_NUM_FREQUENT_PATTERN extends LogKey
case object MIN_POINT_PER_CLUSTER extends LogKey
case object MIN_RATE extends LogKey
case object MIN_SEEN_VERSION extends LogKey
case object MIN_SHARE extends LogKey
case object MIN_SIZE extends LogKey
case object MIN_TIME extends LogKey
case object MIN_VERSIONS_TO_DELETE extends LogKey
case object MIN_VERSION_NUM extends LogKey
case object MISSING_PARENT_STAGES extends LogKey
case object MODEL_WEIGHTS extends LogKey
@@ -850,6 +853,7 @@
case object USER_NAME extends LogKey
case object UUID extends LogKey
case object VALUE extends LogKey
case object VERSIONS_TO_DELETE extends LogKey
case object VERSION_NUM extends LogKey
case object VIEW_ACLS extends LogKey
case object VIEW_ACLS_GROUPS extends LogKey
@@ -3090,6 +3090,11 @@ class SparkConnectPlanner(
w.partitionBy(names.toSeq: _*)
}

if (writeOperation.getClusteringColumnsCount > 0) {
val names = writeOperation.getClusteringColumnsList.asScala
w.clusterBy(names.head, names.tail.toSeq: _*)
}

if (writeOperation.hasSource) {
w.format(writeOperation.getSource)
}
@@ -3153,6 +3158,11 @@
w.partitionedBy(names.head, names.tail: _*)
}

if (writeOperation.getClusteringColumnsCount > 0) {
val names = writeOperation.getClusteringColumnsList.asScala
w.clusterBy(names.head, names.tail.toSeq: _*)
}

writeOperation.getMode match {
case proto.WriteOperationV2.Mode.MODE_CREATE =>
if (writeOperation.hasProvider) {
@@ -219,6 +219,7 @@ package object dsl {
mode: Option[String] = None,
sortByColumns: Seq[String] = Seq.empty,
partitionByCols: Seq[String] = Seq.empty,
clusterByCols: Seq[String] = Seq.empty,
bucketByCols: Seq[String] = Seq.empty,
numBuckets: Option[Int] = None): Command = {
val writeOp = WriteOperation.newBuilder()
@@ -242,6 +243,7 @@
}
sortByColumns.foreach(writeOp.addSortColumnNames(_))
partitionByCols.foreach(writeOp.addPartitioningColumns(_))
clusterByCols.foreach(writeOp.addClusteringColumns(_))

if (numBuckets.nonEmpty && bucketByCols.nonEmpty) {
val op = WriteOperation.BucketBy.newBuilder()
@@ -272,13 +274,15 @@ package object dsl {
options: Map[String, String] = Map.empty,
tableProperties: Map[String, String] = Map.empty,
partitionByCols: Seq[Expression] = Seq.empty,
clusterByCols: Seq[String] = Seq.empty,
mode: Option[String] = None,
overwriteCondition: Option[Expression] = None): Command = {
val writeOp = WriteOperationV2.newBuilder()
writeOp.setInput(logicalPlan)
tableName.foreach(writeOp.setTableName)
provider.foreach(writeOp.setProvider)
partitionByCols.foreach(writeOp.addPartitioningColumns)
clusterByCols.foreach(writeOp.addClusteringColumns)
options.foreach { case (k, v) =>
writeOp.putOptions(k, v)
}
@@ -596,6 +596,48 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
}
}

test("Write with clustering") {
// Cluster by existing column.
withTable("testtable") {
transform(
localRelation.write(
tableName = Some("testtable"),
tableSaveMethod = Some("save_as_table"),
format = Some("parquet"),
clusterByCols = Seq("id")))
}

// Cluster by non-existing column.
assertThrows[AnalysisException](
transform(
localRelation
.write(
tableName = Some("testtable"),
tableSaveMethod = Some("save_as_table"),
format = Some("parquet"),
clusterByCols = Seq("noid"))))
}

test("Write V2 with clustering") {
// Cluster by existing column.
withTable("testtable") {
transform(
localRelation.writeV2(
tableName = Some("testtable"),
mode = Some("MODE_CREATE"),
clusterByCols = Seq("id")))
}

// Cluster by non-existing column.
assertThrows[AnalysisException](
transform(
localRelation
.writeV2(
tableName = Some("testtable"),
mode = Some("MODE_CREATE"),
clusterByCols = Seq("noid"))))
}

test("Write with invalid bucketBy configuration") {
val cmd = localRelation.write(bucketByCols = Seq("id"), numBuckets = Some(0))
assertThrows[InvalidCommandInput] {
@@ -201,6 +201,22 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
this
}

/**
* Clusters the output by the given columns on the storage. The rows with matching values in the
* specified clustering columns will be consolidated within the same group.
*
* For instance, if you cluster a dataset by date, the data sharing the same date will be stored
* together in a file. This arrangement improves query efficiency when you apply selective
* filters to these clustering columns, thanks to data skipping.
*
* @since 4.0.0
*/
@scala.annotation.varargs
def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = {
this.clusteringColumns = Option(colName +: colNames)
this
}

/**
* Saves the content of the `DataFrame` at the specified path.
*
@@ -242,6 +258,7 @@
source.foreach(builder.setSource)
sortColumnNames.foreach(names => builder.addAllSortColumnNames(names.asJava))
partitioningColumns.foreach(cols => builder.addAllPartitioningColumns(cols.asJava))
clusteringColumns.foreach(cols => builder.addAllClusteringColumns(cols.asJava))

numBuckets.foreach(n => {
val bucketBuilder = proto.WriteOperation.BucketBy.newBuilder()
@@ -509,4 +526,6 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
private var numBuckets: Option[Int] = None

private var sortColumnNames: Option[Seq[String]] = None

private var clusteringColumns: Option[Seq[String]] = None
}
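A minimal usage sketch of the V1 `clusterBy` added above, mirroring the round trip exercised in SparkConnectProtoSuite; the session `spark` and the table name are hypothetical:

// Clustering columns are carried to the server through the new
// clusteringColumns field of the WriteOperation proto (see the
// addAllClusteringColumns call above).
val events = spark.range(1000).withColumnRenamed("id", "event_id")

events.write
  .format("parquet")
  .clusterBy("event_id")
  .saveAsTable("events_clustered")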
@@ -41,6 +41,8 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T])

private var partitioning: Option[Seq[proto.Expression]] = None

private var clustering: Option[Seq[String]] = None

private var overwriteCondition: Option[proto.Expression] = None

override def using(provider: String): CreateTableWriter[T] = {
@@ -77,6 +79,12 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T])
this
}

@scala.annotation.varargs
override def clusterBy(colName: String, colNames: String*): CreateTableWriter[T] = {
this.clustering = Some(colName +: colNames)
this
}

override def create(): Unit = {
executeWriteOperation(proto.WriteOperationV2.Mode.MODE_CREATE)
}
@@ -133,6 +141,7 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T])
provider.foreach(builder.setProvider)

partitioning.foreach(columns => builder.addAllPartitioningColumns(columns.asJava))
clustering.foreach(columns => builder.addAllClusteringColumns(columns.asJava))

options.foreach { case (k, v) =>
builder.putOptions(k, v)
@@ -252,8 +261,22 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] {
*
* @since 3.4.0
*/
@scala.annotation.varargs
def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T]

/**
* Clusters the output by the given columns on the storage. The rows with matching values in the
* specified clustering columns will be consolidated within the same group.
*
* For instance, if you cluster a dataset by date, the data sharing the same date will be stored
* together in a file. This arrangement improves query efficiency when you apply selective
* filters to these clustering columns, thanks to data skipping.
*
* @since 4.0.0
*/
@scala.annotation.varargs
def clusterBy(colName: String, colNames: String*): CreateTableWriter[T]

/**
* Specifies a provider for the underlying output data source. Spark's default catalog supports
* "parquet", "json", etc.
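A matching sketch for the V2 writer path, in line with the ClientDatasetSuite expectation further below; table name hypothetical, session `spark` assumed:

spark.range(1000)
  .writeTo("ids_clustered_v2")
  .using("parquet")
  .clusterBy("id") // CreateTableWriter.clusterBy, new in 4.0.0
  .create()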
@@ -153,6 +153,8 @@ class Table(
* whether the column is a partition column.
* @param isBucket
* whether the column is a bucket column.
* @param isCluster
* whether the column is a clustering column.
* @since 3.5.0
*/
class Column(
@@ -161,17 +163,29 @@
val dataType: String,
val nullable: Boolean,
val isPartition: Boolean,
val isBucket: Boolean)
val isBucket: Boolean,
val isCluster: Boolean)
extends DefinedByConstructorParams {

def this(
name: String,
description: String,
dataType: String,
nullable: Boolean,
isPartition: Boolean,
isBucket: Boolean) = {
this(name, description, dataType, nullable, isPartition, isBucket, isCluster = false)
}

override def toString: String = {
"Column[" +
s"name='$name', " +
Option(description).map { d => s"description='$d', " }.getOrElse("") +
s"dataType='$dataType', " +
s"nullable='$nullable', " +
s"isPartition='$isPartition', " +
s"isBucket='$isBucket']"
s"isBucket='$isBucket', " +
s"isCluster='$isCluster']"
}

}
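The new `isCluster` flag also surfaces through the catalog API, as the R test at the top of this diff checks. A small sketch, assuming the hypothetical clustered table from the earlier sketch exists:

spark.catalog.listColumns("events_clustered").collect().foreach { c =>
  // isCluster is the new field; isPartition and isBucket already existed.
  println(s"${c.name}: isPartition=${c.isPartition}, isBucket=${c.isBucket}, isCluster=${c.isCluster}")
}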
@@ -85,6 +85,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
.setNumBuckets(2)
.addBucketColumnNames("col1")
.addBucketColumnNames("col2"))
.addClusteringColumns("col3")

val expectedPlan = proto.Plan
.newBuilder()
@@ -95,6 +96,7 @@
.sortBy("col1")
.partitionBy("col99")
.bucketBy(2, "col1", "col2")
.clusterBy("col3")
.parquet("my/test/path")
val actualPlan = service.getAndClearLatestInputPlan()
assert(actualPlan.equals(expectedPlan))
@@ -136,6 +138,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
.setTableName("t1")
.addPartitioningColumns(col("col99").expr)
.setProvider("json")
.addClusteringColumns("col3")
.putTableProperties("key", "value")
.putOptions("key2", "value2")
.setMode(proto.WriteOperationV2.Mode.MODE_CREATE_OR_REPLACE)
@@ -147,6 +150,7 @@

df.writeTo("t1")
.partitionedBy(col("col99"))
.clusterBy("col3")
.using("json")
.tableProperty("key", "value")
.options(Map("key2" -> "value2"))
@@ -17,5 +17,4 @@
package org.apache.spark.sql

package object protobuf {
protected[protobuf] object ScalaReflectionLock
}