Skip to content

Commit

Permalink
Fixes for task-execution
Browse files Browse the repository at this point in the history
We were previously doing execution incorrectly -- this adds additional
logging + sets up the correct executor to run for tasks.
  • Loading branch information
elijahbenizzy committed Aug 17, 2023
1 parent 27a60d6 commit 3ad3dec
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions hamilton/execution/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,9 @@ def get_executor_for_task(self, task: TaskImplementation) -> TaskExecutor:
:return: A local task if this is a "single-node" task, a remote task otherwise
"""

if task.purpose == NodeGroupPurpose.EXECUTE_SINGLE:
return self.local_executor
return self.remote_executor
if task.purpose == NodeGroupPurpose.EXECUTE_BLOCK:
return self.remote_executor
return self.local_executor


def run_graph_to_completion(
Expand All @@ -331,7 +331,14 @@ def run_graph_to_completion(
if next_task is not None:
task_executor = execution_manager.get_executor_for_task(next_task)
if task_executor.can_submit_task():
submitted = task_executor.submit_task(next_task)
try:
submitted = task_executor.submit_task(next_task)
except Exception as e:
logger.exception(
f"Exception submitting task {next_task.task_id}, with nodes: "
f"{[item.name for item in next_task.nodes]}"
)
raise e
task_futures[next_task.task_id] = submitted
else:
# Whoops, back on the queue
Expand Down

0 comments on commit 3ad3dec

Please sign in to comment.