diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 1ced9fe5..9ddfe985 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -32,4 +32,4 @@ jobs:
           architecture: "x64"
       - run: pip3 install hatch
-      - run: hatch run tests.py3.12-2.10:static-check
+      - run: CONFIG_ROOT_DIR=`pwd`"/dags" hatch run tests.py3.12-2.10:static-check
 
diff --git a/Dockerfile b/Dockerfile
index 5adf041a..f7fb8770 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,6 +3,7 @@ FROM python:3.8-slim
 ARG AIRFLOW_VERSION=2.0.0
 ARG AIRFLOW_HOME=/usr/local/airflow
 ENV SLUGIFY_USES_TEXT_UNIDECODE=yes
+ENV CONFIG_ROOT_DIR=/usr/local/airflow/dags/
 
 RUN set -ex \
     && buildDeps=' \
diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py
index 40b3d53b..b469e551 100644
--- a/dagfactory/dagfactory.py
+++ b/dagfactory/dagfactory.py
@@ -60,10 +60,12 @@ def __join(loader: yaml.FullLoader, node: yaml.Node) -> str:
 
             yaml.add_constructor("!join", __join, yaml.FullLoader)
 
-            config: Dict[str, Any] = yaml.load(
-                stream=open(config_filepath, "r", encoding="utf-8"),
-                Loader=yaml.FullLoader,
-            )
+            with open(config_filepath, "r", encoding="utf-8") as fp:
+                config_with_env = os.path.expandvars(fp.read())
+                config: Dict[str, Any] = yaml.load(
+                    stream=config_with_env,
+                    Loader=yaml.FullLoader,
+                )
         except Exception as err:
             raise DagFactoryConfigException("Invalid DAG Factory config file") from err
         return config
diff --git a/examples/datasets/example_dag_datasets.py b/examples/datasets/example_dag_datasets.py
index d6f82573..4eb94b68 100644
--- a/examples/datasets/example_dag_datasets.py
+++ b/examples/datasets/example_dag_datasets.py
@@ -1,8 +1,17 @@
-from airflow import DAG
+import os
+from pathlib import Path
+
+# The following import is here so Airflow parses this file
+# from airflow import DAG
+
 
 import dagfactory
 
-config_file = "/usr/local/airflow/dags/datasets/example_dag_datasets.yml"
+DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
+CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))
+
+config_file = str(CONFIG_ROOT_DIR / "datasets/example_dag_datasets.yml")
+
 example_dag_factory = dagfactory.DagFactory(config_file)
 
 # Creating task dependencies
diff --git a/examples/datasets/example_dag_datasets.yml b/examples/datasets/example_dag_datasets.yml
index b1450aa4..d04ddeac 100644
--- a/examples/datasets/example_dag_datasets.yml
+++ b/examples/datasets/example_dag_datasets.yml
@@ -40,13 +40,13 @@ example_custom_config_dataset_producer_dag:
       operator: airflow.operators.bash_operator.BashOperator
       bash_command: "echo 1"
       outlets:
-        file: /usr/local/airflow/dags/datasets/example_config_datasets.yml
+        file: $CONFIG_ROOT_DIR/datasets/example_config_datasets.yml
         datasets: ['dataset_custom_1', 'dataset_custom_2']
 
 example_custom_config_dataset_consumer_dag:
   description: "Example DAG consumer custom config datasets"
   schedule:
-    file: /usr/local/airflow/dags/datasets/example_config_datasets.yml
+    file: $CONFIG_ROOT_DIR/datasets/example_config_datasets.yml
     datasets: ['dataset_custom_1', 'dataset_custom_2']
   tasks:
     task_1:
diff --git a/examples/example_customize_operator.py b/examples/example_customize_operator.py
index 0fcd8eec..6e456638 100644
--- a/examples/example_customize_operator.py
+++ b/examples/example_customize_operator.py
@@ -1,8 +1,17 @@
-from airflow import DAG
+import os
+from pathlib import Path
+
+# The following import is here so Airflow parses this file
+# from airflow import DAG
+
 
 import dagfactory
 
-config_file = "/usr/local/airflow/dags/example_customize_operator.yml"
+DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
+CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))
+
+config_file = str(CONFIG_ROOT_DIR / "example_customize_operator.yml")
+
 example_dag_factory = dagfactory.DagFactory(config_file)
 
 # Creating task dependencies
diff --git a/examples/example_dag_factory.py b/examples/example_dag_factory.py
index e2ed59bc..84bca73c 100644
--- a/examples/example_dag_factory.py
+++ b/examples/example_dag_factory.py
@@ -1,7 +1,17 @@
-from airflow import DAG
+import os
+from pathlib import Path
+
+# The following import is here so Airflow parses this file
+# from airflow import DAG
+
 import dagfactory
 
-config_file = "/usr/local/airflow/dags/example_dag_factory.yml"
+
+DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
+CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))
+
+config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml")
+
 example_dag_factory = dagfactory.DagFactory(config_file)
 
 # Creating task dependencies
diff --git a/examples/example_dag_factory.yml b/examples/example_dag_factory.yml
index 8ab9c66f..f1838628 100644
--- a/examples/example_dag_factory.yml
+++ b/examples/example_dag_factory.yml
@@ -12,9 +12,9 @@ default:
   orientation: "LR"
   schedule_interval: "0 1 * * *"
   on_success_callback_name: print_hello
-  on_success_callback_file: /usr/local/airflow/dags/print_hello.py
+  on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py
   on_failure_callback_name: print_hello
-  on_failure_callback_file: /usr/local/airflow/dags/print_hello.py
+  on_failure_callback_file: $CONFIG_ROOT_DIR/print_hello.py
 
 example_dag:
   default_args:
@@ -34,7 +34,7 @@ example_dag:
     task_3:
       operator: airflow.operators.python_operator.PythonOperator
       python_callable_name: print_hello
-      python_callable_file: /usr/local/airflow/dags/print_hello.py
+      python_callable_file: $CONFIG_ROOT_DIR/print_hello.py
       dependencies: [task_1]
 
 example_dag2:
@@ -84,7 +84,7 @@ example_dag4:
     task_3:
      operator: airflow.operators.python_operator.PythonOperator
       python_callable_name: print_hello
-      python_callable_file: /Users/buraky/Projects/dag-factory/examples/print_hello.py
+      python_callable_file: $CONFIG_ROOT_DIR/print_hello.py
       task_group_name: task_group_1
       dependencies: [task_2]
     task_4:
diff --git a/examples/example_dynamic_task_mapping.py b/examples/example_dynamic_task_mapping.py
index 955e9f96..4b17f9b1 100644
--- a/examples/example_dynamic_task_mapping.py
+++ b/examples/example_dynamic_task_mapping.py
@@ -1,8 +1,16 @@
-from airflow import DAG
+import os
+from pathlib import Path
+
+# The following import is here so Airflow parses this file
+# from airflow import DAG
+
 
 import dagfactory
 
-config_file = "/usr/local/airflow/dags/example_dynamic_task_mapping.yml"
+DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
+CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))
+
+config_file = str(CONFIG_ROOT_DIR / "example_dynamic_task_mapping.yml")
 example_dag_factory = dagfactory.DagFactory(config_file)
 
 # Creating task dependencies
diff --git a/examples/example_dynamic_task_mapping.yml b/examples/example_dynamic_task_mapping.yml
index a638b186..078e4f2d 100644
--- a/examples/example_dynamic_task_mapping.yml
+++ b/examples/example_dynamic_task_mapping.yml
@@ -9,11 +9,11 @@ test_expand:
     request:
       operator: airflow.operators.python.PythonOperator
       python_callable_name: example_task_mapping
-      python_callable_file: /usr/local/airflow/dags/expand_tasks.py
+      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
     process:
       operator: airflow.operators.python_operator.PythonOperator
       python_callable_name: expand_task
-      python_callable_file: /usr/local/airflow/dags/expand_tasks.py
+      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
       partial:
         op_kwargs:
           test_id: "test"