Creates a Dataset parameter for Data-Aware Scheduling #115

Merged · 11 commits · Aug 9, 2024
@@ -3,7 +3,7 @@ dag:
   description: DAG de teste
   tags:
     - inlabs
-  schedule: 0 8 * * MON-FRI
+  dataset: inlabs
   owner:
     - cdata
   search:
1 change: 1 addition & 0 deletions dag_confs/examples_and_tests/inlabs_example.yaml
@@ -4,6 +4,7 @@ dag:
   tags:
     - inlabs
   schedule: 0 8 * * MON-FRI
+  dataset: inlabs
   owner:
     - cdata
   search:
28 changes: 25 additions & 3 deletions dag_load_inlabs/ro-dou_inlabs_load_pg_dag.py
@@ -8,8 +8,10 @@
 import logging
 from datetime import datetime, timedelta

+from airflow import Dataset
 from airflow.decorators import dag, task
 from airflow.models import Variable
+from airflow.providers.common.sql.operators.sql import SQLCheckOperator

 sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

@@ -57,7 +59,7 @@ def get_date() -> str:
         context = get_current_context()
         return get_trigger_date(context, local_time=True).strftime("%Y-%m-%d")

-    @task
+    @task.short_circuit
     def download_n_unzip_files(trigger_date: str):
         import requests
         from bs4 import BeautifulSoup
@@ -107,6 +109,9 @@ def _download_files():
                 "origem": "736372697074",
             }
             files = _find_files(session, headers)
+            if not files:
+                return False
+
             for file in files:
                 r = session.request(
                     "GET",
@@ -118,6 +123,8 @@

             logging.info("Downloaded files: %s", files)

+            return True
+
         def _unzip_files():
             all_files = os.listdir(dest_path)
             # filter zip files
@@ -131,9 +138,11 @@ def _unzip_files():
         inlabs_conn = BaseHook.get_connection(INLABS_CONN_ID)
         dest_path = os.path.join(Variable.get("path_tmp"), DEST_DIR)
         _create_directories()
-        _download_files()
+        files_exists = _download_files()
         _unzip_files()

+        return files_exists
+
     @task
     def load_data(trigger_date: str):
         from bs4 import BeautifulSoup
@@ -191,6 +200,19 @@ def _clean_db(hook: PostgresHook):
         )
         logging.info("Table `%s` updated with %s lines.", STG_TABLE, len(df))

+    check_loaded_data = SQLCheckOperator(
+        task_id="check_loaded_data",
+        conn_id=DEST_CONN_ID,
+        sql=f"""
+        SELECT 1
+        FROM
+            {STG_TABLE}
+        WHERE
+            DATE(pubdate) = '{{{{ ti.xcom_pull(task_ids='get_date')}}}}'
+        """,
+        outlets=[Dataset("inlabs")]
+    )
+
     @task
     def remove_directory():
         dest_path = os.path.join(Variable.get("path_tmp"), DEST_DIR)
@@ -200,7 +222,7 @@ def remove_directory():
     ## Orchestration
     trigger_date = get_date()
     download_n_unzip_files(trigger_date) >> \
-        load_data(trigger_date) >> \
+        load_data(trigger_date) >> check_loaded_data >> \
        remove_directory()

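For readers wondering how the new outlets=[Dataset("inlabs")] plugs into data-aware scheduling: when check_loaded_data succeeds, Airflow emits an update event for Dataset("inlabs"), and any DAG scheduled on that Dataset is then triggered; since download_n_unzip_files is now a @task.short_circuit, a False return on days with no files skips the whole chain, so no dataset event is emitted. A minimal consumer sketch, for illustration only (the dag_id and dates are made up; in practice the generated ro-dou DAGs with dataset: inlabs play this role):

from airflow import DAG, Dataset
from airflow.operators.empty import EmptyOperator
import pendulum

with DAG(
    dag_id="consume_inlabs_example",   # hypothetical name
    schedule=[Dataset("inlabs")],      # runs on dataset updates, no cron needed
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
):
    EmptyOperator(task_id="do_something_with_inlabs_data")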
1 change: 1 addition & 0 deletions docs/docs/como_funciona/parametros.md
@@ -7,6 +7,7 @@ The page below lists the parameters configurable in the YAML files:
 * **description**: Description of the search DAG.
 * **doc_md**: Markdown documentation giving the DAG a more complete description.
 * **schedule**: Cron-style schedule for how often the DAG runs (e.g. 0 8 * * MON-FRI).
+* **dataset**: Schedules the DAG on updates to an Airflow Dataset. If schedule is also set, the DAG runs on the cron schedule or whenever the Dataset is updated.
 * **tags**: Tags for categorizing the DAG.
 * **owner**: Owner responsible for the DAG.
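For illustration, a config using both parameters at once might look like the following sketch (values borrowed from the examples in this PR; the mandatory search block is omitted):

dag:
  id: inlabs_example
  description: DAG de teste
  tags:
    - inlabs
  schedule: 0 8 * * MON-FRI  # optional cron leg
  dataset: inlabs            # Dataset leg of the combined schedule
  owner:
    - cdata

With both present, the generator builds a DatasetOrTimeSchedule, so the DAG fires on the cron expression and also whenever the inlabs Dataset is updated.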
6 changes: 5 additions & 1 deletion schemas/ro-dou.json
@@ -30,7 +30,11 @@
   },
   "schedule": {
     "type": "string",
-    "description": "Cron expression or Dataset name"
+    "description": "Cron expression"
   },
+  "dataset": {
+    "type": "string",
+    "description": "Dataset name"
+  },
   "search": {
     "oneOf":
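A quick way to sanity-check a config against these fragments, assuming the jsonschema package is available (illustrative; the schema here is reduced to the two properties touched by this PR):

import jsonschema

schema = {
    "type": "object",
    "properties": {
        "schedule": {"type": "string"},
        "dataset": {"type": "string"},
    },
}

# Passes: both values are strings.
jsonschema.validate({"schedule": "0 8 * * MON-FRI", "dataset": "inlabs"}, schema)

# Fails: `dataset` must be a string.
try:
    jsonschema.validate({"dataset": 123}, schema)
except jsonschema.ValidationError as err:
    print(err.message)  # 123 is not of type 'string'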
83 changes: 80 additions & 3 deletions src/dou_dag_generator.py
@@ -15,18 +15,20 @@
 import textwrap
 from dataclasses import asdict
 from datetime import datetime, timedelta
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Union
 import json

 import pandas as pd
-from airflow import DAG
+from airflow import DAG, Dataset
 from airflow.utils.task_group import TaskGroup
 from airflow.hooks.base import BaseHook
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import BranchPythonOperator, PythonOperator
 from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
 from airflow.providers.postgres.hooks.postgres import PostgresHook
 from airflow.providers.slack.notifications.slack import SlackNotifier
+from airflow.timetables.datasets import DatasetOrTimeSchedule
+from airflow.timetables.trigger import CronTriggerTimetable


 sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
@@ -76,6 +78,8 @@ class DouDigestDagGenerator:

     YAMLS_DIR_LIST = [dag_confs for dag_confs in YAMLS_DIR.split(":")]
     SLACK_CONN_ID = "slack_notify_rodou_dagrun"
+    DEFAULT_SCHEDULE = "0 5 * * *"
+
     parser = YAMLParser
     searchers: Dict[str, BaseSearcher]
@@ -145,6 +149,76 @@ def prepare_doc_md(specs: DAGConfig, config_file: str) -> str:
         doc_md = doc_md + "</dl>\n"
         return doc_md

+    @staticmethod
+    def _hash_dag_id(dag_id: str, size: int) -> int:
+        """Hashes the `dag_id` into an integer between 0 and `size`"""
+        buffer = 0
+        for _char in dag_id:
+            buffer += ord(_char)
+        try:
+            _hash = buffer % size
+        except ZeroDivisionError:
+            raise ValueError("`size` must be greater than 0.")
+        return _hash
+
+    def _get_safe_schedule(self, specs: DAGConfig, default_schedule: str) -> str:
+        """Returns a new `schedule` value with the execution minute
+        randomized based on the `dag_id`, for DAGs that use the default
+        schedule. Applies a hash function to the dag_id string that
+        returns a value between 0 and 60, which defines the execution
+        minute.
+        """
+
+        schedule = default_schedule
+        id_based_minute = self._hash_dag_id(specs.dag_id, 60)
+        schedule_without_min = " ".join(schedule.split(" ")[1:])
+        schedule = f"{id_based_minute} {schedule_without_min}"
+
+        return schedule
+
+    def _update_schedule_with_dataset(
+        self, dataset: str, schedule: str, is_default_schedule: bool
+    ) -> Union[List[Dataset], DatasetOrTimeSchedule]:
+        """If a dataset is provided, the schedule is switched to
+        Dataset-triggered execution, or to a DatasetOrTimeSchedule
+        when a schedule value is also set in the YAML.
+        """
+        if not is_default_schedule:
+            return DatasetOrTimeSchedule(
+                timetable=CronTriggerTimetable(
+                    schedule, timezone=os.getenv("AIRFLOW__CORE__DEFAULT_TIMEZONE")
+                ),
+                datasets=[Dataset(dataset)],
+            )
+        return [Dataset(dataset)]
+
+    def _update_schedule(
+        self, specs: DAGConfig
+    ) -> Union[str, List[Dataset], DatasetOrTimeSchedule]:
+        """Updates the schedule value to the default, or to
+        Dataset-based scheduling when applicable.
+        """
+        schedule = specs.schedule
+
+        if schedule is None:
+            schedule = self._get_safe_schedule(
+                specs=specs,
+                default_schedule=self.DEFAULT_SCHEDULE
+            )
+            is_default_schedule = True
+        else:
+            is_default_schedule = False
+
+        if specs.dataset is not None:
+            schedule = self._update_schedule_with_dataset(
+                dataset=specs.dataset,
+                schedule=schedule,
+                is_default_schedule=is_default_schedule,
+            )
+
+        return schedule
+
     def generate_dags(self):
         """Iterates over the YAML files and creates all dags"""

@@ -294,10 +368,13 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
             "on_retry_callback": self.on_retry_callback,
             "on_failure_callback": self.on_failure_callback,
         }
+
+        schedule = self._update_schedule(specs)
+
         dag = DAG(
             specs.dag_id,
             default_args=default_args,
-            schedule=specs.schedule,
+            schedule=schedule,
             description=specs.description,
             doc_md=doc_md,
             catchup=False,
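Taken together, _update_schedule resolves to one of three shapes: a cron string (user-supplied, or the default with a dag_id-hashed minute that spreads default runs across the hour instead of stampeding at 5:00), a Dataset list, or a combined timetable. The two Dataset-based shapes can be sketched directly, mirroring the PR's own imports (Airflow 2.9+ is assumed for DatasetOrTimeSchedule):

from airflow import Dataset
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

# dataset only in the YAML: pure data-aware scheduling
schedule = [Dataset("inlabs")]

# dataset + schedule in the YAML: the DAG runs on the cron expression,
# and also whenever the Dataset receives an update
schedule = DatasetOrTimeSchedule(
    timetable=CronTriggerTimetable("0 8 * * MON-FRI", timezone="UTC"),
    datasets=[Dataset("inlabs")],
)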
43 changes: 5 additions & 38 deletions src/parsers.py
@@ -1,6 +1,7 @@
 """Abstract and concrete classes to parse DAG configuration from a file."""

 import ast
+import os
 import textwrap
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
@@ -10,7 +11,6 @@
 from airflow import Dataset
 from airflow.models import Variable

-
 @dataclass
 class SearchConfig:
     header: str
@@ -40,6 +40,7 @@ class DAGConfig:
     discord_webhook: str
     slack_webhook: str
     schedule: str
+    dataset: str
     description: str
     skip_null: bool
     doc_md: str
@@ -57,49 +58,13 @@ class FileParser(ABC):
     @abstractmethod
     def parse(self):
         pass

-    def _hash_dag_id(self, dag_id: str, size: int) -> int:
-        """Hashes the `dag_id` into an integer between 0 and `size`"""
-        buffer = 0
-        for _char in dag_id:
-            buffer += ord(_char)
-        try:
-            _hash = buffer % size
-        except ZeroDivisionError:
-            raise ValueError("`size` must be greater than 0.")
-        return _hash
-
-    def _get_safe_schedule(
-        self, dag: dict, default_schedule: str
-    ) -> Union[str, Dataset]:
-        """Returns a new `schedule` value with the execution minute
-        randomized based on the `dag_id`, for DAGs that use the default
-        schedule. Applies a hash function to the dag_id string that
-        returns a value between 0 and 60, which defines the execution
-        minute.
-        """
-        schedule = dag.get("schedule", default_schedule)
-        # is_cron?
-        if len(schedule.split(" ")) == 5:
-            if schedule == default_schedule:
-                id_based_minute = self._hash_dag_id(dag["id"], 60)
-                schedule_without_min = " ".join(schedule.split(" ")[1:])
-                schedule = f"{id_based_minute} {schedule_without_min}"
-        else:
-            schedule = [Dataset(schedule)]
-
-        return schedule
-

 class YAMLParser(FileParser):
     """Parses YAML file and gets the DAG parameters.

     It guarantees that mandatory fields are in place and are properly
     defined providing clear error messages.
     """

-    DEFAULT_SCHEDULE = "0 5 * * *"
-
     def __init__(self, filepath: str):
         self.filepath = filepath

@@ -153,7 +118,8 @@ def _parse_yaml(self) -> DAGConfig:
         )
         slack_webhook = report["slack"]["webhook"] if report.get("slack") else None

-        schedule = self._get_safe_schedule(dag, self.DEFAULT_SCHEDULE)
+        schedule = dag.get("schedule", None)
+        dataset = dag.get("dataset", None)
         doc_md = dag.get("doc_md", None)
         if doc_md:
             doc_md = textwrap.dedent(doc_md)
@@ -182,6 +148,7 @@ def _parse_yaml(self) -> DAGConfig:
             discord_webhook=discord_webhook,
             slack_webhook=slack_webhook,
             schedule=schedule,
+            dataset=dataset,
             description=description,
             skip_null=skip_null,
             doc_md=doc_md,
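End to end, the parser now only extracts the two raw values and leaves schedule resolution to the generator. A usage sketch (the file path is hypothetical, and it assumes parse() delegates to _parse_yaml as the abstract base suggests):

specs = YAMLParser(filepath="dag_confs/examples_and_tests/inlabs_example.yaml").parse()
print(specs.schedule)  # "0 8 * * MON-FRI", or None if omitted in the YAML
print(specs.dataset)   # "inlabs", or None
# DouDigestDagGenerator._update_schedule(specs) then turns this pair into
# a cron string, [Dataset(...)], or a DatasetOrTimeSchedule.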