Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nested folder structure in pipeline creation #3106

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 82 additions & 7 deletions kedro/framework/cli/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""A collection of CLI commands for working with Kedro pipelines."""
from __future__ import annotations

import os
import re
import shutil
from pathlib import Path
Expand Down Expand Up @@ -59,9 +60,11 @@ def _assert_pkg_name_ok(pkg_name: str):
if len(pkg_name) < 2: # noqa: PLR2004
message = base_message + " It must be at least 2 characters long."
raise KedroCliError(message)
if not re.match(r"^\w+$", pkg_name[1:]):
if not re.match(r"^\w(\w+\.)*\w+$", pkg_name):
message = (
base_message + " It must contain only letters, digits, and/or underscores."
base_message
+ " It must contain only letters, digits, and/or underscores."
+ " Folders should be separated by '.'"
)
raise KedroCliError(message)

Expand All @@ -72,6 +75,45 @@ def _check_pipeline_name(ctx, param, value): # noqa: unused-argument
return value


def _split_on_last_dot(input_string: str) -> tuple[str, str]:
"""Split an input string based on the last occurrence of a dot.

Args:
input_string (str): The input string to be split.

Returns:
tuple: A tuple containing the portion before the last dot (if present) and
the portion after it. If there is no dot, the entire input string
is included in the second element of the tuple.

Example:
- 'path.to.pipeline' => ('path.to', 'pipeline')
- 'mypipeline' => ('', 'mypipeline')

"""
parts = input_string.rsplit(".", 1)
expected_nb_parts = 2
if len(parts) == expected_nb_parts:
return tuple(parts) # type: ignore
else:
return "", input_string


def _transform_dotted_string_to_path(dotted_string: str) -> Path:
"""
Transform a dotted string into a pathlib Path with OS-independent separator.

Args:
dotted_string (str): The input dotted string.

Returns:
Path: A pathlib Path object representing the path with OS-independent separator.
"""
# Replace dots with os-specific path separator and return as a string
path_str = os.path.join(*dotted_string.split("."))
return Path(path_str)


# noqa: missing-function-docstring
@click.group(name="Kedro")
def pipeline_cli(): # pragma: no cover
Expand Down Expand Up @@ -126,8 +168,25 @@ def create_pipeline(

click.secho(f"Using pipeline template at: '{template_path}'")

result_path = _create_pipeline(name, template_path, package_dir / "pipelines")
_copy_pipeline_tests(name, result_path, package_dir)
# handle dots in name
parent, name = _split_on_last_dot(name)

# ensure pipeline name is globally unique
for file in package_dir.glob(f"**/{name}/__init__.py"):
raise KedroCliError(f"Pipeline {name} already exists: ({file})")

# add necessary subfolders
pipeline_dir = package_dir / "pipelines" / _transform_dotted_string_to_path(parent)
tests_target = (
package_dir.parent
/ "tests"
/ "pipelines"
/ _transform_dotted_string_to_path(parent)
/ name
)

result_path = _create_pipeline(name, template_path, pipeline_dir)
_copy_pipeline_tests(result_path, tests_target)
_copy_pipeline_configs(result_path, project_conf_path, skip_config, env=env)
click.secho(f"\nPipeline '{name}' was successfully created.\n", fg="green")

Expand Down Expand Up @@ -158,12 +217,17 @@ def delete_pipeline(

pipeline_artifacts = _get_pipeline_artifacts(metadata, pipeline_name=name, env=env)

_, conf_name = _split_on_last_dot(name)

files_to_delete = [
pipeline_artifacts.pipeline_conf / filepath
for confdir in ("parameters", "catalog")
# Since we remove nesting in 'parameters' and 'catalog' folders,
# we want to also del the old project's structure for backward compatibility
for filepath in (Path(f"{confdir}_{name}.yml"), Path(confdir) / f"{name}.yml")
for filepath in (
Path(f"{confdir}_{conf_name}.yml"),
Path(confdir) / f"{conf_name}.yml",
)
if (pipeline_artifacts.pipeline_conf / filepath).is_file()
]

Expand All @@ -173,6 +237,16 @@ def delete_pipeline(
if path.is_dir()
]

for dir_to_delete in dirs_to_delete:
for subdir in dir_to_delete.glob("**/*/"):
for py_file in subdir.glob("**/*.py"):
raise KedroCliError(
f"Cannot delete the pipeline '{dir_to_delete}'"
" because it contains a child pipeline."
f" Please delete the child pipeline in '{py_file.parent}'"
" before deleting this one."
)

if not files_to_delete and not dirs_to_delete:
raise KedroCliError(f"Pipeline '{name}' not found.")

Expand Down Expand Up @@ -225,6 +299,8 @@ def _create_pipeline(name: str, template_path: Path, output_dir: Path) -> Path:
output_dir=str(output_dir),
no_input=True,
extra_context=cookie_context,
# allows creation of pipelines in parent folder of existing pipeline
overwrite_if_exists=True,
)
except Exception as exc:
click.secho("FAILED", fg="red")
Expand Down Expand Up @@ -317,9 +393,8 @@ def _get_artifacts_to_package(
return artifacts


def _copy_pipeline_tests(pipeline_name: str, result_path: Path, package_dir: Path):
def _copy_pipeline_tests(result_path: Path, tests_target: Path):
tests_source = result_path / "tests"
tests_target = package_dir.parent / "tests" / "pipelines" / pipeline_name
try:
_sync_dirs(tests_source, tests_target)
finally:
Expand Down
18 changes: 16 additions & 2 deletions kedro/framework/project/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,25 @@ def find_pipelines() -> dict[str, Pipeline]: # noqa: PLR0912
if str(exc) == f"No module named '{PACKAGE_NAME}.pipelines'":
return pipelines_dict

for pipeline_dir in pipelines_package.iterdir():
for pipeline_dir in pipelines_package.glob("**/"):
if pipeline_dir == Path(pipelines_package):
continue
if not pipeline_dir.is_dir():
continue
# only check the dir if it contains py files
if not any(pipeline_dir.glob("*.py")):
continue

pipeline_name = pipeline_dir.name
if pipeline_name == "__pycache__":
continue

pipeline_module_name = f"{PACKAGE_NAME}.pipelines.{pipeline_name}"
pipeline_relative_path = pipeline_dir.relative_to(pipelines_package)
full_pipeline_name = os.path.normpath(pipeline_relative_path).replace(
os.path.sep, "."
)

pipeline_module_name = f"{PACKAGE_NAME}.pipelines.{full_pipeline_name}"
try:
pipeline_module = importlib.import_module(pipeline_module_name)
except: # noqa: bare-except # noqa: E722
Expand All @@ -365,5 +375,9 @@ def find_pipelines() -> dict[str, Pipeline]: # noqa: PLR0912

pipeline_obj = _create_pipeline(pipeline_module)
if pipeline_obj is not None:
if pipeline_name in pipelines_dict:
raise ValueError(
f"find_pipelines found two pipelines with the name {pipeline_name}"
)
pipelines_dict[pipeline_name] = pipeline_obj
return pipelines_dict
Loading