diff --git a/execution_utils.py b/execution_utils.py index 66b45fd..0ea36d7 100644 --- a/execution_utils.py +++ b/execution_utils.py @@ -8,11 +8,11 @@ from datetime import timedelta from typing import Any, List, Tuple +from flytekit import WorkflowExecutionPhase from flytekit.exceptions.user import FlyteTimeout +from flytekit.exceptions.system import FlyteSystemException from flytekit.remote import FlyteRemote from flytekit.remote.executions import FlyteWorkflowExecution -from hydra.conf import HydraConf -from hydra_zen import store from rich.console import Console from rich.logging import RichHandler from rich.theme import Theme @@ -154,12 +154,19 @@ def wait_for_workflow_completion( except queue.Empty: response = "n" - if response in ["y", "yes"]: - remote.terminate(execution, "KeyboardInterrupt confirmed termination") - logger.info("Workflow execution terminated.") + synced_execution = remote.sync(execution) + if synced_execution.closure.phase in [WorkflowExecutionPhase.RUNNING]: + try: + if response in ["y", "yes"]: + remote.terminate(execution, "KeyboardInterrupt confirmed termination") + logger.info("Workflow execution terminated.") + else: + logger.warning( + f"\nExiting script without terminating workflow execution:\n\n{execution}\n" + ) + except FlyteSystemException as e: + logger.error(f"Error while trying to terminate the execution: {e}") else: - logger.warning( - f"\nExiting script without terminating workflow execution:\n\n{execution}\n" - ) + logger.info(f"Workflow execution already in terminal state: {synced_execution.closure.phase}") exit()