Skip to content

Commit

Permalink
Merge branch 'github-452' into 'main'
Browse files Browse the repository at this point in the history
github-452 Fix parsing empty strings in data records

See merge request dimajix/flowman!18
  • Loading branch information
kupferk committed Sep 27, 2023
2 parents 6d00624 + db57a5b commit fe446b4
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 17 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ Breaking changes will be documented in this changelog file for each version.
* github-435: Flowman should also load a project.yaml (with an 'a' in the extension)
* github-436: Document minimum Maven version of Flowman-maven-plugin
* github-438: Empty or non-existing module directories should not lead to an error
* github-452: [BUG] SQL assertions do not support empty strings as expected values

### Breaking changes

This version introduces some (minor) breaking changes:
* When providing sample records (for example, via the `values` mapping, or in the expected outcome of `sql` assertions),
empty strings are now interpreted as empty strings; older versions interpreted them as SQL NULL values. In case you
still need a NULL value, you can simply use the YAML `null` value.


## Version 1.0.1 - Upcoming
Expand Down
1 change: 1 addition & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ changes over time.
* github-428: Move project selector into top bar in Flowman History Server
* github-433: Add Trino JDBC driver as plugin
* github-434: Use sshj instead of ganymed for sftp
* github-452: [BUG] SQL assertions do not support empty strings as expected values


### Version 1.0.1 - Upcoming
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object RowParser {
) : Options = {
val timeZone = DateTimeUtils.getTimeZone("UTC")
Options(
nullValue = "",
nullValue = null,
nanValue = "NaN",
negativeInf = "Inf",
positiveInf = "-Inf",
Expand Down Expand Up @@ -160,8 +160,13 @@ class RowParser(schema: StructType, options:RowParser.Options) {
java.sql.Date.valueOf(datum)
}

case _: StringType => (d: String) =>
nullSafeDatum(d, name, nullable) { datum =>
case _: StringType => (datum: String) =>
if (datum == options.nullValue || datum == null) {
if (!nullable) {
throw new RuntimeException(s"null value found but field $name is not nullable.")
}
null
} else {
datum
}

Expand All @@ -173,7 +178,7 @@ class RowParser(schema: StructType, options:RowParser.Options) {
datum: String,
name: String,
nullable: Boolean)(converter: ValueConverter): Any = {
if (datum == options.nullValue || datum == null) {
if (datum == options.nullValue || datum == null || datum.isEmpty) {
if (!nullable) {
throw new RuntimeException(s"null value found but field $name is not nullable.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DataFrameBuilderTest extends AnyFlatSpec with Matchers with LocalSparkSess
df.collect() should be (Seq(
Row(1,"lala",2.3, new java.math.BigDecimal("3.400000"), Date.valueOf("2019-02-01"),new Timestamp(DateTimeUtils.stringToTime("2019-02-01T12:34:00").getTime)),
Row(2,"lolo",3.4, new java.math.BigDecimal("4.500000"), Date.valueOf("2019-02-02"),new Timestamp(DateTimeUtils.stringToTime("2019-02-01T12:34:00").getTime)),
Row(null,null,null,null,null,null),
Row(null,"",null,null,null,null),
Row(null,null,null,null,null,null)
))
df.schema should be (schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class RowParserTest extends AnyFlatSpec with Matchers {
result should be (Seq(
Row(1,"lala",2.3, new java.math.BigDecimal("3.400000"), Date.valueOf("2019-02-01"),localTime("2019-02-01T12:34:00")),
Row(2,"lolo",3.4, new java.math.BigDecimal("4.500000"), Date.valueOf("2019-02-02"),localTime("2019-02-01T12:34:00")),
Row(null,null,null,null,null,null),
Row(null,"",null,null,null,null),
Row(null,null,null,null,null,null)
))
}
Expand Down Expand Up @@ -112,7 +112,7 @@ class RowParserTest extends AnyFlatSpec with Matchers {

result should be (Seq(
Row(1,"lala",null),
Row(null,null,null)
Row(null,"",null)
))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class SqlAssertionTest extends AnyFlatSpec with Matchers with LocalSparkSession
| - query: SELECT * FROM lolo
| expected:
| - [A,1]
| - [B,2]
| - [B,""]
| - [C,null]
|""".stripMargin

val assertionSpec = ObjectMapper.parse[AssertionSpec](spec)
Expand Down Expand Up @@ -90,7 +91,7 @@ class SqlAssertionTest extends AnyFlatSpec with Matchers with LocalSparkSession
),
SqlAssertion.Case(
query = "SELECT * FROM lolo",
expected = Seq(Array("A", "1"), Array("B", "2"))
expected = Seq(Array("A", "1"), Array("B", ""), Array("C", null))
)
))
assertion.inputs should be (Set(MappingOutputIdentifier("lala"), MappingOutputIdentifier("lolo")))
Expand Down Expand Up @@ -264,4 +265,44 @@ class SqlAssertionTest extends AnyFlatSpec with Matchers with LocalSparkSession

session.shutdown()
}

// Regression test for github-452: in sample records, "" must now be parsed as an
// empty string, while the YAML `null` literal still maps to SQL NULL.
it should "support empty strings and NULL values" in {
// Build a session backed by the shared local Spark session provided by the test harness.
val session = Session.builder().withSparkSession(spark).build()
val context = session.context
val execution = session.execution

// Assertion spec whose expected rows mix a plain value, an empty string, and a NULL.
val spec =
"""
|kind: sql
|tests:
| - query: SELECT _1, _2 FROM mx
| expected:
| - [A,1]
| - [B,""]
| - [C,null]
|""".stripMargin

val assertionSpec = ObjectMapper.parse[AssertionSpec](spec)
assertionSpec shouldBe a[SqlAssertionSpec]

val assertion = assertionSpec.instantiate(context).asInstanceOf[SqlAssertion]

// Actual data: row B carries an empty string, row C a true null — these must be
// distinguished from each other when compared against the expected records above.
val mx = execution.spark.createDataFrame(Seq(
("A", "1"),
("B", ""),
("C", null)
))

// The assertion must pass (success = true): expected records match the DataFrame
// exactly, proving "" and null are no longer conflated during record parsing.
val result = assertion.execute(execution, Map(MappingOutputIdentifier("mx") -> mx))
result.withoutTime should be(
AssertionResult(
assertion,
Seq(
AssertionTestResult("SELECT _1, _2 FROM mx", None, true)
)
).withoutTime
)

session.shutdown()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ class ValuesDatasetTest extends AnyFlatSpec with Matchers with LocalSparkSession
records = Seq(
ArrayRecord("lala","12"),
ArrayRecord("lolo","13"),
ArrayRecord("",null)
ArrayRecord("",""),
ArrayRecord(null,null)
)
)

Expand All @@ -136,6 +137,7 @@ class ValuesDatasetTest extends AnyFlatSpec with Matchers with LocalSparkSession
df.collect() should be (Seq(
Row("lala", 12),
Row("lolo", 13),
Row("",null),
Row(null,null)
))

Expand All @@ -158,7 +160,8 @@ class ValuesDatasetTest extends AnyFlatSpec with Matchers with LocalSparkSession
records = Seq(
ArrayRecord("lala","12"),
ArrayRecord("lolo","13"),
ArrayRecord("",null)
ArrayRecord("",""),
ArrayRecord(null,null)
)
)

Expand All @@ -174,6 +177,7 @@ class ValuesDatasetTest extends AnyFlatSpec with Matchers with LocalSparkSession
df.collect() should be (Seq(
Row("lala", 12),
Row("lolo", 13),
Row("",null),
Row(null,null)
))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ class MockMappingTest extends AnyFlatSpec with Matchers with MockFactory with Lo
Seq(
ArrayRecord("lala","12"),
ArrayRecord("lolo","13"),
ArrayRecord("",null)
ArrayRecord("",""),
ArrayRecord(null,null)
)
)

Expand All @@ -247,6 +248,7 @@ class MockMappingTest extends AnyFlatSpec with Matchers with MockFactory with Lo
df.collect() should be (Seq(
Row("lala", 12),
Row("lolo", 13),
Row("",null),
Row(null,null)
))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ class ValuesMappingTest extends AnyFlatSpec with Matchers with MockFactory with
records = Seq(
ArrayRecord("lala","12"),
ArrayRecord("lolo","13"),
ArrayRecord("",null)
ArrayRecord("",""),
ArrayRecord(null,null)
)
)

Expand All @@ -172,6 +173,7 @@ class ValuesMappingTest extends AnyFlatSpec with Matchers with MockFactory with
df.collect() should be (Seq(
Row("lala", 12),
Row("lolo", 13),
Row("",null),
Row(null,null)
))

Expand Down Expand Up @@ -202,7 +204,8 @@ class ValuesMappingTest extends AnyFlatSpec with Matchers with MockFactory with
records = Seq(
ArrayRecord("lala","12"),
ArrayRecord("lolo","13"),
ArrayRecord("",null)
ArrayRecord("",""),
ArrayRecord(null,null)
)
)

Expand All @@ -218,6 +221,7 @@ class ValuesMappingTest extends AnyFlatSpec with Matchers with MockFactory with
df.collect() should be (Seq(
Row("lala", 12),
Row("lolo", 13),
Row("",null),
Row(null,null)
))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ class MockRelationTest extends AnyFlatSpec with Matchers with MockFactory with L
Seq(
ArrayRecord("lala","12"),
ArrayRecord("lolo","13"),
ArrayRecord("",null)
ArrayRecord("",""),
ArrayRecord(null,null)
)
)
val schema = InlineSchema(
Expand All @@ -290,6 +291,7 @@ class MockRelationTest extends AnyFlatSpec with Matchers with MockFactory with L
df.collect() should be (Seq(
Row("lala", 12),
Row("lolo", 13),
Row("",null),
Row(null,null)
))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ class ValuesRelationTest extends AnyFlatSpec with Matchers with MockFactory with
records = Seq(
ArrayRecord("lala","12"),
ArrayRecord("lolo","13"),
ArrayRecord("",null)
ArrayRecord("",""),
ArrayRecord(null,null)
)
)

Expand All @@ -200,6 +201,7 @@ class ValuesRelationTest extends AnyFlatSpec with Matchers with MockFactory with
df.collect() should be (Seq(
Row("lala", 12),
Row("lolo", 13),
Row("",null),
Row(null,null)
))

Expand Down Expand Up @@ -230,7 +232,8 @@ class ValuesRelationTest extends AnyFlatSpec with Matchers with MockFactory with
records = Seq(
ArrayRecord("lala","12"),
ArrayRecord("lolo","13"),
ArrayRecord("",null)
ArrayRecord("",""),
ArrayRecord(null,null)
)
)

Expand All @@ -255,6 +258,7 @@ class ValuesRelationTest extends AnyFlatSpec with Matchers with MockFactory with
df.collect() should be (Seq(
Row("lala", 12),
Row("lolo", 13),
Row("",null),
Row(null,null)
))

Expand Down

0 comments on commit fe446b4

Please sign in to comment.