Simplify cloud credentials mounting (#548)
Addresses #256
PhilippeMoussalli committed Oct 25, 2023
1 parent 522488b commit a2611ca
Showing 8 changed files with 327 additions and 32 deletions.
3 changes: 2 additions & 1 deletion data_explorer/app/widgets.py
@@ -39,7 +39,8 @@ def build_sidebar(base_path: str) -> Tuple[Manifest, str, Dict[str, str]]:
selected_pipeline_path = os.path.join(base_path, selected_pipeline)

# 2) List available runs in descending order (most recent first)
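# The `cache` directory is excluded from the listing since it is not a pipeline run.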
available_runs = [os.path.basename(item) for item in fs.ls(selected_pipeline_path) if
item != "cache"]
available_runs.sort(reverse=True)
selected_run = st.sidebar.selectbox("Select run", available_runs)
selected_run_path = os.path.join(*[base_path, selected_pipeline, selected_run])
6 changes: 3 additions & 3 deletions docs/data_explorer.md
@@ -8,13 +8,13 @@ fondant explore [--base_path BASE_PATH] [--container CONTAINER] [--tag TAG] [--p
```

Where the base path can be either local or remote. Make sure to pass the proper credential mount arguments when using a remote base path, or a local base path
that references remote datasets. You can do that either with `--auth-gcp`, `--auth-aws`, or `--auth-azure` to
mount your default local cloud credentials to the pipeline, or with the `--credentials` argument to mount custom credentials to the local container pipeline.

Example:

```bash
fondant explore --base_path gs://foo/bar --auth-gcp
```
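
You can still mount credentials from a custom location with the `--credentials`/`-c` argument, which takes a `src:target` path mapping. A sketch using the default GCP credentials file as the source (adjust the paths to your setup):

```bash
fondant explore --base_path gs://foo/bar \
    -c $HOME/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json
```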
## Data explorer UI

10 changes: 7 additions & 3 deletions docs/pipeline.md
@@ -98,11 +98,15 @@ during component development.

The local runner checks whether the `base_path` of the pipeline is local or remote storage. If it's local, the `base_path` will be mounted as a bind volume on every service/component.

If you want to use remote paths (GCS, S3, etc.) you can use the `--auth-gcp`, `--auth-aws`, or `--auth-azure` flags.
This will mount your default local cloud credentials to the pipeline. Make sure you are authenticated locally before running the pipeline and
that you have the correct permissions to access the `base_path` of the pipeline (read/write/create).

You can also use the `--extra_volumes` argument to mount extra credentials or additional files.
These volumes will be mounted to every component/service of the docker-compose spec.

```bash
fondant run local <pipeline_ref> --auth-gcp
```
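
The auth flags can be combined with `--extra_volumes` (this commit's `compile_local` merges both into the compiled spec); a sketch with an illustrative extra mount:

```bash
fondant run local <pipeline_ref> --auth-aws \
    --extra-volumes $HOME/.cache/huggingface:/root/.cache/huggingface
```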

### Vertex Runner
98 changes: 87 additions & 11 deletions src/fondant/cli.py
@@ -18,11 +18,13 @@
import importlib
import inspect
import logging
import os
import shutil
import sys
import textwrap
import typing as t
from collections import defaultdict
from enum import Enum
from pathlib import Path
from types import ModuleType

@@ -39,6 +41,26 @@
logger = logging.getLogger(__name__)


class CloudCredentialsMount(Enum):
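# Note: `home_directory` is evaluated once at class-definition time and is only used
# to build the mount strings below (it also becomes an enum member, which is harmless).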
home_directory = os.path.expanduser("~")
AWS = f"{home_directory}/.aws/credentials:/root/.aws/credentials"
GCP = f"{home_directory}/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json"
AZURE = f"{home_directory}/.azure:/root/.azure"


def get_cloud_credentials(args) -> t.Optional[str]:
if args.auth_gcp:
return CloudCredentialsMount.GCP.value
if args.auth_aws:
return CloudCredentialsMount.AWS.value
if args.auth_azure:
return CloudCredentialsMount.AZURE.value
if args.credentials:
return args.credentials

return None
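# Illustrative behaviour (the Namespace below is an example, not part of this file):
#   args = argparse.Namespace(auth_gcp=True, auth_aws=False, auth_azure=False, credentials=None)
#   get_cloud_credentials(args)  # -> CloudCredentialsMount.GCP.value
# With no auth flag and no --credentials set, the function returns None.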


def entrypoint():
"""Entrypoint for the fondant CLI."""
parser = argparse.ArgumentParser(
@@ -103,6 +125,8 @@ def register_explore(parent_parser):
""",
),
)
auth_group = parser.add_mutually_exclusive_group()

parser.add_argument(
"--base_path",
"-b",
@@ -131,21 +155,39 @@ def register_explore(parent_parser):
default=8501,
help="Port to expose the container on.",
)

auth_group.add_argument(
"--auth-gcp",
action="store_true",
help=f"Flag to authenticate with GCP. Uses the following mount command"
f" `{CloudCredentialsMount.GCP.value}`",
)

auth_group.add_argument(
"--auth-azure",
action="store_true",
help="Flag to authenticate with Azure. Uses the following mount command"
f" `{CloudCredentialsMount.AZURE.value}`",
)

auth_group.add_argument(
"--auth-aws",
action="store_true",
help="Flag to authenticate with AWS. Uses the following mount command"
f" `{CloudCredentialsMount.AWS.value}`",
)

auth_group.add_argument(
"--credentials",
"-c",
type=str,
default=None,
help="""Path mapping of the source (local) and target (docker file system)
credential paths in the format of src:target
\nExamples:\n
Google Cloud: $HOME/.config/gcloud/application_default_credentials.json:/root/."
+ "config/gcloud/application_default_credentials.json
AWS: HOME/.aws/credentials:/root/.aws/credentials
More info on
Google Cloud:
https://cloud.google.com/docs/authentication/application-default-credentials
AWS: https: // docs.aws.amazon.com/sdkref/latest/guide/file-location.html
Google Cloud: https://cloud.google.com/docs/authentication/application-default-credentials
AWS: https://docs.aws.amazon.com/sdkref/latest/guide/file-location.html
Azure: https://stackoverflow.com/questions/69010943/how-does-az-login-store-credential-information
""",
)

Expand All @@ -158,12 +200,14 @@ def explore(args):
"Docker runtime not found. Please install Docker and try again.",
)

cloud_cred = get_cloud_credentials(args)

run_explorer_app(
base_path=args.base_path,
container=args.container,
tag=args.tag,
port=args.port,
credentials=args.credentials,
credentials=cloud_cred,
)


@@ -257,6 +301,8 @@ def register_compile(parent_parser):
compiler_subparser = parser.add_subparsers()

local_parser = compiler_subparser.add_parser(name="local", help="Local compiler")
auth_group_local_parser = local_parser.add_mutually_exclusive_group()

kubeflow_parser = compiler_subparser.add_parser(
name="kubeflow",
help="Kubeflow compiler",
@@ -294,6 +340,27 @@
default=[],
)

auth_group_local_parser.add_argument(
"--auth-gcp",
action="store_true",
help=f"Flag to authenticate with GCP. Uses the following mount command"
f" `{CloudCredentialsMount.GCP.value}`",
)

auth_group_local_parser.add_argument(
"--auth-azure",
action="store_true",
help="Flag to authenticate with Azure. Uses the following mount command"
f" `{CloudCredentialsMount.AZURE.value}`",
)

auth_group_local_parser.add_argument(
"--auth-aws",
action="store_true",
help="Flag to authenticate with AWS. Uses the following mount command"
f" `{CloudCredentialsMount.AWS.value}`",
)

# Kubeflow parser
kubeflow_parser.add_argument(
"ref",
@@ -330,11 +397,20 @@


def compile_local(args):
extra_volumes = []
cloud_cred = get_cloud_credentials(args)

if args.extra_volumes:
extra_volumes.extend(args.extra_volumes)

if cloud_cred:
extra_volumes.append(cloud_cred)

pipeline = pipeline_from_module(args.ref)
compiler = DockerCompiler()
compiler.compile(
pipeline=pipeline,
extra_volumes=args.extra_volumes,
extra_volumes=extra_volumes,
output_path=args.output_path,
build_args=args.build_arg,
)
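# Illustrative effect (command and paths are hypothetical): compiling with
#   fondant compile local my_pipeline.py --auth-aws --extra-volumes ./data:/data
# mounts both the default AWS credentials mapping and `./data:/data` on every
# service of the generated docker-compose spec.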
Expand Down Expand Up @@ -366,7 +442,7 @@ def register_run(parent_parser):
You can run `fondant <mode> --help` to find out more about the specific arguments for each mode.
Examples of running a component:
fondant run local --auth-gcp
fondant run kubeflow ./my_compiled_kubeflow_pipeline.tgz
""",
),
130 changes: 129 additions & 1 deletion src/fondant/component_spec.py
@@ -14,10 +14,138 @@
from referencing import Registry, Resource
from referencing.jsonschema import DRAFT4

from fondant.exceptions import InvalidComponentSpec, InvalidSubsetMapping
from fondant.schema import Field, Type


class SubsetFieldMapper:
"""
A class for managing subset field mappings between a dataset and component.
Attributes:
subset_field_mappings (Dict[str, Dict[str, Any]]): A dictionary storing mappings between
fields of source dataset subsets and their target component subsets.
"""

def __init__(self):
"""Initialize the SubsetFieldMapper."""
self.subset_field_mappings: t.Dict[str, t.Dict[str, t.Any]] = {}

def add_mapping(
self,
dataset_subset: str,
component_subset: str,
field_mapping: t.Dict[str, t.Any],
) -> None:
"""
Add a new mapping between source and target subsets.
Args:
dataset_subset: The source dataset subset.
component_subset: The component target subset.
field_mapping: The field mapping.
"""
if dataset_subset in self.subset_mapping:
msg = (
f"Attempting to map dataset subset {dataset_subset} to component subset"
f" {component_subset}."
f" Mapping already exists for the dataset subset: {self.to_json()}."
f" \n One-to-one mapping violated."
)
raise InvalidSubsetMapping(msg)

if component_subset in self.subset_mapping.values():
msg = (
f"Attempting to map dataset subset {dataset_subset} to component subset"
f" {component_subset}."
f" Mapping already exists for the component subset: {self.to_json()}."
f"\n One-to-one mapping violated."
)
raise InvalidSubsetMapping(msg)

if dataset_subset not in self.subset_field_mappings:
self.subset_field_mappings[dataset_subset] = {}

self.subset_field_mappings[dataset_subset][component_subset] = field_mapping
# `subset_mapping` is derived from `subset_field_mappings` via the property below,
# so no separate bookkeeping is needed.

def remove_mapping(self, dataset_subset: str, component_subset: str) -> None:
"""
Remove the mapping between the specified source and target subsets.
Args:
dataset_subset (str): The source subset.
component_subset (str): The target subset.
"""
if (
dataset_subset not in self.subset_field_mappings
or component_subset not in self.subset_field_mappings[dataset_subset]
):
msg = f"Mapping between '{dataset_subset}' and '{component_subset}' does not exist."
raise InvalidSubsetMapping(msg)

del self.subset_field_mappings[dataset_subset]

def get_mapping(self, dataset_subset: str, component_subset: str) -> t.Any:
"""
Retrieve the mapping between the specified source and target subsets.
Args:
dataset_subset (str): The source subset.
component_subset (str): The target subset.
Returns:
Any: The corresponding field mapping if it exists, else None.
"""
if (
dataset_subset in self.subset_field_mappings
and component_subset in self.subset_field_mappings[dataset_subset]
):
return self.subset_field_mappings[dataset_subset][component_subset]

return None

def to_json(self) -> str:
"""
Convert the current state of the SubsetFieldMapper to a JSON string.
Returns:
str: A JSON string representing the current state of the object.
"""
return json.dumps(
{
"subset_field_mappings": self.subset_field_mappings,
"subset_mapping": self.subset_mapping,
},
)

@classmethod
def from_json(cls, json_string: str) -> "SubsetFieldMapper":
"""
Create a SubsetFieldMapper object from a JSON string.
Args:
json_string (str): The JSON string representing the object.
Returns:
SubsetFieldMapper: The SubsetFieldMapper object created from the JSON string.
"""
data = json.loads(json_string)
obj = cls()
obj.subset_field_mappings = data["subset_field_mappings"]
return obj

@property
def subset_mapping(self):
"""Subset mapping between the dataset and component."""
subset_mapping = {}
for dataset_subset, component_mappings in self.subset_field_mappings.items():
# One-to-one: each dataset subset maps to exactly one component subset.
subset_mapping[dataset_subset] = next(iter(component_mappings))
return subset_mapping
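
# Usage sketch (subset names are illustrative):
#   mapper = SubsetFieldMapper()
#   mapper.add_mapping("images", "images_data", {"data": "data"})
#   mapper.get_mapping("images", "images_data")  # -> {"data": "data"}
#   mapper.subset_mapping                        # -> {"images": "images_data"}
#   mapper.add_mapping("images", "captions", {})  # raises InvalidSubsetMapping (one-to-one)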


@dataclass
class Argument:
"""
6 changes: 6 additions & 0 deletions src/fondant/exceptions.py
@@ -21,3 +21,9 @@ class InvalidPipelineDefinition(ValidationError, FondantException):

class InvalidTypeSchema(ValidationError, FondantException):
"""Thrown when a Type schema definition is invalid."""


class InvalidSubsetMapping(ValidationError, FondantException):
"""Thrown when attempting to map a given input/output subset to multiple output/input
subsets.
"""