
Airflow DAG Design for COVID-19 (Proposal)

The purpose of this document is to describe the to-be process for executing Airflow DAGs that drive COVID-19 related ETL processes such as notebook execution, S3 deployments and Snowflake table population.

Concepts

We follow convention-over-configuration principles wherever possible to keep things dynamic. The goal is to reduce Airflow Variables to a minimum, using them mostly for credentials and connections.

Main Data pipeline - Dynamic DAG Generator

The proposed architecture will have a single application stored in the /dag folder (in the DagBag path) that generates one DAG for each data source. These DAGs contain:

  • Extraction from source systems, using notebooks from /notebooks folder. These notebooks MUST put their outputs to /output folder in the repository. Contents of /output are not tracked by git.
    • The generated file name MUST be prefixed with the basename of the notebook. Example: notebooks/JHU_COVID-19.ipynb will generate its file(s) as output/JHU_COVID-19*.csv. This helps downstream tasks find the generated files in the output folder (see the sketch after this list).
  • Ingestion to S3 bucket. Bucket name SHOULD be an Airflow variable.
  • Ingestion to Snowflake database. The database table names MUST be the same as the CSV file basenames.
  • Later, execute data quality checks
  • The DAG SHOULD notify the users in case of failures / DQ issues.
  • The schedule of the DAG SHOULD be None.
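
The filename conventions above translate into a small amount of glue code. The following is a minimal sketch, assuming the notebooks/ and output/ layout described above; the helper function names are illustrative only, not part of the proposal:

```python
import glob
import os


def output_files_for_notebook(notebook_path):
    """Find the CSV files a notebook produced, based on the basename-prefix convention."""
    base = os.path.splitext(os.path.basename(notebook_path))[0]  # e.g. "JHU_COVID-19"
    return sorted(glob.glob(os.path.join("output", f"{base}*.csv")))


def snowflake_table_for_csv(csv_path):
    """Derive the Snowflake table name from the CSV file basename, per the convention above."""
    return os.path.splitext(os.path.basename(csv_path))[0]


# Example: notebooks/JHU_COVID-19.ipynb produces output/JHU_COVID-19*.csv,
# and each CSV loads into a Snowflake table named after its basename.
```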

Technical Specs

The single Python file (proposed name: covid_etl_dag.py) will generate the above-mentioned DAGs and tasks. Relying on naming conventions and the existing files, the program flow is the following (a sketch follows the list):

  1. Look for .ipynb files in the /notebooks folder. Generate a DAG for each notebook with the ID etl_{{basename of the filename}}. Example: etl_JHU_COVID-19.
  2. Generate the necessary tasks inside the dynamically created DAG:
    1. cleanup_output_folder
    2. execute_notebook
    3. upload_to_s3
    4. upload_to_snowflake
    5. send_email
  3. Set up the task dependencies.
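
A minimal sketch of the generator is below. It is not the final implementation: the task bodies are placeholders, papermill is only one option for headless notebook execution, and COVID_BUCKET stands in for the bucket-name Airflow Variable mentioned above. The key mechanism is registering each generated DAG in the module's global namespace so Airflow can discover it.

```python
import glob
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def create_dag(notebook_path):
    base = os.path.splitext(os.path.basename(notebook_path))[0]  # e.g. "JHU_COVID-19"
    dag = DAG(
        dag_id=f"etl_{base}",
        schedule_interval=None,           # triggered externally, per the spec above
        start_date=datetime(2020, 3, 1),
        catchup=False,
    )
    with dag:
        cleanup = BashOperator(
            task_id="cleanup_output_folder",
            bash_command=f"rm -f output/{base}*.csv",
        )
        execute = BashOperator(
            task_id="execute_notebook",
            # papermill is one option for headless notebook execution (assumption)
            bash_command=f"papermill {notebook_path} output/{base}_executed.ipynb",
        )
        upload_s3 = BashOperator(
            task_id="upload_to_s3",
            # COVID_BUCKET is a placeholder; the real DAG would read the bucket
            # name from an Airflow Variable
            bash_command=f"aws s3 cp output/ s3://$COVID_BUCKET/ --recursive "
                         f"--exclude '*' --include '{base}*.csv'",
        )
        upload_snowflake = BashOperator(
            task_id="upload_to_snowflake",
            bash_command="echo 'COPY INTO <table derived from CSV basename>'",  # placeholder
        )
        send_email = BashOperator(
            task_id="send_email",
            bash_command="echo 'notify users'",  # placeholder for failure/DQ notification
        )
        cleanup >> execute >> upload_s3 >> upload_snowflake >> send_email
    return dag


# Register one DAG per notebook so the scheduler can pick them up.
for path in glob.glob("notebooks/*.ipynb"):
    dag_object = create_dag(path)
    globals()[dag_object.dag_id] = dag_object
```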

Triggering jobs

Using @szilardhuber's code, we can create a trigger DAG for GitHub sources that looks for changes in the GitHub repository and triggers the specific generated DAG (like etl_JHU_COVID-19). The proposed schedule is every ten minutes.
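
The sketch below only illustrates the idea (it is not @szilardhuber's actual code): a DAG scheduled every ten minutes that short-circuits when nothing changed and otherwise fires the generated DAG. The change-detection callable is a placeholder.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.python_operator import ShortCircuitOperator


def source_changed(**kwargs):
    """Placeholder: return True when the watched GitHub source has new commits.

    A real implementation would compare the last seen commit SHA (stored in an
    Airflow Variable or XCom, for example) with the current HEAD of the repository.
    """
    return True


with DAG(
    dag_id="trigger_etl_JHU_COVID-19",
    schedule_interval="*/10 * * * *",   # every ten minutes, per the proposal
    start_date=datetime(2020, 3, 1),
    catchup=False,
) as dag:
    check = ShortCircuitOperator(
        task_id="check_github_for_changes",
        python_callable=source_changed,
        provide_context=True,
    )
    trigger = TriggerDagRunOperator(
        task_id="trigger_etl_dag",
        trigger_dag_id="etl_JHU_COVID-19",
    )
    check >> trigger
```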

GitHub update

I propose having a DAG that pulls the GitHub master branch every ten minutes when the ENVIRONMENT variable is set to production. We can make it trigger-based as well (webhooks or similar).
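
A minimal sketch of that pull DAG, assuming ENVIRONMENT is exposed as an OS environment variable (it could equally be an Airflow Variable) and that REPO_PATH stands in for the repository checkout location on the workers:

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Only register the DAG when running in production (assumption: ENVIRONMENT
# is an OS environment variable; an Airflow Variable would also work).
if os.environ.get("ENVIRONMENT") == "production":
    with DAG(
        dag_id="github_pull",
        schedule_interval="*/10 * * * *",  # every ten minutes
        start_date=datetime(2020, 3, 1),
        catchup=False,
    ) as dag:
        BashOperator(
            task_id="git_pull_master",
            # REPO_PATH is a placeholder for the checkout location on the workers
            bash_command="cd $REPO_PATH && git pull origin master",
        )
```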