Skip to content

Commit

Permalink
Merge pull request #364 from DNO-inc/airflow_test
Browse files Browse the repository at this point in the history
Airflow integration
  • Loading branch information
m-o-d-e-r authored Jun 10, 2024
2 parents 3d91ca5 + 79d04ae commit 07c16ab
Show file tree
Hide file tree
Showing 11 changed files with 429 additions and 16 deletions.
44 changes: 44 additions & 0 deletions .github/workflows/airflow-publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: Create and publish a Docker image

on:
push:
branches: [ "main" ]

env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}

jobs:
build-and-push-image:
runs-on: ubuntu-latest

permissions:
contents: read
packages: write

steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

- name: Build and push Docker image
id: push
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
with:
context: .
file: ./burrito/airflow/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ shadow
.poetry.lock

*.ipynb

*/airflow/logs/*
32 changes: 32 additions & 0 deletions burrito/airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
FROM python:3.10-slim-buster as build-base

ENV POETRY_NO_INTERACTION=1 \
POETRY_VIRTUALENVS_IN_PROJECT=true \
PATH="/opt/pysetup/.venv/bin:$PATH"

RUN apt-get update
RUN apt-get install --no-install-recommends -y build-essential

WORKDIR /opt/pysetup

RUN pip3 install poetry

COPY pyproject.toml ./

RUN poetry install --only main

RUN apt -y install netcat iputils-ping

FROM apache/airflow:2.9.1-python3.10

ENV PATH="/opt/pysetup/.venv/bin:$PATH"
ENV PYTHONPATH="${PYTHONPATH}:/opt/burrito_project"

COPY --from=build-base /opt/pysetup/ /opt/pysetup/
COPY ./preprocessor_config.json /opt/burrito_project/burrito/preprocessor_config.json
COPY ./CONTRIBUTORS.md /opt/burrito_project/burrito/CONTRIBUTORS.md
COPY ./CHANGELOG.md /opt/burrito_project/burrito/CHANGELOG.md
COPY ./burrito /opt/burrito_project/burrito
COPY ./event_init.sql /opt/burrito_project/burrito/event_init.sql

CMD []
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from burrito.utils.email_util import publish_email
from burrito.utils.email_templates import TEMPLATE__EMAIL_NOTIFICATION_FOR_ADMIN
from burrito.utils.query_util import STATUS_NEW
Expand Down Expand Up @@ -44,3 +47,17 @@ def check_for_new_tickets():
)
)
get_logger().info(f"Found {len(tickets_list)} tickets with status NEW")


with DAG(
dag_id="new_tickets_dag",
description="Check if there is tickets with NEW status and send email if any",
start_date=datetime.datetime.now() - datetime.timedelta(days=1),
schedule_interval=datetime.timedelta(hours=3),
catchup=False,
is_paused_upon_creation=False
) as dag:
PythonOperator(
task_id="check_for_new_tickets",
python_callable=check_for_new_tickets
)
52 changes: 52 additions & 0 deletions burrito/airflow/dags/ping_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import socket
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models.baseoperator import chain

from burrito.utils.logger import get_logger
from burrito.utils.config_reader import get_config


HOSTS_TO_PING = (
(get_config().BURRITO_DB_HOST, get_config().BURRITO_DB_PORT),
(get_config().BURRITO_REDIS_HOST, get_config().BURRITO_REDIS_PORT),
(get_config().BURRITO_MONGO_HOST, get_config().BURRITO_MONGO_PORT),
("iis.sumdu.edu.ua", 80),
(get_config().BURRITO_SMTP_SERVER, 25),
(get_config().BURRITO_SMTP_SERVER, 465),
)


def burrito_ping(host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

try:
sock.connect((host, int(port)))
except socket.error:
get_logger().critical(f"({host}, {port}) is unreachable")
except Exception as exc:
get_logger().error(exc)


with DAG(
dag_id="ping_dag",
description="Pinging important hosts",
start_date=datetime.datetime.now() - datetime.timedelta(days=1),
schedule_interval=datetime.timedelta(hours=1),
catchup=False,
is_paused_upon_creation=False
) as dag:
chain(
*[
PythonOperator(
task_id=f"pinging__{host}_{port}",
python_callable=burrito_ping,
op_kwargs={
"host": host,
"port": port
}
) for host, port in HOSTS_TO_PING
]
)
66 changes: 66 additions & 0 deletions burrito/airflow/dags/preinstall_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models.baseoperator import chain
from peewee import MySQLDatabase

from burrito.utils.db_utils import create_tables
from burrito.utils.db_cursor_object import get_database_cursor
from burrito.utils.mongo_util import mongo_init_ttl_indexes
from burrito.utils.logger import get_logger
from burrito.airflow.utils import preprocessor_task

from burrito.models.m_email_code import EmailVerificationCode
from burrito.models.m_password_rest_model import AccessRenewMetaData


def init_db_events():
with open(
"/opt/burrito_project/burrito/event_init.sql",
"r",
encoding="utf-8"
) as file:
db: MySQLDatabase = get_database_cursor()

for query in file.read().split(";"):
query = query.replace('\t', "").replace("\n", "")
query = ' '.join(query.split())

if not query:
continue

try:
db.execute_sql(query)
except Exception as e:
get_logger().error(e)


with DAG(
dag_id="preinstall_dag",
description="Preparing DB",
start_date=datetime.datetime.now() - datetime.timedelta(days=1),
schedule_interval=None,
is_paused_upon_creation=False
) as dag:
chain(
PythonOperator(
task_id="create_db_tables",
python_callable=create_tables
),
PythonOperator(
task_id="first_preprocessor_run",
python_callable=preprocessor_task
),
PythonOperator(
task_id="create_mongo_ttl_indexes",
python_callable=mongo_init_ttl_indexes,
op_kwargs={
"models": [EmailVerificationCode, AccessRenewMetaData]
}
),
PythonOperator(
task_id="create_db_events",
python_callable=init_db_events
)
)
48 changes: 48 additions & 0 deletions burrito/airflow/dags/preprocessor_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import datetime
import orjson as json

from airflow import DAG
from airflow.operators.python import PythonOperator

from burrito.models.group_model import Groups
from burrito.models.statuses_model import Statuses
from burrito.models.faculty_model import Faculties
from burrito.models.queues_model import Queues
from burrito.models.permissions_model import Permissions
from burrito.models.roles_model import Roles
from burrito.models.role_permissions_model import RolePermissions
from burrito.airflow.utils import preprocessor_task


MODEL_KEYS = {
"groups": Groups,
"faculties": Faculties,
"statuses": Statuses,
"queues": Queues,
"permissions": Permissions,
"roles": Roles,
"role_permissions": RolePermissions
}

DEFAULT_CONFIG = ""

with open(
"/opt/burrito_project/burrito/preprocessor_config.json",
"r",
encoding="utf-8"
) as file:
DEFAULT_CONFIG = json.loads(file.read())


with DAG(
dag_id="preprocessor_dag",
description="Synchronize DB with data get from API",
start_date=datetime.datetime.now() - datetime.timedelta(days=1),
schedule_interval=datetime.timedelta(minutes=30),
catchup=False,
is_paused_upon_creation=False
) as dag:
PythonOperator(
task_id="preprocessor_task",
python_callable=preprocessor_task
)
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@

DEFAULT_CONFIG = ""

with open("preprocessor_config.json", "r", encoding="utf-8") as file:
with open(
"/opt/burrito_project/burrito/preprocessor_config.json",
"r",
encoding="utf-8"
) as file:
DEFAULT_CONFIG = json.loads(file.read())


def preprocessor_task():
PluginLoader.load()

get_logger().info("Preprocessor is started")

conn = None
Expand Down
14 changes: 0 additions & 14 deletions burrito/utils/tasks/ping.py

This file was deleted.

Loading

0 comments on commit 07c16ab

Please sign in to comment.