Skip to content

Commit

Permalink
Use diffCaseSensitivity everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Sep 12, 2022
1 parent 7277a90 commit dacbcaf
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 35 deletions.
50 changes: 17 additions & 33 deletions src/main/scala/uk/co/gresearch/spark/diff/Diff.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,8 @@ class Differ(options: DiffOptions) {
s"Left column names: ${left.columns.mkString(", ")}\n" +
s"Right column names: ${right.columns.mkString(", ")}")

val ignoreColumnsCs = ignoreColumns.map(handleConfiguredCaseSensitivity).toSet
def isIgnoredColumn(column: String): Boolean = !ignoreColumnsCs.contains(handleConfiguredCaseSensitivity(column))
val leftNonIgnored = left.columns.filter(isIgnoredColumn)
val rightNonIgnored = right.columns.filter(isIgnoredColumn)

def notInWithCaseSensitivity(columns: Seq[String])(column: String): Boolean =
!columns.map(handleConfiguredCaseSensitivity).contains(handleConfiguredCaseSensitivity(column))
val leftNonIgnored = left.columns.diffCaseSensitivity(ignoreColumns)
val rightNonIgnored = right.columns.diffCaseSensitivity(ignoreColumns)

val exceptIgnoredColumnsMsg = if (ignoreColumns.nonEmpty) " except ignored columns" else ""

Expand All @@ -54,8 +49,8 @@ class Differ(options: DiffOptions) {
require(leftNonIgnored.length > 0, s"The schema$exceptIgnoredColumnsMsg must not be empty")

// column types must match but we ignore the nullability of columns
val leftFields = left.schema.fields.filter(f => isIgnoredColumn(f.name)).map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType)
val rightFields = right.schema.fields.filter(f => isIgnoredColumn(f.name)).map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType)
val leftFields = left.schema.fields.filter(f => !ignoreColumns.containsCaseSensitivity(f.name)).map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType)
val rightFields = right.schema.fields.filter(f => !ignoreColumns.containsCaseSensitivity(f.name)).map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType)
val leftExtraSchema = leftFields.diff(rightFields)
val rightExtraSchema = rightFields.diff(leftFields)
require(leftExtraSchema.isEmpty && rightExtraSchema.isEmpty,
Expand All @@ -65,8 +60,8 @@ class Differ(options: DiffOptions) {

val columns = leftNonIgnored
val pkColumns = if (idColumns.isEmpty) columns.toList else idColumns
val nonPkColumns = columns.filter(notInWithCaseSensitivity(pkColumns))
val missingIdColumns = pkColumns.filter(notInWithCaseSensitivity(columns))
val nonPkColumns = columns.diffCaseSensitivity(pkColumns)
val missingIdColumns = pkColumns.diffCaseSensitivity(columns)
require(missingIdColumns.isEmpty,
s"Some id columns do not exist: ${missingIdColumns.mkString(", ")} missing among ${columns.mkString(", ")}")

Expand All @@ -75,49 +70,41 @@ class Differ(options: DiffOptions) {
s"Some ignore columns do not exist: ${missingIgnoreColumns.mkString(", ")} " +
s"missing among ${(leftNonIgnored ++ rightNonIgnored).distinct.sorted.mkString(", ")}")

require(notInWithCaseSensitivity(pkColumns)(options.diffColumn),
s"The id columns must not contain the diff column name '${options.diffColumn}': " +
s"${pkColumns.mkString(", ")}")
if(Set(DiffMode.LeftSide, DiffMode.RightSide).contains(options.diffMode))
require(notInWithCaseSensitivity(nonPkColumns)(options.diffColumn),
s"The non-id columns must not contain the diff column name '${options.diffColumn}': ${nonPkColumns.mkString((", "))}")

require(options.changeColumn.forall(notInWithCaseSensitivity(pkColumns)),
s"The id columns must not contain the change column name '${options.changeColumn.get}': ${pkColumns.mkString((", "))}")
if(Set(DiffMode.LeftSide, DiffMode.RightSide).contains(options.diffMode))
require(!options.changeColumn.exists(notInWithCaseSensitivity(nonPkColumns)),
s"The non-id columns must not contain the change column name '${options.changeColumn.get}': ${nonPkColumns.mkString((", "))}")
require(!pkColumns.containsCaseSensitivity(options.diffColumn),
s"The id columns must not contain the diff column name '${options.diffColumn}': ${pkColumns.mkString(", ")}")
require(options.changeColumn.forall(!pkColumns.containsCaseSensitivity(_)),
s"The id columns must not contain the change column name '${options.changeColumn.get}': ${pkColumns.mkString(", ")}")

val diffValueColumns = getDiffColumns(pkColumns, nonPkColumns, left, right, ignoreColumns).map(_._1).diff(pkColumns)

if (Seq(DiffMode.LeftSide, DiffMode.RightSide).contains(options.diffMode)) {
require(notInWithCaseSensitivity(diffValueColumns)(options.diffColumn),
require(!diffValueColumns.containsCaseSensitivity(options.diffColumn),
s"The ${if (options.diffMode == DiffMode.LeftSide) "left" else "right"} " +
s"non-id columns must not contain the diff column name '${options.diffColumn}': " +
s"${(if (options.diffMode == DiffMode.LeftSide) left else right).columns.diffCaseSensitivity(idColumns).mkString(", ")}")

options.changeColumn.foreach( changeColumn =>
require(notInWithCaseSensitivity(diffValueColumns)(changeColumn),
require(!diffValueColumns.contains(changeColumn),
s"The ${if (options.diffMode == DiffMode.LeftSide) "left" else "right"} " +
s"non-id columns must not contain the change column name '${options.changeColumn.get}': " +
s"${(if (options.diffMode == DiffMode.LeftSide) left else right).columns.diffCaseSensitivity(idColumns).mkString(", ")}")
)
} else {
require(notInWithCaseSensitivity(diffValueColumns)(options.diffColumn),
require(!diffValueColumns.containsCaseSensitivity(options.diffColumn),
s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " +
s"together with these non-id columns " +
s"must not produce the diff column name '${options.diffColumn}': " +
s"${nonPkColumns.mkString(", ")}")

options.changeColumn.foreach( changeColumn =>
require(notInWithCaseSensitivity(diffValueColumns)(changeColumn),
require(!diffValueColumns.containsCaseSensitivity(changeColumn),
s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " +
s"together with these non-id columns " +
s"must not produce the change column name '${changeColumn}': " +
s"${nonPkColumns.mkString(", ")}")
)

require(diffValueColumns.forall(column => notInWithCaseSensitivity(pkColumns)(column)),
require(diffValueColumns.forall(!pkColumns.containsCaseSensitivity(_)),
s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " +
s"together with these non-id columns " +
s"must not produce any id column name '${pkColumns.mkString("', '")}': " +
Expand Down Expand Up @@ -217,12 +204,9 @@ class Differ(options: DiffOptions) {
private def doDiff[T, U](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = {
checkSchema(left, right, idColumns, ignoreColumns)

val ignoreColumnsCs = ignoreColumns.map(handleConfiguredCaseSensitivity).toSet

val columns = left.columns.filter(c => !ignoreColumnsCs.contains(handleConfiguredCaseSensitivity(c))).toList
val columns = left.columns.diffCaseSensitivity(ignoreColumns).toList
val pkColumns = if (idColumns.isEmpty) columns else idColumns
val pkColumnsCs = pkColumns.map(handleConfiguredCaseSensitivity).toSet
val valueColumns = columns.filter(col => !pkColumnsCs.contains(handleConfiguredCaseSensitivity(col)))
val valueColumns = columns.diffCaseSensitivity(pkColumns)

val existsColumnName = distinctPrefixFor(left.columns) + "exists"
val leftWithExists = left.withColumn(existsColumnName, lit(1))
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1114,13 +1114,13 @@ class DiffSuite extends AnyFunSuite with SparkTestSession {
test("diff with left-side diff mode and diff column name in value columns") {
val options = DiffOptions.default.withDiffColumn("value").withDiffMode(DiffMode.LeftSide)
doTestRequirement(left.diff(right, options, "id"),
"The non-id columns must not contain the diff column name 'value': value")
"The left non-id columns must not contain the diff column name 'value': value")
}

test("diff with right-side diff mode and diff column name in value columns") {
val options = DiffOptions.default.withDiffColumn("value").withDiffMode(DiffMode.RightSide)
doTestRequirement(right.diff(right, options, "id"),
"The non-id columns must not contain the diff column name 'value': value")
"The right non-id columns must not contain the diff column name 'value': value")
}

test("diff with left-side diff mode and change column name in value columns") {
Expand Down

0 comments on commit dacbcaf

Please sign in to comment.