Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support using envvar in config YAML #236

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ jobs:
architecture: "x64"

- run: pip3 install hatch
- run: hatch run tests.py3.12-2.10:static-check
- run: CONFIG_ROOT_DIR=`pwd`"/dags" hatch run tests.py3.12-2.10:static-check
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ FROM python:3.8-slim
ARG AIRFLOW_VERSION=2.0.0
ARG AIRFLOW_HOME=/usr/local/airflow
ENV SLUGIFY_USES_TEXT_UNIDECODE=yes
ENV CONFIG_ROOT_DIR=/usr/local/airflow/dags/

RUN set -ex \
&& buildDeps=' \
Expand Down
11 changes: 7 additions & 4 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ def __join(loader: yaml.FullLoader, node: yaml.Node) -> str:

yaml.add_constructor("!join", __join, yaml.FullLoader)

config: Dict[str, Any] = yaml.load(
stream=open(config_filepath, "r", encoding="utf-8"),
Loader=yaml.FullLoader,
)
with open(config_filepath, "r", encoding="utf-8") as fp:
yaml.add_constructor("!join", __join, yaml.FullLoader)
config_with_env = os.path.expandvars(fp.read())
config: Dict[str, Any] = yaml.load(
stream=config_with_env,
Loader=yaml.FullLoader,
)
except Exception as err:
raise DagFactoryConfigException("Invalid DAG Factory config file") from err
return config
Expand Down
13 changes: 11 additions & 2 deletions examples/datasets/example_dag_datasets.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
from airflow import DAG
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG

import dagfactory


config_file = "/usr/local/airflow/dags/datasets/example_dag_datasets.yml"
DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "datasets/example_dag_datasets.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
Expand Down
4 changes: 2 additions & 2 deletions examples/datasets/example_dag_datasets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ example_custom_config_dataset_producer_dag:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 1"
outlets:
file: /usr/local/airflow/dags/datasets/example_config_datasets.yml
file: $CONFIG_ROOT_DIR/datasets/example_config_datasets.yml
datasets: ['dataset_custom_1', 'dataset_custom_2']

example_custom_config_dataset_consumer_dag:
description: "Example DAG consumer custom config datasets"
schedule:
file: /usr/local/airflow/dags/datasets/example_config_datasets.yml
file: $CONFIG_ROOT_DIR/datasets/example_config_datasets.yml
datasets: ['dataset_custom_1', 'dataset_custom_2']
tasks:
task_1:
Expand Down
13 changes: 11 additions & 2 deletions examples/example_customize_operator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
from airflow import DAG
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG

import dagfactory


config_file = "/usr/local/airflow/dags/example_customize_operator.yml"
DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_customize_operator.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
Expand Down
14 changes: 12 additions & 2 deletions examples/example_dag_factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
from airflow import DAG
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG

import dagfactory

config_file = "/usr/local/airflow/dags/example_dag_factory.yml"

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
Expand Down
8 changes: 4 additions & 4 deletions examples/example_dag_factory.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ default:
orientation: "LR"
schedule_interval: "0 1 * * *"
on_success_callback_name: print_hello
on_success_callback_file: /usr/local/airflow/dags/print_hello.py
on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py
on_failure_callback_name: print_hello
on_failure_callback_file: /usr/local/airflow/dags/print_hello.py
on_failure_callback_file: $CONFIG_ROOT_DIR/print_hello.py

example_dag:
default_args:
Expand All @@ -34,7 +34,7 @@ example_dag:
task_3:
operator: airflow.operators.python_operator.PythonOperator
python_callable_name: print_hello
python_callable_file: /usr/local/airflow/dags/print_hello.py
python_callable_file: $CONFIG_ROOT_DIR/print_hello.py
dependencies: [task_1]

example_dag2:
Expand Down Expand Up @@ -84,7 +84,7 @@ example_dag4:
task_3:
operator: airflow.operators.python_operator.PythonOperator
python_callable_name: print_hello
python_callable_file: /Users/buraky/Projects/dag-factory/examples/print_hello.py
python_callable_file: $CONFIG_ROOT_DIR/print_hello.py
task_group_name: task_group_1
dependencies: [task_2]
task_4:
Expand Down
12 changes: 10 additions & 2 deletions examples/example_dynamic_task_mapping.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
from airflow import DAG
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG

import dagfactory


config_file = "/usr/local/airflow/dags/example_dynamic_task_mapping.yml"
DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dynamic_task_mapping.yml")
example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
Expand Down
4 changes: 2 additions & 2 deletions examples/example_dynamic_task_mapping.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ test_expand:
request:
operator: airflow.operators.python.PythonOperator
python_callable_name: example_task_mapping
python_callable_file: /usr/local/airflow/dags/expand_tasks.py
python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
process:
operator: airflow.operators.python_operator.PythonOperator
python_callable_name: expand_task
python_callable_file: /usr/local/airflow/dags/expand_tasks.py
python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
partial:
op_kwargs:
test_id: "test"
Expand Down
Loading