diff --git a/src/test/scala/uk/co/gresearch/spark/SparkTestSession.scala b/src/test/scala/uk/co/gresearch/spark/SparkTestSession.scala index 80cccc9c..f9b79bd5 100644 --- a/src/test/scala/uk/co/gresearch/spark/SparkTestSession.scala +++ b/src/test/scala/uk/co/gresearch/spark/SparkTestSession.scala @@ -29,6 +29,7 @@ trait SparkTestSession extends SQLHelper { .appName("spark test example") .config("spark.sql.shuffle.partitions", 2) .config("spark.local.dir", ".") + .enableHiveSupport() .getOrCreate() } diff --git a/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala new file mode 100644 index 00000000..cdd2e2b9 --- /dev/null +++ b/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2023 G-Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uk.co.gresearch.spark.diff + +import org.apache.spark.sql.SaveMode +import org.scalatest.funsuite.AnyFunSuite +import uk.co.gresearch.spark.SparkTestSession + +import java.io.File + + +class AppSuite extends AnyFunSuite with SparkTestSession { + import spark.implicits._ + + test("run app with file and hive table") { + withTempPath { path => + // write left dataframe as csv + val leftPath = new File(path, "left.csv").getAbsolutePath + DiffSuite.left(spark).write.csv(leftPath) + + // write right dataframe as parquet table + DiffSuite.right(spark).write.format("parquet").mode(SaveMode.Overwrite).saveAsTable("right_parquet") + + // launch app + val jsonPath = new File(path, "diff.json").getAbsolutePath + App.main(Array( + "--left-format", "csv", + "--left-schema", "id int, value string", + "--output-format", "json", + "--id", "id", + leftPath, + "right_parquet", + jsonPath + )) + + // assert written diff + val actual = spark.read.json(jsonPath) + assert(actual.orderBy($"id").collect() === DiffSuite.expectedDiff) + } + } +} diff --git a/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala index b277b3b6..c47551c0 100644 --- a/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala +++ b/src/test/scala/uk/co/gresearch/spark/diff/DiffSuite.scala @@ -19,7 +19,7 @@ package uk.co.gresearch.spark.diff import org.apache.spark.sql.functions.regexp_replace import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Dataset, Encoders, Row} +import org.apache.spark.sql.{Dataset, Encoders, Row, SparkSession} import org.scalatest.funsuite.AnyFunSuite import uk.co.gresearch.spark.{SparkTestSession, distinctPrefixFor} @@ -93,21 +93,40 @@ case class DiffAsOneSide(diff: String, id: Int, value: Option[String]) +object DiffSuite { + def left(spark: SparkSession): Dataset[Value] = { + import spark.implicits._ + Seq( + Value(1, Some("one")), + Value(2, Some("two")), + Value(3, Some("three")) + ).toDS() + } + + def right(spark: SparkSession): Dataset[Value] = { + import spark.implicits._ + Seq( + Value(1, Some("one")), + Value(2, Some("Two")), + Value(4, Some("four")) + ).toDS() + } + + val expectedDiff: Seq[Row] = Seq( + Row("N", 1, "one", "one"), + Row("C", 2, "two", "Two"), + Row("D", 3, "three", null), + Row("I", 4, null, "four") + ) + +} + class DiffSuite extends AnyFunSuite with SparkTestSession { import spark.implicits._ - lazy val left: Dataset[Value] = Seq( - Value(1, Some("one")), - Value(2, Some("two")), - Value(3, Some("three")) - ).toDS() - - lazy val right: Dataset[Value] = Seq( - Value(1, Some("one")), - Value(2, Some("Two")), - Value(4, Some("four")) - ).toDS() + lazy val left: Dataset[Value] = DiffSuite.left(spark) + lazy val right: Dataset[Value] = DiffSuite.right(spark) lazy val left7: Dataset[Value7] = Seq( Value7(1, Some("one"), Some("one label")), @@ -155,12 +174,7 @@ class DiffSuite extends AnyFunSuite with SparkTestSession { lazy val expectedDiffColumns: Seq[String] = Seq("diff", "id", "left_value", "right_value") - lazy val expectedDiff: Seq[Row] = Seq( - Row("N", 1, "one", "one"), - Row("C", 2, "two", "Two"), - Row("D", 3, "three", null), - Row("I", 4, null, "four") - ) + lazy val expectedDiff: Seq[Row] = DiffSuite.expectedDiff lazy val expectedReverseDiff: Seq[Row] = Seq( Row("N", 1, "one", "one"),