Skip to content

Commit

Permalink
read qfdmo_acteur and qfdmo_propositionservice without transformation
Browse files Browse the repository at this point in the history
  • Loading branch information
kolok committed Mar 14, 2024
1 parent 55f757e commit e12106c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
6 changes: 3 additions & 3 deletions dags/create_final_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def write_data_to_postgres(**kwargs):
t1 = PythonOperator(
task_id='load_normalized_actors',
python_callable=read_data_from_postgres,
-op_kwargs={"table_name": "qfdmo_actors_processed"},
+op_kwargs={"table_name": "qfdmo_acteur"},
dag=dag,
)

Expand All @@ -115,14 +115,14 @@ def write_data_to_postgres(**kwargs):
t2 = PythonOperator(
task_id='load_manual_actor_updates',
python_callable=read_data_from_postgres,
-op_kwargs={"table_name": "qfdmo_manual_actors_updates"},
+op_kwargs={"table_name": "qfdmo_revisionacteur"},
dag=dag,
)

t2_bis = PythonOperator(
task_id='load_manual_propositionservice_updates',
python_callable=read_data_from_postgres,
-op_kwargs={"table_name": "qfdmo_manual_propositionservice_updates"},
+op_kwargs={"table_name": "qfdmo_revisionpropositionservice"},
dag=dag,
)

Expand Down
17 changes: 7 additions & 10 deletions dags/preprocess_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
-import sys

-from utils.utils import load_table, normalize_nom, normalize_url, normalize_email, normalize_phone_number, \
-apply_normalization, save_to_database
+from utils.utils import load_table, normalize_nom, apply_normalization, save_to_database


def read_data_from_postgres():
Expand All @@ -24,14 +22,13 @@ def apply_address_normalization(**kwargs):

def apply_other_normalizations(**kwargs):
df = kwargs['ti'].xcom_pull(task_ids='BAN_normalization')
-columns_to_exclude = ["identifiant_unique", "statut", "cree_le", "modifie_le"]
normalization_map = {
-"nom": normalize_nom,
-"nom_commercial": normalize_nom,
-"ville": normalize_nom,
-"url": normalize_url,
-"email": normalize_email,
-"telephone": normalize_phone_number,
+# "nom": normalize_nom,
+# "nom_commercial": normalize_nom,
+# "ville": normalize_nom,
+# "url": normalize_url,
+# "email": normalize_email,
+# "telephone": normalize_phone_number,
}
df_cleaned = apply_normalization(df, normalization_map)
return df_cleaned
Expand Down

0 comments on commit e12106c

Please sign in to comment.