Skip to content

Commit

Permalink
final fix?
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Sep 20, 2023
1 parent 19aa9ea commit bfa12f0
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class ComputationalRunNotFoundError(PydanticErrorMixin, DirectorException):
msg_template = "Computational run not found"


class ComputationalTaskNotFoundError(PydanticErrorMixin, DirectorException):
    # Raised when the comp_tasks table has no row for the requested node
    # (e.g. CompTasksRepository.get_task found nothing for this node_id).
    msg_template = "Computational task {node_id} not found"


class NodeRightsAcquireError(PydanticErrorMixin, DirectorException):
    # Raised when all {slots} lock slots on a docker node are in use and no
    # additional lock can be acquired for {docker_node_id}.
    msg_template = "Could not acquire a lock for {docker_node_id} since all {slots} slots are used."

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from ...constants import UNDEFINED_STR_METADATA
from ...core.errors import (
ComputationalBackendNotConnectedError,
ComputationalBackendOnDemandClustersKeeperNotReadyError,
ComputationalBackendOnDemandNotReadyError,
ComputationalSchedulerChangedError,
InvalidPipelineError,
Expand Down Expand Up @@ -430,6 +431,7 @@ async def _process_started_tasks(
[t.node_id],
t.state,
optional_started=utc_now,
optional_progress=t.progress,
)
for t in tasks
)
Expand Down Expand Up @@ -478,6 +480,11 @@ async def _update_states_from_comp_backend(

# now process the tasks
if tasks_started:
# NOTE: the dask-scheduler cannot differentiate between tasks that are effectively computing and
# tasks that are only queued and accepted by a dask-worker.
# tasks_started should therefore be mostly empty but for cases where
# - dask Pub/Sub mechanism failed, the task goes from PENDING -> SUCCESS/FAILED/ABORTED without STARTED
# - the task finished so fast that the STARTED state was skipped between 2 runs of the dv-2 comp scheduler
await self._process_started_tasks(
tasks_started,
user_id=user_id,
Expand Down Expand Up @@ -744,6 +751,28 @@ async def _schedule_tasks_to_start(
)
for task in comp_tasks.values():
task.state = RunningState.WAITING_FOR_CLUSTER
except ComputationalBackendOnDemandClustersKeeperNotReadyError:
_logger.exception("Unexpected error while starting tasks:")
await publish_project_log(
self.rabbitmq_client,
user_id,
project_id,
log="Unexpected error while scheduling computational tasks! TIP: contact osparc support.",
log_level=logging.ERROR,
)

await CompTasksRepository.instance(
self.db_engine
).update_project_tasks_state(
project_id,
list(tasks_ready_to_start.keys()),
RunningState.FAILED,
optional_progress=1.0,
optional_stopped=arrow.utcnow().datetime,
)
for task in comp_tasks.values():
task.state = RunningState.FAILED

return comp_tasks

def _wake_up_scheduler_now(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
from models_library.rabbitmq_messages import SimcorePlatformStatus
from models_library.users import UserID
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
from servicelib.logging_utils import log_catch
from simcore_service_director_v2.modules.db.repositories.comp_runs import (
CompRunsRepository,
)

from ...core.errors import (
ComputationalBackendOnDemandNotReadyError,
Expand Down Expand Up @@ -108,7 +112,6 @@ async def _start_tasks(
project_id,
list(scheduled_tasks.keys()),
RunningState.PENDING,
optional_started=arrow.utcnow().datetime,
)
# each task is started independently
results: list[list[tuple[NodeID, str]] | Exception] = await asyncio.gather(
Expand Down Expand Up @@ -309,40 +312,47 @@ async def _process_task_result(
)

async def _task_progress_change_handler(self, event: str) -> None:
task_progress_event = TaskProgressEvent.parse_raw(event)
_logger.debug("received task progress update: %s", task_progress_event)
*_, user_id, project_id, node_id = parse_dask_job_id(task_progress_event.job_id)

comp_tasks_repo = CompTasksRepository(self.db_engine)

if task_progress_event.progress == 0:
await comp_tasks_repo.update_project_tasks_state(
project_id,
[node_id],
RunningState.STARTED,
optional_progress=task_progress_event.progress,
with log_catch(_logger, reraise=False):
task_progress_event = TaskProgressEvent.parse_raw(event)
_logger.debug("received task progress update: %s", task_progress_event)
*_, user_id, project_id, node_id = parse_dask_job_id(
task_progress_event.job_id
)
else:
await comp_tasks_repo.update_project_task_progress(
project_id, node_id, task_progress_event.progress

comp_tasks_repo = CompTasksRepository(self.db_engine)
task = await comp_tasks_repo.get_task(project_id, node_id)
if task.progress is None:
task.state = RunningState.STARTED
task.progress = task_progress_event.progress
run = await CompRunsRepository(self.db_engine).get(user_id, project_id)
await self._process_started_tasks(
[task],
user_id=user_id,
iteration=run.iteration,
run_metadata=run.metadata,
)
else:
await comp_tasks_repo.update_project_task_progress(
project_id, node_id, task_progress_event.progress
)
await publish_service_progress(
self.rabbitmq_client,
user_id=user_id,
project_id=project_id,
node_id=node_id,
progress=task_progress_event.progress,
)
await publish_service_progress(
self.rabbitmq_client,
user_id=user_id,
project_id=project_id,
node_id=node_id,
progress=task_progress_event.progress,
)

async def _task_log_change_handler(self, event: str) -> None:
task_log_event = TaskLogEvent.parse_raw(event)
_logger.debug("received task log update: %s", task_log_event)
*_, user_id, project_id, node_id = parse_dask_job_id(task_log_event.job_id)
await publish_service_log(
self.rabbitmq_client,
user_id,
project_id,
node_id,
task_log_event.log,
task_log_event.log_level,
)
with log_catch(_logger, reraise=False):
task_log_event = TaskLogEvent.parse_raw(event)
_logger.debug("received task log update: %s", task_log_event)
*_, user_id, project_id, node_id = parse_dask_job_id(task_log_event.job_id)
await publish_service_log(
self.rabbitmq_client,
user_id,
project_id,
node_id,
task_log_event.log,
task_log_event.log_level,
)
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from servicelib.logging_utils import log_context
from servicelib.utils import logged_gather
from simcore_postgres_database.utils_projects_nodes import ProjectNodesRepo
from simcore_service_director_v2.core.errors import ComputationalTaskNotFoundError
from sqlalchemy import literal_column
from sqlalchemy.dialects.postgresql import insert

Expand Down Expand Up @@ -250,6 +251,19 @@ async def _generate_tasks_list_from_project(


class CompTasksRepository(BaseRepository):
async def get_task(self, project_id: ProjectID, node_id: NodeID) -> CompTaskAtDB:
    """Return the computational task for *node_id* within *project_id*.

    Raises:
        ComputationalTaskNotFoundError: if no matching task row exists.
    """
    # NOTE: project_id/node_id are stored as strings in comp_tasks, hence the
    # f-string coercion in the filter below.
    query = sa.select(comp_tasks).where(
        (comp_tasks.c.project_id == f"{project_id}")
        & (comp_tasks.c.node_id == f"{node_id}")
    )
    async with self.db_engine.acquire() as conn:
        db_result = await conn.execute(query)
        row = await db_result.fetchone()
        if not row:
            raise ComputationalTaskNotFoundError(node_id=node_id)
        return CompTaskAtDB.from_orm(row)

async def list_tasks(
self,
project_id: ProjectID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
RunningState.PUBLISHED,
RunningState.PENDING,
RunningState.WAITING_FOR_RESOURCES,
RunningState.WAITING_FOR_CLUSTER,
}

PROCESSING_STATES: set[RunningState] = {
Expand Down
Loading

0 comments on commit bfa12f0

Please sign in to comment.