docs: Cleanup Python models section of README #643

Merged 4 commits on May 10, 2024
264 changes: 138 additions & 126 deletions README.md
@@ -38,9 +38,10 @@
- [Timestamp strategy](#timestamp-strategy)
- [Check strategy](#check-strategy)
- [Hard-deletes](#hard-deletes)
- [Working example](#working-example)
- [Snapshots Known issues](#snapshots-known-issues)
- [AWS Lakeformation integration](#aws-lakeformation-integration)
- [Python Models](#python-models)
- [Contracts](#contracts)
- [Contributing](#contributing)
- [Contributors ✨](#contributors-)
@@ -278,7 +279,7 @@ athena:
}
```

> Notes:
>
> - The `lf_tags` and `lf_tags_columns` configs only support attaching LF tags to the corresponding resources.
> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use
@@ -553,7 +554,113 @@ To use the check strategy refer to the [dbt docs](https://docs.getdbt.com/docs/b
The materialization also supports invalidating hard deletes. Check
the [docs](https://docs.getdbt.com/docs/build/snapshots#hard-deletes-opt-in) to understand usage.

### Working example

seed file - employment_indicators_november_2022_csv_tables.csv

```csv
Series_reference,Period,Data_value,Suppressed
MEIM.S1WA,1999.04,80267,
MEIM.S1WA,1999.05,70803,
MEIM.S1WA,1999.06,65792,
MEIM.S1WA,1999.07,66194,
MEIM.S1WA,1999.08,67259,
MEIM.S1WA,1999.09,69691,
MEIM.S1WA,1999.1,72475,
MEIM.S1WA,1999.11,79263,
MEIM.S1WA,1999.12,86540,
MEIM.S1WA,2000.01,82552,
MEIM.S1WA,2000.02,81709,
MEIM.S1WA,2000.03,84126,
MEIM.S1WA,2000.04,77089,
MEIM.S1WA,2000.05,73811,
MEIM.S1WA,2000.06,70070,
MEIM.S1WA,2000.07,69873,
MEIM.S1WA,2000.08,71468,
MEIM.S1WA,2000.09,72462,
MEIM.S1WA,2000.1,74897,
```

model.sql

```sql
{{ config(
materialized='table'
) }}

select row_number() over() as id
, *
, cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp
from {{ ref('employment_indicators_november_2022_csv_tables') }}
```

timestamp strategy - model_snapshot_1

```sql
{% snapshot model_snapshot_1 %}

{{
config(
strategy='timestamp',
updated_at='refresh_timestamp',
unique_key='id'
)
}}

select *
from {{ ref('model') }} {% endsnapshot %}
```

invalidate hard deletes - model_snapshot_2

```sql
{% snapshot model_snapshot_2 %}

{{
config
(
unique_key='id',
strategy='timestamp',
updated_at='refresh_timestamp',
invalidate_hard_deletes=True,
)
}}
select *
from {{ ref('model') }} {% endsnapshot %}
```

check strategy - model_snapshot_3

```sql
{% snapshot model_snapshot_3 %}

{{
config
(
unique_key='id',
strategy='check',
check_cols=['series_reference','data_value']
)
}}
select *
from {{ ref('model') }} {% endsnapshot %}
```

### Snapshots Known issues

- Incremental Iceberg models - syncing all columns on schema change cannot remove columns that are used for partitioning.
The only way, from a dbt perspective, is to do a full refresh of the incremental model (e.g. `dbt run --full-refresh --select <model>`).

- Table, schema and database names should be lowercase only.

- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not
installed in the target environment.
See <https://github.com/dbt-athena/dbt-athena/issues/103> for more details.

- Snapshots do not support dropping columns from the source table. If you drop a column, make sure to drop it from the
snapshot as well. Another workaround is to select the column as NULL in the snapshot definition to preserve history
(see the sketch below).

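As a sketch of that workaround, assuming the `suppressed` column from the seed above was dropped upstream (the snapshot
name here is illustrative): keep the column in the snapshot query and select it as NULL so the existing history stays
intact.

```sql
{% snapshot model_snapshot_preserve_dropped %}

{{
    config(
        unique_key='id',
        strategy='timestamp',
        updated_at='refresh_timestamp'
    )
}}

select id
     , series_reference
     , period
     , data_value
     , null as suppressed -- column dropped upstream; kept as NULL to preserve snapshot history
     , refresh_timestamp
from {{ ref('model') }}

{% endsnapshot %}
```
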
## AWS Lakeformation integration

The adapter implements AWS Lakeformation tags management in the following way:

@@ -568,7 +675,7 @@ It's important to understand the following points:
- dbt does not manage lf-tags for database
- dbt does not manage lakeformation permissions

That's why you should handle this yourself, either manually or with automation tools such as Terraform or the AWS CDK.
You may find the following links useful to manage that:

<!-- markdownlint-disable -->
@@ -582,23 +689,23 @@ The adapter supports python models using [`spark`](https://docs.aws.amazon.com/a

### Setup

- A Spark-enabled workgroup created in Athena
- Spark execution role granted access to Athena, Glue and S3
- The Spark workgroup is added to the `~/.dbt/profiles.yml` file and the profile
is referenced in `dbt_project.yml` that will be created. It is recommended to keep this the same as `threads`
(a minimal profile sketch follows this list).
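
A minimal profile sketch, assuming the adapter exposes a `spark_work_group` key for Python models (verify the exact
option names against the full profile reference earlier in this README); the bucket, region and workgroup names below
are placeholders.

```yaml
athena:
  target: dev
  outputs:
    dev:
      type: athena
      region_name: eu-west-1                          # placeholder region
      s3_staging_dir: s3://my-bucket/athena-results/  # placeholder bucket
      schema: analytics
      database: awsdatacatalog
      threads: 4
      work_group: primary                             # SQL workgroup
      spark_work_group: my-spark-workgroup            # assumed key for the Spark-enabled workgroup
```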

### Spark-specific table configuration

- `timeout` (`default=43200`)
- Timeout in seconds for each Python model execution. Defaults to 12 hours/43200 seconds.
- `spark_encryption` (`default=false`)
- If this flag is set to true, it encrypts data in transit between Spark nodes and also encrypts data at rest stored
locally by Spark.
- `spark_cross_account_catalog` (`default=false`)
- In Spark, you can query an external account catalog; for that, the consumer account has to be configured to
access the producer catalog.
- If this flag is set to true, "/" can be used as the glue catalog separator.
Ex: `999999999999/mydatabase.cloudfront_logs` (where *999999999999* is the external catalog ID)
- `spark_requester_pays` (`default=false`)
- When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for
data access and data transfer fees associated with the query.
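
The flags above are model-level configs. A minimal sketch of setting them from a Python model via `dbt.config`
(the values are illustrative; they can equally be set in `dbt_project.yml`):

```python
def model(dbt, spark_session):
    # Illustrative values only; the defaults are documented in the list above.
    dbt.config(
        materialized="table",
        timeout=7200,                      # fail the calculation after 2 hours instead of 12
        spark_encryption=True,             # encrypt data in transit and at rest
        spark_cross_account_catalog=True,  # allow "<catalog-id>/<database>.<table>" references
        spark_requester_pays=True,         # permit querying requester-pays S3 buckets
    )
    df = dbt.ref("model")                  # upstream model as a Spark DataFrame
    return df
```
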
@@ -608,11 +715,12 @@ The adapter supports python models using [`spark`](https://docs.aws.amazon.com/a

- A session is created for each unique engine configuration defined in the models that are part of the invocation.
- A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation
(Spark Python model) ready for execution and the engine configuration matches, the process will reuse the same session.
- The number of Python models running at a time depends on the `threads`. The number of sessions created for the
entire run depends on the number of unique engine configurations and the availability of sessions to maintain
thread concurrency.
- For Iceberg tables, it is recommended to use the `table_properties` configuration to set the `format_version` to 2.
This is to maintain compatibility between Iceberg tables created by Trino and those created by Spark
(see the sketch after this list).
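
A minimal sketch of the Iceberg recommendation above; the `table_type` key and the exact spelling of the
`format_version` property inside `table_properties` are assumptions here, so verify them against the adapter's
Iceberg documentation:

```python
def model(dbt, spark_session):
    dbt.config(
        materialized="table",
        table_type="iceberg",                      # assumed config key for Iceberg tables
        table_properties={"format_version": "2"},  # keep Trino- and Spark-created tables compatible
    )
    return dbt.ref("model")
```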

### Example models

@@ -714,118 +822,22 @@ def model(dbt, spark_session):
return df.withColumn("udf_test_col", udf_with_import(col("alpha")))
```

### Known issues in Python models

- Python models cannot
[reference Athena SQL views](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html).
- Third-party Python libraries can be used, but they must be [included in the pre-installed list][pre-installed list]
or [imported manually][imported manually].
- Python models can only reference or write to tables with names matching the
regular expression `^[0-9a-zA-Z_]+$` (see the check after this list). Dashes and special characters are not
supported by Spark, even though Athena supports them.
- Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic which
runs on Trino.
- Snapshot materializations are not supported.
- Spark can only reference tables within the same catalog.
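
A quick way to check a relation name against that constraint (plain Python, outside dbt):

```python
import re

# Names must match this pattern to be usable from Spark (see the bullet above).
VALID_RELATION_NAME = re.compile(r"^[0-9a-zA-Z_]+$")

assert VALID_RELATION_NAME.fullmatch("cloudfront_logs")
assert VALID_RELATION_NAME.fullmatch("cloudfront-logs") is None  # dashes are rejected
```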

[pre-installed list]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-preinstalled-python-libraries.html
[imported manually]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-import-files-libraries.html

## Contracts
