diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
index 0680663eb553..8acc23aec207 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
@@ -144,7 +144,15 @@ object AddFileTags {
       "dirName" -> dirName,
       "marks" -> marks.toString
     )
-    AddFile(name, partitionValues, bytesOnDisk, modificationTime, dataChange, stats, tags)
+    val mapper: ObjectMapper = new ObjectMapper()
+    val rootNode = mapper.createObjectNode()
+    rootNode.put("numRecords", rows)
+    rootNode.put("minValues", "")
+    rootNode.put("maxValues", "")
+    rootNode.put("nullCount", "")
+    // Add the `stats` into the delta meta log
+    val metricsStats = mapper.writeValueAsString(rootNode)
+    AddFile(name, partitionValues, bytesOnDisk, modificationTime, dataChange, metricsStats, tags)
   }
 
   def addFileToAddMergeTreeParts(addFile: AddFile): AddMergeTreeParts = {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 79d663debcde..6c3d7dea0527 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.functions._
@@ -1305,5 +1306,34 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
     dataFileList = dataPathFile.list(fileFilter)
     assertResult(6)(dataFileList.length)
   }
+
+  test("GLUTEN-6378: Support delta count optimizer for the MergeTree format") {
+    val dataPath = s"$basePath/lineitem_mergetree_count_opti"
+    clearDataPath(dataPath)
+
+    val sourceDF = spark.sql(s"""
+                                |select * from lineitem
+                                |""".stripMargin)
+
+    sourceDF.write
+      .format("clickhouse")
+      .partitionBy("l_shipdate", "l_returnflag")
+      .option("clickhouse.orderByKey", "l_orderkey")
+      .option("clickhouse.primaryKey", "l_orderkey")
+      .mode(SaveMode.Append)
+      .save(dataPath)
+
+    val df = spark.read
+      .format("clickhouse")
+      .load(dataPath)
+      .groupBy()
+      .count()
+    val result = df.collect()
+    assertResult(600572)(result(0).getLong(0))
+    // Spark 3.2 + Delta 2.0 does not support this feature
+    if (!sparkVersion.equals("3.2")) {
+      assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
+    }
+  }
 }
 // scalastyle:off line.size.limit
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index 27bd4372aa64..e88eb1fedd42 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
@@ -357,34 +358,10 @@ class GlutenClickHouseMergeTreeWriteSuite
         |""".stripMargin
 
      val df = spark.sql(sql1)
-     val result = df.collect()
      assertResult(1)(
        // in test data, there are only 1 row with l_orderkey = 12647
-       result.apply(0).get(0)
+       df.collect().apply(0).get(0)
      )
-     val scanExec = collect(df.queryExecution.executedPlan) {
-       case f: FileSourceScanExecTransformer => f
-     }
-     assertResult(1)(scanExec.size)
-
-     val mergetreeScan = scanExec.head
-     assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
-
-     val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-     assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-     assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-     assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
-     assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
-     assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
-     val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-     assertResult(600572)(addFiles.map(_.rows).sum)
-
-     // 4 parts belong to the first batch
-     // 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
-     assertResult(6)(addFiles.size)
-     val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
-     assertResult(2)(filePaths.size)
-     assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)
    }
 
    val sql2 =
@@ -439,22 +416,9 @@ class GlutenClickHouseMergeTreeWriteSuite
      val df = spark.sql(s"""
                            | select count(*) from lineitem_mergetree_delete
                            |""".stripMargin)
-     val result = df.collect()
      assertResult(600571)(
-       result.apply(0).get(0)
+       df.collect().apply(0).get(0)
      )
-     val scanExec = collect(df.queryExecution.executedPlan) {
-       case f: FileSourceScanExecTransformer => f
-     }
-     val mergetreeScan = scanExec.head
-     val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-     val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-     // 4 parts belong to the first batch
-     // 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
-     assertResult(6)(addFiles.size)
-     val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
-     assertResult(2)(filePaths.size)
-     assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)
    }
 
    {
@@ -1491,19 +1455,6 @@ class GlutenClickHouseMergeTreeWriteSuite
         val result = df.collect()
         assertResult(1)(result.length)
         assertResult(10)(result(0).getLong(0))
-
-        val scanExec = collect(df.queryExecution.executedPlan) {
-          case f: FileSourceScanExecTransformer => f
-        }
-        assertResult(1)(scanExec.size)
-
-        val mergetreeScan = scanExec.head
-        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
-
-        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-        assertResult(1)(addFiles.size)
-        assertResult(10)(addFiles.head.rows)
       })
   }
 
@@ -1962,5 +1913,63 @@ class GlutenClickHouseMergeTreeWriteSuite
       }
     })
   }
+
+  test("GLUTEN-6378: Support delta count optimizer for the MergeTree format") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_count_opti;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_count_opti
+                 |(
+                 | l_orderkey bigint,
+                 | l_partkey bigint,
+                 | l_suppkey bigint,
+                 | l_linenumber bigint,
+                 | l_quantity double,
+                 | l_extendedprice double,
+                 | l_discount double,
+                 | l_tax double,
+                 | l_returnflag string,
+                 | l_linestatus string,
+                 | l_shipdate date,
+                 | l_commitdate date,
+                 | l_receiptdate date,
+                 | l_shipinstruct string,
+                 | l_shipmode string,
+                 | l_comment string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_shipdate, l_returnflag)
+                 |TBLPROPERTIES (orderByKey='l_orderkey',
+                 |               primaryKey='l_orderkey')
+                 |LOCATION '$basePath/lineitem_mergetree_count_opti'
+                 |""".stripMargin)
+
+    // dynamic partitions
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_count_opti
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |  count(*) AS count_order
+         |FROM
+         |  lineitem_mergetree_count_opti
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val result = df.collect()
+        assertResult(1)(result.length)
+        assertResult("600572")(result(0).getLong(0).toString)
+
+        // Spark 3.2 + Delta 2.0 does not support this feature
+        if (!sparkVersion.equals("3.2")) {
+          assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
+        }
+      })
+  }
 }
 // scalastyle:off line.size.limit
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
index 79a708ce50eb..59912e72222a 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
@@ -739,8 +739,13 @@ class GlutenClickHouseTPCHBucketSuite
     runSql(SQL6)(
       df => {
         checkResult(df, Array(Row(600572)))
-        // there is a shuffle between two phase hash aggregates.
-        checkHashAggregateCount(df, 2)
+        if (sparkVersion.equals("3.2")) {
+          // there is a shuffle between two phase hash aggregates.
+          checkHashAggregateCount(df, 2)
+        } else {
+          // Delta answers this query from the delta log metadata, so no hash aggregate is needed.
+          checkHashAggregateCount(df, 0)
+        }
       })
 
     // test sort aggregates