Skip to content

Commit

Permalink
Add Spark Diff app (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi authored May 4, 2023
1 parent 63998ec commit 843a335
Show file tree
Hide file tree
Showing 11 changed files with 492 additions and 19 deletions.
1 change: 1 addition & 0 deletions .github/actions/build/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ runs:
*
!.*
!target/*-javadoc.jar
!target/*-sources.jar
!target/site
branding:
Expand Down
27 changes: 27 additions & 0 deletions .github/actions/test-jvm/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ runs:
name: Binaries-${{ inputs.spark-compat-version }}-${{ inputs.scala-version }}
path: .

- name: Cache Spark Binaries
uses: actions/cache@v3
if: ( ! contains(inputs.spark-version, '-SNAPSHOT') )
with:
path: ~/spark
key: ${{ runner.os }}-spark-binaries-${{ inputs.spark-version }}-${{ inputs.scala-compat-version }}

- name: Setup Spark Binaries
if: ( ! contains(inputs.spark-version, '-SNAPSHOT') )
env:
SPARK_PACKAGE: spark-${{ inputs.spark-version }}/spark-${{ inputs.spark-version }}-bin-hadoop${{ inputs.hadoop-version }}${{ inputs.scala-compat-version == '2.13' && '-scala2.13' || '' }}.tgz
run: |
if [[ ! -e ~/spark ]]
then
wget --progress=dot:giga "https://www.apache.org/dyn/closer.lua/spark/${SPARK_PACKAGE}?action=download" -O - | tar -xzC "${{ runner.temp }}"
archive=$(basename "${SPARK_PACKAGE}") bash -c "mv -v "${{ runner.temp }}/\${archive/%.tgz/}" ~/spark"
fi
echo "SPARK_HOME=$(cd ~/spark; pwd)" >> $GITHUB_ENV
shell: bash

- name: Cache Maven packages
uses: actions/cache@v3
with:
Expand All @@ -48,6 +68,13 @@ runs:
run: mvn --batch-mode test
shell: bash

- name: Diff App test
if: ( ! contains(inputs.spark-version, '-SNAPSHOT') )
run: |
$SPARK_HOME/bin/spark-submit --packages com.github.scopt:scopt_${{ inputs.scala-compat-version }}:4.1.0 target/spark-extension_*.jar --format parquet --id id src/test/files/test.parquet/file1.parquet src/test/files/test.parquet/file2.parquet diff.parquet
$SPARK_HOME/bin/spark-shell <<< 'val df = spark.read.parquet("diff.parquet").orderBy($"id").groupBy($"diff").count; df.show; if (df.count != 2) sys.exit(1)'
shell: bash

- name: Generate Unit Test Report
if: failure()
run: mvn --batch-mode surefire-report:report-only
Expand Down
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -182,39 +182,49 @@ jobs:
scala-compat-version: '2.12'
scala-version: '2.12.10'
spark-patch-version: '3'
hadoop-version: '2.7'
- spark-compat-version: '3.1'
scala-compat-version: '2.12'
scala-version: '2.12.10'
spark-patch-version: '3'
hadoop-version: '2.7'
- spark-compat-version: '3.2'
scala-compat-version: '2.12'
scala-version: '2.12.15'
hadoop-version: '2.7'
- spark-compat-version: '3.2'
scala-compat-version: '2.12'
scala-version: '2.12.15'
spark-patch-version: '4'
hadoop-version: '2.7'
- spark-compat-version: '3.3'
scala-compat-version: '2.12'
scala-version: '2.12.15'
hadoop-version: '2'
- spark-compat-version: '3.4'
scala-compat-version: '2.12'
scala-version: '2.12.17'
spark-patch-version: '0'
hadoop-version: '3'

- spark-compat-version: '3.2'
scala-compat-version: '2.13'
scala-version: '2.13.5'
hadoop-version: '3.2'
- spark-compat-version: '3.2'
scala-compat-version: '2.13'
scala-version: '2.13.5'
spark-patch-version: '4'
hadoop-version: '3.2'
- spark-compat-version: '3.3'
scala-compat-version: '2.13'
scala-version: '2.13.8'
hadoop-version: '3'
- spark-compat-version: '3.4'
scala-compat-version: '2.13'
scala-version: '2.13.8'
spark-patch-version: '0'
hadoop-version: '3'

steps:
- name: Checkout
Expand All @@ -229,6 +239,7 @@ jobs:
scala-version: ${{ matrix.scala-version }}
spark-compat-version: ${{ matrix.spark-compat-version }}
scala-compat-version: ${{ matrix.scala-compat-version }}
hadoop-version: ${{ matrix.hadoop-version }}

# pyspark is not available for snapshots or scala other than 2.12
# we would have to compile spark from sources for this, not worth it
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [UNRELEASED] - YYYY-MM-DD

### Added

- Spark app to diff files or tables and write result back to file or table. (#160)

## [2.6.0] - 2023-04-11

Expand Down
82 changes: 82 additions & 0 deletions DIFF.md
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,85 @@ The latter variant is prefixed with `_with_options`.

* `def diff(self: DataFrame, other: DataFrame, *id_columns: str) -> DataFrame`
* `def diffwith(self: DataFrame, other: DataFrame, *id_columns: str) -> DataFrame:`

## Diff Spark application

There is also a Spark application that can be used to create a diff DataFrame. The application reads two DataFrames
`left` and `right` from files or tables, executes the diff transformation and writes the result DataFrame to a file or table.
The Diff app can be run via `spark-submit`:

```shell
# Scala 2.12
spark-submit --packages com.github.scopt:scopt_2.12:4.1.0 spark-extension_2.12-2.7.0-3.4.jar --help

# Scala 2.13
spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --help
```

```
Spark Diff app (2.7.0-3.4)
Usage: spark-extension_2.13-2.7.0-3.4.jar [options] left right diff
left file path (requires format option) or table name to read left dataframe
right file path (requires format option) or table name to read right dataframe
diff file path (requires format option) or table name to write diff dataframe
Examples:
- Diff CSV files 'left.csv' and 'right.csv' and write result into CSV file 'diff.csv':
spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --format csv left.csv right.csv diff.csv
- Diff CSV file 'left.csv' with Parquet file 'right.parquet' with id column 'id', and write result into Hive table 'diff':
spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --left-format csv --right-format parquet --hive --id id left.csv right.parquet diff
Spark session
--master <master> Spark master (local, yarn, ...), not needed with spark-submit
--app-name <app-name> Spark application name
--hive enable Hive support to read from and write to Hive tables
Input and output
-f, --format <format> input and output file format (csv, json, parquet, ...)
--left-format <format> left input file format (csv, json, parquet, ...)
--right-format <format> right input file format (csv, json, parquet, ...)
--output-format <formt> output file format (csv, json, parquet, ...)
-s, --schema <schema> input schema
--left-schema <schema> left input schema
--right-schema <schema> right input schema
--left-option:key=val left input option
--right-option:key=val right input option
--output-option:key=val output option
--id <name> id column name
--ignore <name> ignore column name
--save-mode <save-mode> save mode for writing output (Append, Overwrite, ErrorIfExists, Ignore, default ErrorIfExists)
Diffing options
--diff-column <name> column name for diff column (default 'diff')
--left-prefix <prefix> prefix for left column names (default 'left')
--right-prefix <prefix> prefix for right column names (default 'right')
--insert-value <value> value for insertion (default 'I')
--change-value <value> value for change (default 'C')
--delete-value <value> value for deletion (default 'D')
--no-change-value <val> value for no change (default 'N')
--change-column <name> column name for change column (default is no such column)
--diff-mode <mode> diff mode (ColumnByColumn, SideBySide, LeftSide, RightSide, default ColumnByColumn)
--sparse enable sparse diff
General
--help prints this usage text
```

### Examples

Diff CSV files `left.csv` and `right.csv` and write result into CSV file `diff.csv`:
```shell
spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --format csv left.csv right.csv diff.csv
```

Diff CSV file `left.csv` with Parquet file `right.parquet` with id column `id`, and write result into Hive table `diff`:
```shell
spark-submit --packages com.github.scopt:scopt_2.13:4.1.0 spark-extension_2.13-2.7.0-3.4.jar --left-format csv --right-format parquet --hive --id id left.csv right.parquet diff
```
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This project provides extensions to the [Apache Spark project](https://spark.apache.org/) in Scala and Python:

**[Diff](DIFF.md):** A `diff` transformation for `Dataset`s that computes the differences between
**[Diff](DIFF.md):** A `diff` transformation and application for `Dataset`s that computes the differences between
two datasets, i.e. which rows to _add_, _delete_ or _change_ to get from one dataset to the other.

**[SortedGroups](GROUPS.md):** A `groupByKey` transformation that groups rows by a key while providing
Expand Down
27 changes: 27 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_${scala.compat.version}</artifactId>
<!-- keep DIFF.md section "Diff Spark application" synced with this value -->
<version>4.1.0</version>
</dependency>

<!-- Test -->
<dependency>
<groupId>org.apache.spark</groupId>
Expand Down Expand Up @@ -209,6 +223,19 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>uk.co.gresearch.spark.diff.App</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<!-- run java tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
Loading

0 comments on commit 843a335

Please sign in to comment.