Skip to content

Commit

Permalink
Create Pydantic schemas, migrate the tests that validate yaml files
Browse files Browse the repository at this point in the history
Co-authored-by: Gustavo <gutaors@gmail.com>
  • Loading branch information
augusto-herrmann and gutaors committed Aug 21, 2024
1 parent 05b77f5 commit f651f0b
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 30 deletions.
136 changes: 136 additions & 0 deletions src/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""
This module defines the Pydantic models for validating the structure of
the YAML files used in the application.
The main classes are:
- `SearchTerms`: search terms in the YAML file.
- `Search`: search configuration in the YAML file.
- `Report`: report configuration in the YAML file.
- `DAG`: DAG defined in the YAML file.
- `Config`: overall configuration in the YAML file.
These models are used to validate the YAML files using the Pydantic
library.
"""

from typing import List, Optional, Union
from pydantic import AnyHttpUrl, BaseModel, EmailStr, Field


class DBSelect(BaseModel):
"""Represents the structure of the 'from_db_select' field in the YAML file."""

sql: str = Field(description="SQL query to fetch the search terms")
conn_id: str = Field(description="Airflow connection ID to use for the SQL query")


class SearchTerms(BaseModel):
"""Represents the search terms in the YAML file."""

from_airflow_variable: Optional[str] = Field(
default=None,
description="Variável do Airflow a ser usada como termos de pesquisa",
)
from_db_select: Optional[DBSelect] = Field(
default=None,
description="Consulta SQL para buscar os termos de pesquisa em um "
"banco de dados",
)


class SearchField(BaseModel):
"""Represents the field for search in the YAML file."""

description: str
value: str


class Search(BaseModel):
"""Represents the search configuration in the YAML file."""

header: Optional[str] = Field(
default=None, description="Cabeçalho da consulta de pesquisa"
)
sources: Optional[List[str]] = Field(
default=["DOU"],
description="Lista de fontes de dados para pesquisar (Querido Diário [QD], "
"Diário Oficial da União [DOU], INLABS). Default: DOU.",
)
territory_id: Optional[int] = Field(
default=None,
description="ID do território no Querido Diário para filtragem "
"baseada em localização",
)
terms: Union[List[str], SearchTerms] = Field(
description="Lista de termos de pesquisa ou uma forma de buscá-los"
)
department: Optional[List[str]] = Field(
default=None, description="Lista de departamentos para filtrar a pesquisa"
)


class Report(BaseModel):
"""Represents the report configuration in the YAML file."""

slack: Optional[dict] = Field(
default=None, description="Configuração do webhook do Slack para relatórios"
)
discord: Optional[dict] = Field(
default=None, description="Configuração do webhook do Discord para relatórios"
)
emails: Optional[List[EmailStr]] = Field(
default=None, description="Lista de endereços de e-mail para enviar o relatório"
)
attach_csv: Optional[bool] = Field(
default=None,
description="Se deve anexar um arquivo CSV com os resultados da pesquisa",
)
subject: Optional[str] = Field(
default=None, description="Assunto do relatório por e-mail"
)
skip_null: Optional[bool] = Field(
default=None,
description="Se deve pular a notificação de resultados nulos/vazios",
)
hide_filters: Optional[bool] = Field(
default=None, description="Se deve ocultar os filtros aplicados no relatório"
)
header_text: Optional[str] = Field(
default=None, description="Texto a ser incluído no cabeçalho do relatório"
)
footer_text: Optional[str] = Field(
default=None, description="Texto a ser incluído no rodapé do relatório"
)
no_results_found_text: Optional[str] = Field(
default=None, description="Texto a ser exibido quando não há resultados"
)


class DAG(BaseModel):
"""Represents the DAG configuration in the YAML file."""

id: str = Field(description="Nome único da DAG")
description: str = Field(description="Descrição da DAG")
tags: Optional[List[str]] = Field(
default=[], description="Lista de tags para filtragem da DAG no Airflow"
)
owner: Optional[List[str]] = Field(
default=[], description="Lista de owners para filtragem da DAG no Airflow"
)
schedule: Optional[str] = Field(default=None, description="Expressão cron")
dataset: Optional[str] = Field(default=None, description="Nome do Dataset")
search: Union[List[Search], Search] = Field(
description="Seção para definição da busca no Diário"
)
doc_md: Optional[str] = Field(default="", description="description")
report: Report = Field(
description="Aceita: `slack`, `discord`, `emails`, `attach_csv`, "
"`subject`, `skip_null`"
)


class Config(BaseModel):
"""Represents the overall configuration in the YAML file."""

dag: DAG = Field(description="Instanciação da DAG")
42 changes: 12 additions & 30 deletions tests/test_validate_yaml_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,18 @@
"""

import glob
import json
from urllib.parse import urlparse
import yaml
import os
import sys

import jsonschema
from pydantic import ValidationError
import pytest
import requests
import yaml

# add module path so we can import from other modules
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
from schemas import Config

YAMLS_DIR = "../dags/ro_dou/dag_confs"
SCHEMA_FILEPATH = "../schemas/ro-dou.json"
# or
# SCHEMA_FILEPATH = "https://raw.githubusercontent.com/gestaogovbr/Ro-dou/main/schemas/ro-dou.json"


def get_schema(filepath):
def _is_valid_url(url):
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except ValueError:
return False

if _is_valid_url(filepath):
response = requests.get(filepath)
response.raise_for_status()
return json.loads(response.text)
else:
with open(filepath) as f:
return json.load(f)


SCHEMA = get_schema(SCHEMA_FILEPATH)


@pytest.mark.parametrize(
Expand All @@ -45,8 +24,11 @@ def _is_valid_url(url):
+ glob.glob(f"{YAMLS_DIR}/**/*.yaml", recursive=True)
],
)
def test_json_schema_validation(data_file):
def test_pydantic_validation(data_file):
with open(data_file) as data_fp:
data = yaml.safe_load(data_fp)

jsonschema.validate(instance=data, schema=SCHEMA)
try:
Config(**data)
except ValidationError as e:
pytest.fail(f"YAML file {data_file} is not valid:\n{e}")

0 comments on commit f651f0b

Please sign in to comment.