diff --git a/dags/preprocess_actors.py b/dags/preprocess_actors.py
index 87a265f..623501b 100644
--- a/dags/preprocess_actors.py
+++ b/dags/preprocess_actors.py
@@ -2,21 +2,25 @@ from airflow import DAG
 from airflow.operators.python import PythonOperator
 from airflow.providers.postgres.hooks.postgres import PostgresHook
+
 from utils.utils import load_table, normalize_nom, normalize_url, normalize_email, normalize_phone_number, \
     apply_normalization, save_to_database
 
+
 def read_data_from_postgres():
     pg_hook = PostgresHook(postgres_conn_id='lvao-preprod')
     engine = pg_hook.get_sqlalchemy_engine()
     df = load_table("qfdmo_acteur", engine)
     return df
 
+
 def apply_address_normalization(**kwargs):
     df = kwargs['ti'].xcom_pull(task_ids='read_imported_actors')
     normalization_map = {"address": normalize_nom, "adresse_complement": normalize_nom}
     df_normalized = apply_normalization(df, normalization_map)
     return df_normalized
 
+
 def apply_other_normalizations(**kwargs):
     df = kwargs['ti'].xcom_pull(task_ids='BAN_normalization')
     columns_to_exclude = ["identifiant_unique", "statut", "cree_le", "modifie_le"]
 
@@ -31,12 +35,14 @@ def apply_other_normalizations(**kwargs):
     df_cleaned = apply_normalization(df, normalization_map)
     return df_cleaned
 
+
 def write_data_to_postgres(**kwargs):
     df_cleaned = kwargs['ti'].xcom_pull(task_ids='other_normalizations')
     pg_hook = PostgresHook(postgres_conn_id='lvao-preprod')
     engine = pg_hook.get_sqlalchemy_engine()
     save_to_database(df_cleaned, "lvao_actors_processed", engine)
 
+
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
@@ -50,7 +56,7 @@ def write_data_to_postgres(**kwargs):
 
 dag = DAG(
     'imported_actors_preprocessing',
     default_args=default_args,
-    description='Enhanced DAG for normalizing and saving LVAO actors data with address normalization',
+    description='DAG for normalizing and saving LVAO actors',
     schedule_interval=None,
 )
@@ -81,4 +87,4 @@ def write_data_to_postgres(**kwargs):
     dag=dag,
 )
 
-t1 >> t2 >> t3 >> t4
\ No newline at end of file
+t1 >> t2 >> t3 >> t4