Using incremental syncing, the final table shows 0 records (the raw table contains records) #8286

Closed
sherifnada opened this issue Nov 29, 2021 · 12 comments · Fixed by #8394

Comments

@sherifnada (Contributor)

Environment

Reported via Slack

Is this your first time deploying Airbyte: no
OS Version / Instance: Linux EC2 m5.2xlarge
Deployment: Docker
Airbyte Version: 0.32.6-alpha
Source name: GSC (Google Search Console) 0.1.7
Destination: Redshift 0.3.20

This is happening on a brand new instance of Airbyte

Expected Behavior

The normalized table should be populated with data.


Steps to Reproduce

Sync data on a new instance from GSC to Redshift with normalization turned on.

@sherifnada added the type/bug, priority/critical, and normalization labels on Nov 29, 2021
@danieldiamond (Contributor)

Incremental append sync.
[Screenshots: Screen Shot 2021-11-29 at 5 46 21 pm, Screen Shot 2021-11-29 at 5 46 14 pm]

Attached logs: logs-5-0.txt, logs-4-0.txt

@ChristopheDuong (Contributor)

Related to #8028.

@ChristopheDuong (Contributor)

ChristopheDuong commented Nov 29, 2021

Maybe this is the same problem discussed in another issue: #7479 (comment).

@ChristopheDuong (Contributor)

ChristopheDuong commented Nov 29, 2021

I've been trying to reproduce this setup but was unable to get the same results (it's working as expected on my side):

normalization - 2021-11-29 14:48:56 INFO () LineGobbler(voidCall):82 - Found 8 models, 0 tests, 0 snapshots, 0 analyses, 504 macros, 0 operations, 0 seed files, 2 sources, 0 exposures
normalization - 2021-11-29 14:48:56 INFO () LineGobbler(voidCall):82 - 
normalization - 2021-11-29 14:49:00 INFO () LineGobbler(voidCall):82 - 14:49:00 | Concurrency: 32 threads (target='prod')
normalization - 2021-11-29 14:49:00 INFO () LineGobbler(voidCall):82 - 14:49:00 | 
normalization - 2021-11-29 14:49:04 INFO () LineGobbler(voidCall):82 - 14:49:04 | 1 of 2 START incremental model test_gsc.search_analytics_by_page............................................. [RUN]
normalization - 2021-11-29 14:49:04 INFO () LineGobbler(voidCall):82 - 14:49:04 | 2 of 2 START incremental model test_gsc.sites................................................................ [RUN]
normalization - 2021-11-29 14:49:20 INFO () LineGobbler(voidCall):82 - 14:49:20 | 2 of 2 OK created incremental model test_gsc.sites........................................................... [INSERT 0 2 in 15.73s]
normalization - 2021-11-29 14:49:20 INFO () LineGobbler(voidCall):82 - 14:49:20 | 1 of 2 OK created incremental model test_gsc.search_analytics_by_page........................................ [INSERT 0 1811 in 15.95s]
normalization - 2021-11-29 14:49:21 INFO () LineGobbler(voidCall):82 - 14:49:21 | 
normalization - 2021-11-29 14:49:21 INFO () LineGobbler(voidCall):82 - 14:49:21 | Finished running 2 incremental models in 24.81s.

Below are the logs from the first time the sync is run (with log messages stating that the tables are created from scratch):
logs-5.log

normalization - 2021-11-29 14:33:35 INFO () LineGobbler(voidCall):82 - 14:33:35 + "integrationtests".test_gsc."search_analytics_by_page"._airbyte_ab_id does not exist yet. The table will be created or rebuilt with dbt.full_refresh

Then when running in incremental, the rows are updated with new inserts:
logs-8.log

The dbt logs with more details are here:
dbt-8.log

and the destination is properly showing the right number of rows being emitted/processed:
[Screenshots: Screenshot 2021-11-29 at 17 29 09, Screenshot 2021-11-29 at 17 30 43]

The SQL query is:

select 
	'_airbyte_raw_search_analytics_by_page' as from_table,
	date(_airbyte_emitted_at) as emitted_date,
	date_trunc('hour', _airbyte_emitted_at) as emitted_hour,
	date_trunc('minute', _airbyte_emitted_at) as emitted_minute,
	count(*) as row_count
from test_gsc._airbyte_raw_search_analytics_by_page
group by 2, 3, 4

union all 

select 
	'search_analytics_by_page' as from_table,
	date(_airbyte_emitted_at) as emitted_date,
	date_trunc('hour', _airbyte_emitted_at) as emitted_hour,
	date_trunc('minute', _airbyte_emitted_at) as emitted_minute,
	count(*) as row_count
from test_gsc.search_analytics_by_page
group by 2, 3, 4

order by 2, 3, 4, 1

@danieldiamond, could you please post the equivalent log files and the results from the same queries on your environment? That would help compare what you have against my setup.

@danieldiamond (Contributor)

First sync dbt.log:

2021-11-28 21:54:52.765966 (Thread-12): On model.airbyte_utils.search_analytics_by_page: /* {"app": "dbt", "dbt_version": "0.21.0", "profile_name": "normalize", "target_name": "prod", "node_id": "model.airbyte_utils.search_analytics_by_page"} */



  create temporary table
    "search_analytics_by_page__dbt_tmp215452545898"


      compound sortkey(_airbyte_emitted_at)
  as (

with __dbt__cte__search_analytics_by_page_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "snowplow".test_gsc._airbyte_raw_search_analytics_by_page
select
    case when json_extract_path_text(_airbyte_data, 'ctr', true) != '' then json_extract_path_text(_airbyte_data, 'ctr', true) end as ctr,
    case when json_extract_path_text(_airbyte_data, 'date', true) != '' then json_extract_path_text(_airbyte_data, 'date', true) end as date,
    case when json_extract_path_text(_airbyte_data, 'page', true) != '' then json_extract_path_text(_airbyte_data, 'page', true) end as page,
    case when json_extract_path_text(_airbyte_data, 'clicks', true) != '' then json_extract_path_text(_airbyte_data, 'clicks', true) end as clicks,
    case when json_extract_path_text(_airbyte_data, 'position', true) != '' then json_extract_path_text(_airbyte_data, 'position', true) end as position,
    case when json_extract_path_text(_airbyte_data, 'site_url', true) != '' then json_extract_path_text(_airbyte_data, 'site_url', true) end as site_url,
    case when json_extract_path_text(_airbyte_data, 'impressions', true) != '' then json_extract_path_text(_airbyte_data, 'impressions', true) end as impressions,
    case when json_extract_path_text(_airbyte_data, 'search_type', true) != '' then json_extract_path_text(_airbyte_data, 'search_type', true) end as search_type,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    getdate() as _airbyte_normalized_at
from "snowplow".test_gsc._airbyte_raw_search_analytics_by_page as table_alias
-- search_analytics_by_page
where 1 = 1

),  __dbt__cte__search_analytics_by_page_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__search_analytics_by_page_ab1
select
    cast(ctr as
    float
) as ctr,
    cast(nullif(date, '') as
    date
) as date,
    cast(page as varchar) as page,
    cast(clicks as
    bigint
) as clicks,
    cast(position as
    float
) as position,
    cast(site_url as varchar) as site_url,
    cast(impressions as
    bigint
) as impressions,
    cast(search_type as varchar) as search_type,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    getdate() as _airbyte_normalized_at
from __dbt__cte__search_analytics_by_page_ab1
-- search_analytics_by_page
where 1 = 1

),  __dbt__cte__search_analytics_by_page_ab3 as (

-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__search_analytics_by_page_ab2
select
    md5(cast(coalesce(cast(ctr as varchar), '') || '-' || coalesce(cast(date as varchar), '') || '-' || coalesce(cast(page as varchar), '') || '-' || coalesce(cast(clicks as varchar), '') || '-' || coalesce(cast(position as varchar), '') || '-' || coalesce(cast(site_url as varchar), '') || '-' || coalesce(cast(impressions as varchar), '') || '-' || coalesce(cast(search_type as varchar), '') as varchar)) as _airbyte_search_analytics_by_page_hashid,
    tmp.*
from __dbt__cte__search_analytics_by_page_ab2 tmp
-- search_analytics_by_page
where 1 = 1

)-- Final base SQL model
-- depends_on: __dbt__cte__search_analytics_by_page_ab3
select
    ctr,
    date,
    page,
    clicks,
    position,
    site_url,
    impressions,
    search_type,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    getdate() as _airbyte_normalized_at,
    _airbyte_search_analytics_by_page_hashid
from __dbt__cte__search_analytics_by_page_ab3
-- search_analytics_by_page from "snowplow".test_gsc._airbyte_raw_search_analytics_by_page
where 1 = 1

and cast(_airbyte_emitted_at as
    timestamp with time zone
) >= (select max(cast(_airbyte_emitted_at as
    timestamp with time zone
)) from "snowplow".test_gsc."search_analytics_by_page")

  );

Second (incremental) sync dbt.log:

2021-11-28 21:56:39.327054 (Thread-12): On model.airbyte_utils.search_analytics_by_page: /* {"app": "dbt", "dbt_version": "0.21.0", "profile_name": "normalize", "target_name": "prod", "node_id": "model.airbyte_utils.search_analytics_by_page"} */



  create temporary table
    "search_analytics_by_page__dbt_tmp215639087673"


      compound sortkey(_airbyte_emitted_at)
  as (

with __dbt__cte__search_analytics_by_page_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "snowplow".test_gsc._airbyte_raw_search_analytics_by_page
select
    case when json_extract_path_text(_airbyte_data, 'ctr', true) != '' then json_extract_path_text(_airbyte_data, 'ctr', true) end as ctr,
    case when json_extract_path_text(_airbyte_data, 'date', true) != '' then json_extract_path_text(_airbyte_data, 'date', true) end as date,
    case when json_extract_path_text(_airbyte_data, 'page', true) != '' then json_extract_path_text(_airbyte_data, 'page', true) end as page,
    case when json_extract_path_text(_airbyte_data, 'clicks', true) != '' then json_extract_path_text(_airbyte_data, 'clicks', true) end as clicks,
    case when json_extract_path_text(_airbyte_data, 'position', true) != '' then json_extract_path_text(_airbyte_data, 'position', true) end as position,
    case when json_extract_path_text(_airbyte_data, 'site_url', true) != '' then json_extract_path_text(_airbyte_data, 'site_url', true) end as site_url,
    case when json_extract_path_text(_airbyte_data, 'impressions', true) != '' then json_extract_path_text(_airbyte_data, 'impressions', true) end as impressions,
    case when json_extract_path_text(_airbyte_data, 'search_type', true) != '' then json_extract_path_text(_airbyte_data, 'search_type', true) end as search_type,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    getdate() as _airbyte_normalized_at
from "snowplow".test_gsc._airbyte_raw_search_analytics_by_page as table_alias
-- search_analytics_by_page
where 1 = 1

),  __dbt__cte__search_analytics_by_page_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__search_analytics_by_page_ab1
select
    cast(ctr as
    float
) as ctr,
    cast(nullif(date, '') as
    date
) as date,
    cast(page as varchar) as page,
    cast(clicks as
    bigint
) as clicks,
    cast(position as
    float
) as position,
    cast(site_url as varchar) as site_url,
    cast(impressions as
    bigint
) as impressions,
    cast(search_type as varchar) as search_type,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    getdate() as _airbyte_normalized_at
from __dbt__cte__search_analytics_by_page_ab1
-- search_analytics_by_page
where 1 = 1

),  __dbt__cte__search_analytics_by_page_ab3 as (

-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__search_analytics_by_page_ab2
select
    md5(cast(coalesce(cast(ctr as varchar), '') || '-' || coalesce(cast(date as varchar), '') || '-' || coalesce(cast(page as varchar), '') || '-' || coalesce(cast(clicks as varchar), '') || '-' || coalesce(cast(position as varchar), '') || '-' || coalesce(cast(site_url as varchar), '') || '-' || coalesce(cast(impressions as varchar), '') || '-' || coalesce(cast(search_type as varchar), '') as varchar)) as _airbyte_search_analytics_by_page_hashid,
    tmp.*
from __dbt__cte__search_analytics_by_page_ab2 tmp
-- search_analytics_by_page
where 1 = 1

)-- Final base SQL model
-- depends_on: __dbt__cte__search_analytics_by_page_ab3
select
    ctr,
    date,
    page,
    clicks,
    position,
    site_url,
    impressions,
    search_type,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    getdate() as _airbyte_normalized_at,
    _airbyte_search_analytics_by_page_hashid
from __dbt__cte__search_analytics_by_page_ab3
-- search_analytics_by_page from "snowplow".test_gsc._airbyte_raw_search_analytics_by_page
where 1 = 1

and cast(_airbyte_emitted_at as
    timestamp with time zone
) >= (select max(cast(_airbyte_emitted_at as
    timestamp with time zone
)) from "snowplow".test_gsc."search_analytics_by_page")

  );
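For context, the trailing filter in both generated queries above is the incremental guard that dbt only renders when the destination table already exists and no full refresh is requested. A minimal sketch of that pattern (not the exact Airbyte-generated template; the CTE and column names are simply the ones from this issue):

-- Sketch of a dbt incremental model; {{ this }} resolves to the destination table.
-- is_incremental() is true when the model is materialized incrementally, the
-- destination table already exists, and no full refresh was requested, so the
-- filter is applied even when that existing table is empty.
select *
from __dbt__cte__search_analytics_by_page_ab3
where 1 = 1
{% if is_incremental() %}
and cast(_airbyte_emitted_at as timestamp with time zone) >= (
    select max(cast(_airbyte_emitted_at as timestamp with time zone))
    from {{ this }}
)
{% endif %}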

[Screenshot: Screen Shot 2021-11-30 at 3 49 22 pm]

@danieldiamond (Contributor)

@ChristopheDuong, you can see in the logs I posted earlier that the dbt INSERT logs show 0 records inserted.

@ChristopheDuong (Contributor)

ChristopheDuong commented Nov 30, 2021

Yes, I saw that the final insert is processing 0 rows.

The question then is: what is the content of the intermediate queries and of the temporary tables search_analytics_by_page__dbt_tmp215452545898 and search_analytics_by_page__dbt_tmp215639087673? (Not the SQL queries themselves, but what data do they return?)

That's why I wanted to see a comparison of data in your raw tables vs final tables.

In your screenshot here:
[image]

I don't see the row counts per emitted date for the search_analytics_by_page table, so I can't compare them against the raw table.

I am guessing it's because the table search_analytics_by_page is empty... and so this query also returns empty rows:

with __dbt__cte__search_analytics_by_page_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "snowplow".test_gsc._airbyte_raw_search_analytics_by_page
select
    case when json_extract_path_text(_airbyte_data, 'ctr', true) != '' then json_extract_path_text(_airbyte_data, 'ctr', true) end as ctr,
    case when json_extract_path_text(_airbyte_data, 'date', true) != '' then json_extract_path_text(_airbyte_data, 'date', true) end as date,
    case when json_extract_path_text(_airbyte_data, 'page', true) != '' then json_extract_path_text(_airbyte_data, 'page', true) end as page,
    case when json_extract_path_text(_airbyte_data, 'clicks', true) != '' then json_extract_path_text(_airbyte_data, 'clicks', true) end as clicks,
    case when json_extract_path_text(_airbyte_data, 'position', true) != '' then json_extract_path_text(_airbyte_data, 'position', true) end as position,
    case when json_extract_path_text(_airbyte_data, 'site_url', true) != '' then json_extract_path_text(_airbyte_data, 'site_url', true) end as site_url,
    case when json_extract_path_text(_airbyte_data, 'impressions', true) != '' then json_extract_path_text(_airbyte_data, 'impressions', true) end as impressions,
    case when json_extract_path_text(_airbyte_data, 'search_type', true) != '' then json_extract_path_text(_airbyte_data, 'search_type', true) end as search_type,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    getdate() as _airbyte_normalized_at
from "snowplow".test_gsc._airbyte_raw_search_analytics_by_page as table_alias
-- search_analytics_by_page
where 1 = 1

),  __dbt__cte__search_analytics_by_page_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__search_analytics_by_page_ab1
select
    cast(ctr as
    float
) as ctr,
    cast(nullif(date, '') as
    date
) as date,
    cast(page as varchar) as page,
    cast(clicks as
    bigint
) as clicks,
    cast(position as
    float
) as position,
    cast(site_url as varchar) as site_url,
    cast(impressions as
    bigint
) as impressions,
    cast(search_type as varchar) as search_type,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    getdate() as _airbyte_normalized_at
from __dbt__cte__search_analytics_by_page_ab1
-- search_analytics_by_page
where 1 = 1

),  __dbt__cte__search_analytics_by_page_ab3 as (

-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__search_analytics_by_page_ab2
select
    md5(cast(coalesce(cast(ctr as varchar), '') || '-' || coalesce(cast(date as varchar), '') || '-' || coalesce(cast(page as varchar), '') || '-' || coalesce(cast(clicks as varchar), '') || '-' || coalesce(cast(position as varchar), '') || '-' || coalesce(cast(site_url as varchar), '') || '-' || coalesce(cast(impressions as varchar), '') || '-' || coalesce(cast(search_type as varchar), '') as varchar)) as _airbyte_search_analytics_by_page_hashid,
    tmp.*
from __dbt__cte__search_analytics_by_page_ab2 tmp
-- search_analytics_by_page
where 1 = 1

)-- Final base SQL model
-- depends_on: __dbt__cte__search_analytics_by_page_ab3
select
    ctr,
    date,
    page,
    clicks,
    position,
    site_url,
    impressions,
    search_type,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    getdate() as _airbyte_normalized_at,
    _airbyte_search_analytics_by_page_hashid
from __dbt__cte__search_analytics_by_page_ab3
-- search_analytics_by_page from "snowplow".test_gsc._airbyte_raw_search_analytics_by_page
where 1 = 1

and cast(_airbyte_emitted_at as
    timestamp with time zone
) >= (select max(cast(_airbyte_emitted_at as
    timestamp with time zone
)) from "snowplow".test_gsc."search_analytics_by_page")

@danieldiamond (Contributor)

I just ran the above SQL. All the CTEs work except that final WHERE clause, i.e.

and cast(_airbyte_emitted_at as
    timestamp with time zone
) >= (select max(cast(_airbyte_emitted_at as
    timestamp with time zone
)) from "snowplow".test_gsc."search_analytics_by_page")

which causes the query to return nothing. My guess from looking at it is that because search_analytics_by_page is empty, max(_airbyte_emitted_at) is NULL, and the comparison _airbyte_emitted_at >= NULL evaluates to NULL, so no rows pass the filter.
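To illustrate (a minimal sketch using the table names from this issue, not a query taken from the original logs):

-- max() over an empty table returns NULL, and any comparison against NULL is
-- NULL rather than true, so the filter rejects every row.
select count(*)
from test_gsc._airbyte_raw_search_analytics_by_page
where _airbyte_emitted_at >= (
    select max(_airbyte_emitted_at)
    from test_gsc.search_analytics_by_page  -- empty table, so max() is NULL
);
-- returns 0 even though the raw table contains records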

@ChristopheDuong (Contributor)

ChristopheDuong commented Nov 30, 2021

I see. So if you completely delete the table search_analytics_by_page (by dropping it, not truncating it), your sync should work.

But if the table exists and is empty, the incremental behavior does not work, because _airbyte_emitted_at >= NULL evaluates to NULL (unless we tweak the incremental clause to handle empty tables).
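One possible tweak, sketched here only for illustration (not necessarily what the eventual fix in #8394 does), is to coalesce the subquery so an empty destination table falls back to a minimal timestamp instead of NULL:

and cast(_airbyte_emitted_at as timestamp with time zone) >= coalesce(
    (select max(cast(_airbyte_emitted_at as timestamp with time zone))
     from "snowplow".test_gsc."search_analytics_by_page"),
    -- fallback when the destination table is empty
    cast('1970-01-01 00:00:00+00' as timestamp with time zone)
)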

@ChristopheDuong (Contributor)

OK, I reproduced and confirmed it locally. I will make a PR to handle the case where the destination table already exists but is empty.

The workaround for the moment is to drop the final table.
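For example (schema and table names taken from the logs in this issue; adjust for your environment):

-- Drop (do not truncate) the empty final table so the next sync rebuilds it
-- from scratch via dbt.full_refresh.
drop table if exists "snowplow".test_gsc."search_analytics_by_page";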

Thanks for your help!

@danieldiamond (Contributor)

Thank you! Handling empty tables is definitely the right move. As future work, though, I wonder if the reset connector should be reconsidered, since this situation reminds me of previous issues relating to SCD tables not being dropped, i.e. #5417.

@ChristopheDuong (Contributor)

Yes, you are right, the reset implementation does introduce some edge cases here and there...
