Add --filter to diff app #190

Merged 2 commits on Sep 21, 2023
DIFF.md (9 changes: 5 additions & 4 deletions)

@@ -419,9 +419,9 @@ spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2
```

```
-Spark Diff app (2.7.0-3.4)
+Spark Diff app (2.10.0-3.4)

-Usage: spark-extension_2.13-2.7.0-3.4.jar [options] left right diff
+Usage: spark-extension_2.13-2.10.0-3.4.jar [options] left right diff

left file path (requires format option) or table name to read left dataframe
right file path (requires format option) or table name to read right dataframe
@@ -430,10 +430,10 @@ Usage: spark-extension_2.13-2.7.0-3.4.jar [options] left right diff
Examples:

- Diff CSV files 'left.csv' and 'right.csv' and write result into CSV file 'diff.csv':
-spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --format csv left.csv right.csv diff.csv
+spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.10.0-3.4.jar --format csv left.csv right.csv diff.csv

- Diff CSV file 'left.csv' with Parquet file 'right.parquet' with id column 'id', and write result into Hive table 'diff':
-spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --left-format csv --right-format parquet --hive --id id left.csv right.parquet diff
+spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.10.0-3.4.jar --left-format csv --right-format parquet --hive --id id left.csv right.parquet diff

Spark session
--master <master> Spark master (local, yarn, ...), not needed with spark-submit
@@ -457,6 +457,7 @@ Input and output
--id <name> id column name
--ignore <name> ignore column name
--save-mode <save-mode> save mode for writing output (Append, Overwrite, ErrorIfExists, Ignore, default ErrorIfExists)
--filter <filter> filters for rows with these diff actions; with default diffing options use 'N', 'I', 'D', or 'C' (see 'Diffing options' section)

Diffing options
--diff-column <name> column name for diff column (default 'diff')
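Combining the new option with the documented CSV example, a run like the following would keep only inserted and changed rows in 'diff.csv' (`--filter` may be repeated; each occurrence adds a value):

```
spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.10.0-3.4.jar --format csv --filter I --filter C left.csv right.csv diff.csv
```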
src/main/scala/uk/co/gresearch/spark/diff/App.scala (20 changes: 17 additions & 3 deletions)

@@ -16,6 +16,7 @@

package uk.co.gresearch.spark.diff

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scopt.OptionParser
import uk.co.gresearch._
@@ -44,6 +45,7 @@ object App {
ids: Seq[String] = Seq.empty,
ignore: Seq[String] = Seq.empty,
saveMode: SaveMode = SaveMode.ErrorIfExists,
filter: Set[String] = Set.empty,
diffOptions: DiffOptions = DiffOptions.default)

// read options from args
@@ -171,6 +173,12 @@ object App {
.valueName("<save-mode>")
.action((x, c) => c.copy(saveMode = SaveMode.valueOf(x)))
.text(s"save mode for writing output (${SaveMode.values().mkString(", ")}, default ${Options().saveMode})")
opt[String]("filter")
.unbounded()
.optional()
.valueName("<filter>")
.action((x, c) => c.copy(filter = c.filter + x))
.text(s"Filters for rows with these diff actions, with default diffing options use 'N', 'I', 'D', or 'C' (see 'Diffing options' section)")

note("")
note("Diffing options")
@@ -236,8 +244,9 @@
.when(schema.isDefined).call(_.schema(schema.get))
.when(format.isDefined).either(_.load(path)).or(_.table(path))

-def write(df: DataFrame, format: Option[String], path: String, options: Map[String, String], saveMode: SaveMode): Unit =
-df.write
+def write(df: DataFrame, format: Option[String], path: String, options: Map[String, String], saveMode: SaveMode, filter: Set[String], diffOptions: DiffOptions): Unit =
+df.when(filter.nonEmpty).call(_.where(col(diffOptions.diffColumn).isInCollection(filter)))
+.write
.when(format.isDefined).call(_.format(format.get))
.options(options)
.mode(saveMode)
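The `when(...).call(...)` chain comes from the `uk.co.gresearch` fluent extensions imported above; as a sketch, the new filtering step is equivalent to this plain-Spark function (the `applyFilter` name is ours, not the project's):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// sketch: apply the diff-action filter only when one was requested,
// matching rows whose diff column value is in the requested set
def applyFilter(df: DataFrame, diffColumn: String, filter: Set[String]): DataFrame =
  if (filter.nonEmpty) df.where(col(diffColumn).isInCollection(filter))
  else df
```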
@@ -249,6 +258,11 @@
case Some(options) => options
case None => sys.exit(1)
}
val unknownFilters = options.filter.filter(filter => !options.diffOptions.diffValues.contains(filter))
if (unknownFilters.nonEmpty) {
throw new RuntimeException(s"Filter ${unknownFilters.mkString("'", "', '", "'")} not allowed, " +
s"these are the configured diff values: ${options.diffOptions.diffValues.mkString("'", "', '", "'")}")
}
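With `DiffOptions.default`, the help text above says the valid diff actions are 'N', 'I', 'D', and 'C', so any other value fails fast before a Spark session is started. A standalone sketch of the same check, using `require` in place of the app's explicit `RuntimeException`:

```scala
// assuming the default diff values from the help text: N (no change), I, D, C
val diffValues = Seq("N", "I", "D", "C")
val requested = Set("A")
val unknown = requested.filter(f => !diffValues.contains(f))
require(unknown.isEmpty, s"Filter ${unknown.mkString("'", "', '", "'")} not allowed")
```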

// create spark session
val spark = SparkSession.builder()
@@ -261,6 +275,6 @@ object App {
val left = read(spark, options.leftFormat, options.leftPath.get, options.leftSchema, options.leftOptions)
val right = read(spark, options.rightFormat, options.rightPath.get, options.rightSchema, options.rightOptions)
val diff = left.diff(right, options.diffOptions, options.ids, options.ignore)
-write(diff, options.outputFormat, options.outputPath.get, options.outputOptions, options.saveMode)
+write(diff, options.outputFormat, options.outputPath.get, options.outputOptions, options.saveMode, options.filter, options.diffOptions)
}
}
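End to end, the flag travels from the command line through `write`; a minimal invocation sketch mirroring the new tests below (paths are hypothetical, and the inputs must share the `id` column):

```scala
// hypothetical paths; --filter may repeat, as in the AppSuite tests below
App.main(Array(
  "--format", "parquet",
  "--id", "id",
  "--filter", "C", "--filter", "D", // keep only changed and deleted rows
  "left.parquet", "right.parquet", "diff.parquet"
))
```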
src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala (56 changes: 56 additions & 0 deletions)

@@ -52,4 +52,60 @@ class AppSuite extends AnyFunSuite with SparkTestSession {
assert(actual.orderBy($"id").collect() === DiffSuite.expectedDiff)
}
}

Seq(Set("I"), Set("C"), Set("D"), Set("N"), Set("I", "C", "D")).foreach { filter =>
test(s"run app with filter ${filter.mkString("[", ",", "]")}") {
withTempPath { path =>
// write left dataframe as parquet
val leftPath = new File(path, "left.parquet").getAbsolutePath
DiffSuite.left(spark).write.parquet(leftPath)

// write right dataframe as parquet
val rightPath = new File(path, "right.parquet").getAbsolutePath
DiffSuite.right(spark).write.parquet(rightPath)

// launch app
val outputPath = new File(path, "diff.parquet").getAbsolutePath
App.main(Array(
"--format", "parquet",
"--id", "id",
) ++ filter.toSeq.flatMap(f => Array("--filter", f)) ++ Array(
leftPath,
rightPath,
outputPath
))

// assert written diff
val actual = spark.read.parquet(outputPath).orderBy($"id").collect()
val expected = DiffSuite.expectedDiff.filter(row => filter.contains(row.getString(0)))
assert(actual === expected)
assert(expected.nonEmpty)
}
}
}

test(s"run app with unknown filter") {
withTempPath { path =>
// write left dataframe as parquet
val leftPath = new File(path, "left.parquet").getAbsolutePath
DiffSuite.left(spark).write.parquet(leftPath)

// write right dataframe as parquet
val rightPath = new File(path, "right.parquet").getAbsolutePath
DiffSuite.right(spark).write.parquet(rightPath)

// launch app
val outputPath = new File(path, "diff.parquet").getAbsolutePath
assertThrows[RuntimeException](
App.main(Array(
"--format", "parquet",
"--id", "id",
"--filter", "A",
leftPath,
rightPath,
outputPath
))
)
}
}
}