Skip to content

Commit

Permalink
add event
Browse files Browse the repository at this point in the history
  • Loading branch information
goFrendiAsgard committed Oct 21, 2023
1 parent 8b1fc62 commit 356e8f4
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 44 deletions.
51 changes: 24 additions & 27 deletions docs/concepts/tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@ As every task are extended from `BaseTask`, you will see that most of them share


```
BaseTask
┌──────┬───────────┬───────────┼───────────┬───────────┬──────────┐
│ │ │ │ │ │ │
│ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼
Task CmdTask ResourceMaker FlowTask HttpChecker PortChecker PathChecker
DockerComposeTask
BaseTask
┌──────┬───────────┬───────────┬─────────────┼────────────────┬───────────┬──────────┐
│ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
Task CmdTask ResourceMaker FlowTask BaseRemoteCmdTask HttpChecker PortChecker PathChecker
│ │
│ │
▼ ┌─────┴──────┐
DockerComposeTask │ │
▼ ▼
RemoteCmdTask RsyncTask
```

Aside from the documentation, you can always dive down into [the source code](https://github.com/state-alchemists/zrb/tree/main/src/zrb/task) to see the detail implementation.
Expand Down Expand Up @@ -118,29 +121,23 @@ A task might also have multiple upstreams. In that case, the upstreams will be e
Every task has it's own lifecycle.

```
┌────────────────────────────┐
│ │
│ ▼
│ ┌──► Ready ────► Stopped
│ │ ▲
Triggered ────► Waiting ────► Started ───┤ │
▲ │ │
│ └──► Failed ────────┘
│ │
│ │
│ ▼
└─────────── Retry
Triggered ┌─────────► Ready
│ │
│ │
▼ │
Waiting ────► Started ─────► Failed
│ ▲ │
│ │ │
▼ │ ▼
Skipped └────────── Retry
```

> __Note:__ `Ready` and `Stopped` is interchangable. If your task is not a long running process, it most likely `stopped` first before `ready`.
- `Triggered`: Task is triggered and will be executed.
- `Waiting`: Task won't be started until all it's upstreams are ready.
- `Started`: Zrb has start the task.
- `Failed`: The task is failed, due to internal error or other causes. A failed task can be retried or stopped, depends on `retries` setting.
- `Retry`: The task has been failed and now rescheduled to be started.
- `Ready`: The task is ready. Some tasks are automatically stopped after ready, but some others keep running in the background (e.g., web server, scheduler, etc)
- `Stopped`: The task is no longer running.

# Common task parameters

Expand Down
10 changes: 10 additions & 0 deletions src/zrb/helper/callable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Any, Callable
import inspect


async def run_async(
fn: Callable, *args: Any, **kwargs: Any
) -> Any:
if inspect.iscoroutinefunction(fn):
return await fn(*args, **kwargs)
return fn(*args, **kwargs)
28 changes: 28 additions & 0 deletions src/zrb/task/any_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,34 @@ async def run(self, *args: Any, **kwargs: Any) -> Any:
async def check(self) -> bool:
pass

@abstractmethod
async def on_triggered(self):
pass

@abstractmethod
async def on_waiting(self):
pass

@abstractmethod
async def on_skipped(self):
pass

@abstractmethod
async def on_started(self):
pass

@abstractmethod
async def on_ready(self):
pass

@abstractmethod
async def on_failed(self, is_last_attempt: bool):
pass

@abstractmethod
async def on_retry(self):
pass

@abstractmethod
def to_function(
self, env_prefix: str = '', raise_error: bool = True
Expand Down
53 changes: 36 additions & 17 deletions src/zrb/task/base_task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from zrb.helper.typing import (
Any, Callable, Iterable, List, Mapping, Optional, Union
)
from zrb.helper.callable import run_async
from zrb.helper.typecheck import typechecked
from zrb.task.any_task import AnyTask
from zrb.task.base_task_composite import (
Expand All @@ -21,7 +22,6 @@

import asyncio
import copy
import inspect
import os
import sys

Expand Down Expand Up @@ -168,11 +168,33 @@ async def run(self, *args: Any, **kwargs: Any) -> Any:
Please override this method.
'''
if self._run_function is not None:
if inspect.iscoroutinefunction(self._run_function):
return await self._run_function(*args, **kwargs)
return self._run_function(*args, **kwargs)
return await run_async(self._run_function, *args, **kwargs)
return None

async def on_triggered(self):
self.log_info('State: triggered')

async def on_waiting(self):
self.log_info('State: waiting')

async def on_skipped(self):
self.log_info('State: skipped')

async def on_started(self):
self.log_info('State: started')

async def on_ready(self):
self.log_info('State: ready')

async def on_failed(self, is_last_attempt: bool):
failed_state_message = 'State failed'
if is_last_attempt:
failed_state_message = 'State failed (last attempt)'
self.log_info(failed_state_message)

async def on_retry(self):
self.log_info('State: retry')

async def check(self) -> bool:
'''
Return true when task is considered completed.
Expand Down Expand Up @@ -315,7 +337,7 @@ async def _loop_check(self, show_done: bool = False) -> bool:
selected_advertisement.show()
self._show_done_info()
self._play_bell()
self.log_info('State: ready')
await self.on_ready()
return True

def _show_run_command(self):
Expand Down Expand Up @@ -391,9 +413,9 @@ async def _cached_run(self, *args: Any, **kwargs: Any) -> Any:
if self._is_execution_triggered:
self.log_debug('Task has been triggered')
return
self.log_info('State: triggered')
await self.on_triggered()
self._is_execution_triggered = True
self.log_info('State: waiting')
await self.on_waiting()
# get upstream checker
upstream_check_processes: Iterable[asyncio.Task] = []
self._allow_add_upstreams = False
Expand All @@ -408,8 +430,7 @@ async def _cached_run(self, *args: Any, **kwargs: Any) -> Any:
local_kwargs = dict(kwargs)
local_kwargs['_task'] = self
if not await self._check_should_execute(*args, **local_kwargs):
self.log_info('Skip execution')
self.log_info('State: stopped')
await self.on_skipped()
await self._mark_done()
return None
# start running task
Expand All @@ -419,27 +440,25 @@ async def _cached_run(self, *args: Any, **kwargs: Any) -> Any:
self.log_debug(
f'Started with args: {args} and kwargs: {local_kwargs}'
)
self.log_info('State: started')
await self.on_started()
result = await self.run(*args, **local_kwargs)
break
except Exception:
self.log_info('State: failed')
if self._is_last_attempt():
is_last_attempt = self._is_last_attempt()
await self.on_failed(is_last_attempt)
if is_last_attempt:
raise
attempt = self._get_attempt()
self.log_error(f'Encounter error on attempt {attempt}')
self._increase_attempt()
await asyncio.sleep(self._retry_interval)
self.log_info('State: retry')
await self.on_retry()
await self._mark_done()
self.log_info('State: stopped')
return result

async def _check_should_execute(self, *args: Any, **kwargs: Any) -> bool:
if callable(self._should_execute):
if inspect.iscoroutinefunction(self._should_execute):
return await self._should_execute(*args, **kwargs)
return self._should_execute(*args, **kwargs)
return await run_async(self._should_execute, *args, **kwargs)
return self.render_bool(self._should_execute)

async def _set_keyval(self, kwargs: Mapping[str, Any], env_prefix: str):
Expand Down

0 comments on commit 356e8f4

Please sign in to comment.