From 79799200a5734c83f9b4efb64f132f029123298d Mon Sep 17 00:00:00 2001 From: Kaya Kupferschmidt Date: Wed, 27 Jul 2022 12:41:36 +0200 Subject: [PATCH 1/9] Update for next development version --- docker/pom.xml | 2 +- flowman-client/pom.xml | 2 +- flowman-common/pom.xml | 2 +- flowman-core/pom.xml | 2 +- flowman-dist/pom.xml | 2 +- flowman-dsl/pom.xml | 2 +- flowman-hub/pom.xml | 2 +- flowman-parent/pom.xml | 2 +- flowman-plugins/aws/pom.xml | 2 +- flowman-plugins/azure/pom.xml | 2 +- flowman-plugins/delta/pom.xml | 2 +- flowman-plugins/impala/pom.xml | 2 +- flowman-plugins/json/pom.xml | 2 +- flowman-plugins/kafka/pom.xml | 2 +- flowman-plugins/mariadb/pom.xml | 2 +- flowman-plugins/mssqlserver/pom.xml | 2 +- flowman-plugins/mysql/pom.xml | 2 +- flowman-plugins/openapi/pom.xml | 2 +- flowman-plugins/oracle/pom.xml | 2 +- flowman-plugins/postgresql/pom.xml | 2 +- flowman-plugins/swagger/pom.xml | 2 +- flowman-scalatest-compat/pom.xml | 2 +- flowman-server-ui/pom.xml | 2 +- flowman-server/pom.xml | 2 +- flowman-spark-extensions/pom.xml | 2 +- flowman-spark-testing/pom.xml | 2 +- flowman-spec/pom.xml | 2 +- flowman-studio-ui/pom.xml | 2 +- flowman-studio/pom.xml | 2 +- flowman-testing/pom.xml | 2 +- flowman-tools/pom.xml | 2 +- pom.xml | 2 +- 32 files changed, 32 insertions(+), 32 deletions(-) diff --git a/docker/pom.xml b/docker/pom.xml index c9183c7a7..d56193775 100644 --- a/docker/pom.xml +++ b/docker/pom.xml @@ -10,7 +10,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../pom.xml diff --git a/flowman-client/pom.xml b/flowman-client/pom.xml index 4ac9318cc..141d2beab 100644 --- a/flowman-client/pom.xml +++ b/flowman-client/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../pom.xml diff --git a/flowman-common/pom.xml b/flowman-common/pom.xml index 8e2e65947..531c8404c 100644 --- a/flowman-common/pom.xml +++ b/flowman-common/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../pom.xml diff --git a/flowman-core/pom.xml b/flowman-core/pom.xml index 0599fa388..1cc9424ae 100644 --- a/flowman-core/pom.xml +++ b/flowman-core/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../pom.xml diff --git a/flowman-dist/pom.xml b/flowman-dist/pom.xml index db2819274..5020f34d6 100644 --- a/flowman-dist/pom.xml +++ b/flowman-dist/pom.xml @@ -10,7 +10,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../pom.xml diff --git a/flowman-dsl/pom.xml b/flowman-dsl/pom.xml index 2168cbb02..3837a989c 100644 --- a/flowman-dsl/pom.xml +++ b/flowman-dsl/pom.xml @@ -9,7 +9,7 @@ flowman-root com.dimajix.flowman - 0.26.0 + 0.26.1-SNAPSHOT ../pom.xml diff --git a/flowman-hub/pom.xml b/flowman-hub/pom.xml index add48e29f..567f6b91a 100644 --- a/flowman-hub/pom.xml +++ b/flowman-hub/pom.xml @@ -9,7 +9,7 @@ flowman-root com.dimajix.flowman - 0.26.0 + 0.26.1-SNAPSHOT ../pom.xml diff --git a/flowman-parent/pom.xml b/flowman-parent/pom.xml index c770af95c..442a6712d 100644 --- a/flowman-parent/pom.xml +++ b/flowman-parent/pom.xml @@ -10,7 +10,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../pom.xml diff --git a/flowman-plugins/aws/pom.xml b/flowman-plugins/aws/pom.xml index 683772e38..b5b341bcf 100644 --- a/flowman-plugins/aws/pom.xml +++ b/flowman-plugins/aws/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/azure/pom.xml b/flowman-plugins/azure/pom.xml index 67a8a54ce..053342b70 100644 --- 
a/flowman-plugins/azure/pom.xml +++ b/flowman-plugins/azure/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/delta/pom.xml b/flowman-plugins/delta/pom.xml index b8ff45359..1b144afe3 100644 --- a/flowman-plugins/delta/pom.xml +++ b/flowman-plugins/delta/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/impala/pom.xml b/flowman-plugins/impala/pom.xml index 2f7b0980b..4ef25f0f1 100644 --- a/flowman-plugins/impala/pom.xml +++ b/flowman-plugins/impala/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/json/pom.xml b/flowman-plugins/json/pom.xml index 797f5808e..30c461e25 100644 --- a/flowman-plugins/json/pom.xml +++ b/flowman-plugins/json/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/kafka/pom.xml b/flowman-plugins/kafka/pom.xml index ce65e8477..bb4129f59 100644 --- a/flowman-plugins/kafka/pom.xml +++ b/flowman-plugins/kafka/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/mariadb/pom.xml b/flowman-plugins/mariadb/pom.xml index 50e63ebf8..74616d5cf 100644 --- a/flowman-plugins/mariadb/pom.xml +++ b/flowman-plugins/mariadb/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/mssqlserver/pom.xml b/flowman-plugins/mssqlserver/pom.xml index 7e8c56bb1..d754fb0c3 100644 --- a/flowman-plugins/mssqlserver/pom.xml +++ b/flowman-plugins/mssqlserver/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/mysql/pom.xml b/flowman-plugins/mysql/pom.xml index 748c5c876..7157f6b06 100644 --- a/flowman-plugins/mysql/pom.xml +++ b/flowman-plugins/mysql/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/openapi/pom.xml b/flowman-plugins/openapi/pom.xml index 0b8b860ca..2522d853e 100644 --- a/flowman-plugins/openapi/pom.xml +++ b/flowman-plugins/openapi/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/oracle/pom.xml b/flowman-plugins/oracle/pom.xml index 26c29b32a..416888e2e 100644 --- a/flowman-plugins/oracle/pom.xml +++ b/flowman-plugins/oracle/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/postgresql/pom.xml b/flowman-plugins/postgresql/pom.xml index 0b66d9bdd..a23b9eb3f 100644 --- a/flowman-plugins/postgresql/pom.xml +++ b/flowman-plugins/postgresql/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-plugins/swagger/pom.xml b/flowman-plugins/swagger/pom.xml index dea86caae..2b08fb8c6 100644 --- a/flowman-plugins/swagger/pom.xml +++ b/flowman-plugins/swagger/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../../pom.xml diff --git a/flowman-scalatest-compat/pom.xml b/flowman-scalatest-compat/pom.xml index c93fde90c..b479f7f98 100644 --- a/flowman-scalatest-compat/pom.xml +++ b/flowman-scalatest-compat/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.0 + 0.26.1-SNAPSHOT ../pom.xml diff --git a/flowman-server-ui/pom.xml 
b/flowman-server-ui/pom.xml
index e26b2fb1d..0c5eda9eb 100644
--- a/flowman-server-ui/pom.xml
+++ b/flowman-server-ui/pom.xml
@@ -9,7 +9,7 @@
         <groupId>com.dimajix.flowman</groupId>
         <artifactId>flowman-root</artifactId>
-        <version>0.26.0</version>
+        <version>0.26.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>

diff --git a/flowman-server/pom.xml b/flowman-server/pom.xml
index e29589e16..9e10dc304 100644
--- a/flowman-server/pom.xml
+++ b/flowman-server/pom.xml
@@ -9,7 +9,7 @@
         <artifactId>flowman-root</artifactId>
         <groupId>com.dimajix.flowman</groupId>
-        <version>0.26.0</version>
+        <version>0.26.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>

diff --git a/flowman-spark-extensions/pom.xml b/flowman-spark-extensions/pom.xml
index b707e301f..2729d2b6d 100644
--- a/flowman-spark-extensions/pom.xml
+++ b/flowman-spark-extensions/pom.xml
@@ -9,7 +9,7 @@
         <groupId>com.dimajix.flowman</groupId>
         <artifactId>flowman-root</artifactId>
-        <version>0.26.0</version>
+        <version>0.26.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>

diff --git a/flowman-spark-testing/pom.xml b/flowman-spark-testing/pom.xml
index a94394cf8..56cbc315e 100644
--- a/flowman-spark-testing/pom.xml
+++ b/flowman-spark-testing/pom.xml
@@ -9,7 +9,7 @@
         <groupId>com.dimajix.flowman</groupId>
         <artifactId>flowman-root</artifactId>
-        <version>0.26.0</version>
+        <version>0.26.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>

diff --git a/flowman-spec/pom.xml b/flowman-spec/pom.xml
index 10d665433..02a738ebc 100644
--- a/flowman-spec/pom.xml
+++ b/flowman-spec/pom.xml
@@ -9,7 +9,7 @@
         <artifactId>flowman-root</artifactId>
         <groupId>com.dimajix.flowman</groupId>
-        <version>0.26.0</version>
+        <version>0.26.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>

diff --git a/flowman-studio-ui/pom.xml b/flowman-studio-ui/pom.xml
index 902479b40..a2ebb1476 100644
--- a/flowman-studio-ui/pom.xml
+++ b/flowman-studio-ui/pom.xml
@@ -9,7 +9,7 @@
         <groupId>com.dimajix.flowman</groupId>
         <artifactId>flowman-root</artifactId>
-        <version>0.26.0</version>
+        <version>0.26.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>

diff --git a/flowman-studio/pom.xml b/flowman-studio/pom.xml
index ec02b3f0b..6d86a7a59 100644
--- a/flowman-studio/pom.xml
+++ b/flowman-studio/pom.xml
@@ -9,7 +9,7 @@
         <artifactId>flowman-root</artifactId>
         <groupId>com.dimajix.flowman</groupId>
-        <version>0.26.0</version>
+        <version>0.26.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>

diff --git a/flowman-testing/pom.xml b/flowman-testing/pom.xml
index db297f2d9..a38c838e2 100644
--- a/flowman-testing/pom.xml
+++ b/flowman-testing/pom.xml
@@ -9,7 +9,7 @@
         <groupId>com.dimajix.flowman</groupId>
         <artifactId>flowman-root</artifactId>
-        <version>0.26.0</version>
+        <version>0.26.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>

diff --git a/flowman-tools/pom.xml b/flowman-tools/pom.xml
index 85803675b..176017ccc 100644
--- a/flowman-tools/pom.xml
+++ b/flowman-tools/pom.xml
@@ -9,7 +9,7 @@
         <groupId>com.dimajix.flowman</groupId>
         <artifactId>flowman-root</artifactId>
-        <version>0.26.0</version>
+        <version>0.26.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>

diff --git a/pom.xml b/pom.xml
index 01900f802..b6fe7da83 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>
    <groupId>com.dimajix.flowman</groupId>
    <artifactId>flowman-root</artifactId>
-    <version>0.26.0</version>
+    <version>0.26.1-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>Flowman root pom</name>
    <description>A Spark based ETL tool</description>

From 685adff20251a849825f7031366b5a21ce439d23 Mon Sep 17 00:00:00 2001
From: Kaya Kupferschmidt
Date: Thu, 28 Jul 2022 06:58:16 +0200
Subject: [PATCH 2/9] Switch to central Apache archive in Dockerfile

---
 docker/Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docker/Dockerfile b/docker/Dockerfile
index b516666f5..0ba8559b5 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -12,7 +12,7 @@ ENV SPARK_HOME=/opt/spark
 ENV PATH=$PATH:$SPARK_HOME/bin:$FLOWMAN_HOME/bin
 
 # Download and install Spark
-RUN curl -sL --retry 3 "http://ftp.fau.de/apache/spark/spark-${BUILD_SPARK_VERSION}/spark-${BUILD_SPARK_VERSION}-bin-hadoop${BUILD_HADOOP_VERSION}.tgz" \
+RUN curl -sL --retry 3 "https://archive.apache.org/dist/spark/spark-${BUILD_SPARK_VERSION}/spark-${BUILD_SPARK_VERSION}-bin-hadoop${BUILD_HADOOP_VERSION}.tgz" \
    | tar xz -C /opt \
    && ln -s /opt/spark-${BUILD_SPARK_VERSION}-bin-hadoop${BUILD_HADOOP_VERSION} ${SPARK_HOME} \
    && chown -R root:root $SPARK_HOME

From 756691b5302b82db44ef84f79b2ab6aada4736b1 Mon Sep 17 00:00:00 2001
From: Kaya Kupferschmidt
Date: Thu, 28 Jul 2022 08:15:20 +0200
Subject: [PATCH 3/9] github-226 Upgrade to Spark 3.2.2 --- CHANGELOG.md | 5 +++++ docs/releases.md | 5 +++++ pom.xml | 4 ++-- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 544de74b3..f75015d41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# Version 0.26.1 + +* github-226: Upgrade to Spark 3.2.2 + + # Version 0.26.0 - 2022-07-27 * github-202: Add support for Spark 3.3 diff --git a/docs/releases.md b/docs/releases.md index cb70ffb9b..40e28f2f3 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -14,6 +14,11 @@ The following gives an (incomplete) list of past releases of the last 12 months. changes over time. +### Version 0.26.1 + +* github-226: Upgrade to Spark 3.2.2 + + ### Version 0.26.0 - 2022-07-27 * github-202: Add support for Spark 3.3 diff --git a/pom.xml b/pom.xml index b6fe7da83..faf03a9b3 100644 --- a/pom.xml +++ b/pom.xml @@ -96,7 +96,7 @@ 3.2 1.2.0 1.1.2 - 3.2.1 + 3.2.2 3.2 1.1.8.4 4.1.68.Final @@ -377,7 +377,7 @@ 3.2 1.2.0 1.1.2 - 3.2.1 + 3.2.2 3.2 1.1.8.4 4.1.68.Final From 41025d680cf827a4e08019654cd7d6528897a8bc Mon Sep 17 00:00:00 2001 From: Kaya Kupferschmidt Date: Mon, 1 Aug 2022 12:24:07 +0200 Subject: [PATCH 4/9] github-227 Flowman should not fail with field names containing '-', '/' etc --- CHANGELOG.md | 1 + docs/releases.md | 1 + .../com/dimajix/spark/sql/SchemaUtils.scala | 17 ++++++++++- .../dimajix/spark/sql/SchemaUtilsTest.scala | 30 +++++++++++++++++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f75015d41..41e13815c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Version 0.26.1 * github-226: Upgrade to Spark 3.2.2 +* github-227: [BUG] Flowman should not fail with field names containing "-", "/" etc # Version 0.26.0 - 2022-07-27 diff --git a/docs/releases.md b/docs/releases.md index 40e28f2f3..1934c4f49 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -17,6 +17,7 @@ changes over time. 
### Version 0.26.1 * github-226: Upgrade to Spark 3.2.2 +* github-227: [BUG] Flowman should not fail with field names containing "-", "/" etc ### Version 0.26.0 - 2022-07-27 diff --git a/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala b/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala index 4a1efb99e..47dbdf26a 100644 --- a/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala +++ b/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala @@ -20,6 +20,7 @@ import java.util.Locale import org.apache.spark.sql.Column import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.functions.lit import org.apache.spark.sql.functions.rpad import org.apache.spark.sql.functions.substring @@ -235,7 +236,13 @@ object SchemaUtils { def recoverCharVarchar(field:StructField) : StructField = { if (field.metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) { val typeString = field.metadata.getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) - val dt = CustomSqlParser.parseDataType(typeString) + val dt = try { + CustomSqlParser.parseDataType(typeString) + } catch { + // Work around bad field names, which will fire a parse-exception + case _:ParseException => + recoverCharVarchar(field.dataType) + } val meta = new MetadataBuilder().withMetadata(field.metadata) .remove(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) .build() @@ -244,6 +251,14 @@ object SchemaUtils { field } } + private def recoverCharVarchar(dataType: DataType) : DataType = { + dataType match { + case struct: StructType => struct.copy(fields = struct.fields.map(recoverCharVarchar)) + case array: ArrayType => ArrayType(recoverCharVarchar(array.elementType), array.containsNull) + case map: MapType => MapType(recoverCharVarchar(map.keyType), recoverCharVarchar(map.valueType), map.valueContainsNull) + case dt: DataType => dt + } + } /** * Removes all meta data from a Spark schema. 
Useful for comparing results in unit tests diff --git a/flowman-spark-extensions/src/test/scala/com/dimajix/spark/sql/SchemaUtilsTest.scala b/flowman-spark-extensions/src/test/scala/com/dimajix/spark/sql/SchemaUtilsTest.scala index f5f946a28..dc1a7852b 100644 --- a/flowman-spark-extensions/src/test/scala/com/dimajix/spark/sql/SchemaUtilsTest.scala +++ b/flowman-spark-extensions/src/test/scala/com/dimajix/spark/sql/SchemaUtilsTest.scala @@ -175,6 +175,36 @@ class SchemaUtilsTest extends AnyFlatSpec with Matchers with LocalSparkSession w recoveredSchema should be (originalSchema) } + it should "gracefully handle non-standard characters in field names" in { + val originalSchema = StructType(Seq( + StructField("array-col", ArrayType(CharType(10))), + StructField("map:col", MapType(CharType(15), VarcharType(20))), + StructField("struct col", StructType(Seq( + StructField("char-field", CharType(10)), + StructField("varchar:field", VarcharType(25)), + StructField("int field", IntegerType), + StructField("Mönster Fieldß", VarcharType(30)) + ))), + StructField("varchar/col", VarcharType(4), nullable = false) + )) + + val replacedSchema = SchemaUtils.replaceCharVarchar(originalSchema) + SchemaUtils.dropMetadata(replacedSchema) should be(StructType(Seq( + StructField("array-col", ArrayType(StringType)), + StructField("map:col", MapType(StringType, StringType)), + StructField("struct col", StructType(Seq( + StructField("char-field", StringType), + StructField("varchar:field", StringType), + StructField("int field", IntegerType), + StructField("Mönster Fieldß", StringType) + ))), + StructField("varchar/col", StringType, nullable = false) + ))) + + val recoveredSchema = SchemaUtils.recoverCharVarchar(replacedSchema) + recoveredSchema should be(originalSchema) + } + "SchemaUtils.dropMetadata" should "remove all meta data" in { val comment = "123456789" val schema = StructType(Seq( From 92a51a882497a44f3fb923d2a0d1fc6f52632d64 Mon Sep 17 00:00:00 2001 From: Kaya Kupferschmidt Date: Mon, 1 Aug 2022 14:17:28 +0200 Subject: [PATCH 5/9] github-227 Implement utility function to remove extended type information --- .../com/dimajix/spark/sql/SchemaUtils.scala | 28 ++++++++++++++ .../dimajix/spark/sql/SchemaUtilsTest.scala | 37 +++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala b/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala index 47dbdf26a..ac52f72bb 100644 --- a/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala +++ b/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala @@ -225,6 +225,34 @@ object SchemaUtils { metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) } + def dropExtendedTypeInfo(struct:StructType) : StructType = { + StructType(struct.fields.map(dropExtendedTypeInfo)) + } + def dropExtendedTypeInfo(field:StructField) : StructField = { + val meta = dropExtendedTypeInfo(field.metadata) + val dt = dropExtendedTypeInfo(field.dataType) + field.copy(dataType = dt, metadata = meta) + } + def dropExtendedTypeInfo(metadata: Metadata): Metadata = { + if (metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) { + new MetadataBuilder() + .withMetadata(metadata) + .remove(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) + .build() + } + else { + metadata + } + } + private def dropExtendedTypeInfo(dtype: DataType): DataType = { + dtype match { + case struct: StructType => dropExtendedTypeInfo(struct) + case array: ArrayType => 
ArrayType(dropExtendedTypeInfo(array.elementType), array.containsNull) + case map: MapType => MapType(dropExtendedTypeInfo(map.keyType), dropExtendedTypeInfo(map.valueType), map.valueContainsNull) + case dt: DataType => dt + } + } + /** * Recovers the original CHAR/VARCHAR types from a struct, which was previously cleaned via replaceCharVarchar * @param schema diff --git a/flowman-spark-extensions/src/test/scala/com/dimajix/spark/sql/SchemaUtilsTest.scala b/flowman-spark-extensions/src/test/scala/com/dimajix/spark/sql/SchemaUtilsTest.scala index dc1a7852b..717710b10 100644 --- a/flowman-spark-extensions/src/test/scala/com/dimajix/spark/sql/SchemaUtilsTest.scala +++ b/flowman-spark-extensions/src/test/scala/com/dimajix/spark/sql/SchemaUtilsTest.scala @@ -205,6 +205,43 @@ class SchemaUtilsTest extends AnyFlatSpec with Matchers with LocalSparkSession w recoveredSchema should be(originalSchema) } + "SchemaUtils.dropExtendedTypeInfo" should "remove only extended type info" in { + val comment = "123456789" + val schema = StructType(Seq( + StructField("Name", VarcharType(50), false).withComment(comment), + StructField("Nested", + StructType(Seq( + StructField("AmOUnt", CharType(10)).withComment(comment), + StructField("SomeArray", ArrayType(VarcharType(20), false)).withComment(comment) + )) + ).withComment(comment), + StructField("StructArray", ArrayType( + StructType(Seq( + StructField("Name", StringType).withComment(comment) + )) + )).withComment(comment) + )) + + val replacedSchema = SchemaUtils.replaceCharVarchar(schema) + val pureSchema = SchemaUtils.dropExtendedTypeInfo(replacedSchema) + + val expectedSchema = StructType(Seq( + StructField("Name", StringType, false).withComment(comment), + StructField("Nested", + StructType(Seq( + StructField("AmOUnt", StringType).withComment(comment), + StructField("SomeArray", ArrayType(StringType, false)).withComment(comment) + )) + ).withComment(comment), + StructField("StructArray", ArrayType( + StructType(Seq( + StructField("Name", StringType).withComment(comment) + )) + )).withComment(comment) + )) + pureSchema should be(expectedSchema) + } + "SchemaUtils.dropMetadata" should "remove all meta data" in { val comment = "123456789" val schema = StructType(Seq( From e6f9390c69015e419381ef9178197d01ed9ef639 Mon Sep 17 00:00:00 2001 From: Kaya Kupferschmidt Date: Mon, 1 Aug 2022 19:33:39 +0200 Subject: [PATCH 6/9] github-228 Padding and truncation of CHAR(n)/VARCHAR(n) should be configurable --- CHANGELOG.md | 1 + docs/releases.md | 1 + docs/setup/config.md | 6 + docs/spec/mapping/schema.md | 28 ++ docs/spec/relation/index.md | 10 + .../dimajix/flowman/config/FlowmanConf.scala | 29 +- .../com/dimajix/flowman/model/Relation.scala | 15 +- .../flowman/transforms/SchemaEnforcer.scala | 132 ++++++--- .../transforms/SchemaEnforcerTest.scala | 253 +++++++++++------- .../com/dimajix/spark/sql/SchemaUtils.scala | 39 +-- .../flowman/spec/mapping/SchemaMapping.scala | 16 +- .../flowman/spec/mapping/UnionMapping.scala | 2 +- .../relation/HiveUnionTableRelation.scala | 3 +- .../flowman/spec/target/CopyTarget.scala | 24 +- .../spec/mapping/SchemaMappingTest.scala | 29 +- 15 files changed, 401 insertions(+), 187 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41e13815c..adf1e301f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * github-226: Upgrade to Spark 3.2.2 * github-227: [BUG] Flowman should not fail with field names containing "-", "/" etc +* github-228: Padding and truncation of CHAR(n)/VARCHAR(n) should be configurable # Version 0.26.0 - 
2022-07-27

diff --git a/docs/releases.md b/docs/releases.md
index 1934c4f49..224967e58 100644
--- a/docs/releases.md
+++ b/docs/releases.md
@@ -18,6 +18,7 @@
 * github-226: Upgrade to Spark 3.2.2
 * github-227: [BUG] Flowman should not fail with field names containing "-", "/" etc
+* github-228: Padding and truncation of CHAR(n)/VARCHAR(n) should be configurable
 
 ### Version 0.26.0 - 2022-07-27

diff --git a/docs/setup/config.md b/docs/setup/config.md
index 69a3013ba..9185ced62 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -108,6 +108,9 @@ Sets the strategy to use how tables should be migrated. Possible values are:
   Defines how Flowman should handle a mismatch between the types of the actual schema of a relation when reading from
   it and the types of the schema as defined in the relation. Per default, Flowman ignores any mismatch and simply
   passes through the types of the actual relation. See [relations](../spec/relation/index.md) for possible options and more details.
+- `flowman.default.relation.input.charVarcharPolicy` *(type: string)* *(default:`PAD_AND_TRUNCATE`)* (since Flowman 0.26.1)
+  Defines how Flowman will handle `CHAR(n)`/`VARCHAR(n)` data on reading. Per default Flowman will truncate/pad `CHAR(n)`
+  columns and truncate `VARCHAR(n)` columns. See [relations](../spec/relation/index.md) for possible options and more details.
 - `flowman.default.relation.output.columnMismatchPolicy` *(type: string)* *(default:`ADD_REMOVE_COLUMNS`)* (since Flowman 0.20.0)
   Defines how Flowman should handle a mismatch of columns of records being written to a relation and the relations
   actual defined columns. Per default Flowman will add/remove columns to/from records such that they match the current
   physical layout. See [relations](../spec/relation/index.md) for possible options and more details.
 - `flowman.default.relation.output.typeMismatchPolicy` *(type: string)* *(default:`CAST_ALWAYS`)* (since Flowman 0.20.0)
   Defines how Flowman should handle a mismatch of columns of records being written to a relation and the relations
   actual defined columns. Per default Flowman will add/remove columns to/from records such that they match the current
   physical layout. See [relations](../spec/relation/index.md) for possible options and more details.
+- `flowman.default.relation.output.charVarcharPolicy` *(type: string)* *(default:`PAD_AND_TRUNCATE`)* (since Flowman 0.26.1)
+  Defines how Flowman will handle `CHAR(n)`/`VARCHAR(n)` data when writing. Per default Flowman will truncate/pad `CHAR(n)`
+  columns and truncate `VARCHAR(n)` columns. See [relations](../spec/relation/index.md) for possible options and more details.
 
 ### Target related Properties

diff --git a/docs/spec/mapping/schema.md b/docs/spec/mapping/schema.md
index b96d82531..4ef49121f 100644
--- a/docs/spec/mapping/schema.md
+++ b/docs/spec/mapping/schema.md
@@ -23,6 +23,9 @@ mappings:
   partial_facts:
     kind: schema
     input: facts
+    columnMismatchPolicy: ADD_REMOVE_COLUMNS
+    typeMismatchPolicy: CAST_ALWAYS
+    charVarcharPolicy: PAD_AND_TRUNCATE
     schema:
       kind: inline
       fields:
@@ -58,6 +61,31 @@
 Specifies the list of column names (key) with their type (value)
 
 As an alternative of specifying a list of columns you can also directly specify a schema, as described in
 [schema](../schema/index.md).
+* `columnMismatchPolicy` **(optional)** *(type: string)* *(default: `ADD_REMOVE_COLUMNS`)*:
+Control how Flowman will handle a mismatch between column names in the source and the provided schema:
+  - `IGNORE` will simply pass through the input columns unchanged
+  - `ERROR` will fail the build once a mismatch between actual and requested schema is detected
+  - `ADD_COLUMNS_OR_IGNORE` will add (`NULL`) columns from the requested schema to the input schema, and will keep columns in the input schema which are not present in the requested schema
+  - `ADD_COLUMNS_OR_ERROR` will add (`NULL`) columns from the requested schema to the input schema, but will fail the build if the input schema contains columns not present in the requested schema
+  - `REMOVE_COLUMNS_OR_IGNORE` will remove columns from the input schema which are not present in the requested schema
+  - `REMOVE_COLUMNS_OR_ERROR` will remove columns from the input schema which are not present in the requested schema and will fail if the input schema is missing requested columns
+  - `ADD_REMOVE_COLUMNS` will essentially pass through the requested schema as is (the default)
+
+* `typeMismatchPolicy` **(optional)** *(type: string)* *(default: `CAST_ALWAYS`)*:
+Control how Flowman will convert between data types:
+  - `IGNORE` - Ignores any data type mismatches and does not perform any conversion
+  - `ERROR` - Throws an error on data type mismatches
+  - `CAST_COMPATIBLE_OR_ERROR` - Performs a data type conversion with compatible types, otherwise throws an error
+  - `CAST_COMPATIBLE_OR_IGNORE` - Performs a data type conversion with compatible types, otherwise does not perform conversion
+  - `CAST_ALWAYS` - Always performs data type conversion (the default)
+
+* `charVarcharPolicy` **(optional)** *(type: string)* *(default: `PAD_AND_TRUNCATE`)*:
+Control how Flowman will treat `VARCHAR(n)` and `CHAR(n)` data types. The possible values are
+  - `IGNORE` - Do not apply any length restrictions
+  - `PAD_AND_TRUNCATE` - Truncate `VARCHAR(n)`/`CHAR(n)` strings which are too long and pad `CHAR(n)` strings which are too short
+  - `PAD` - Pad `CHAR(n)` strings which are too short
+  - `TRUNCATE` - Truncate `VARCHAR(n)`/`CHAR(n)` strings which are too long
+
 * `filter` **(optional)** *(type: string)* *(default: empty)*:
 An optional SQL filter expression that is applied *after* schema operation.

diff --git a/docs/spec/relation/index.md b/docs/spec/relation/index.md
index 5d348721a..2b3dbd29f 100644
--- a/docs/spec/relation/index.md
+++ b/docs/spec/relation/index.md
@@ -24,8 +24,10 @@ Most relations support implicit schema conversions, which means that
 The details of these conversions can be controlled via some config variables
 * `flowman.default.relation.input.columnMismatchPolicy` (default is `IGNORE`)
 * `flowman.default.relation.input.typeMismatchPolicy` (default is `IGNORE`)
+* `flowman.default.relation.input.charVarcharPolicy` (default is `PAD_AND_TRUNCATE`)
 * `flowman.default.relation.output.columnMismatchPolicy` (default is `ADD_REMOVE_COLUMNS`)
 * `flowman.default.relation.output.typeMismatchPolicy` (default is `CAST_ALWAYS`)
+* `flowman.default.relation.output.charVarcharPolicy` (default is `PAD_AND_TRUNCATE`)
 
 The schema conversion is implemented using two aspects. The first is a mismatch between column (names). This can be
 configured using the `columnMismatchPolicy` as follows.
Basically the idea is that @@ -70,6 +72,14 @@ follows: | `CAST_ALWAYS` | Source can be safely cast to dest | Target Data Type | | `CAST_ALWAYS` | Source cannot be safely cast to dest | Target Data Type | +The two options `flowman.default.relation.input.charVarcharPolicy` and `flowman.default.relation.output.charVarcharPolicy` +control how Flowman will treat `VARCHAR(n)` and `CHAR(n)` data types. The possible values are +* `IGNORE` - Do not apply any length restrictions +* `PAD_AND_TRUNCATE` - Truncate `VARCHAR(n)`/`CHAR(n)` strings which are too long and pad `CHAR(n)` strings which are too short +* `PAD` - Pad `CHAR(n)` strings which are too short +* `TRUNCATE` - Truncate `VARCHAR(n)`/`CHAR(n)` strings which are too long + + ## Relation Types Flowman directly provides support for the most important data sources, which are also diff --git a/flowman-core/src/main/scala/com/dimajix/flowman/config/FlowmanConf.scala b/flowman-core/src/main/scala/com/dimajix/flowman/config/FlowmanConf.scala index 0ced47940..9401d686e 100644 --- a/flowman-core/src/main/scala/com/dimajix/flowman/config/FlowmanConf.scala +++ b/flowman-core/src/main/scala/com/dimajix/flowman/config/FlowmanConf.scala @@ -32,8 +32,9 @@ import com.dimajix.flowman.execution.SimpleExecutor import com.dimajix.flowman.execution.DependencyScheduler import com.dimajix.flowman.execution.Scheduler import com.dimajix.flowman.model.VerifyPolicy -import com.dimajix.flowman.transforms.ColumnMismatchStrategy -import com.dimajix.flowman.transforms.TypeMismatchStrategy +import com.dimajix.flowman.transforms.ColumnMismatchPolicy +import com.dimajix.flowman.transforms.CharVarcharPolicy +import com.dimajix.flowman.transforms.TypeMismatchPolicy import com.dimajix.spark.features @@ -120,22 +121,30 @@ object FlowmanConf { .stringConf .createWithDefault(MigrationStrategy.ALTER.toString) - val DEFAULT_RELATION_INPUT_COLUMN_MISMATCH_STRATEGY = buildConf("flowman.default.relation.input.columnMismatchPolicy") + val DEFAULT_RELATION_INPUT_COLUMN_MISMATCH_POLICY = buildConf("flowman.default.relation.input.columnMismatchPolicy") .doc("Default strategy to use on schema column mismatch while reading relations. Can be 'ignore', 'error', 'add_columns_or_ignore', 'add_columns_or_error', 'remove_columns_or_ignore', 'remove_columns_or_error', 'add_remove_columns'") .stringConf - .createWithDefault(ColumnMismatchStrategy.IGNORE.toString) - val DEFAULT_RELATION_INPUT_TYPE_MISMATCH_STRATEGY = buildConf("flowman.default.relation.input.typeMismatchPolicy") + .createWithDefault(ColumnMismatchPolicy.IGNORE.toString) + val DEFAULT_RELATION_INPUT_TYPE_MISMATCH_POLICY = buildConf("flowman.default.relation.input.typeMismatchPolicy") .doc("Default strategy to use on schema type mismatch while reading relations. Can be 'ignore', 'error', 'cast_compatible_or_ignore', 'cast_compatible_or_error', 'cast_always'") .stringConf - .createWithDefault(TypeMismatchStrategy.IGNORE.toString) - val DEFAULT_RELATION_OUTPUT_COLUMN_MISMATCH_STRATEGY = buildConf("flowman.default.relation.output.columnMismatchPolicy") + .createWithDefault(TypeMismatchPolicy.IGNORE.toString) + val DEFAULT_RELATION_INPUT_CHAR_VARCHAR_POLICY = buildConf("flowman.default.relation.input.charVarcharPolicy") + .doc("Default strategy to use when reading CHAR(n)/VARCHAR(n) data types. 
Can be 'ignore', 'pad', 'truncate' or 'pad_and_truncate'") + .stringConf + .createWithDefault(CharVarcharPolicy.PAD_AND_TRUNCATE.toString) + val DEFAULT_RELATION_OUTPUT_COLUMN_MISMATCH_POLICY = buildConf("flowman.default.relation.output.columnMismatchPolicy") .doc("Default strategy to use on schema column mismatch while reading relations. Can be 'ignore', 'error', 'add_columns_or_ignore', 'add_columns_or_error', 'remove_columns_or_ignore', 'remove_columns_or_error', 'add_remove_columns'") .stringConf - .createWithDefault(ColumnMismatchStrategy.ADD_REMOVE_COLUMNS.toString) - val DEFAULT_RELATION_OUTPUT_TYPE_MISMATCH_STRATEGY = buildConf("flowman.default.relation.output.typeMismatchPolicy") + .createWithDefault(ColumnMismatchPolicy.ADD_REMOVE_COLUMNS.toString) + val DEFAULT_RELATION_OUTPUT_TYPE_MISMATCH_POLICY = buildConf("flowman.default.relation.output.typeMismatchPolicy") .doc("Default strategy to use on schema type mismatch while reading relations. Can be 'ignore', 'error', 'cast_compatible_or_ignore', 'cast_compatible_or_error', 'cast_always'") .stringConf - .createWithDefault(TypeMismatchStrategy.CAST_ALWAYS.toString) + .createWithDefault(TypeMismatchPolicy.CAST_ALWAYS.toString) + val DEFAULT_RELATION_OUTPUT_CHAR_VARCHAR_POLICY = buildConf("flowman.default.relation.output.charVarcharPolicy") + .doc("Default strategy to use when writing CHAR(n)/VARCHAR(n) data types. Can be 'ignore', 'pad', 'truncate' or 'pad_and_truncate'") + .stringConf + .createWithDefault(CharVarcharPolicy.PAD_AND_TRUNCATE.toString) val DEFAULT_TARGET_VERIFY_POLICY = buildConf("flowman.default.target.verifyPolicy") .doc("Policy for verifying a target. Accepted verify policies are 'empty_as_success', 'empty_as_failure' and 'empty_as_success_with_errors'.") diff --git a/flowman-core/src/main/scala/com/dimajix/flowman/model/Relation.scala b/flowman-core/src/main/scala/com/dimajix/flowman/model/Relation.scala index f8ecde5dc..f5195c884 100644 --- a/flowman-core/src/main/scala/com/dimajix/flowman/model/Relation.scala +++ b/flowman-core/src/main/scala/com/dimajix/flowman/model/Relation.scala @@ -44,9 +44,10 @@ import com.dimajix.flowman.execution.Operation import com.dimajix.flowman.execution.OutputMode import com.dimajix.flowman.graph.Linker import com.dimajix.flowman.model -import com.dimajix.flowman.transforms.ColumnMismatchStrategy +import com.dimajix.flowman.transforms.ColumnMismatchPolicy import com.dimajix.flowman.transforms.SchemaEnforcer -import com.dimajix.flowman.transforms.TypeMismatchStrategy +import com.dimajix.flowman.transforms.CharVarcharPolicy +import com.dimajix.flowman.transforms.TypeMismatchPolicy import com.dimajix.flowman.types.Field import com.dimajix.flowman.types.FieldValue import com.dimajix.flowman.types.SingleValue @@ -514,8 +515,9 @@ abstract class BaseRelation extends AbstractInstance with Relation { val conf = execution.flowmanConf val enforcer = SchemaEnforcer( schema, - columnMismatchStrategy = ColumnMismatchStrategy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_COLUMN_MISMATCH_STRATEGY)), - typeMismatchStrategy = TypeMismatchStrategy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_TYPE_MISMATCH_STRATEGY)) + columnMismatchPolicy = ColumnMismatchPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_COLUMN_MISMATCH_POLICY)), + typeMismatchPolicy = TypeMismatchPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_TYPE_MISMATCH_POLICY)), + charVarcharPolicy = 
CharVarcharPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_CHAR_VARCHAR_POLICY)) ) enforcer.transform(df) } @@ -537,8 +539,9 @@ abstract class BaseRelation extends AbstractInstance with Relation { val conf = execution.flowmanConf val enforcer = SchemaEnforcer( schema, - columnMismatchStrategy = ColumnMismatchStrategy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_INPUT_COLUMN_MISMATCH_STRATEGY)), - typeMismatchStrategy = TypeMismatchStrategy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_INPUT_TYPE_MISMATCH_STRATEGY)) + columnMismatchPolicy = ColumnMismatchPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_INPUT_COLUMN_MISMATCH_POLICY)), + typeMismatchPolicy = TypeMismatchPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_INPUT_TYPE_MISMATCH_POLICY)), + charVarcharPolicy = CharVarcharPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_INPUT_CHAR_VARCHAR_POLICY)) ) enforcer.transform(df) } diff --git a/flowman-core/src/main/scala/com/dimajix/flowman/transforms/SchemaEnforcer.scala b/flowman-core/src/main/scala/com/dimajix/flowman/transforms/SchemaEnforcer.scala index ced4e46a1..471d8ad77 100644 --- a/flowman-core/src/main/scala/com/dimajix/flowman/transforms/SchemaEnforcer.scala +++ b/flowman-core/src/main/scala/com/dimajix/flowman/transforms/SchemaEnforcer.scala @@ -23,9 +23,11 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkShim import org.apache.spark.sql.functions.col import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.functions.length import org.apache.spark.sql.functions.rpad import org.apache.spark.sql.functions.struct import org.apache.spark.sql.functions.substring +import org.apache.spark.sql.functions.when import org.apache.spark.sql.types.ArrayType import org.apache.spark.sql.types.CharType import org.apache.spark.sql.types.DataType @@ -39,20 +41,21 @@ import org.apache.spark.sql.types.VarcharType import com.dimajix.common.MapIgnoreCase import com.dimajix.flowman.execution.SchemaMismatchException import com.dimajix.flowman.util.SchemaUtils.coerce +import com.dimajix.spark.sql.SchemaUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY import com.dimajix.spark.sql.functions.nullable_struct -sealed abstract class ColumnMismatchStrategy extends Product with Serializable -object ColumnMismatchStrategy { - case object IGNORE extends ColumnMismatchStrategy - case object ERROR extends ColumnMismatchStrategy - case object ADD_COLUMNS_OR_IGNORE extends ColumnMismatchStrategy - case object ADD_COLUMNS_OR_ERROR extends ColumnMismatchStrategy - case object REMOVE_COLUMNS_OR_IGNORE extends ColumnMismatchStrategy - case object REMOVE_COLUMNS_OR_ERROR extends ColumnMismatchStrategy - case object ADD_REMOVE_COLUMNS extends ColumnMismatchStrategy +sealed abstract class ColumnMismatchPolicy extends Product with Serializable +object ColumnMismatchPolicy { + case object IGNORE extends ColumnMismatchPolicy + case object ERROR extends ColumnMismatchPolicy + case object ADD_COLUMNS_OR_IGNORE extends ColumnMismatchPolicy + case object ADD_COLUMNS_OR_ERROR extends ColumnMismatchPolicy + case object REMOVE_COLUMNS_OR_IGNORE extends ColumnMismatchPolicy + case object REMOVE_COLUMNS_OR_ERROR extends ColumnMismatchPolicy + case object ADD_REMOVE_COLUMNS extends ColumnMismatchPolicy - def ofString(mode:String) : ColumnMismatchStrategy = { + def ofString(mode:String) : ColumnMismatchPolicy = { mode.toLowerCase(Locale.ROOT) match { case "ignore" => IGNORE case "error" => ERROR @@ -61,30 +64,49 @@ object ColumnMismatchStrategy { case 
"remove_columns_or_ignore"|"removecolumnsorignore" => REMOVE_COLUMNS_OR_IGNORE case "remove_columns_or_error"|"removecolumnsorerror" => REMOVE_COLUMNS_OR_ERROR case "add_remove_columns"|"addremovecolumnso" => ADD_REMOVE_COLUMNS - case _ => throw new IllegalArgumentException(s"Unknown column mismatch strategy: '$mode'. " + - "Accepted error strategies are ignore', 'error', 'add_columns_or_ignore', 'add_columns_or_error', 'remove_columns_or_ignore', 'remove_columns_or_error', 'add_remove_columns'.") + case _ => throw new IllegalArgumentException(s"Unknown column mismatch policy: '$mode'. " + + "Accepted error strategies are 'ignore', 'error', 'add_columns_or_ignore', 'add_columns_or_error', 'remove_columns_or_ignore', 'remove_columns_or_error', 'add_remove_columns'.") } } } -sealed abstract class TypeMismatchStrategy extends Product with Serializable -object TypeMismatchStrategy { - case object IGNORE extends TypeMismatchStrategy - case object ERROR extends TypeMismatchStrategy - case object CAST_COMPATIBLE_OR_ERROR extends TypeMismatchStrategy - case object CAST_COMPATIBLE_OR_IGNORE extends TypeMismatchStrategy - case object CAST_ALWAYS extends TypeMismatchStrategy +sealed abstract class TypeMismatchPolicy extends Product with Serializable +object TypeMismatchPolicy { + case object IGNORE extends TypeMismatchPolicy + case object ERROR extends TypeMismatchPolicy + case object CAST_COMPATIBLE_OR_ERROR extends TypeMismatchPolicy + case object CAST_COMPATIBLE_OR_IGNORE extends TypeMismatchPolicy + case object CAST_ALWAYS extends TypeMismatchPolicy - def ofString(mode:String) : TypeMismatchStrategy = { + def ofString(mode:String) : TypeMismatchPolicy = { mode.toLowerCase(Locale.ROOT) match { case "ignore" => IGNORE case "error" => ERROR case "cast_compatible_or_error"|"castcompatibleorerror" => CAST_COMPATIBLE_OR_ERROR case "cast_compatible_or_ignore"|"castcompatibleorignore" => CAST_COMPATIBLE_OR_IGNORE case "cast_always"|"castalways" => CAST_ALWAYS - case _ => throw new IllegalArgumentException(s"Unknown type mismatch strategy: '$mode'. " + - "Accepted error strategies are ignore', 'error', 'cast_compatible_or_error', 'cast_compatible_or_ignore', 'cast_always'.") + case _ => throw new IllegalArgumentException(s"Unknown type mismatch policy: '$mode'. " + + "Accepted error strategies are 'ignore', 'error', 'cast_compatible_or_error', 'cast_compatible_or_ignore', 'cast_always'.") + } + } +} + +sealed abstract class CharVarcharPolicy extends Product with Serializable +object CharVarcharPolicy { + case object PAD extends CharVarcharPolicy + case object TRUNCATE extends CharVarcharPolicy + case object PAD_AND_TRUNCATE extends CharVarcharPolicy + case object IGNORE extends CharVarcharPolicy + + def ofString(mode: String): CharVarcharPolicy = { + mode.toLowerCase(Locale.ROOT) match { + case "ignore" => IGNORE + case "truncate" => TRUNCATE + case "pad" => PAD + case "pad_and_truncate" | "padandtruncate" => PAD_AND_TRUNCATE + case _ => throw new IllegalArgumentException(s"Unknown char/varchar policy: '$mode'. 
" + + "Accepted error strategies are 'ignore', 'truncate', 'pad', 'pad_and_truncate'.") } } } @@ -92,29 +114,33 @@ object TypeMismatchStrategy { final case class SchemaEnforcer( schema:StructType, - columnMismatchStrategy:ColumnMismatchStrategy=ColumnMismatchStrategy.ADD_REMOVE_COLUMNS, - typeMismatchStrategy:TypeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy:ColumnMismatchPolicy=ColumnMismatchPolicy.ADD_REMOVE_COLUMNS, + typeMismatchPolicy:TypeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS, + charVarcharPolicy:CharVarcharPolicy=CharVarcharPolicy.PAD_AND_TRUNCATE ) { private val addColumns = - columnMismatchStrategy == ColumnMismatchStrategy.ADD_COLUMNS_OR_IGNORE || - columnMismatchStrategy == ColumnMismatchStrategy.ADD_COLUMNS_OR_ERROR || - columnMismatchStrategy == ColumnMismatchStrategy.ADD_REMOVE_COLUMNS + columnMismatchPolicy == ColumnMismatchPolicy.ADD_COLUMNS_OR_IGNORE || + columnMismatchPolicy == ColumnMismatchPolicy.ADD_COLUMNS_OR_ERROR || + columnMismatchPolicy == ColumnMismatchPolicy.ADD_REMOVE_COLUMNS private val removeColumns = - columnMismatchStrategy == ColumnMismatchStrategy.REMOVE_COLUMNS_OR_IGNORE || - columnMismatchStrategy == ColumnMismatchStrategy.REMOVE_COLUMNS_OR_ERROR || - columnMismatchStrategy == ColumnMismatchStrategy.ADD_REMOVE_COLUMNS - private val ignoreColumns = columnMismatchStrategy == ColumnMismatchStrategy.IGNORE || - columnMismatchStrategy == ColumnMismatchStrategy.REMOVE_COLUMNS_OR_IGNORE || - columnMismatchStrategy == ColumnMismatchStrategy.ADD_COLUMNS_OR_IGNORE + columnMismatchPolicy == ColumnMismatchPolicy.REMOVE_COLUMNS_OR_IGNORE || + columnMismatchPolicy == ColumnMismatchPolicy.REMOVE_COLUMNS_OR_ERROR || + columnMismatchPolicy == ColumnMismatchPolicy.ADD_REMOVE_COLUMNS + private val ignoreColumns = columnMismatchPolicy == ColumnMismatchPolicy.IGNORE || + columnMismatchPolicy == ColumnMismatchPolicy.REMOVE_COLUMNS_OR_IGNORE || + columnMismatchPolicy == ColumnMismatchPolicy.ADD_COLUMNS_OR_IGNORE private val castCompatibleTypes = - typeMismatchStrategy == TypeMismatchStrategy.CAST_COMPATIBLE_OR_ERROR || - typeMismatchStrategy == TypeMismatchStrategy.CAST_COMPATIBLE_OR_IGNORE + typeMismatchPolicy == TypeMismatchPolicy.CAST_COMPATIBLE_OR_ERROR || + typeMismatchPolicy == TypeMismatchPolicy.CAST_COMPATIBLE_OR_IGNORE private val castAlways = - typeMismatchStrategy == TypeMismatchStrategy.CAST_ALWAYS + typeMismatchPolicy == TypeMismatchPolicy.CAST_ALWAYS private val ignoreTypes = - typeMismatchStrategy == TypeMismatchStrategy.IGNORE || - typeMismatchStrategy == TypeMismatchStrategy.CAST_COMPATIBLE_OR_IGNORE + typeMismatchPolicy == TypeMismatchPolicy.IGNORE || + typeMismatchPolicy == TypeMismatchPolicy.CAST_COMPATIBLE_OR_IGNORE + + private val pad = charVarcharPolicy == CharVarcharPolicy.PAD || charVarcharPolicy == CharVarcharPolicy.PAD_AND_TRUNCATE + private val truncate = charVarcharPolicy == CharVarcharPolicy.TRUNCATE || charVarcharPolicy == CharVarcharPolicy.PAD_AND_TRUNCATE /** * Helper method for conforming a given schema to a target schema. 
This will project the given schema and also @@ -123,7 +149,7 @@ final case class SchemaEnforcer( * @return */ def transform(inputSchema:StructType) : Seq[Column] = { - if (columnMismatchStrategy != ColumnMismatchStrategy.IGNORE || typeMismatchStrategy != TypeMismatchStrategy.IGNORE) { + if (columnMismatchPolicy != ColumnMismatchPolicy.IGNORE || typeMismatchPolicy != TypeMismatchPolicy.IGNORE) { conformStruct(schema, inputSchema, "") } else { @@ -138,7 +164,7 @@ final case class SchemaEnforcer( * @return */ def transform(df:DataFrame) : DataFrame = { - if (columnMismatchStrategy != ColumnMismatchStrategy.IGNORE || typeMismatchStrategy != TypeMismatchStrategy.IGNORE) { + if (columnMismatchPolicy != ColumnMismatchPolicy.IGNORE || typeMismatchPolicy != TypeMismatchPolicy.IGNORE) { val unifiedColumns = transform(df.schema) df.select(unifiedColumns: _*) } @@ -150,8 +176,21 @@ final case class SchemaEnforcer( private def applyType(col:Column, field:StructField) : Column = { field.dataType match { - case CharType(n) => rpad(col.cast(StringType), n, " ") - case VarcharType(n) => substring(col.cast(StringType), 0, n) + case CharType(n) => + if (pad && truncate) + rpad(col.cast(StringType), n, " ") + else if (pad) + when(length(col.cast(StringType)) > lit(n), col.cast(StringType)) + .otherwise(rpad(col.cast(StringType), n, " ")) + else if (truncate) + substring(col.cast(StringType), 0, n) + else + col.cast(StringType) + case VarcharType(n) => + if (truncate) + substring(col.cast(StringType), 0, n) + else + col.cast(StringType) case _ => col.cast(field.dataType) } } @@ -241,6 +280,15 @@ final case class SchemaEnforcer( .foreach(c => builder.putString("comment", c)) } + // Preserve extended string type info + requiredField.dataType match { + case VarcharType(_) if requiredField.dataType != inputField.dataType => + builder.putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, requiredField.dataType.catalogString) + case CharType(_) if requiredField.dataType != inputField.dataType => + builder.putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, requiredField.dataType.catalogString) + case _ => + } + builder.build() } diff --git a/flowman-core/src/test/scala/com/dimajix/flowman/transforms/SchemaEnforcerTest.scala b/flowman-core/src/test/scala/com/dimajix/flowman/transforms/SchemaEnforcerTest.scala index 6fb4b170f..7c0fa0c03 100644 --- a/flowman-core/src/test/scala/com/dimajix/flowman/transforms/SchemaEnforcerTest.scala +++ b/flowman-core/src/test/scala/com/dimajix/flowman/transforms/SchemaEnforcerTest.scala @@ -38,65 +38,65 @@ import com.dimajix.spark.testing.LocalSparkSession class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSession { - "The ColumnMismatchStrategy" should "parse correctly" in { - ColumnMismatchStrategy.ofString("IGNORE") should be (ColumnMismatchStrategy.IGNORE) - ColumnMismatchStrategy.ofString("ignore") should be (ColumnMismatchStrategy.IGNORE) - ColumnMismatchStrategy.ofString("ERROR") should be (ColumnMismatchStrategy.ERROR) - ColumnMismatchStrategy.ofString("ADD_COLUMNS_OR_IGNORE") should be (ColumnMismatchStrategy.ADD_COLUMNS_OR_IGNORE) - ColumnMismatchStrategy.ofString("ADD_COLUMNS_OR_ERROR") should be (ColumnMismatchStrategy.ADD_COLUMNS_OR_ERROR) - ColumnMismatchStrategy.ofString("REMOVE_COLUMNS_OR_IGNORE") should be (ColumnMismatchStrategy.REMOVE_COLUMNS_OR_IGNORE) - ColumnMismatchStrategy.ofString("REMOVE_COLUMNS_OR_ERROR") should be (ColumnMismatchStrategy.REMOVE_COLUMNS_OR_ERROR) - ColumnMismatchStrategy.ofString("ADD_REMOVE_COLUMNS") should be 
(ColumnMismatchStrategy.ADD_REMOVE_COLUMNS) - a[NullPointerException] shouldBe thrownBy(ColumnMismatchStrategy.ofString(null)) - an[IllegalArgumentException] shouldBe thrownBy(ColumnMismatchStrategy.ofString("NO_SUCH_MODE")) + "The ColumnMismatchPolicy" should "parse correctly" in { + ColumnMismatchPolicy.ofString("IGNORE") should be (ColumnMismatchPolicy.IGNORE) + ColumnMismatchPolicy.ofString("ignore") should be (ColumnMismatchPolicy.IGNORE) + ColumnMismatchPolicy.ofString("ERROR") should be (ColumnMismatchPolicy.ERROR) + ColumnMismatchPolicy.ofString("ADD_COLUMNS_OR_IGNORE") should be (ColumnMismatchPolicy.ADD_COLUMNS_OR_IGNORE) + ColumnMismatchPolicy.ofString("ADD_COLUMNS_OR_ERROR") should be (ColumnMismatchPolicy.ADD_COLUMNS_OR_ERROR) + ColumnMismatchPolicy.ofString("REMOVE_COLUMNS_OR_IGNORE") should be (ColumnMismatchPolicy.REMOVE_COLUMNS_OR_IGNORE) + ColumnMismatchPolicy.ofString("REMOVE_COLUMNS_OR_ERROR") should be (ColumnMismatchPolicy.REMOVE_COLUMNS_OR_ERROR) + ColumnMismatchPolicy.ofString("ADD_REMOVE_COLUMNS") should be (ColumnMismatchPolicy.ADD_REMOVE_COLUMNS) + a[NullPointerException] shouldBe thrownBy(ColumnMismatchPolicy.ofString(null)) + an[IllegalArgumentException] shouldBe thrownBy(ColumnMismatchPolicy.ofString("NO_SUCH_MODE")) } it should "provide a toString method" in { - ColumnMismatchStrategy.IGNORE.toString should be ("IGNORE") - ColumnMismatchStrategy.ERROR.toString should be ("ERROR") - ColumnMismatchStrategy.ADD_COLUMNS_OR_IGNORE.toString should be ("ADD_COLUMNS_OR_IGNORE") - ColumnMismatchStrategy.ADD_COLUMNS_OR_ERROR.toString should be ("ADD_COLUMNS_OR_ERROR") - ColumnMismatchStrategy.REMOVE_COLUMNS_OR_IGNORE.toString should be ("REMOVE_COLUMNS_OR_IGNORE") - ColumnMismatchStrategy.REMOVE_COLUMNS_OR_ERROR.toString should be ("REMOVE_COLUMNS_OR_ERROR") - ColumnMismatchStrategy.ADD_REMOVE_COLUMNS.toString should be ("ADD_REMOVE_COLUMNS") + ColumnMismatchPolicy.IGNORE.toString should be ("IGNORE") + ColumnMismatchPolicy.ERROR.toString should be ("ERROR") + ColumnMismatchPolicy.ADD_COLUMNS_OR_IGNORE.toString should be ("ADD_COLUMNS_OR_IGNORE") + ColumnMismatchPolicy.ADD_COLUMNS_OR_ERROR.toString should be ("ADD_COLUMNS_OR_ERROR") + ColumnMismatchPolicy.REMOVE_COLUMNS_OR_IGNORE.toString should be ("REMOVE_COLUMNS_OR_IGNORE") + ColumnMismatchPolicy.REMOVE_COLUMNS_OR_ERROR.toString should be ("REMOVE_COLUMNS_OR_ERROR") + ColumnMismatchPolicy.ADD_REMOVE_COLUMNS.toString should be ("ADD_REMOVE_COLUMNS") } it should "parse toString correctly" in { - ColumnMismatchStrategy.ofString(ColumnMismatchStrategy.IGNORE.toString) should be (ColumnMismatchStrategy.IGNORE) - ColumnMismatchStrategy.ofString(ColumnMismatchStrategy.ERROR.toString) should be (ColumnMismatchStrategy.ERROR) - ColumnMismatchStrategy.ofString(ColumnMismatchStrategy.ADD_COLUMNS_OR_IGNORE.toString) should be (ColumnMismatchStrategy.ADD_COLUMNS_OR_IGNORE) - ColumnMismatchStrategy.ofString(ColumnMismatchStrategy.ADD_COLUMNS_OR_ERROR.toString) should be (ColumnMismatchStrategy.ADD_COLUMNS_OR_ERROR) - ColumnMismatchStrategy.ofString(ColumnMismatchStrategy.REMOVE_COLUMNS_OR_IGNORE.toString) should be (ColumnMismatchStrategy.REMOVE_COLUMNS_OR_IGNORE) - ColumnMismatchStrategy.ofString(ColumnMismatchStrategy.REMOVE_COLUMNS_OR_ERROR.toString) should be (ColumnMismatchStrategy.REMOVE_COLUMNS_OR_ERROR) - ColumnMismatchStrategy.ofString(ColumnMismatchStrategy.ADD_REMOVE_COLUMNS.toString) should be (ColumnMismatchStrategy.ADD_REMOVE_COLUMNS) + ColumnMismatchPolicy.ofString(ColumnMismatchPolicy.IGNORE.toString) should be 
(ColumnMismatchPolicy.IGNORE) + ColumnMismatchPolicy.ofString(ColumnMismatchPolicy.ERROR.toString) should be (ColumnMismatchPolicy.ERROR) + ColumnMismatchPolicy.ofString(ColumnMismatchPolicy.ADD_COLUMNS_OR_IGNORE.toString) should be (ColumnMismatchPolicy.ADD_COLUMNS_OR_IGNORE) + ColumnMismatchPolicy.ofString(ColumnMismatchPolicy.ADD_COLUMNS_OR_ERROR.toString) should be (ColumnMismatchPolicy.ADD_COLUMNS_OR_ERROR) + ColumnMismatchPolicy.ofString(ColumnMismatchPolicy.REMOVE_COLUMNS_OR_IGNORE.toString) should be (ColumnMismatchPolicy.REMOVE_COLUMNS_OR_IGNORE) + ColumnMismatchPolicy.ofString(ColumnMismatchPolicy.REMOVE_COLUMNS_OR_ERROR.toString) should be (ColumnMismatchPolicy.REMOVE_COLUMNS_OR_ERROR) + ColumnMismatchPolicy.ofString(ColumnMismatchPolicy.ADD_REMOVE_COLUMNS.toString) should be (ColumnMismatchPolicy.ADD_REMOVE_COLUMNS) } - "The TypeMismatchStrategy" should "parse correctly" in { - TypeMismatchStrategy.ofString("IGNORE") should be (TypeMismatchStrategy.IGNORE) - TypeMismatchStrategy.ofString("ignore") should be (TypeMismatchStrategy.IGNORE) - TypeMismatchStrategy.ofString("ERROR") should be (TypeMismatchStrategy.ERROR) - TypeMismatchStrategy.ofString("CAST_COMPATIBLE_OR_ERROR") should be (TypeMismatchStrategy.CAST_COMPATIBLE_OR_ERROR) - TypeMismatchStrategy.ofString("CAST_COMPATIBLE_OR_IGNORE") should be (TypeMismatchStrategy.CAST_COMPATIBLE_OR_IGNORE) - TypeMismatchStrategy.ofString("CAST_ALWAYS") should be (TypeMismatchStrategy.CAST_ALWAYS) - a[NullPointerException] shouldBe thrownBy(TypeMismatchStrategy.ofString(null)) - an[IllegalArgumentException] shouldBe thrownBy(TypeMismatchStrategy.ofString("NO_SUCH_MODE")) + "The TypeMismatchPolicy" should "parse correctly" in { + TypeMismatchPolicy.ofString("IGNORE") should be (TypeMismatchPolicy.IGNORE) + TypeMismatchPolicy.ofString("ignore") should be (TypeMismatchPolicy.IGNORE) + TypeMismatchPolicy.ofString("ERROR") should be (TypeMismatchPolicy.ERROR) + TypeMismatchPolicy.ofString("CAST_COMPATIBLE_OR_ERROR") should be (TypeMismatchPolicy.CAST_COMPATIBLE_OR_ERROR) + TypeMismatchPolicy.ofString("CAST_COMPATIBLE_OR_IGNORE") should be (TypeMismatchPolicy.CAST_COMPATIBLE_OR_IGNORE) + TypeMismatchPolicy.ofString("CAST_ALWAYS") should be (TypeMismatchPolicy.CAST_ALWAYS) + a[NullPointerException] shouldBe thrownBy(TypeMismatchPolicy.ofString(null)) + an[IllegalArgumentException] shouldBe thrownBy(TypeMismatchPolicy.ofString("NO_SUCH_MODE")) } it should "provide a toString method" in { - TypeMismatchStrategy.IGNORE.toString should be ("IGNORE") - TypeMismatchStrategy.ERROR.toString should be ("ERROR") - TypeMismatchStrategy.CAST_COMPATIBLE_OR_ERROR.toString should be ("CAST_COMPATIBLE_OR_ERROR") - TypeMismatchStrategy.CAST_COMPATIBLE_OR_IGNORE.toString should be ("CAST_COMPATIBLE_OR_IGNORE") - TypeMismatchStrategy.CAST_ALWAYS.toString should be ("CAST_ALWAYS") + TypeMismatchPolicy.IGNORE.toString should be ("IGNORE") + TypeMismatchPolicy.ERROR.toString should be ("ERROR") + TypeMismatchPolicy.CAST_COMPATIBLE_OR_ERROR.toString should be ("CAST_COMPATIBLE_OR_ERROR") + TypeMismatchPolicy.CAST_COMPATIBLE_OR_IGNORE.toString should be ("CAST_COMPATIBLE_OR_IGNORE") + TypeMismatchPolicy.CAST_ALWAYS.toString should be ("CAST_ALWAYS") } it should "parse toString correctly" in { - TypeMismatchStrategy.ofString(TypeMismatchStrategy.IGNORE.toString) should be (TypeMismatchStrategy.IGNORE) - TypeMismatchStrategy.ofString(TypeMismatchStrategy.ERROR.toString) should be (TypeMismatchStrategy.ERROR) - 
TypeMismatchStrategy.ofString(TypeMismatchStrategy.CAST_COMPATIBLE_OR_ERROR.toString) should be (TypeMismatchStrategy.CAST_COMPATIBLE_OR_ERROR) - TypeMismatchStrategy.ofString(TypeMismatchStrategy.CAST_COMPATIBLE_OR_IGNORE.toString) should be (TypeMismatchStrategy.CAST_COMPATIBLE_OR_IGNORE) - TypeMismatchStrategy.ofString(TypeMismatchStrategy.CAST_ALWAYS.toString) should be (TypeMismatchStrategy.CAST_ALWAYS) + TypeMismatchPolicy.ofString(TypeMismatchPolicy.IGNORE.toString) should be (TypeMismatchPolicy.IGNORE) + TypeMismatchPolicy.ofString(TypeMismatchPolicy.ERROR.toString) should be (TypeMismatchPolicy.ERROR) + TypeMismatchPolicy.ofString(TypeMismatchPolicy.CAST_COMPATIBLE_OR_ERROR.toString) should be (TypeMismatchPolicy.CAST_COMPATIBLE_OR_ERROR) + TypeMismatchPolicy.ofString(TypeMismatchPolicy.CAST_COMPATIBLE_OR_IGNORE.toString) should be (TypeMismatchPolicy.CAST_COMPATIBLE_OR_IGNORE) + TypeMismatchPolicy.ofString(TypeMismatchPolicy.CAST_ALWAYS.toString) should be (TypeMismatchPolicy.CAST_ALWAYS) } @@ -123,7 +123,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio ))) } - it should "support ColumnMismatchStrategy.ADD_REMOVE_COLUMNS" in { + it should "support ColumnMismatchPolicy.ADD_REMOVE_COLUMNS" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -131,8 +131,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.ADD_REMOVE_COLUMNS, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.ADD_REMOVE_COLUMNS, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -148,7 +148,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio StructField("col4", IntegerType) ))) } - it should "support ColumnMismatchStrategy.IGNORE" in { + it should "support ColumnMismatchPolicy.IGNORE" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -156,8 +156,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.IGNORE, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.IGNORE, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -173,7 +173,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio StructField("col3", IntegerType) ))) } - it should "support ColumnMismatchStrategy.ERROR (1)" in { + it should "support ColumnMismatchPolicy.ERROR (1)" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -181,8 +181,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.ERROR, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.ERROR, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -198,7 +198,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio StructField("col4", IntegerType) ))) } - it should "support ColumnMismatchStrategy.ERROR (2)" in { + it should "support ColumnMismatchPolicy.ERROR (2)" 
in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -206,8 +206,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.ERROR, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.ERROR, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -219,7 +219,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio val inputDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], inputSchema) a[SchemaMismatchException] should be thrownBy(xfs.transform(inputDf)) } - it should "support ColumnMismatchStrategy.ERROR (3)" in { + it should "support ColumnMismatchPolicy.ERROR (3)" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -227,8 +227,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.ERROR, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.ERROR, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -239,7 +239,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio a[SchemaMismatchException] should be thrownBy(xfs.transform(inputDf)) } - it should "support ColumnMismatchStrategy.ADD_COLUMNS_OR_IGNORE (1)" in { + it should "support ColumnMismatchPolicy.ADD_COLUMNS_OR_IGNORE (1)" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -248,8 +248,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.ADD_COLUMNS_OR_IGNORE, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.ADD_COLUMNS_OR_IGNORE, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -266,7 +266,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio StructField("col4", IntegerType) ))) } - it should "support ColumnMismatchStrategy.ADD_COLUMNS_OR_IGNORE (2)" in { + it should "support ColumnMismatchPolicy.ADD_COLUMNS_OR_IGNORE (2)" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -274,8 +274,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.ADD_COLUMNS_OR_IGNORE, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.ADD_COLUMNS_OR_IGNORE, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -293,7 +293,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio ))) } - it should "support ColumnMismatchStrategy.ADD_COLUMNS_OR_ERROR (1)" in { + it should "support ColumnMismatchPolicy.ADD_COLUMNS_OR_ERROR (1)" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -302,8 +302,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( 
requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.ADD_COLUMNS_OR_ERROR, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.ADD_COLUMNS_OR_ERROR, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -320,7 +320,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio StructField("col4", IntegerType) ))) } - it should "support ColumnMismatchStrategy.ADD_COLUMNS_OR_ERROR (2)" in { + it should "support ColumnMismatchPolicy.ADD_COLUMNS_OR_ERROR (2)" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -328,8 +328,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.ADD_COLUMNS_OR_ERROR, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.ADD_COLUMNS_OR_ERROR, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -341,7 +341,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio a[SchemaMismatchException] should be thrownBy(xfs.transform(inputDf)) } - it should "support ColumnMismatchStrategy.REMOVE_COLUMNS_OR_IGNORE (1)" in { + it should "support ColumnMismatchPolicy.REMOVE_COLUMNS_OR_IGNORE (1)" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -350,8 +350,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.REMOVE_COLUMNS_OR_IGNORE, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.REMOVE_COLUMNS_OR_IGNORE, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -367,7 +367,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio StructField("col4", IntegerType) ))) } - it should "support ColumnMismatchStrategy.REMOVE_COLUMNS_OR_IGNORE (2)" in { + it should "support ColumnMismatchPolicy.REMOVE_COLUMNS_OR_IGNORE (2)" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -375,8 +375,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.REMOVE_COLUMNS_OR_IGNORE, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.REMOVE_COLUMNS_OR_IGNORE, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -394,7 +394,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio ))) } - it should "support ColumnMismatchStrategy.REMOVE_COLUMNS_OR_ERROR (1)" in { + it should "support ColumnMismatchPolicy.REMOVE_COLUMNS_OR_ERROR (1)" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -402,8 +402,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.REMOVE_COLUMNS_OR_ERROR, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.REMOVE_COLUMNS_OR_ERROR, + 
typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -420,7 +420,7 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio StructField("col3", StringType) ))) } - it should "support ColumnMismatchStrategy.REMOVE_COLUMNS_OR_ERROR (2)" in { + it should "support ColumnMismatchPolicy.REMOVE_COLUMNS_OR_ERROR (2)" in { val requestedSchema = StructType(Seq( StructField("col2", StringType), StructField("col1", StringType), @@ -428,8 +428,8 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio )) val xfs = SchemaEnforcer( requestedSchema, - columnMismatchStrategy=ColumnMismatchStrategy.REMOVE_COLUMNS_OR_ERROR, - typeMismatchStrategy=TypeMismatchStrategy.CAST_ALWAYS + columnMismatchPolicy=ColumnMismatchPolicy.REMOVE_COLUMNS_OR_ERROR, + typeMismatchPolicy=TypeMismatchPolicy.CAST_ALWAYS ) val inputSchema = StructType(Seq( @@ -516,9 +516,12 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio } "The SchemaEnforcer" should "support extended string attributes" in { + val spark = this.spark + import spark.implicits._ + val inputDf = spark.createDataFrame(Seq( - ("col1", "12"), - ("col2", "23") + ("col", "1"), + ("col123", "2345") )) .withColumn("_1", col("_1").as("_1")) .withColumn("_2", col("_2").as("_2")) @@ -526,26 +529,96 @@ class SchemaEnforcerTest extends AnyFlatSpec with Matchers with LocalSparkSessio val inputSchema = inputDf.schema val requestedSchema = com.dimajix.flowman.types.StructType(Seq( - Field("_1", FieldType.of("varchar(10)")), - Field("_2", FieldType.of("char(20)")), + Field("_1", FieldType.of("varchar(4)")), + Field("_2", FieldType.of("char(2)")), Field("_3", FieldType.of("string")) )) - val xfs = SchemaEnforcer(requestedSchema.sparkType) + val xfs = SchemaEnforcer(requestedSchema.catalogType) val columns = xfs.transform(inputSchema) val outputDf = inputDf.select(columns:_*) + val recs = outputDf.orderBy(col("_1"), col("_2")).as[(String, String, Option[String])].collect() + recs should be(Seq( + ("col", "1 ", None), + ("col1", "23", None) + )) + SchemaUtils.dropMetadata(outputDf.schema) should be (StructType(Seq( StructField("_1", StringType), StructField("_2", StringType), StructField("_3", StringType) ))) com.dimajix.flowman.types.StructType.of(outputDf.schema) should be (com.dimajix.flowman.types.StructType(Seq( - Field("_1", FieldType.of("varchar(10)")), - Field("_2", FieldType.of("char(20)")), + Field("_1", FieldType.of("varchar(4)")), + Field("_2", FieldType.of("char(2)")), Field("_3", FieldType.of("string")) ))) } + it should "work with different StringCastStrategies" in { + val spark = this.spark + import spark.implicits._ + + val inputDf = spark.createDataFrame(Seq( + ("col", "1"), + ("col123", "2345") + )) + .withColumn("_1", col("_1").as("_1")) + .withColumn("_2", col("_2").as("_2")) + .withColumn("_5", col("_2").as("_2")) + val inputSchema = inputDf.schema + + val requestedSchema = com.dimajix.flowman.types.StructType(Seq( + Field("_1", FieldType.of("varchar(4)")), + Field("_2", FieldType.of("char(2)")), + Field("_3", FieldType.of("string")) + )) + + { + val xfs = SchemaEnforcer(requestedSchema.catalogType, charVarcharPolicy = CharVarcharPolicy.PAD_AND_TRUNCATE) + val columns = xfs.transform(inputSchema) + val outputDf = inputDf.select(columns: _*) + val recs = outputDf.orderBy(col("_1"), col("_2")).as[(String, String, Option[String])].collect() + recs should be(Seq( + ("col", "1 ", None), + ("col1", "23", None) + )) + } + + { + val xfs = 
SchemaEnforcer(requestedSchema.catalogType, charVarcharPolicy = CharVarcharPolicy.PAD) + val columns = xfs.transform(inputSchema) + val outputDf = inputDf.select(columns: _*) + val recs = outputDf.orderBy(col("_1"), col("_2")).as[(String, String, Option[String])].collect() + recs should be(Seq( + ("col", "1 ", None), + ("col123", "2345", None) + )) + } + + { + val xfs = SchemaEnforcer(requestedSchema.catalogType, charVarcharPolicy = CharVarcharPolicy.TRUNCATE) + val columns = xfs.transform(inputSchema) + val outputDf = inputDf.select(columns: _*) + val recs = outputDf.orderBy(col("_1"), col("_2")).as[(String, String, Option[String])].collect() + recs should be(Seq( + ("col", "1", None), + ("col1", "23", None) + )) + } + + { + val xfs = SchemaEnforcer(requestedSchema.catalogType, charVarcharPolicy = CharVarcharPolicy.IGNORE) + val columns = xfs.transform(inputSchema) + val outputDf = inputDf.select(columns: _*) + val recs = outputDf.orderBy(col("_1"), col("_2")).as[(String, String, Option[String])].collect() + recs should be(Seq( + ("col", "1", None), + ("col123", "2345", None) + )) + } + } + it should "support comments" in { val inputDf = spark.createDataFrame(Seq( ("col1", "12"), diff --git a/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala b/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala index ac52f72bb..32983c65d 100644 --- a/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala +++ b/flowman-spark-extensions/src/main/scala/com/dimajix/spark/sql/SchemaUtils.scala @@ -40,7 +40,7 @@ import com.dimajix.spark.sql.catalyst.parser.CustomSqlParser object SchemaUtils { // This key is compatible with Spark 3.x - private val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING" + val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING" /** * Helper method for applying an optional schema to a given DataFrame. 
This will apply the types and order @@ -199,9 +199,10 @@ object SchemaUtils { } def replaceCharVarchar(field:StructField) : StructField = { val metadata = if (hasCharVarchar(field.dataType)) { - val builder = new MetadataBuilder().withMetadata(field.metadata) + new MetadataBuilder() + .withMetadata(field.metadata) .putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, field.dataType.catalogString) - builder.build() + .build() } else { field.metadata } @@ -262,22 +263,24 @@ object SchemaUtils { StructType(schema.map(recoverCharVarchar)) } def recoverCharVarchar(field:StructField) : StructField = { - if (field.metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) { - val typeString = field.metadata.getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) - val dt = try { - CustomSqlParser.parseDataType(typeString) - } catch { - // Work around bad field names, which will fire a parse-exception - case _:ParseException => - recoverCharVarchar(field.dataType) - } - val meta = new MetadataBuilder().withMetadata(field.metadata) - .remove(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) - .build() - field.copy(dataType = dt, metadata = meta) - } else { - field + val dt = field.dataType match { + case struct: StructType => recoverCharVarchar(struct) + case _:StringType|_:ArrayType|_:MapType if field.metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) => + val typeString = field.metadata.getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) + try { + CustomSqlParser.parseDataType(typeString) + } catch { + // Work around bad field names, which will fire a parse-exception + case _: ParseException => + recoverCharVarchar(field.dataType) + } + case dt: DataType => dt } + + val meta = new MetadataBuilder().withMetadata(field.metadata) + .remove(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) + .build() + field.copy(dataType = dt, metadata = meta) } private def recoverCharVarchar(dataType: DataType) : DataType = { dataType match { diff --git a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/SchemaMapping.scala b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/SchemaMapping.scala index 7abd53069..cf3a0c73d 100644 --- a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/SchemaMapping.scala +++ b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/SchemaMapping.scala @@ -31,7 +31,10 @@ import com.dimajix.flowman.model.Mapping import com.dimajix.flowman.model.MappingOutputIdentifier import com.dimajix.flowman.model.Schema import com.dimajix.flowman.spec.schema.SchemaSpec +import com.dimajix.flowman.transforms.ColumnMismatchPolicy import com.dimajix.flowman.transforms.SchemaEnforcer +import com.dimajix.flowman.transforms.CharVarcharPolicy +import com.dimajix.flowman.transforms.TypeMismatchPolicy import com.dimajix.flowman.types.Field import com.dimajix.flowman.types.FieldType import com.dimajix.flowman.types.StructType @@ -43,6 +46,9 @@ case class SchemaMapping( input:MappingOutputIdentifier, columns:Seq[Field] = Seq(), schema:Option[Schema] = None, + columnMismatchPolicy:ColumnMismatchPolicy = ColumnMismatchPolicy.ADD_REMOVE_COLUMNS, + typeMismatchPolicy:TypeMismatchPolicy = TypeMismatchPolicy.CAST_ALWAYS, + charVarcharPolicy:CharVarcharPolicy = CharVarcharPolicy.PAD_AND_TRUNCATE, filter:Option[String] = None ) extends BaseMapping { @@ -96,10 +102,10 @@ extends BaseMapping { } private lazy val xfs = if(schema.nonEmpty) { - SchemaEnforcer(schema.get.sparkSchema) + SchemaEnforcer(schema.get.catalogSchema) } else { - SchemaEnforcer(StructType(columns).sparkType) + 
SchemaEnforcer(StructType(columns).catalogType) } } @@ -111,6 +117,9 @@ class SchemaMappingSpec extends MappingSpec { @JsonProperty(value = "columns", required = false) private var columns: ListMap[String,String] = ListMap() @JsonProperty(value = "schema", required = false) private var schema: Option[SchemaSpec] = None @JsonProperty(value = "filter", required=false) private var filter:Option[String] = None + @JsonProperty(value = "columnMismatchPolicy", required=false) private var columnMismatchPolicy:String = "ADD_REMOVE_COLUMNS" + @JsonProperty(value = "typeMismatchPolicy", required=false) private var typeMismatchPolicy:String = "CAST_ALWAYS" + @JsonProperty(value = "charVarcharPolicy", required=false) private var charVarcharPolicy:String = "PAD_AND_TRUNCATE" /** * Creates the instance of the specified Mapping with all variable interpolation being performed @@ -123,6 +132,9 @@ class SchemaMappingSpec extends MappingSpec { MappingOutputIdentifier(context.evaluate(this.input)), columns.toSeq.map(kv => Field(kv._1, FieldType.of(context.evaluate(kv._2)))), schema.map(_.instantiate(context)), + ColumnMismatchPolicy.ofString(context.evaluate(columnMismatchPolicy)), + TypeMismatchPolicy.ofString(context.evaluate(typeMismatchPolicy)), + CharVarcharPolicy.ofString(context.evaluate(charVarcharPolicy)), context.evaluate(filter) ) } diff --git a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/UnionMapping.scala b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/UnionMapping.scala index 9f1cbccf0..f55d6a0e7 100644 --- a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/UnionMapping.scala +++ b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/UnionMapping.scala @@ -64,7 +64,7 @@ case class UnionMapping( val union = if (schema.nonEmpty) { // Project all tables onto specified schema - val schemaEnforcer = SchemaEnforcer(schema.get.sparkSchema) + val schemaEnforcer = SchemaEnforcer(schema.get.catalogSchema) val projectedTables = dfs.map(schemaEnforcer.transform) projectedTables.reduce((l,r) => l.union(r)) } diff --git a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/relation/HiveUnionTableRelation.scala b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/relation/HiveUnionTableRelation.scala index 609c41c09..60bc07f5e 100644 --- a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/relation/HiveUnionTableRelation.scala +++ b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/relation/HiveUnionTableRelation.scala @@ -50,6 +50,7 @@ import com.dimajix.flowman.model.ResourceIdentifier import com.dimajix.flowman.model.Schema import com.dimajix.flowman.model.SchemaRelation import com.dimajix.flowman.transforms.SchemaEnforcer +import com.dimajix.flowman.transforms.CharVarcharPolicy import com.dimajix.flowman.transforms.UnionTransformer import com.dimajix.flowman.types.FieldValue import com.dimajix.flowman.types.SingleValue @@ -66,7 +67,7 @@ object HiveUnionTableRelation { */ private[relation] def unionSql(tables:Seq[DataFrame], schema:StructType) : String = { val union = UnionTransformer().transformDataFrames(tables) - val conformed = SchemaEnforcer(schema).transform(union) + val conformed = SchemaEnforcer(schema,charVarcharPolicy=CharVarcharPolicy.IGNORE).transform(union) new SqlBuilder(conformed).toSQL } } diff --git a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/CopyTarget.scala b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/CopyTarget.scala index f3cabbf4c..ba174255a 100644 --- 
a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/CopyTarget.scala +++ b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/CopyTarget.scala @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory import com.dimajix.common.No import com.dimajix.common.Trilean import com.dimajix.common.Yes +import com.dimajix.flowman.config.FlowmanConf import com.dimajix.flowman.config.FlowmanConf.DEFAULT_TARGET_OUTPUT_MODE import com.dimajix.flowman.config.FlowmanConf.DEFAULT_TARGET_PARALLELISM import com.dimajix.flowman.config.FlowmanConf.DEFAULT_TARGET_REBALANCE @@ -36,7 +37,10 @@ import com.dimajix.flowman.model.Dataset import com.dimajix.flowman.model.ResourceIdentifier import com.dimajix.flowman.model.Target import com.dimajix.flowman.spec.dataset.DatasetSpec +import com.dimajix.flowman.transforms.ColumnMismatchPolicy import com.dimajix.flowman.transforms.SchemaEnforcer +import com.dimajix.flowman.transforms.CharVarcharPolicy +import com.dimajix.flowman.transforms.TypeMismatchPolicy import com.dimajix.flowman.types.SchemaWriter import com.dimajix.flowman.types.StructType @@ -111,12 +115,12 @@ case class CopyTarget( * * @param executor */ - override protected def build(executor: Execution): Unit = { - require(executor != null) + override protected def build(execution: Execution): Unit = { + require(execution != null) logger.info(s"Copying dataset ${source.name} to ${target.name}") - val dfIn = source.read(executor) + val dfIn = source.read(execution) val data = if (parallelism <= 0) dfIn @@ -124,15 +128,21 @@ case class CopyTarget( dfIn.repartition(parallelism) else dfIn.coalesce(parallelism) - val conformed = target.describe(executor).map { schema => - val xfs = SchemaEnforcer(schema.sparkType) + val conformed = target.describe(execution).map { schema => + val conf = execution.flowmanConf + val xfs = SchemaEnforcer( + schema.catalogType, + columnMismatchPolicy = ColumnMismatchPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_COLUMN_MISMATCH_POLICY)), + typeMismatchPolicy = TypeMismatchPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_TYPE_MISMATCH_POLICY)), + charVarcharPolicy = CharVarcharPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_CHAR_VARCHAR_POLICY)) + ) xfs.transform(data) }.getOrElse(data) - target.write(executor, conformed, mode) + target.write(execution, conformed, mode) schema.foreach { spec => logger.info(s"Writing schema to file '${spec.file}'") - val schema = source.describe(executor).getOrElse(StructType.of(data.schema)) + val schema = source.describe(execution).getOrElse(StructType.of(data.schema)) val file = context.fs.file(spec.file) new SchemaWriter(schema.fields).format(spec.format).save(file) } diff --git a/flowman-spec/src/test/scala/com/dimajix/flowman/spec/mapping/SchemaMappingTest.scala b/flowman-spec/src/test/scala/com/dimajix/flowman/spec/mapping/SchemaMappingTest.scala index b55d2cd25..9fb7907f1 100644 --- a/flowman-spec/src/test/scala/com/dimajix/flowman/spec/mapping/SchemaMappingTest.scala +++ b/flowman-spec/src/test/scala/com/dimajix/flowman/spec/mapping/SchemaMappingTest.scala @@ -155,9 +155,12 @@ class SchemaMappingTest extends AnyFlatSpec with Matchers with LocalSparkSession } it should "correctly process extended string types" in { + val spark = this.spark + import spark.implicits._ + val inputDf = spark.createDataFrame(Seq( - ("col1", "12"), - ("col2", "23") + ("col", "1"), + ("col123", "2345") )) .withColumn("_1", col("_1").as("_1")) .withColumn("_2", col("_2").as("_2")) @@ -171,16 +174,16 @@ class 
SchemaMappingTest extends AnyFlatSpec with Matchers with LocalSparkSession Mapping.Properties(session.context, name = "map"), MappingOutputIdentifier("myview"), Seq( - Field("_1", FieldType.of("varchar(10)")), - Field("_2", FieldType.of("char(20)")), + Field("_1", FieldType.of("varchar(4)")), + Field("_2", FieldType.of("char(2)")), Field("_3", FieldType.of("int")) ) ) mapping.input should be (MappingOutputIdentifier("myview")) mapping.columns should be (Seq( - Field("_1", FieldType.of("varchar(10)")), - Field("_2", FieldType.of("char(20)")), + Field("_1", FieldType.of("varchar(4)")), + Field("_2", FieldType.of("char(2)")), Field("_3", FieldType.of("int")) )) mapping.inputs should be (Set(MappingOutputIdentifier("myview"))) @@ -189,12 +192,18 @@ class SchemaMappingTest extends AnyFlatSpec with Matchers with LocalSparkSession val desc = mapping.describe(executor, Map(MappingOutputIdentifier("myview") -> inputSchema))("main") desc should be (com.dimajix.flowman.types.StructType(Seq( - Field("_1", FieldType.of("varchar(10)")), - Field("_2", FieldType.of("char(20)")), + Field("_1", FieldType.of("varchar(4)")), + Field("_2", FieldType.of("char(2)")), Field("_3", FieldType.of("int")) ))) val result = mapping.execute(executor, Map(MappingOutputIdentifier("myview") -> inputDf))("main") + val recs = result.orderBy(col("_1"),col("_2")).as[(String,String,Option[Int])].collect() + recs should be (Seq( + ("col", "1 ", None), + ("col1", "23", None) + )) + SchemaUtils.dropMetadata(result.schema) should be (StructType(Seq( StructField("_1", StringType), StructField("_2", StringType), @@ -202,8 +211,8 @@ class SchemaMappingTest extends AnyFlatSpec with Matchers with LocalSparkSession ))) val resultSchema = com.dimajix.flowman.types.StructType.of(result.schema) resultSchema should be (com.dimajix.flowman.types.StructType(Seq( - Field("_1", FieldType.of("varchar(10)")), - Field("_2", FieldType.of("char(20)")), + Field("_1", FieldType.of("varchar(4)")), + Field("_2", FieldType.of("char(2)")), Field("_3", FieldType.of("int")) ))) } From e282ff7d3dff1f972b3b4f92b761e7135e6f9b4a Mon Sep 17 00:00:00 2001 From: Kaya Kupferschmidt Date: Tue, 2 Aug 2022 20:16:34 +0200 Subject: [PATCH 7/9] Improve implementation of 'conform' mapping --- docs/spec/fields.md | 2 +- docs/spec/mapping/conform.md | 20 ++- .../flowman/transforms/TypeReplacer.scala | 12 +- .../com/dimajix/flowman/types/FieldType.scala | 2 +- .../dimajix/flowman/types/IntegerType.scala | 3 +- .../flowman/transforms/TypeReplacerTest.scala | 121 +++++++++++++++++- .../flowman/types/IntegerTypeTest.scala | 4 +- .../flowman/spec/mapping/ConformMapping.scala | 4 +- 8 files changed, 148 insertions(+), 20 deletions(-) diff --git a/docs/spec/fields.md b/docs/spec/fields.md index 7233c4398..3ded0557e 100644 --- a/docs/spec/fields.md +++ b/docs/spec/fields.md @@ -13,7 +13,7 @@ The following simple data types are supported by Flowman * `smallint`, `short` - 16 bit signed numbers * `int`, `integer` - 32 bit signed numbers * `bigint`, `long` - 64 bit signed numbers -* `boolean` - true or false +* `boolean`, `bool` - true or false * `float` - 32 bit floating point number * `double` - 64 bit floating point number * `decimal(a,b)` diff --git a/docs/spec/mapping/conform.md b/docs/spec/mapping/conform.md index e49682bd0..3a20f2a58 100644 --- a/docs/spec/mapping/conform.md +++ b/docs/spec/mapping/conform.md @@ -1,6 +1,6 @@ # Conform Mapping The `conform` mapping performs simple name and type mangling transformations to conform data to some standard.
For -example you can replace all date columns by timestamp columns (this is required for older versions of Hive) or +example, you can replace all date columns by timestamp columns (this is required for older versions of Hive) or you can transform column names from camel case to snake case to better match SQL. ## Example @@ -39,7 +39,20 @@ Specifies the naming scheme used for the output. The following values are suppor * `camelCaseUpper` * `types` **(optional)** *(type: map:string)*: -Specifies the list of types and how they should be replaced +Specifies the list of types and how they should be replaced. The following types can be specified as source types: + * `BYTE` or `TINYINT` + * `SHORT` or `SMALLINT` + * `INT` or `INTEGER` + * `LONG` or `BIGINT` + * `BOOLEAN` or `BOOL` + * `FLOAT` + * `DOUBLE` + * `DECIMAL` + * `STRING` or `TEXT` + * `DURATION` + * `TIMESTAMP` + * `DATE` +Note that both `CHAR(n)` and `VARCHAR(n)` are matched to the entry for the `STRING` type. * `flatten` **(optional)** *(type: boolean)* *(default: false)*: Flattens all nested structs into a flat list of columns if set to `true` @@ -50,6 +63,3 @@ An optional SQL filter expression that is applied *after* conforming. ## Outputs * `main` - the only output of the mapping - - -## Description diff --git a/flowman-core/src/main/scala/com/dimajix/flowman/transforms/TypeReplacer.scala b/flowman-core/src/main/scala/com/dimajix/flowman/transforms/TypeReplacer.scala index adb136421..f60ac8aa1 100644 --- a/flowman-core/src/main/scala/com/dimajix/flowman/transforms/TypeReplacer.scala +++ b/flowman-core/src/main/scala/com/dimajix/flowman/transforms/TypeReplacer.scala @@ -25,13 +25,14 @@ import org.apache.spark.sql.functions.struct import org.apache.spark.sql.{types => stypes} import com.dimajix.spark.sql.functions.nullable_struct - import com.dimajix.flowman.types.ArrayType +import com.dimajix.flowman.types.CharType import com.dimajix.flowman.types.DecimalType import com.dimajix.flowman.types.FieldType import com.dimajix.flowman.types.MapType import com.dimajix.flowman.types.StructType import com.dimajix.flowman.types.VarcharType +import com.dimajix.spark.sql.SchemaUtils.dropExtendedTypeInfo /** @@ -41,6 +42,8 @@ import com.dimajix.flowman.types.VarcharType final case class TypeReplacer(replace:Map[String, FieldType]) extends Transformer { private val typeAliases = Map( "text" -> "string", + "bool" -> "boolean", + "integer" -> "int", "long" -> "bigint", "short" -> "smallint", "byte" -> "tinyint" ) @@ -94,6 +97,8 @@ final case class TypeReplacer(replace:Map[String, FieldType]) extends Transforme val column = field.dataType match { case _:stypes.VarcharType => typeMap.get("string").map(d => col(fqn).cast(d.sparkType)) + case _:stypes.CharType => + typeMap.get("string").map(d => col(fqn).cast(d.sparkType)) case _:stypes.DecimalType => typeMap.get("decimal").map(d => col(fqn).cast(d.sparkType)) case at:stypes.ArrayType => processArray(fqn, at) @@ -102,7 +107,7 @@ final case class TypeReplacer(replace:Map[String, FieldType]) extends Transforme case dt:stypes.DataType => typeMap.get(dt.sql.toLowerCase(Locale.ROOT)).map(t => col(fqn).cast(t.sparkType)) } - column.map(_.as(field.name, field.metadata)) + column.map(_.as(field.name, dropExtendedTypeInfo(field.metadata))) } val fields = df.schema.fields.map(f => (col(f.name), processField("", f))) @@ -113,8 +118,9 @@ final case class TypeReplacer(replace:Map[String, FieldType]) extends Transforme override def transform(schema:StructType) : StructType = { def processType(fieldType: FieldType) : 
FieldType = { fieldType match { - case dt:DecimalType => typeMap.getOrElse("decimal", dt) case dt:VarcharType => typeMap.getOrElse("string", dt) + case dt:CharType => typeMap.getOrElse("string", dt) + case dt:DecimalType => typeMap.getOrElse("decimal", dt) case at:ArrayType => processArray(at) case mt:MapType => processMap(mt) case st:StructType => processStruct(st) diff --git a/flowman-core/src/main/scala/com/dimajix/flowman/types/FieldType.scala b/flowman-core/src/main/scala/com/dimajix/flowman/types/FieldType.scala index c477e53a9..fe0325c92 100644 --- a/flowman-core/src/main/scala/com/dimajix/flowman/types/FieldType.scala +++ b/flowman-core/src/main/scala/com/dimajix/flowman/types/FieldType.scala @@ -57,7 +57,7 @@ object FieldType { "byte" -> ByteType, "short" -> ShortType, "long" -> LongType, - "int" -> IntegerType, + "integer" -> IntegerType, "text" -> StringType ) } diff --git a/flowman-core/src/main/scala/com/dimajix/flowman/types/IntegerType.scala b/flowman-core/src/main/scala/com/dimajix/flowman/types/IntegerType.scala index 4d5fb1cc8..8d7047e42 100644 --- a/flowman-core/src/main/scala/com/dimajix/flowman/types/IntegerType.scala +++ b/flowman-core/src/main/scala/com/dimajix/flowman/types/IntegerType.scala @@ -1,5 +1,5 @@ /* - * Copyright 2018 Kaya Kupferschmidt + * Copyright 2018-2022 Kaya Kupferschmidt * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,5 +22,6 @@ import org.apache.spark.sql.types.DataType case object IntegerType extends IntegralType[Int] { protected def parseRaw(value:String) : Int = value.toInt + override def sqlType: String = "int" override def sparkType : DataType = org.apache.spark.sql.types.IntegerType } diff --git a/flowman-core/src/test/scala/com/dimajix/flowman/transforms/TypeReplacerTest.scala b/flowman-core/src/test/scala/com/dimajix/flowman/transforms/TypeReplacerTest.scala index ecf5ed83d..215b8e7c0 100644 --- a/flowman-core/src/test/scala/com/dimajix/flowman/transforms/TypeReplacerTest.scala +++ b/flowman-core/src/test/scala/com/dimajix/flowman/transforms/TypeReplacerTest.scala @@ -1,5 +1,5 @@ /* - * Copyright 2018 Kaya Kupferschmidt + * Copyright 2018-2022 Kaya Kupferschmidt * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
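// Editor's aside — a minimal, hypothetical sketch (not part of this patch) of what the
// TypeReplacer changes above mean in practice. It assumes the keys of the `replace`
// map are normalized through the extended `typeAliases` table shown in the diff, so
// "integer"/"int" and "bool"/"boolean" select the same source type; `replacer` and
// `schema` are illustrative names only.
//
//   import com.dimajix.flowman.transforms.TypeReplacer
//   import com.dimajix.flowman.{types => ftypes}
//
//   // Either spelling of the source type selects 32-bit integers for replacement:
//   val replacer = TypeReplacer(Map("integer" -> ftypes.LongType))
//   val schema   = ftypes.StructType(Seq(ftypes.Field("id", ftypes.IntegerType)))
//   replacer.transform(schema)   // yields StructType(Seq(Field("id", LongType)))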
@@ -19,14 +19,18 @@ package com.dimajix.flowman.transforms import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.ArrayType import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.types.FloatType import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.LongType +import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.VarcharType import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import com.dimajix.flowman.{types => ftypes} +import com.dimajix.spark.sql.SchemaUtils import com.dimajix.spark.testing.LocalSparkSession @@ -63,7 +67,7 @@ class TypeReplacerTest extends AnyFlatSpec with Matchers with LocalSparkSession } "A TypeReplacer" should "transform DataFrames correctly" in { - val mapping = new TypeReplacer( + val mapping = TypeReplacer( Map( "string" -> ftypes.IntegerType ) @@ -89,7 +93,7 @@ class TypeReplacerTest extends AnyFlatSpec with Matchers with LocalSparkSession } it should "provide a correct output schema" in { - val mapping = new TypeReplacer( + val mapping = TypeReplacer( Map( "string" -> ftypes.IntegerType ) @@ -110,10 +114,119 @@ class TypeReplacerTest extends AnyFlatSpec with Matchers with LocalSparkSession val outputSchema = mapping.transform(ftypes.StructType.of(inputDf.schema)) outputSchema.sparkType should be (expectedSchema) + outputSchema.catalogType should be (expectedSchema) + } + + it should "support simple types" in { + val rawInputDf = spark.createDataFrame(Seq( + ("1","1"), + ("123","123") + )) + + + val simpleTypes = Seq( + "string", "text", + "bool", "boolean", + "byte", "tinyint", + "short", "smallint", + "int", "integer", + "long", "bigint", + "float", + "double" + ) + for (typeName <- simpleTypes) { + val fieldType = ftypes.FieldType.of(typeName) + val inputSchema = ftypes.StructType(Seq( + ftypes.Field("_1", fieldType) + )) + val xfs = SchemaEnforcer(inputSchema.catalogType) + val inputDf = xfs.transform(rawInputDf) + + val mapping = TypeReplacer( + Map( + typeName -> ftypes.FloatType + ) + ) + val expectedSchema = StructType(Seq( + StructField("_1", FloatType) + )) + + val outputSchema = mapping.transform(ftypes.StructType.of(inputDf.schema)) + outputSchema.sparkType should be(expectedSchema) + outputSchema.catalogType should be(expectedSchema) + + val outputDf = mapping.transform(inputDf) + outputDf.schema should be(expectedSchema) + } + } + + it should "support extended string types (1)" in { + val mapping = TypeReplacer( + Map( + "string" -> ftypes.IntegerType + ) + ) + + val rawInputDf = spark.createDataFrame(Seq( + ("col", 1.0f), + ("col123", 2345.0f) + )) + val inputSchema = ftypes.StructType(Seq( + ftypes.Field("_1", ftypes.VarcharType(10)), + ftypes.Field("_2", ftypes.FloatType) + )) + val xfs = SchemaEnforcer(inputSchema.catalogType) + val inputDf = xfs.transform(rawInputDf) + val expectedSchema = StructType(Seq( + StructField("_1", IntegerType), + StructField("_2", FloatType, false) + )) + + val outputSchema = mapping.transform(ftypes.StructType.of(inputDf.schema)) + outputSchema.sparkType should be(expectedSchema) + outputSchema.catalogType should be(expectedSchema) + + val outputDf = mapping.transform(inputDf) + outputDf.schema should be (expectedSchema) + } + + it should "support extended string types (2)" in { + val mapping = TypeReplacer( + Map( + "int" -> ftypes.FloatType + ) + ) + + val rawInputDf = spark.createDataFrame(Seq( + ("col", 
1), + ("col123", 2345) + )) + val inputSchema = ftypes.StructType(Seq( + ftypes.Field("_1", ftypes.VarcharType(10)), + ftypes.Field("_2", ftypes.IntegerType) + )) + val xfs = SchemaEnforcer(inputSchema.catalogType) + val inputDf = xfs.transform(rawInputDf) + val expectedSchema = StructType(Seq( + StructField("_1", VarcharType(10)), + StructField("_2", FloatType, false) + )) + val expectedSimpleSchema = StructType(Seq( + StructField("_1", StringType), + StructField("_2", FloatType, false) + )) + + val outputSchema = mapping.transform(ftypes.StructType.of(inputDf.schema)) + SchemaUtils.dropExtendedTypeInfo(outputSchema.sparkType) should be(expectedSimpleSchema) + outputSchema.catalogType should be(expectedSchema) + + val outputDf = mapping.transform(inputDf) + SchemaUtils.dropExtendedTypeInfo(outputDf.schema) should be(expectedSimpleSchema) + SchemaUtils.recoverCharVarchar(outputDf.schema) should be(expectedSchema) } it should "throw an error for arrays" in { - val mapping = new TypeReplacer( + val mapping = TypeReplacer( Map( "long" -> ftypes.IntegerType ) diff --git a/flowman-core/src/test/scala/com/dimajix/flowman/types/IntegerTypeTest.scala b/flowman-core/src/test/scala/com/dimajix/flowman/types/IntegerTypeTest.scala index e73ec35a4..61e367863 100644 --- a/flowman-core/src/test/scala/com/dimajix/flowman/types/IntegerTypeTest.scala +++ b/flowman-core/src/test/scala/com/dimajix/flowman/types/IntegerTypeTest.scala @@ -1,5 +1,5 @@ /* - * Copyright 2018 Kaya Kupferschmidt + * Copyright 2018-2022 Kaya Kupferschmidt * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -102,7 +102,7 @@ class IntegerTypeTest extends AnyFlatSpec with Matchers { } it should "provide the correct SQL type" in { - IntegerType.sqlType should be ("integer") + IntegerType.sqlType should be ("int") IntegerType.sparkType.sql should be ("INT") IntegerType.typeName should be ("integer") } diff --git a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/ConformMapping.scala b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/ConformMapping.scala index 7f9407337..c15153f72 100644 --- a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/ConformMapping.scala +++ b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/mapping/ConformMapping.scala @@ -63,7 +63,6 @@ extends BaseMapping { require(input != null) val df = input(this.input) - val transforms = this.transforms // Apply all transformations in order val result = transforms.foldLeft(df)((df,xfs) => xfs.transform(df)) @@ -84,7 +83,6 @@ extends BaseMapping { require(input != null) val schema = input(this.input) - val transforms = this.transforms // Apply all transformations in order val result = transforms.foldLeft(schema)((df,xfs) => xfs.transform(df)) @@ -94,7 +92,7 @@ extends BaseMapping { applyDocumentation(schemas) } - private def transforms : Seq[Transformer] = { + lazy val transforms : Seq[Transformer] = { Seq( Option(types).filter(_.nonEmpty).map(t => TypeReplacer(t)), naming.map(f => CaseFormatter(f)), From d1ecd7a543c586babf6e2029c8a72e41c64dcdcd Mon Sep 17 00:00:00 2001 From: Kaya Kupferschmidt Date: Wed, 3 Aug 2022 08:50:43 +0200 Subject: [PATCH 8/9] Update CHANGELOG.md for next release --- CHANGELOG.md | 2 +- docs/releases.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index adf1e301f..9a47dc67a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -# Version 0.26.1 +# Version 0.26.1 - 
2022-08-03 * github-226: Upgrade to Spark 3.2.2 * github-227: [BUG] Flowman should not fail with field names containing "-", "/" etc diff --git a/docs/releases.md b/docs/releases.md index 224967e58..271983045 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -14,7 +14,7 @@ The following gives an (incomplete) list of past releases of the last 12 months. changes over time. -### Version 0.26.1 +### Version 0.26.1 - 2022-08-03 * github-226: Upgrade to Spark 3.2.2 * github-227: [BUG] Flowman should not fail with field names containing "-", "/" etc From 94583f1dcd78422ad9b195ef16a3e99b1b7f60e1 Mon Sep 17 00:00:00 2001 From: Kaya Kupferschmidt Date: Wed, 3 Aug 2022 09:14:25 +0200 Subject: [PATCH 9/9] Update versions for release --- docker/pom.xml | 2 +- flowman-client/pom.xml | 2 +- flowman-common/pom.xml | 2 +- flowman-core/pom.xml | 2 +- flowman-dist/pom.xml | 2 +- flowman-dsl/pom.xml | 2 +- flowman-hub/pom.xml | 2 +- flowman-parent/pom.xml | 2 +- flowman-plugins/aws/pom.xml | 2 +- flowman-plugins/azure/pom.xml | 2 +- flowman-plugins/delta/pom.xml | 2 +- flowman-plugins/impala/pom.xml | 2 +- flowman-plugins/json/pom.xml | 2 +- flowman-plugins/kafka/pom.xml | 2 +- flowman-plugins/mariadb/pom.xml | 2 +- flowman-plugins/mssqlserver/pom.xml | 2 +- flowman-plugins/mysql/pom.xml | 2 +- flowman-plugins/openapi/pom.xml | 2 +- flowman-plugins/oracle/pom.xml | 2 +- flowman-plugins/postgresql/pom.xml | 2 +- flowman-plugins/swagger/pom.xml | 2 +- flowman-scalatest-compat/pom.xml | 2 +- flowman-server-ui/pom.xml | 2 +- flowman-server/pom.xml | 2 +- flowman-spark-extensions/pom.xml | 2 +- flowman-spark-testing/pom.xml | 2 +- flowman-spec/pom.xml | 2 +- flowman-studio-ui/pom.xml | 2 +- flowman-studio/pom.xml | 2 +- flowman-testing/pom.xml | 2 +- flowman-tools/pom.xml | 2 +- pom.xml | 2 +- 32 files changed, 32 insertions(+), 32 deletions(-) diff --git a/docker/pom.xml b/docker/pom.xml index d56193775..8f1098c3a 100644 --- a/docker/pom.xml +++ b/docker/pom.xml @@ -10,7 +10,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-client/pom.xml b/flowman-client/pom.xml index 141d2beab..656535d4c 100644 --- a/flowman-client/pom.xml +++ b/flowman-client/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-common/pom.xml b/flowman-common/pom.xml index 531c8404c..27a21a931 100644 --- a/flowman-common/pom.xml +++ b/flowman-common/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-core/pom.xml b/flowman-core/pom.xml index 1cc9424ae..2b3b4416f 100644 --- a/flowman-core/pom.xml +++ b/flowman-core/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-dist/pom.xml b/flowman-dist/pom.xml index 5020f34d6..d372576d2 100644 --- a/flowman-dist/pom.xml +++ b/flowman-dist/pom.xml @@ -10,7 +10,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-dsl/pom.xml b/flowman-dsl/pom.xml index 3837a989c..00c0e8992 100644 --- a/flowman-dsl/pom.xml +++ b/flowman-dsl/pom.xml @@ -9,7 +9,7 @@ flowman-root com.dimajix.flowman - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-hub/pom.xml b/flowman-hub/pom.xml index 567f6b91a..ba5083725 100644 --- a/flowman-hub/pom.xml +++ b/flowman-hub/pom.xml @@ -9,7 +9,7 @@ flowman-root com.dimajix.flowman - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-parent/pom.xml b/flowman-parent/pom.xml index 
442a6712d..c10e069e7 100644 --- a/flowman-parent/pom.xml +++ b/flowman-parent/pom.xml @@ -10,7 +10,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-plugins/aws/pom.xml b/flowman-plugins/aws/pom.xml index b5b341bcf..85ffc1747 100644 --- a/flowman-plugins/aws/pom.xml +++ b/flowman-plugins/aws/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/azure/pom.xml b/flowman-plugins/azure/pom.xml index 053342b70..196deb06b 100644 --- a/flowman-plugins/azure/pom.xml +++ b/flowman-plugins/azure/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/delta/pom.xml b/flowman-plugins/delta/pom.xml index 1b144afe3..088ddd300 100644 --- a/flowman-plugins/delta/pom.xml +++ b/flowman-plugins/delta/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/impala/pom.xml b/flowman-plugins/impala/pom.xml index 4ef25f0f1..2ea8ae32d 100644 --- a/flowman-plugins/impala/pom.xml +++ b/flowman-plugins/impala/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/json/pom.xml b/flowman-plugins/json/pom.xml index 30c461e25..dbe4cfa6b 100644 --- a/flowman-plugins/json/pom.xml +++ b/flowman-plugins/json/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/kafka/pom.xml b/flowman-plugins/kafka/pom.xml index bb4129f59..f7c03e4e8 100644 --- a/flowman-plugins/kafka/pom.xml +++ b/flowman-plugins/kafka/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/mariadb/pom.xml b/flowman-plugins/mariadb/pom.xml index 74616d5cf..8ceae6746 100644 --- a/flowman-plugins/mariadb/pom.xml +++ b/flowman-plugins/mariadb/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/mssqlserver/pom.xml b/flowman-plugins/mssqlserver/pom.xml index d754fb0c3..51e4c04b4 100644 --- a/flowman-plugins/mssqlserver/pom.xml +++ b/flowman-plugins/mssqlserver/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/mysql/pom.xml b/flowman-plugins/mysql/pom.xml index 7157f6b06..4ceeea540 100644 --- a/flowman-plugins/mysql/pom.xml +++ b/flowman-plugins/mysql/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/openapi/pom.xml b/flowman-plugins/openapi/pom.xml index 2522d853e..cc00acecd 100644 --- a/flowman-plugins/openapi/pom.xml +++ b/flowman-plugins/openapi/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/oracle/pom.xml b/flowman-plugins/oracle/pom.xml index 416888e2e..bcfa4e6c3 100644 --- a/flowman-plugins/oracle/pom.xml +++ b/flowman-plugins/oracle/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/postgresql/pom.xml b/flowman-plugins/postgresql/pom.xml index a23b9eb3f..54c8bf221 100644 --- a/flowman-plugins/postgresql/pom.xml +++ b/flowman-plugins/postgresql/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-plugins/swagger/pom.xml 
b/flowman-plugins/swagger/pom.xml index 2b08fb8c6..a453e977b 100644 --- a/flowman-plugins/swagger/pom.xml +++ b/flowman-plugins/swagger/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../../pom.xml diff --git a/flowman-scalatest-compat/pom.xml b/flowman-scalatest-compat/pom.xml index b479f7f98..5ab91da1a 100644 --- a/flowman-scalatest-compat/pom.xml +++ b/flowman-scalatest-compat/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-server-ui/pom.xml b/flowman-server-ui/pom.xml index 0c5eda9eb..54ccd0ca5 100644 --- a/flowman-server-ui/pom.xml +++ b/flowman-server-ui/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-server/pom.xml b/flowman-server/pom.xml index 9e10dc304..b832437e5 100644 --- a/flowman-server/pom.xml +++ b/flowman-server/pom.xml @@ -9,7 +9,7 @@ flowman-root com.dimajix.flowman - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-spark-extensions/pom.xml b/flowman-spark-extensions/pom.xml index 2729d2b6d..432f731ba 100644 --- a/flowman-spark-extensions/pom.xml +++ b/flowman-spark-extensions/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-spark-testing/pom.xml b/flowman-spark-testing/pom.xml index 56cbc315e..d0df899cc 100644 --- a/flowman-spark-testing/pom.xml +++ b/flowman-spark-testing/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-spec/pom.xml b/flowman-spec/pom.xml index 02a738ebc..55cd9591f 100644 --- a/flowman-spec/pom.xml +++ b/flowman-spec/pom.xml @@ -9,7 +9,7 @@ flowman-root com.dimajix.flowman - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-studio-ui/pom.xml b/flowman-studio-ui/pom.xml index a2ebb1476..06ba93dcf 100644 --- a/flowman-studio-ui/pom.xml +++ b/flowman-studio-ui/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-studio/pom.xml b/flowman-studio/pom.xml index 6d86a7a59..416483c72 100644 --- a/flowman-studio/pom.xml +++ b/flowman-studio/pom.xml @@ -9,7 +9,7 @@ flowman-root com.dimajix.flowman - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-testing/pom.xml b/flowman-testing/pom.xml index a38c838e2..c53d2f3ca 100644 --- a/flowman-testing/pom.xml +++ b/flowman-testing/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/flowman-tools/pom.xml b/flowman-tools/pom.xml index 176017ccc..1c14d25e9 100644 --- a/flowman-tools/pom.xml +++ b/flowman-tools/pom.xml @@ -9,7 +9,7 @@ com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 ../pom.xml diff --git a/pom.xml b/pom.xml index faf03a9b3..3298e0622 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.dimajix.flowman flowman-root - 0.26.1-SNAPSHOT + 0.26.1 pom Flowman root pom A Spark based ETL tool
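Taken together, the patches above make schema conformance fully policy-driven. Below is a minimal sketch of the resulting SchemaEnforcer usage, built only from constructor parameters that appear in the diffs and the default policies of the new SchemaMapping fields; the input DataFrame `inputDf` is hypothetical, and this is an illustration rather than shipped code.

    import com.dimajix.flowman.transforms.CharVarcharPolicy
    import com.dimajix.flowman.transforms.ColumnMismatchPolicy
    import com.dimajix.flowman.transforms.SchemaEnforcer
    import com.dimajix.flowman.transforms.TypeMismatchPolicy
    import com.dimajix.flowman.{types => ftypes}

    // Requested schema with extended string types; catalogType preserves the
    // char(n)/varchar(n) information that the plain Spark schema would lose.
    val requestedSchema = ftypes.StructType(Seq(
      ftypes.Field("_1", ftypes.FieldType.of("varchar(4)")),
      ftypes.Field("_2", ftypes.FieldType.of("char(2)"))
    ))

    // These three policies are the defaults of the new SchemaMapping parameters:
    // add/remove columns as needed, always cast types, pad char(n) and truncate
    // overlong char(n)/varchar(n) values.
    val xfs = SchemaEnforcer(
      requestedSchema.catalogType,
      columnMismatchPolicy = ColumnMismatchPolicy.ADD_REMOVE_COLUMNS,
      typeMismatchPolicy = TypeMismatchPolicy.CAST_ALWAYS,
      charVarcharPolicy = CharVarcharPolicy.PAD_AND_TRUNCATE
    )
    // val conformed = xfs.transform(inputDf)  // conforms any DataFrame to the requested schema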