Skip to content

Commit

Permalink
Refactor code to use Pydantic schema (partial)
Browse files Browse the repository at this point in the history
  • Loading branch information
augusto-herrmann committed Aug 22, 2024
1 parent 0252124 commit 7d91d1c
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 103 deletions.
22 changes: 11 additions & 11 deletions src/dou_dag_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _merge_dict(dict1, dict2):

def result_as_html(specs: DAGConfig) -> bool:
"""Só utiliza resultado HTML apenas para email"""
return specs.discord_webhook and specs.slack_webhook
return specs.report.discord["webhook"] and specs.report.slack["webhook"]


class DouDigestDagGenerator:
Expand Down Expand Up @@ -122,7 +122,7 @@ def prepare_doc_md(specs: DAGConfig, config_file: str) -> str:
Returns:
str: The DAG documentation in markdown format.
"""
config = asdict(specs)
config = specs.model_dump()
# options that won't show in the "DAG Docs"
del config["description"]
del config["doc_md"]
Expand Down Expand Up @@ -170,7 +170,7 @@ def _get_safe_schedule(self, specs: DAGConfig, default_schedule: str) -> str:
"""

schedule = default_schedule
id_based_minute = self._hash_dag_id(specs.dag_id, 60)
id_based_minute = self._hash_dag_id(specs.id, 60)
schedule_without_min = " ".join(schedule.split(" ")[1:])
schedule = f"{id_based_minute} {schedule_without_min}"

Expand Down Expand Up @@ -232,7 +232,7 @@ def generate_dags(self):

for filepath in files_list:
dag_specs = self.parser(filepath).parse()
dag_id = dag_specs.dag_id
dag_id = dag_specs.id
globals()[dag_id] = self.create_dag(dag_specs, filepath)

def perform_searches(
Expand Down Expand Up @@ -356,7 +356,7 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
"""
# Prepare the markdown documentation
doc_md = (
self.prepare_doc_md(specs, config_file) if specs.doc_md else specs.doc_md
self.prepare_doc_md(specs, config_file) if specs.doc_md else None
)
# DAG parameters
default_args = {
Expand All @@ -372,14 +372,14 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
schedule = self._update_schedule(specs)

dag = DAG(
specs.dag_id,
specs.id,
default_args=default_args,
schedule=schedule,
description=specs.description,
doc_md=doc_md,
catchup=False,
params={"trigger_date": "2022-01-02T12:00"},
tags=specs.dag_tags,
tags=specs.tags,
)

with dag:
Expand Down Expand Up @@ -427,9 +427,8 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
)

if subsearch["sql"]:
(
select_terms_from_db_task >> exec_search_task
) # pylint: disable=pointless-statement
# pylint: disable=pointless-statement
select_terms_from_db_task >> exec_search_task

has_matches_task = BranchPythonOperator(
task_id="has_matches",
Expand All @@ -443,7 +442,7 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
]
)
+ ") }}",
"skip_null": specs.skip_null,
"skip_null": specs.report.skip_null,
},
)

Expand All @@ -465,6 +464,7 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
},
)

# pylint: disable=pointless-statement
tg_exec_searchs >> has_matches_task

has_matches_task >> [send_notification_task, skip_notification_task]
Expand Down
19 changes: 11 additions & 8 deletions src/notification/discord_sender.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import requests
import re

import requests

from notification.isender import ISender
from schemas import ReportConfig


class DiscordSender(ISender):
highlight_tags = ("__", "__")

def __init__(self, specs) -> None:
self.webhook_url = specs.discord_webhook
self.hide_filters = specs.hide_filters
self.header_text = specs.header_text
self.footer_text = specs.footer_text
self.no_results_found_text = specs.no_results_found_text
def __init__(self, report_config: ReportConfig) -> None:
self.webhook_url = report_config.discord["webhook"]
self.hide_filters = report_config.hide_filters
self.header_text = report_config.header_text
self.footer_text = report_config.footer_text
self.no_results_found_text = report_config.no_results_found_text

def send(self, search_report: list, report_date: str = None):
"""Parse the content, and send message to Discord"""
Expand Down Expand Up @@ -68,4 +71,4 @@ def _remove_html_tags(self, text):
# Define a regular expression pattern to match HTML tags
clean = re.compile('<.*?>')
# Substitute HTML tags with an empty string
return re.sub(clean, '', text)
return re.sub(clean, '', text)
48 changes: 27 additions & 21 deletions src/notification/email_sender.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Module for sending emails.
"""

import os
import sys
import textwrap
from tempfile import NamedTemporaryFile
import textwrap

import markdown
import pandas as pd
Expand All @@ -14,12 +17,17 @@
sys.path.insert(0, parent_dir)

from notification.isender import ISender
from schemas import ReportConfig


class EmailSender(ISender):
"""Prepare and send e-mails with the reports."""

highlight_tags = ("<span class='highlight' style='background:#FFA;'>", "</span>")
def __init__(self, specs) -> None:
self.specs = specs

def __init__(self, report_config: ReportConfig) -> None:
self.report_config = report_config
self.search_report = ""
self.watermark = """
<p><small>Esta pesquisa foi realizada automaticamente pelo
<a href="https://gestaogovbr.github.io/Ro-dou/">Ro-DOU</a>
Expand All @@ -29,36 +37,36 @@ def __init__(self, specs) -> None:
def send(self, search_report: list, report_date: str):
"""Builds the email content, the CSV if applies, and send it"""
self.search_report = search_report
full_subject = f"{self.specs.subject} - DOs de {report_date}"
full_subject = f"{self.report_config.subject} - DOs de {report_date}"
skip_notification = True
for search in self.search_report:

items = ["contains" for k, v in search["result"].items() if v]
if items:
skip_notification = False
else:
content = self.specs.no_results_found_text
content = self.report_config.no_results_found_text

if skip_notification:
if self.specs.skip_null:
if self.report_config.skip_null:
return "skip_notification"
else:
content = self.generate_email_content()

content += self.watermark

if self.specs.attach_csv and skip_notification is False:
if self.report_config.attach_csv and skip_notification is False:
with self.get_csv_tempfile() as csv_file:
send_email(
to=self.specs.emails,
to=self.report_config.emails,
subject=full_subject,
files=[csv_file.name],
html_content=content,
mime_charset="utf-8",
)
else:
send_email(
to=self.specs.emails,
to=self.report_config.emails,
subject=full_subject,
html_content=content,
mime_charset="utf-8",
Expand All @@ -73,18 +81,18 @@ def generate_email_content(self) -> str:
parent_directory = os.path.dirname(current_directory)
file_path = os.path.join(parent_directory, "report_style.css")

with open(file_path, "r") as f:
with open(file_path, "r", encoding="utf-8") as f:
blocks = [f"<style>\n{f.read()}</style>"]

if self.specs.header_text:
blocks.append(self.specs.header_text)
if self.report_config.header_text:
blocks.append(self.report_config.header_text)

for search in self.search_report:

if search["header"]:
blocks.append(f"<h1>{search['header']}</h1>")

if not self.specs.hide_filters:
if not self.report_config.hide_filters:
if search["department"]:
blocks.append(
"""<p class="secao-marker">Filtrando resultados somente para:</p>"""
Expand All @@ -97,24 +105,22 @@ def generate_email_content(self) -> str:
for group, results in search["result"].items():

if not results:
blocks.append(
f"<p>{self.specs.no_results_found_text}.</p>"
)
blocks.append(f"<p>{self.report_config.no_results_found_text}.</p>")
else:
if not self.specs.hide_filters:
if not self.report_config.hide_filters:
if group != "single_group":
blocks.append("\n")
blocks.append(f"**Grupo: {group}**")
blocks.append("\n\n")

for term, items in results.items():
blocks.append("\n")
if not self.specs.hide_filters:
if not self.report_config.hide_filters:
blocks.append(f"* # Resultados para: {term}")

for item in items:

if not self.specs.hide_filters:
if not self.report_config.hide_filters:
sec_desc = item["section"]
item_html = f"""
<p class="secao-marker">{sec_desc}</p>
Expand All @@ -131,8 +137,8 @@ def generate_email_content(self) -> str:
blocks.append(textwrap.dedent(item_html))

blocks.append("---")
if self.specs.footer_text:
blocks.append(self.specs.footer_text)
if self.report_config.footer_text:
blocks.append(self.report_config.footer_text)

return markdown.markdown("\n".join(blocks))

Expand Down
18 changes: 12 additions & 6 deletions src/notification/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,21 @@ class Notifier:

def __init__(self, specs: DAGConfig) -> None:
self.senders = []
if specs.emails:
self.senders.append(EmailSender(specs))
if specs.discord_webhook:
self.senders.append(DiscordSender(specs))
if specs.slack_webhook:
self.senders.append(SlackSender(specs))
if specs.report.emails:
self.senders.append(EmailSender(specs.report))
if specs.report.discord["webhook"]:
self.senders.append(DiscordSender(specs.report))
if specs.report.slack["webhook"]:
self.senders.append(SlackSender(specs.report))


def send_notification(self, search_report: str, report_date: str):
"""Sends the notification to the specified email, Discord or Slack
Args:
search_report (str): The report to be sent
report_date (str): The date of the report
"""
# Convert to data structure after it's retrieved from xcom
search_report = ast.literal_eval(search_report)

Expand Down
30 changes: 18 additions & 12 deletions src/notification/slack_sender.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
"""Send reports to Slack.
"""

from datetime import datetime
import re

import requests
import re
from notification.isender import ISender

from schemas import ReportConfig


class SlackSender(ISender):
"""Prepare a report and send it to Slack.
"""
highlight_tags = ("*", "*")

def __init__(self, specs) -> None:
self.webhook_url = specs.slack_webhook
def __init__(self, report_config: ReportConfig) -> None:
self.webhook_url = report_config.slack["webhook"]
self.blocks = []
self.hide_filters = specs.hide_filters
self.header_text = specs.header_text
self.footer_text = specs.footer_text
self.no_results_found_text = specs.no_results_found_text
self.hide_filters = report_config.hide_filters
self.header_text = report_config.header_text
self.footer_text = report_config.footer_text
self.no_results_found_text = report_config.no_results_found_text

def send(self, search_report: list, report_date: str = None):
"""Parse the content, and send message to Slack"""
Expand All @@ -37,9 +44,7 @@ def send(self, search_report: list, report_date: str = None):
for item in items:
self._add_block(item)
else:
self._add_text(
self.no_results_found_text
)
self._add_text(self.no_results_found_text)

if self.footer_text:
footer_text = _remove_html_tags(self.footer_text)
Expand Down Expand Up @@ -111,8 +116,9 @@ def _format_date(date_str: str) -> str:
_from, _to = WEEKDAYS_EN_TO_PT[date.weekday()]
return date.strftime("%a %d/%m").replace(_from, _to)


def _remove_html_tags(text):
# Define a regular expression pattern to match HTML tags
clean = re.compile('<.*?>')
clean = re.compile("<.*?>")
# Substitute HTML tags with an empty string
return re.sub(clean, '', text)
return re.sub(clean, "", text)
Loading

0 comments on commit 7d91d1c

Please sign in to comment.