[SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala #47301
Conversation
Hi @cloud-fan, @imback82, @dabao521, this PR is ready for review.
Force-pushed: 0ac92ea → bbc7002
Force-pushed: bbc7002 → b5988f2
@@ -201,6 +201,22 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
    this
  }

  /**
   * Clusters the output by the given columns on the file system. The rows with matching values in
let's be a bit more general as data sources are not always based on file system. How about `... given columns on the storage.`?
sure, updated here and below
@@ -201,6 +201,22 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
    this
  }

  /**
   * Clusters the output by the given columns on the file system. The rows with matching values in
   * the specified clustering columns will be consolidated within the same file.
ditto, ... will be consolidated within the same group.
   *
   * @param clusterBySpec : existing ClusterBySpec to be converted to properties.
   */
  def toProperties(clusterBySpec: ClusterBySpec): Map[String, String] = {
what's the difference between this and `toProperty`?
Besides the different return type, `toProperty` additionally does validation of the clustering columns against the table schema, therefore it has 2 more input parameters (`schema` and `resolver`). I updated the comments.
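For readers following along, a minimal sketch of the two shapes under discussion. The signatures are inferred from this thread (`toProperty` takes the extra `schema` and `resolver` parameters for validation; `toProperties` does not), not copied from the PR:

```scala
import org.apache.spark.sql.types.StructType

object ClusterBySpecSketch {
  // Spark's Resolver is a function comparing two names (case-sensitively or not).
  type Resolver = (String, String) => Boolean

  // Simplified stand-in for the real ClusterBySpec.
  case class ClusterBySpec(columnNames: Seq[Seq[String]])

  // Validates the clustering columns against the table schema (hence the
  // extra parameters) and returns a single (key, value) property.
  def toProperty(
      schema: StructType,
      clusterBySpec: ClusterBySpec,
      resolver: Resolver): (String, String) = ???

  // No validation: serializes the spec straight into table properties.
  def toProperties(clusterBySpec: ClusterBySpec): Map[String, String] = ???
}
```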
@@ -274,6 +283,18 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] {
   */
  def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T]

  /**
   * Clusters the output by the given columns on the file system. The rows with matching values in
   * the specified clustering columns will be consolidated within the same file.
can we update the API doc everywhere?
Oops, updated here and below
Resolved review thread on connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala.
@@ -209,10 +221,26 @@ object ClusterBySpec {
    normalizeClusterBySpec(schema, clusterBySpec, resolver).toJson
  }

  /**
   * Converts a ClusterBySpec to a map of table properties used to store the clustering
I'm confused, why do we prefer a map with only one entry over a single tuple2 like `toProperty` does?
No preference here, this is just a bit more friendly for the call site.
then let's be consistent here and return tuple2. The name can be `toPropertyWithoutValidation`.
Updated!
   * @return a map entry for the clustering column property.
   */
  def toPropertyWithoutValidation(clusterBySpec: ClusterBySpec): (String, String) = {
    val columnValue = mapper.writeValueAsString(clusterBySpec.columnNames.map(_.fieldNames))
is it the same as `clusterBySpec.toJson`? If yes then we can simply do `CatalogTable.PROP_CLUSTERING_COLUMNS -> ClusterBySpec.toJson`
Yeah, you're right. Updated
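Following the exchange above, a sketch of where this lands, assuming the body collapses to the reviewer's one-liner (`CatalogTable.PROP_CLUSTERING_COLUMNS` and the instance `toJson` both appear earlier in this conversation; the exact merged body is inferred, not quoted):

```scala
// Inside object ClusterBySpec (see the hunk above).
// Inferred final shape: a single (key, value) pair, consistent with toProperty,
// but without validating the clustering columns against a table schema.
def toPropertyWithoutValidation(clusterBySpec: ClusterBySpec): (String, String) = {
  CatalogTable.PROP_CLUSTERING_COLUMNS -> clusterBySpec.toJson
}
```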
@@ -708,7 +746,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    }.getOrElse(Seq.empty[Transform])
    val bucketing =
      getBucketSpec.map(spec => CatalogV2Implicits.BucketSpecHelper(spec).asTransform).toSeq
    partitioning ++ bucketing
    val clustering = clusteringColumns.map { colNames =>
      ClusterByTransform(colNames.map(col => FieldReference(col)))
Suggested change:
-  ClusterByTransform(colNames.map(col => FieldReference(col)))
+  ClusterByTransform(colNames.map(FieldReference(_)))
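The two forms are behaviorally identical; the suggestion just swaps the explicit lambda for Scala's placeholder syntax. A self-contained sketch with a stand-in `FieldReference` (the real one lives in `org.apache.spark.sql.connector.expressions`):

```scala
// Stand-in case class, only to show both call styles produce the same result.
case class FieldReference(name: String)

val colNames = Seq("a", "b")
val explicit  = colNames.map(col => FieldReference(col)) // explicit lambda
val shorthand = colNames.map(FieldReference(_))          // placeholder syntax
assert(explicit == shorthand)
```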
@@ -1373,6 +1373,65 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
    }
  }

  test("Clustering columns should match when appending to existing data source tables") {
shall we put it in `DataFrameReaderWriterSuite`?
sure, moved there.
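For flavor, a rough sketch of what such a test might look like in `DataFrameReaderWriterSuite`. The setup, error type, and message check here are assumptions based only on the test name, not the PR's actual test body:

```scala
test("Clustering columns should match when appending to existing data source tables") {
  withTable("t") {
    val df = spark.range(10).selectExpr("id", "id % 2 AS bucket")
    // Create the table clustered by `bucket`, then append with a
    // mismatched clusterBy and expect the write to be rejected.
    df.write.clusterBy("bucket").saveAsTable("t")
    val e = intercept[AnalysisException] {
      df.write.mode("append").clusterBy("id").saveAsTable("t")
    }
    // The exact error class/message is an assumption.
    assert(e.getMessage.toLowerCase.contains("cluster"))
  }
}
```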
LGTM except for some minor comments
thanks, merging to master!
### What changes were proposed in this pull request?

Introduce a new `clusterBy` DataFrame API in Scala. This PR adds the API for both the DataFrameWriter V1 and V2, as well as Spark Connect.

### Why are the changes needed?

Introduce more ways for users to interact with clustered tables.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new `clusterBy` DataFrame API in Scala to allow specifying the clustering columns when writing DataFrames.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47301 from zedtang/clusterby-scala-api.

Authored-by: Jiaheng Tang <jiaheng.tang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
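To close, a hedged usage sketch of the new API. The String-based `clusterBy(colName, colNames*)` shape, the table names, and the column names are assumptions for illustration; whether the write succeeds also depends on the data source supporting clustered tables:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("clusterBy-demo").getOrCreate()
val df = spark.range(100).selectExpr("id", "id % 10 AS bucket")

// DataFrameWriter (V1): declare clustering columns when creating the table.
df.write
  .mode("overwrite")
  .clusterBy("bucket")
  .saveAsTable("demo_clustered_v1")

// DataFrameWriterV2: the same idea through the V2 create path.
df.writeTo("demo_clustered_v2")
  .using("parquet")
  .clusterBy("bucket")
  .create()
```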