Add support for loading manifest from cloud store using Airflow Object Store (#1109)

## Summary

This PR introduces the capability to load `manifest.json` files from
various cloud storage services using Airflow's Object Storage integration.
The supported services are AWS S3, Google Cloud Storage (GCS),
and Azure Blob Storage. The feature allows seamless use of remote
manifest paths, providing greater flexibility and scalability for
managing dbt projects.

### Key Changes

1. New parameters on `DbtDag` and `DbtTaskGroup`:
   - `manifest_path`: accepts both local paths and remote URLs (e.g., S3, GCS, Azure Blob Storage).
   - `manifest_conn_id`: (optional) an Airflow connection ID for accessing the remote path.
2. Automatic detection of the storage type: the storage service is identified from the
   scheme of the provided URL (e.g., `s3://`, `gs://`, `abfs://`) via Airflow's Object
   Storage integration (see the sketch after this list).
3. If a `manifest_conn_id` is provided, it is used to fetch the necessary credentials.
4. If no `manifest_conn_id` is provided, the default connection ID for the identified
   scheme is used.
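
The scheme-based resolution can be summarized with the following minimal sketch. It is not the exact implementation (the real logic lives in `cosmos/config.py` and `cosmos/constants.py`, shown in the diff below); the literal connection names mirror the provider hooks' documented `default_conn_name` values.

```python
from __future__ import annotations

# Illustrative only: the real map in cosmos/constants.py is built from the provider
# hooks' `default_conn_name` attributes (S3Hook, GCSHook, WasbHook).
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP = {
    "s3": "aws_default",
    "gs": "google_cloud_default",
    "abfs": "wasb_default",
}


def resolve_manifest_conn_id(manifest_path: str, manifest_conn_id: str | None = None) -> str | None:
    """Return the connection ID Cosmos would use for the given manifest path."""
    if manifest_conn_id:
        return manifest_conn_id
    scheme = str(manifest_path).split("://")[0]
    # Local paths have no "://" separator, so the lookup misses and we return
    # None, meaning no connection is needed.
    return FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme)


print(resolve_manifest_conn_id("s3://bucket/path/to/manifest.json"))  # aws_default
print(resolve_manifest_conn_id("/path/to/local/manifest.json"))       # None
```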

### Validation and Error Handling

1. Validates the existence of the `manifest.json` file when a path is
specified.
2. Raises an appropriate error if a remote `manifest_path` is given but the
required minimum Airflow version, 2.8 (which introduced the Object Storage
feature), is not available; see the sketch below.
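
A rough sketch of this guard follows. The actual check added to `cosmos/config.py` raises `CosmosValueError`, and `AIRFLOW_IO_AVAILABLE` is defined in `cosmos/settings.py` from the running Airflow version; this standalone version uses a plain `ValueError` for illustration.

```python
from airflow.version import version as airflow_version
from packaging.version import Version

# Object Storage (airflow.io) is only available from Airflow 2.8.0 onwards.
AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")


def ensure_remote_manifest_supported(manifest_path: str, manifest_conn_id: str | None) -> None:
    # A resolved connection ID implies a remote scheme was detected.
    if manifest_conn_id is not None and not AIRFLOW_IO_AVAILABLE:
        raise ValueError(  # Cosmos raises CosmosValueError here
            f"The manifest path {manifest_path} uses a remote file scheme, but the required "
            f"Object Storage feature is unavailable in Airflow version {airflow_version}. "
            "Please upgrade to Airflow 2.8 or later."
        )
```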


### Backward Compatibility
Ensures compatibility with existing workflows that use local paths for
`manifest.json`.


### How to Use

1. Local Path:
```python
DbtDag(
    project_config=ProjectConfig(
        manifest_path="/path/to/local/manifest.json",
    ),
    ...
)
```
2. Remote Path (e.g., S3):
```python
DbtDag(
    project_config=ProjectConfig(
        manifest_path="s3://bucket/path/to/manifest.json",
        manifest_conn_id="aws_s3_conn",
    ),
    ...
)
```
3. Remote Path without Explicit Connection ID:
```python
DbtDag(
    project_config=ProjectConfig(
        manifest_path="gs://bucket/path/to/manifest.json",
        # No manifest_conn_id provided; the default Airflow GCS connection `google_cloud_default` is used
    ),
    ...
)
```
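4. Remote Path (Azure Blob Storage): follows the same abbreviated form as the examples above. The container path and connection ID below are illustrative; if `manifest_conn_id` is omitted, the default `wasb_default` connection is used.
```python
DbtDag(
    project_config=ProjectConfig(
        manifest_path="abfs://container/path/to/manifest.json",
        manifest_conn_id="azure_abfs_conn",
    ),
    ...
)
```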


### Additional Notes

1. Ensure that Airflow 2.8 or later is used to take advantage of the Object
Storage feature.
2. Review the updated documentation for detailed usage instructions and
examples.


### Testing

1. Added unit tests covering local paths as well as remote paths with and
without `manifest_conn_id`.
2. Verified integration with different cloud storage services (AWS S3,
GCS, Azure Blob Storage).
3. Ensured backward compatibility with existing local path workflows.

## Related Issue(s)

closes: #448

## Breaking Change?
No.

## Checklist

- [x] I have made corresponding changes to the documentation (if
required)
- [x] I have added tests that prove my fix is effective or that my
feature works

---------

Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
pankajkoti and tatiana authored Jul 24, 2024
1 parent 89a8034 commit b8a14a3
Showing 13 changed files with 264 additions and 42 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/test.yml
Expand Up @@ -143,6 +143,9 @@ jobs:
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
DATABRICKS_HOST: mock
DATABRICKS_WAREHOUSE_ID: mock
DATABRICKS_TOKEN: mock
Expand Down Expand Up @@ -213,6 +216,9 @@ jobs:
env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }}
DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}
Expand Down Expand Up @@ -275,6 +281,9 @@ jobs:
env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
Expand Down Expand Up @@ -348,6 +357,9 @@ jobs:
env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
43 changes: 34 additions & 9 deletions cosmos/config.py
Expand Up @@ -10,9 +10,12 @@
from pathlib import Path
from typing import Any, Callable, Iterator

from airflow.version import version as airflow_version

from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled
from cosmos.constants import (
DEFAULT_PROFILES_FILE_NAME,
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP,
DbtResourceType,
ExecutionMode,
InvocationMode,
Expand All @@ -24,6 +27,7 @@
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.profiles import BaseProfileMapping
from cosmos.settings import AIRFLOW_IO_AVAILABLE

logger = get_logger(__name__)

Expand Down Expand Up @@ -150,6 +154,7 @@ def __init__(
seeds_relative_path: str | Path = "seeds",
snapshots_relative_path: str | Path = "snapshots",
manifest_path: str | Path | None = None,
manifest_conn_id: str | None = None,
project_name: str | None = None,
env_vars: dict[str, str] | None = None,
dbt_vars: dict[str, str] | None = None,
Expand All @@ -175,7 +180,25 @@ def __init__(
self.project_name = self.dbt_project_path.stem

if manifest_path:
self.manifest_path = Path(manifest_path)
manifest_path_str = str(manifest_path)
if not manifest_conn_id:
manifest_scheme = manifest_path_str.split("://")[0]
# Use the default Airflow connection ID for the scheme if it is not provided.
manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, None)

if manifest_conn_id is not None and not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object "
f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
f"Airflow 2.8 or later."
)

if AIRFLOW_IO_AVAILABLE:
from airflow.io.path import ObjectStoragePath

self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id)
else:
self.manifest_path = Path(manifest_path_str)

self.env_vars = env_vars
self.dbt_vars = dbt_vars
Expand All @@ -192,28 +215,30 @@ def validate_project(self) -> None:
"""

mandatory_paths = {}

# We validate the existence of paths added to the `mandatory_paths` map by calling the `exists()` method on each
# one. Starting with Cosmos 1.6.0, if the Airflow version is `>= 2.8.0` and a `manifest_path` is provided, we
# cast it to an `airflow.io.path.ObjectStoragePath` instance during `ProjectConfig` initialisation, and it
# includes the `exists()` method. For the remaining paths in the `mandatory_paths` map, we cast them to
# `pathlib.Path` objects to ensure that the subsequent `exists()` call while iterating on the `mandatory_paths`
# map works correctly for all paths, thereby validating the project.
if self.dbt_project_path:
project_yml_path = self.dbt_project_path / "dbt_project.yml"
mandatory_paths = {
"dbt_project.yml": project_yml_path,
"models directory ": self.models_path,
"dbt_project.yml": Path(project_yml_path) if project_yml_path else None,
"models directory ": Path(self.models_path) if self.models_path else None,
}
if self.manifest_path:
mandatory_paths["manifest"] = self.manifest_path

for name, path in mandatory_paths.items():
if path is None or not Path(path).exists():
if path is None or not path.exists():
raise CosmosValueError(f"Could not find {name} at {path}")

def is_manifest_available(self) -> bool:
"""
Check if the `dbt` project manifest is set and if the file exists.
"""
if not self.manifest_path:
return False

return self.manifest_path.exists()
return self.manifest_path.exists() if self.manifest_path else False


@dataclass
14 changes: 14 additions & 0 deletions cosmos/constants.py
Expand Up @@ -3,6 +3,9 @@
from pathlib import Path

import aenum
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from packaging.version import Version

DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
Expand All @@ -28,6 +31,17 @@
PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")]


S3_FILE_SCHEME = "s3"
GS_FILE_SCHEME = "gs"
ABFS_FILE_SCHEME = "abfs"

FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP = {
S3_FILE_SCHEME: S3Hook.default_conn_name,
GS_FILE_SCHEME: GCSHook.default_conn_name,
ABFS_FILE_SCHEME: WasbHook.default_conn_name,
}


class LoadMode(Enum):
"""
Supported ways to load a `dbt` project into a `DbtGraph` instance.
8 changes: 6 additions & 2 deletions cosmos/dbt/graph.py
Expand Up @@ -13,7 +13,7 @@
from functools import cached_property
from pathlib import Path
from subprocess import PIPE, Popen
from typing import Any
from typing import TYPE_CHECKING, Any

from airflow.models import Variable

Expand Down Expand Up @@ -611,7 +611,11 @@ def load_from_dbt_manifest(self) -> None:
raise CosmosLoadDbtException("Unable to load manifest without ExecutionConfig.dbt_project_path")

nodes = {}
with open(self.project.manifest_path) as fp: # type: ignore[arg-type]

if TYPE_CHECKING:
assert self.project.manifest_path is not None # pragma: no cover

with self.project.manifest_path.open() as fp:
manifest = json.load(fp)

resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
4 changes: 4 additions & 0 deletions cosmos/settings.py
Expand Up @@ -4,6 +4,8 @@

import airflow
from airflow.configuration import conf
from airflow.version import version as airflow_version
from packaging.version import Version

from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE

Expand All @@ -24,3 +26,5 @@
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)

AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")
97 changes: 82 additions & 15 deletions dev/dags/cosmos_manifest_example.py
Expand Up @@ -6,12 +6,17 @@
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.profiles import DbtProfileConfigVars, PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop")

profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
Expand All @@ -22,22 +27,84 @@
),
)

# [START local_example]
cosmos_manifest_example = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"]),
execution_config=ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop"),
operator_args={"install_deps": True},
# normal dag parameters
render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"])


@dag(
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="cosmos_manifest_example",
default_args={"retries": 2},
)
# [END local_example]
def cosmos_manifest_example() -> None:

pre_dbt = EmptyOperator(task_id="pre_dbt")

# [START local_example]
local_example = DbtTaskGroup(
group_id="local_example",
project_config=ProjectConfig(
manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=render_config,
execution_config=execution_config,
operator_args={"install_deps": True},
)
# [END local_example]

# [START aws_s3_example]
aws_s3_example = DbtTaskGroup(
group_id="aws_s3_example",
project_config=ProjectConfig(
manifest_path="s3://cosmos-manifest-test/manifest.json",
manifest_conn_id="aws_s3_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used.
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=render_config,
execution_config=execution_config,
operator_args={"install_deps": True},
)
# [END aws_s3_example]

# [START gcp_gs_example]
gcp_gs_example = DbtTaskGroup(
group_id="gcp_gs_example",
project_config=ProjectConfig(
manifest_path="gs://cosmos-manifest-test/manifest.json",
manifest_conn_id="gcp_gs_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used.
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=render_config,
execution_config=execution_config,
operator_args={"install_deps": True},
)
# [END gcp_gs_example]

# [START azure_abfs_example]
azure_abfs_example = DbtTaskGroup(
group_id="azure_abfs_example",
project_config=ProjectConfig(
manifest_path="abfs://cosmos-manifest-test/manifest.json",
manifest_conn_id="azure_abfs_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `wasb_default` is used.
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=render_config,
execution_config=execution_config,
operator_args={"install_deps": True},
)
# [END azure_abfs_example]

post_dbt = EmptyOperator(task_id="post_dbt")

(pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)


cosmos_manifest_example()
59 changes: 48 additions & 11 deletions docs/configuration/parsing-methods.rst
Expand Up @@ -34,26 +34,63 @@ When you don't supply an argument to the ``load_mode`` parameter (or you supply

To use this method, you don't need to supply any additional config. This is the default.


``dbt_manifest``
----------------

If you already have a ``manifest.json`` file created by dbt, Cosmos will parse the manifest to generate your DAG.

You can supply a ``manifest_path`` parameter on the DbtDag / DbtTaskGroup with a path to a ``manifest.json`` file.

To use this:
Before Cosmos 1.6.0, the path to ``manifest.json`` supplied via the ``DbtDag`` / ``DbtTaskGroup`` ``manifest_path``
argument accepted only local paths. Starting with Cosmos 1.6.0, if you have Airflow >= 2.8.0, you can also supply
a remote path (e.g., an S3 URL). To support remote paths, Cosmos leverages the
`Airflow Object Storage <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/objectstorage.html>`_
feature released in Airflow 2.8.0.
For remote paths, you can specify a ``manifest_conn_id``, which is an
Airflow connection ID containing the credentials to access the remote path. If you do not specify a
``manifest_conn_id``, Cosmos will use the default connection ID for the scheme, identified via the
``default_conn_name`` of the Airflow hook corresponding to the URL's scheme.

.. code-block:: python
Examples of how to supply ``manifest.json`` using ``manifest_path`` argument:

- Local path:

.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py
:language: python
:start-after: [START local_example]
:end-before: [END local_example]

- AWS S3 URL (available since Cosmos 1.6):

Ensure that you have the required dependencies installed to use the S3 URL. You can install the required dependencies
using the following command: ``pip install "astronomer-cosmos[amazon]"``

.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py
:language: python
:start-after: [START aws_s3_example]
:end-before: [END aws_s3_example]

- GCP GCS URL (available since Cosmos 1.6):

Ensure that you have the required dependencies installed to use the GCS URL. You can install the required dependencies
using the following command: ``pip install "astronomer-cosmos[google]"``

.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py
:language: python
:start-after: [START gcp_gs_example]
:end-before: [END gcp_gs_example]

- Azure Blob Storage URL (available since Cosmos 1.6):

Ensure that you have the required dependencies installed to use the Azure blob URL. You can install the required
dependencies using the following command: ``pip install "astronomer-cosmos[microsoft]"``

.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py
:language: python
:start-after: [START azure_abfs_example]
:end-before: [END azure_abfs_example]

DbtDag(
project_config=ProjectConfig(
manifest_path="/path/to/manifest.json",
),
render_config=RenderConfig(
load_method=LoadMode.DBT_MANIFEST,
),
# ...,
)

``dbt_ls``
----------