diff --git a/docs/concepts/tasks/README.md b/docs/concepts/tasks/README.md index 8c4800dd..dc6bbb96 100644 --- a/docs/concepts/tasks/README.md +++ b/docs/concepts/tasks/README.md @@ -17,18 +17,21 @@ As every task are extended from `BaseTask`, you will see that most of them share ``` - BaseTask - │ - │ - ┌──────┬───────────┬───────────┼───────────┬───────────┬──────────┐ - │ │ │ │ │ │ │ - │ │ │ │ │ │ │ - ▼ ▼ ▼ ▼ ▼ ▼ ▼ -Task CmdTask ResourceMaker FlowTask HttpChecker PortChecker PathChecker - │ - │ - ▼ - DockerComposeTask + BaseTask + │ + │ + ┌──────┬───────────┬───────────┬─────────────┼────────────────┬───────────┬──────────┐ + │ │ │ │ │ │ │ │ + │ │ │ │ │ │ │ │ + ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ +Task CmdTask ResourceMaker FlowTask BaseRemoteCmdTask HttpChecker PortChecker PathChecker + │ │ + │ │ + ▼ ┌─────┴──────┐ + DockerComposeTask │ │ + ▼ ▼ + RemoteCmdTask RsyncTask + ``` Aside from the documentation, you can always dive down into [the source code](https://github.com/state-alchemists/zrb/tree/main/src/zrb/task) to see the detail implementation. @@ -118,29 +121,23 @@ A task might also have multiple upstreams. In that case, the upstreams will be e Every task has it's own lifecycle. ``` - ┌────────────────────────────┐ - │ │ - │ ▼ - │ ┌──► Ready ────► Stopped - │ │ ▲ -Triggered ────► Waiting ────► Started ───┤ │ - ▲ │ │ - │ └──► Failed ────────┘ - │ │ - │ │ - │ ▼ - └─────────── Retry +Triggered ┌─────────► Ready + │ │ + │ │ + ▼ │ + Waiting ────► Started ─────► Failed + │ ▲ │ + │ │ │ + ▼ │ ▼ + Skipped └────────── Retry ``` -> __Note:__ `Ready` and `Stopped` is interchangable. If your task is not a long running process, it most likely `stopped` first before `ready`. - - `Triggered`: Task is triggered and will be executed. - `Waiting`: Task won't be started until all it's upstreams are ready. - `Started`: Zrb has start the task. - `Failed`: The task is failed, due to internal error or other causes. A failed task can be retried or stopped, depends on `retries` setting. - `Retry`: The task has been failed and now rescheduled to be started. - `Ready`: The task is ready. Some tasks are automatically stopped after ready, but some others keep running in the background (e.g., web server, scheduler, etc) -- `Stopped`: The task is no longer running. # Common task parameters diff --git a/src/zrb/helper/callable.py b/src/zrb/helper/callable.py new file mode 100644 index 00000000..3a1dc090 --- /dev/null +++ b/src/zrb/helper/callable.py @@ -0,0 +1,10 @@ +from typing import Any, Callable +import inspect + + +async def run_async( + fn: Callable, *args: Any, **kwargs: Any +) -> Any: + if inspect.iscoroutinefunction(fn): + return await fn(*args, **kwargs) + return fn(*args, **kwargs) diff --git a/src/zrb/task/any_task.py b/src/zrb/task/any_task.py index e093a16b..03f7333b 100644 --- a/src/zrb/task/any_task.py +++ b/src/zrb/task/any_task.py @@ -30,6 +30,34 @@ async def run(self, *args: Any, **kwargs: Any) -> Any: async def check(self) -> bool: pass + @abstractmethod + async def on_triggered(self): + pass + + @abstractmethod + async def on_waiting(self): + pass + + @abstractmethod + async def on_skipped(self): + pass + + @abstractmethod + async def on_started(self): + pass + + @abstractmethod + async def on_ready(self): + pass + + @abstractmethod + async def on_failed(self, is_last_attempt: bool): + pass + + @abstractmethod + async def on_retry(self): + pass + @abstractmethod def to_function( self, env_prefix: str = '', raise_error: bool = True diff --git a/src/zrb/task/base_task.py b/src/zrb/task/base_task.py index d2cb6a75..2a43b68d 100644 --- a/src/zrb/task/base_task.py +++ b/src/zrb/task/base_task.py @@ -1,6 +1,7 @@ from zrb.helper.typing import ( Any, Callable, Iterable, List, Mapping, Optional, Union ) +from zrb.helper.callable import run_async from zrb.helper.typecheck import typechecked from zrb.task.any_task import AnyTask from zrb.task.base_task_composite import ( @@ -21,7 +22,6 @@ import asyncio import copy -import inspect import os import sys @@ -168,11 +168,33 @@ async def run(self, *args: Any, **kwargs: Any) -> Any: Please override this method. ''' if self._run_function is not None: - if inspect.iscoroutinefunction(self._run_function): - return await self._run_function(*args, **kwargs) - return self._run_function(*args, **kwargs) + return await run_async(self._run_function, *args, **kwargs) return None + async def on_triggered(self): + self.log_info('State: triggered') + + async def on_waiting(self): + self.log_info('State: waiting') + + async def on_skipped(self): + self.log_info('State: skipped') + + async def on_started(self): + self.log_info('State: started') + + async def on_ready(self): + self.log_info('State: ready') + + async def on_failed(self, is_last_attempt: bool): + failed_state_message = 'State failed' + if is_last_attempt: + failed_state_message = 'State failed (last attempt)' + self.log_info(failed_state_message) + + async def on_retry(self): + self.log_info('State: retry') + async def check(self) -> bool: ''' Return true when task is considered completed. @@ -315,7 +337,7 @@ async def _loop_check(self, show_done: bool = False) -> bool: selected_advertisement.show() self._show_done_info() self._play_bell() - self.log_info('State: ready') + await self.on_ready() return True def _show_run_command(self): @@ -391,9 +413,9 @@ async def _cached_run(self, *args: Any, **kwargs: Any) -> Any: if self._is_execution_triggered: self.log_debug('Task has been triggered') return - self.log_info('State: triggered') + await self.on_triggered() self._is_execution_triggered = True - self.log_info('State: waiting') + await self.on_waiting() # get upstream checker upstream_check_processes: Iterable[asyncio.Task] = [] self._allow_add_upstreams = False @@ -408,8 +430,7 @@ async def _cached_run(self, *args: Any, **kwargs: Any) -> Any: local_kwargs = dict(kwargs) local_kwargs['_task'] = self if not await self._check_should_execute(*args, **local_kwargs): - self.log_info('Skip execution') - self.log_info('State: stopped') + await self.on_skipped() await self._mark_done() return None # start running task @@ -419,27 +440,25 @@ async def _cached_run(self, *args: Any, **kwargs: Any) -> Any: self.log_debug( f'Started with args: {args} and kwargs: {local_kwargs}' ) - self.log_info('State: started') + await self.on_started() result = await self.run(*args, **local_kwargs) break except Exception: - self.log_info('State: failed') - if self._is_last_attempt(): + is_last_attempt = self._is_last_attempt() + await self.on_failed(is_last_attempt) + if is_last_attempt: raise attempt = self._get_attempt() self.log_error(f'Encounter error on attempt {attempt}') self._increase_attempt() await asyncio.sleep(self._retry_interval) - self.log_info('State: retry') + await self.on_retry() await self._mark_done() - self.log_info('State: stopped') return result async def _check_should_execute(self, *args: Any, **kwargs: Any) -> bool: if callable(self._should_execute): - if inspect.iscoroutinefunction(self._should_execute): - return await self._should_execute(*args, **kwargs) - return self._should_execute(*args, **kwargs) + return await run_async(self._should_execute, *args, **kwargs) return self.render_bool(self._should_execute) async def _set_keyval(self, kwargs: Mapping[str, Any], env_prefix: str):