From cd8705d3d07bdaff1af5e9c5e7c90a625c0a631f Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Wed, 13 Sep 2023 11:57:36 +0200 Subject: [PATCH 1/2] Allow filtering output by diff action --- DIFF.md | 9 +++--- .../uk/co/gresearch/spark/diff/App.scala | 15 +++++++-- .../uk/co/gresearch/spark/diff/AppSuite.scala | 31 +++++++++++++++++++ 3 files changed, 48 insertions(+), 7 deletions(-) diff --git a/DIFF.md b/DIFF.md index e4ecae4c..23ea6258 100644 --- a/DIFF.md +++ b/DIFF.md @@ -419,9 +419,9 @@ spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2 ``` ``` -Spark Diff app (2.7.0-3.4) +Spark Diff app (2.10.0-3.4) -Usage: spark-extension_2.13-2.7.0-3.4.jar [options] left right diff +Usage: spark-extension_2.13-2.10.0-3.4.jar [options] left right diff left file path (requires format option) or table name to read left dataframe right file path (requires format option) or table name to read right dataframe @@ -430,10 +430,10 @@ Usage: spark-extension_2.13-2.7.0-3.4.jar [options] left right diff Examples: - Diff CSV files 'left.csv' and 'right.csv' and write result into CSV file 'diff.csv': - spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --format csv left.csv right.csv diff.csv + spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.10.0-3.4.jar --format csv left.csv right.csv diff.csv - Diff CSV file 'left.csv' with Parquet file 'right.parquet' with id column 'id', and write result into Hive table 'diff': - spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --left-format csv --right-format parquet --hive --id id left.csv right.parquet diff + spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.10.0-3.4.jar --left-format csv --right-format parquet --hive --id id left.csv right.parquet diff Spark session --master Spark master (local, yarn, ...), not needed with spark-submit @@ -457,6 +457,7 @@ Input and output --id id column name --ignore ignore column name --save-mode save mode for writing output (Append, Overwrite, ErrorIfExists, Ignore, default ErrorIfExists) + --filter Filters for rows with these diff actions, with default diffing options use 'N', 'I', 'D', or 'C' (see 'Diffing options' section) Diffing options --diff-column column name for diff column (default 'diff') diff --git a/src/main/scala/uk/co/gresearch/spark/diff/App.scala b/src/main/scala/uk/co/gresearch/spark/diff/App.scala index 230feb7f..bf5fc0f3 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/App.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/App.scala @@ -16,6 +16,7 @@ package uk.co.gresearch.spark.diff +import org.apache.spark.sql.functions.col import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import scopt.OptionParser import uk.co.gresearch._ @@ -44,6 +45,7 @@ object App { ids: Seq[String] = Seq.empty, ignore: Seq[String] = Seq.empty, saveMode: SaveMode = SaveMode.ErrorIfExists, + filter: Set[String] = Set.empty, diffOptions: DiffOptions = DiffOptions.default) // read options from args @@ -171,6 +173,12 @@ object App { .valueName("") .action((x, c) => c.copy(saveMode = SaveMode.valueOf(x))) .text(s"save mode for writing output (${SaveMode.values().mkString(", ")}, default ${Options().saveMode})") + opt[String]("filter") + .unbounded() + .optional() + .valueName("") + .action((x, c) => c.copy(filter = c.filter + x)) + .text(s"Filters for rows with these diff actions, with default diffing options use 'N', 'I', 'D', or 'C' (see 'Diffing options' section)") note("") note("Diffing options") @@ -236,8 +244,9 @@ object App { .when(schema.isDefined).call(_.schema(schema.get)) .when(format.isDefined).either(_.load(path)).or(_.table(path)) - def write(df: DataFrame, format: Option[String], path: String, options: Map[String, String], saveMode: SaveMode): Unit = - df.write + def write(df: DataFrame, format: Option[String], path: String, options: Map[String, String], saveMode: SaveMode, filter: Set[String], diffOptions: DiffOptions): Unit = + df.when(filter.nonEmpty).call(_.where(col(diffOptions.diffColumn).isInCollection(filter))) + .write .when(format.isDefined).call(_.format(format.get)) .options(options) .mode(saveMode) @@ -261,6 +270,6 @@ object App { val left = read(spark, options.leftFormat, options.leftPath.get, options.leftSchema, options.leftOptions) val right = read(spark, options.rightFormat, options.rightPath.get, options.rightSchema, options.rightOptions) val diff = left.diff(right, options.diffOptions, options.ids, options.ignore) - write(diff, options.outputFormat, options.outputPath.get, options.outputOptions, options.saveMode) + write(diff, options.outputFormat, options.outputPath.get, options.outputOptions, options.saveMode, options.filter, options.diffOptions) } } diff --git a/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala index cdd2e2b9..5be0dd3d 100644 --- a/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala +++ b/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala @@ -52,4 +52,35 @@ class AppSuite extends AnyFunSuite with SparkTestSession { assert(actual.orderBy($"id").collect() === DiffSuite.expectedDiff) } } + + Seq(Set("I"), Set("C"), Set("D"), Set("N"), Set("I", "C", "D")).foreach { filter => + test(s"run app with filter ${filter.mkString("[", ",", "]")}") { + withTempPath { path => + // write left dataframe as parquet + val leftPath = new File(path, "left.parquet").getAbsolutePath + DiffSuite.left(spark).write.parquet(leftPath) + + // write right dataframe as csv + val rightPath = new File(path, "right.parquet").getAbsolutePath + DiffSuite.right(spark).write.parquet(rightPath) + + // launch app + val outputPath = new File(path, "diff.parquet").getAbsolutePath + App.main(Array( + "--format", "parquet", + "--id", "id", + ) ++ filter.toSeq.flatMap(f => Array("--filter", f)) ++ Array( + leftPath, + rightPath, + outputPath + )) + + // assert written diff + val actual = spark.read.parquet(outputPath).orderBy($"id").collect() + val expected = DiffSuite.expectedDiff.filter(row => filter.contains(row.getString(0))) + assert(actual === expected) + assert(expected.nonEmpty) + } + } + } } From 7a46941cc4548c584599ae694be62eaf85f3c5e6 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 21 Sep 2023 15:44:28 +0200 Subject: [PATCH 2/2] Assert diff app filters to be diff value --- .../uk/co/gresearch/spark/diff/App.scala | 5 ++++ .../uk/co/gresearch/spark/diff/AppSuite.scala | 25 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/main/scala/uk/co/gresearch/spark/diff/App.scala b/src/main/scala/uk/co/gresearch/spark/diff/App.scala index bf5fc0f3..a0041bd5 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/App.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/App.scala @@ -258,6 +258,11 @@ object App { case Some(options) => options case None => sys.exit(1) } + val unknownFilters = options.filter.filter(filter => !options.diffOptions.diffValues.contains(filter)) + if (unknownFilters.nonEmpty) { + throw new RuntimeException(s"Filter ${unknownFilters.mkString("'", "', '", "'")} not allowed, " + + s"these are the configured diff values: ${options.diffOptions.diffValues.mkString("'", "', '", "'")}") + } // create spark session val spark = SparkSession.builder() diff --git a/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala index 5be0dd3d..60cd6654 100644 --- a/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala +++ b/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala @@ -83,4 +83,29 @@ class AppSuite extends AnyFunSuite with SparkTestSession { } } } + + test(s"run app with unknown filter") { + withTempPath { path => + // write left dataframe as parquet + val leftPath = new File(path, "left.parquet").getAbsolutePath + DiffSuite.left(spark).write.parquet(leftPath) + + // write right dataframe as csv + val rightPath = new File(path, "right.parquet").getAbsolutePath + DiffSuite.right(spark).write.parquet(rightPath) + + // launch app + val outputPath = new File(path, "diff.parquet").getAbsolutePath + assertThrows[RuntimeException]( + App.main(Array( + "--format", "parquet", + "--id", "id", + "--filter", "A", + leftPath, + rightPath, + outputPath + )) + ) + } + } }