Skip to content

Commit

Permalink
Merge pull request #172 from fivetran/feature/historical-schedules-jamie-redshift
Browse files Browse the repository at this point in the history

Feature/historical schedules jamie redshift
  • Loading branch information
fivetran-catfritz authored Oct 4, 2024
2 parents 4f57203 + a6f3536 commit e342c79
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .buildkite/scripts/run_models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ cd integration_tests
dbt deps
dbt seed --target "$db" --full-refresh
dbt run --target "$db" --full-refresh
dbt run --target "$db"
dbt test --target "$db"
dbt run --vars '{zendesk__unstructured_enabled: true, using_schedules: false, using_domain_names: false, using_user_tags: false, using_ticket_form_history: false, using_organization_tags: false}' --target "$db" --full-refresh
dbt run --vars '{zendesk__unstructured_enabled: true, using_schedules: false, using_domain_names: false, using_user_tags: false, using_ticket_form_history: false, using_organization_tags: false}' --target "$db"
dbt test --target "$db"

# dbt run-operation fivetran_utils.drop_schemas_automation --target "$db"
4 changes: 2 additions & 2 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ profile: 'integration_tests'

vars:
zendesk_schema: zendesk_integration_tests_50
using_schedule_histories: true
using_schedules: true
zendesk_source:
zendesk_organization_identifier: "organization_data"
zendesk_schedule_identifier: "schedule_data"
Expand Down Expand Up @@ -45,8 +47,6 @@ models:
seeds:
+quote_columns: "{{ true if target.type == 'redshift' else false }}"
zendesk_integration_tests:
+column_types:
_fivetran_synced: timestamp
+column_types:
_fivetran_synced: timestamp
group_data:
Expand Down
2 changes: 1 addition & 1 deletion macros/clean_schedule.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
{%- endmacro %}

{% macro default__clean_schedule(column_name) -%}
replace(replace(replace(replace({{ column_name }}, '{', ''), '}', ''), '"', ''), ' ', '')
replace(replace(replace(replace(cast({{ column_name }} as {{ dbt.type_string() }}), '{', ''), '}', ''), '"', ''), ' ', '')
{%- endmacro %}
31 changes: 17 additions & 14 deletions macros/regex_extract.sql
Original file line number Diff line number Diff line change
@@ -1,42 +1,45 @@
{% macro regex_extract(string, regex) -%}
{% macro regex_extract(string, day) -%}

{{ adapter.dispatch('regex_extract', 'zendesk') (string, regex) }}
{{ adapter.dispatch('regex_extract', 'zendesk') (string, day) }}

{%- endmacro %}

{% macro default__regex_extract(string, regex) %}

{% macro default__regex_extract(string, day) %}
{% set regex = "'.*?" ~ day ~ ".*?({.*?})'" %}
regexp_extract({{ string }}, {{ regex }} )

{%- endmacro %}

{% macro bigquery__regex_extract(string, regex) %}

{% macro bigquery__regex_extract(string, day) %}
{% set regex = "'.*?" ~ day ~ ".*?({.*?})'" %}
regexp_extract({{ string }}, {{ regex }} )

{%- endmacro %}

{% macro snowflake__regex_extract(string, regex) %}
{% macro snowflake__regex_extract(string, day) %}
{% set regex = "'.*?" ~ day ~ ".*?({.*?})'" %}

REGEXP_SUBSTR({{ string }}, {{ regex }}, 1, 1, 'e', 1 )

{%- endmacro %}

{% macro postgres__regex_extract(string, regex) %}
{% macro postgres__regex_extract(string, day) %}
{% set regex = "'.*?" ~ day ~ ".*?({.*?})'" %}

(regexp_matches({{ string }}, {{ regex }}))[1]

{%- endmacro %}

{% macro redshift__regex_extract(string, regex) %}
{% macro redshift__regex_extract(string, day) %}

{% set regex = '"' ~ day ~ '"' ~ ':\\\{([^\\\}]*)\\\}' -%}

{% set reformatted_regex = regex | replace(".*?", ".*") | replace("{", "\\\{") | replace("}", "\\\}") -%}
REGEXP_SUBSTR({{ string }}, {{ reformatted_regex }}, 1, 1, 'e')
'{' || REGEXP_SUBSTR({{ string }}, '{{ regex }}', 1, 1, 'e') || '}'

{%- endmacro %}

{% macro spark__regex_extract(string, regex) %}
{% set reformatted_regex = regex | replace("{", "\\\{") | replace("}", "\\\}") -%}
regexp_extract({{ string }}, {{ reformatted_regex }}, 1)
{% macro spark__regex_extract(string, day) %}
{% set regex = "'.*?" ~ day ~ ".*?({.*?})'" | replace("{", "\\\{") | replace("}", "\\\}") %}
regexp_extract({{ string }}, {{ regex }}, 1)

{%- endmacro %}
64 changes: 46 additions & 18 deletions models/history/int_zendesk__schedule_history.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ with audit_logs as (
where lower(change_description) like '%workweek changed from%'

), audit_logs_enhanced as (
select
select
schedule_id,
row_number() over (partition by schedule_id order by created_at) as schedule_id_index,
created_at,
replace(replace(replace(replace(change_description,
-- Clean up the change_description, sometimes has random html stuff in it
replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(change_description,
'workweek changed from', ''),
'"', '"'),
'amp;', ''),
'=>', ':')
'=>', ':'), ':mon:', '"mon":'), ':tue:', '"tue":'), ':wed:', '"wed":'), ':thu:', '"thu":'), ':fri:', '"fri":'), ':sat:', '"sat":'), ':sun:', '"sun":')
as change_description_cleaned
from audit_logs

Expand Down Expand Up @@ -101,47 +102,74 @@ with audit_logs as (
schedule_change,
'{{ day }}' as day_of_week,
cast('{{ day_number }}' as {{ dbt.type_int() }}) as day_of_week_number,
{{ zendesk.regex_extract('schedule_change', "'.*?" ~ day ~ ".*?({.*?})'") }} as day_of_week_schedule
from consolidate_actual_changes
{{ zendesk.regex_extract('schedule_change', day) }} as day_of_week_schedule
from consolidate_same_day_changes

{% if not loop.last %}union all{% endif %}
{% endfor %}

{% if target.type == 'redshift' %}
-- using PartiQL syntax to work with redshift's SUPER types, which requires an extra CTE
), redshift_parse_schedule as (
-- Redshift requires another CTE for unnesting
select
schedule_id,
valid_from,
valid_until,
schedule_change,
day_of_week,
day_of_week_number,
day_of_week_schedule,
json_parse('[' || replace(replace(day_of_week_schedule, ', ', ','), ',', '},{') || ']') as json_schedule

from split_days
where day_of_week_schedule != '{}'

), unnested_schedules as (
select
schedule_id,
valid_from,
valid_until,
schedule_change,
day_of_week,
day_of_week_number,
-- go back to strings
cast(day_of_week_schedule as {{ dbt.type_string() }}) as day_of_week_schedule,
{{ clean_schedule('JSON_SERIALIZE(unnested_schedule)') }} as cleaned_unnested_schedule

from redshift_parse_schedule as schedules, schedules.json_schedule as unnested_schedule

{% else %}
), unnested_schedules as (
select
split_days.*,

{%- if target.type == 'bigquery' %}
{%- if target.type == 'bigquery' %}
{{ clean_schedule('unnested_schedule') }} as cleaned_unnested_schedule
from split_days
cross join unnest(json_extract_array('[' || replace(day_of_week_schedule, ',', '},{') || ']', '$')) as unnested_schedule

{%- elif target.type == 'snowflake' %}
{%- elif target.type == 'snowflake' %}
unnested_schedule.key || ':' || unnested_schedule.value as cleaned_unnested_schedule
from split_days
cross join lateral flatten(input => parse_json(replace(replace(day_of_week_schedule, '\}\}', '\}'), '\{\{', '\{'))) as unnested_schedule

{%- elif target.type == 'postgres' %}
{%- elif target.type == 'postgres' %}
{{ clean_schedule('unnested_schedule::text') }} as cleaned_unnested_schedule
from split_days
cross join lateral jsonb_array_elements(('[' || replace(day_of_week_schedule, ',', '},{') || ']')::jsonb) as unnested_schedule

{%- elif target.type in ('databricks', 'spark') %}
{%- elif target.type in ('databricks', 'spark') %}
{{ clean_schedule('unnested_schedule') }} as cleaned_unnested_schedule
from split_days
lateral view explode(from_json(concat('[', replace(day_of_week_schedule, ',', '},{'), ']'), 'array<string>')) as unnested_schedule

{%- elif target.type == 'redshift' %}
{# json_parse('[' || replace(replace(day_of_week_schedule, '\}\}', '\}'), '\{\{', '\{') || ']') as json_schedule
from split_days #}
{# cross join lateral json_parse(replace(replace(day_of_week_schedule, '\}\}', '\}'), '\{\{', '\{')) as element #}

{% else %}
cast(null as {{ dbt.type_string() }}) as cleaned_unnested_schedule
from split_days
{%- endif %}

{% else %}
cast(null as {{ dbt.type_string() }}) as cleaned_unnested_schedule
from split_days
{%- endif %}
{% endif %}

), split_times as (
select
Expand Down

0 comments on commit e342c79

Please sign in to comment.