Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Key order sensitive map comparator #187

Merged
merged 2 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DIFF.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ The following alternative comparators are provided:
|`DiffComparators.epsilon(epsilon)`|Two values are equal when they are at most `epsilon` apart.<br/><br/>The comparator can be configured to use `epsilon` as an absolute (`.asAbsolute()`) threshold, or as relative (`.asRelative()`) to the larger value. Further, the threshold itself can be considered equal (`.asInclusive()`) or not equal (`.asExclusive()`):<ul><li>`DiffComparators.epsilon(epsilon).asAbsolute().asInclusive()`:<br/>`x` and `y` are equal iff `abs(x - y) ≤ epsilon`</li><li>`DiffComparators.epsilon(epsilon).asAbsolute().asExclusive()`:<br/>`x` and `y` are equal iff `abs(x - y) < epsilon`</li><li>`DiffComparators.epsilon(epsilon).asRelative().asInclusive()`:<br/>`x` and `y` are equal iff `abs(x - y) ≤ epsilon * max(abs(x), abs(y))`</li><li>`DiffComparators.epsilon(epsilon).asRelative().asExclusive()`:<br/>`x` and `y` are equal iff `abs(x - y) < epsilon * max(abs(x), abs(y))`</li></ul>|
|`DiffComparators.string()`|Two `StringType` values are compared while ignoring white space differences. For this comparison, sequences of whitespaces are collapesed into single whitespaces, leading and trailing whitespaces are removed. With `DiffComparators.string(false)`, string values are compared with the default comparator.|
|`DiffComparators.duration(duration)`|Two `DateType` or `TimestampType` values are equal when they are at most `duration` apart. That duration is an instance of `java.time.Duration`.<br/><br/>The comparator can be configured to consider `duration` as equal (`.asInclusive()`) or not equal (`.asExclusive()`):<ul><li>`DiffComparators.duration(duration).asInclusive()`:<br/>`x` and `y` are equal iff `x - y ≤ duration`</li><li>`DiffComparators.duration(duration).asExclusive()`:<br/>`x` and `y` are equal iff `x - y < duration`</li></lu>|
|`DiffComparators.map[K,V]()`|Two `Map[K,V]` values are equal when they match in all their keys and values.|
|`DiffComparators.map[K,V](keyOrderSensitive)`|Two `Map[K,V]` values are equal when they match in all their keys and values. With `keyOrderSensitive=true`, the order of the keys matters, with `keyOrderSensitive=false` (default), the order of keys is ignored.|

An example:

Expand Down
11 changes: 10 additions & 1 deletion src/main/scala/uk/co/gresearch/spark/diff/DiffComparators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,14 @@ object DiffComparators {
/**
* This comparator compares two `Map[K,V]` values. They are equal when they match in all their keys and values.
*/
def map[K: Encoder, V: Encoder](): DiffComparator = MapDiffComparator[K, V]()
def map[K: Encoder, V: Encoder](): DiffComparator = MapDiffComparator[K, V](keyOrderSensitive = false)

// for backward compatibility to v2.4.0 up to v2.8.0
// replace with default value in above map when moving to v3
/**
* This comparator compares two `Map[K,V]` values. They are equal when they match in all their keys and values.
*
* @param keyOrderSensitive comparator compares key order if true
*/
def map[K: Encoder, V: Encoder](keyOrderSensitive: Boolean): DiffComparator = MapDiffComparator[K, V](keyOrderSensitive)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ case class MapDiffComparator[K, V](private val comparator: EquivDiffComparator[U
override def equiv(left: Column, right: Column): Column = comparator.equiv(left, right)
}

private case class MapDiffEquiv[K: ClassTag, V](keyType: DataType, valueType: DataType) extends math.Equiv[UnsafeMapData] {
private case class MapDiffEquiv[K: ClassTag, V](keyType: DataType, valueType: DataType, keyOrderSensitive: Boolean) extends math.Equiv[UnsafeMapData] {
override def equiv(left: UnsafeMapData, right: UnsafeMapData): Boolean = {

val leftKeysIndices: Map[K, Int] = left.keyArray().toArray(keyType).zipWithIndex.toMap
val rightKeysIndices: Map[K, Int] = right.keyArray().toArray(keyType).zipWithIndex.toMap
val leftKeys: Array[K] = left.keyArray().toArray(keyType)
val rightKeys: Array[K] = right.keyArray().toArray(keyType)

val leftKeysIndices: Map[K, Int] = leftKeys.zipWithIndex.toMap
val rightKeysIndices: Map[K, Int] = rightKeys.zipWithIndex.toMap

val leftValues = left.valueArray()
val rightValues = right.valueArray()
Expand All @@ -46,18 +49,22 @@ private case class MapDiffEquiv[K: ClassTag, V](keyType: DataType, valueType: Da
}

left.numElements() == right.numElements() &&
leftKeysIndices.keySet.diff(rightKeysIndices.keySet).isEmpty &&
(keyOrderSensitive && leftKeys.sameElements(rightKeys) || !keyOrderSensitive && leftKeys.toSet.diff(rightKeys.toSet).isEmpty) &&
valuesAreEqual.forall(identity)
}
}

case object MapDiffComparator {
def apply[K: Encoder, V: Encoder](): MapDiffComparator[K, V] = {
def apply[K: Encoder, V: Encoder](keyOrderSensitive: Boolean): MapDiffComparator[K, V] = {
val keyType = encoderFor[K].schema.fields(0).dataType
val valueType = encoderFor[V].schema.fields(0).dataType
val equiv = MapDiffEquiv(keyType, valueType)
val equiv = MapDiffEquiv(keyType, valueType, keyOrderSensitive)
val dataType = MapType(keyType, valueType)
val comparator = InputTypedEquivDiffComparator[UnsafeMapData](equiv, dataType)
MapDiffComparator[K, V](comparator)
}

// for backward compatibility to v2.4.0 up to v2.8.0
// replace with default value in above apply when moving to v3
def apply[K: Encoder, V: Encoder](): MapDiffComparator[K, V] = apply(keyOrderSensitive = false)
}
34 changes: 18 additions & 16 deletions src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -355,22 +355,24 @@ class DiffComparatorSuite extends AnyFunSuite with SparkTestSession {
}
}

Seq("true", "false").foreach { codegen =>
test(s"map comparator - codegen enabled=$codegen") {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen,
SQLConf.CODEGEN_FALLBACK.key -> "false"
) {
val options = DiffOptions.default.withComparator(DiffComparators.map[Int, Long](), "map")

val actual = leftMaps.diff(rightMaps, options, "id").orderBy($"id").collect()
val diffs = Seq((1, "N"), (2, "C"), (3, "C"), (4, "D"), (5, "I"), (6, "N"), (7, "C")).toDF("id", "diff")
val expected = leftMaps.withColumnRenamed("map", "left_map")
.join(rightMaps.withColumnRenamed("map", "right_map"), Seq("id"), "fullouter")
.join(diffs, "id")
.select($"diff", $"id", $"left_map", $"right_map")
.orderBy($"id").collect()
assert(actual === expected)
Seq(true, false).foreach { sensitive =>
Seq("true", "false").foreach { codegen =>
test(s"map comparator - keyOrderSensitive=$sensitive - codegen enabled=$codegen") {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen,
SQLConf.CODEGEN_FALLBACK.key -> "false"
) {
val options = DiffOptions.default.withComparator(DiffComparators.map[Int, Long](sensitive), "map")

val actual = leftMaps.diff(rightMaps, options, "id").orderBy($"id").collect()
val diffs = Seq((1, "N"), (2, "C"), (3, "C"), (4, "D"), (5, "I"), (6, if (sensitive) "C" else "N"), (7, "C")).toDF("id", "diff")
val expected = leftMaps.withColumnRenamed("map", "left_map")
.join(rightMaps.withColumnRenamed("map", "right_map"), Seq("id"), "fullouter")
.join(diffs, "id")
.select($"diff", $"id", $"left_map", $"right_map")
.orderBy($"id").collect()
assert(actual === expected)
}
}
}
}
Expand Down
Loading