diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 241cdd84519a..130fe88552e7 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -338,7 +338,6 @@ src/test/scala/**/*.scala src/main/delta-${delta.binary.version}/**/*.scala src/test/delta-${delta.binary.version}/**/*.scala - src/main/${sparkshim.module.name}/**/*.scala src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala @@ -398,7 +397,6 @@ src/main/delta-${delta.binary.version} - src/main/${sparkshim.module.name} diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index f5f1668f60b5..f730b42e4db0 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -102,7 +102,9 @@ class ClickHouseTableV2( StorageMeta.withMoreStorageInfo( meta, ClickhouseSnapshot.genSnapshotId(initialSnapshot), - deltaLog.dataPath)) + deltaLog.dataPath, + dataBaseName, + tableName)) } override def deltaProperties: Map[String, String] = properties().asScala.toMap diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index ba9d859bc9cd..2cf1f4fcc45b 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -19,7 +19,7 @@ package org.apache.gluten.backendsapi.clickhouse import org.apache.gluten.backendsapi.RuleApi import org.apache.gluten.extension._ import org.apache.gluten.extension.columnar._ -import org.apache.gluten.extension.columnar.MiscColumnarRules._ +import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides} import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions} import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector} @@ -28,7 +28,7 @@ import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlP import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.utils.PhysicalPlanSelector -import org.apache.spark.sql.catalyst._ +import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite} import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter} import org.apache.spark.util.SparkPlanRules @@ -44,7 +44,7 @@ class CHRuleApi extends RuleApi { } private object CHRuleApi { - private def injectSpark(injector: SparkInjector): Unit = { + def injectSpark(injector: SparkInjector): Unit = { // Inject the regular Spark rules directly. injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply) injector.injectQueryStagePrepRule(spark => CHAQEPropagateEmptyRelation(spark)) @@ -61,10 +61,9 @@ private object CHRuleApi { injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark)) injector.injectOptimizerRule(_ => CountDistinctWithoutExpand) injector.injectOptimizerRule(_ => EqualToRewrite) - CHExtendRule.injectSpark(injector) } - private def injectLegacy(injector: LegacyInjector): Unit = { + def injectLegacy(injector: LegacyInjector): Unit = { // Gluten columnar: Transform rules. injector.injectTransform(_ => RemoveTransitions) injector.injectTransform(_ => PushDownInputFileExpression.PreOffload) @@ -108,7 +107,7 @@ private object CHRuleApi { injector.injectFinal(_ => RemoveFallbackTagRule()) } - private def injectRas(injector: RasInjector): Unit = { + def injectRas(injector: RasInjector): Unit = { // CH backend doesn't work with RAS at the moment. Inject a rule that aborts any // execution calls. injector.inject( diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala index e08f91450ec2..6a7ebc3c39d2 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala @@ -24,13 +24,8 @@ import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDelt import org.apache.hadoop.fs.Path -import scala.collection.mutable.ListBuffer - /** Reserved table property for MergeTree table. */ object StorageMeta { - val Provider: String = "clickhouse" - val DEFAULT_FILE_FORMAT: String = "write.format.default" - val DEFAULT_FILE_FORMAT_DEFAULT: String = "mergetree" // Storage properties val DEFAULT_PATH_BASED_DATABASE: String = "clickhouse_db" @@ -54,18 +49,6 @@ object StorageMeta { STORAGE_PATH -> deltaPath.toString) withMoreOptions(metadata, moreOptions) } - def withMoreStorageInfo(metadata: Metadata, snapshotId: String, deltaPath: Path): Metadata = { - val moreOptions = - ListBuffer(STORAGE_SNAPSHOT_ID -> snapshotId, STORAGE_PATH -> deltaPath.toString) - // Path-based create table statement does not have storage_db and storage_table - if (!metadata.configuration.contains(STORAGE_DB)) { - moreOptions += STORAGE_DB -> DEFAULT_PATH_BASED_DATABASE - } - if (!metadata.configuration.contains(STORAGE_TABLE)) { - moreOptions += STORAGE_TABLE -> deltaPath.toUri.getPath - } - withMoreOptions(metadata, moreOptions.toSeq) - } private def withMoreOptions(metadata: Metadata, newOptions: Seq[(String, String)]): Metadata = { metadata.copy(configuration = metadata.configuration ++ newOptions) diff --git a/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala deleted file mode 100644 index 234954386adb..000000000000 --- a/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.backendsapi.clickhouse - -import org.apache.gluten.extension.injector.SparkInjector - -object CHExtendRule { - def injectSpark(injector: SparkInjector): Unit = {} -} diff --git a/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala deleted file mode 100644 index 234954386adb..000000000000 --- a/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.backendsapi.clickhouse - -import org.apache.gluten.extension.injector.SparkInjector - -object CHExtendRule { - def injectSpark(injector: SparkInjector): Unit = {} -} diff --git a/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala deleted file mode 100644 index fb3a854ef98c..000000000000 --- a/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.backendsapi.clickhouse - -import org.apache.gluten.extension.injector.SparkInjector - -import org.apache.spark.sql.catalyst.AddStorageInfo - -object CHExtendRule { - def injectSpark(injector: SparkInjector): Unit = { - injector.injectOptimizerRule(_ => AddStorageInfo) - } -} diff --git a/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala b/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala deleted file mode 100644 index 760241f840f2..000000000000 --- a/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.catalyst - -import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier -import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LogicalPlan, TableSpec} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND -import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta - -/** This object is responsible for adding storage information to the CreateTable. */ - -object AddStorageInfo extends Rule[LogicalPlan] { - - private def createMergeTreeTable(tableSpec: TableSpec): Boolean = { - tableSpec.provider.contains(StorageMeta.Provider) || - tableSpec.properties - .get(StorageMeta.DEFAULT_FILE_FORMAT) - .contains(StorageMeta.DEFAULT_FILE_FORMAT_DEFAULT) - } - - override def apply(plan: LogicalPlan): LogicalPlan = - plan.transformWithPruning(_.containsAnyPattern(COMMAND)) { - case create @ CreateTable(ResolvedIdentifier(_, ident), _, _, tableSpec: TableSpec, _) - if createMergeTreeTable(tableSpec) => - val newTableSpec = tableSpec.copy( - properties = tableSpec.properties ++ Seq( - StorageMeta.STORAGE_DB -> ident - .namespace() - .lastOption - .getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE), - StorageMeta.STORAGE_TABLE -> ident.name()) - ) - create.copy(tableSpec = newTableSpec) - } -}