Merge branch 'develop'
kupferk committed Aug 3, 2022
2 parents fb34202 + 94583f1 commit 8d5e019
Showing 57 changed files with 700 additions and 236 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,10 @@
# Version 0.26.1 - 2022-08-03

* github-226: Upgrade to Spark 3.2.2
* github-227: [BUG] Flowman should not fail with field names containing "-", "/" etc.
* github-228: Padding and truncation of CHAR(n)/VARCHAR(n) should be configurable


# Version 0.26.0 - 2022-07-27

* github-202: Add support for Spark 3.3
2 changes: 1 addition & 1 deletion docker/Dockerfile
@@ -12,7 +12,7 @@ ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin:$FLOWMAN_HOME/bin

# Download and install Spark
RUN curl -sL --retry 3 "http://ftp.fau.de/apache/spark/spark-${BUILD_SPARK_VERSION}/spark-${BUILD_SPARK_VERSION}-bin-hadoop${BUILD_HADOOP_VERSION}.tgz" \
RUN curl -sL --retry 3 "https://archive.apache.org/dist/spark/spark-${BUILD_SPARK_VERSION}/spark-${BUILD_SPARK_VERSION}-bin-hadoop${BUILD_HADOOP_VERSION}.tgz" \
| tar xz -C /opt \
&& ln -s /opt/spark-${BUILD_SPARK_VERSION}-bin-hadoop${BUILD_HADOOP_VERSION} ${SPARK_HOME} \
&& chown -R root:root $SPARK_HOME
2 changes: 1 addition & 1 deletion docker/pom.xml
@@ -10,7 +10,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.26.0</version>
<version>0.26.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

7 changes: 7 additions & 0 deletions docs/releases.md
@@ -14,6 +14,13 @@ The following gives an (incomplete) list of past releases of the last 12 months.
changes over time.


### Version 0.26.1 - 2022-08-03

* github-226: Upgrade to Spark 3.2.2
* github-227: [BUG] Flowman should not fail with field names containing "-", "/" etc.
* github-228: Padding and truncation of CHAR(n)/VARCHAR(n) should be configurable


### Version 0.26.0 - 2022-07-27

* github-202: Add support for Spark 3.3
6 changes: 6 additions & 0 deletions docs/setup/config.md
@@ -108,6 +108,9 @@ Sets the strategy to use how tables should be migrated. Possible values are:
Defines how Flowman should handle a mismatch between the types of the actual schema of a relation when reading from
it and the types of the schema as defined in the relation. Per default, Flowman ignores any mismatch and simply passes
through the types of the actual relation. See [relations](../spec/relation/index.md) for possible options and more details.
- `flowman.default.relation.input.charVarcharPolicy` *(type: string)* *(default:`PAD_AND_TRUNCATE`)* (since Flowman 0.26.1)
Defines how Flowman will handle `CHAR(n)`/`VARCHAR(n)` data on reading. Per default, Flowman will truncate/pad `CHAR(n)`
columns and truncate `VARCHAR(n)` columns. See [relations](../spec/relation/index.md) for possible options and more details.
- `flowman.default.relation.output.columnMismatchPolicy` *(type: string)* *(default:`ADD_REMOVE_COLUMNS`)* (since Flowman 0.20.0)
Defines how Flowman should handle a mismatch of columns of records being written to a relation and the relation's
actual defined columns. Per default, Flowman will add/remove columns to/from records such that they match the current
@@ -116,6 +119,9 @@ Sets the strategy to use how tables should be migrated. Possible values are:
Defines how Flowman should handle a mismatch of columns of records being written to a relation and the relation's
actual defined columns. Per default, Flowman will add/remove columns to/from records such that they match the current
physical layout. See [relations](../spec/relation/index.md) for possible options and more details.
- `flowman.default.relation.output.charVarcharPolicy` *(type: string)* *(default:`PAD_AND_TRUNCATE`)* (since Flowman 0.26.1)
Defines how Flowman will handle `CHAR(n)`/`VARCHAR(n)` data when writing. Per default, Flowman will truncate/pad `CHAR(n)`
columns and truncate `VARCHAR(n)` columns. See [relations](../spec/relation/index.md) for possible options and more details. A configuration sketch follows below.
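
These defaults can be overridden per environment, for example in the `config` section of a namespace or project. The following is only a sketch; the file name and the chosen values are illustrative, not a recommendation:

```yaml
# Hypothetical excerpt from a default-namespace.yml
config:
  # keep raw strings on read, enforce CHAR/VARCHAR lengths only on write
  - flowman.default.relation.input.charVarcharPolicy=IGNORE
  - flowman.default.relation.output.charVarcharPolicy=PAD_AND_TRUNCATE
```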


### Target related Properties
2 changes: 1 addition & 1 deletion docs/spec/fields.md
@@ -13,7 +13,7 @@ The following simple data types are supported by Flowman
* `smallint`, `short` - 16 bit signed numbers
* `int`, `integer` - 32 bit signed numbers
* `bigint`, `long` - 64 bit signed numbers
* `boolean` - true or false
* `boolean`, `bool` - true or false
* `float` - 32 bit floating point number
* `double` - 64 bit floating point number
* `decimal(a,b)`
20 changes: 15 additions & 5 deletions docs/spec/mapping/conform.md
@@ -1,6 +1,6 @@
# Conform Mapping
The `conform` mapping performs simple name and type mangling transformations to conform data to some standard. For
example you can replace all date columns by timestamp columns (this is required for older versions of Hive) or
example, you can replace all date columns by timestamp columns (this is required for older versions of Hive) or
you can transform column names from camel case to snake case to better match SQL.

## Example
@@ -39,7 +39,20 @@ Specifies the naming scheme used for the output. The following values are supported:
* `camelCaseUpper`

* `types` **(optional)** *(type: map:string)*:
Specifies the list of types and how they should be replaced
Specifies the list of types and how they should be replaced. The following types can be specified as source types:
* `BYTE` or `TINYINT`
* `SHORT` or `SMALLINT`
* `INT` or `INTEGER`
* `LONG` or `BIGINT`
* `BOOLEAN` or `BOOL`
* `FLOAT`
* `DOUBLE`
* `DECIMAL`
* `STRING` or `TEXT`
* `DURATION`
* `TIMESTAMP`
* `DATE`
Note that both `CHAR(n)` and `VARCHAR(n)` are matched to the entry for the `STRING` type. A sketch using the `types` map follows at the end of this section.

* `flatten` **(optional)** *(type: boolean)* *(default: false)*:
Flattens all nested structs into a flat list of columns if set to `true`
@@ -50,6 +63,3 @@ An optional SQL filter expression that is applied *after* conforming.

## Outputs
* `main` - the only output of the mapping


## Description
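
As mentioned above, a minimal sketch of a `conform` mapping using the `types` map could look as follows. The mapping and column names are hypothetical, and `date: timestamp` mirrors the older-Hive use case from the introduction:

```yaml
mappings:
  conformed_facts:
    kind: conform
    input: facts            # hypothetical upstream mapping
    naming: snakeCase       # e.g. rename `orderDate` to `order_date`
    types:
      date: timestamp       # replace all DATE columns by TIMESTAMP columns
```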
28 changes: 28 additions & 0 deletions docs/spec/mapping/schema.md
@@ -23,6 +23,9 @@ mappings:
partial_facts:
kind: schema
input: facts
columnMismatchPolicy: ADD_REMOVE_COLUMNS
typeMismatchPolicy: CAST_ALWAYS
charVarcharPolicy: PAD_AND_TRUNCATE
schema:
kind: inline
fields:
@@ -58,6 +61,31 @@ Specifies the list of column names (key) with their type (value)
As an alternative to specifying a list of columns, you can also directly specify a schema, as described in
[schema](../schema/index.md).

* `columnMismatchPolicy` **(optional)** *(type: string)* *(default: `ADD_REMOVE_COLUMNS`)*:
Controls how Flowman will handle a mismatch between column names in the source and the provided schema (see the combined sketch after this parameter list):
- `IGNORE` will simply pass through the input columns unchanged
- `ERROR` will fail the build once a mismatch between actual and requested schema is detected
- `ADD_COLUMNS_OR_IGNORE` will add (`NULL`) columns from the requested schema to the input schema, and will keep columns in the input schema which are not present in the requested schema
- `ADD_COLUMNS_OR_ERROR` will add (`NULL`) columns from the requested schema to the input schema, but will fail the build if the input schema contains columns not present in the requested schema
- `REMOVE_COLUMNS_OR_IGNORE` will remove columns from the input schema which are not present in the requested schema
- `REMOVE_COLUMNS_OR_ERROR` will remove columns from the input schema which are not present in the requested schema and will fail if the input schema is missing requested columns
- `ADD_REMOVE_COLUMNS` will essentially pass through the requested schema as is (the default)

* `typeMismatchPolicy` **(optional)** *(type: string)* *(default: `CAST_ALWAYS`)*:
Controls how Flowman will convert between data types:
- `IGNORE` - Ignores any data type mismatches and does not perform any conversion
- `ERROR` - Throws an error on data type mismatches
- `CAST_COMPATIBLE_OR_ERROR` - Performs a data type conversion with compatible types, otherwise throws an error
- `CAST_COMPATIBLE_OR_IGNORE` - Performs a data type conversion with compatible types, otherwise does not perform conversion
- `CAST_ALWAYS` - Always performs data type conversion (the default)

* `charVarcharPolicy` **(optional)** *(type: string)* *(default: `PAD_AND_TRUNCATE`)*:
Controls how Flowman will treat `VARCHAR(n)` and `CHAR(n)` data types. The possible values are:
- `IGNORE` - Do not apply any length restrictions
- `PAD_AND_TRUNCATE` - Truncate `VARCHAR(n)`/`CHAR(n)` strings which are too long and pad `CHAR(n)` strings which are too short
- `PAD` - Pad `CHAR(n)` strings which are too short
- `TRUNCATE` - Truncate `VARCHAR(n)`/`CHAR(n)` strings which are too long

* `filter` **(optional)** *(type: string)* *(default: empty)*:
An optional SQL filter expression that is applied *after* the schema operation.
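
To make the interplay of the three policies concrete, here is a sketch assuming a hypothetical input mapping `facts` with columns `id INT`, `amount DOUBLE` and `comment STRING`:

```yaml
mappings:
  typed_facts:
    kind: schema
    input: facts
    columnMismatchPolicy: ADD_REMOVE_COLUMNS  # drops `comment` and adds the missing `country` as NULL
    typeMismatchPolicy: CAST_ALWAYS           # casts id INT -> BIGINT and amount DOUBLE -> DECIMAL(16,2)
    charVarcharPolicy: PAD_AND_TRUNCATE       # pads/truncates `country` to exactly 2 characters
    schema:
      kind: inline
      fields:
        - name: id
          type: bigint
        - name: amount
          type: decimal(16,2)
        - name: country
          type: char(2)
```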

10 changes: 10 additions & 0 deletions docs/spec/relation/index.md
@@ -24,8 +24,10 @@ Most relations support implicit schema conversions, which means that
The details of these conversions can be controlled via some config variables
* `flowman.default.relation.input.columnMismatchPolicy` (default is `IGNORE`)
* `flowman.default.relation.input.typeMismatchPolicy` (default is `IGNORE`)
* `flowman.default.relation.input.charVarcharPolicy` (default is `PAD_AND_TRUNCATE`)
* `flowman.default.relation.output.columnMismatchPolicy` (default is `ADD_REMOVE_COLUMNS`)
* `flowman.default.relation.output.typeMismatchPolicy` (default is `CAST_ALWAYS`)
* `flowman.default.relation.output.charVarcharPolicy` (default is `PAD_AND_TRUNCATE`)

The schema conversion is implemented using two aspects. The first is a mismatch between column names. This can be
configured using the `columnMismatchPolicy` as follows. Basically, the idea is that
@@ -70,6 +72,14 @@ follows:
| `CAST_ALWAYS` | Source can be safely cast to dest | Target Data Type |
| `CAST_ALWAYS` | Source cannot be safely cast to dest | Target Data Type |

The two options `flowman.default.relation.input.charVarcharPolicy` and `flowman.default.relation.output.charVarcharPolicy`
control how Flowman will treat `VARCHAR(n)` and `CHAR(n)` data types. The possible values are listed below, followed by a short sketch:
* `IGNORE` - Do not apply any length restrictions
* `PAD_AND_TRUNCATE` - Truncate `VARCHAR(n)`/`CHAR(n)` strings which are too long and pad `CHAR(n)` strings which are too short
* `PAD` - Pad `CHAR(n)` strings which are too short
* `TRUNCATE` - Truncate `VARCHAR(n)`/`CHAR(n)` strings which are too long
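
As an illustration, consider this hypothetical CSV relation (names and location are made up). With the default `PAD_AND_TRUNCATE`, `country_code` values are space-padded or truncated to exactly 2 characters, while `name` values are truncated to at most 20 characters but never padded:

```yaml
relations:
  customers:
    kind: file
    format: csv
    location: "$basedir/customers"   # hypothetical location
    schema:
      kind: inline
      fields:
        - name: country_code
          type: char(2)
        - name: name
          type: varchar(20)
```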


## Relation Types

Flowman directly provides support for the most important data sources, which are also
2 changes: 1 addition & 1 deletion flowman-client/pom.xml
@@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.26.0</version>
<version>0.26.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion flowman-common/pom.xml
@@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.26.0</version>
<version>0.26.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion flowman-core/pom.xml
@@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.26.0</version>
<version>0.26.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -32,8 +32,9 @@ import com.dimajix.flowman.execution.SimpleExecutor
import com.dimajix.flowman.execution.DependencyScheduler
import com.dimajix.flowman.execution.Scheduler
import com.dimajix.flowman.model.VerifyPolicy
import com.dimajix.flowman.transforms.ColumnMismatchStrategy
import com.dimajix.flowman.transforms.TypeMismatchStrategy
import com.dimajix.flowman.transforms.ColumnMismatchPolicy
import com.dimajix.flowman.transforms.CharVarcharPolicy
import com.dimajix.flowman.transforms.TypeMismatchPolicy
import com.dimajix.spark.features


@@ -120,22 +121,30 @@ object FlowmanConf {
.stringConf
.createWithDefault(MigrationStrategy.ALTER.toString)

val DEFAULT_RELATION_INPUT_COLUMN_MISMATCH_STRATEGY = buildConf("flowman.default.relation.input.columnMismatchPolicy")
val DEFAULT_RELATION_INPUT_COLUMN_MISMATCH_POLICY = buildConf("flowman.default.relation.input.columnMismatchPolicy")
.doc("Default strategy to use on schema column mismatch while reading relations. Can be 'ignore', 'error', 'add_columns_or_ignore', 'add_columns_or_error', 'remove_columns_or_ignore', 'remove_columns_or_error', 'add_remove_columns'")
.stringConf
.createWithDefault(ColumnMismatchStrategy.IGNORE.toString)
val DEFAULT_RELATION_INPUT_TYPE_MISMATCH_STRATEGY = buildConf("flowman.default.relation.input.typeMismatchPolicy")
.createWithDefault(ColumnMismatchPolicy.IGNORE.toString)
val DEFAULT_RELATION_INPUT_TYPE_MISMATCH_POLICY = buildConf("flowman.default.relation.input.typeMismatchPolicy")
.doc("Default strategy to use on schema type mismatch while reading relations. Can be 'ignore', 'error', 'cast_compatible_or_ignore', 'cast_compatible_or_error', 'cast_always'")
.stringConf
.createWithDefault(TypeMismatchStrategy.IGNORE.toString)
val DEFAULT_RELATION_OUTPUT_COLUMN_MISMATCH_STRATEGY = buildConf("flowman.default.relation.output.columnMismatchPolicy")
.createWithDefault(TypeMismatchPolicy.IGNORE.toString)
val DEFAULT_RELATION_INPUT_CHAR_VARCHAR_POLICY = buildConf("flowman.default.relation.input.charVarcharPolicy")
.doc("Default strategy to use when reading CHAR(n)/VARCHAR(n) data types. Can be 'ignore', 'pad', 'truncate' or 'pad_and_truncate'")
.stringConf
.createWithDefault(CharVarcharPolicy.PAD_AND_TRUNCATE.toString)
val DEFAULT_RELATION_OUTPUT_COLUMN_MISMATCH_POLICY = buildConf("flowman.default.relation.output.columnMismatchPolicy")
.doc("Default strategy to use on schema column mismatch while reading relations. Can be 'ignore', 'error', 'add_columns_or_ignore', 'add_columns_or_error', 'remove_columns_or_ignore', 'remove_columns_or_error', 'add_remove_columns'")
.stringConf
.createWithDefault(ColumnMismatchStrategy.ADD_REMOVE_COLUMNS.toString)
val DEFAULT_RELATION_OUTPUT_TYPE_MISMATCH_STRATEGY = buildConf("flowman.default.relation.output.typeMismatchPolicy")
.createWithDefault(ColumnMismatchPolicy.ADD_REMOVE_COLUMNS.toString)
val DEFAULT_RELATION_OUTPUT_TYPE_MISMATCH_POLICY = buildConf("flowman.default.relation.output.typeMismatchPolicy")
.doc("Default strategy to use on schema type mismatch while reading relations. Can be 'ignore', 'error', 'cast_compatible_or_ignore', 'cast_compatible_or_error', 'cast_always'")
.stringConf
.createWithDefault(TypeMismatchStrategy.CAST_ALWAYS.toString)
.createWithDefault(TypeMismatchPolicy.CAST_ALWAYS.toString)
val DEFAULT_RELATION_OUTPUT_CHAR_VARCHAR_POLICY = buildConf("flowman.default.relation.output.charVarcharPolicy")
.doc("Default strategy to use when writing CHAR(n)/VARCHAR(n) data types. Can be 'ignore', 'pad', 'truncate' or 'pad_and_truncate'")
.stringConf
.createWithDefault(CharVarcharPolicy.PAD_AND_TRUNCATE.toString)

val DEFAULT_TARGET_VERIFY_POLICY = buildConf("flowman.default.target.verifyPolicy")
.doc("Policy for verifying a target. Accepted verify policies are 'empty_as_success', 'empty_as_failure' and 'empty_as_success_with_errors'.")
@@ -44,9 +44,10 @@ import com.dimajix.flowman.execution.Operation
import com.dimajix.flowman.execution.OutputMode
import com.dimajix.flowman.graph.Linker
import com.dimajix.flowman.model
import com.dimajix.flowman.transforms.ColumnMismatchStrategy
import com.dimajix.flowman.transforms.ColumnMismatchPolicy
import com.dimajix.flowman.transforms.SchemaEnforcer
import com.dimajix.flowman.transforms.TypeMismatchStrategy
import com.dimajix.flowman.transforms.CharVarcharPolicy
import com.dimajix.flowman.transforms.TypeMismatchPolicy
import com.dimajix.flowman.types.Field
import com.dimajix.flowman.types.FieldValue
import com.dimajix.flowman.types.SingleValue
@@ -514,8 +515,9 @@ abstract class BaseRelation extends AbstractInstance with Relation {
val conf = execution.flowmanConf
val enforcer = SchemaEnforcer(
schema,
columnMismatchStrategy = ColumnMismatchStrategy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_COLUMN_MISMATCH_STRATEGY)),
typeMismatchStrategy = TypeMismatchStrategy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_TYPE_MISMATCH_STRATEGY))
columnMismatchPolicy = ColumnMismatchPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_COLUMN_MISMATCH_POLICY)),
typeMismatchPolicy = TypeMismatchPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_TYPE_MISMATCH_POLICY)),
charVarcharPolicy = CharVarcharPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_OUTPUT_CHAR_VARCHAR_POLICY))
)
enforcer.transform(df)
}
@@ -537,8 +539,9 @@ abstract class BaseRelation extends AbstractInstance with Relation {
val conf = execution.flowmanConf
val enforcer = SchemaEnforcer(
schema,
columnMismatchStrategy = ColumnMismatchStrategy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_INPUT_COLUMN_MISMATCH_STRATEGY)),
typeMismatchStrategy = TypeMismatchStrategy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_INPUT_TYPE_MISMATCH_STRATEGY))
columnMismatchPolicy = ColumnMismatchPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_INPUT_COLUMN_MISMATCH_POLICY)),
typeMismatchPolicy = TypeMismatchPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_INPUT_TYPE_MISMATCH_POLICY)),
charVarcharPolicy = CharVarcharPolicy.ofString(conf.getConf(FlowmanConf.DEFAULT_RELATION_INPUT_CHAR_VARCHAR_POLICY))
)
enforcer.transform(df)
}
