Skip to content

Commit

Permalink
fix(execute): handle workflow already terminated
Browse files Browse the repository at this point in the history
Signed-off-by: Cameron Smith <cameron.ray.smith@gmail.com>
  • Loading branch information
cameronraysmith committed Nov 25, 2023
1 parent b0ee5e9 commit 0a0bc59
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions execution_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
from datetime import timedelta
from typing import Any, List, Tuple

from flytekit import WorkflowExecutionPhase
from flytekit.exceptions.user import FlyteTimeout
from flytekit.exceptions.system import FlyteSystemException
from flytekit.remote import FlyteRemote
from flytekit.remote.executions import FlyteWorkflowExecution
from hydra.conf import HydraConf
from hydra_zen import store
from rich.console import Console
from rich.logging import RichHandler
from rich.theme import Theme
Expand Down Expand Up @@ -154,12 +154,19 @@ def wait_for_workflow_completion(
except queue.Empty:
response = "n"

if response in ["y", "yes"]:
remote.terminate(execution, "KeyboardInterrupt confirmed termination")
logger.info("Workflow execution terminated.")
synced_execution = remote.sync(execution)
if synced_execution.closure.phase in [WorkflowExecutionPhase.RUNNING]:
try:
if response in ["y", "yes"]:
remote.terminate(execution, "KeyboardInterrupt confirmed termination")
logger.info("Workflow execution terminated.")
else:
logger.warning(
f"\nExiting script without terminating workflow execution:\n\n{execution}\n"
)
except FlyteSystemException as e:
logger.error(f"Error while trying to terminate the execution: {e}")
else:
logger.warning(
f"\nExiting script without terminating workflow execution:\n\n{execution}\n"
)
logger.info(f"Workflow execution already in terminal state: {synced_execution.closure.phase}")

exit()

0 comments on commit 0a0bc59

Please sign in to comment.