Skip to content

Commit

Permalink
Allow filtering output by diff action
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Sep 21, 2023
1 parent 0db5e1a commit cd8705d
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 7 deletions.
9 changes: 5 additions & 4 deletions DIFF.md
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,9 @@ spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2
```

```
Spark Diff app (2.7.0-3.4)
Spark Diff app (2.10.0-3.4)
Usage: spark-extension_2.13-2.7.0-3.4.jar [options] left right diff
Usage: spark-extension_2.13-2.10.0-3.4.jar [options] left right diff
left file path (requires format option) or table name to read left dataframe
right file path (requires format option) or table name to read right dataframe
Expand All @@ -430,10 +430,10 @@ Usage: spark-extension_2.13-2.7.0-3.4.jar [options] left right diff
Examples:
- Diff CSV files 'left.csv' and 'right.csv' and write result into CSV file 'diff.csv':
spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --format csv left.csv right.csv diff.csv
spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.10.0-3.4.jar --format csv left.csv right.csv diff.csv
- Diff CSV file 'left.csv' with Parquet file 'right.parquet' with id column 'id', and write result into Hive table 'diff':
spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --left-format csv --right-format parquet --hive --id id left.csv right.parquet diff
spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.10.0-3.4.jar --left-format csv --right-format parquet --hive --id id left.csv right.parquet diff
Spark session
--master <master> Spark master (local, yarn, ...), not needed with spark-submit
Expand All @@ -457,6 +457,7 @@ Input and output
--id <name> id column name
--ignore <name> ignore column name
--save-mode <save-mode> save mode for writing output (Append, Overwrite, ErrorIfExists, Ignore, default ErrorIfExists)
--filter <filter> Filters for rows with these diff actions, with default diffing options use 'N', 'I', 'D', or 'C' (see 'Diffing options' section)
Diffing options
--diff-column <name> column name for diff column (default 'diff')
Expand Down
15 changes: 12 additions & 3 deletions src/main/scala/uk/co/gresearch/spark/diff/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package uk.co.gresearch.spark.diff

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scopt.OptionParser
import uk.co.gresearch._
Expand Down Expand Up @@ -44,6 +45,7 @@ object App {
ids: Seq[String] = Seq.empty,
ignore: Seq[String] = Seq.empty,
saveMode: SaveMode = SaveMode.ErrorIfExists,
filter: Set[String] = Set.empty,
diffOptions: DiffOptions = DiffOptions.default)

// read options from args
Expand Down Expand Up @@ -171,6 +173,12 @@ object App {
.valueName("<save-mode>")
.action((x, c) => c.copy(saveMode = SaveMode.valueOf(x)))
.text(s"save mode for writing output (${SaveMode.values().mkString(", ")}, default ${Options().saveMode})")
opt[String]("filter")
.unbounded()
.optional()
.valueName("<filter>")
.action((x, c) => c.copy(filter = c.filter + x))
.text(s"Filters for rows with these diff actions, with default diffing options use 'N', 'I', 'D', or 'C' (see 'Diffing options' section)")

note("")
note("Diffing options")
Expand Down Expand Up @@ -236,8 +244,9 @@ object App {
.when(schema.isDefined).call(_.schema(schema.get))
.when(format.isDefined).either(_.load(path)).or(_.table(path))

def write(df: DataFrame, format: Option[String], path: String, options: Map[String, String], saveMode: SaveMode): Unit =
df.write
def write(df: DataFrame, format: Option[String], path: String, options: Map[String, String], saveMode: SaveMode, filter: Set[String], diffOptions: DiffOptions): Unit =
df.when(filter.nonEmpty).call(_.where(col(diffOptions.diffColumn).isInCollection(filter)))
.write
.when(format.isDefined).call(_.format(format.get))
.options(options)
.mode(saveMode)
Expand All @@ -261,6 +270,6 @@ object App {
val left = read(spark, options.leftFormat, options.leftPath.get, options.leftSchema, options.leftOptions)
val right = read(spark, options.rightFormat, options.rightPath.get, options.rightSchema, options.rightOptions)
val diff = left.diff(right, options.diffOptions, options.ids, options.ignore)
write(diff, options.outputFormat, options.outputPath.get, options.outputOptions, options.saveMode)
write(diff, options.outputFormat, options.outputPath.get, options.outputOptions, options.saveMode, options.filter, options.diffOptions)
}
}
31 changes: 31 additions & 0 deletions src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,35 @@ class AppSuite extends AnyFunSuite with SparkTestSession {
assert(actual.orderBy($"id").collect() === DiffSuite.expectedDiff)
}
}

// End-to-end tests for the new `--filter` CLI option: for several filter sets
// (single actions and a combination), run the diff App against two Parquet
// inputs and check the written output contains exactly the rows whose diff
// action column matches the filter.
Seq(Set("I"), Set("C"), Set("D"), Set("N"), Set("I", "C", "D")).foreach { filter =>
test(s"run app with filter ${filter.mkString("[", ",", "]")}") {
withTempPath { path =>
// write left dataframe as parquet
val leftPath = new File(path, "left.parquet").getAbsolutePath
DiffSuite.left(spark).write.parquet(leftPath)

// write right dataframe as parquet (NOTE: comment previously said "csv",
// but the test writes Parquet and reads both sides via --format parquet)
val rightPath = new File(path, "right.parquet").getAbsolutePath
DiffSuite.right(spark).write.parquet(rightPath)

// launch app; each filter value becomes its own repeated "--filter <f>" pair,
// exercising the unbounded/accumulating scopt option
val outputPath = new File(path, "diff.parquet").getAbsolutePath
App.main(Array(
"--format", "parquet",
"--id", "id",
) ++ filter.toSeq.flatMap(f => Array("--filter", f)) ++ Array(
leftPath,
rightPath,
outputPath
))

// assert written diff: expected rows are the reference diff restricted to the
// requested actions (diff action is the first column, hence getString(0))
val actual = spark.read.parquet(outputPath).orderBy($"id").collect()
val expected = DiffSuite.expectedDiff.filter(row => filter.contains(row.getString(0)))
assert(actual === expected)
// guard against a vacuously-passing test: every filter set must match something
assert(expected.nonEmpty)
}
}
}
}

0 comments on commit cd8705d

Please sign in to comment.