
Improve normalization incremental runtime (incremental DBT) #4286

Closed
sherifnada opened this issue Jun 23, 2021 · 12 comments · Fixed by #7162
Labels: area/connectors, lang/python, normalization, type/enhancement

Comments

@sherifnada
Contributor

Tell us about the problem you're trying to solve

We've received many reports in the past few weeks of normalization taking an incredibly long time to run.

One example:
logs-42-0.txt
A user synced a few hundred records from Zendesk. The sync took about 25s; normalization took about 50 minutes. For the record, the destination contains >200k records, but this individual sync read only a few hundred new records. Slack thread

One very likely root cause is that normalization currently reads the entire _raw table and recreates the normalized tables from scratch every time a sync happens. This means the work scales with the size of the total dataset; we want it to scale with the size of the new data written to the destination.
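For illustration, the current behavior is conceptually equivalent to a full-rebuild dbt model along these lines (table and column names are simplified placeholders, not the SQL Airbyte actually generates):

    {{ config(materialized='table') }}   -- rebuilt from scratch on every run

    -- scans every raw record on each run, so runtime and bytes processed
    -- grow with the total dataset rather than with the newly synced rows
    select
        json_extract_scalar(_airbyte_data, '$.id') as id,
        _airbyte_emitted_at
    from _airbyte_raw_users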

In the interest of complete intellectual honesty, I'm actually not 100% sure the issue is the size of the target dataset. The only reason I hesitate is that other users have reported syncing >100GB DBs into warehouses. If this problem were this bad on 200k records from an API, then surely it would have ground those large DB syncs to a halt. So there might be more nuance to this issue than I'm realizing.

Describe the solution you’d like

I would like normalization runtime to scale with the size of the data written, not the total dataset. I would also like normalization to take a "reasonable" amount of time to run: ~300 new records on top of 200k total records taking 50 minutes seems crazy. 5 minutes is acceptable; 1 minute would be amazing.

Describe the alternative you’ve considered or used

* Not using normalization
* Rolling out a custom connector which implements its own normalization

@sherifnada sherifnada added type/enhancement New feature or request area/connectors Connector related issues normalization lang/python labels Jun 23, 2021
@sherifnada
Contributor Author

Update @Eric Hartono and @gunu in Slack once this issue is done

@Boopathy-Raja

Boopathy-Raja commented Jul 26, 2021

Update @Boopathy Raja in Slack once this issue is done.

@paschmaria

Update @Paschal Onuorah in Slack once this issue is done.

@ChristopheDuong ChristopheDuong changed the title Improve normalization runtime Improve normalization incremental runtime Sep 1, 2021
@ChristopheDuong ChristopheDuong changed the title Improve normalization incremental runtime Improve normalization incremental runtime (incremental DBT) Sep 8, 2021
@ChristopheDuong
Contributor

To solve this issue, we'd probably need:

* to add some new columns to the raw table (such as a `sync_id` or a dedicated `cursor field` column containing the value for each row)

* from there, we'd be able to re-compute the last state of the normalized table, or store it in some metadata table to retrieve this information

* comparing the last state of the raw table vs the normalized table would allow the usage of dbt's incremental feature: https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models

Read more here for example:

* https://discourse.getdbt.com/t/on-the-limits-of-incrementality/303

* https://discourse.getdbt.com/t/benchmarking-incremental-strategies-on-bigquery/981

@Jiyuan-Bolt

Thanks for tracking this issue! This is a blocker for our company (Bolt) to adopt Airbyte. We want to use Airbyte to keep our data warehouse (BigQuery) in sync with our operational DB (Postgres, 500GB now but growing fast), and the way normalization currently works would cost us a fortune, because BigQuery charges for both storage and queries.

@paschmaria

This is exactly the same issue we had. After the bill shock from last month, we've had to turn Airbyte off completely for the interim.

@olivermeyer
Contributor


This might be a naive approach, but I gave this a bit of thought and figured it would be possible to use the _airbyte_emitted_at field to make the normalization incremental - in dbt this would look like `where _airbyte_emitted_at > (select max(_airbyte_emitted_at) from {{ this }})`. It should be pretty straightforward to do, but I'm not sure that it works with all destinations.
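For illustration, a minimal sketch of what such an incremental model could look like (everything except _airbyte_emitted_at and the dbt/Jinja constructs is a placeholder, not the SQL Airbyte generates today):

    {{ config(materialized='incremental') }}

    select
        json_extract_scalar(_airbyte_data, '$.id') as id,
        _airbyte_emitted_at
    from {{ source('airbyte_raw', '_airbyte_raw_users') }}
    {% if is_incremental() %}
      -- only pick up raw records newer than what the normalized table already holds
      where _airbyte_emitted_at > (select max(_airbyte_emitted_at) from {{ this }})
    {% endif %}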

@mohammad-bolt
Contributor

Actually, dbt supports merge materialization.

I made these changes to the sample dbt code:

1. changed the materialization to incremental (which uses MERGE on BigQuery):

   {{ config(materialized='incremental') }}

2. updated the load to be incremental:

   {% if is_incremental() %}
     where _airbyte_emitted_at > (select max(_airbyte_emitted_at) from {{ this }})
   {% endif %}

and it started loading incrementally.

In my case:

 created model dbt.devices...................... [CREATE (189M rows, 57.8 GB processed) in 5.4m]

changed to

created incremental model dbt.devices...................... [MERGE (1.0 rows, 57.8 GB processed) in 3.34s]

Note that the bytes processed are still the same, because BigQuery doesn't prune scanned data based on the WHERE condition here, so this change alone wouldn't save you money. To mitigate this you can either switch to BigQuery reserved pricing (which needs a minimum of ~$96/day) or we can probably cluster the resulting table by _airbyte_emitted_at.
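For reference, a hedged sketch of what that clustering could look like in a custom dbt model on BigQuery (cluster_by is a standard dbt-bigquery config; the source name and column selection are placeholders):

    -- clustering on _airbyte_emitted_at lets BigQuery prune storage blocks
    -- when the incremental filter below is applied, reducing bytes scanned
    {{ config(
        materialized='incremental',
        cluster_by=['_airbyte_emitted_at']
    ) }}

    select *
    from {{ source('airbyte_raw', '_airbyte_raw_devices') }}
    {% if is_incremental() %}
      where _airbyte_emitted_at > (select max(_airbyte_emitted_at) from {{ this }})
    {% endif %}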

@olivermeyer
Contributor

That makes sense - to make incremental normalization as efficient as possible, the raw table should be clustered/sorted by whichever field is used for the incremental logic. I think this needs to be done at the destination level though, since the concepts vary between destinations (e.g. clustering in BigQuery vs sorting in Redshift). For instance, for Redshift it could be done here. I'm not sure whether this is in the scope of this issue.

@mohammad-bolt
Contributor

Update: as mentioned in https://airbytehq.slack.com/archives/C019WEENQRM/p1633606112398200 we achieved significant cost savings by clustering on _airbyte_emitted_at and writing a custom dbt transformation that uses dbt's incremental tables and only loads new records after each run.

@andresbravog
Contributor

Hi everyone,

We have implemented this with custom dbt transformations and I'd like to share some issues we faced:

 where _airbyte_emitted_at > (select max(_airbyte_emitted_at) from {{ this }})

will bring issues when the last job failed or was canceled for some reason after the destination step succeeded (for example, a transformation failure due to a BigQuery job limit). We finally went with a multiple of the connection's run cycle: if the connection runs every 1h, we look 6h back.

{% if not should_full_refresh()  %}
  where `_airbyte_emitted_at` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 6 HOUR)
{% endif %}

Using the should_full_refresh macro allows you to run dbt with the --full-refresh flag to force non-incremental transformations when needed (i.e. a full data refresh):

https://github.com/dbt-labs/dbt-core/blob/HEAD/core/dbt/include/global_project/macros/materializations/helpers.sql#L68
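Put together, a custom transformation along these lines might look like the sketch below (source/model names are placeholders, the 6-hour window should be a multiple of the connection's schedule, and unique_key is an assumption added so rows re-scanned by the lookback window are merged rather than duplicated):

    -- unique_key is an assumption here: merging on a primary key prevents rows
    -- that fall inside the 6-hour lookback window from being inserted twice
    {{ config(
        materialized='incremental',
        unique_key='id'
    ) }}

    select
        json_extract_scalar(_airbyte_data, '$.id') as id,
        _airbyte_emitted_at
    from {{ source('airbyte_raw', '_airbyte_raw_users') }}
    {% if not should_full_refresh() %}
      -- fixed lookback window instead of max(_airbyte_emitted_at), so a previously
      -- failed or canceled transformation run still gets another chance to load those rows
      where `_airbyte_emitted_at` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 6 HOUR)
    {% endif %}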

@danieldiamond
Contributor

Just adding a comment here: MySQL to Snowflake connection. The normalization feels very excessive (and is very costly here).

(screenshot attached: Screen Shot 2021-10-19 at 11 25 23 am)
