From 01308238009d5183c675abe1008ea549f47c45f8 Mon Sep 17 00:00:00 2001 From: Michael Schuster Date: Mon, 21 Oct 2024 12:16:39 +0200 Subject: [PATCH] Remove lineage graph and legacy dashboard support (#3064) * Remove lineage graph * Remove support for legacy dashboard * Remove legacy dashboard download from script * Redirect to pro dashboard if configured * Remove legacy function arg * Fix return annotation --- .../getting-started/deploying-zenml/README.md | 9 - scripts/install-dashboard.sh | 9 +- src/zenml/cli/server.py | 11 - src/zenml/config/server_config.py | 5 - src/zenml/constants.py | 5 - src/zenml/lineage_graph/__init__.py | 34 -- src/zenml/lineage_graph/edge.py | 24 -- src/zenml/lineage_graph/lineage_graph.py | 244 ------------ src/zenml/lineage_graph/node/__init__.py | 32 -- src/zenml/lineage_graph/node/artifact_node.py | 51 --- src/zenml/lineage_graph/node/base_node.py | 31 -- src/zenml/lineage_graph/node/step_node.py | 41 -- src/zenml/models/v2/misc/server_models.py | 4 - src/zenml/utils/dashboard_utils.py | 42 +- .../deploy/docker/docker_zen_server.py | 6 - .../deploy/local/local_zen_server.py | 6 - .../zen_server/routers/runs_endpoints.py | 28 -- src/zenml/zen_server/zen_server_api.py | 37 +- src/zenml/zen_stores/base_zen_store.py | 2 - src/zenml/zen_stores/sql_zen_store.py | 2 +- .../functional/test_lineage_graph.py | 362 ------------------ 21 files changed, 35 insertions(+), 950 deletions(-) delete mode 100644 src/zenml/lineage_graph/__init__.py delete mode 100644 src/zenml/lineage_graph/edge.py delete mode 100644 src/zenml/lineage_graph/lineage_graph.py delete mode 100644 src/zenml/lineage_graph/node/__init__.py delete mode 100644 src/zenml/lineage_graph/node/artifact_node.py delete mode 100644 src/zenml/lineage_graph/node/base_node.py delete mode 100644 src/zenml/lineage_graph/node/step_node.py delete mode 100644 tests/integration/functional/test_lineage_graph.py diff --git a/docs/book/getting-started/deploying-zenml/README.md b/docs/book/getting-started/deploying-zenml/README.md index f018e0580fd..626ff7bfbaa 100644 --- a/docs/book/getting-started/deploying-zenml/README.md +++ b/docs/book/getting-started/deploying-zenml/README.md @@ -67,11 +67,6 @@ connects the client to the server. The diagram for this looks as follows: ![ZenML with a local ZenML OSS Server](../../.gitbook/assets/Scenario2.png) -{% hint style="warning" %} -Currently the ZenML server supports a legacy and a brand-new version of the dashboard. To use the legacy version simply use the -following command `zenml up --legacy` -{% endhint %} - In order to move into production, the ZenML server needs to be deployed somewhere centrally so that the different cloud stack components can read from and write to the server. Additionally, this also allows all your team members to connect to it and share stacks and pipelines. ![ZenML centrally deployed for multiple users](../../.gitbook/assets/Scenario3.2.png) @@ -91,10 +86,6 @@ Currently, there are two main options to access a deployed ZenML server: 1. **Managed deployment:** With [ZenML Pro](../zenml-pro/README.md) offering you can utilize a control plane to create ZenML servers, also known as [tenants](../zenml-pro/tenants.md). These tenants are managed and maintained by ZenML's dedicated team, alleviating the burden of server management from your end. Importantly, your data remains securely within your stack, and ZenML's role is primarily to handle tracking of metadata and server maintenance. 2. **Self-hosted Deployment:** Alternatively, you have the ability to deploy ZenML on your own self-hosted environment. This can be achieved through various methods, including using [Docker](./deploy-with-docker.md), [Helm](./deploy-with-helm.md), or [HuggingFace Spaces](./deploy-using-huggingface-spaces.md). We also offer our Pro version for self-hosted deployments, so you can use our full paid feature-set while staying fully in control with an air-gapped solution on your infrastructure. -{% hint style="warning" %} -Currently the ZenML server supports a legacy and a brand-new version of the dashboard. To use the legacy version which supports stack registration from the dashboard simply set the following environment variable in the deployment environment: `export ZEN_SERVER_USE_LEGACY_DASHBOARD=True`. -{% endhint %} - Both options offer distinct advantages, allowing you to choose the deployment approach that best aligns with your organization's needs and infrastructure preferences. Whichever path you select, ZenML facilitates a seamless and efficient way to take advantage of the ZenML Server and enhance your machine learning workflows for production-level success. ### Options for deploying ZenML diff --git a/scripts/install-dashboard.sh b/scripts/install-dashboard.sh index d47ad97b440..c57141b7095 100755 --- a/scripts/install-dashboard.sh +++ b/scripts/install-dashboard.sh @@ -5,7 +5,6 @@ REPO_URL="https://github.com/zenml-io/zenml-dashboard" : "${INSTALL_PATH:=./src/zenml/zen_server}" : "${INSTALL_DIR:=dashboard}" -: "${LEGACY_INSTALL_DIR:=dashboard_legacy}" : "${VERIFY_CHECKSUM:=true}" # : "${DESIRED_VERSION:=latest}" @@ -29,8 +28,8 @@ verifySupported() { # checkGitIgnore checks if the dashboard directories are ignored by Git checkGitIgnore() { if [ -f ".gitignore" ]; then - if grep -q -E "(^|\/)dashboard($|\/)" ".gitignore" || grep -q -E "(^|\/)src\/zenml\/zen_server\/dashboard($|\/)" ".gitignore" || grep -q -E "(^|\/)dashboard-legacy($|\/)" ".gitignore" || grep -q -E "(^|\/)src\/zenml\/zen_server\/dashboard-legacy($|\/)" ".gitignore"; then - echo "Error: The '/dashboard', '/dashboard-legacy', 'src/zenml/zen_server/dashboard-legacy' or 'src/zenml/zen_server/dashboard' directory is ignored by Git." + if grep -q -E "(^|\/)dashboard($|\/)" ".gitignore" || grep -q -E "(^|\/)src\/zenml\/zen_server\/dashboard($|\/)" ".gitignore"; then + echo "Error: The '/dashboard' or 'src/zenml/zen_server/dashboard' directory is ignored by Git." echo "Please remove the corresponding entries from the .gitignore file to proceed with the installation." exit 1 fi @@ -167,9 +166,5 @@ if [[ -n "$TAG" ]]; then downloadFile "zenml-dashboard.tar.gz" verifyFile "zenml-dashboard.tar.gz" installFile "$INSTALL_DIR" - - downloadFile "zenml-dashboard-legacy.tar.gz" - verifyFile "zenml-dashboard-legacy.tar.gz" - installFile "$LEGACY_INSTALL_DIR" fi cleanup diff --git a/src/zenml/cli/server.py b/src/zenml/cli/server.py index 3d38f77e20b..908eb695448 100644 --- a/src/zenml/cli/server.py +++ b/src/zenml/cli/server.py @@ -92,13 +92,6 @@ default=None, help="Specify an ngrok auth token to use for exposing the ZenML server.", ) -@click.option( - "--legacy", - is_flag=True, - help="Start the legacy ZenML dashboard instead of the new ZenML dashboard.", - default=False, - type=click.BOOL, -) def up( docker: bool = False, ip_address: Union[ @@ -109,7 +102,6 @@ def up( connect: bool = False, image: Optional[str] = None, ngrok_token: Optional[str] = None, - legacy: bool = False, ) -> None: """Start the ZenML dashboard locally and connect the client to it. @@ -125,8 +117,6 @@ def up( ngrok_token: An ngrok auth token to use for exposing the ZenML dashboard on a public domain. Primarily used for accessing the dashboard in Colab. - legacy: Start the legacy ZenML dashboard instead of the new ZenML - dashboard. """ from zenml.zen_server.deploy.deployer import ServerDeployer @@ -198,7 +188,6 @@ def up( ServerProviderType.DOCKER, ]: config_attrs["ip_address"] = ip_address - config_attrs["use_legacy_dashboard"] = legacy from zenml.zen_server.deploy.deployment import ServerDeploymentConfig diff --git a/src/zenml/config/server_config.py b/src/zenml/config/server_config.py index 9b8ae5ffc76..399c00f6cfe 100644 --- a/src/zenml/config/server_config.py +++ b/src/zenml/config/server_config.py @@ -41,7 +41,6 @@ DEFAULT_ZENML_SERVER_SECURE_HEADERS_XFO, DEFAULT_ZENML_SERVER_SECURE_HEADERS_XXP, DEFAULT_ZENML_SERVER_THREAD_POOL_SIZE, - DEFAULT_ZENML_SERVER_USE_LEGACY_DASHBOARD, ENV_ZENML_SERVER_PREFIX, ) from zenml.enums import AuthScheme @@ -218,9 +217,6 @@ class ServerConfiguration(BaseModel): one of the reserved values `disabled`, `no`, `none`, `false`, `off` or to an empty string, the `Permissions-Policy` header will not be included in responses. - use_legacy_dashboard: Whether to use the legacy dashboard. If set to - `True`, the dashboard will be used with the old UI. If set to - `False`, the new dashboard will be used. server_name: The name of the ZenML server. Used only during initial deployment. Can be changed later as a part of the server settings. display_announcements: Whether to display announcements about ZenML in @@ -313,7 +309,6 @@ class ServerConfiguration(BaseModel): default=DEFAULT_ZENML_SERVER_SECURE_HEADERS_PERMISSIONS, union_mode="left_to_right", ) - use_legacy_dashboard: bool = DEFAULT_ZENML_SERVER_USE_LEGACY_DASHBOARD server_name: str = DEFAULT_ZENML_SERVER_NAME display_announcements: bool = True diff --git a/src/zenml/constants.py b/src/zenml/constants.py index 2d5b6185335..158f1c2da5d 100644 --- a/src/zenml/constants.py +++ b/src/zenml/constants.py @@ -181,9 +181,6 @@ def handle_int_env_var(var: str, default: int = 0) -> int: ENV_ZENML_SERVER_REPORTABLE_RESOURCES = ( f"{ENV_ZENML_SERVER_PREFIX}REPORTABLE_RESOURCES" ) -ENV_ZENML_SERVER_USE_LEGACY_DASHBOARD = ( - f"{ENV_ZENML_SERVER_PREFIX}USE_LEGACY_DASHBOARD" -) ENV_ZENML_SERVER_AUTO_ACTIVATE = f"{ENV_ZENML_SERVER_PREFIX}AUTO_ACTIVATE" ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK = ( "ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK" @@ -316,7 +313,6 @@ def handle_int_env_var(var: str, default: int = 0) -> int: "payment=(), sync-xhr=(), usb=()" ) DEFAULT_ZENML_SERVER_SECURE_HEADERS_REPORT_TO = "default" -DEFAULT_ZENML_SERVER_USE_LEGACY_DASHBOARD = False DEFAULT_ZENML_SERVER_REPORT_USER_ACTIVITY_TO_DB_SECONDS = 30 DEFAULT_ZENML_SERVER_MAX_REQUEST_BODY_SIZE_IN_BYTES = 256 * 1024 * 1024 @@ -354,7 +350,6 @@ def handle_int_env_var(var: str, default: int = 0) -> int: EVENT_SOURCES = "/event-sources" FLAVORS = "/flavors" GET_OR_CREATE = "/get-or-create" -GRAPH = "/graph" HEALTH = "/health" INFO = "/info" LOGIN = "/login" diff --git a/src/zenml/lineage_graph/__init__.py b/src/zenml/lineage_graph/__init__.py deleted file mode 100644 index f14daac2976..00000000000 --- a/src/zenml/lineage_graph/__init__.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Initialization of lineage generation module.""" - -from zenml.lineage_graph.edge import Edge # noqa -from zenml.lineage_graph.lineage_graph import LineageGraph # noqa -from zenml.lineage_graph.node import ( # noqa - ArtifactNode, - ArtifactNodeDetails, - BaseNode, - StepNode, - StepNodeDetails, -) - -__all__ = [ - "BaseNode", - "ArtifactNode", - "StepNode", - "Edge", - "LineageGraph", - "StepNodeDetails", - "ArtifactNodeDetails", -] diff --git a/src/zenml/lineage_graph/edge.py b/src/zenml/lineage_graph/edge.py deleted file mode 100644 index 443e52927cb..00000000000 --- a/src/zenml/lineage_graph/edge.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Class for Edges in a lineage graph.""" - -from pydantic import BaseModel - - -class Edge(BaseModel): - """A class that represents an edge in a lineage graph.""" - - id: str - source: str - target: str diff --git a/src/zenml/lineage_graph/lineage_graph.py b/src/zenml/lineage_graph/lineage_graph.py deleted file mode 100644 index 9dba12d304b..00000000000 --- a/src/zenml/lineage_graph/lineage_graph.py +++ /dev/null @@ -1,244 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Class for lineage graph generation.""" - -from typing import TYPE_CHECKING, List, Optional, Tuple, Union - -from pydantic import BaseModel - -from zenml.enums import ExecutionStatus -from zenml.lineage_graph.edge import Edge -from zenml.lineage_graph.node import ( - ArtifactNode, - ArtifactNodeDetails, - StepNode, - StepNodeDetails, -) -from zenml.lineage_graph.node.artifact_node import ArtifactNodeStatus - -if TYPE_CHECKING: - from zenml.models import ( - ArtifactVersionResponse, - PipelineRunResponse, - StepRunResponse, - ) - - -ARTIFACT_PREFIX = "artifact_" -STEP_PREFIX = "step_" - - -class LineageGraph(BaseModel): - """A lineage graph representation of a PipelineRunResponseModel.""" - - nodes: List[Union[StepNode, ArtifactNode]] = [] - edges: List[Edge] = [] - root_step_id: Optional[str] = None - run_metadata: List[Tuple[str, str, str]] = [] - - def generate_run_nodes_and_edges(self, run: "PipelineRunResponse") -> None: - """Initializes a lineage graph from a pipeline run. - - Args: - run: The PipelineRunResponseModel to generate the lineage graph for. - """ - self.run_metadata = [ - (m.key, str(m.value), str(m.type)) - for m in run.run_metadata.values() - ] - - for step in run.steps.values(): - self.generate_step_nodes_and_edges(step) - - self.add_external_artifacts(run) - self.add_direct_edges(run) - - def generate_step_nodes_and_edges(self, step: "StepRunResponse") -> None: - """Generates the nodes and edges for a step and its artifacts. - - Args: - step: The step to generate the nodes and edges for. - """ - step_id = STEP_PREFIX + str(step.id) - - # Set a root step if it doesn't exist yet - if self.root_step_id is None: - self.root_step_id = step_id - - # Add the step node - self.add_step_node(step, step_id) - - # Add nodes and edges for all output artifacts - for artifact_name, artifact_version in step.outputs.items(): - artifact_version_id = ARTIFACT_PREFIX + str(artifact_version.id) - if step.status == ExecutionStatus.CACHED: - artifact_status = ArtifactNodeStatus.CACHED - elif step.status == ExecutionStatus.COMPLETED: - artifact_status = ArtifactNodeStatus.CREATED - else: - artifact_status = ArtifactNodeStatus.UNKNOWN - self.add_artifact_node( - artifact=artifact_version, - id=artifact_version_id, - name=artifact_name, - step_id=str(step_id), - status=artifact_status, - ) - self.add_edge(step_id, artifact_version_id) - - # Add nodes and edges for all input artifacts - for artifact_name, artifact_version in step.inputs.items(): - artifact_version_id = ARTIFACT_PREFIX + str(artifact_version.id) - self.add_edge(artifact_version_id, step_id) - - def add_external_artifacts(self, run: "PipelineRunResponse") -> None: - """Adds all external artifacts to the lineage graph. - - Args: - run: The pipeline run to add external artifacts for. - """ - nodes_ids = {node.id for node in self.nodes} - for step in run.steps.values(): - for artifact_name, artifact_version in step.inputs.items(): - artifact_version_id = ARTIFACT_PREFIX + str( - artifact_version.id - ) - if artifact_version_id not in nodes_ids: - self.add_artifact_node( - artifact=artifact_version, - id=artifact_version_id, - name=artifact_name, - step_id=str(artifact_version.producer_step_run_id), - status=ArtifactNodeStatus.EXTERNAL, - ) - - def add_direct_edges(self, run: "PipelineRunResponse") -> None: - """Add all direct edges between nodes generated by `after=...`. - - Args: - run: The pipeline run to add direct edges for. - """ - for step in run.steps.values(): - step_id = STEP_PREFIX + str(step.id) - for parent_step_id_uuid in step.parent_step_ids: - parent_step_id = STEP_PREFIX + str(parent_step_id_uuid) - if not self.has_artifact_link(step_id, parent_step_id): - self.add_edge(parent_step_id, step_id) - - def has_artifact_link(self, step_id: str, parent_step_id: str) -> bool: - """Checks if a step has an artifact link to a parent step. - - This is the case for all parent steps that were not specified via - `after=...`. - - Args: - step_id: The node ID of the step to check. - parent_step_id: T node ID of the parent step to check. - - Returns: - True if the steps are linked via an artifact, False otherwise. - """ - parent_outputs, child_inputs = set(), set() - for edge in self.edges: - if edge.source == parent_step_id: - parent_outputs.add(edge.target) - if edge.target == step_id: - child_inputs.add(edge.source) - return bool(parent_outputs.intersection(child_inputs)) - - def add_step_node( - self, - step: "StepRunResponse", - id: str, - ) -> None: - """Adds a step node to the lineage graph. - - Args: - step: The step to add a node for. - id: The id of the step node. - """ - step_config = step.config.model_dump() - if step_config: - step_config = { - key: value - for key, value in step_config.items() - if key not in ["inputs", "outputs", "parameters"] and value - } - self.nodes.append( - StepNode( - id=id, - data=StepNodeDetails( - execution_id=str(step.id), - name=step.name, # redundant for consistency - status=step.status, - entrypoint_name=step.config.name, # redundant for consistency - parameters=step.config.parameters, - configuration=step_config, - inputs={k: v.uri for k, v in step.inputs.items()}, - outputs={k: v.uri for k, v in step.outputs.items()}, - metadata=[ - (m.key, str(m.value), str(m.type)) - for m in step.run_metadata.values() - ], - ), - ) - ) - - def add_artifact_node( - self, - artifact: "ArtifactVersionResponse", - id: str, - name: str, - step_id: str, - status: ArtifactNodeStatus, - ) -> None: - """Adds an artifact node to the lineage graph. - - Args: - artifact: The artifact to add a node for. - id: The id of the artifact node. - name: The input or output name of the artifact. - step_id: The id of the step that produced the artifact. - status: The status of the step that produced the artifact. - """ - node = ArtifactNode( - id=id, - data=ArtifactNodeDetails( - execution_id=str(artifact.id), - name=name, - status=status, - is_cached=status == ArtifactNodeStatus.CACHED, - artifact_type=artifact.type, - artifact_data_type=artifact.data_type.import_path, - parent_step_id=step_id, - producer_step_id=str(artifact.producer_step_run_id), - uri=artifact.uri, - metadata=[ - (m.key, str(m.value), str(m.type)) - for m in artifact.run_metadata.values() - ], - ), - ) - self.nodes.append(node) - - def add_edge(self, source: str, target: str) -> None: - """Adds an edge to the lineage graph. - - Args: - source: The source node id. - target: The target node id. - """ - self.edges.append( - Edge(id=source + "_" + target, source=source, target=target) - ) diff --git a/src/zenml/lineage_graph/node/__init__.py b/src/zenml/lineage_graph/node/__init__.py deleted file mode 100644 index 8db1f47d774..00000000000 --- a/src/zenml/lineage_graph/node/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Initialization of lineage nodes.""" - -from zenml.lineage_graph.node.artifact_node import ( # noqa - ArtifactNode, - ArtifactNodeDetails, -) -from zenml.lineage_graph.node.base_node import BaseNode # noqa -from zenml.lineage_graph.node.step_node import ( # noqa - StepNode, - StepNodeDetails, -) - -__all__ = [ - "BaseNode", - "ArtifactNode", - "StepNode", - "StepNodeDetails", - "ArtifactNodeDetails", -] diff --git a/src/zenml/lineage_graph/node/artifact_node.py b/src/zenml/lineage_graph/node/artifact_node.py deleted file mode 100644 index 2c59d4c3511..00000000000 --- a/src/zenml/lineage_graph/node/artifact_node.py +++ /dev/null @@ -1,51 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Class for all lineage artifact nodes.""" - -from typing import List, Optional, Tuple - -from zenml.lineage_graph.node.base_node import ( - BaseNode, - BaseNodeDetails, -) -from zenml.utils.enum_utils import StrEnum - - -class ArtifactNodeStatus(StrEnum): - """Enum that represents the status of an artifact.""" - - CACHED = "cached" - CREATED = "created" - EXTERNAL = "external" - UNKNOWN = "unknown" - - -class ArtifactNodeDetails(BaseNodeDetails): - """Captures all artifact details for the node.""" - - status: ArtifactNodeStatus - is_cached: bool - artifact_type: str - artifact_data_type: str - parent_step_id: str - producer_step_id: Optional[str] - uri: str - metadata: List[Tuple[str, str, str]] # (key, value, type) - - -class ArtifactNode(BaseNode): - """A class that represents an artifact node in a lineage graph.""" - - type: str = "artifact" - data: ArtifactNodeDetails diff --git a/src/zenml/lineage_graph/node/base_node.py b/src/zenml/lineage_graph/node/base_node.py deleted file mode 100644 index 09768b66bee..00000000000 --- a/src/zenml/lineage_graph/node/base_node.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Base class for all lineage nodes.""" - -from pydantic import BaseModel - - -class BaseNodeDetails(BaseModel): - """Captures all details for the node.""" - - execution_id: str - name: str - - -class BaseNode(BaseModel): - """A class that represents a node in a lineage graph.""" - - id: str - type: str - data: BaseNodeDetails diff --git a/src/zenml/lineage_graph/node/step_node.py b/src/zenml/lineage_graph/node/step_node.py deleted file mode 100644 index ca3c1549523..00000000000 --- a/src/zenml/lineage_graph/node/step_node.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Class for all lineage step nodes.""" - -from typing import Any, Dict, List, Tuple - -from zenml.enums import ExecutionStatus -from zenml.lineage_graph.node.base_node import ( - BaseNode, - BaseNodeDetails, -) - - -class StepNodeDetails(BaseNodeDetails): - """Captures all artifact details for the node.""" - - status: ExecutionStatus - entrypoint_name: str - parameters: Dict[str, Any] - configuration: Dict[str, Any] - inputs: Dict[str, Any] - outputs: Dict[str, Any] - metadata: List[Tuple[str, str, str]] # (key, value, type) - - -class StepNode(BaseNode): - """A class that represents a step node in a lineage graph.""" - - type: str = "step" - data: StepNodeDetails diff --git a/src/zenml/models/v2/misc/server_models.py b/src/zenml/models/v2/misc/server_models.py index 662adc3506f..acb29c8f393 100644 --- a/src/zenml/models/v2/misc/server_models.py +++ b/src/zenml/models/v2/misc/server_models.py @@ -99,10 +99,6 @@ class ServerModel(BaseModel): {}, title="The metadata associated with the server.", ) - use_legacy_dashboard: bool = Field( - False, - title="Flag to indicate whether the server is using the legacy dashboard.", - ) last_user_activity: Optional[datetime] = Field( None, diff --git a/src/zenml/utils/dashboard_utils.py b/src/zenml/utils/dashboard_utils.py index 2967c11d067..12efa89b856 100644 --- a/src/zenml/utils/dashboard_utils.py +++ b/src/zenml/utils/dashboard_utils.py @@ -13,7 +13,7 @@ # permissions and limitations under the License. """Utility class to help with interacting with the dashboard.""" -from typing import Optional, Tuple +from typing import Optional from uuid import UUID from zenml import constants @@ -48,30 +48,25 @@ def get_cloud_dashboard_url() -> Optional[str]: return None -def get_server_dashboard_url() -> Tuple[Optional[str], bool]: +def get_server_dashboard_url() -> Optional[str]: """Get the base url of the dashboard deployed by the server. Returns: - The server dashboard url and whether the dashboard is the legacy - dashboard or not. + The server dashboard url. """ client = Client() if client.zen_store.type == StoreType.REST: server_info = client.zen_store.get_store_info() - if server_info.use_legacy_dashboard: - suffix = f"{constants.WORKSPACES}/{client.active_workspace.name}" - else: - suffix = "" if server_info.server_url: url = server_info.server_url else: url = client.zen_store.url - return url + suffix, server_info.use_legacy_dashboard + return url - return None, False + return None def get_stack_url(stack: StackResponse) -> Optional[str]: @@ -83,13 +78,10 @@ def get_stack_url(stack: StackResponse) -> Optional[str]: Returns: the URL to the stack if the dashboard is available, else None. """ - base_url, is_legacy_dashboard = get_server_dashboard_url() + base_url = get_server_dashboard_url() if base_url: - if is_legacy_dashboard: - return base_url + f"{constants.STACKS}/{stack.id}/configuration" - else: - return base_url + constants.STACKS + return base_url + constants.STACKS return None @@ -103,16 +95,10 @@ def get_component_url(component: ComponentResponse) -> Optional[str]: Returns: the URL to the component if the dashboard is available, else None. """ - base_url, is_legacy_dashboard = get_server_dashboard_url() + base_url = get_server_dashboard_url() if base_url: - if is_legacy_dashboard: - return ( - base_url - + f"{constants.STACK_COMPONENTS}/{component.type.value}/{component.id}/configuration" - ) - else: - return base_url + constants.STACKS + return base_url + constants.STACKS return None @@ -130,15 +116,9 @@ def get_run_url(run: PipelineRunResponse) -> Optional[str]: if cloud_url: return f"{cloud_url}{constants.RUNS}/{run.id}" - dashboard_url, is_legacy_dashboard = get_server_dashboard_url() + dashboard_url = get_server_dashboard_url() if dashboard_url: - if is_legacy_dashboard: - if run.pipeline: - return f"{dashboard_url}{constants.PIPELINES}/{run.pipeline.id}{constants.RUNS}/{run.id}/dag" - else: - return f"{dashboard_url}/all-runs/{run.id}/dag" - else: - return f"{dashboard_url}{constants.RUNS}/{run.id}" + return f"{dashboard_url}{constants.RUNS}/{run.id}" return None diff --git a/src/zenml/zen_server/deploy/docker/docker_zen_server.py b/src/zenml/zen_server/deploy/docker/docker_zen_server.py index 2aaef53df97..271916cebb1 100644 --- a/src/zenml/zen_server/deploy/docker/docker_zen_server.py +++ b/src/zenml/zen_server/deploy/docker/docker_zen_server.py @@ -22,14 +22,12 @@ from zenml.config.global_config import GlobalConfiguration from zenml.config.store_config import StoreConfiguration from zenml.constants import ( - DEFAULT_ZENML_SERVER_USE_LEGACY_DASHBOARD, ENV_ZENML_ANALYTICS_OPT_IN, ENV_ZENML_CONFIG_PATH, ENV_ZENML_DISABLE_DATABASE_MIGRATION, ENV_ZENML_LOCAL_STORES_PATH, ENV_ZENML_SERVER_AUTO_ACTIVATE, ENV_ZENML_SERVER_DEPLOYMENT_TYPE, - ENV_ZENML_SERVER_USE_LEGACY_DASHBOARD, LOCAL_STORES_DIRECTORY_NAME, ZEN_SERVER_ENTRYPOINT, ) @@ -70,7 +68,6 @@ class DockerServerDeploymentConfig(ServerDeploymentConfig): port: int = 8238 image: str = DOCKER_ZENML_SERVER_DEFAULT_IMAGE store: Optional[StoreConfiguration] = None - use_legacy_dashboard: bool = DEFAULT_ZENML_SERVER_USE_LEGACY_DASHBOARD model_config = ConfigDict(extra="forbid") @@ -174,9 +171,6 @@ def _get_container_cmd(self) -> Tuple[List[str], Dict[str, str]]: LOCAL_STORES_DIRECTORY_NAME, ) env[ENV_ZENML_DISABLE_DATABASE_MIGRATION] = "True" - env[ENV_ZENML_SERVER_USE_LEGACY_DASHBOARD] = str( - self.config.server.use_legacy_dashboard - ) env[ENV_ZENML_SERVER_AUTO_ACTIVATE] = "True" return cmd, env diff --git a/src/zenml/zen_server/deploy/local/local_zen_server.py b/src/zenml/zen_server/deploy/local/local_zen_server.py index 87e0cd28c93..70dc0d8a907 100644 --- a/src/zenml/zen_server/deploy/local/local_zen_server.py +++ b/src/zenml/zen_server/deploy/local/local_zen_server.py @@ -24,13 +24,11 @@ from zenml.config.store_config import StoreConfiguration from zenml.constants import ( DEFAULT_LOCAL_SERVICE_IP_ADDRESS, - DEFAULT_ZENML_SERVER_USE_LEGACY_DASHBOARD, ENV_ZENML_CONFIG_PATH, ENV_ZENML_DISABLE_DATABASE_MIGRATION, ENV_ZENML_LOCAL_STORES_PATH, ENV_ZENML_SERVER_AUTO_ACTIVATE, ENV_ZENML_SERVER_DEPLOYMENT_TYPE, - ENV_ZENML_SERVER_USE_LEGACY_DASHBOARD, ZEN_SERVER_ENTRYPOINT, ) from zenml.enums import StoreType @@ -68,7 +66,6 @@ class LocalServerDeploymentConfig(ServerDeploymentConfig): ) blocking: bool = False store: Optional[StoreConfiguration] = None - use_legacy_dashboard: bool = DEFAULT_ZENML_SERVER_USE_LEGACY_DASHBOARD model_config = ConfigDict(extra="forbid") @@ -160,9 +157,6 @@ def _get_daemon_cmd(self) -> Tuple[List[str], Dict[str, str]]: GlobalConfiguration().local_stores_path ) env[ENV_ZENML_DISABLE_DATABASE_MIGRATION] = "True" - env[ENV_ZENML_SERVER_USE_LEGACY_DASHBOARD] = str( - self.config.server.use_legacy_dashboard - ) env[ENV_ZENML_SERVER_AUTO_ACTIVATE] = "True" return cmd, env diff --git a/src/zenml/zen_server/routers/runs_endpoints.py b/src/zenml/zen_server/routers/runs_endpoints.py index 89c8934f13a..6a135443218 100644 --- a/src/zenml/zen_server/routers/runs_endpoints.py +++ b/src/zenml/zen_server/routers/runs_endpoints.py @@ -20,7 +20,6 @@ from zenml.constants import ( API, - GRAPH, PIPELINE_CONFIGURATION, REFRESH, RUNS, @@ -29,7 +28,6 @@ VERSION_1, ) from zenml.enums import ExecutionStatus, StackComponentType -from zenml.lineage_graph.lineage_graph import LineageGraph from zenml.logger import get_logger from zenml.models import ( Page, @@ -206,32 +204,6 @@ def delete_run( ) -@router.get( - "/{run_id}" + GRAPH, - response_model=LineageGraph, - responses={401: error_response, 404: error_response, 422: error_response}, -) -@handle_exceptions -def get_run_dag( - run_id: UUID, - _: AuthContext = Security(authorize), -) -> LineageGraph: - """Get the DAG for a given pipeline run. - - Args: - run_id: ID of the pipeline run to use to get the DAG. - - Returns: - The DAG for a given pipeline run. - """ - run = verify_permissions_and_get_entity( - id=run_id, get_method=zen_store().get_run, hydrate=True - ) - graph = LineageGraph() - graph.generate_run_nodes_and_edges(run) - return graph - - @router.get( "/{run_id}" + STEPS, response_model=Page[StepRunResponse], diff --git a/src/zenml/zen_server/zen_server_api.py b/src/zenml/zen_server/zen_server_api.py index 488d536f25b..12c7cf63c8d 100644 --- a/src/zenml/zen_server/zen_server_api.py +++ b/src/zenml/zen_server/zen_server_api.py @@ -37,7 +37,12 @@ RequestResponseEndpoint, ) from starlette.middleware.cors import CORSMiddleware -from starlette.responses import FileResponse, JSONResponse, Response +from starlette.responses import ( + FileResponse, + JSONResponse, + RedirectResponse, + Response, +) from starlette.types import ASGIApp import zenml @@ -48,6 +53,7 @@ HEALTH, ) from zenml.enums import AuthScheme, SourceContextTypes +from zenml.models import ServerDeploymentType from zenml.zen_server.exceptions import error_detail from zenml.zen_server.routers import ( actions_endpoints, @@ -98,10 +104,7 @@ zen_store, ) -if server_config().use_legacy_dashboard: - DASHBOARD_DIRECTORY = "dashboard_legacy" -else: - DASHBOARD_DIRECTORY = "dashboard" +DASHBOARD_DIRECTORY = "dashboard" def relative_path(rel: str) -> str: @@ -346,17 +349,14 @@ def initialize() -> None: initialize_secure_headers() -if server_config().use_legacy_dashboard: - app.mount( - "/static", - StaticFiles( - directory=relative_path( - os.path.join(DASHBOARD_DIRECTORY, "static") - ), - check_dir=False, - ), - ) -else: +DASHBOARD_REDIRECT_URL = None +if ( + server_config().dashboard_url + and server_config().deployment_type == ServerDeploymentType.CLOUD +): + DASHBOARD_REDIRECT_URL = server_config().dashboard_url + +if not DASHBOARD_REDIRECT_URL: app.mount( "/assets", StaticFiles( @@ -396,6 +396,9 @@ async def dashboard(request: Request) -> Any: Raises: HTTPException: If the dashboard files are not included. """ + if DASHBOARD_REDIRECT_URL: + return RedirectResponse(url=DASHBOARD_REDIRECT_URL) + if not os.path.isfile( os.path.join(relative_path(DASHBOARD_DIRECTORY), "index.html") ): @@ -504,6 +507,8 @@ async def catch_all(request: Request, file_path: str) -> Any: Returns: The ZenML dashboard. """ + if DASHBOARD_REDIRECT_URL: + return RedirectResponse(url=DASHBOARD_REDIRECT_URL) # some static files need to be served directly from the root dashboard # directory if file_path and file_path in root_static_files: diff --git a/src/zenml/zen_stores/base_zen_store.py b/src/zenml/zen_stores/base_zen_store.py index ec520c54169..8617ad86391 100644 --- a/src/zenml/zen_stores/base_zen_store.py +++ b/src/zenml/zen_stores/base_zen_store.py @@ -426,7 +426,6 @@ def get_store_info(self) -> ServerModel: secrets_store_type = SecretsStoreType.NONE if isinstance(self, SqlZenStore) and self.config.secrets_store: secrets_store_type = self.config.secrets_store.type - use_legacy_dashboard = server_config.use_legacy_dashboard return ServerModel( id=GlobalConfiguration().user_id, active=True, @@ -440,7 +439,6 @@ def get_store_info(self) -> ServerModel: dashboard_url=server_config.dashboard_url or "", analytics_enabled=GlobalConfiguration().analytics_opt_in, metadata=metadata, - use_legacy_dashboard=use_legacy_dashboard, ) def is_local_store(self) -> bool: diff --git a/src/zenml/zen_stores/sql_zen_store.py b/src/zenml/zen_stores/sql_zen_store.py index 3f95ec278a3..dddb24ea5bc 100644 --- a/src/zenml/zen_stores/sql_zen_store.py +++ b/src/zenml/zen_stores/sql_zen_store.py @@ -9205,7 +9205,7 @@ def _create_default_user_on_db_init(self) -> bool: # Running inside server with external auth return False - if config.auto_activate or config.use_legacy_dashboard: + if config.auto_activate: return True else: diff --git a/tests/integration/functional/test_lineage_graph.py b/tests/integration/functional/test_lineage_graph.py deleted file mode 100644 index 34ae11e7de6..00000000000 --- a/tests/integration/functional/test_lineage_graph.py +++ /dev/null @@ -1,362 +0,0 @@ -# Copyright (c) ZenML GmbH 2023. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Tests for the lineage graph.""" - -from typing import TYPE_CHECKING -from uuid import UUID - -from typing_extensions import Annotated - -from tests.integration.functional.zen_stores.utils import ( - constant_int_output_test_step, - int_plus_one_test_step, -) -from zenml import load_artifact, pipeline, save_artifact, step -from zenml.artifacts.external_artifact import ExternalArtifact -from zenml.enums import MetadataResourceTypes -from zenml.lineage_graph.lineage_graph import ( - ARTIFACT_PREFIX, - STEP_PREFIX, - LineageGraph, -) -from zenml.metadata.metadata_types import MetadataTypeEnum, Uri -from zenml.models import PipelineRunResponse - -if TYPE_CHECKING: - from zenml.client import Client - - -def test_generate_run_nodes_and_edges( - clean_client: "Client", connected_two_step_pipeline -): - """Tests that the created lineage graph has the right nodes and edges. - - We also write some mock metadata for both pipeline runs and steps runs here - to test that they are correctly added to the lineage graph. - """ - active_stack_model = clean_client.active_stack_model - orchestrator_id = active_stack_model.components["orchestrator"][0].id - - # Create and retrieve a pipeline run - pipeline_instance = connected_two_step_pipeline( - step_1=constant_int_output_test_step, - step_2=int_plus_one_test_step, - ) - pipeline_instance() - pipeline_run = clean_client.get_pipeline( - "connected_two_step_pipeline" - ).runs[0] - - # Write some metadata for the pipeline run - clean_client.create_run_metadata( - metadata={"orchestrator_url": Uri("https://www.ariaflow.org")}, - resource_id=pipeline_run.id, - resource_type=MetadataResourceTypes.PIPELINE_RUN, - stack_component_id=orchestrator_id, - ) - - # Write some metadata for all steps - steps = pipeline_run.steps - for step_ in steps.values(): - clean_client.create_run_metadata( - metadata={ - "experiment_tracker_url": Uri("https://www.aria_and_blupus.ai") - }, - resource_id=step_.id, - resource_type=MetadataResourceTypes.STEP_RUN, - stack_component_id=orchestrator_id, # just link something - ) - - # Write some metadata for all artifacts - for output_artifact in step_.outputs.values(): - clean_client.create_run_metadata( - metadata={"aria_loves_alex": True}, - resource_id=output_artifact.id, - resource_type=MetadataResourceTypes.ARTIFACT_VERSION, - ) - - # Get the run again so all the metadata is loaded - pipeline_run = clean_client.get_pipeline( - "connected_two_step_pipeline" - ).runs[0] - - # Generate a lineage graph for the pipeline run - graph = LineageGraph() - graph.generate_run_nodes_and_edges(pipeline_run) - - # Check that the graph has the right attributes - # 2 steps + 2 artifacts - assert len(graph.nodes) == 4 - # 3 edges: step_1 -> artifact_1 -> step_2 -> artifact_2 - assert len(graph.edges) == 3 - - # Check that the graph makes sense - _validate_graph(graph, pipeline_run) - - # Check that the run, all steps, and all artifacts have metadata - # Here we only check the last element in case we run this test with stack - # components that add their own metadata in the future - assert len(graph.run_metadata) > 0 - assert graph.run_metadata[-1] == ( - "orchestrator_url", - "https://www.ariaflow.org", - MetadataTypeEnum.URI, - ) - for node in graph.nodes: - node_metadata = node.data.metadata - assert len(node_metadata) > 0 - if node.type == "step": - assert node_metadata[-1] == ( - "experiment_tracker_url", - "https://www.aria_and_blupus.ai", - MetadataTypeEnum.URI, - ) - elif node.type == "artifact": - assert node_metadata[-1] == ( - "aria_loves_alex", - "True", - MetadataTypeEnum.BOOL, - ) - - -@step -def int_step(a: int = 1) -> int: - return a - - -@step -def str_step(b: str = "a") -> str: - return b - - -@pipeline -def pipeline_with_direct_edge(): - int_step() - str_step(after=["int_step"]) - - -def test_add_direct_edges(clean_client: "Client"): - """Test that direct `.after(...)` edges are added to the lineage graph.""" - - # Create and retrieve a pipeline run - pipeline_with_direct_edge() - run_ = pipeline_with_direct_edge.model.last_run - - # Generate a lineage graph for the pipeline run - graph = LineageGraph() - graph.generate_run_nodes_and_edges(run_) - - # Check that the graph has the right attributes - # 2 steps + 2 artifacts - assert len(graph.nodes) == 4 - # 3 edges: int_step -> a; int_step -> str_step; str_step -> b - assert len(graph.edges) == 3 - - # Check that the graph generally makes sense - _validate_graph(graph, run_) - - # Check that the direct edge is added - node_ids = [node.id for node in graph.nodes] - direct_edge_exists = False - for edge in graph.edges: - if edge.source in node_ids and edge.target in node_ids: - direct_edge_exists = True - assert direct_edge_exists - - -@pipeline -def first_pipeline(): - int_step() - - -@step -def external_artifact_loader_step(a: int) -> int: - c = a - return c - - -@pipeline -def second_pipeline(artifact_version_id: UUID): - external_artifact_loader_step(a=ExternalArtifact(value=1)) - - -def test_add_external_artifacts(clean_client: "Client"): - """Test that external artifacts are added to the lineage graph.""" - - # Create and retrieve a pipeline run - first_pipeline() - second_pipeline(first_pipeline.model.last_run.steps["int_step"].output.id) - run_ = second_pipeline.model.last_run - - # Generate a lineage graph for the pipeline run - graph = LineageGraph() - graph.generate_run_nodes_and_edges(run_) - - # Check that the graph has the right attributes - # 1 step, 1 artifact, 1 external artifact - assert len(graph.nodes) == 3 - # 2 edges: a -> external_artifact_loader_step -> c - assert len(graph.edges) == 2 - - # Check that the graph generally makes sense - _validate_graph(graph, run_) - - # Check that the external artifact is a node in the graph - artifact_version_ids_of_run = { - artifact_version.id for artifact_version in run_.artifact_versions - } - external_artifact_node_id = None - for node in graph.nodes: - if node.type == "artifact": - if node.data.execution_id not in artifact_version_ids_of_run: - external_artifact_node_id = node.id - assert external_artifact_node_id - - # Check that the external artifact is connected to the step - step_nodes = [node for node in graph.nodes if node.type == "step"] - assert len(step_nodes) == 1 - step_node = step_nodes[0] - external_artifact_is_input_of_step = False - for edge in graph.edges: - if ( - edge.source == external_artifact_node_id - and edge.target == step_node.id - ): - external_artifact_is_input_of_step = True - assert external_artifact_is_input_of_step - - -@step -def manual_artifact_saving_step() -> Annotated[int, "output"]: - """A step that logs an artifact.""" - save_artifact(1, name="saved_unconsumed") - save_artifact(2, name="saved_consumed") - return 3 - - -@step -def manual_artifact_loading_step(input: int) -> None: - """A step that loads an artifact.""" - load_artifact("saved_consumed") - load_artifact("saved_before") - - -@pipeline -def saving_loading_pipeline(): - output = manual_artifact_saving_step() - manual_artifact_loading_step(input=output) - - -def test_manual_save_load_artifact(clean_client): - """Test that manually saved and loaded artifacts are added to the graph.""" - - # Save an artifact before the pipeline run - save_artifact(4, name="saved_before") - - # Create and retrieve a pipeline run - saving_loading_pipeline() - run_ = saving_loading_pipeline.model.last_run - - # Generate a lineage graph for the pipeline run - graph = LineageGraph() - graph.generate_run_nodes_and_edges(run_) - - # Check that the graph has the right attributes - # 6 = 2 steps + 4 artifacts (3 from save step, 1 additional from load step) - assert len(graph.nodes) == 6 - # 12 edges (3 per step) - assert len(graph.edges) == 6 - - # Check that the graph generally makes sense - _validate_graph(graph, run_) - - # Check that "saved_unconsumed", "saved_consumed", and "saved_before" are - # nodes in the graph - saved_unconsumed_node_id = None - saved_consumed_node_id = None - saved_before_node_id = None - for node in graph.nodes: - if node.type == "artifact": - if node.data.name == "saved_unconsumed": - saved_unconsumed_node_id = node.id - elif node.data.name == "saved_consumed": - saved_consumed_node_id = node.id - elif node.data.name == "saved_before": - saved_before_node_id = node.id - assert saved_unconsumed_node_id - assert saved_consumed_node_id - assert saved_before_node_id - - # Check that "saved_unconsumed" and "saved_consumed" are outputs of step 1 - step_nodes = [node for node in graph.nodes if node.type == "step"] - saved_unconsumed_is_output = False - saved_consumed_is_output = False - for edge in graph.edges: - if edge.source == step_nodes[0].id: - if edge.target == saved_unconsumed_node_id: - saved_unconsumed_is_output = True - elif edge.target == saved_consumed_node_id: - saved_consumed_is_output = True - assert saved_unconsumed_is_output - assert saved_consumed_is_output - - # Check that "saved_consumed" and "saved_before" are inputs of step 2 - saved_consumed_is_input = False - saved_before_is_input = False - for edge in graph.edges: - if edge.target == step_nodes[1].id: - if edge.source == saved_consumed_node_id: - saved_consumed_is_input = True - elif edge.source == saved_before_node_id: - saved_before_is_input = True - assert saved_consumed_is_input - assert saved_before_is_input - - -def _validate_graph( - graph: LineageGraph, pipeline_run: PipelineRunResponse -) -> None: - """Validates that the generated lineage graph matches the pipeline run. - - Args: - graph: The generated lineage graph. - pipeline_run: The pipeline run used to generate the lineage graph. - """ - edge_id_to_model_mapping = {edge.id: edge for edge in graph.edges} - node_id_to_model_mapping = {node.id: node for node in graph.nodes} - for step_ in pipeline_run.steps.values(): - step_id = STEP_PREFIX + str(step_.id) - - # Check that each step has a corresponding node - assert step_id in node_id_to_model_mapping - - # Check that each step node is connected to all of its output nodes - for output_artifact in step_.outputs.values(): - artifact_version_id = ARTIFACT_PREFIX + str(output_artifact.id) - assert artifact_version_id in node_id_to_model_mapping - edge_id = step_id + "_" + artifact_version_id - assert edge_id in edge_id_to_model_mapping - edge = edge_id_to_model_mapping[edge_id] - assert edge.source == step_id - assert edge.target == artifact_version_id - - # Check that each step node is connected to all of its input nodes - for input_artifact in step_.inputs.values(): - artifact_version_id = ARTIFACT_PREFIX + str(input_artifact.id) - assert artifact_version_id in node_id_to_model_mapping - edge_id = artifact_version_id + "_" + step_id - assert edge_id in edge_id_to_model_mapping - edge = edge_id_to_model_mapping[edge_id] - assert edge.source == artifact_version_id - assert edge.target == step_id