Skip to content

Commit

Permalink
github-318 Support mappings from different projects and with non-standard outputs in SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
kupferk committed Jan 26, 2023
1 parent a5c35df commit 2a07963
Show file tree
Hide file tree
Showing 21 changed files with 139 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* github-307: Describe Flowmans security policy in SECURITY.md
* github-315: Create build profile for CDP 7.1 with Spark 3.3
* github-317: Perform retry on failing JDBC commands
* github-318: Support mappings from different projects and with non-standard outputs in SQL


# Version 0.30.0 - 2023-01-03
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ trait Assertion extends Instance {
* Returns the dependencies (i.e. names of tables in the Dataflow model)
* @return
*/
def inputs : Seq[MappingOutputIdentifier]
def inputs : Set[MappingOutputIdentifier]

/**
* Executes this [[Assertion]] and returns a corresponding DataFrame. This method is allowed to throw an exception,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ final case class Identifier[T](name:String, project:Option[String]) {
object MappingOutputIdentifier {
val empty = MappingOutputIdentifier("", "", None)
def apply(name:String) : MappingOutputIdentifier = parse(name)
def apply(mapping: MappingIdentifier, output:String) : MappingOutputIdentifier = MappingOutputIdentifier(mapping.name, output, mapping.project)
def apply(mapping:MappingIdentifier, output:String) : MappingOutputIdentifier = MappingOutputIdentifier(mapping.name, output, mapping.project)

def parse(fqName:String) : MappingOutputIdentifier = {
if (fqName == null || fqName.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class RunnerTestTest extends AnyFlatSpec with MockFactory with Matchers with Loc
(assertion.name _).expects().atLeastOnce().returns("assertion")
(assertion.description _).expects().atLeastOnce().returns(None)
(assertion.context _).expects().onCall(() => assertionContext)
(assertion.inputs _).expects().atLeastOnce().returns(Seq(MappingOutputIdentifier("map", "main", None)))
(assertion.inputs _).expects().atLeastOnce().returns(Set(MappingOutputIdentifier("map", "main", None)))
(assertion.execute _).expects(*,*).returns(AssertionResult(assertion, Instant.now()))

var overrideMappingContext:Context = null
Expand Down Expand Up @@ -310,7 +310,7 @@ class RunnerTestTest extends AnyFlatSpec with MockFactory with Matchers with Loc
(assertion.context _).expects().onCall(() => assertionContext)
(assertion.name _).expects().atLeastOnce().returns("assertion")
(assertion.description _).expects().atLeastOnce().returns(None)
(assertion.inputs _).expects().atLeastOnce().returns(Seq(MappingOutputIdentifier("map", "main", None)))
(assertion.inputs _).expects().atLeastOnce().returns(Set(MappingOutputIdentifier("map", "main", None)))
(assertion.execute _).expects(*,*).throws(new UnsupportedOperationException())

var mappingContext:Context = null
Expand Down Expand Up @@ -414,13 +414,13 @@ class RunnerTestTest extends AnyFlatSpec with MockFactory with Matchers with Loc
(assertion1.context _).expects().returns(context)
(assertion1.name _).expects().atLeastOnce().returns("assertion1")
(assertion1.description _).expects().atLeastOnce().returns(None)
(assertion1.inputs _).expects().atLeastOnce().returns(Seq())
(assertion1.inputs _).expects().atLeastOnce().returns(Set.empty)
(assertion1.execute _).expects(*,*).throws(new UnsupportedOperationException())
(assertionTemplate2.instantiate _).expects(*,None).returns(assertion2)
(assertion2.context _).expects().returns(context)
(assertion2.description _).expects().atLeastOnce().returns(None)
(assertion2.name _).expects().atLeastOnce().returns("assertion2")
(assertion2.inputs _).expects().atLeastOnce().returns(Seq())
(assertion2.inputs _).expects().atLeastOnce().returns(Set.empty)
(assertion2.execute _).expects(*,*).returns(AssertionResult(assertion2, Instant.now()))

runner.executeTest(test, keepGoing = true) should be (Status.FAILED)
Expand Down Expand Up @@ -455,12 +455,12 @@ class RunnerTestTest extends AnyFlatSpec with MockFactory with Matchers with Loc
(assertion1.context _).expects().returns(context)
(assertion1.name _).expects().atLeastOnce().returns("assertion1")
(assertion1.description _).expects().atLeastOnce().returns(None)
(assertion1.inputs _).expects().atLeastOnce().returns(Seq())
(assertion1.inputs _).expects().atLeastOnce().returns(Set.empty)
(assertion1.execute _).expects(*,*).throws(new UnsupportedOperationException())
(assertionTemplate2.instantiate _).expects(*,None).returns(assertion2)
(assertion2.name _).expects().atLeastOnce().returns("assertion2")
(assertion2.description _).expects().atLeastOnce().returns(None)
(assertion2.inputs _).expects().returns(Seq())
(assertion2.inputs _).expects().returns(Set.empty)

runner.executeTest(test, keepGoing = false) should be (Status.FAILED)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 Kaya Kupferschmidt
* Copyright 2021-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -110,8 +110,8 @@ object DataFrameUtils {
tempViewLock.lock()

try {
// Register all input DataFrames as temp views
input.foreach(kv => kv._2.createOrReplaceTempView(kv._1))
// Register all input DataFrames as temp views. We use quotes to escape mapping names
input.foreach(kv => kv._2.createOrReplaceTempView("`" + kv._1 + "`"))

try {
fn
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Kaya Kupferschmidt
* Copyright 2019-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,10 +72,13 @@ object SqlParser {
)
.toSet

// Use this regex to unquote identifiers
val regex = "`(.|$)".r
val tables =
allQueries.flatMap (
_.collect { case p:UnresolvedRelation if !cteNames.contains(p.tableName) => p.tableName }
)
.map(regex.replaceAllIn(_,"$1"))
.toSet

tables ++ cteDependencies
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 Kaya Kupferschmidt
* Copyright 2019-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,8 @@ package com.dimajix.spark.sql
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import com.dimajix.spark.SPARK_VERSION


class SqlParserTest extends AnyFlatSpec with Matchers {
"The SqlParser" should "detect all dependencies" in {
Expand Down Expand Up @@ -60,7 +62,7 @@ class SqlParserTest extends AnyFlatSpec with Matchers {
deps should be (Set("db.lala"))
}

it should "support scalar exporessions" in {
it should "support scalar expressions" in {
val sql =
"""
|SELECT
Expand All @@ -74,4 +76,26 @@ class SqlParserTest extends AnyFlatSpec with Matchers {
val deps = SqlParser.resolveDependencies(sql)
deps should be (Set("fe_campaign", "campaign_contacts_raw"))
}

it should "support escaped names" in {
val sql =
"""
|SELECT
| *
|FROM `some_mapping`
|""".stripMargin
val deps = SqlParser.resolveDependencies(sql)
deps should be(Set("some_mapping"))
}

it should "support escaped names with special characters" in (if (SPARK_VERSION >= "3") {
val sql =
"""
|SELECT
| *
|FROM `prj/some_mapping:``output```
|""".stripMargin
val deps = SqlParser.resolveDependencies(sql)
deps should be(Set("prj/some_mapping:`output`"))
})
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Kaya Kupferschmidt
* Copyright 2021-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,7 +83,7 @@ case class ColumnsAssertion(
*
* @return
*/
override def inputs: Seq[MappingOutputIdentifier] = Seq(mapping)
override def inputs: Set[MappingOutputIdentifier] = Set(mapping)

/**
* Executes this [[Assertion]] and returns a corresponding DataFrame
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Kaya Kupferschmidt
* Copyright 2021-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,7 +54,7 @@ case class ExpressionAssertion(
*
* @return
*/
override def inputs: Seq[MappingOutputIdentifier] = Seq(mapping)
override def inputs: Set[MappingOutputIdentifier] = Set(mapping)

/**
* Executes this [[Assertion]] and returns a corresponding DataFrame
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Kaya Kupferschmidt
* Copyright 2021-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,7 +71,7 @@ case class SchemaAssertion(
*
* @return
*/
override def inputs: Seq[MappingOutputIdentifier] = Seq(mapping)
override def inputs: Set[MappingOutputIdentifier] = Set(mapping)

/**
* Executes this [[Assertion]] and returns a corresponding DataFrame
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Kaya Kupferschmidt
* Copyright 2021-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -82,11 +82,7 @@ case class SqlAssertion(
*
* @return
*/
override def inputs: Seq[MappingOutputIdentifier] = {
tests.flatMap(test => SqlParser.resolveDependencies(test.sql))
.map(MappingOutputIdentifier.parse)
.distinct
}
override def inputs: Set[MappingOutputIdentifier] = dependencies

/**
* Executes this [[Assertion]] and returns a corresponding DataFrame
Expand All @@ -100,7 +96,7 @@ case class SqlAssertion(
require(input != null)

AssertionResult.of(this) {
DataFrameUtils.withTempViews(input.map(kv => kv._1.name -> kv._2)) {
DataFrameUtils.withTempViews(rawDependencies.map(d => d -> input(MappingOutputIdentifier(d)))) {
tests.par.map { test =>
// Execute query
val sql = test.sql
Expand All @@ -121,6 +117,9 @@ case class SqlAssertion(
}
}
}

private lazy val rawDependencies = tests.flatMap(test => SqlParser.resolveDependencies(test.sql)).toSet
private lazy val dependencies = rawDependencies.map(MappingOutputIdentifier.parse)
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Kaya Kupferschmidt
* Copyright 2021-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,7 +52,7 @@ case class UniqueKeyAssertion(
*
* @return
*/
override def inputs: Seq[MappingOutputIdentifier] = Seq(mapping)
override def inputs: Set[MappingOutputIdentifier] = Set(mapping)

/**
* Executes this [[Assertion]] and returns a corresponding DataFrame
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 Kaya Kupferschmidt
* Copyright 2018-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,11 @@

package com.dimajix.flowman.spec.mapping

import java.io.StringWriter
import java.net.URL
import java.nio.charset.Charset

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame

import com.dimajix.flowman.execution.Context
Expand Down Expand Up @@ -55,7 +52,7 @@ extends BaseMapping {
require(execution != null)
require(input != null)

val result = DataFrameUtils.withTempViews(input.map(kv => kv._1.name -> kv._2)) {
val result = DataFrameUtils.withTempViews(rawDependencies.map(d => d -> input(MappingOutputIdentifier(d)))) {
execution.spark.sql(statement)
}

Expand All @@ -69,7 +66,8 @@ extends BaseMapping {
*/
override def inputs : Set[MappingOutputIdentifier] = dependencies

private lazy val dependencies = SqlParser.resolveDependencies(statement).map(MappingOutputIdentifier.parse)
private lazy val rawDependencies = SqlParser.resolveDependencies(statement)
private lazy val dependencies = rawDependencies.map(MappingOutputIdentifier.parse)
private lazy val statement : String = {
sql
.orElse(file.map { f =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Kaya Kupferschmidt
* Copyright 2021-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,8 +62,8 @@ class ColumnsAssertionTest extends AnyFlatSpec with Matchers with LocalSparkSess
ColumnsAssertion.ColumnIsOfType("campaign", Seq(IntegerType, LongType)),
ColumnsAssertion.ColumnIsOfType("lineitem", Seq(FloatType))
))
assertion.inputs should be (Seq(MappingOutputIdentifier("lala")))
assertion.requires should be (Set())
assertion.inputs should be (Set(MappingOutputIdentifier("lala")))
assertion.requires should be (Set.empty)
}

it should "work" in {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Kaya Kupferschmidt
* Copyright 2021-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,6 @@

package com.dimajix.flowman.spec.assertion

import java.time.Instant

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

Expand Down Expand Up @@ -53,8 +51,8 @@ class ExpressionAssertionTest extends AnyFlatSpec with Matchers with LocalSparkS
"network IS NOT NULL",
"campaign IN (1,2)"
))
assertion.inputs should be (Seq(MappingOutputIdentifier("lala")))
assertion.requires should be (Set())
assertion.inputs should be (Set(MappingOutputIdentifier("lala")))
assertion.requires should be (Set.empty)
}

it should "work" in {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Kaya Kupferschmidt
* Copyright 2021-2023 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,6 +64,8 @@ class SchemaAssertionTest extends AnyFlatSpec with Matchers with LocalSparkSessi
assertion.mapping should be (MappingOutputIdentifier("some_mapping"))
assertion.columns should be (Seq(Field("col_1", StringType), Field("col_2", IntegerType)))
assertion.schema should be (None)
assertion.inputs should be(Set(MappingOutputIdentifier("some_mapping")))
assertion.requires should be(Set.empty)
}

it should "be parseable with embedded schema" in {
Expand Down
Loading

0 comments on commit 2a07963

Please sign in to comment.