diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
index ac8ba8dab03..6d930bd1aac 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -405,6 +405,16 @@ trait DeltaConfigsBase extends DeltaLogging {
     _ => true,
     "needs to be a boolean.")
 
+  /**
+   * Whether this table will automatically optimize the layout of files while writing data.
+   */
+  val OPTIMIZE_WRITE = buildConfig[Boolean](
+    "autoOptimize.optimizeWrite",
+    "false",
+    _.toBoolean,
+    _ => true,
+    "needs to be a boolean.")
+
   /**
    * The number of columns to collect stats on for data skipping. A value of -1 means collecting
    * stats for all columns. Updating this conf does not trigger stats re-collection, but redefines
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala b/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala
index ff651471b80..09f6679692e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala
@@ -34,9 +34,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.delta.util.DeltaShufflePartitionsUtil
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, HadoopFsRelation, LogicalRelation, WriteJobStatsTracker}
 import org.apache.spark.sql.functions.{col, to_json}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -294,6 +295,8 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
     val constraints = Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++
       additionalConstraints
 
+    val isOptimize = isOptimizeCommand(queryExecution.analyzed)
+
     SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
       val outputSpec = FileFormatWriter.OutputSpec(
         outputPath.toString,
@@ -302,7 +305,9 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
       val empty2NullPlan = convertEmptyToNullIfNeeded(queryExecution.executedPlan,
         partitioningColumns, constraints)
-      val physicalPlan = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
+      val optimizeWritePlan =
+        applyOptimizeWriteIfNeeded(spark, empty2NullPlan, partitionSchema, isOptimize)
+      val physicalPlan = DeltaInvariantCheckerExec(optimizeWritePlan, constraints)
 
       val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
 
@@ -359,4 +364,32 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
 
     resultFiles.toSeq ++ committer.changeFiles
   }
+
+  private def applyOptimizeWriteIfNeeded(
+      spark: SparkSession,
+      physicalPlan: SparkPlan,
+      partitionSchema: StructType,
+      isOptimize: Boolean): SparkPlan = {
+    val optimizeWriteEnabled = !isOptimize &&
+      spark.sessionState.conf.getConf(DeltaSQLConf.OPTIMIZE_WRITE_ENABLED)
+        .getOrElse(DeltaConfigs.OPTIMIZE_WRITE.fromMetaData(metadata))
+    if (optimizeWriteEnabled) {
+      val planWithoutTopRepartition =
+        DeltaShufflePartitionsUtil.removeTopRepartition(physicalPlan)
+      val partitioning = DeltaShufflePartitionsUtil.partitioningForRebalance(
+        physicalPlan.output, partitionSchema, spark.sessionState.conf.numShufflePartitions)
+      OptimizeWriteExchangeExec(partitioning, planWithoutTopRepartition)
+    } else {
+      physicalPlan
+    }
+  }
+
+  private def isOptimizeCommand(plan: LogicalPlan): Boolean = {
+    val leaves = plan.collectLeaves()
+    leaves.size == 1 && leaves.head.collect {
+      case LogicalRelation(HadoopFsRelation(
+          index: TahoeBatchFileIndex, _, _, _, _, _), _, _, _) =>
+        index.actionType.equals("Optimize")
+    }.headOption.getOrElse(false)
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 7550a5842dc..78eed21ab6d 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -773,6 +773,34 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(false)
 
+  val OPTIMIZE_WRITE_ENABLED =
+    buildConf(key = "optimizeWrite.enabled")
+      .internal()
+      .doc("Enable optimize write.")
+      .booleanConf
+      .createOptional
+
+  val OPTIMIZE_WRITE_BIN_SIZE =
+    buildConf(key = "optimizeWrite.binSize")
+      .internal()
+      .doc("File size hint for optimize write.")
+      .longConf
+      .createWithDefault(134217728)
+
+  val OPTIMIZE_WRITE_SMALL_PARTITION_FACTOR =
+    buildConf("optimizeWrite.smallPartitionFactor")
+      .internal()
+      .doc("Factor used to coalesce partitions for optimize write.")
+      .doubleConf
+      .createWithDefault(0.5)
+
+  val OPTIMIZE_WRITE_MERGED_PARTITION_FACTOR =
+    buildConf("optimizeWrite.mergedPartitionFactor")
+      .internal()
+      .doc("Factor used to rebalance partitions for optimize write.")
+      .doubleConf
+      .createWithDefault(1.2)
+
   val DELTA_ALTER_TABLE_CHANGE_COLUMN_CHECK_EXPRESSIONS =
     buildConf("alterTable.changeColumn.checkExpressions")
       .internal()
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/util/DeltaShufflePartitionsUtil.scala b/core/src/main/scala/org/apache/spark/sql/delta/util/DeltaShufflePartitionsUtil.scala
new file mode 100644
index 00000000000..07513ec7735
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/delta/util/DeltaShufflePartitionsUtil.scala
@@ -0,0 +1,175 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.util
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RoundRobinPartitioning}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.{CoalesceExec, PartialReducerPartitionSpec, SparkPlan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}
+
+import scala.collection.mutable.ArrayBuffer
+
+object DeltaShufflePartitionsUtil {
+
+  // scalastyle:off line.size.limit
+  /**
+   * Splits the skewed partition based on the map size and the target partition size
+   * after split, and create a list of `PartialMapperPartitionSpec`. Returns None if can't split.
+   *
+   * The function is copied from Spark 3.2:
+   * https://github.com/apache/spark/blob/v3.2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala#L376
+   * EDIT: Configurable smallPartitionFactor and mergedPartitionFactor.
+   */
+  // scalastyle:on
+  def createSkewPartitionSpecs(
+      shuffleId: Int,
+      reducerId: Int,
+      targetSize: Long,
+      smallPartitionFactor: Double,
+      mergedPartitionFactor: Double): Option[Seq[PartialReducerPartitionSpec]] = {
+    val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
+    if (mapPartitionSizes.exists(_ < 0)) return None
+    val mapStartIndices = splitSizeListByTargetSize(
+      mapPartitionSizes,
+      targetSize,
+      smallPartitionFactor,
+      mergedPartitionFactor)
+    if (mapStartIndices.length > 1) {
+      Some(mapStartIndices.indices.map { i =>
+        val startMapIndex = mapStartIndices(i)
+        val endMapIndex = if (i == mapStartIndices.length - 1) {
+          mapPartitionSizes.length
+        } else {
+          mapStartIndices(i + 1)
+        }
+        val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
+        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
+      })
+    } else {
+      None
+    }
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * Given a list of size, return an array of indices to split the list into multiple partitions,
+   * so that the size sum of each partition is close to the target size. Each index indicates the
+   * start of a partition.
+   *
+   * The function is copied from Spark 3.2:
+   * https://github.com/apache/spark/blob/v3.2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala#L319
+   * EDIT: Configurable smallPartitionFactor and mergedPartitionFactor.
+   */
+  // scalastyle:on
+  // Visible for testing
+  private[sql] def splitSizeListByTargetSize(
+      sizes: Seq[Long],
+      targetSize: Long,
+      smallPartitionFactor: Double,
+      mergedPartitionFactor: Double): Array[Int] = {
+    val partitionStartIndices = ArrayBuffer[Int]()
+    partitionStartIndices += 0
+    var i = 0
+    var currentPartitionSize = 0L
+    var lastPartitionSize = -1L
+
+    def tryMergePartitions() = {
+      // When we are going to start a new partition, it's possible that the current partition or
+      // the previous partition is very small and it's better to merge the current partition into
+      // the previous partition.
+      val shouldMergePartitions = lastPartitionSize > -1 &&
+        ((currentPartitionSize + lastPartitionSize) < targetSize * mergedPartitionFactor ||
+        (currentPartitionSize < targetSize * smallPartitionFactor ||
+        lastPartitionSize < targetSize * smallPartitionFactor))
+      if (shouldMergePartitions) {
+        // We decide to merge the current partition into the previous one, so the start index of
+        // the current partition should be removed.
+        partitionStartIndices.remove(partitionStartIndices.length - 1)
+        lastPartitionSize += currentPartitionSize
+      } else {
+        lastPartitionSize = currentPartitionSize
+      }
+    }
+
+    while (i < sizes.length) {
+      // If including the next size in the current partition exceeds the target size, package the
+      // current partition and start a new partition.
+      if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
+        tryMergePartitions()
+        partitionStartIndices += i
+        currentPartitionSize = sizes(i)
+      } else {
+        currentPartitionSize += sizes(i)
+      }
+      i += 1
+    }
+    tryMergePartitions()
+    partitionStartIndices.toArray
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * Get the map size of the specific shuffle and reduce ID. Note that, some map outputs can be
+   * missing due to issues like executor lost. The size will be -1 for missing map outputs and the
+   * caller side should take care of it.
+   *
+   * The function is copied from Spark 3.2:
+   * https://github.com/apache/spark/blob/v3.2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala#L365
+   */
+  // scalastyle:on
+  private[sql] def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
+    val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    mapOutputTracker.shuffleStatuses(shuffleId).withMapStatuses(_.map { stat =>
+      if (stat == null) -1 else stat.getSizeForBlock(partitionId)
+    })
+  }
+
+  private[sql] def removeTopRepartition(plan: SparkPlan): SparkPlan = {
+    plan match {
+      case p@AdaptiveSparkPlanExec(inputPlan: ShuffleExchangeExec, _, _, _, _)
+          if !inputPlan.shuffleOrigin.equals(ENSURE_REQUIREMENTS) =>
+        p.copy(inputPlan = inputPlan.child)
+      case ShuffleExchangeExec(_, child, shuffleOrigin)
+          if !shuffleOrigin.equals(ENSURE_REQUIREMENTS) =>
+        child
+      case AdaptiveSparkPlanExec(inputPlan: CoalesceExec, _, _, _, _) =>
+        inputPlan.child
+      case CoalesceExec(_, child) =>
+        child
+      case _ =>
+        plan
+    }
+  }
+
+  private[sql] def partitioningForRebalance(
+      outputColumns: Seq[Attribute],
+      partitionSchema: StructType,
+      numShufflePartitions: Int): Partitioning = {
+    if (partitionSchema.fields.isEmpty) {
+      RoundRobinPartitioning(numShufflePartitions)
+    } else {
+      val partitionColumnsExpr = partitionSchema.fields.map { f =>
+        outputColumns.find(c => c.name.equals(f.name)).get
+      }
+      HashPartitioning(partitionColumnsExpr, numShufflePartitions)
+    }
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/execution/OptimizeWriteExchangeExec.scala b/core/src/main/scala/org/apache/spark/sql/execution/OptimizeWriteExchangeExec.scala
new file mode 100644
index 00000000000..cfaa9d03643
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/execution/OptimizeWriteExchangeExec.scala
@@ -0,0 +1,170 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RoundRobinPartitioning, UnknownPartitioning}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.util.DeltaShufflePartitionsUtil
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.{MapOutputStatistics, ShuffleDependency}
+
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+case class OptimizeWriteExchangeExec(
+    partitioning: Partitioning,
+    override val child: SparkPlan) extends Exchange {
+
+  // Use 140% of the target file size hint to account for parquet compression.
+  // The resulting files can still be smaller or larger than the hint due to data skew or
+  // the variable compression ratio of each data type.
+  final val PARQUET_COMPRESSION_RATIO = 1.4
+
+  // Dummy partitioning because:
+  // 1) The exact output partitioning is only determined at query runtime.
+  // 2) This node is always placed directly below the top node (DeltaInvariantCheckerExec),
+  //    so no parent plan relies on its outputPartitioning.
+  override def outputPartitioning: Partitioning = UnknownPartitioning(partitioning.numPartitions)
+
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private[sql] lazy val readMetrics =
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+  override lazy val metrics = Map(
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")
+  ) ++ readMetrics ++ writeMetrics
+
+  private lazy val serializer: Serializer =
+    new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
+
+  @transient lazy val inputRDD: RDD[InternalRow] = child.execute()
+
+  @transient lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = {
+    if (inputRDD.getNumPartitions == 0) {
+      Future.successful(null)
+    } else {
+      sparkContext.submitMapStage(shuffleDependency)
+    }
+  }
+
+  @transient lazy val shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow] = {
+    val dep = ShuffleExchangeExec.prepareShuffleDependency(
+      inputRDD,
+      child.output,
+      partitioning,
+      serializer,
+      writeMetrics)
+    metrics("numPartitions").set(dep.partitioner.numPartitions)
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(
+      sparkContext, executionId, metrics("numPartitions") :: Nil)
+    dep
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    // Collect execution statistics; they are used to decide how to split or coalesce partitions.
+    val stats = ThreadUtils.awaitResult(mapOutputStatisticsFuture, Duration.Inf)
+    if (stats == null) {
+      new ShuffledRowRDD(shuffleDependency, readMetrics)
+    } else {
+      try {
+        val partitionSpecs = Some(rebalancePartitions(stats))
+        new ShuffledRowRDD(shuffleDependency, readMetrics, partitionSpecs.get.toArray)
+      } catch {
+        case e: Throwable =>
+          logWarning("Failed to apply OptimizeWrite.", e)
+          new ShuffledRowRDD(shuffleDependency, readMetrics)
+      }
+    }
+  }
+
+  private def rebalancePartitions(stats: MapOutputStatistics): Seq[ShufflePartitionSpec] = {
+    val binSize = Option(sparkContext.getLocalProperty(
+        DeltaSQLConf.OPTIMIZE_WRITE_BIN_SIZE.key))
+      .map(_.toLong)
+      .getOrElse(DeltaSQLConf.OPTIMIZE_WRITE_BIN_SIZE.defaultValue.get)
+    val smallPartitionFactor = Option(sparkContext.getLocalProperty(
+        DeltaSQLConf.OPTIMIZE_WRITE_SMALL_PARTITION_FACTOR.key))
+      .map(_.toDouble)
+      .getOrElse(DeltaSQLConf.OPTIMIZE_WRITE_SMALL_PARTITION_FACTOR.defaultValue.get)
+    val mergedPartitionFactor = Option(sparkContext.getLocalProperty(
+        DeltaSQLConf.OPTIMIZE_WRITE_MERGED_PARTITION_FACTOR.key))
+      .map(_.toDouble)
+      .getOrElse(DeltaSQLConf.OPTIMIZE_WRITE_MERGED_PARTITION_FACTOR.defaultValue.get)
+    val bytesByPartitionId = stats.bytesByPartitionId
+    val targetPartitionSize = (binSize * PARQUET_COMPRESSION_RATIO).toLong
+
+    val splitPartitions = if (partitioning.isInstanceOf[RoundRobinPartitioning]) {
+      DeltaShufflePartitionsUtil.splitSizeListByTargetSize(
+        bytesByPartitionId,
+        targetPartitionSize,
+        smallPartitionFactor,
+        mergedPartitionFactor)
+    } else {
+      // For partitioned data, do not coalesce small partitions as it would hurt parallelism.
+      // E.g. a partition containing 100 partition keys => a single task writes 100 files.
+      Seq.range(0, bytesByPartitionId.length).toArray
+    }
+
+    def optimizeSkewedPartition(reduceIndex: Int): Seq[ShufflePartitionSpec] = {
+      val partitionSize = bytesByPartitionId(reduceIndex)
+      if (partitionSize > targetPartitionSize) {
+        val shuffleId = shuffleDependency.shuffleId
+        val newPartitionSpec = DeltaShufflePartitionsUtil.createSkewPartitionSpecs(
+          shuffleId,
+          reduceIndex,
+          targetPartitionSize,
+          smallPartitionFactor,
+          mergedPartitionFactor)
+
+        if (newPartitionSpec.isEmpty) {
+          CoalescedPartitionSpec(reduceIndex, reduceIndex + 1) :: Nil
+        } else {
+          logDebug(s"[OptimizeWrite] Partition $reduceIndex is skewed, " +
+            s"split it into ${newPartitionSpec.get.size} parts.")
+          newPartitionSpec.get
+        }
+      } else if (partitionSize > 0) {
+        CoalescedPartitionSpec(reduceIndex, reduceIndex + 1) :: Nil
+      } else {
+        Nil
+      }
+    }
+
+    // Transform the partition indices into ranges.
+    // e.g. [0, 3, 6, 7, 10] -> [[0, 3), [3, 6), [6, 7), [7, 10)]
+    (splitPartitions :+ stats.bytesByPartitionId.length).sliding(2).flatMap { k =>
+      if (k.head == k.last - 1) {
+        // Not a merged partition; split it if needed.
+        optimizeSkewedPartition(k.head)
+      } else {
+        CoalescedPartitionSpec(k.head, k.last) :: Nil
+      }
+    }.toList
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): OptimizeWriteExchangeExec = {
+    copy(child = newChild)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaAutoOptimizeSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaAutoOptimizeSuite.scala
index 65f44b0416a..d723797dc7d 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaAutoOptimizeSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaAutoOptimizeSuite.scala
@@ -16,12 +16,20 @@ package org.apache.spark.sql.delta
 
+import java.io.File
+
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.util.DeltaShufflePartitionsUtil
+import org.apache.spark.sql.execution.CoalesceExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.REPARTITION_BY_NUM
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
 
 class DeltaAutoOptimizeSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {
   import testImplicits._
@@ -55,13 +63,42 @@ class DeltaAutoOptimizeSuite extends QueryTest with SharedSparkSession with Delt
     }
   }
 
+  def writeData(
+      numFiles: Int,
+      dataPath: String,
+      partitioned: Boolean = false,
+      mode: String = "overwrite"): Unit = {
+    val df = spark
+      .range(50000)
+      .map { _ =>
+        (
+          scala.util.Random.nextInt(10000000).toLong,
+          scala.util.Random.nextInt(1000000000),
+          scala.util.Random.nextInt(2))
+      }
+      .toDF("colA", "colB", "colC")
+      .repartition(numFiles)
+    if (partitioned) {
+      df.write
+        .partitionBy("colC")
+        .mode(mode)
+        .format("delta")
+        .save(dataPath)
+    } else {
+      df.write
+        .mode(mode)
+        .format("delta")
+        .save(dataPath)
+    }
+  }
+
   def checkTableVersionAndNumFiles(
       path: String,
       expectedVer: Long,
       expectedNumFiles: Long): Unit = {
     val dt = DeltaLog.forTable(spark, path)
-    assert(dt.snapshot.allFiles.count() == expectedNumFiles)
     assert(dt.snapshot.version == expectedVer)
+    assert(dt.snapshot.allFiles.count() == expectedNumFiles)
   }
 
   test("test enabling autoCompact") {
@@ -120,6 +157,102 @@ class DeltaAutoOptimizeSuite extends QueryTest with SharedSparkSession with Delt
     }
   }
 
+  test("test adaptive config and OptimizeWrite enabled") {
+    withTempDir { dir =>
+      val rootPath = dir.getCanonicalPath
+      val path = new Path(rootPath, "table1").toString
+
+      var expectedTableVersion = -1
+      writeData(20, path)
+      expectedTableVersion += 1 // version should be 0.
+      checkTableVersionAndNumFiles(path, expectedTableVersion, 20)
+
+      withSQLConf("spark.sql.adaptive.enabled" -> "true",
+          DeltaSQLConf.OPTIMIZE_WRITE_ENABLED.key -> "true") {
+        writeData(20, path)
+        expectedTableVersion += 1 // OptimizeWrite should be done within the write transaction.
+        checkTableVersionAndNumFiles(path, expectedTableVersion, 1)
+      }
+    }
+  }
+
+  test("test enabling OptimizeWrite") {
+    val tableName = "optimizeWriteTestTable"
+    val tableName2 = s"${tableName}2"
+    withTable(tableName, tableName2) {
+      withTempDir { dir =>
+        val rootPath = dir.getCanonicalPath
+        val path = new Path(rootPath, "table1").toString
+
+        {
+          var expectedTableVersion = -1
+          writeData(20, path)
+          expectedTableVersion += 1 // version should be 0.
+          checkTableVersionAndNumFiles(path, expectedTableVersion, 20)
+
+          withSQLConf(DeltaSQLConf.OPTIMIZE_WRITE_ENABLED.key -> "true") {
+            writeData(20, path)
+            expectedTableVersion += 1 // optimize write should be done within the write transaction.
+            checkTableVersionAndNumFiles(path, expectedTableVersion, 1)
+          }
+        }
+
+        {
+          // Test with default table properties.
+          // Note that 0.6.1 does not support setting table properties using DDL,
+          // e.g. CREATE/ALTER TABLE; there is no way to change the properties after creation.
+          var expectedTableVersion = -1
+          // Test default delta table config
+          val path2 = new Path(rootPath, "table2").toString
+          withSQLConf(
+              "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite" -> "true") {
+            writeData(20, path2)
+            expectedTableVersion += 1
+            checkTableVersionAndNumFiles(path2, expectedTableVersion, 1)
+          }
+
+          // The session config should take precedence over the table property.
+          withSQLConf(DeltaSQLConf.OPTIMIZE_WRITE_ENABLED.key -> "false") {
+            writeData(20, path2)
+            expectedTableVersion += 1 // optimize write should not be triggered
+            checkTableVersionAndNumFiles(path2, expectedTableVersion, 20)
+          }
+
+          withSQLConf(
+              "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite" -> "false") {
+            // The defaults config is only applied at table creation.
+            writeData(20, path2)
+            expectedTableVersion += 1 // optimize write should still be triggered
+            checkTableVersionAndNumFiles(path2, expectedTableVersion, 1)
+          }
+        }
+      }
+    }
+  }
+
+  test("test OptimizeWrite configs") {
+    withTempDir { dir =>
+      val rootPath = dir.getCanonicalPath
+      val path = new Path(rootPath, "table1").toString
+
+      withSQLConf(DeltaSQLConf.OPTIMIZE_WRITE_ENABLED.key -> "true",
+          "spark.sql.shuffle.partitions" -> "20",
+          DeltaSQLConf.OPTIMIZE_WRITE_BIN_SIZE.key -> "10101") {
+        // binSize is small, so partitions are split rather than coalesced.
+        writeData(30, path)
+        checkTableVersionAndNumFiles(path, 0, 40)
+      }
+
+      withSQLConf(DeltaSQLConf.OPTIMIZE_WRITE_ENABLED.key -> "true",
+          "spark.sql.shuffle.partitions" -> "20",
+          DeltaSQLConf.OPTIMIZE_WRITE_BIN_SIZE.key -> "101010") {
+        // binSize is larger, so small partitions are coalesced.
+        writeData(30, path)
+        checkTableVersionAndNumFiles(path, 1, 7)
+      }
+    }
+  }
+
   test("test autoCompact configs") {
     val tableName = "autoCompactTestTable"
     withTable(tableName) {
@@ -162,6 +295,29 @@ class DeltaAutoOptimizeSuite extends QueryTest with SharedSparkSession with Delt
     }
   }
 
+  test("test partitioned table with OptimizeWrite") {
+    val tableName = "optimizeWriteTestTable"
+    val tableName2 = s"${tableName}2"
+    withTable(tableName, tableName2) {
+      withTempDir { dir =>
+        val rootPath = dir.getCanonicalPath
+        val path = new Path(rootPath, "table1").toString
+        writeData(20, path, partitioned = true)
+        checkTableVersionAndNumFiles(path, 0, 40)
+
+        withSQLConf(DeltaSQLConf.OPTIMIZE_WRITE_ENABLED.key -> "true") {
+          writeData(20, path, partitioned = true)
+          checkTableVersionAndNumFiles(path, 1, 2)
+
+          withSQLConf(DeltaSQLConf.OPTIMIZE_WRITE_BIN_SIZE.key -> "101010") {
+            writeData(20, path, partitioned = true)
+            checkTableVersionAndNumFiles(path, 2, 4)
+          }
+        }
+      }
+    }
+  }
+
   test("test max compact data size config") {
     withTempDir { dir =>
       val rootPath = dir.getCanonicalPath
@@ -216,6 +372,50 @@ class DeltaAutoOptimizeSuite extends QueryTest with SharedSparkSession with Delt
     }
   }
 
+  test("test OptimizeWrite with empty partition") {
+    withTempDir { dir =>
+      val rootPath = dir.getCanonicalPath
+
+      withSQLConf(DeltaSQLConf.OPTIMIZE_WRITE_ENABLED.key -> "true") {
+        val path = new Path(rootPath, "table2").toString
+        val tempPath = new Path(rootPath, "temp").toString
+        new File(tempPath).mkdir()
+        writeData(20, path)
+        val df = spark.read.format("delta").load(path)
+
+        // empty dataframe test
+        val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], df.schema)
+        emptyDF.write.mode("append").format("delta").save(path)
+
+        // empty directory test
+        val emptyDF2 = spark.read.schema(df.schema).parquet(tempPath)
+        emptyDF2.write.mode("append").format("delta").save(path)
+      }
+    }
+  }
+
+  test("run optimize with OptimizeWrite") {
+    withTempDir { dir =>
+      val rootPath = dir.getCanonicalPath
+      val path = new Path(rootPath, "table1").toString
+      spark.range(100).repartition(50).write.format("delta").save(path)
+      assert(spark.read.format("delta").load(path).inputFiles.length == 50)
+
+      withSQLConf(DeltaSQLConf.OPTIMIZE_WRITE_ENABLED.key -> "true") {
+        spark.range(100).repartition(50).write.mode("append").format("delta").save(path)
+        assert(spark.read.format("delta").load(path).inputFiles.length == 51)
+
+        withSQLConf(DeltaSQLConf.OPTIMIZE_WRITE_BIN_SIZE.key -> "2") {
+          val dt = io.delta.tables.DeltaTable.forPath(path)
+          dt.optimize().executeCompaction()
+          // OptimizeWrite should not be applied to the OPTIMIZE command.
+          assert(spark.read.format("delta").load(path).inputFiles.length == 1)
+        }
+      }
+    }
+  }
+
   test("test autoCompact.target config") {
     withTempDir { dir =>
       val rootPath = dir.getCanonicalPath
@@ -281,4 +481,143 @@ class DeltaAutoOptimizeSuite extends QueryTest with SharedSparkSession with Delt
       }
     }
   }
+
+  test("test DeltaShufflePartitionsUtil.partitioningForRebalance") {
+    withTempDir { dir =>
+      val rootPath = dir.getCanonicalPath
+
+      // Test partitioned data
+      withSQLConf("spark.sql.shuffle.partitions" -> "100") {
+        val path = new Path(rootPath, "table1").toString
+        writeData(20, path, partitioned = true)
+        val df = spark.read.format("delta").load(path)
+        val dl = DeltaLog.forTable(spark, path)
+
+        val partitioning = DeltaShufflePartitionsUtil.partitioningForRebalance(
+          df.queryExecution.executedPlan.output,
+          dl.snapshot.metadata.partitionSchema,
+          spark.sessionState.conf.numShufflePartitions)
+
+        assert(partitioning.isInstanceOf[HashPartitioning])
+        assert(partitioning.asInstanceOf[HashPartitioning]
+          .expressions.map(_.toString).head.contains("colC"))
+        assert(partitioning.numPartitions == 100)
+      }
+
+      // Test non-partitioned data
+      withSQLConf("spark.sql.shuffle.partitions" -> "100") {
+        val path = new Path(rootPath, "table2").toString
+        writeData(20, path)
+        val df = spark.read.format("delta").load(path)
+        val dl = DeltaLog.forTable(spark, path)
+
+        val partitioning = DeltaShufflePartitionsUtil.partitioningForRebalance(
+          df.queryExecution.executedPlan.output,
+          dl.snapshot.metadata.partitionSchema,
+          spark.sessionState.conf.numShufflePartitions)
+
+        assert(partitioning.isInstanceOf[RoundRobinPartitioning])
+        assert(partitioning.numPartitions == 100)
+      }
+    }
+  }
+
+  test("test DeltaShufflePartitionsUtil.removeTopRepartition") {
+    withTempDir { dir =>
+      val rootPath = dir.getCanonicalPath
+
+      withSQLConf("spark.sql.shuffle.partitions" -> "100",
+          "spark.sql.codegen.wholeStage" -> "false") {
+        val path1 = new Path(rootPath, "table1").toString
+        writeData(20, path1)
+        val df = spark.read.format("delta").load(path1)
+
+        withSQLConf("spark.sql.adaptive.enabled" -> "false") {
+          {
+            val repartitionDF = df.repartition(3)
+            val plan = repartitionDF.queryExecution.executedPlan
+            // Plan should have ShuffleExchangeExec.
+            val isShuffle = plan match {
+              case ShuffleExchangeExec(_, _, shuffleOrigin)
+                  if shuffleOrigin.equals(REPARTITION_BY_NUM) => true
+              case _ => false
+            }
+            assert(isShuffle)
+
+            val updatedPlan = DeltaShufflePartitionsUtil.removeTopRepartition(plan)
+            // ShuffleExchangeExec should be removed.
+            assert(updatedPlan.equals(plan.children.head))
+          }
+
+          {
+            val coalesceDF = df.coalesce(3)
+            val plan = coalesceDF.queryExecution.executedPlan
+            // Plan should have CoalesceExec.
+            assert(plan.isInstanceOf[CoalesceExec])
+
+            val updatedPlan = DeltaShufflePartitionsUtil.removeTopRepartition(plan)
+            // CoalesceExec should be removed.
+            assert(updatedPlan.equals(plan.children.head))
+          }
+        }
+
+        // Test with AdaptiveSparkPlanExec
+        withSQLConf("spark.sql.adaptive.enabled" -> "true") {
+          val repartitionDF = df.repartition(3)
+          val plan = repartitionDF.queryExecution.executedPlan
+          // Plan should have ShuffleExchangeExec.
+          val inputPlan = plan.asInstanceOf[AdaptiveSparkPlanExec].inputPlan
+          val isShuffle = inputPlan match {
+            case ShuffleExchangeExec(_, _, shuffleOrigin)
+                if shuffleOrigin.equals(REPARTITION_BY_NUM) => true
+            case _ => false
+          }
+          assert(isShuffle)
+
+          val updatedPlan = DeltaShufflePartitionsUtil.removeTopRepartition(plan)
+          // ShuffleExchangeExec should be removed.
+          assert(updatedPlan.asInstanceOf[AdaptiveSparkPlanExec].inputPlan.equals(
+            inputPlan.children.head))
+        }
+      }
+    }
+  }
+
+  test("test DeltaShufflePartitionsUtil.splitSizeListByTargetSize") {
+    val targetSize = 100
+    val smallPartitionFactor = 0.5
+    val mergedPartitionFactor = 1.2
+
+    // merge the small partitions at the beginning/end
+    val sizeList1 = Seq[Long](15, 90, 15, 15, 15, 90, 15)
+    assert(DeltaShufflePartitionsUtil.splitSizeListByTargetSize(sizeList1, targetSize,
+      smallPartitionFactor, mergedPartitionFactor).toSeq ==
+      Seq(0, 5))
+
+    // merge the small partitions in the middle
+    val sizeList2 = Seq[Long](30, 15, 90, 10, 90, 15, 30)
+    assert(DeltaShufflePartitionsUtil.splitSizeListByTargetSize(sizeList2, targetSize,
+      smallPartitionFactor, mergedPartitionFactor).toSeq ==
+      Seq(0, 4))
+
+    // merge small partitions if the partition itself is smaller than
+    // targetSize * smallPartitionFactor
+    val sizeList3 = Seq[Long](15, 1000, 15, 1000)
+    assert(DeltaShufflePartitionsUtil.splitSizeListByTargetSize(sizeList3, targetSize,
+      smallPartitionFactor, mergedPartitionFactor).toSeq ==
+      Seq(0, 3))
+
+    // merge small partitions if the combined size is smaller than
+    // targetSize * mergedPartitionFactor
+    val sizeList4 = Seq[Long](35, 75, 90, 20, 35, 25, 35)
+    assert(DeltaShufflePartitionsUtil.splitSizeListByTargetSize(sizeList4, targetSize,
+      smallPartitionFactor, mergedPartitionFactor).toSeq ==
+      Seq(0, 2, 3))
+
+    val sizeList5 = Seq[Long](99, 19, 19, 99, 19, 19, 19, 19, 19, 19, 19, 19, 19, 19)
+    assert(DeltaShufflePartitionsUtil.splitSizeListByTargetSize(sizeList5, targetSize,
+      smallPartitionFactor, mergedPartitionFactor).toSeq ==
+      Seq(0, 3, 4, 9))
+
+  }
 }
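Reviewer note, not part of the patch: a minimal usage sketch of the new knobs. The session keys assume the standard "spark.databricks.delta." prefix that DeltaSQLConf's buildConf applies; the table-property defaults key is copied verbatim from the test suite above, and df/path are placeholder values.

    // Enable optimize write for the current session (sketch; key names assume the usual prefix).
    spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
    // Optional: target bin size hint in bytes (default 134217728 = 128 MB).
    spark.conf.set("spark.databricks.delta.optimizeWrite.binSize", "134217728")
    // Or make newly created tables opt in by default via the table-property default:
    spark.conf.set(
      "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")

    // Subsequent Delta writes then go through OptimizeWriteExchangeExec, which rebalances
    // shuffle partitions toward the bin size before files are written.
    df.write.format("delta").mode("append").save(path)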