Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Hazelmat committed Feb 7, 2024
1 parent 088566f commit af2e35f
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions dags/preprocess_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

from utils.utils import load_table, normalize_nom, normalize_url, normalize_email, normalize_phone_number, \
apply_normalization, save_to_database


def read_data_from_postgres():
    """Load the ``qfdmo_acteur`` table via the ``lvao-preprod`` connection.

    Builds a SQLAlchemy engine from the Airflow Postgres connection and
    hands it to ``load_table``.

    Returns:
        Whatever ``load_table`` returns for ``qfdmo_acteur`` (presumably a
        pandas DataFrame -- TODO confirm against ``utils.utils``).
    """
    engine = PostgresHook(postgres_conn_id="lvao-preprod").get_sqlalchemy_engine()
    return load_table("qfdmo_acteur", engine)


def apply_address_normalization(**kwargs):
    """Normalize the address columns of the actors pulled from XCom.

    Args:
        **kwargs: Airflow task context; only ``kwargs['ti']`` is read, to
            pull the value returned by the ``read_imported_actors`` task.

    Returns:
        The result of ``apply_normalization`` for the address columns.
    """
    task_instance = kwargs["ti"]
    actors = task_instance.xcom_pull(task_ids="read_imported_actors")
    # NOTE(review): "address" is spelled in English while the sibling key
    # "adresse_complement" is French -- confirm the real column name.
    column_normalizers = {
        "address": normalize_nom,
        "adresse_complement": normalize_nom,
    }
    return apply_normalization(actors, column_normalizers)


def apply_other_normalizations(**kwargs):
df = kwargs['ti'].xcom_pull(task_ids='BAN_normalization')
columns_to_exclude = ["identifiant_unique", "statut", "cree_le", "modifie_le"]
Expand All @@ -31,12 +35,14 @@ def apply_other_normalizations(**kwargs):
df_cleaned = apply_normalization(df, normalization_map)
return df_cleaned


def write_data_to_postgres(**kwargs):
    """Persist the cleaned actors into ``lvao_actors_processed``.

    Args:
        **kwargs: Airflow task context; only ``kwargs['ti']`` is read, to
            pull the value returned by the ``other_normalizations`` task.
    """
    task_instance = kwargs["ti"]
    cleaned_actors = task_instance.xcom_pull(task_ids="other_normalizations")
    engine = PostgresHook(postgres_conn_id="lvao-preprod").get_sqlalchemy_engine()
    save_to_database(cleaned_actors, "lvao_actors_processed", engine)


default_args = {
'owner': 'airflow',
'depends_on_past': False,
Expand All @@ -50,7 +56,7 @@ def write_data_to_postgres(**kwargs):
# DAG that chains the actor preprocessing tasks. ``schedule_interval=None``
# means it is not run on a timer.
dag = DAG(
    'imported_actors_preprocessing',
    default_args=default_args,
    # Only one ``description`` may be passed: a repeated keyword argument is
    # a SyntaxError. This is the wording introduced by the commit.
    description='DAG for normalizing and saving LVAO actors',
    schedule_interval=None,
)

Expand Down Expand Up @@ -81,4 +87,4 @@ def write_data_to_postgres(**kwargs):
dag=dag,
)

# Run the four tasks strictly in sequence. Declared once: repeating the chain
# would only re-register the same dependencies.
t1 >> t2 >> t3 >> t4

0 comments on commit af2e35f

Please sign in to comment.