Skip to content

Commit

Permalink
introduce recurring task
Browse files Browse the repository at this point in the history
  • Loading branch information
goFrendiAsgard committed Nov 28, 2023
1 parent 433d4ad commit cea811b
Show file tree
Hide file tree
Showing 19 changed files with 536 additions and 263 deletions.
7 changes: 7 additions & 0 deletions src/zrb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
from zrb.task.base_remote_cmd_task import BaseRemoteCmdTask, RemoteConfig
from zrb.task.remote_cmd_task import RemoteCmdTask
from zrb.task.rsync_task import RsyncTask
from zrb.task.checker import Checker
from zrb.task.http_checker import HTTPChecker
from zrb.task.port_checker import PortChecker
from zrb.task.path_checker import PathChecker
from zrb.task.path_watcher import PathWatcher
from zrb.task.time_watcher import TimeWatcher
from zrb.task.resource_maker import (
ResourceMaker, Replacement, ReplacementMutator
)
Expand Down Expand Up @@ -39,11 +42,15 @@
assert RemoteConfig
assert RemoteCmdTask
assert RsyncTask
assert Checker
assert HTTPChecker
assert PortChecker
assert PathChecker
assert PathWatcher
assert TimeWatcher
assert ResourceMaker
assert FlowTask
assert RecurringTask
assert Replacement
assert ReplacementMutator
assert AnyInput
Expand Down
5 changes: 4 additions & 1 deletion src/zrb/builtin/schedule.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from zrb.task.recurring_task import RecurringTask
from zrb.task.cmd_task import CmdTask
from zrb.task.time_watcher import TimeWatcher
from zrb.task_input.str_input import StrInput
from zrb.runner import runner

Expand All @@ -15,7 +16,9 @@
description='Schedule cron pattern to show the message'
),
],
schedule='{{input.schedule}}',
triggers=[
TimeWatcher(name='watch-schedule', schedule='{{input.schedule}}')
],
task=CmdTask(
name='run-task',
inputs=[
Expand Down
5 changes: 4 additions & 1 deletion src/zrb/builtin/watch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from zrb.task.recurring_task import RecurringTask
from zrb.task.cmd_task import CmdTask
from zrb.task.path_watcher import PathWatcher
from zrb.task_input.str_input import StrInput
from zrb.runner import runner

Expand All @@ -15,7 +16,9 @@
description='File pattern to be watched'
),
],
watched_path='{{input.pattern}}',
triggers=[
PathWatcher(name='watch-path', path='{{input.pattern}}')
],
task=CmdTask(
name='run-task',
inputs=[
Expand Down
6 changes: 3 additions & 3 deletions src/zrb/task/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ async def _set_local_keyval(
self.log_info('Set input map')
for task_input in self._get_combined_inputs():
input_name = self._get_normalized_input_key(task_input.get_name())
input_value = self.render_any(
kwargs.get(input_name, task_input.get_default())
)
input_value = kwargs.get(input_name, task_input.get_default())
if task_input.should_render():
input_value = self.render_any(input_value)
self._set_input_map(input_name, input_value)
self.log_debug(
'Input map:\n' + map_to_str(self.get_input_map(), item_prefix=' ')
Expand Down
89 changes: 89 additions & 0 deletions src/zrb/task/checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from zrb.helper.typing import Any, Callable, Iterable, Optional, Union
from zrb.helper.typecheck import typechecked
from zrb.task.base_task import BaseTask
from zrb.task.any_task import AnyTask
from zrb.task.any_task_event_handler import (
OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed
)
from zrb.task_env.env import Env
from zrb.task_env.env_file import EnvFile
from zrb.task_group.group import Group
from zrb.task_input.any_input import AnyInput

import asyncio


@typechecked
class Checker(BaseTask):

def __init__(
self,
name: str = 'port-check',
group: Optional[Group] = None,
inputs: Iterable[AnyInput] = [],
envs: Iterable[Env] = [],
env_files: Iterable[EnvFile] = [],
icon: Optional[str] = None,
color: Optional[str] = None,
description: str = '',
upstreams: Iterable[AnyTask] = [],
on_triggered: Optional[OnTriggered] = None,
on_waiting: Optional[OnWaiting] = None,
on_skipped: Optional[OnSkipped] = None,
on_started: Optional[OnStarted] = None,
on_ready: Optional[OnReady] = None,
on_retry: Optional[OnRetry] = None,
on_failed: Optional[OnFailed] = None,
checking_interval: Union[int, float] = 0.1,
progress_interval: Union[int, float] = 30,
expected_result: bool = True,
should_execute: Union[bool, str, Callable[..., bool]] = True
):
BaseTask.__init__(
self,
name=name,
group=group,
inputs=inputs,
envs=envs,
env_files=env_files,
icon=icon,
color=color,
description=description,
upstreams=upstreams,
on_triggered=on_triggered,
on_waiting=on_waiting,
on_skipped=on_skipped,
on_started=on_started,
on_ready=on_ready,
on_retry=on_retry,
on_failed=on_failed,
checkers=[],
checking_interval=checking_interval,
retry=0,
retry_interval=0,
should_execute=should_execute,
)
self._progress_interval = progress_interval
self._expected_result = expected_result
self._should_show_progress = False

async def run(self, *args: Any, **kwargs: Any) -> bool:
wait_time = 0
while True:
self._should_show_progress = wait_time >= self._progress_interval
inspect_result = await self.inspect(*args, **kwargs)
if inspect_result == self._expected_result:
return True
if wait_time >= self._progress_interval:
wait_time = 0
await asyncio.sleep(self._checking_interval)
wait_time += self._checking_interval

async def inspect(self, *args: Any, **kwargs: Any) -> bool:
return False

def show_progress(self, message: str):
if self._should_show_progress:
self.print_out_dark(message)
return
self.log_debug(message)
131 changes: 58 additions & 73 deletions src/zrb/task/http_checker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from zrb.helper.typing import Any, Callable, Iterable, Optional, Union, TypeVar
from zrb.helper.typecheck import typechecked
from zrb.task.checker import Checker
from http.client import HTTPConnection, HTTPSConnection
from zrb.task.base_task import BaseTask
from zrb.task.any_task import AnyTask
from zrb.task.any_task_event_handler import (
OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed
Expand All @@ -11,17 +11,46 @@
from zrb.task_group.group import Group
from zrb.task_input.any_input import AnyInput

import asyncio

THTTPChecker = TypeVar('THTTPChecker', bound='HTTPChecker')


@typechecked
class HTTPChecker(BaseTask):
class HttpConnectionConfig():
def __init__(
self,
is_https: bool,
method: str,
host: str,
port: int,
url: str,
timeout: int,
):
self.is_https = is_https
self.method = method
self.host = host
self.port = port
self.url = '/' + url if not url.startswith('/') else url
self.timeout = timeout
self.protocol = 'https' if self.is_https else 'http'
full_url = f'{self.protocol}://{self.host}:{self.port}{self.url}'
self.label = f'{self.method} {full_url}'

def get_connection(self) -> Union[HTTPConnection, HTTPSConnection]:
if self.is_https:
return HTTPSConnection(
host=self.host, port=self.port, timeout=self.timeout
)
return HTTPConnection(
host=self.host, port=self.port, timeout=self.timeout
)


@typechecked
class HTTPChecker(Checker):

def __init__(
self,
name: str = 'http-check',
name: str = 'check-http',
group: Optional[Group] = None,
inputs: Iterable[AnyInput] = [],
envs: Iterable[Env] = [],
Expand All @@ -43,11 +72,12 @@ def __init__(
on_ready: Optional[OnReady] = None,
on_retry: Optional[OnRetry] = None,
on_failed: Optional[OnFailed] = None,
checking_interval: float = 0.1,
show_error_interval: float = 5,
checking_interval: Union[int, float] = 0.1,
progress_interval: Union[int, float] = 5,
expected_result: bool = True,
should_execute: Union[bool, str, Callable[..., bool]] = True
):
BaseTask.__init__(
Checker.__init__(
self,
name=name,
group=group,
Expand All @@ -65,19 +95,18 @@ def __init__(
on_ready=on_ready,
on_retry=on_retry,
on_failed=on_failed,
checkers=[],
checking_interval=checking_interval,
retry=0,
retry_interval=0,
should_execute=should_execute
progress_interval=progress_interval,
expected_result=expected_result,
should_execute=should_execute,
)
self._host = host
self._port = port
self._timeout = timeout
self._method = method
self._url = url
self._is_https = is_https
self._show_error_interval = show_error_interval
self._config: Optional[HttpConnectionConfig] = None

def copy(self) -> THTTPChecker:
return super().copy()
Expand All @@ -94,77 +123,33 @@ def to_function(
)

async def run(self, *args: Any, **kwargs: Any) -> bool:
is_https = self.render_bool(self._is_https)
method = self.render_str(self._method)
host = self.render_str(self._host)
port = self.render_int(self._port)
url = self.render_str(self._url)
if not url.startswith('/'):
url = '/' + url
timeout = self.render_int(self._timeout)
wait_time = 0
while not self._check_connection(
method=method,
host=host,
port=port,
url=url,
is_https=is_https,
timeout=timeout,
should_print_error=wait_time >= self._show_error_interval
):
if wait_time >= self._show_error_interval:
wait_time = 0
await asyncio.sleep(self._checking_interval)
wait_time += self._checking_interval
return True
self._config = HttpConnectionConfig(
is_https=self.render_bool(self._is_https),
method=self.render_str(self._method),
host=self.render_str(self._host),
port=self.render_int(self._port),
url=self.render_str(self._url),
timeout=self.render_int(self._timeout)
)
return await super().run(*args, **kwargs)

def _check_connection(
self,
method: str,
host: str,
port: int,
url: str,
is_https: bool,
timeout: int,
should_print_error: bool
) -> bool:
label = self._get_label(method, host, port, is_https, url)
conn = self._get_connection(host, port, is_https, timeout)
async def inspect(self, *args: Any, **kwargs: Any) -> bool:
conn = self._config.get_connection()
try:
conn.request(method, url)
conn.request(self._config.method, self._config.url)
res = conn.getresponse()
if res.status < 300:
self.log_info('Connection success')
self.print_out(f'{label} {res.status} (OK)')
self.print_out(f'{self._config.label} {res.status} (OK)')
return True
self._debug_and_print_error(
f'{label} {res.status} (Not OK)', should_print_error
self.show_progress(
f'{self._config.label} {res.status} (Not OK)'
)
except Exception:
self._debug_and_print_error(
f'{label} Connection error', should_print_error
)
self.show_progress(f'{self._config.label} Connection error')
finally:
conn.close()
return False

def _debug_and_print_error(self, message: str, should_print_error: bool):
if should_print_error:
self.print_err(message)
self.log_debug(message)

def _get_label(
self, method: str, host: str, port: int, is_https: bool, url: str
) -> str:
protocol = 'https' if is_https else 'http'
return f'{method} {protocol}://{host}:{port}{url}'

def _get_connection(
self, host: str, port: int, is_https: bool, timeout: int
) -> Union[HTTPConnection, HTTPSConnection]:
if is_https:
return HTTPSConnection(host, port, timeout=timeout)
return HTTPConnection(host, port, timeout=timeout)

def __repr__(self) -> str:
return f'<HttpChecker name={self._name}>'
Loading

0 comments on commit cea811b

Please sign in to comment.