From 5824e23b04be45a691063395ec90beeb000cd088 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Wed, 7 Sep 2022 15:18:11 +0200 Subject: [PATCH 1/6] Diff similar DataFrames with ignoreColumns --- .../uk/co/gresearch/spark/diff/Diff.scala | 300 +++++++++--------- 1 file changed, 153 insertions(+), 147 deletions(-) diff --git a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala index 14dcd97f..30849c6f 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala @@ -30,7 +30,7 @@ import scala.collection.JavaConverters */ class Differ(options: DiffOptions) { - private[diff] def checkSchema[T](left: Dataset[T], right: Dataset[T], idColumns: String*): Unit = { + private[diff] def checkSchema[T, U](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String]): Unit = { require(left.columns.length == left.columns.toSet.size && right.columns.length == right.columns.toSet.size, "The datasets have duplicate columns.\n" + @@ -144,8 +144,9 @@ class Differ(options: DiffOptions) { ) } - private[diff] def getDiffColumns[T](pkColumns: Seq[String], otherColumns: Seq[String], - left: Dataset[T], right: Dataset[T]): Seq[Column] = { + private[diff] def getDiffColumns[T, U](pkColumns: Seq[String], otherColumns: Seq[String], + left: Dataset[T], right: Dataset[U], + ignoreColumns: Seq[String]): Seq[Column] = { val idColumns = pkColumns.map(c => coalesce(left(backticks(c)), right(backticks(c))).as(c)) val (leftValues, rightValues) = if (options.sparseMode) { @@ -181,14 +182,48 @@ class Differ(options: DiffOptions) { idColumns ++ valueColumns } + private def doDiff[T, U](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = { + checkSchema(left, right, idColumns, ignoreColumns) + + val pkColumns = if (idColumns.isEmpty) left.columns.toList else idColumns + val pkColumnsCs = 
pkColumns.map(handleConfiguredCaseSensitivity).toSet + val nonPkColumns = left.columns.filter(col => !pkColumnsCs.contains(handleConfiguredCaseSensitivity(col))) + + val ignoreColumnsCs = ignoreColumns.map(handleConfiguredCaseSensitivity).toSet + val valueColumns = nonPkColumns.filter(col => !ignoreColumnsCs(handleConfiguredCaseSensitivity(col))) + + val existsColumnName = distinctPrefixFor(left.columns) + "exists" + val leftWithExists = left.withColumn(existsColumnName, lit(1)) + val rightWithExists = right.withColumn(existsColumnName, lit(1)) + val joinCondition = pkColumns.map(c => leftWithExists(backticks(c)) <=> rightWithExists(backticks(c))).reduce(_ && _) + val unChanged = valueColumns.map(c => leftWithExists(backticks(c)) <=> rightWithExists(backticks(c))).reduceOption(_ && _) + val changeCondition = not(unChanged.getOrElse(lit(true))) + + val diffActionColumn = + when(leftWithExists(existsColumnName).isNull, lit(options.insertDiffValue)). + when(rightWithExists(existsColumnName).isNull, lit(options.deleteDiffValue)). + when(changeCondition, lit(options.changeDiffValue)). + otherwise(lit(options.nochangeDiffValue)). + as(options.diffColumn) + + val diffColumns = getDiffColumns(pkColumns, nonPkColumns, left, right, ignoreColumns) + val changeColumn = getChangeColumn(existsColumnName, valueColumns, leftWithExists, rightWithExists) + // turn this column into a sequence of one or none column so we can easily concat it below with diffActionColumn and diffColumns + .map(Seq(_)) + .getOrElse(Seq.empty[Column]) + + leftWithExists.join(rightWithExists, joinCondition, "fullouter") + .select((diffActionColumn +: changeColumn) ++ diffColumns: _*) + } + /** - * Returns a new DataFrame that contains the differences between the two Datasets + * Returns a new DataFrame that contains the differences between two Datasets * of the same type `T`. Both Datasets must contain the same set of column names and data types. 
* The order of columns in the two Datasets is not important as columns are compared based on the * name, not the the position. * * Optional `id` columns are used to uniquely identify rows to compare. If values in any non-id - * column are differing between the two Datasets, then that row is marked as `"C"`hange + * column are differing between two Datasets, then that row is marked as `"C"`hange * and `"N"`o-change otherwise. Rows of the right Dataset, that do not exist in the left Dataset * (w.r.t. the values in the id columns) are marked as `"I"`nsert. And rows of the left Dataset, * that do not exist in the right Dataset are marked as `"D"`elete. @@ -240,13 +275,13 @@ class Differ(options: DiffOptions) { } /** - * Returns a new DataFrame that contains the differences between the two Datasets - * of the same type `T`. Both Datasets must contain the same set of column names and data types. - * The order of columns in the two Datasets is not important as columns are compared based on the - * name, not the the position. + * Returns a new DataFrame that contains the differences between two Datasets of + * similar types `T` and `U`. Both Datasets must contain the same set of column names and data types, + * except for the columns in `ignoreColumns`. The order of columns in the two Datasets is not important as + * columns are compared based on the name, not the the position. * * Optional id columns are used to uniquely identify rows to compare. If values in any non-id - * column are differing between the two Datasets, then that row is marked as `"C"`hange + * column are differing between two Datasets, then that row is marked as `"C"`hange * and `"N"`o-change otherwise. Rows of the right Dataset, that do not exist in the left Dataset * (w.r.t. the values in the id columns) are marked as `"I"`nsert. And rows of the left Dataset, * that do not exist in the right Dataset are marked as `"D"`elete. 
@@ -294,48 +329,17 @@ class Differ(options: DiffOptions) { * columns of this Dataset are id columns and appear in the same order. The remaining non-id * columns are in the order of this Dataset. */ - def diff[T](left: Dataset[T], right: Dataset[T], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = { - checkSchema(left, right, idColumns: _*) - - val pkColumns = if (idColumns.isEmpty) left.columns.toList else idColumns - val pkColumnsCs = pkColumns.map(handleConfiguredCaseSensitivity).toSet - val nonPkColumns = left.columns.filter(col => !pkColumnsCs.contains(handleConfiguredCaseSensitivity(col))) - - val ignoreColumnsCs = ignoreColumns.map(handleConfiguredCaseSensitivity).toSet - val valueColumns = nonPkColumns.filter(col => !ignoreColumnsCs(handleConfiguredCaseSensitivity(col))) - - val existsColumnName = distinctPrefixFor(left.columns) + "exists" - val leftWithExists = left.withColumn(existsColumnName, lit(1)) - val rightWithExists = right.withColumn(existsColumnName, lit(1)) - val joinCondition = pkColumns.map(c => leftWithExists(backticks(c)) <=> rightWithExists(backticks(c))).reduce(_ && _) - val unChanged = valueColumns.map(c => leftWithExists(backticks(c)) <=> rightWithExists(backticks(c))).reduceOption(_ && _) - val changeCondition = not(unChanged.getOrElse(lit(true))) - - val diffActionColumn = - when(leftWithExists(existsColumnName).isNull, lit(options.insertDiffValue)). - when(rightWithExists(existsColumnName).isNull, lit(options.deleteDiffValue)). - when(changeCondition, lit(options.changeDiffValue)). - otherwise(lit(options.nochangeDiffValue)). 
- as(options.diffColumn) - - val diffColumns = getDiffColumns(pkColumns, nonPkColumns, left, right) - val changeColumn = getChangeColumn(existsColumnName, valueColumns, leftWithExists, rightWithExists) - // turn this column into a sequence of one or none column so we can easily concat it below with diffActionColumn and diffColumns - .map(Seq(_)) - .getOrElse(Seq.empty[Column]) - - leftWithExists.join(rightWithExists, joinCondition, "fullouter") - .select((diffActionColumn +: changeColumn) ++ diffColumns: _*) - } + def diff[T, U](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = + doDiff(left, right, idColumns, ignoreColumns) /** - * Returns a new DataFrame that contains the differences between the two Datasets - * of the same type `T`. Both Datasets must contain the same set of column names and data types. - * The order of columns in the two Datasets is not important as columns are compared based on the - * name, not the the position. + * Returns a new DataFrame that contains the differences between two Datasets of + * similar types `T` and `U`. Both Datasets must contain the same set of column names and data types, + * except for the columns in `ignoreColumns`. The order of columns in the two Datasets is not relevant as + * columns are compared based on the name, not the the position. * * Optional id columns are used to uniquely identify rows to compare. If values in any non-id - * column are differing between the two Datasets, then that row is marked as `"C"`hange + * column are differing between two Datasets, then that row is marked as `"C"`hange * and `"N"`o-change otherwise. Rows of the right Dataset, that do not exist in the left Dataset * (w.r.t. the values in the id columns) are marked as `"I"`nsert. And rows of the left Dataset, * that do not exist in the right Dataset are marked as `"D"`elete. 
@@ -383,58 +387,60 @@ class Differ(options: DiffOptions) { * columns of this Dataset are id columns and appear in the same order. The remaining non-id * columns are in the order of this Dataset. */ - def diff[T](left: Dataset[T], right: Dataset[T], idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): DataFrame = { + def diff[T, U](left: Dataset[T], right: Dataset[U], idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): DataFrame = { diff(left, right, JavaConverters.iterableAsScalaIterable(idColumns).toSeq, JavaConverters.iterableAsScalaIterable(ignoreColumns).toSeq) } /** - * Returns a new Dataset that contains the differences between the two Datasets of the same type `T`. + * Returns a new Dataset that contains the differences between two Datasets of the same type `T`. * - * See `diff(Dataset[T], Dataset[T], String*)`. + * See `diff(Dataset[T], Dataset[U], String*)`. * - * This requires an additional implicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional implicit `Encoder[V]` for the return type `Dataset[V]`. */ // no @scala.annotation.varargs here as implicit arguments are explicit in Java // this signature is redundant to the other diffAs method in Java - def diffAs[T, U](left: Dataset[T], right: Dataset[T], idColumns: String*) - (implicit diffEncoder: Encoder[U]): Dataset[U] = { + def diffAs[T, U, V](left: Dataset[T], right: Dataset[T], idColumns: String*) + (implicit diffEncoder: Encoder[V]): Dataset[V] = { diffAs(left, right, diffEncoder, idColumns: _*) } /** - * Returns a new Dataset that contains the differences between the two Datasets of the same type `T`. + * Returns a new Dataset that contains the differences between two Datasets of + * similar types `T` and `U`. * - * See `diff(Dataset[T], Dataset[T], Seq[String], Seq[String])`. + * See `diff(Dataset[T], Dataset[U], Seq[String], Seq[String])`. 
* - * This requires an additional implicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional implicit `Encoder[V]` for the return type `Dataset[V]`. */ - def diffAs[T, U](left: Dataset[T], right: Dataset[T], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty) - (implicit diffEncoder: Encoder[U]): Dataset[U] = { + def diffAs[T, U, V](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty) + (implicit diffEncoder: Encoder[V]): Dataset[V] = { diffAs(left, right, diffEncoder, idColumns, ignoreColumns) } /** - * Returns a new Dataset that contains the differences between the two Datasets of the same type `T`. + * Returns a new Dataset that contains the differences between two Datasets of the same type `T`. * * See `diff(Dataset[T], Dataset[T], String*)`. * - * This requires an additional explicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional explicit `Encoder[V]` for the return type `Dataset[V]`. */ @scala.annotation.varargs - def diffAs[T, U](left: Dataset[T], right: Dataset[T], - diffEncoder: Encoder[U], idColumns: String*): Dataset[U] = { + def diffAs[T, V](left: Dataset[T], right: Dataset[T], + diffEncoder: Encoder[V], idColumns: String*): Dataset[V] = { diffAs(left, right, diffEncoder, idColumns, Seq.empty) } /** - * Returns a new Dataset that contains the differences between the two Datasets of the same type `T`. + * Returns a new Dataset that contains the differences between two Datasets of + * similar types `T` and `U`. * - * See `of(Dataset[T], Dataset[T], Seq[String], Seq[String])`. + * See `of(Dataset[T], Dataset[U], Seq[String], Seq[String])`. * - * This requires an additional explicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional explicit `Encoder[V]` for the return type `Dataset[V]`. 
*/ - def diffAs[T, U](left: Dataset[T], right: Dataset[T], - diffEncoder: Encoder[U], idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[U] = { + def diffAs[T, U, V](left: Dataset[T], right: Dataset[U], + diffEncoder: Encoder[V], idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[V] = { val nonIdColumns = left.columns.diff(if (idColumns.isEmpty) left.columns.toList else idColumns) val encColumns = diffEncoder.schema.fields.map(_.name) val diffColumns = Seq(options.diffColumn) ++ idColumns ++ getDiffValueColumns(nonIdColumns, options.diffMode) @@ -444,57 +450,55 @@ class Differ(options: DiffOptions) { s"Diff encoder's columns must be part of the diff result schema, " + s"these columns are unexpected: ${extraColumns.mkString(", ")}") - diff(left, right, idColumns, ignoreColumns).as[U](diffEncoder) + diff(left, right, idColumns, ignoreColumns).as[V](diffEncoder) } /** - * Returns a new Dataset that contains the differences between the two Datasets of the same type `T`. + * Returns a new Dataset that contains the differences between two Datasets of + * similar types `T` and `U`. * - * See `of(Dataset[T], Dataset[T], Seq[String], Seq[String])`. + * See `of(Dataset[T], Dataset[U], Seq[String], Seq[String])`. * - * This requires an additional explicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional explicit `Encoder[V]` for the return type `Dataset[V]`. 
*/ - def diffAs[T, U](left: Dataset[T], right: Dataset[T], diffEncoder: Encoder[U], - idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): Dataset[U] = { + def diffAs[T, U, V](left: Dataset[T], right: Dataset[U], diffEncoder: Encoder[V], + idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): Dataset[V] = { diffAs(left, right, diffEncoder, JavaConverters.iterableAsScalaIterable(idColumns).toSeq, JavaConverters.iterableAsScalaIterable(ignoreColumns).toSeq) } /** - * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T` - * as tuples of type `(String, T, T)`. + * Returns a new Dataset that contains the differences between two Dataset of + * the same type `T` as tuples of type `(String, T, T)`. * * See `diff(Dataset[T], Dataset[T], String*)`. */ @scala.annotation.varargs def diffWith[T](left: Dataset[T], right: Dataset[T], idColumns: String*): Dataset[(String, T, T)] = { val df = diff(left, right, idColumns: _*) - diffWith(df, idColumns: _*)(left.encoder) + diffWith(df, idColumns: _*)(left.encoder, right.encoder) } /** - * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T` - * as tuples of type `(String, T, T)`. + * Returns a new Dataset that contains the differences between two Dataset of + * similar types `T` and `U` as tuples of type `(String, T, U)`. * - * See `diff(Dataset[T], Dataset[T], Seq[String], Seq[String])`. + * See `diff(Dataset[T], Dataset[U], Seq[String], Seq[String])`. 
*/ - def diffWith[T](left: Dataset[T], right: Dataset[T], - idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[(String, T, T)] = { + def diffWith[T, U](left: Dataset[T], right: Dataset[U], + idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[(String, T, U)] = { val df = diff(left, right, idColumns, ignoreColumns) - diffWith(df, idColumns: _*)(left.encoder) + diffWith(df, idColumns: _*)(left.encoder, right.encoder) } /** - * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T` - * as tuples of type `(String, T, T)`. + * Returns a new Dataset that contains the differences between two Dataset of + * similar types `T` and `U` as tuples of type `(String, T, U)`. * - * See `diff(Dataset[T], Dataset[T], Seq[String], Seq[String])`. + * See `diff(Dataset[T], Dataset[U], Seq[String], Seq[String])`. */ - def diffWith[T](left: Dataset[T], right: Dataset[T], - idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): Dataset[(String, T, T)] = { + def diffWith[T, U](left: Dataset[T], right: Dataset[U], + idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): Dataset[(String, T, U)] = { diffWith(left, right, JavaConverters.iterableAsScalaIterable(idColumns).toSeq, JavaConverters.iterableAsScalaIterable(ignoreColumns).toSeq) } @@ -506,7 +510,7 @@ class Differ(options: DiffOptions) { .map(c => if (idColumns.contains(c)) col(c) else col(c).as(c.replace(prefix, ""))) } - private def diffWith[T : Encoder](diff: DataFrame, idColumns: String*): Dataset[(String, T, T)] = { + private def diffWith[T : Encoder, U : Encoder](diff: DataFrame, idColumns: String*): Dataset[(String, T, U)] = { val leftColumns = columnsOfSide(diff, idColumns, options.leftColumnPrefix) val rightColumns = columnsOfSide(diff, idColumns, options.rightColumnPrefix) @@ -520,8 +524,8 @@ class Differ(options: DiffOptions) { val plan = diff.select(diffColumn, leftStruct, rightStruct).queryExecution.logical - 
val encoder: Encoder[(String, T, T)] = Encoders.tuple( - Encoders.STRING, implicitly[Encoder[T]], implicitly[Encoder[T]] + val encoder: Encoder[(String, T, U)] = Encoders.tuple( + Encoders.STRING, implicitly[Encoder[T]], implicitly[Encoder[U]] ) new Dataset(diff.sparkSession, plan, encoder) @@ -536,13 +540,13 @@ object Diff { val default = new Differ(DiffOptions.default) /** - * Returns a new DataFrame that contains the differences between the two Datasets + * Returns a new DataFrame that contains the differences between two Datasets * of the same type `T`. Both Datasets must contain the same set of column names and data types. - * The order of columns in the two Datasets is not important as columns are compared based on the + * The order of columns in the two Datasets is not relevant as columns are compared based on the * name, not the the position. * * Optional id columns are used to uniquely identify rows to compare. If values in any non-id - * column are differing between the two Datasets, then that row is marked as `"C"`hange + * column are differing between two Datasets, then that row is marked as `"C"`hange * and `"N"`o-change otherwise. Rows of the right Dataset, that do not exist in the left Dataset * (w.r.t. the values in the id columns) are marked as `"I"`nsert. And rows of the left Dataset, * that do not exist in the right Dataset are marked as `"D"`elete. @@ -593,13 +597,13 @@ object Diff { default.diff(left, right, idColumns: _*) /** - * Returns a new DataFrame that contains the differences between the two Datasets - * of the same type `T`. Both Datasets must contain the same set of column names and data types. - * The order of columns in the two Datasets is not important as columns are compared based on the - * name, not the the position. + * Returns a new DataFrame that contains the differences between two Datasets of + * similar types `T` and `U`. 
Both Datasets must contain the same set of column names and data types, + * except for the columns in `ignoreColumns`. The order of columns in the two Datasets is not important as + * columns are compared based on the name, not the the position. * * Optional id columns are used to uniquely identify rows to compare. If values in any non-id - * column are differing between the two Datasets, then that row is marked as `"C"`hange + * column are differing between two Datasets, then that row is marked as `"C"`hange * and `"N"`o-change otherwise. Rows of the right Dataset, that do not exist in the left Dataset * (w.r.t. the values in the id columns) are marked as `"I"`nsert. And rows of the left Dataset, * that do not exist in the right Dataset are marked as `"D"`elete. @@ -647,17 +651,17 @@ object Diff { * columns of this Dataset are id columns and appear in the same order. The remaining non-id * columns are in the order of this Dataset. */ - def of[T](left: Dataset[T], right: Dataset[T], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = + def of[T, U](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = default.diff(left, right, idColumns, ignoreColumns) /** - * Returns a new DataFrame that contains the differences between the two Datasets - * of the same type `T`. Both Datasets must contain the same set of column names and data types. - * The order of columns in the two Datasets is not important as columns are compared based on the - * name, not the the position. + * Returns a new DataFrame that contains the differences between two Datasets of + * similar types `T` and `U`. Both Datasets must contain the same set of column names and data types, + * except for the columns in `ignoreColumns`. The order of columns in the two Datasets is not important as + * columns are compared based on the name, not the the position. 
* * Optional id columns are used to uniquely identify rows to compare. If values in any non-id - * column are differing between the two Datasets, then that row is marked as `"C"`hange + * column are differing between two Datasets, then that row is marked as `"C"`hange * and `"N"`o-change otherwise. Rows of the right Dataset, that do not exist in the left Dataset * (w.r.t. the values in the id columns) are marked as `"I"`nsert. And rows of the left Dataset, * that do not exist in the right Dataset are marked as `"D"`elete. @@ -705,71 +709,75 @@ object Diff { * columns of this Dataset are id columns and appear in the same order. The remaining non-id * columns are in the order of this Dataset. */ - def of[T](left: Dataset[T], right: Dataset[T], idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): DataFrame = + def of[T, U](left: Dataset[T], right: Dataset[U], idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): DataFrame = default.diff(left, right, idColumns, ignoreColumns) /** - * Returns a new Dataset that contains the differences between the two Datasets of the same type `T`. + * Returns a new Dataset that contains the differences between two Datasets of + * the same type `T`. * * See `of(Dataset[T], Dataset[T], String*)`. * - * This requires an additional implicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional implicit `Encoder[V]` for the return type `Dataset[V]`. 
*/ // no @scala.annotation.varargs here as implicit arguments are explicit in Java // this signature is redundant to the other ofAs method in Java - def ofAs[T, U](left: Dataset[T], right: Dataset[T], idColumns: String*) - (implicit diffEncoder: Encoder[U]): Dataset[U] = + def ofAs[T, V](left: Dataset[T], right: Dataset[T], idColumns: String*) + (implicit diffEncoder: Encoder[V]): Dataset[V] = default.diffAs(left, right, idColumns: _*) /** - * Returns a new Dataset that contains the differences between the two Datasets of the same type `T`. + * Returns a new DataFrame that contains the differences between two Datasets of + * similar types `T` and `U`. * - * See `of(Dataset[T], Dataset[T], Seq[String], Seq[String])`. + * See `of(Dataset[T], Dataset[U], Seq[String], Seq[String])`. * - * This requires an additional implicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional implicit `Encoder[V]` for the return type `Dataset[V]`. */ - def ofAs[T, U](left: Dataset[T], right: Dataset[T], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty) - (implicit diffEncoder: Encoder[U]): Dataset[U] = + def ofAs[T, U, V](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty) + (implicit diffEncoder: Encoder[V]): Dataset[V] = default.diffAs(left, right, idColumns, ignoreColumns) /** - * Returns a new Dataset that contains the differences between the two Datasets of the same type `T`. + * Returns a new Dataset that contains the differences between two Datasets of + * the same type `T`. * * See `of(Dataset[T], Dataset[T], String*)`. * - * This requires an additional explicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional explicit `Encoder[V]` for the return type `Dataset[V]`. 
*/ @scala.annotation.varargs - def ofAs[T, U](left: Dataset[T], right: Dataset[T], - diffEncoder: Encoder[U], idColumns: String*): Dataset[U] = + def ofAs[T, V](left: Dataset[T], right: Dataset[T], + diffEncoder: Encoder[V], idColumns: String*): Dataset[V] = default.diffAs(left, right, diffEncoder, idColumns: _*) /** - * Returns a new Dataset that contains the differences between the two Datasets of the same type `T`. + * Returns a new DataFrame that contains the differences between two Datasets of + * similar types `T` and `U`. * - * See `of(Dataset[T], Dataset[T], Seq[String], Seq[String])`. + * See `of(Dataset[T], Dataset[U], Seq[String], Seq[String])`. * - * This requires an additional explicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional explicit `Encoder[V]` for the return type `Dataset[V]`. */ - def ofAs[T, U](left: Dataset[T], right: Dataset[T], - diffEncoder: Encoder[U], idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[U] = + def ofAs[T, U, V](left: Dataset[T], right: Dataset[U], + diffEncoder: Encoder[V], idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[V] = default.diffAs(left, right, diffEncoder, idColumns, ignoreColumns) /** - * Returns a new Dataset that contains the differences between the two Datasets of the same type `T`. + * Returns a new DataFrame that contains the differences between two Datasets of + * similar types `T` and `U`. * - * See `of(Dataset[T], Dataset[T], Seq[String], Seq[String])`. + * See `of(Dataset[T], Dataset[U], Seq[String], Seq[String])`. * - * This requires an additional explicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional explicit `Encoder[V]` for the return type `Dataset[V]`. 
*/ - def ofAs[T, U](left: Dataset[T], right: Dataset[T], diffEncoder: Encoder[U], - idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): Dataset[U] = + def ofAs[T, U, V](left: Dataset[T], right: Dataset[U], diffEncoder: Encoder[V], + idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): Dataset[V] = default.diffAs(left, right, diffEncoder, idColumns, ignoreColumns) /** - * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T` - * as tuples of type `(String, T, T)`. + * Returns a new Dataset that contains the differences between two Dataset of + * the same type `T` as tuples of type `(String, T, T)`. * * See `of(Dataset[T], Dataset[T], String*)`. */ @@ -778,24 +786,22 @@ object Diff { default.diffWith(left, right, idColumns: _*) /** - * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T` - * as tuples of type `(String, T, T)`. + * Returns a new DataFrame that contains the differences between two Datasets of + * similar types `T` and `U` as tuples of type `(String, T, U)`. * - * See `of(Dataset[T], Dataset[T], Seq[String], Seq[String])`. + * See `of(Dataset[T], Dataset[U], Seq[String], Seq[String])`. */ - def ofWith[T](left: Dataset[T], right: Dataset[T], - idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[(String, T, T)] = + def ofWith[T, U](left: Dataset[T], right: Dataset[U], + idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[(String, T, U)] = default.diffWith(left, right, idColumns, ignoreColumns) /** - * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T` - * as tuples of type `(String, T, T)`. + * Returns a new DataFrame that contains the differences between two Datasets of + * similar types `T` and `U` as tuples of type `(String, T, U)`. * - * See `of(Dataset[T], Dataset[T], Seq[String], Seq[String])`. 
+ * See `of(Dataset[T], Dataset[U], Seq[String], Seq[String])`. */ - def ofWith[T](left: Dataset[T], right: Dataset[T], - idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): Dataset[(String, T, T)] = + def ofWith[T, U](left: Dataset[T], right: Dataset[U], + idColumns: java.util.List[String], ignoreColumns: java.util.List[String]): Dataset[(String, T, U)] = default.diffWith(left, right, idColumns, ignoreColumns) } From b1c2401a54bbb09f299728b8e13eed911ad007d2 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 12 Sep 2022 14:45:19 +0200 Subject: [PATCH 2/6] Completed implementation and tests --- .../uk/co/gresearch/spark/diff/Diff.scala | 208 +++++----- .../uk/co/gresearch/spark/diff/package.scala | 129 ++++--- .../co/gresearch/spark/diff/DiffSuite.scala | 357 +++++++++++++++--- 3 files changed, 497 insertions(+), 197 deletions(-) diff --git a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala index 30849c6f..53ac4124 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala @@ -19,7 +19,6 @@ package uk.co.gresearch.spark.diff import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{ArrayType, StringType} -import uk.co.gresearch.spark.diff.DiffMode.DiffMode import uk.co.gresearch.spark.{backticks, distinctPrefixFor} import scala.collection.JavaConverters @@ -37,16 +36,26 @@ class Differ(options: DiffOptions) { s"Left column names: ${left.columns.mkString(", ")}\n" + s"Right column names: ${right.columns.mkString(", ")}") - require(left.columns.length == right.columns.length, + val ignoreColumnsCs = ignoreColumns.map(handleConfiguredCaseSensitivity).toSet + def isIgnoredColumn(column: String): Boolean = !ignoreColumnsCs.contains(handleConfiguredCaseSensitivity(column)) + val leftNonIgnored = left.columns.filter(isIgnoredColumn) + val rightNonIgnored = 
right.columns.filter(isIgnoredColumn) + + def notInWithCaseSensitivity(columns: Seq[String])(column: String): Boolean = + !columns.map(handleConfiguredCaseSensitivity).contains(handleConfiguredCaseSensitivity(column)) + + val exceptIgnoredColumnsMsg = if (ignoreColumns.nonEmpty) " except ignored columns" else "" + + require(leftNonIgnored.length == rightNonIgnored.length, "The number of columns doesn't match.\n" + - s"Left column names (${left.columns.length}): ${left.columns.mkString(", ")}\n" + - s"Right column names (${right.columns.length}): ${right.columns.mkString(", ")}") + s"Left column names$exceptIgnoredColumnsMsg (${leftNonIgnored.length}): ${leftNonIgnored.mkString(", ")}\n" + + s"Right column names$exceptIgnoredColumnsMsg (${rightNonIgnored.length}): ${rightNonIgnored.mkString(", ")}") - require(left.columns.length > 0, "The schema must not be empty") + require(leftNonIgnored.length > 0, s"The schema$exceptIgnoredColumnsMsg must not be empty") // column types must match but we ignore the nullability of columns - val leftFields = left.schema.fields.map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType) - val rightFields = right.schema.fields.map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType) + val leftFields = left.schema.fields.filter(f => isIgnoredColumn(f.name)).map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType) + val rightFields = right.schema.fields.filter(f => isIgnoredColumn(f.name)).map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType) val leftExtraSchema = leftFields.diff(rightFields) val rightExtraSchema = rightFields.diff(leftFields) require(leftExtraSchema.isEmpty && rightExtraSchema.isEmpty, @@ -54,72 +63,65 @@ class Differ(options: DiffOptions) { s"Left extra columns: ${leftExtraSchema.map(t => s"${t._1} (${t._2})").mkString(", ")}\n" + s"Right extra columns: ${rightExtraSchema.map(t => s"${t._1} (${t._2})").mkString(", ")}") - val columns = left.columns.map(handleConfiguredCaseSensitivity) - 
val pkColumns = if (idColumns.isEmpty) columns.toList else idColumns.map(handleConfiguredCaseSensitivity) - val nonPkColumns = columns.diff(pkColumns) - val missingIdColumns = pkColumns.diff(columns) + val columns = leftNonIgnored + val pkColumns = if (idColumns.isEmpty) columns.toList else idColumns + val nonPkColumns = columns.filter(notInWithCaseSensitivity(pkColumns)) + val missingIdColumns = pkColumns.filter(notInWithCaseSensitivity(columns)) require(missingIdColumns.isEmpty, s"Some id columns do not exist: ${missingIdColumns.mkString(", ")} missing among ${columns.mkString(", ")}") - require(!pkColumns.contains(handleConfiguredCaseSensitivity(options.diffColumn)), + val missingIgnoreColumns = ignoreColumns.diffCaseSensitivity(left.columns).diffCaseSensitivity(right.columns) + require(missingIgnoreColumns.isEmpty, + s"Some ignore columns do not exist: ${missingIgnoreColumns.mkString(", ")} " + + s"missing among ${(leftNonIgnored ++ rightNonIgnored).distinct.sorted.mkString(", ")}") + + require(notInWithCaseSensitivity(pkColumns)(options.diffColumn), s"The id columns must not contain the diff column name '${options.diffColumn}': " + s"${pkColumns.mkString(", ")}") if(Set(DiffMode.LeftSide, DiffMode.RightSide).contains(options.diffMode)) - require(!nonPkColumns.contains(options.diffColumn), + require(notInWithCaseSensitivity(nonPkColumns)(options.diffColumn), s"The non-id columns must not contain the diff column name '${options.diffColumn}': ${nonPkColumns.mkString((", "))}") - require(!options.changeColumn.exists(pkColumns.contains), + require(options.changeColumn.forall(notInWithCaseSensitivity(pkColumns)), s"The id columns must not contain the change column name '${options.changeColumn.get}': ${pkColumns.mkString((", "))}") if(Set(DiffMode.LeftSide, DiffMode.RightSide).contains(options.diffMode)) - require(!options.changeColumn.exists(nonPkColumns.contains), + require(!options.changeColumn.exists(notInWithCaseSensitivity(nonPkColumns)), s"The non-id columns 
must not contain the change column name '${options.changeColumn.get}': ${nonPkColumns.mkString((", "))}") - val nonIdColumns = columns.diff(pkColumns) - val diffValueColumns = getDiffValueColumns(nonIdColumns, options.diffMode) + val diffValueColumns = getDiffColumns(pkColumns, nonPkColumns, left, right, ignoreColumns).map(_._1).diff(pkColumns) - require(!diffValueColumns.contains(handleConfiguredCaseSensitivity(options.diffColumn)), - s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " + - s"together with these non-id columns " + - s"must not produce the diff column name '${options.diffColumn}': " + - s"${nonIdColumns.mkString(", ")}") + if (Seq(DiffMode.LeftSide, DiffMode.RightSide).contains(options.diffMode)) { + require(notInWithCaseSensitivity(diffValueColumns)(options.diffColumn), + s"The ${if (options.diffMode == DiffMode.LeftSide) "left" else "right"} " + + s"non-id columns must not contain the diff column name '${options.diffColumn}': " + + s"${(if (options.diffMode == DiffMode.LeftSide) left else right).columns.diffCaseSensitivity(idColumns).mkString(", ")}") - options.changeColumn.foreach( changeColumn => - require(!diffValueColumns.contains(handleConfiguredCaseSensitivity(changeColumn)), + options.changeColumn.foreach( changeColumn => + require(notInWithCaseSensitivity(diffValueColumns)(changeColumn), + s"The ${if (options.diffMode == DiffMode.LeftSide) "left" else "right"} " + + s"non-id columns must not contain the change column name '${options.changeColumn.get}': " + + s"${(if (options.diffMode == DiffMode.LeftSide) left else right).columns.diffCaseSensitivity(idColumns).mkString(", ")}") + ) + } else { + require(notInWithCaseSensitivity(diffValueColumns)(options.diffColumn), s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " + s"together with these non-id columns " + - s"must not produce the change column name '${changeColumn}': " + - s"${nonIdColumns.mkString(", ")}") - ) 
- - require(diffValueColumns.forall(column => !pkColumns.contains(column)), - s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " + - s"together with these non-id columns " + - s"must not produce any id column name '${pkColumns.mkString("', '")}': " + - s"${nonIdColumns.mkString(", ")}") - } - - /** - * Produces the left and right value columns (non-id columns). - * @param nonIdColumns value column names - * @return left and right diff value column names - */ - private[diff] def getDiffValueColumns(nonIdColumns: Seq[String], diffMode: DiffMode): Seq[String] = { - def prefixColumns(columns: Seq[String])(prefix: String): Seq[String] = - columns.map(column => s"${prefix}_$column") - - diffMode match { - case DiffMode.ColumnByColumn => - Seq(options.leftColumnPrefix, options.rightColumnPrefix) - .flatMap(prefixColumns(nonIdColumns)) - .map(handleConfiguredCaseSensitivity) - - case DiffMode.SideBySide => - prefixColumns(nonIdColumns)(options.leftColumnPrefix) ++ - prefixColumns(nonIdColumns)(options.rightColumnPrefix) - .map(handleConfiguredCaseSensitivity) + s"must not produce the diff column name '${options.diffColumn}': " + + s"${nonPkColumns.mkString(", ")}") + + options.changeColumn.foreach( changeColumn => + require(notInWithCaseSensitivity(diffValueColumns)(changeColumn), + s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " + + s"together with these non-id columns " + + s"must not produce the change column name '${changeColumn}': " + + s"${nonPkColumns.mkString(", ")}") + ) - case DiffMode.LeftSide | DiffMode.RightSide => - nonIdColumns + require(diffValueColumns.forall(column => notInWithCaseSensitivity(pkColumns)(column)), + s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " + + s"together with these non-id columns " + + s"must not produce any id column name '${pkColumns.mkString("', '")}': " + + s"${nonPkColumns.mkString(", ")}") } } @@ -144,53 
+146,83 @@ class Differ(options: DiffOptions) { ) } - private[diff] def getDiffColumns[T, U](pkColumns: Seq[String], otherColumns: Seq[String], + private[diff] def getDiffColumns[T, U](pkColumns: Seq[String], valueColumns: Seq[String], left: Dataset[T], right: Dataset[U], - ignoreColumns: Seq[String]): Seq[Column] = { - val idColumns = pkColumns.map(c => coalesce(left(backticks(c)), right(backticks(c))).as(c)) + ignoreColumns: Seq[String]): Seq[(String, Column)] = { + val idColumns = pkColumns.map(c => c -> coalesce(left(backticks(c)), right(backticks(c))).as(c)) + + val leftValueColumns = left.columns.filterIsInCaseSensitivity(valueColumns) + val rightValueColumns = right.columns.filterIsInCaseSensitivity(valueColumns) + + val leftNonPkColumns = left.columns.diffCaseSensitivity(pkColumns) + val rightNonPkColumns = right.columns.diffCaseSensitivity(pkColumns) + + val leftIgnoredColumns = left.columns.filterIsInCaseSensitivity(ignoreColumns) + val rightIgnoredColumns = right.columns.filterIsInCaseSensitivity(ignoreColumns) val (leftValues, rightValues) = if (options.sparseMode) { ( - otherColumns.map(c => (c, if (options.sparseMode) when(not(left(backticks(c)) <=> right(backticks(c))), left(backticks(c))) else left(backticks(c)))).toMap, - otherColumns.map(c => (c, if (options.sparseMode) when(not(left(backticks(c)) <=> right(backticks(c))), right(backticks(c))) else right(backticks(c)))).toMap + leftNonPkColumns.map(c => (handleConfiguredCaseSensitivity(c), c -> (if (options.sparseMode) when(not(left(backticks(c)) <=> right(backticks(c))), left(backticks(c))) else left(backticks(c))))).toMap, + rightNonPkColumns.map(c => (handleConfiguredCaseSensitivity(c), c -> (if (options.sparseMode) when(not(left(backticks(c)) <=> right(backticks(c))), right(backticks(c))) else right(backticks(c))))).toMap ) } else { ( - otherColumns.map(c => (c, left(backticks(c)))).toMap, - otherColumns.map(c => (c, right(backticks(c)))).toMap + leftNonPkColumns.map(c => 
(handleConfiguredCaseSensitivity(c), c -> left(backticks(c)))).toMap, + rightNonPkColumns.map(c => (handleConfiguredCaseSensitivity(c), c -> right(backticks(c)))).toMap, ) } - val valueColumns = options.diffMode match { + def alias(prefix: Option[String], values: Map[String, (String, Column)])(name: String): (String, Column) = { + values(handleConfiguredCaseSensitivity(name)) match { + case (name, column) => + val alias = prefix.map(p => s"${p}_$name").getOrElse(name) + alias -> column.as(alias) + } + } + + def aliasLeft(name: String): (String, Column) = alias(Some(options.leftColumnPrefix), leftValues)(name) + + def aliasRight(name: String): (String, Column) = alias(Some(options.rightColumnPrefix), rightValues)(name) + + val prefixedLeftIgnoredColumns = leftIgnoredColumns.map(c => aliasLeft(c)) + val prefixedRightIgnoredColumns = rightIgnoredColumns.map(c => aliasRight(c)) + + val nonIdColumns = options.diffMode match { case DiffMode.ColumnByColumn => - otherColumns.flatMap(c => + valueColumns.flatMap(c => Seq( - leftValues(c).as(s"${options.leftColumnPrefix}_$c"), - rightValues(c).as(s"${options.rightColumnPrefix}_$c") + aliasLeft(c), + aliasRight(c) ) + ) ++ ignoreColumns.flatMap(c => + (if (leftIgnoredColumns.containsCaseSensitivity(c)) Seq(aliasLeft(c)) else Seq.empty) ++ + (if (rightIgnoredColumns.containsCaseSensitivity(c)) Seq(aliasRight(c)) else Seq.empty) ) case DiffMode.SideBySide => - otherColumns.map(c => leftValues(c).as(s"${options.leftColumnPrefix}_$c")) ++ - otherColumns.map(c => rightValues(c).as(s"${options.rightColumnPrefix}_$c")) + leftValueColumns.toSeq.map(c => aliasLeft(c)) ++ prefixedLeftIgnoredColumns ++ + rightValueColumns.toSeq.map(c => aliasRight(c)) ++ prefixedRightIgnoredColumns case DiffMode.LeftSide | DiffMode.RightSide => - otherColumns.map(c => - if (options.diffMode == DiffMode.LeftSide) leftValues(c).as(c) else rightValues(c).as(c) - ) + // in left-side / right-side mode, we do not prefix columns + ( + if (options.diffMode == 
DiffMode.LeftSide) valueColumns.map(alias(None, leftValues)) else valueColumns.map(alias(None, rightValues)) + ) ++ ( + if (options.diffMode == DiffMode.LeftSide) leftIgnoredColumns.map(alias(None, leftValues)) else rightIgnoredColumns.map(alias(None, rightValues)) + ) } - idColumns ++ valueColumns + idColumns ++ nonIdColumns } private def doDiff[T, U](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = { checkSchema(left, right, idColumns, ignoreColumns) - val pkColumns = if (idColumns.isEmpty) left.columns.toList else idColumns - val pkColumnsCs = pkColumns.map(handleConfiguredCaseSensitivity).toSet - val nonPkColumns = left.columns.filter(col => !pkColumnsCs.contains(handleConfiguredCaseSensitivity(col))) - val ignoreColumnsCs = ignoreColumns.map(handleConfiguredCaseSensitivity).toSet - val valueColumns = nonPkColumns.filter(col => !ignoreColumnsCs(handleConfiguredCaseSensitivity(col))) + + val columns = left.columns.filter(c => !ignoreColumnsCs.contains(handleConfiguredCaseSensitivity(c))).toList + val pkColumns = if (idColumns.isEmpty) columns else idColumns + val pkColumnsCs = pkColumns.map(handleConfiguredCaseSensitivity).toSet + val valueColumns = columns.filter(col => !pkColumnsCs.contains(handleConfiguredCaseSensitivity(col))) val existsColumnName = distinctPrefixFor(left.columns) + "exists" val leftWithExists = left.withColumn(existsColumnName, lit(1)) @@ -206,7 +238,7 @@ class Differ(options: DiffOptions) { otherwise(lit(options.nochangeDiffValue)). 
as(options.diffColumn) - val diffColumns = getDiffColumns(pkColumns, nonPkColumns, left, right, ignoreColumns) + val diffColumns = getDiffColumns(pkColumns, valueColumns, left, right, ignoreColumns).map(_._2) val changeColumn = getChangeColumn(existsColumnName, valueColumns, leftWithExists, rightWithExists) // turn this column into a sequence of one or none column so we can easily concat it below with diffActionColumn and diffColumns .map(Seq(_)) @@ -217,9 +249,9 @@ class Differ(options: DiffOptions) { } /** - * Returns a new DataFrame that contains the differences between two Datasets - * of the same type `T`. Both Datasets must contain the same set of column names and data types. - * The order of columns in the two Datasets is not important as columns are compared based on the + * Returns a new DataFrame that contains the differences between two Datasets of + * the same type `T`. Both Datasets must contain the same set of column names and data types. + * The order of columns in the two Datasets is not relevant as columns are compared based on the * name, not the the position. * * Optional `id` columns are used to uniquely identify rows to compare. If values in any non-id @@ -277,7 +309,7 @@ class Differ(options: DiffOptions) { /** * Returns a new DataFrame that contains the differences between two Datasets of * similar types `T` and `U`. Both Datasets must contain the same set of column names and data types, - * except for the columns in `ignoreColumns`. The order of columns in the two Datasets is not important as + * except for the columns in `ignoreColumns`. The order of columns in the two Datasets is not relevant as * columns are compared based on the name, not the the position. * * Optional id columns are used to uniquely identify rows to compare. 
If values in any non-id @@ -441,10 +473,10 @@ class Differ(options: DiffOptions) { */ def diffAs[T, U, V](left: Dataset[T], right: Dataset[U], diffEncoder: Encoder[V], idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[V] = { - val nonIdColumns = left.columns.diff(if (idColumns.isEmpty) left.columns.toList else idColumns) + val nonIdColumns = if (idColumns.isEmpty) Seq.empty else left.columns.diffCaseSensitivity(idColumns).diffCaseSensitivity(ignoreColumns).toSeq val encColumns = diffEncoder.schema.fields.map(_.name) - val diffColumns = Seq(options.diffColumn) ++ idColumns ++ getDiffValueColumns(nonIdColumns, options.diffMode) - val extraColumns = encColumns.diff(diffColumns) + val diffColumns = Seq(options.diffColumn) ++ getDiffColumns(idColumns, nonIdColumns, left, right, ignoreColumns).map(_._1) + val extraColumns = encColumns.diffCaseSensitivity(diffColumns) require(extraColumns.isEmpty, s"Diff encoder's columns must be part of the diff result schema, " + @@ -599,7 +631,7 @@ object Diff { /** * Returns a new DataFrame that contains the differences between two Datasets of * similar types `T` and `U`. Both Datasets must contain the same set of column names and data types, - * except for the columns in `ignoreColumns`. The order of columns in the two Datasets is not important as + * except for the columns in `ignoreColumns`. The order of columns in the two Datasets is not relevant as * columns are compared based on the name, not the the position. * * Optional id columns are used to uniquely identify rows to compare. If values in any non-id @@ -657,7 +689,7 @@ object Diff { /** * Returns a new DataFrame that contains the differences between two Datasets of * similar types `T` and `U`. Both Datasets must contain the same set of column names and data types, - * except for the columns in `ignoreColumns`. The order of columns in the two Datasets is not important as + * except for the columns in `ignoreColumns`. 
The order of columns in the two Datasets is not relevant as * columns are compared based on the name, not the the position. * * Optional id columns are used to uniquely identify rows to compare. If values in any non-id diff --git a/src/main/scala/uk/co/gresearch/spark/diff/package.scala b/src/main/scala/uk/co/gresearch/spark/diff/package.scala index 5fd05032..0db0e1ed 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/package.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/package.scala @@ -26,8 +26,8 @@ package object diff { implicit class DatasetDiff[T](ds: Dataset[T]) { /** - * Returns a new DataFrame that contains the differences between this and the other Dataset - * of the same type `T`. Both Datasets must contain the same set of column names and data types. + * Returns a new DataFrame that contains the differences between this and the other Dataset of + * the same type `T`. Both Datasets must contain the same set of column names and data types. * The order of columns in the two Datasets is not important as one column is compared to the * column with the same name of the other Dataset, not the column with the same position. * @@ -87,10 +87,10 @@ package object diff { } /** - * Returns a new DataFrame that contains the differences between this and the other Dataset - * of the same type `T`. Both Datasets must contain the same set of column names and data types. - * The order of columns in the two Datasets is not important as one column is compared to the - * column with the same name of the other Dataset, not the column with the same position. + * Returns a new DataFrame that contains the differences between this and the other Dataset of + * similar types `T` and `U`. Both Datasets must contain the same set of column names and data types, + * except for the columns in `ignoreColumns`. The order of columns in the two Datasets is not relevant as + * columns are compared based on the name, not the position.
* * Optional id columns are used to uniquely identify rows to compare. If values in any non-id * column are differing between this and the other Dataset, then that row is marked as `"C"`hange @@ -144,7 +144,7 @@ package object diff { * The id column names are take literally, i.e. "a.field" is interpreted as "`a.field`, which is a * column name containing a dot. This is not interpreted as a column "a" with a field "field" (struct). */ - def diff(other: Dataset[T], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = { + def diff[U](other: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = { Diff.of(this.ds, other, idColumns, ignoreColumns) } @@ -163,13 +163,13 @@ package object diff { /** * Returns a new DataFrame that contains the differences - * between this and the other Dataset of the same type `T`. + * between this and the other Dataset of similar types `T` and `U`. * - * See `diff(Dataset[T], Seq[String], Seq[String])`. + * See `diff(Dataset[U], Seq[String], Seq[String])`. * * The schema of the returned DataFrame can be configured by the given `DiffOptions`. */ - def diff(other: Dataset[T], options: DiffOptions, idColumns: Seq[String], ignoreColumns: Seq[String]): DataFrame = { + def diff[U](other: Dataset[U], options: DiffOptions, idColumns: Seq[String], ignoreColumns: Seq[String]): DataFrame = { new Differ(options).diff(this.ds, other, idColumns, ignoreColumns) } @@ -189,14 +189,14 @@ package object diff { /** * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T`. + * between this and the other Dataset of similar types `T` and `U`. * - * See `diff(Dataset[T], Seq[String], Seq[String])`. + * See `diff(Dataset[U], Seq[String], Seq[String])`. * - * This requires an additional implicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional implicit `Encoder[V]` for the return type `Dataset[V]`. 
*/ - def diffAs[U](other: Dataset[T], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty) - (implicit diffEncoder: Encoder[U]): Dataset[U] = { + def diffAs[U, V](other: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty) + (implicit diffEncoder: Encoder[V]): Dataset[V] = { Diff.ofAs(this.ds, other, idColumns, ignoreColumns) } @@ -206,26 +206,26 @@ package object diff { * * See `diff(Dataset[T], String*)`. * - * This requires an additional implicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional implicit `Encoder[V]` for the return type `Dataset[V]`. * The schema of the returned Dataset can be configured by the given `DiffOptions`. */ // no @scala.annotation.varargs here as this implicit class is not nicely accessible from Java - def diffAs[U](other: Dataset[T], options: DiffOptions, idColumns: String*) - (implicit diffEncoder: Encoder[U]): Dataset[U] = { + def diffAs[V](other: Dataset[T], options: DiffOptions, idColumns: String*) + (implicit diffEncoder: Encoder[V]): Dataset[V] = { new Differ(options).diffAs(this.ds, other, idColumns: _*) } /** * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T`. + * between this and the other Dataset of similar types `T` and `U`. * - * See `diff(Dataset[T], Seq[String], Seq[String])`. + * See `diff(Dataset[U], Seq[String], Seq[String])`. * - * This requires an additional implicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional implicit `Encoder[V]` for the return type `Dataset[V]`. * The schema of the returned Dataset can be configured by the given `DiffOptions`. 
*/ - def diffAs[U](other: Dataset[T], options: DiffOptions, idColumns: Seq[String], ignoreColumns: Seq[String]) - (implicit diffEncoder: Encoder[U]): Dataset[U] = { + def diffAs[U, V](other: Dataset[U], options: DiffOptions, idColumns: Seq[String], ignoreColumns: Seq[String]) + (implicit diffEncoder: Encoder[V]): Dataset[V] = { new Differ(options).diffAs(this.ds, other, idColumns, ignoreColumns) } @@ -235,22 +235,22 @@ package object diff { * * See `diff(Dataset[T], String*)`. * - * This requires an additional explicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional explicit `Encoder[V]` for the return type `Dataset[V]`. */ // no @scala.annotation.varargs here as this implicit class is not nicely accessible from Java - def diffAs[U](other: Dataset[T], diffEncoder: Encoder[U], idColumns: String*): Dataset[U] = { + def diffAs[V](other: Dataset[T], diffEncoder: Encoder[V], idColumns: String*): Dataset[V] = { Diff.ofAs(this.ds, other, diffEncoder, idColumns: _*) } /** * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T`. + * between this and the other Dataset of similar types `T` and `U`. * - * See `diff(Dataset[T], Seq[String], Seq[String])`. + * See `diff(Dataset[U], Seq[String], Seq[String])`. * - * This requires an additional explicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional explicit `Encoder[V]` for the return type `Dataset[V]`. */ - def diffAs[U](other: Dataset[T], diffEncoder: Encoder[U], idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[U] = { + def diffAs[U, V](other: Dataset[U], diffEncoder: Encoder[V], idColumns: Seq[String], ignoreColumns: Seq[String]): Dataset[V] = { Diff.ofAs(this.ds, other, diffEncoder, idColumns, ignoreColumns) } @@ -260,31 +260,31 @@ package object diff { * * See `diff(Dataset[T], String*)`. * - * This requires an additional explicit `Encoder[U]` for the return type `Dataset[U]`.
+ * This requires an additional explicit `Encoder[V]` for the return type `Dataset[V]`. * The schema of the returned Dataset can be configured by the given `DiffOptions`. */ // no @scala.annotation.varargs here as this implicit class is not nicely accessible from Java - def diffAs[U](other: Dataset[T], + def diffAs[V](other: Dataset[T], options: DiffOptions, - diffEncoder: Encoder[U], - idColumns: String*): Dataset[U] = { + diffEncoder: Encoder[V], + idColumns: String*): Dataset[V] = { new Differ(options).diffAs(this.ds, other, diffEncoder, idColumns: _*) } /** * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T`. + * between this and the other Dataset of similar types `T` and `U`. * - * See `diff(Dataset[T], Seq[String], Seq[String])`. + * See `diff(Dataset[U], Seq[String], Seq[String])`. * - * This requires an additional explicit `Encoder[U]` for the return type `Dataset[U]`. + * This requires an additional explicit `Encoder[V]` for the return type `Dataset[V]`. * The schema of the returned Dataset can be configured by the given `DiffOptions`. */ - def diffAs[U](other: Dataset[T], - options: DiffOptions, - diffEncoder: Encoder[U], - idColumns: Seq[String], - ignoreColumns: Seq[String]): Dataset[U] = { + def diffAs[U, V](other: Dataset[U], + options: DiffOptions, + diffEncoder: Encoder[V], + idColumns: Seq[String], + ignoreColumns: Seq[String]): Dataset[V] = { new Differ(options).diffAs(this.ds, other, diffEncoder, idColumns, ignoreColumns) } @@ -301,14 +301,14 @@ package object diff { /** * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T` - * as tuples of type `(String, T, T)`. + * between this and the other Dataset of similar types `T` and `U` + * as tuples of type `(String, T, U)`. * - * See `diff(Dataset[T], Seq[String], Seq[String])`. + * See `diff(Dataset[U], Seq[String], Seq[String])`. 
*/ - def diffWith(other: Dataset[T], - idColumns: Seq[String], - ignoreColumns: Seq[String]): Dataset[(String, T, T)] = + def diffWith[U](other: Dataset[U], + idColumns: Seq[String], + ignoreColumns: Seq[String]): Dataset[(String, T, U)] = Diff.default.diffWith(this.ds, other, idColumns, ignoreColumns) /** @@ -328,17 +328,17 @@ package object diff { /** * Returns a new Dataset that contains the differences - * between this and the other Dataset of the same type `T` - * as tuples of type `(String, T, T)`. + * between this and the other Dataset of similar types `T` and `U` + * as tuples of type `(String, T, U)`. * - * See `diff(Dataset[T], Seq[String], Seq[String])`. + * See `diff(Dataset[U], Seq[String], Seq[String])`. * * The schema of the returned Dataset can be configured by the given `DiffOptions`. */ - def diffWith(other: Dataset[T], - options: DiffOptions, - idColumns: Seq[String], - ignoreColumns: Seq[String]): Dataset[(String, T, T)] = { + def diffWith[U](other: Dataset[U], + options: DiffOptions, + idColumns: Seq[String], + ignoreColumns: Seq[String]): Dataset[(String, T, U)] = { new Differ(options).diffWith(this.ds, other, idColumns, ignoreColumns) } } @@ -353,4 +353,27 @@ package object diff { private[diff] def handleConfiguredCaseSensitivity(columnName: String): String = if (SQLConf.get.caseSensitiveAnalysis) columnName else columnName.toLowerCase(Locale.ROOT) + + implicit class CaseInsensitiveSeq(seq: Seq[String]) { + def containsCaseSensitivity(string: String): Boolean = + seq.map(handleConfiguredCaseSensitivity).contains(handleConfiguredCaseSensitivity(string)) + + def filterIsInCaseSensitivity(other: Iterable[String]): Seq[String] = { + val otherSet = other.map(handleConfiguredCaseSensitivity).toSet + seq.filter(v => otherSet.contains(handleConfiguredCaseSensitivity(v))) + } + + def diffCaseSensitivity(other: Iterable[String]): Seq[String] = { + val otherSet = other.map(handleConfiguredCaseSensitivity).toSet + seq.filter(v => !otherSet.contains(handleConfiguredCaseSensitivity(v))) + } + } +
+ implicit class CaseInsensitiveArray(array: Array[String]) { + def containsCaseSensitivity(string: String): Boolean = + array.map(handleConfiguredCaseSensitivity).contains(handleConfiguredCaseSensitivity(string)) + def filterIsInCaseSensitivity(other: Iterable[String]): Array[String] = array.toSeq.filterIsInCaseSensitivity(other).toArray + def diffCaseSensitivity(other: Iterable[String]): Array[String] = array.toSeq.diffCaseSensitivity(other).toArray + } + } diff --git a/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala index d0255a24..188d8bd9 100644 --- a/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala +++ b/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala @@ -18,6 +18,7 @@ package uk.co.gresearch.spark.diff import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.sql.functions.regexp_replace import org.apache.spark.sql.{Dataset, Encoders, Row} import org.scalatest.funsuite.AnyFunSuite import uk.co.gresearch.spark.{SparkTestSession, distinctPrefixFor} @@ -31,6 +32,8 @@ case class Value5(first_id: Int, id: String) case class Value6(id: Int, label: String) case class Value7(id: Int, value: Option[String], label: Option[String]) case class Value8(id: Int, seq: Option[Int], value: Option[String], meta: Option[String]) +case class Value9(id: Int, seq: Option[Int], value: Option[String], info: Option[String]) +case class Value9up(ID: Int, SEQ: Option[Int], VALUE: Option[String], INFO: Option[String]) case class ValueLeft(left_id: Int, value: Option[String]) case class ValueRight(right_id: Int, value: Option[String]) @@ -46,6 +49,33 @@ case class DiffAs8(diff: String, right_value: Option[String], left_meta: Option[String], right_meta: Option[String]) +case class DiffAs8SideBySide(diff: String, + id: Int, + seq: Option[Int], + left_value: Option[String], + left_meta: Option[String], + right_value: Option[String], + right_meta: 
Option[String]) +case class DiffAs8OneSide(diff: String, + id: Int, + seq: Option[Int], + value: Option[String], + meta: Option[String]) +case class DiffAs8changes(diff: String, + changed: Array[String], + id: Int, + seq: Option[Int], + left_value: Option[String], + right_value: Option[String], + left_meta: Option[String], + right_meta: Option[String]) +case class DiffAs8and9(diff: String, + id: Int, + seq: Option[Int], + left_value: Option[String], + right_value: Option[String], + left_meta: Option[String], + right_info: Option[String]) case class DiffAsCustom(action: String, id: Int, @@ -121,6 +151,8 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { Value8(3, None, None, None) ).toDS() + lazy val right9: Dataset[Value9] = right8.withColumn("info", regexp_replace($"meta", "user", "info")).drop("meta").as[Value9] + lazy val expectedDiffColumns: Seq[String] = Seq("diff", "id", "left_value", "right_value") lazy val expectedDiff: Seq[Row] = Seq( @@ -199,6 +231,17 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { Row("N", 3, null, null, null, null, null) ) + lazy val expectedDiff8and9: Seq[Row] = Seq( + Row("N", 1, 1, "one", "one", "user1", "info2"), + Row("N", 1, 2, "one", "one", null, "info2"), + Row("N", 1, 3, "one", "one", "user1", null), + Row("N", 2, null, "two", "two", "user2", "info2"), + Row("D", 2, 1, "two", null, null, null), + Row("C", 2, 2, "two", "Two", null, "info1"), + Row("I", 2, 3, null, "two", null, "info2"), + Row("N", 3, null, null, null, null, null) + ) + lazy val expectedSideBySideDiff8: Seq[Row] = expectedDiff8.map(r => Row(r.get(0), r.get(1), r.get(2), r.get(3), r.get(5), r.get(4), r.get(6))) lazy val expectedLeftSideDiff8: Seq[Row] = expectedDiff8.map(r => Row(r.get(0), r.get(1), r.get(2), r.get(3), r.get(5))) lazy val expectedRightSideDiff8: Seq[Row] = expectedDiff8.map(r => Row(r.get(0), r.get(1), r.get(2), r.get(4), r.get(6))) @@ -240,6 +283,24 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { ) ) + 
lazy val expectedDiffAs8and9: Seq[DiffAs8and9] = expectedDiff8and9.map(r => + DiffAs8and9(r.getString(0), + r.getInt(1), Some(r).filterNot(_.isNullAt(2)).map(_.getInt(2)), + Option(r.getString(3)), Option(r.getString(4)), + Option(r.getString(5)), Option(r.getString(6)) + ) + ) + + lazy val expectedDiffWith8and9: Seq[(String, Value8, Value9)] = expectedDiffAs8and9.map(v => ( + v.diff, + if (v.diff == "I") null else Value8(v.id, v.seq, v.left_value, v.left_meta), + if (v.diff == "D") null else Value9(v.id, v.seq, v.right_value, v.right_info) + )) + + lazy val expectedDiffWith8and9up: Seq[(String, Value8, Value9up)] = expectedDiffWith8and9.map(t => + t.copy(_3 = Option(t._3).map(v => Value9up(v.id, v.seq, v.value, v.info)).orNull) + ) + test("distinct prefix for") { assert(distinctPrefixFor(Seq.empty[String]) === "_") assert(distinctPrefixFor(Seq("a")) === "_") @@ -273,6 +334,17 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { assert(actual.collect() === expected) } + test("diff with no id columns ids taken from left") { + // we can check from where ids are taken only with case insensitivity + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val left = this.left.toDF() + val right = this.right.toDF("ID", "VALUE") + + assert(left.diff(right).columns === Seq("diff", "id", "value")) + assert(right.diff(left).columns === Seq("diff", "ID", "VALUE")) + } + } + test("diff with one id column") { val actual = left.diff(right, "id").orderBy("id") val reverse = right.diff(left, "id").orderBy("id") @@ -557,7 +629,7 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { doTestRequirement(left.withColumnRenamed("diff", "Diff") .diff(right.withColumnRenamed("diff", "Diff"), "Diff", "id"), - "The id columns must not contain the diff column name 'diff': diff, id") + "The id columns must not contain the diff column name 'diff': Diff, id") } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { @@ -791,6 
+863,21 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { "Right extra columns: value (IntegerType)") } + test("diff with ignored columns of different types") { + // different value types only compile with DataFrames + val left = Seq((1, "str")).toDF("id", "value") + val right = Seq((1, 2)).toDF("id", "value") + + val actual = left.diff(right, Seq.empty, Seq("value")) + assert(ignoreNullable(actual.schema) === StructType(Seq( + StructField("diff", StringType), + StructField("id", IntegerType), + StructField("left_value", StringType), + StructField("right_value", IntegerType), + ))) + assert(actual.collect() === Seq(Row("N", 1, "str", 2))) + } + test("diff with different nullability") { val leftSchema = StructType(left.schema.fields.map(_.copy(nullable = true))) val rightSchema = StructType(right.schema.fields.map(_.copy(nullable = false))) @@ -828,9 +915,9 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { val actual = left.diff(right, "id").orderBy("id") val reverse = right.diff(left, "id").orderBy("id") - assert(actual.columns === expectedDiffColumns) + assert(actual.columns === Seq("diff", "id", "left_value", "right_VaLuE")) assert(actual.collect() === expectedDiff) - assert(reverse.columns === Seq("diff", "id", "left_VaLuE", "right_VaLuE")) + assert(reverse.columns === Seq("diff", "id", "left_VaLuE", "right_value")) assert(reverse.collect() === expectedReverseDiff) } } @@ -1033,19 +1120,19 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { test("diff with right-side diff mode and diff column name in value columns") { val options = DiffOptions.default.withDiffColumn("value").withDiffMode(DiffMode.RightSide) doTestRequirement(right.diff(right, options, "id"), - "The non-id columns must not contain the diff column name 'value': value") + "The non-id columns must not contain the diff column name 'value': value") } test("diff with left-side diff mode and change column name in value columns") { val options = 
DiffOptions.default.withChangeColumn("value").withDiffMode(DiffMode.LeftSide) doTestRequirement(left.diff(right, options, "id"), - "The non-id columns must not contain the change column name 'value': value") + "The left non-id columns must not contain the change column name 'value': value") } test("diff with right-side diff mode and change column name in value columns") { val options = DiffOptions.default.withChangeColumn("value").withDiffMode(DiffMode.RightSide) doTestRequirement(right.diff(right, options, "id"), - "The non-id columns must not contain the change column name 'value': value") + "The right non-id columns must not contain the change column name 'value': value") } test("diff with dots in diff column") { @@ -1160,93 +1247,155 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { assert(actual.collect() === expectedRightSideSparseDiff7) } - def assertIgnoredColumns[T](actual: Dataset[T], expected: Seq[T], - idColumns: Seq[String] = Seq("id", "seq"), - changedColumn: Option[String] = None, - diffColumns: Seq[String] = Seq("left_value", "right_value", "left_meta", "right_meta")): Unit = { - val expectedColumns = Seq("diff") ++ changedColumn.map(Seq(_)).getOrElse(Seq.empty) ++ idColumns ++ diffColumns - assert(actual.columns === expectedColumns) + def ignoreNullable(schema: StructType): StructType = { + schema.copy(fields = + schema.fields + .map(_.copy(nullable = true)) + .map(field => field.dataType match { + case a: ArrayType => field.copy(dataType = a.copy(containsNull = false)) + case _ => field + }) + ) + } + + def assertIgnoredColumns[T](actual: Dataset[T], expected: Seq[T], expectedSchema: StructType): Unit = { + // ignore nullable + assert(ignoreNullable(actual.schema) === ignoreNullable(expectedSchema)) assert(actual.orderBy("id", "seq").collect() === expected) } test("diff with ignored columns") { - assertIgnoredColumns(left8.diff(right8, Seq("id", "seq"), Seq("meta")), expectedDiff8) - assertIgnoredColumns(Diff.of(left8, right8, 
Seq("id", "seq"), Seq("meta")), expectedDiff8) + assertIgnoredColumns(left8.diff(right8, Seq("id", "seq"), Seq("meta")), expectedDiff8, Encoders.product[DiffAs8].schema) + assertIgnoredColumns(Diff.of(left8, right8, Seq("id", "seq"), Seq("meta")), expectedDiff8, Encoders.product[DiffAs8].schema) + assertIgnoredColumns(Diff.default.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedDiff8, Encoders.product[DiffAs8].schema) + + assertIgnoredColumns[DiffAs8](left8.diffAs(right8, Seq("id", "seq"), Seq("meta")), expectedDiffAs8, Encoders.product[DiffAs8].schema) + assertIgnoredColumns[DiffAs8](Diff.ofAs(left8, right8, Seq("id", "seq"), Seq("meta")), expectedDiffAs8, Encoders.product[DiffAs8].schema) + assertIgnoredColumns[DiffAs8](Diff.default.diffAs(left8, right8, Seq("id", "seq"), Seq("meta")), expectedDiffAs8, Encoders.product[DiffAs8].schema) + + val expected = expectedDiff8.map(row => ( + row.getString(0), + Value8(row.getInt(1), Option(row.get(2)).map(_.asInstanceOf[Int]), Option(row.getString(3)), Option(row.getString(5))), + Value8(row.getInt(1), Option(row.get(2)).map(_.asInstanceOf[Int]), Option(row.getString(4)), Option(row.getString(6))) + )).map { case (diff, left, right) => ( + diff, + if (diff == "I") null else left, + if (diff == "D") null else right + ) + } - assertIgnoredColumns[DiffAs8](left8.diffAs(right8, Seq("id", "seq"), Seq("meta")), expectedDiffAs8) - assertIgnoredColumns[DiffAs8](Diff.ofAs(left8, right8, Seq("id", "seq"), Seq("meta")), expectedDiffAs8) + assertDiffWith(left8.diffWith(right8, Seq("id", "seq"), Seq("meta")).collect(), expected) + assertDiffWith(Diff.ofWith(left8, right8, Seq("id", "seq"), Seq("meta")).collect(), expected) + assertDiffWith(Diff.default.diffWith(left8, right8, Seq("id", "seq"), Seq("meta")).collect(), expected) } test("diff with ignored and change columns") { val options = DiffOptions.default.withChangeColumn("changed") val differ = new Differ(options) - assertIgnoredColumns(left8.diff(right8, options, 
Seq("id", "seq"), Seq("meta")), expectedDiff8WithChanges, changedColumn = options.changeColumn) - assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedDiff8WithChanges, changedColumn = options.changeColumn) + assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedDiff8WithChanges, Encoders.product[DiffAs8changes].schema) + assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedDiff8WithChanges, Encoders.product[DiffAs8changes].schema) } test("diff with ignored columns and column-by-column diff mode") { val options = DiffOptions.default.withDiffMode(DiffMode.ColumnByColumn) val differ = new Differ(options) - assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedDiff8) - assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedDiff8) + assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedDiff8, Encoders.product[DiffAs8].schema) + assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedDiff8, Encoders.product[DiffAs8].schema) } test("diff with ignored columns and side-by-side diff mode") { val options = DiffOptions.default.withDiffMode(DiffMode.SideBySide) val differ = new Differ(options) - assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedSideBySideDiff8, diffColumns = Seq("left_value", "left_meta", "right_value", "right_meta")) - assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedSideBySideDiff8, diffColumns = Seq("left_value", "left_meta", "right_value", "right_meta")) + assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedSideBySideDiff8, Encoders.product[DiffAs8SideBySide].schema) + assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedSideBySideDiff8, 
Encoders.product[DiffAs8SideBySide].schema) } test("diff with ignored columns and left-side diff mode") { val options = DiffOptions.default.withDiffMode(DiffMode.LeftSide) val differ = new Differ(options) - assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedLeftSideDiff8, diffColumns = Seq("value", "meta")) - assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedLeftSideDiff8, diffColumns = Seq("value", "meta")) + assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedLeftSideDiff8, Encoders.product[DiffAs8OneSide].schema) + assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedLeftSideDiff8, Encoders.product[DiffAs8OneSide].schema) } test("diff with ignored columns and right-side diff mode") { val options = DiffOptions.default.withDiffMode(DiffMode.RightSide) val differ = new Differ(options) - assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedRightSideDiff8, diffColumns = Seq("value", "meta")) - assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedRightSideDiff8, diffColumns = Seq("value", "meta")) + assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedRightSideDiff8, Encoders.product[DiffAs8OneSide].schema) + assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedRightSideDiff8, Encoders.product[DiffAs8OneSide].schema) } test("diff with ignored columns, column-by-column diff and sparse mode") { val options = DiffOptions.default.withDiffMode(DiffMode.ColumnByColumn).withSparseMode(true) val differ = new Differ(options) - assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedSparseDiff8) - assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedSparseDiff8) + assertIgnoredColumns(left8.diff(right8, options, Seq("id", 
"seq"), Seq("meta")), expectedSparseDiff8, Encoders.product[DiffAs8].schema) + assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedSparseDiff8, Encoders.product[DiffAs8].schema) } test("diff with ignored columns, side-by-side diff and sparse mode") { val options = DiffOptions.default.withDiffMode(DiffMode.SideBySide).withSparseMode(true) val differ = new Differ(options) - assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedSideBySideSparseDiff8, diffColumns = Seq("left_value", "left_meta", "right_value", "right_meta")) - assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedSideBySideSparseDiff8, diffColumns = Seq("left_value", "left_meta", "right_value", "right_meta")) + assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedSideBySideSparseDiff8, Encoders.product[DiffAs8SideBySide].schema) + assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedSideBySideSparseDiff8, Encoders.product[DiffAs8SideBySide].schema) } test("diff with ignored columns, left-side diff and sparse mode") { val options = DiffOptions.default.withDiffMode(DiffMode.LeftSide).withSparseMode(true) val differ = new Differ(options) - assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedLeftSideSparseDiff8, diffColumns = Seq("value", "meta")) - assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedLeftSideSparseDiff8, diffColumns = Seq("value", "meta")) + assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedLeftSideSparseDiff8, Encoders.product[DiffAs8OneSide].schema) + assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedLeftSideSparseDiff8, Encoders.product[DiffAs8OneSide].schema) } test("diff with ignored columns, right-side diff and sparse mode") { val options = 
DiffOptions.default.withDiffMode(DiffMode.RightSide).withSparseMode(true) val differ = new Differ(options) - assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedRightSideSparseDiff8, diffColumns = Seq("value", "meta")) - assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedRightSideSparseDiff8, diffColumns = Seq("value", "meta")) + assertIgnoredColumns(left8.diff(right8, options, Seq("id", "seq"), Seq("meta")), expectedRightSideSparseDiff8, Encoders.product[DiffAs8OneSide].schema) + assertIgnoredColumns(differ.diff(left8, right8, Seq("id", "seq"), Seq("meta")), expectedRightSideSparseDiff8, Encoders.product[DiffAs8OneSide].schema) + } + + test("diff similar with ignored columns") { + val diffColumns = Seq("left_value", "right_value", "left_meta", "right_info") +/** + assertIgnoredColumns(left8.diff(right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiff8and9, diffColumns = diffColumns) + assertIgnoredColumns(Diff.of(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiff8and9, diffColumns = diffColumns) + assertIgnoredColumns(Diff.default.diff(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiff8and9, diffColumns = diffColumns) + + assertIgnoredColumns[DiffAs8and9](left8.diffAs(right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffAs8and9, diffColumns = diffColumns) + assertIgnoredColumns[DiffAs8and9](Diff.ofAs(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffAs8and9, diffColumns = diffColumns) + assertIgnoredColumns[DiffAs8and9](Diff.default.diffAs(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffAs8and9, diffColumns = diffColumns) +**/ + + val expectedSchema = StructType(Seq( + StructField("_1", StringType), + StructField("_2", StructType(Seq( + StructField("id", IntegerType, nullable = true), + StructField("seq", IntegerType, nullable = true), + StructField("value", StringType, nullable = true), + StructField("meta", 
StringType, nullable = true) + ))), + StructField("_3", StructType(Seq( + StructField("id", IntegerType, nullable = true), + StructField("seq", IntegerType, nullable = true), + StructField("value", StringType, nullable = true), + StructField("info", StringType, nullable = true) + ))), + )) + + assertDiffWithSchema(left8.diffWith(right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffWith8and9, expectedSchema) + assertDiffWithSchema(Diff.ofWith(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffWith8and9, expectedSchema) + assertDiffWithSchema(Diff.default.diffWith(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffWith8and9, expectedSchema) + } + + test("diff similar with ignored columns of different type") { + } test("diff with ignored columns case-insensitive") { @@ -1254,11 +1403,26 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { val left = left8.toDF("id", "seq", "value", "meta") val right = right8.toDF("ID", "SEQ", "VALUE", "META") - assertIgnoredColumns(left.diff(right, Seq("iD", "sEq"), Seq("MeTa")), expectedDiff8, idColumns = Seq("iD", "sEq")) - assertIgnoredColumns(Diff.of(left, right, Seq("Id", "SeQ"), Seq("mEtA")), expectedDiff8, idColumns = Seq("Id", "SeQ")) - - assertIgnoredColumns[DiffAs8](left.diffAs(right, Seq("id", "seq"), Seq("MeTa")), expectedDiffAs8) - assertIgnoredColumns[DiffAs8](Diff.ofAs(left, right, Seq("id", "seq"), Seq("mEtA")), expectedDiffAs8) + def expectedSchema(id: String, seq: String): StructType = + StructType(Seq( + StructField("diff", StringType), + StructField(id, IntegerType), + StructField(seq, IntegerType), + StructField("left_value", StringType), + StructField("right_VALUE", StringType), + StructField("left_meta", StringType), + StructField("right_META", StringType), + )) + + assertIgnoredColumns(left.diff(right, Seq("iD", "sEq"), Seq("MeTa")), expectedDiff8, expectedSchema("iD", "sEq")) + assertIgnoredColumns(Diff.of(left, right, Seq("Id", "SeQ"), Seq("mEtA")), 
expectedDiff8, expectedSchema("Id", "SeQ")) + assertIgnoredColumns(Diff.default.diff(left, right, Seq("ID", "SEQ"), Seq("META")), expectedDiff8, expectedSchema("ID", "SEQ")) + + assertIgnoredColumns[DiffAs8](left.diffAs(right, Seq("id", "seq"), Seq("MeTa")), expectedDiffAs8, expectedSchema("id", "seq")) + assertIgnoredColumns[DiffAs8](Diff.ofAs(left, right, Seq("id", "seq"), Seq("mEtA")), expectedDiffAs8, expectedSchema("id", "seq")) + assertIgnoredColumns[DiffAs8](Diff.default.diffAs(left, right, Seq("id", "seq"), Seq("meta")), expectedDiffAs8, expectedSchema("id", "seq")) + + // TODO: add diffWith } } @@ -1269,17 +1433,114 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { doTestRequirement(left.diff(right, Seq("Id", "SeQ"), Seq("MeTa")), "The datasets do not have the same schema.\nLeft extra columns: id (IntegerType), seq (IntegerType), value (StringType), meta (StringType)\nRight extra columns: ID (IntegerType), SEQ (IntegerType), VALUE (StringType), META (StringType)") + doTestRequirement(Diff.of(left, right, Seq("Id", "SeQ"), Seq("MeTa")), + "The datasets do not have the same schema.\nLeft extra columns: id (IntegerType), seq (IntegerType), value (StringType), meta (StringType)\nRight extra columns: ID (IntegerType), SEQ (IntegerType), VALUE (StringType), META (StringType)") + doTestRequirement(Diff.default.diff(left, right, Seq("Id", "SeQ"), Seq("MeTa")), + "The datasets do not have the same schema.\nLeft extra columns: id (IntegerType), seq (IntegerType), value (StringType), meta (StringType)\nRight extra columns: ID (IntegerType), SEQ (IntegerType), VALUE (StringType), META (StringType)") doTestRequirement(left8.diff(right8, Seq("Id", "SeQ"), Seq("MeTa")), "Some id columns do not exist: Id, SeQ missing among id, seq, value, meta") + doTestRequirement(Diff.of(left8, right8, Seq("Id", "SeQ"), Seq("MeTa")), + "Some id columns do not exist: Id, SeQ missing among id, seq, value, meta") + doTestRequirement(Diff.default.diff(left8, right8, Seq("Id", 
"SeQ"), Seq("MeTa")), + "Some id columns do not exist: Id, SeQ missing among id, seq, value, meta") + + doTestRequirement(left8.diff(right8, Seq("id", "seq"), Seq("MeTa")), + "Some ignore columns do not exist: MeTa missing among id, meta, seq, value") + doTestRequirement(Diff.of(left8, right8, Seq("id", "seq"), Seq("MeTa")), + "Some ignore columns do not exist: MeTa missing among id, meta, seq, value") + doTestRequirement(Diff.default.diff(left8, right8, Seq("id", "seq"), Seq("MeTa")), + "Some ignore columns do not exist: MeTa missing among id, meta, seq, value") + } + } + + test("diff similar with ignored columns case-insensitive") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val left = left8.toDF("id", "seq", "value", "meta").as[Value8] + val right = right9.toDF("ID", "SEQ", "VALUE", "INFO").as[Value9up] + + def expectedSchema(id: String, seq: String): StructType = + StructType(Seq( + StructField("diff", StringType), + StructField(id, IntegerType), + StructField(seq, IntegerType), + StructField("left_value", StringType), + StructField("right_VALUE", StringType), + StructField("left_meta", StringType), + StructField("right_INFO", StringType), + )) + + assertIgnoredColumns(left.diff(right, Seq("iD", "sEq"), Seq("MeTa", "InFo")), expectedDiff8and9, expectedSchema("iD", "sEq")) + assertIgnoredColumns(Diff.of(left, right, Seq("Id", "SeQ"), Seq("mEtA", "iNfO")), expectedDiff8and9, expectedSchema("Id", "SeQ")) + assertIgnoredColumns(Diff.default.diff(left, right, Seq("ID", "SEQ"), Seq("META", "INFO")), expectedDiff8and9, expectedSchema("ID", "SEQ")) + + // TODO: remove generic type + assertIgnoredColumns[DiffAs8and9](left.diffAs(right, Seq("id", "seq"), Seq("MeTa", "InFo")), expectedDiffAs8and9, expectedSchema("id", "seq")) + assertIgnoredColumns[DiffAs8and9](Diff.ofAs(left, right, Seq("id", "seq"), Seq("mEtA", "iNfO")), expectedDiffAs8and9, expectedSchema("id", "seq")) + assertIgnoredColumns[DiffAs8and9](Diff.default.diffAs(left, right, Seq("id", "seq"), 
Seq("meta", "info")), expectedDiffAs8and9, expectedSchema("id", "seq"))
+
+      val expectedSchemaWith: StructType = // nested structs are expected to keep each side's own column names, whatever casing the id columns are given in
+        StructType(Seq(
+          StructField("_1", StringType, nullable = false),
+          StructField("_2", StructType(Seq(
+            StructField("id", IntegerType),
+            StructField("seq", IntegerType),
+            StructField("value", StringType),
+            StructField("meta", StringType)
+          )), nullable = true),
+          StructField("_3", StructType(Seq(
+            StructField("ID", IntegerType),
+            StructField("SEQ", IntegerType),
+            StructField("VALUE", StringType),
+            StructField("INFO", StringType)
+          )), nullable = true),
+        ))
+
+      assertDiffWithSchema(left.diffWith(right, Seq("iD", "sEq"), Seq("MeTa", "InFo")), expectedDiffWith8and9up, expectedSchemaWith) // not assertIgnoredColumns: it orders by "id", "seq", which a (_1, _2, _3) dataset does not have
+      assertDiffWithSchema(Diff.ofWith(left, right, Seq("Id", "SeQ"), Seq("mEtA", "iNfO")), expectedDiffWith8and9up, expectedSchemaWith)
+      assertDiffWithSchema(Diff.default.diffWith(left, right, Seq("ID", "SEQ"), Seq("META", "INFO")), expectedDiffWith8and9up, expectedSchemaWith)
+    }
+  }
+
+  test("diff similar with ignored columns case-sensitive") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val left = left8.toDF("id", "seq", "value", "meta").as[Value8]
+      val right = right9.toDF("ID", "SEQ", "VALUE", "INFO").as[Value9up]
+
+      doTestRequirement(left.diff(right, Seq("Id", "SeQ"), Seq("MeTa", "InFo")),
+        "The datasets do not have the same schema.\nLeft extra columns: id (IntegerType), seq (IntegerType), value (StringType), meta (StringType)\nRight extra columns: ID (IntegerType), SEQ (IntegerType), VALUE (StringType), INFO (StringType)")
+      doTestRequirement(Diff.of(left, right, Seq("Id", "SeQ"), Seq("MeTa", "InFo")),
+        "The datasets do not have the same schema.\nLeft extra columns: id (IntegerType), seq (IntegerType), value (StringType), meta (StringType)\nRight extra columns: ID (IntegerType), SEQ (IntegerType), VALUE
(StringType), INFO (StringType)") + doTestRequirement(Diff.default.diff(left, right, Seq("Id", "SeQ"), Seq("MeTa", "InFo")), + "The datasets do not have the same schema.\nLeft extra columns: id (IntegerType), seq (IntegerType), value (StringType), meta (StringType)\nRight extra columns: ID (IntegerType), SEQ (IntegerType), VALUE (StringType), INFO (StringType)") + + doTestRequirement(left8.diff(right9, Seq("Id", "SeQ"), Seq("MeTa", "InFo")), + "The datasets do not have the same schema.\nLeft extra columns: meta (StringType)\nRight extra columns: info (StringType)") + doTestRequirement(Diff.of(left8, right9, Seq("Id", "SeQ"), Seq("MeTa", "InFo")), + "The datasets do not have the same schema.\nLeft extra columns: meta (StringType)\nRight extra columns: info (StringType)") + doTestRequirement(Diff.default.diff(left8, right9, Seq("Id", "SeQ"), Seq("MeTa", "InFo")), + "The datasets do not have the same schema.\nLeft extra columns: meta (StringType)\nRight extra columns: info (StringType)") + + doTestRequirement(left8.diff(right9, Seq("Id", "SeQ"), Seq("meta", "info")), + "Some id columns do not exist: Id, SeQ missing among id, seq, value") + doTestRequirement(Diff.of(left8, right9, Seq("Id", "SeQ"), Seq("meta", "info")), + "Some id columns do not exist: Id, SeQ missing among id, seq, value") + doTestRequirement(Diff.default.diff(left8, right9, Seq("Id", "SeQ"), Seq("meta", "info")), + "Some id columns do not exist: Id, SeQ missing among id, seq, value") } } - def assertDiffWith(actual: Seq[Any], expected: Seq[Any]): Unit = { + def assertDiffWith[T](actual: Seq[T], expected: Seq[T]): Unit = { assert(actual.toSet === expected.toSet) assert(actual.length === expected.length) } + def assertDiffWithSchema[T](actual: Dataset[T], expected: Seq[T], expectedSchema: StructType): Unit = { + // ignore nullable + assert(ignoreNullable(actual.schema) === ignoreNullable(expectedSchema)) + assertDiffWith(actual.collect(), expected) + } + test("diffWith") { val expected = Seq( ("N", 
Value(1, Some("one")), Value(1, Some("one"))), @@ -1293,22 +1554,6 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { assertDiffWith(Diff.default.diffWith(left, right, "id").collect(), expected) } - test("diffWith ignored") { - val expected = expectedDiff8.map(row => ( - row.getString(0), - Value8(row.getInt(1), Option(row.get(2)).map(_.asInstanceOf[Int]), Option(row.getString(3)), Option(row.getString(5))), - Value8(row.getInt(1), Option(row.get(2)).map(_.asInstanceOf[Int]), Option(row.getString(4)), Option(row.getString(6))) - )).map { case (diff, left, right) => ( - diff, - if (diff == "I") null else left, - if (diff == "D") null else right - )} - - assertDiffWith(left8.diffWith(right8, Seq("id", "seq"), Seq("meta")).collect(), expected) - assertDiffWith(Diff.ofWith(left8, right8, Seq("id", "seq"), Seq("meta")).collect(), expected) - assertDiffWith(Diff.default.diffWith(left8, right8, Seq("id", "seq"), Seq("meta")).collect(), expected) - } - test("diffWith left-prefixed id") { val prefixedLeft = left.select($"id".as("left_id"), $"value").as[ValueLeft] val prefixedRight = right.select($"id".as("left_id"), $"value").as[ValueLeft] From abd7f1ded85643029255c43001667fae699e2da3 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 12 Sep 2022 15:42:55 +0200 Subject: [PATCH 3/6] Use diffCaseSensitivity everywhere --- .../uk/co/gresearch/spark/diff/Diff.scala | 50 +++++++------------ .../co/gresearch/spark/diff/DiffSuite.scala | 4 +- 2 files changed, 19 insertions(+), 35 deletions(-) diff --git a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala index 53ac4124..df2f0213 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala @@ -36,13 +36,8 @@ class Differ(options: DiffOptions) { s"Left column names: ${left.columns.mkString(", ")}\n" + s"Right column names: ${right.columns.mkString(", ")}") - val ignoreColumnsCs = 
ignoreColumns.map(handleConfiguredCaseSensitivity).toSet - def isIgnoredColumn(column: String): Boolean = !ignoreColumnsCs.contains(handleConfiguredCaseSensitivity(column)) - val leftNonIgnored = left.columns.filter(isIgnoredColumn) - val rightNonIgnored = right.columns.filter(isIgnoredColumn) - - def notInWithCaseSensitivity(columns: Seq[String])(column: String): Boolean = - !columns.map(handleConfiguredCaseSensitivity).contains(handleConfiguredCaseSensitivity(column)) + val leftNonIgnored = left.columns.diffCaseSensitivity(ignoreColumns) + val rightNonIgnored = right.columns.diffCaseSensitivity(ignoreColumns) val exceptIgnoredColumnsMsg = if (ignoreColumns.nonEmpty) " except ignored columns" else "" @@ -54,8 +49,8 @@ class Differ(options: DiffOptions) { require(leftNonIgnored.length > 0, s"The schema$exceptIgnoredColumnsMsg must not be empty") // column types must match but we ignore the nullability of columns - val leftFields = left.schema.fields.filter(f => isIgnoredColumn(f.name)).map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType) - val rightFields = right.schema.fields.filter(f => isIgnoredColumn(f.name)).map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType) + val leftFields = left.schema.fields.filter(f => !ignoreColumns.containsCaseSensitivity(f.name)).map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType) + val rightFields = right.schema.fields.filter(f => !ignoreColumns.containsCaseSensitivity(f.name)).map(f => handleConfiguredCaseSensitivity(f.name) -> f.dataType) val leftExtraSchema = leftFields.diff(rightFields) val rightExtraSchema = rightFields.diff(leftFields) require(leftExtraSchema.isEmpty && rightExtraSchema.isEmpty, @@ -65,8 +60,8 @@ class Differ(options: DiffOptions) { val columns = leftNonIgnored val pkColumns = if (idColumns.isEmpty) columns.toList else idColumns - val nonPkColumns = columns.filter(notInWithCaseSensitivity(pkColumns)) - val missingIdColumns = pkColumns.filter(notInWithCaseSensitivity(columns)) 
+ val nonPkColumns = columns.diffCaseSensitivity(pkColumns) + val missingIdColumns = pkColumns.diffCaseSensitivity(columns) require(missingIdColumns.isEmpty, s"Some id columns do not exist: ${missingIdColumns.mkString(", ")} missing among ${columns.mkString(", ")}") @@ -75,49 +70,41 @@ class Differ(options: DiffOptions) { s"Some ignore columns do not exist: ${missingIgnoreColumns.mkString(", ")} " + s"missing among ${(leftNonIgnored ++ rightNonIgnored).distinct.sorted.mkString(", ")}") - require(notInWithCaseSensitivity(pkColumns)(options.diffColumn), - s"The id columns must not contain the diff column name '${options.diffColumn}': " + - s"${pkColumns.mkString(", ")}") - if(Set(DiffMode.LeftSide, DiffMode.RightSide).contains(options.diffMode)) - require(notInWithCaseSensitivity(nonPkColumns)(options.diffColumn), - s"The non-id columns must not contain the diff column name '${options.diffColumn}': ${nonPkColumns.mkString((", "))}") - - require(options.changeColumn.forall(notInWithCaseSensitivity(pkColumns)), - s"The id columns must not contain the change column name '${options.changeColumn.get}': ${pkColumns.mkString((", "))}") - if(Set(DiffMode.LeftSide, DiffMode.RightSide).contains(options.diffMode)) - require(!options.changeColumn.exists(notInWithCaseSensitivity(nonPkColumns)), - s"The non-id columns must not contain the change column name '${options.changeColumn.get}': ${nonPkColumns.mkString((", "))}") + require(!pkColumns.containsCaseSensitivity(options.diffColumn), + s"The id columns must not contain the diff column name '${options.diffColumn}': ${pkColumns.mkString(", ")}") + require(options.changeColumn.forall(!pkColumns.containsCaseSensitivity(_)), + s"The id columns must not contain the change column name '${options.changeColumn.get}': ${pkColumns.mkString(", ")}") val diffValueColumns = getDiffColumns(pkColumns, nonPkColumns, left, right, ignoreColumns).map(_._1).diff(pkColumns) if (Seq(DiffMode.LeftSide, DiffMode.RightSide).contains(options.diffMode)) { 
-      require(notInWithCaseSensitivity(diffValueColumns)(options.diffColumn),
+      require(!diffValueColumns.containsCaseSensitivity(options.diffColumn),
         s"The ${if (options.diffMode == DiffMode.LeftSide) "left" else "right"} " +
           s"non-id columns must not contain the diff column name '${options.diffColumn}': " +
           s"${(if (options.diffMode == DiffMode.LeftSide) left else right).columns.diffCaseSensitivity(idColumns).mkString(", ")}")
       options.changeColumn.foreach( changeColumn =>
-        require(notInWithCaseSensitivity(diffValueColumns)(changeColumn),
+        require(!diffValueColumns.containsCaseSensitivity(changeColumn), // case-aware like the diff-column check and the check it replaces; plain contains is always exact-match
           s"The ${if (options.diffMode == DiffMode.LeftSide) "left" else "right"} " +
             s"non-id columns must not contain the change column name '${options.changeColumn.get}': " +
             s"${(if (options.diffMode == DiffMode.LeftSide) left else right).columns.diffCaseSensitivity(idColumns).mkString(", ")}")
       )
     } else {
-      require(notInWithCaseSensitivity(diffValueColumns)(options.diffColumn),
+      require(!diffValueColumns.containsCaseSensitivity(options.diffColumn),
         s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " +
           s"together with these non-id columns " +
           s"must not produce the diff column name '${options.diffColumn}': " +
           s"${nonPkColumns.mkString(", ")}")
       options.changeColumn.foreach( changeColumn =>
-        require(notInWithCaseSensitivity(diffValueColumns)(changeColumn),
+        require(!diffValueColumns.containsCaseSensitivity(changeColumn),
           s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " +
             s"together with these non-id columns " +
             s"must not produce the change column name '${changeColumn}': " +
             s"${nonPkColumns.mkString(", ")}")
       )
-      require(diffValueColumns.forall(column => notInWithCaseSensitivity(pkColumns)(column)),
+      require(diffValueColumns.forall(!pkColumns.containsCaseSensitivity(_)),
         s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " +
           s"together with these non-id columns " +
           s"must
not produce any id column name '${pkColumns.mkString("', '")}': " + @@ -217,12 +204,9 @@ class Differ(options: DiffOptions) { private def doDiff[T, U](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = { checkSchema(left, right, idColumns, ignoreColumns) - val ignoreColumnsCs = ignoreColumns.map(handleConfiguredCaseSensitivity).toSet - - val columns = left.columns.filter(c => !ignoreColumnsCs.contains(handleConfiguredCaseSensitivity(c))).toList + val columns = left.columns.diffCaseSensitivity(ignoreColumns).toList val pkColumns = if (idColumns.isEmpty) columns else idColumns - val pkColumnsCs = pkColumns.map(handleConfiguredCaseSensitivity).toSet - val valueColumns = columns.filter(col => !pkColumnsCs.contains(handleConfiguredCaseSensitivity(col))) + val valueColumns = columns.diffCaseSensitivity(pkColumns) val existsColumnName = distinctPrefixFor(left.columns) + "exists" val leftWithExists = left.withColumn(existsColumnName, lit(1)) diff --git a/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala index 188d8bd9..9a21c227 100644 --- a/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala +++ b/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala @@ -1114,13 +1114,13 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { test("diff with left-side diff mode and diff column name in value columns") { val options = DiffOptions.default.withDiffColumn("value").withDiffMode(DiffMode.LeftSide) doTestRequirement(left.diff(right, options, "id"), - "The non-id columns must not contain the diff column name 'value': value") + "The left non-id columns must not contain the diff column name 'value': value") } test("diff with right-side diff mode and diff column name in value columns") { val options = DiffOptions.default.withDiffColumn("value").withDiffMode(DiffMode.RightSide) doTestRequirement(right.diff(right, options, "id"), - "The 
non-id columns must not contain the diff column name 'value': value") + "The right non-id columns must not contain the diff column name 'value': value") } test("diff with left-side diff mode and change column name in value columns") { From 6d4fa6a837307e574c25ec6a53602386c36b8b89 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 12 Sep 2022 15:56:03 +0200 Subject: [PATCH 4/6] Enable commented tests --- .../co/gresearch/spark/diff/DiffSuite.scala | 35 +++++++++++-------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala index 9a21c227..51e6f92a 100644 --- a/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala +++ b/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala @@ -1362,18 +1362,25 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { } test("diff similar with ignored columns") { - val diffColumns = Seq("left_value", "right_value", "left_meta", "right_info") -/** - assertIgnoredColumns(left8.diff(right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiff8and9, diffColumns = diffColumns) - assertIgnoredColumns(Diff.of(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiff8and9, diffColumns = diffColumns) - assertIgnoredColumns(Diff.default.diff(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiff8and9, diffColumns = diffColumns) + val expectedSchema = StructType(Seq( + StructField("diff", StringType), + StructField("id", IntegerType), + StructField("seq", IntegerType), + StructField("left_value", StringType), + StructField("right_value", StringType), + StructField("left_meta", StringType), + StructField("right_info", StringType), + )) - assertIgnoredColumns[DiffAs8and9](left8.diffAs(right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffAs8and9, diffColumns = diffColumns) - assertIgnoredColumns[DiffAs8and9](Diff.ofAs(left8, right9, Seq("id", "seq"), Seq("meta", "info")), 
expectedDiffAs8and9, diffColumns = diffColumns) - assertIgnoredColumns[DiffAs8and9](Diff.default.diffAs(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffAs8and9, diffColumns = diffColumns) -**/ + assertIgnoredColumns(left8.diff(right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiff8and9, expectedSchema) + assertIgnoredColumns(Diff.of(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiff8and9, expectedSchema) + assertIgnoredColumns(Diff.default.diff(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiff8and9, expectedSchema) - val expectedSchema = StructType(Seq( + assertIgnoredColumns[DiffAs8and9](left8.diffAs(right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffAs8and9, expectedSchema) + assertIgnoredColumns[DiffAs8and9](Diff.ofAs(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffAs8and9, expectedSchema) + assertIgnoredColumns[DiffAs8and9](Diff.default.diffAs(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffAs8and9, expectedSchema) + + val expectedSchemaWith = StructType(Seq( StructField("_1", StringType), StructField("_2", StructType(Seq( StructField("id", IntegerType, nullable = true), @@ -1389,13 +1396,13 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { ))), )) - assertDiffWithSchema(left8.diffWith(right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffWith8and9, expectedSchema) - assertDiffWithSchema(Diff.ofWith(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffWith8and9, expectedSchema) - assertDiffWithSchema(Diff.default.diffWith(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffWith8and9, expectedSchema) + assertDiffWithSchema(left8.diffWith(right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffWith8and9, expectedSchemaWith) + assertDiffWithSchema(Diff.ofWith(left8, right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffWith8and9, expectedSchemaWith) + assertDiffWithSchema(Diff.default.diffWith(left8, 
right9, Seq("id", "seq"), Seq("meta", "info")), expectedDiffWith8and9, expectedSchemaWith) } test("diff similar with ignored columns of different type") { - + // TODO } test("diff with ignored columns case-insensitive") { From 0137bfe611cbb34642a6385b929858f458bf2d14 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Wed, 14 Sep 2022 15:41:03 +0200 Subject: [PATCH 5/6] Few new test cases, minor logic paraphrasing --- .../uk/co/gresearch/spark/diff/Diff.scala | 22 ++++++++----------- .../uk/co/gresearch/spark/diff/package.scala | 8 +++---- .../co/gresearch/spark/diff/DiffSuite.scala | 20 ++++++++++++++++- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala index df2f0213..3fbb1344 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala @@ -83,12 +83,10 @@ class Differ(options: DiffOptions) { s"non-id columns must not contain the diff column name '${options.diffColumn}': " + s"${(if (options.diffMode == DiffMode.LeftSide) left else right).columns.diffCaseSensitivity(idColumns).mkString(", ")}") - options.changeColumn.foreach( changeColumn => - require(!diffValueColumns.contains(changeColumn), - s"The ${if (options.diffMode == DiffMode.LeftSide) "left" else "right"} " + - s"non-id columns must not contain the change column name '${options.changeColumn.get}': " + - s"${(if (options.diffMode == DiffMode.LeftSide) left else right).columns.diffCaseSensitivity(idColumns).mkString(", ")}") - ) + require(options.changeColumn.forall(!diffValueColumns.containsCaseSensitivity(_)), + s"The ${if (options.diffMode == DiffMode.LeftSide) "left" else "right"} " + + s"non-id columns must not contain the change column name '${options.changeColumn.get}': " + + s"${(if (options.diffMode == DiffMode.LeftSide) left else right).columns.diffCaseSensitivity(idColumns).mkString(", ")}") } else { 
require(!diffValueColumns.containsCaseSensitivity(options.diffColumn), s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " + @@ -96,13 +94,11 @@ class Differ(options: DiffOptions) { s"must not produce the diff column name '${options.diffColumn}': " + s"${nonPkColumns.mkString(", ")}") - options.changeColumn.foreach( changeColumn => - require(!diffValueColumns.containsCaseSensitivity(changeColumn), - s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " + - s"together with these non-id columns " + - s"must not produce the change column name '${changeColumn}': " + - s"${nonPkColumns.mkString(", ")}") - ) + require(options.changeColumn.forall(!diffValueColumns.containsCaseSensitivity(_)), + s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " + + s"together with these non-id columns " + + s"must not produce the change column name '${options.changeColumn.orNull}': " + + s"${nonPkColumns.mkString(", ")}") require(diffValueColumns.forall(!pkColumns.containsCaseSensitivity(_)), s"The column prefixes '${options.leftColumnPrefix}' and '${options.rightColumnPrefix}', " + diff --git a/src/main/scala/uk/co/gresearch/spark/diff/package.scala b/src/main/scala/uk/co/gresearch/spark/diff/package.scala index 0db0e1ed..0b654e30 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/package.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/package.scala @@ -37,8 +37,8 @@ package object diff { * (w.r.t. the values in the id columns) are marked as `"I"`nsert. And rows of this Dataset, that * do not exist in the other Dataset are marked as `"D"`elete. * - * If no id columns are given, all columns are considered id columns. Then, no `"C"`hange rows - * will appear, as all changes will exists as respective `"D"`elete and `"I"`nsert. + * If no id columns are given (empty sequence), all columns are considered id columns. 
Then, + * no `"C"`hange rows will appear, as all changes will exist as respective `"D"`elete and `"I"`nsert. * * The returned DataFrame has the `diff` column as the first column. This holds the `"N"`, `"C"`, * `"I"` or `"D"` strings. The id columns follow, then the non-id columns (all remaining columns). @@ -98,8 +98,8 @@ package object diff { * (w.r.t. the values in the id columns) are marked as `"I"`nsert. And rows of this Dataset, that * do not exist in the other Dataset are marked as `"D"`elete. * - * If no id columns are given, all columns are considered id columns. Then, no `"C"`hange rows - * will appear, as all changes will exists as respective `"D"`elete and `"I"`nsert. + * If no id columns are given (empty sequence), all columns are considered id columns. Then, + * no `"C"`hange rows will appear, as all changes will exist as respective `"D"`elete and `"I"`nsert. * * Values in optional ignore columns are not compared but included in the output DataFrame. * diff --git a/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala index 51e6f92a..4c81c114 100644 --- a/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala +++ b/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala @@ -852,6 +852,14 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { doTestRequirement(left.diff(right), "The schema must not be empty") } + test("diff similar with ignored columns and empty schema") { + val left = Seq((1, "info")).toDF("id", "info") + val right = Seq((1, "meta")).toDF("id", "meta") + + doTestRequirement(left.diff(right, Seq.empty, Seq("id", "info", "meta")), + "The schema except ignored columns must not be empty") + } + test("diff with different types") { // different value types only compiles with DataFrames val left = Seq((1, "str")).toDF("id", "value") @@ -940,7 +948,7 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { "Some id columns do not exist: does not exists missing 
among id, value") } - test("diff with different number of column") { + test("diff with different number of columns") { // different column names only compiles with DataFrames val left = Seq((1, "str")).toDF("id", "value") val right = Seq((1, 1, "str")).toDF("id", "seq", "value") @@ -951,6 +959,16 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { "Right column names (3): id, seq, value") } + test("diff similar with ignored column and different number of columns") { + val left = Seq((1, "str", "meta")).toDF("id", "value", "meta") + val right = Seq((1, 1, "str")).toDF("id", "seq", "value") + + doTestRequirement(left.diff(right, Seq("id"), Seq("meta")), + "The number of columns doesn't match.\n" + + "Left column names except ignored columns (2): id, value\n" + + "Right column names except ignored columns (3): id, seq, value") + } + test("diff as U") { val actual = left.diffAs[DiffAs](right, "id").orderBy("id") From 853ea7ea928303f0a227b29ae9bd69d7d19a5c0e Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Tue, 4 Oct 2022 20:47:29 +0200 Subject: [PATCH 6/6] Remove default value for ignoreColumns --- src/main/scala/uk/co/gresearch/spark/diff/Diff.scala | 9 ++++----- src/main/scala/uk/co/gresearch/spark/diff/package.scala | 8 ++++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala index 3fbb1344..25cbb252 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/Diff.scala @@ -282,9 +282,8 @@ class Differ(options: DiffOptions) { * columns are in the order of this Dataset. 
*/ @scala.annotation.varargs - def diff[T](left: Dataset[T], right: Dataset[T], idColumns: String*): DataFrame = { - diff(left, right, idColumns) - } + def diff[T](left: Dataset[T], right: Dataset[T], idColumns: String*): DataFrame = + doDiff(left, right, idColumns) /** * Returns a new DataFrame that contains the differences between two Datasets of @@ -341,7 +340,7 @@ class Differ(options: DiffOptions) { * columns of this Dataset are id columns and appear in the same order. The remaining non-id * columns are in the order of this Dataset. */ - def diff[T, U](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = + def diff[T, U](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String]): DataFrame = doDiff(left, right, idColumns, ignoreColumns) /** @@ -425,7 +424,7 @@ class Differ(options: DiffOptions) { * * This requires an additional implicit `Encoder[V]` for the return type `Dataset[V]`. */ - def diffAs[T, U, V](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty) + def diffAs[T, U, V](left: Dataset[T], right: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String]) (implicit diffEncoder: Encoder[V]): Dataset[V] = { diffAs(left, right, diffEncoder, idColumns, ignoreColumns) } diff --git a/src/main/scala/uk/co/gresearch/spark/diff/package.scala b/src/main/scala/uk/co/gresearch/spark/diff/package.scala index 0b654e30..53f47e72 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/package.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/package.scala @@ -144,7 +144,7 @@ package object diff { * The id column names are take literally, i.e. "a.field" is interpreted as "`a.field`, which is a * column name containing a dot. This is not interpreted as a column "a" with a field "field" (struct). 
*/ - def diff[U](other: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty): DataFrame = { + def diff[U](other: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String]): DataFrame = { Diff.of(this.ds, other, idColumns, ignoreColumns) } @@ -182,8 +182,8 @@ package object diff { * This requires an additional implicit `Encoder[U]` for the return type `Dataset[U]`. */ // no @scala.annotation.varargs here as this implicit class is not nicely accessible from Java - def diffAs[U](other: Dataset[T], idColumns: String*) - (implicit diffEncoder: Encoder[U]): Dataset[U] = { + def diffAs[V](other: Dataset[T], idColumns: String*) + (implicit diffEncoder: Encoder[V]): Dataset[V] = { Diff.ofAs(this.ds, other, idColumns: _*) } @@ -195,7 +195,7 @@ package object diff { * * This requires an additional implicit `Encoder[V]` for the return type `Dataset[V]`. */ - def diffAs[U, V](other: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String] = Seq.empty) + def diffAs[U, V](other: Dataset[U], idColumns: Seq[String], ignoreColumns: Seq[String]) (implicit diffEncoder: Encoder[V]): Dataset[V] = { Diff.ofAs(this.ds, other, idColumns, ignoreColumns) }