Fix merge command numTargetFilesAdded metric
## Description

This PR fixes a bug in `MergeIntoCommand` where the metric `numTargetFilesAdded` sometimes reported an unexpected value. With this change, `MergeIntoCommand` only writes new files to the target table if they are non-empty (i.e. contain at least 1 row).

Note: the value was never wrong. For example, it would report that we wrote out 1 file, and we did in fact write out 1 empty file. However, there was no logical reason to write out an empty file with no rows.
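The filtering idea described above can be sketched in isolation. This is a minimal illustration, not the Delta implementation: `FileAction`, `AddFile`, `AddCDCFile`, and `NonEmptyFileFilter` are simplified, hypothetical stand-ins defined here so the example is self-contained, and `numLogicalRecords` is assumed to be a plain `Long`.

```scala
// Hypothetical, simplified stand-ins for Delta's file actions (not the real classes).
sealed trait FileAction
final case class AddFile(path: String, numLogicalRecords: Long) extends FileAction
final case class AddCDCFile(path: String) extends FileAction

object NonEmptyFileFilter {
  // Keep every non-AddFile action, and only those AddFiles that contain at least one row.
  def keepNonEmpty(actions: Seq[FileAction]): Seq[FileAction] =
    actions.filter {
      case a: AddFile => a.numLogicalRecords > 0
      case _ => true
    }

  // Count the AddFiles that survive the filter; this mirrors what a
  // "files added" metric would report after the fix.
  def numFilesAdded(actions: Seq[FileAction]): Int =
    keepNonEmpty(actions).count(_.isInstanceOf[AddFile])

  def main(args: Array[String]): Unit = {
    val written = Seq(
      AddFile("part-0000.parquet", numLogicalRecords = 0), // empty file from an empty DataFrame
      AddFile("part-0001.parquet", numLogicalRecords = 3),
      AddCDCFile("cdc-0000.parquet")
    )
    println(numFilesAdded(written))
  }
}
```

Filtering after the write avoids collecting the DataFrame up front just to learn whether it is empty, which is the trade-off the code comments in the diff describe.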

This PR also re-enables the existing tests in `MergeIntoMetricsBase` that had been ignored because of this known bug.

```
build/sbt 'core/testOnly *DescribeDeltaHistorySuite -- -z "merge-metrics"'
```

Closes #1334

Signed-off-by: Scott Sandre <scott.sandre@databricks.com>
GitOrigin-RevId: 1c04cff75461ec1d2987653ad1a82dcbcf5926c1
scottsand-db authored and allisonport-db committed Aug 24, 2022
1 parent b71ad65 commit 5d22a38
Showing 2 changed files with 27 additions and 31 deletions.
@@ -540,6 +540,14 @@ case class MergeIntoCommand(

     val newFiles = deltaTxn
       .writeFiles(repartitionIfNeeded(spark, insertDf, deltaTxn.metadata.partitionColumns))
+      .filter {
+        // In some cases (e.g. insert-only when all rows are matched, insert-only with an empty
+        // source, insert-only with an unsatisfied condition) we can write out an empty insertDf.
+        // This is hard to catch before the write without collecting the DF ahead of time. Instead,
+        // we can just accept only the AddFiles that actually add rows.
+        case a: AddFile => a.getNumLogicalRecords > 0
+        case _ => true
+      }

     // Update metrics
     metrics("numTargetFilesBeforeSkipping") += deltaTxn.snapshot.numOfFiles
@@ -818,6 +826,14 @@ case class MergeIntoCommand(
     // Write to Delta
     val newFiles = deltaTxn
       .writeFiles(repartitionIfNeeded(spark, outputDF, deltaTxn.metadata.partitionColumns))
+      .filter {
+        // In some cases (e.g. delete with empty source, or empty target, or on disjoint tables)
+        // we can write out an empty outputDF. This is hard to catch before the write without
+        // collecting the DF ahead of time. Instead, we can just accept only the AddFiles that
+        // actually add rows.
+        case a: AddFile => a.getNumLogicalRecords > 0
+        case _ => true
+      }

     // Update metrics
     val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles)
@@ -72,38 +72,16 @@ trait MergeIntoMetricsBase
   // test utils //
   ////////////////

-  // We store testName --> (partitioned, cdfEnabled) where partitioned and cdfEnabled are
-  // boolean options. If they are None, we ignore the test for both the true and false values.
-  // Otherwise we ignore only the boolean value in the option.
-  val testsToIgnore = Map(
-    // numTargetFilesAdded - only wrong when partitioned=false
-    "insert-only when all rows match" -> (Some(false), None),
-    "insert-only with unsatisfied condition" -> (Some(false), None),
-    "insert-only with empty source" -> (Some(false), None),
-    "delete-only with disjoint tables" -> (Some(false), Some(false)),
-    "delete-only delete all rows" -> (Some(false), Some(false)),
-    "delete-only with empty source" -> (Some(false), Some(false)),
-    "delete-only with empty target" -> (Some(false), Some(false)),
-    "delete-only without join empty source" -> (Some(false), Some(false)),
-
-    // numTargetRowsCopied
-    "delete-only with condition" -> (None, None),
-    "delete-only with update with unsatisfied condition" -> (None, None),
-    "delete-only with unsatisfied condition" -> (None, None),
-    "delete-only with target-only condition" -> (None, None),
-    "delete-only with source-only condition" -> (None, None),
-    "match-only with unsatisfied condition" -> (None, None)
+  val testsToIgnore = Seq(
+    // The below tests fail due to incorrect numTargetRowsCopied metric.
+    "delete-only with condition",
+    "delete-only with update with unsatisfied condition",
+    "delete-only with unsatisfied condition",
+    "delete-only with target-only condition",
+    "delete-only with source-only condition",
+    "match-only with unsatisfied condition"
   )

-  // Currently multiple metrics are wrong for Merge. We have added tests for these scenarios but
-  // we need to ignore the failing tests until the metrics are fixed.
-  private def shouldIgnoreTest(name: String, partitioned: Boolean, cdfEnabled: Boolean): Boolean = {
-    testsToIgnore.get(name).exists {
-      case (partitionedValue, cdfEnabledValue) =>
-        partitionedValue.forall(_ == partitioned) && cdfEnabledValue.forall(_ == cdfEnabled)
-    }
-  }
-
   // Helper to generate tests with different configurations.
   private def testMergeMetrics(name: String)(testFn: MergeTestConfiguration => Unit): Unit = {
     for {
@@ -113,7 +91,9 @@ trait MergeIntoMetricsBase
       val testConfig = MergeTestConfiguration(partitioned = partitioned, cdfEnabled = cdfEnabled)
       val testName = s"merge-metrics: $name - Partitioned = $partitioned, CDF = $cdfEnabled"

-      if (shouldIgnoreTest(name, partitioned, cdfEnabled)) {
+      if (testsToIgnore.contains(name)) {
+        // Currently multiple metrics are wrong for Merge. We have added tests for these scenarios
+        // but we need to ignore the failing tests until the metrics are fixed.
         ignore(testName) { testFn(testConfig) }
       } else {
         test(testName) { testFn(testConfig) }
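
The simplified ignore logic in the last hunk can be illustrated without a test framework. The sketch below is an assumption-laden model, not the suite's code: `MergeMetricsTestPlan` and `PlannedTest` are hypothetical names, the ignore list is a two-entry subset of the one in the diff, and iterating over both boolean values for `partitioned` and `cdfEnabled` is assumed because the generator clauses of the `for` are elided in the diff.

```scala
object MergeMetricsTestPlan {
  // Subset of the names listed in the diff; matching is by base test name only.
  val testsToIgnore: Seq[String] = Seq(
    "delete-only with condition",
    "match-only with unsatisfied condition"
  )

  // Hypothetical record of one generated test and whether it would be ignored.
  final case class PlannedTest(name: String, ignored: Boolean)

  // Expand one base name into all (partitioned, cdfEnabled) combinations.
  // Assumption: both flags range over true and false.
  def plan(name: String): Seq[PlannedTest] =
    for {
      partitioned <- Seq(true, false)
      cdfEnabled <- Seq(true, false)
    } yield PlannedTest(
      s"merge-metrics: $name - Partitioned = $partitioned, CDF = $cdfEnabled",
      ignored = testsToIgnore.contains(name)
    )

  def main(args: Array[String]): Unit =
    plan("delete-only with condition").foreach(println)
}
```

Because the remaining failures no longer depend on the configuration, a plain `contains` check on the base name replaces the earlier per-configuration `Option` matching.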