[DO NOT MERGE] patched 0.21.0 #2

Draft · wants to merge 8 commits into master

37 changes: 23 additions & 14 deletions README.md
@@ -41,16 +41,19 @@ stored login info. You can configure the AWS profile name to use via `aws_profile_name`.

A dbt profile can be configured to run against AWS Athena using the following configuration:

| Option | Description | Required? | Example |
|---------------- |-------------------------------------------------------------------------------- |----------- |---------------------- |
| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` |
| region_name | AWS region of your Athena instance | Required | `eu-west-1` |
| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |
| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |
| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |
| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
| num_retries | Number of times to retry a failing query | Optional | `3` |
| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` |
| s3_data_naming  | How to generate table paths in `s3_data_dir`; one of `uuid` or `schema_table`   | Optional   | `uuid`                 |


**Example profiles.yml entry:**
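A minimal sketch, assuming the standard dbt `profiles.yml` layout; the profile name, buckets, and values below are placeholders:

```yaml
# Hypothetical entry; option names follow the table above.
athena:
  target: dev
  outputs:
    dev:
      type: athena
      s3_staging_dir: s3://bucket/dbt/
      region_name: eu-west-1
      schema: dbt
      database: awsdatacatalog
      # Optional settings
      aws_profile_name: my-profile
      work_group: my-custom-workgroup
      num_retries: 3
      s3_data_dir: s3://bucket2/dbt/
      s3_data_naming: uuid  # or schema_table
```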
@@ -78,9 +81,7 @@ _Additional information_
#### Table Configuration

* `external_location` (`default=none`)
  * If set, the full S3 path in which the table will be saved.
* `partitioned_by` (`default=none`)
  * An array of columns by which the table will be partitioned
  * Limited to creation of 100 partitions (_currently_)
@@ -93,7 +94,15 @@ _Additional information_
  * Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, or `TEXTFILE`
* `field_delimiter` (`default=none`)
  * Custom field delimiter, for when format is set to `TEXTFILE`
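As an illustration, these options can be set from a model's config block; a sketch assuming standard dbt config syntax, with a made-up model, source, and S3 path:

```sql
-- models/example_events.sql (hypothetical)
{{ config(
    materialized='table',
    format='parquet',
    partitioned_by=['dt'],
    external_location='s3://bucket/dbt/example_events/'
) }}

select id, created_at, date(created_at) as dt
from {{ source('raw', 'events') }}
```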


The location in which a table is saved is determined by:

1. If `external_location` is defined, that value is used.
2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming`:
   + `s3_data_naming=uuid`: `{s3_data_dir}/{uuid4()}/`
   + `s3_data_naming=schema_table`: `{s3_data_dir}/{schema}/{table}/`
3. Otherwise, the default location for a CTAS query is used, which will depend on how your workgroup is configured.
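
For example (illustrative values): with `s3_data_dir: s3://bucket2/dbt/`, a model `orders` in schema `dbt` is stored under a path like `s3://bucket2/dbt/<uuid4>/` when `s3_data_naming=uuid`, and under `s3://bucket2/dbt/dbt/orders/` when `s3_data_naming=schema_table`.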

More information: [CREATE TABLE AS][create-table-as]

[run_started_at]: https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at
2 changes: 1 addition & 1 deletion dbt/adapters/athena/__version__.py
@@ -1 +1 @@
version = "0.21.0"
version = "0.21.0+nvlt3"
2 changes: 2 additions & 0 deletions dbt/adapters/athena/connections.py
@@ -37,6 +37,8 @@ class AthenaCredentials(Credentials):
    poll_interval: float = 1.0
    _ALIASES = {"catalog": "database"}
    num_retries: Optional[int] = 5
    s3_data_dir: Optional[str] = None
    s3_data_naming: Optional[str] = "uuid"

    @property
    def type(self) -> str:
77 changes: 66 additions & 11 deletions dbt/adapters/athena/impl.py
@@ -1,7 +1,7 @@
from uuid import uuid4
import agate
import re
import boto3.session
from botocore.exceptions import ClientError

from dbt.adapters.base import available
@@ -36,22 +36,76 @@ def convert_datetime_type(
        return "timestamp"

    @available
    def s3_table_prefix(self) -> str:
        """
        Returns the root location for storing tables in S3.

        This is `s3_data_dir`, if set, and `s3_staging_dir/tables/` if not.

        We generate a value here even if `s3_data_dir` is not set,
        since creating a seed table requires a non-default location.
        """
        conn = self.connections.get_thread_connection()
        creds = conn.credentials
        if creds.s3_data_dir is not None:
            return creds.s3_data_dir
        else:
            return f"{creds.s3_staging_dir}tables/"

    @available
    def s3_uuid_table_location(self) -> str:
        """
        Returns a random location for storing a table, using a UUID as
        the final directory part
        """
        return f"{self.s3_table_prefix()}{str(uuid4())}/"


    @available
    def s3_schema_table_location(self, schema_name: str, table_name: str) -> str:
        """
        Returns a fixed location for storing a table, determined by the
        (Athena) schema and table name
        """
        return f"{self.s3_table_prefix()}{schema_name}/{table_name}/"

    @available
    def s3_table_location(self, schema_name: str, table_name: str) -> str:
        """
        Returns either a UUID or schema/table prefix for storing a table,
        depending on the value of `s3_data_naming`
        """
        conn = self.connections.get_thread_connection()
        creds = conn.credentials
        if creds.s3_data_naming == "schema_table":
            return self.s3_schema_table_location(schema_name, table_name)
        elif creds.s3_data_naming == "uuid":
            return self.s3_uuid_table_location()
        else:
            raise ValueError(f"Unknown value for s3_data_naming: {creds.s3_data_naming}")

    @available
    def has_s3_data_dir(self) -> bool:
        """
        Returns True if the user has specified `s3_data_dir`, and
        we should set `external_location`
        """
        conn = self.connections.get_thread_connection()
        creds = conn.credentials
        return creds.s3_data_dir is not None

    @available
    def clean_up_partitions(
        self, database_name: str, table_name: str, where_condition: str
    ):
        # Look up Glue partitions & clean up
        conn = self.connections.get_thread_connection()
        creds = conn.credentials
        session = boto3.session.Session(region_name=creds.region_name, profile_name=creds.aws_profile_name)

        glue_client = session.client('glue')
        s3_resource = session.resource('s3')
        partitions = glue_client.get_partitions(
            # CatalogId='123456789012', # Need to make this configurable if it is different from default AWS Account ID
            DatabaseName=database_name,
@@ -74,8 +128,10 @@ def clean_up_table(
    ):
        # Look up the Glue table & clean up its S3 location
        conn = self.connections.get_thread_connection()
        creds = conn.credentials
        session = boto3.session.Session(region_name=creds.region_name, profile_name=creds.aws_profile_name)

        glue_client = session.client('glue')
        try:
            table = glue_client.get_table(
                DatabaseName=database_name,
@@ -93,7 +149,6 @@ def clean_up_table(
        if m is not None:
            bucket_name = m.group(1)
            prefix = m.group(2)
            s3_resource = session.resource('s3')
            s3_bucket = s3_resource.Bucket(bucket_name)
            s3_bucket.objects.filter(Prefix=prefix).delete()

6 changes: 3 additions & 3 deletions dbt/include/athena/macros/adapters.sql
@@ -20,6 +20,8 @@
with (
{%- if external_location is not none and not temporary %}
external_location='{{ external_location }}',
{%- elif adapter.has_s3_data_dir() -%}
external_location='{{ adapter.s3_table_location(relation.schema, relation.identifier) }}',
{%- endif %}
{%- if partitioned_by is not none %}
partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }},
@@ -110,9 +112,7 @@
{% endmacro %}

{% macro athena__drop_relation(relation) -%}
{%- do adapter.clean_up_table(relation.schema, relation.table) -%}
{% call statement('drop_relation', auto_begin=False) -%}
drop {{ relation.type }} if exists {{ relation }}
{%- endcall %}
2 changes: 1 addition & 1 deletion dbt/include/athena/macros/materializations/seed.sql
@@ -22,7 +22,7 @@
{%- endfor -%}
)
stored as parquet
location '{{ adapter.s3_table_location(model["schema"], model["alias"]) }}'
tblproperties ('classification'='parquet')
{% endset %}

4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,4 +1,4 @@
dbt-core==0.21.0
pyathena==2.2.0
boto3
tenacity
4 changes: 2 additions & 2 deletions setup.py
@@ -61,7 +61,7 @@ def _dbt_athena_version() -> str:
    install_requires=[
        "dbt-core==0.21.0",
        "pyathena==2.2.0",
        "boto3",
        "tenacity",
    ]
)