From f1c69a5a687fdb4e5a613fe43bbf6f6366f63fda Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 26 Sep 2024 13:39:02 -0700
Subject: [PATCH] [SPARK-49791][SQL] Make DelegatingCatalogExtension more extendable

### What changes were proposed in this pull request?

This PR updates `DelegatingCatalogExtension` so that it's more extendable:
- `initialize` is no longer `final`, so that sub-classes can override it
- `delegate` becomes `protected`, so that sub-classes can access it

In addition, this PR fixes a mistake: `DelegatingCatalogExtension` is just a convenient default implementation. It's actually the `CatalogExtension` interface that indicates a catalog implementation will delegate requests to the Spark session catalog, so https://github.com/apache/spark/pull/47724 should use `CatalogExtension` instead.

### Why are the changes needed?

Unblock the Iceberg extension.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48257 from cloud-fan/catalog.

Lead-authored-by: Wenchen Fan
Co-authored-by: Wenchen Fan
Signed-off-by: Dongjoon Hyun
(cherry picked from commit 339dd5b93316fecd0455b53b2cedee2b5333a184)
Signed-off-by: Dongjoon Hyun
---
 .../sql/connector/catalog/DelegatingCatalogExtension.java     | 4 ++--
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +-
 .../spark/sql/catalyst/analysis/ResolveSessionCatalog.scala   | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index f6686d2e4d3b6..786821514822e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -38,7 +38,7 @@
 @Evolving
 public abstract class DelegatingCatalogExtension implements CatalogExtension {
 
-  private CatalogPlugin delegate;
+  protected CatalogPlugin delegate;
 
   @Override
   public final void setDelegateCatalog(CatalogPlugin delegate) {
@@ -51,7 +51,7 @@ public String name() {
   }
 
   @Override
-  public final void initialize(String name, CaseInsensitiveStringMap options) {}
+  public void initialize(String name, CaseInsensitiveStringMap options) {}
 
   @Override
   public Set<TableCatalogCapability> capabilities() {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2506cb736f18b..f1664f66b7f8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -568,7 +568,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val canUseV2 = lookupV2Provider().isDefined || (df.sparkSession.sessionState.conf.getConf(
       SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
       !df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
-        .isInstanceOf[DelegatingCatalogExtension])
+        .isInstanceOf[CatalogExtension])
 
     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7500f32ac2b90..0a86a043985eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, DelegatingCatalogExtension, LookupCatalog, SupportsNamespaces, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogExtension, CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.command._
@@ -691,6 +691,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
     isSessionCatalog(catalog) && (
       SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty ||
-        catalog.isInstanceOf[DelegatingCatalogExtension])
+        catalog.isInstanceOf[CatalogExtension])
   }
 }
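For illustration, here is a minimal sketch of the kind of sub-class this change enables. `CustomSessionCatalog`, its package, and the `warehouse` option are hypothetical stand-ins, not part of this patch:

```java
package com.example.catalog;  // hypothetical package, for illustration only

import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Sketch of a session catalog extension made possible by this patch:
// `initialize` is no longer final, so the sub-class gets a hook to read
// its own options, and `delegate` is protected, so it can be accessed.
public class CustomSessionCatalog extends DelegatingCatalogExtension {

  // Hypothetical option read from this catalog's configuration.
  private String warehouse;

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    // Before this patch, initialize() was final and a no-op, so options
    // passed to a sub-classed session catalog were silently dropped.
    this.warehouse = options.get("warehouse");
  }

  // `delegate` (the built-in V2 session catalog) is now visible to
  // sub-classes, so unhandled requests can be forwarded to it directly.
  protected CatalogPlugin sessionCatalog() {
    return delegate;
  }
}
```

Such a catalog would be plugged in via `spark.sql.catalog.spark_catalog` (the `V2_SESSION_CATALOG_IMPLEMENTATION` conf touched above), and since it extends `DelegatingCatalogExtension` it still satisfies the `isInstanceOf[CatalogExtension]` checks that this patch relaxes.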