Skip to content

Commit

Permalink
Bilhetagem Jaé - Adiciona tipo da gratuidade e materializa passageiro…
Browse files Browse the repository at this point in the history
…s_hora (#206)

* adiciona gratuidade na tabela transacao

* adiciona gratuidade

* cria tabela staging gratuidade

* rho corrige logica particao

* ajustes gratuidade

* ajusta logica partição rho sppo

* materializar passageiros_hora

* corte integracao

* altera source para prod
  • Loading branch information
pixuimpou authored Feb 5, 2024
1 parent 89699d6 commit 40dce8f
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 59 deletions.
99 changes: 84 additions & 15 deletions models/br_rj_riodejaneiro_bilhetagem/passageiros_hora.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,48 @@
{{
config(
materialized="view",
)
materialized='incremental',
partition_by={
"field":"data",
"data_type": "date",
"granularity":"day"
},
incremental_strategy="insert_overwrite"
)
}}

WITH transacao_agrupada AS (
{% set transacao_staging = ref('staging_transacao') %}
{% if execute %}
{% if is_incremental() %}
{% set partitions_query %}
SELECT DISTINCT
CONCAT("'", DATE(data_transacao), "'") AS data_transacao
FROM
{{ transacao_staging }}
WHERE
DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}")
{% endset %}

{% set partitions = run_query(partitions_query) %}

{% set partition_list = partitions.columns[0].values() %}
{% endif %}
{% endif %}

WITH integracao AS (
SELECT
*
FROM
{{ ref("integracao") }}
{% if is_incremental() %}
WHERE
{% if partition_list|length > 0 %}
data IN ({{ partition_list|join(', ') }})
{% else %}
data = "2000-01-01"
{% endif %}
{% endif %}
),
transacao_agrupada AS (
SELECT
t.data,
t.hora,
Expand All @@ -16,19 +54,26 @@ WITH transacao_agrupada AS (
WHEN i.id_integracao IS NOT NULL THEN "Integração"
ELSE t.tipo_transacao
END AS tipo_transacao,
COUNT(t.id_transacao) AS quantidade_passageiros,
-- ROUND(100 * COUNT(t.id_transacao) / SUM(COUNT(*)) OVER (PARTITION BY t.data, t.modo), 2) AS percentual_passageiros_modo,
ROUND(100 * COUNT(t.id_transacao) / SUM(COUNT(*)) OVER (PARTITION BY t.data), 2) AS percentual_passageiros_dia
t.tipo_gratuidade,
t.tipo_pagamento,
COUNT(t.id_transacao) AS quantidade_passageiros
FROM
{{ ref("transacao") }} t
LEFT JOIN
{{ ref("integracao") }} i
integracao i
ON
t.id_transacao = i.id_transacao
WHERE
t.data >= "2023-07-19"
AND t.servico NOT IN ("888888",
"999999")
{% if is_incremental() %}
{% if partition_list|length > 0 %}
t.data IN ({{ partition_list|join(', ') }})
{% else %}
t.data = "2000-01-01"
{% endif %}
{% else %}
t.data >= "2023-07-19"
{% endif %}
AND t.servico NOT IN ("888888", "999999")
AND t.id_operadora != "2"
GROUP BY
1,
Expand All @@ -37,7 +82,28 @@ WITH transacao_agrupada AS (
4,
5,
6,
7
7,
8,
9
),
transacao_tratada AS (
SELECT
t.data,
t.hora,
t.modo,
t.consorcio,
t.servico,
t.sentido,
CASE
WHEN t.tipo_transacao = "Integração" THEN "Integração"
WHEN t.tipo_transacao IN ("Débito", "Botoeira") THEN "Tarifa Cheia"
ELSE t.tipo_transacao
END AS tipo_transacao_smtr,
t.tipo_gratuidade,
t.tipo_pagamento,
t.quantidade_passageiros
FROM
transacao_agrupada t
)
SELECT
t.data,
Expand All @@ -46,9 +112,12 @@ SELECT
t.consorcio,
t.servico,
t.sentido,
t.tipo_transacao,
t.tipo_transacao_smtr,
CASE
WHEN t.tipo_transacao_smtr = "Gratuidade" THEN t.tipo_gratuidade
WHEN t.tipo_transacao_smtr = "Integração" THEN "Integração"
ELSE t.tipo_pagamento
END AS tipo_transacao_detalhe_smtr,
t.quantidade_passageiros
FROM
transacao_agrupada t
ORDER BY
percentual_passageiros_dia DESC
transacao_tratada t
8 changes: 4 additions & 4 deletions models/br_rj_riodejaneiro_bilhetagem/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ models:
- 'projects/rj-smtr/locations/us/taxonomies/6071707853840412174/policyTags/1168584191451386864'
- name: id_transacao
description: "Identificador único da transação"
- name: id_tipo_pagamento
description: "Código do tipo de pagamento utilizado"
- name: tipo_pagamento
description: "Tipo de pagamento utilizado"
- name: tipo_transacao
description: "Tipo de transação realizada (a primeira perna de integrações são classificadas como tipo Débito e não Integração)"
# - name: indicador_integracao
# description: "Indicador booleano se a transação é parte de uma integração (coluna atualizada diariamente)"
- name: tipo_gratuidade
description: "Tipo da gratuidade (Estudante, PCD, Sênior)"
- name: id_tipo_integracao
description: "Tipo da integração realizada (identificador relacionado à matriz de integração)"
- name: id_integracao
Expand Down
77 changes: 71 additions & 6 deletions models/br_rj_riodejaneiro_bilhetagem/transacao.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,31 @@
)
}}

-- Verifica partições para consultar na tabela de gratuidades
{% set incremental_filter %}
DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}")
AND timestamp_captura BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}")
{% endset %}

{% set transacao_staging = ref('staging_transacao') %}
{% if execute %}
{% if is_incremental() %}
{% set gratuidade_partitions_query %}
SELECT DISTINCT
CAST(CAST(id_cliente AS FLOAT64) AS INT64) AS id_cliente
FROM
{{ transacao_staging }}
WHERE
{{ incremental_filter }}
AND tipo_transacao = "21"
{% endset %}

{% set gratuidade_partitions = run_query(gratuidade_partitions_query) %}

{% set gratuidade_partition_list = gratuidade_partitions.columns[0].values() %}
{% endif %}
{% endif %}

WITH transacao_deduplicada AS (
SELECT
* EXCEPT(rn)
Expand All @@ -20,11 +45,10 @@ WITH transacao_deduplicada AS (
*,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY timestamp_captura DESC) AS rn
FROM
{{ ref("staging_transacao") }}
{{ transacao_staging }}
{% if is_incremental() -%}
WHERE
DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}")
AND timestamp_captura BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}")
{{ incremental_filter }}
{%- endif %}
)
WHERE
Expand All @@ -39,6 +63,35 @@ tipo_transacao AS (
WHERE
id_tabela = "transacao"
AND coluna = "id_tipo_transacao"
),
gratuidade AS (
SELECT
CAST(id_cliente AS STRING) AS id_cliente,
tipo_gratuidade,
data_inicio_validade,
data_fim_validade
FROM
{{ ref("gratuidade_aux") }}
-- se for incremental pega apenas as partições necessárias
{% if is_incremental() %}
WHERE
id_cliente
{% if gratuidade_partition_list|length > 0 %}
IN ({{ gratuidade_partition_list|join(', ') }})
{% else %}
= 0
{% endif %}
{% endif %}
),
tipo_pagamento AS (
SELECT
chave AS id_tipo_pagamento,
valor AS tipo_pagamento
FROM
`rj-smtr.br_rj_riodejaneiro_bilhetagem.dicionario`
WHERE
id_tabela = "transacao"
AND coluna = "id_tipo_pagamento"
)
SELECT
EXTRACT(DATE FROM data_transacao) AS data,
Expand All @@ -54,10 +107,11 @@ SELECT
l.nr_linha AS servico,
sentido,
NULL AS id_veiculo,
COALESCE(id_cliente, pan_hash) AS id_cliente,
COALESCE(t.id_cliente, t.pan_hash) AS id_cliente,
id AS id_transacao,
id_tipo_midia AS id_tipo_pagamento,
tp.tipo_pagamento,
tt.tipo_transacao,
g.tipo_gratuidade,
tipo_integracao AS id_tipo_integracao,
NULL AS id_integracao,
latitude_trx AS latitude,
Expand Down Expand Up @@ -89,4 +143,15 @@ ON
LEFT JOIN
tipo_transacao AS tt
ON
tt.id_tipo_transacao = t.tipo_transacao
tt.id_tipo_transacao = t.tipo_transacao
LEFT JOIN
tipo_pagamento tp
ON
t.id_tipo_midia = tp.id_tipo_pagamento
LEFT JOIN
gratuidade g
ON
t.tipo_transacao = "21"
AND t.id_cliente = g.id_cliente
AND t.data_transacao >= g.data_inicio_validade
AND (t.data_transacao < g.data_fim_validade OR g.data_fim_validade IS NULL)
92 changes: 92 additions & 0 deletions models/br_rj_riodejaneiro_bilhetagem_staging/gratuidade_aux.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
{{
config(
materialized="incremental",
incremental_strategy="insert_overwrite",
partition_by={
"field": "id_cliente",
"data_type": "int64",
"range": {
"start": 0,
"end": 100000000,
"interval": 10000
}
},
)
}}


{% set staging_gratuidade = ref('staging_gratuidade') %}

{% set incremental_filter %}
DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}")
AND timestamp_captura BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}")
{% endset %}

{% if execute %}
{% if is_incremental() %}
{% set partitions_query %}
SELECT DISTINCT
CAST(CAST(cd_cliente AS FLOAT64) AS INT64) AS cd_cliente
FROM
{{ staging_gratuidade }}
WHERE
{{ incremental_filter }}
{% endset %}

{% set partitions = run_query(partitions_query) %}

{% set partition_list = partitions.columns[0].values() %}
{% endif %}
{% endif %}

WITH gratuidade_complete_partitions AS (
SELECT
CAST(CAST(cd_cliente AS FLOAT64) AS INT64) AS id_cliente,
id AS id_gratuidade,
tipo_gratuidade,
data_inclusao AS data_inicio_validade,
timestamp_captura
FROM
{{ staging_gratuidade }}
{% if is_incremental() -%}
WHERE
{{ incremental_filter }}

{% if partition_list|length > 0 -%}
UNION ALL

SELECT
id_cliente,
id_gratuidade,
tipo_gratuidade,
data_inicio_validade,
timestamp_captura
FROM
{{ this }}
WHERE
id_cliente IN ({{ partition_list|join(', ') }})
{%- endif %}
{%- endif %}
),
gratuidade_deduplicada AS (
SELECT
* EXCEPT(rn)
FROM
(
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY id_gratuidade ORDER BY timestamp_captura DESC) AS rn
FROM
gratuidade_complete_partitions
)
WHERE
rn = 1
)
SELECT
id_cliente,
tipo_gratuidade,
data_inicio_validade,
LEAD(data_inicio_validade) OVER (PARTITION BY id_cliente ORDER BY data_inicio_validade) AS data_fim_validade,
timestamp_captura
FROM
gratuidade_complete_partitions
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{{
config(
alias='gratuidade',
)
}}

SELECT
data,
SAFE_CAST(id AS STRING) AS id,
DATETIME(PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S%Ez', timestamp_captura), "America/Sao_Paulo") AS timestamp_captura,
SAFE_CAST(JSON_VALUE(content, '$.cd_cliente') AS STRING) AS cd_cliente,
DATETIME(PARSE_TIMESTAMP('%Y-%m-%dT%H:%M:%E*S%Ez', SAFE_CAST(JSON_VALUE(content, '$.data_inclusao') AS STRING)), 'America/Sao_Paulo') AS data_inclusao,
SAFE_CAST(JSON_VALUE(content, '$.id_status_gratuidade') AS STRING) AS id_status_gratuidade,
SAFE_CAST(JSON_VALUE(content, '$.id_tipo_gratuidade') AS STRING) AS id_tipo_gratuidade,
SAFE_CAST(JSON_VALUE(content, '$.tipo_gratuidade') AS STRING) AS tipo_gratuidade
FROM
{{ source('br_rj_riodejaneiro_bilhetagem_staging', 'gratuidade') }}

Loading

0 comments on commit 40dce8f

Please sign in to comment.