From 7c85fd4d21e2b4b02261d9abf7b2f87394e3bfb6 Mon Sep 17 00:00:00 2001 From: Go Frendi Gunawan Date: Thu, 30 Nov 2023 09:24:02 +0700 Subject: [PATCH] Release/0.0.118 (#58) * update to version 0.0.118 * add test * refactor task * add more test * add test for non-renderable env * refactor * fix typing * make some methods and properties private * use protected and private properties * turn methods into private * update docs and add test --- .coveragerc | 5 + docs/concepts/tasks/README.md | 2 +- docs/getting-started.md | 204 +++++-- docs/installation.md | 6 +- pyproject.toml | 2 +- src/zrb/helper/env_map/fetch.py | 12 +- src/zrb/task/base_remote_cmd_task.py | 2 +- src/zrb/task/base_task/__init__.py | 0 src/zrb/task/{ => base_task}/base_task.py | 262 ++------ src/zrb/task/base_task/component/__init__.py | 0 .../base_task/component/base_task_model.py | 258 ++++++++ .../base_task/component/common_task_model.py | 282 +++++++++ src/zrb/task/base_task/component/pid_model.py | 17 + src/zrb/task/base_task/component/renderer.py | 119 ++++ src/zrb/task/base_task/component/trackers.py | 76 +++ src/zrb/task/base_task_composite.py | 558 ------------------ src/zrb/task/checker.py | 2 +- src/zrb/task/cmd_task.py | 92 +-- src/zrb/task/docker_compose_task.py | 42 +- src/zrb/task/flow_task.py | 2 +- src/zrb/task/recurring_task.py | 2 +- src/zrb/task/resource_maker.py | 2 +- src/zrb/task/task.py | 2 +- src/zrb/task_env/env.py | 40 +- src/zrb/task_env/env_file.py | 30 +- src/zrb/task_input/base_input.py | 4 +- test/task/task_copy/env_file.env | 1 + test/task/task_copy/new_env_file.env | 1 + test/task/task_copy/test_task_copy.py | 105 ++++ test/task/test_flow_task.py | 1 - test/task/test_task.py | 270 ++------- test/task/test_task_env.py | 151 +++++ .../{test_task.env => test_task_env_file.env} | 0 test/task/test_task_env_file.py | 49 ++ test/task/test_task_execution_id.py | 36 ++ test/task/test_task_input.py | 219 +++++++ zrb_init.py | 2 +- 37 files changed, 1713 insertions(+), 1145 deletions(-) create mode 100644 .coveragerc create mode 100644 src/zrb/task/base_task/__init__.py rename src/zrb/task/{ => base_task}/base_task.py (65%) create mode 100644 src/zrb/task/base_task/component/__init__.py create mode 100644 src/zrb/task/base_task/component/base_task_model.py create mode 100644 src/zrb/task/base_task/component/common_task_model.py create mode 100644 src/zrb/task/base_task/component/pid_model.py create mode 100644 src/zrb/task/base_task/component/renderer.py create mode 100644 src/zrb/task/base_task/component/trackers.py delete mode 100644 src/zrb/task/base_task_composite.py create mode 100644 test/task/task_copy/env_file.env create mode 100644 test/task/task_copy/new_env_file.env create mode 100644 test/task/task_copy/test_task_copy.py create mode 100644 test/task/test_task_env.py rename test/task/{test_task.env => test_task_env_file.env} (100%) create mode 100644 test/task/test_task_env_file.py create mode 100644 test/task/test_task_execution_id.py create mode 100644 test/task/test_task_input.py diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 00000000..57fafc04 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,5 @@ +[run] +omit = + src/zrb/task/any_task.py + src/zrb/task_input/any_input.py + src/zrb/**/__init__.py \ No newline at end of file diff --git a/docs/concepts/tasks/README.md b/docs/concepts/tasks/README.md index 499d4123..42e4f11b 100644 --- a/docs/concepts/tasks/README.md +++ b/docs/concepts/tasks/README.md @@ -26,7 +26,7 @@ As every task are extended from `BaseTask`, you will see that most of them share │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ -Task CmdTask ResourceMaker FlowTask BaseRemoteCmdTask TriggeredTask HttpChecker PortChecker PathChecker +Task CmdTask ResourceMaker FlowTask BaseRemoteCmdTask ReccuringTask HttpChecker PortChecker PathChecker │ │ │ │ ▼ ┌─────┴──────┐ diff --git a/docs/getting-started.md b/docs/getting-started.md index fece23c5..3ea34cf0 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -88,18 +88,32 @@ See our [tutorial](tutorials/integration-with-other-tools.md) to see how you can By convention, we usually put related `tasks` under the same `task-group`. -For example, we have the following two tasks under `base64` group: +For example, there two tasks under `base64` group. Both are dealing with base64 encoding/decoding algorithm: -- `encode` -- `decode` +```bash +zrb base64 +``` -Now, let's try to decode our base64-encoded text: +``` +Usage: zrb base64 [OPTIONS] COMMAND [ARGS]... + + Base64 operations + +Options: + --help Show this message and exit. + +Commands: + decode Decode a base64 encoded text + encode Encode a text using base64 algorithm +``` + +Now, let's try to decode our previously base64-encoded text: ```bash zrb base64 decode --text "bm9uLWNyZWRlbnRpYWwtc3RyaW5n" ``` -You should get your original text back. +The command will return the original text (i.e., `non-credential-string`). > __💡 HINT:__ You don't have to memorize any `task-group` or `task` name. The next two subsections will show you how to locate and execute any `task` without memorize anything. @@ -173,7 +187,7 @@ Commands: Once you find the task you want to execute, you can type `zrb [task-groups...] ` without bothering about `task-parameters`. -Zrb will automatically prompt you to provide the parameter interactively. +Zrb will ask you to provide the parameter interactively. ```bash zrb base64 encode @@ -198,13 +212,13 @@ To run again: zrb base64 encode --text "non-credential-string" To make things more manageable, you must put all related resources and task definitions under a `project`. A project is a directory containing `zrb_init.py`. -You can create a project manually or use Zrb's built-in task to generate the project. Suppose you want to create a project named `my-project`. +You can create a project manually or use Zrb's built-in task to generate the project. Suppose you want to create a project named `my-project`. You can do so by invoking the following command: ```bash zrb project create --project-dir my-project --project-name my-project ``` -Once invoked, you will have a directory named `my-project`. Let's see how the project looks like: +Once invoked, you will see a directory named `my-project`. Let's see what the project looks like: ```bash cd my-project @@ -232,10 +246,10 @@ Every Zrb project has a file named `zrb_init.py` under the top-level directory. By convention, a project usually contains two sub-directories: -- `_automate`: This folder contains all your automation scripts and task definitions -- `src`: This folder contains all your resources like Docker compose file, helm charts, and source code. +- ___automate__: This folder contains all your automation scripts and task definitions +- __src__: This folder contains all your resources like Docker compose file, helm charts, and source code. -When you make a project using `zrb project create` command, Zrb will generate a default `task-group` named `project`. This `task-group` contains some tasks to run/deploy everything. Try to type `zrb project` to see what tasks are available by default: +When you make a project using `zrb project create` command, Zrb will generate a default `task-group` named `project`. This `task-group` contains some tasks to run/deploy your applications. Try to type `zrb project` to see what tasks are available by default: ```bash zrb project @@ -263,24 +277,43 @@ Commands: stop-containers Stop project containers ``` + +> __💡 HINT:__ To start working with Zrb, it is better to create a project. You can create a project by using `zrb project create` command, or by creating a file named `zrb_init.py` + ## Activating Virtual Environment -If you generate the project by invoking `zrb project create`, then you need to run the following command everytime you start working with the project: +Working in a virtual environment is recommended in most cases. This encapsulates your project pip packages, ensuring better independence and reproducibility. + +### Activating Virtual Environment On A Generated Project + +If you generate the project by invoking `zrb project create`, then you need to run the following command every time you start working on the project: ```bash source project.sh ``` -The command will ensure that you work under the project's virtual environment. +The command will activate the project's virtual environment and install necessary pip packages. + +### Activating Virtual Environment On A Manually Created Project -If you create the project manually, you need to make a virtual environment for your project: +If you create the project manually (i.e., by creating `zrb_init.py`), you also need to make a virtual environment for your project. Creating a virtual environment is necessary if you work with non-standard Python libraries. + +To create a virtual environment, you can invoke the following command: ```bash python -m venv .venv +``` + +Once you make the virtual environment, you can activate it by invoking the following command: + +```bash source .venv/bin/activate ``` -> __⚠️ WARNING:__ You have to make sure you are working under virtual environment everytime you work with Zrb project, either by invoking `source project.sh` or `source .venv/bin/activate`. +You need to run the command every time you start working on the project. + + +> __💡 HINT:__ Working with virtual environment is recommended whenever you work with any Python project, including Zrb project. # Creating a Task @@ -513,7 +546,7 @@ There are several built-in task classes. Each with its specific use case: - __RSyncTask__: Copy file from/to remote computers using `rsync` command. - __ResourceMaker__: Create resources (source code/documents) based on provided templates. - __FlowTask__: Combine unrelated tasks into a single Workflow. -- __TriggeredTask__: Create a long-running scheduled task or file watcher based on another task. +- __RecurringTask__: Create a long-running recurring task. You can also create a custom task class as long as it fits `AnyTask` interface. The easiest way to ensure compatibility is by extending `BaseTask`. See our [tutorial](tutorials/extending-cmd-task.md) to see how we can create a new Task Class based on `CmdTask`. @@ -561,10 +594,11 @@ You can apply task parameters to both Task classes and `@python_task` decorator. ## Task Inputs You can define task inputs using `StrInput`, `BoolInput`, `ChoiceInput`, `FloatInput`, `IntInput`, or `PasswordInput`. -To create an input, you need to provide two parameters at least: +To create an input, you need to provide some parameters: -- __name__: The name of the input. By convention, this should be kebab-cased. -- __default__: The default value of the input. +- __name__: The name of the input. By convention, this should be kebab-cased (required). +- __default__: The default value of the input (optional, default: `None`). +- __should_render__: Whether the input should be rendered as Jinja template or not (optional, default: `True`). For example, here you have an input named `message` with `Hello World` as the default value: @@ -576,83 +610,123 @@ message = StrInput(name='message', default='Hello World') When you run a task with task inputs, Zrb will prompt you to override the input values. You can press `enter` if you want to use the default values. -To access the values of your inputs from your `CmdTask`, you can use Jinja template `{{ input.input_name }}`. Notice that you should use `snake_case` instead of `kebab-case` in your Jinja template. +### Using Task Inputs on CmdTask -As for `@python_task`, you can use `kwargs` dictionary to get the input. Let's see the following example: +To access the values of your inputs from your `CmdTask`, you can use Jinja template `{{ input.input_name }}`. Notice that you should use `snake_case` instead of `kebab-case` to refer to the input. Let's see the following example: ```python -from zrb import runner, CmdTask, python_task, StrInput +from zrb import runner, CmdTask, StrInput hello_cmd = CmdTask( name='hello-cmd', inputs=[ - StrInput(name='name', default='World') + StrInput(name='your-name', default='World') ], - cmd='echo Hello {{input.name}}' + # Notice, we use {{input.your_name}} not {{input.your-name}} !!! + cmd='echo Hello {{input.your_name}}' ) runner.register(hello_cmd) +``` + +You can then run the task by invoking: + +```bash +zrb hello-cmd +# or +zrb hello-cmd --your-name "John Wick" +``` + +### Using Task Inputs on @python_task Decorator + +As for `@python_task`, you can use `kwargs` dictionary to get the input. +```python +from zrb import runner, python_task, StrInput @python_task( name='hello-py', inputs=[ - StrInput(name='name', default='World') + StrInput(name='your-name', default='World') ], runner=runner ) def hello_py(*args, **kwargs): - name = kwargs.get('name') + # Notice, we use `your_name` instead of `your-name` !!! + name = kwargs.get('your_name') return f'Hello {name}' - ``` -You can run the tasks by invoking: -``` bash -zrb hello-cmd +You can then run the task by invoking: + +```bash zrb hello-py +# or +zrb hello-py --your-name "John Wick" ``` -our you can provide the input values: +## Task Environments -```bash -zrb hello-cmd --name "Go Frendi" -zrb hello-py --name "Go Frendi" +Aside from input, you can also define the `Task`'s environment variables using `Env` and `EnvFile`. + +### Env + +You can use `Env` to define a single environment variable for your Tasks. Typically, a Task could take multiple `Env`. + +To create an `Env`, you need to provide some parameters: + +- __name__: Name of the environment variable (required). +- __os_name__: Name of OS environment (optional, default=`None`) + - if set to `None`, Zrb will link the environment variable to the OS environment. + - if set to an empty string (i.e., `''`), Zrb will not link the environment variable to the OS's environment. + - if set to a non-empty string, Zrb will link the environment variable to the OS's environment corresponding to this value. +- __default__: Default value of the environment variable (optional, default: `None`). +- __should_render__: Whether the environment variable should be rendered as a Jinja template (optional, default: `True`). + + +```python +from zrb import Env + +env = Env(name='MESSAGE') ``` -## Task Environments +### EnvFile + +`EnvFile` loads an environment file and uses its values as Task's environment variables. Typically a Task could take multiple `EnvFile`. + +To create an `EnvFile`, you need to provide some parameters: -Aside from input, you can also use environment variables by using `Env` and `EnvFile` +- __env_file__: Name of the environment file (required). +- __prefix__: Custom prefix for environment's os_name (optional, default=`None`) +- __should_render__: Whether the environment variable should be rendered as a Jinja template (optional, default: `True`). ```python -from zrb import Env, EnvFile +from zrb import EnvFile import os PROJECT_ENV = os.path.join(os.path.dirname(__file__), 'project.env') env_file = EnvFile(env_file=PROJECT_ENV) - -env = Env(name='MESSAGE') ``` -You can use `Env` and `EnvFile`, in your tasks. Let's first create an environment file named `project.env`: +### Using Env and EnvFile + +To use `EnvFile` in your tasks. Let's first create an environment file named `project.env`: ```bash # file-name: project.env SERVER_HOST=localhost ``` -To access the values of your inputs from your `CmdTask`, you can use Jinja template `{{ env.ENV_NAME }}`. +### Using Env and EnvFile on CmdTask -As for `@python_task`, you cannot use `os.getenv` to access task's environment. Instead, you should get the `task` instance and invoke `task.get_env_map()`. +To access the values of your inputs from your `CmdTask`, you can use Jinja template `{{ env.ENV_NAME }}`. ```python -from zrb import runner, CmdTask, AnyTask, python_task, Env, EnvFile +from zrb import runner, CmdTask, Env, EnvFile import os PROJECT_ENV = os.path.join(os.path.dirname(__file__), 'project.env') -print(PROJECT_ENV) - hello_cmd = CmdTask( name='hello-cmd', envs=[ @@ -667,6 +741,30 @@ hello_cmd = CmdTask( ] ) runner.register(hello_cmd) +``` + +You can then run the task by invoking: + +```bash +zrb hello-cmd +``` + +It will give you the following results: + +``` +Message: Hello world +Host: localhost +``` + +### Using Env and EnvFile on @python_task Decorator + +As for `@python_task`, you cannot use `os.getenv` to access task's environment. Instead, you should get the `task` instance from `kwargs`` and invoke `task.get_env_map()`. + +```python +from zrb import runner, AnyTask, python_task, Env, EnvFile +import os + +PROJECT_ENV = os.path.join(os.path.dirname(__file__), 'project.env') @python_task( @@ -690,14 +788,13 @@ def hello_py(*args, **kwargs): ]) ``` -Now, you can invoke the tasks as follows: +You can then run the task by invoking: ```bash zrb hello-cmd -zrb hello-py ``` -Both tasks will show you similar outputs: +It will give you the following results: ``` Message: Hello world @@ -706,11 +803,11 @@ Host: localhost ## Switching Environment -Zrb has a feature named environment cascading. This feature automatically helps you switch between multiple environments (e.g., dev, staging, production). +Zrb has a feature named environment cascading. This feature helps you switch between multiple environments (e.g., dev, staging, production). To switch between environments, you can use `ZRB_ENV` -Let's see the following example: +Let's go back to our previous example and set some environment variables: ```bash @@ -719,7 +816,6 @@ export PROD_MESSAGE="Hello, Client" export PROD_SERVER_HOST=stalchmst.com zrb hello-cmd -zrb-hello-py ``` Without `ZRB_ENV`, when you run the following commands, you will get the same outputs: @@ -729,9 +825,11 @@ Message: Hello world Host: localhost ``` +Since we don't have `MESSAGE` and `HOST` on OS's environment variable, Zrb will use the default values. + ### Dev Environment -Now let's try this again with `DEV` environment: +Now, let's try this again with `DEV` environment: ```bash export DEV_MESSAGE="Test Hello World" @@ -740,7 +838,6 @@ export PROD_SERVER_HOST=stalchmst.com export ZRB_ENV=DEV zrb hello-cmd -zrb-hello-py ``` Now, it will get the the following outputs: @@ -765,7 +862,6 @@ export PROD_SERVER_HOST=stalchmst.com export ZRB_ENV=PROD zrb hello-cmd -zrb-hello-py ``` Now, since Zrb can find both `PROD_MESSAGE` and `PROD_SERVER_HOST`, Zrb will show the following output: diff --git a/docs/installation.md b/docs/installation.md index 3cf886bd..019934cb 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -22,11 +22,15 @@ We provide an [installation script](https://github.com/state-alchemists/zrb/blob curl https://raw.githubusercontent.com/state-alchemists/zrb/main/install.sh | bash ``` +We recommend this installation method if you work on a new computer/VM. + ## As Python Package +You can also install Zrb as a Python package using pip. + ### Checking for Prerequisites -Before installing Zrb as Python packages, make sure you have the following prerequisites: +Before installing Zrb as a Python package, make sure you have the following prerequisites: - 🐍 `Python` - 📦 `Pip` diff --git a/pyproject.toml b/pyproject.toml index bffcd66b..b727a725 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "flit_core.buildapi" [project] name = "zrb" -version = "0.0.117" +version = "0.0.118" authors = [ { name="Go Frendi Gunawan", email="gofrendiasgard@gmail.com" }, ] diff --git a/src/zrb/helper/env_map/fetch.py b/src/zrb/helper/env_map/fetch.py index a3fd1c21..b8e9dc71 100644 --- a/src/zrb/helper/env_map/fetch.py +++ b/src/zrb/helper/env_map/fetch.py @@ -42,7 +42,7 @@ def _add_envs_to_env_map( env_map: Mapping[str, str], envs: List[Env] ) -> Mapping[str, str]: for env in envs: - if env.os_name == '': + if env.get_os_name() == '': continue env_name = _get_env_name(env) env_default = _get_env_default(env) @@ -64,13 +64,13 @@ def _cascade_env_map( @typechecked def _get_env_name(env: Env) -> str: - if env.os_name is None: - return env.name - return env.os_name + if env.get_os_name() is None: + return env.get_name() + return env.get_os_name() @typechecked def _get_env_default(env: Env) -> str: - if is_probably_jinja(env.default): + if is_probably_jinja(env.get_default()): return '' - return env.default + return env.get_default() diff --git a/src/zrb/task/base_remote_cmd_task.py b/src/zrb/task/base_remote_cmd_task.py index af5c0977..c628d41d 100644 --- a/src/zrb/task/base_remote_cmd_task.py +++ b/src/zrb/task/base_remote_cmd_task.py @@ -7,7 +7,7 @@ from zrb.task.any_task_event_handler import ( OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed ) -from zrb.task.base_task import BaseTask +from zrb.task.base_task.base_task import BaseTask from zrb.task_env.env import Env from zrb.task_env.env_file import EnvFile from zrb.task_group.group import Group diff --git a/src/zrb/task/base_task/__init__.py b/src/zrb/task/base_task/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/zrb/task/base_task.py b/src/zrb/task/base_task/base_task.py similarity index 65% rename from src/zrb/task/base_task.py rename to src/zrb/task/base_task/base_task.py index 12cbcbdf..11c5e1ca 100644 --- a/src/zrb/task/base_task.py +++ b/src/zrb/task/base_task/base_task.py @@ -7,34 +7,29 @@ from zrb.task.any_task_event_handler import ( OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed ) -from zrb.task.base_task_composite import ( - AttemptTracker, FinishTracker, Renderer, TaskModelWithPrinterAndTracker +from zrb.task.base_task.component.trackers import ( + AttemptTracker, FinishTracker ) +from zrb.task.base_task.component.renderer import Renderer +from zrb.task.base_task.component.base_task_model import BaseTaskModel from zrb.advertisement import advertisements from zrb.task_group.group import Group -from zrb.task_env.constant import RESERVED_ENV_NAMES from zrb.task_env.env import Env from zrb.task_env.env_file import EnvFile from zrb.task_input.any_input import AnyInput -from zrb.task_input.constant import RESERVED_INPUT_NAMES -from zrb.helper.accessories.color import colored from zrb.helper.accessories.name import get_random_name from zrb.helper.advertisement import get_advertisement -from zrb.helper.string.modification import double_quote from zrb.helper.string.conversion import to_variable_name from zrb.helper.map.conversion import to_str as map_to_str -from zrb.config.config import show_advertisement, env_prefix +from zrb.config.config import show_advertisement import asyncio import copy -import os -import sys @typechecked class BaseTask( - FinishTracker, AttemptTracker, Renderer, TaskModelWithPrinterAndTracker, - AnyTask + FinishTracker, AttemptTracker, Renderer, BaseTaskModel, AnyTask ): ''' Base class for all tasks. @@ -74,7 +69,7 @@ def __init__( FinishTracker.__init__(self) Renderer.__init__(self) AttemptTracker.__init__(self, retry=retry) - TaskModelWithPrinterAndTracker.__init__( + BaseTaskModel.__init__( self, name=name, group=group, @@ -90,66 +85,26 @@ def __init__( checkers=checkers, checking_interval=checking_interval, run=run, + on_triggered=on_triggered, + on_waiting=on_waiting, + on_skipped=on_skipped, + on_started=on_started, + on_ready=on_ready, + on_retry=on_retry, + on_failed=on_failed, should_execute=should_execute, + return_upstream_result=return_upstream_result ) - self._return_upstream_result = return_upstream_result - # Event Handler - self._on_triggered = on_triggered - self._on_waiting = on_waiting - self._on_skipped = on_skipped - self._on_started = on_started - self._on_ready = on_ready - self._on_retry = on_retry - self._on_failed = on_failed - # init private properties - self._is_keyval_set = False # Flag - self._all_inputs: Optional[List[AnyInput]] = None - self._is_check_triggered: bool = False - self._is_ready: bool = False - self._is_execution_triggered: bool = False - self._is_execution_started: bool = False - self._args: List[Any] = [] - self._kwargs: Mapping[str, Any] = {} + # init private flags + self.__is_keyval_set = False + self.__is_check_triggered: bool = False + self.__is_ready: bool = False + self.__is_execution_triggered: bool = False + self.__is_execution_started: bool = False def copy(self) -> AnyTask: return copy.deepcopy(self) - def _get_combined_inputs(self) -> Iterable[AnyInput]: - '''' - Getting all inputs of this task and all its upstream, non-duplicated. - ''' - if self._all_inputs is not None: - return self._all_inputs - self._all_inputs: List[AnyInput] = [] - existing_input_names: Mapping[str, bool] = {} - # Add task inputs - inputs = self._get_inputs() - for input_index, first_occurence_task_input in enumerate(inputs): - input_name = first_occurence_task_input.get_name() - if input_name in existing_input_names: - continue - # Look for all input with the same name in the current task - task_inputs = [ - candidate - for candidate in inputs[input_index:] - if candidate.get_name() == input_name - ] - # Get the last input, and add it to _all_inputs - task_input = task_inputs[-1] - self._all_inputs.append(task_input) - existing_input_names[input_name] = True - # Add upstream inputs - for upstream in self._get_upstreams(): - upstream_inputs = upstream._get_combined_inputs() - for upstream_input in upstream_inputs: - if upstream_input.get_name() in existing_input_names: - continue - self._all_inputs.append(upstream_input) - existing_input_names[upstream_input.get_name()] = True - self._allow_add_upstreams = False - self._allow_add_inputs = False - return self._all_inputs - def to_function( self, env_prefix: str = '', @@ -229,79 +184,6 @@ async def check(self) -> bool: ''' return await self._is_done() - def _show_done_info(self): - elapsed_time = self._get_elapsed_time() - self.print_out_dark(f'Completed in {elapsed_time} seconds') - self._play_bell() - - def _get_multiline_repr(self, text: str) -> str: - lines_repr: Iterable[str] = [] - lines = text.split('\n') - if len(lines) == 1: - return lines[0] - for index, line in enumerate(lines): - line_number_repr = str(index + 1).rjust(4, '0') - lines_repr.append(f' {line_number_repr} | {line}') - return '\n' + '\n'.join(lines_repr) - - async def _set_local_keyval( - self, kwargs: Mapping[str, Any], env_prefix: str = '' - ): - if self._is_keyval_set: - return True - self._is_keyval_set = True - self.log_info('Set input map') - for task_input in self._get_combined_inputs(): - input_name = self._get_normalized_input_key(task_input.get_name()) - input_value = kwargs.get(input_name, task_input.get_default()) - if task_input.should_render(): - input_value = self.render_any(input_value) - self._set_input_map(input_name, input_value) - self.log_debug( - 'Input map:\n' + map_to_str(self.get_input_map(), item_prefix=' ') - ) - self.log_info('Merging task envs, task env files, and native envs') - for env_name, env in self._get_combined_env().items(): - env_value = env.get(env_prefix) - if env.should_render: - env_value = self.render_any(env_value) - self._set_env_map(env_name, env_value) - self._set_env_map('_ZRB_EXECUTION_ID', self._execution_id) - self.log_debug( - 'Env map:\n' + map_to_str(self.get_env_map(), item_prefix=' ') - ) - - def _get_combined_env(self) -> Mapping[str, Env]: - all_envs: Mapping[str, Env] = {} - for env_name in os.environ: - if env_name in RESERVED_ENV_NAMES: - continue - all_envs[env_name] = Env( - name=env_name, os_name=env_name, should_render=False - ) - for env_file in self._get_env_files(): - for env in env_file.get_envs(): - all_envs[env.name] = env - for env in self._get_envs(): - all_envs[env.name] = env - self._allow_add_envs = False - self._allow_add_env_files = False - return all_envs - - def _get_normalized_input_key(self, key: str) -> str: - if key in RESERVED_INPUT_NAMES: - return key - return to_variable_name(key) - - def _propagate_execution_id(self): - execution_id = self.get_execution_id() - for upstream_task in self._get_upstreams(): - upstream_task._set_execution_id(execution_id) - upstream_task._propagate_execution_id() - for checker_task in self._get_checkers(): - checker_task._set_execution_id(execution_id) - checker_task._propagate_execution_id() - async def _run_and_check_all( self, env_prefix: str, @@ -329,8 +211,8 @@ async def _run_and_check_all( new_kwargs['_args'] = new_args # inject self as input_map['_task'] new_kwargs['_task'] = self - self._args = new_args - self._kwargs = new_kwargs + self._set_args(new_args) + self._set_kwargs(new_kwargs) # run the task coroutines = [ asyncio.create_task( @@ -352,28 +234,6 @@ async def _run_and_check_all( self._show_run_command() self._play_bell() - def _print_result(self, result: Any): - if result is None: - return - if self._return_upstream_result: - # if _return_upstream_result, result is list (see: self._run_all) - upstreams = self._get_upstreams() - upstream_results = list(result) - for upstream_index, upstream_result in enumerate(upstream_results): - upstreams[upstream_index]._print_result(upstream_result) - return - self.print_result(result) - - def print_result(self, result: Any): - ''' - Print result to stdout so that it can be processed further. - e.g.: echo $(zrb explain solid) > solid-principle.txt - - You need to override this method - if you want to show the result differently. - ''' - print(result) - async def _loop_check(self, show_done_info: bool = False) -> bool: self.log_info('Start readiness checking') while not await self._cached_check(): @@ -389,41 +249,16 @@ async def _loop_check(self, show_done_info: bool = False) -> bool: await self.on_ready() return True - def _show_env_prefix(self): - if env_prefix == '': - return - colored_env_prefix = colored(env_prefix, color='yellow') - colored_label = colored('Your current environment: ', attrs=['dark']) - print(colored(f'{colored_label}{colored_env_prefix}'), file=sys.stderr) - - def _show_run_command(self): - params: List[str] = [double_quote(arg) for arg in self._args] - for task_input in self._get_combined_inputs(): - if task_input.is_hidden(): - continue - key = task_input.get_name() - kwarg_key = self._get_normalized_input_key(key) - quoted_value = double_quote(str(self._kwargs[kwarg_key])) - params.append(f'--{key} {quoted_value}') - run_cmd = self._get_full_cmd_name() - run_cmd_with_param = run_cmd - if len(params) > 0: - param_str = ' '.join(params) - run_cmd_with_param += ' ' + param_str - colored_command = colored(run_cmd_with_param, color='yellow') - colored_label = colored('To run again: ', attrs=['dark']) - print(colored(f'{colored_label}{colored_command}'), file=sys.stderr) - async def _cached_check(self) -> bool: - if self._is_check_triggered: + if self.__is_check_triggered: self.log_debug('Waiting readiness flag to be set') - while not self._is_ready: + while not self.__is_ready: await asyncio.sleep(0.1) return True - self._is_check_triggered = True + self.__is_check_triggered = True check_result = await self._check() if check_result: - self._is_ready = True + self.__is_ready = True self.log_debug('Set readiness flag to True') return check_result @@ -437,7 +272,7 @@ async def _check(self) -> bool: if len(self._checkers) == 0: return await self.check() self.log_debug('Waiting execution to be started') - while not self._is_execution_started: + while not self.__is_execution_started: # Don't start checking before the execution itself has been started await asyncio.sleep(0.1) check_coroutines: Iterable[asyncio.Task] = [] @@ -453,7 +288,7 @@ async def _run_all(self, *args: Any, **kwargs: Any) -> Any: await self._mark_awaited() coroutines: Iterable[asyncio.Task] = [] # Add upstream tasks to processes - self._allow_add_upstreams = False + self._lock_upstreams() for upstream_task in self._get_upstreams(): upstream_task._set_execution_id(self.get_execution_id()) coroutines.append(asyncio.create_task( @@ -468,15 +303,15 @@ async def _run_all(self, *args: Any, **kwargs: Any) -> Any: return results[-1] async def _cached_run(self, *args: Any, **kwargs: Any) -> Any: - if self._is_execution_triggered: + if self.__is_execution_triggered: self.log_debug('Task has been triggered') return await self.on_triggered() - self._is_execution_triggered = True + self.__is_execution_triggered = True await self.on_waiting() # get upstream checker upstream_check_processes: Iterable[asyncio.Task] = [] - self._allow_add_upstreams = False + self._lock_upstreams() for upstream_task in self._get_upstreams(): upstream_check_processes.append(asyncio.create_task( upstream_task._loop_check() @@ -484,7 +319,7 @@ async def _cached_run(self, *args: Any, **kwargs: Any) -> Any: # wait all upstream checkers to complete await asyncio.gather(*upstream_check_processes) # mark execution as started, so that checkers can start checking - self._is_execution_started = True + self.__is_execution_started = True local_kwargs = dict(kwargs) local_kwargs['_task'] = self if not await self._check_should_execute(*args, **local_kwargs): @@ -522,7 +357,7 @@ async def _check_should_execute(self, *args: Any, **kwargs: Any) -> bool: async def _set_keyval(self, kwargs: Mapping[str, Any], env_prefix: str): # if input is not in input_map, add default values for task_input in self._get_combined_inputs(): - key = self._get_normalized_input_key(task_input.get_name()) + key = to_variable_name(task_input.get_name()) if key in kwargs: continue kwargs[key] = task_input.get_default() @@ -533,7 +368,7 @@ async def _set_keyval(self, kwargs: Mapping[str, Any], env_prefix: str): new_kwargs.update(self.get_input_map()) upstream_coroutines = [] # set upstreams keyval - self._allow_add_upstreams = False + self._lock_upstreams() for upstream_task in self._get_upstreams(): upstream_coroutines.append(asyncio.create_task( upstream_task._set_keyval( @@ -555,5 +390,32 @@ async def _set_keyval(self, kwargs: Mapping[str, Any], env_prefix: str): coroutines = checker_coroutines + upstream_coroutines await asyncio.gather(*coroutines) + async def _set_local_keyval( + self, kwargs: Mapping[str, Any], env_prefix: str = '' + ): + if self.__is_keyval_set: + return True + self.__is_keyval_set = True + self.log_info('Set input map') + for task_input in self._get_combined_inputs(): + input_name = to_variable_name(task_input.get_name()) + input_value = kwargs.get(input_name, task_input.get_default()) + if task_input.should_render(): + input_value = self.render_any(input_value) + self._set_input_map(input_name, input_value) + self.log_debug( + 'Input map:\n' + map_to_str(self.get_input_map(), item_prefix=' ') + ) + self.log_info('Merging task envs, task env files, and native envs') + for env_name, env in self._get_combined_env().items(): + env_value = env.get(env_prefix) + if env.should_render(): + env_value = self.render_any(env_value) + self._set_env_map(env_name, env_value) + self._set_env_map('_ZRB_EXECUTION_ID', self.get_execution_id()) + self.log_debug( + 'Env map:\n' + map_to_str(self.get_env_map(), item_prefix=' ') + ) + def __repr__(self) -> str: return f'' diff --git a/src/zrb/task/base_task/component/__init__.py b/src/zrb/task/base_task/component/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/zrb/task/base_task/component/base_task_model.py b/src/zrb/task/base_task/component/base_task_model.py new file mode 100644 index 00000000..22d1bbdc --- /dev/null +++ b/src/zrb/task/base_task/component/base_task_model.py @@ -0,0 +1,258 @@ +from zrb.helper.typing import ( + Any, Callable, Iterable, List, Mapping, Optional, Union +) +from zrb.helper.typecheck import typechecked +from zrb.config.config import show_time +from zrb.task.any_task import AnyTask +from zrb.task.any_task_event_handler import ( + OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed +) +from zrb.helper.log import logger +from zrb.helper.accessories.color import colored +from zrb.task_input.any_input import AnyInput +from zrb.task_group.group import Group +from zrb.task_env.env import Env +from zrb.task_env.env_file import EnvFile +from zrb.task.base_task.component.common_task_model import CommonTaskModel +from zrb.task.base_task.component.pid_model import PidModel +from zrb.task.base_task.component.trackers import TimeTracker +from zrb.config.config import env_prefix +from zrb.helper.string.modification import double_quote +from zrb.helper.string.conversion import to_variable_name + +import datetime +import os +import sys + +LOG_NAME_LENGTH = 20 + + +@typechecked +class BaseTaskModel(CommonTaskModel, PidModel, TimeTracker): + def __init__( + self, + name: str, + group: Optional[Group] = None, + description: str = '', + inputs: List[AnyInput] = [], + envs: Iterable[Env] = [], + env_files: Iterable[EnvFile] = [], + icon: Optional[str] = None, + color: Optional[str] = None, + retry: int = 2, + retry_interval: Union[int, float] = 1, + upstreams: Iterable[AnyTask] = [], + checkers: Iterable[AnyTask] = [], + checking_interval: Union[int, float] = 0, + run: Optional[Callable[..., Any]] = None, + on_triggered: Optional[OnTriggered] = None, + on_waiting: Optional[OnWaiting] = None, + on_skipped: Optional[OnSkipped] = None, + on_started: Optional[OnStarted] = None, + on_ready: Optional[OnReady] = None, + on_retry: Optional[OnRetry] = None, + on_failed: Optional[OnFailed] = None, + should_execute: Union[bool, str, Callable[..., bool]] = True, + return_upstream_result: bool = False + ): + self.__rjust_full_cmd_name: Optional[str] = None + self.__has_cli_interface = False + self.__complete_name: Optional[str] = None + CommonTaskModel.__init__( + self, + name=name, + group=group, + description=description, + inputs=inputs, + envs=envs, + env_files=env_files, + icon=icon, + color=color, + retry=retry, + retry_interval=retry_interval, + upstreams=upstreams, + checkers=checkers, + checking_interval=checking_interval, + run=run, + on_triggered=on_triggered, + on_waiting=on_waiting, + on_skipped=on_skipped, + on_started=on_started, + on_ready=on_ready, + on_retry=on_retry, + on_failed=on_failed, + should_execute=should_execute, + return_upstream_result=return_upstream_result + ) + PidModel.__init__(self) + TimeTracker.__init__(self) + self.__args: List[Any] = [] + self.__kwargs: Mapping[str, Any] = {} + + def _set_args(self, args: Iterable[Any]): + self.__args = list(args) + + def _set_kwargs(self, kwargs: Mapping[str, Any]): + self.__kwargs = kwargs + + def log_debug(self, message: Any): + prefix = self.__get_log_prefix() + colored_message = colored( + f'{prefix} • {message}', attrs=['dark'] + ) + logger.debug(colored_message) + + def log_warn(self, message: Any): + prefix = self.__get_log_prefix() + colored_message = colored( + f'{prefix} • {message}', attrs=['dark'] + ) + logger.warning(colored_message) + + def log_info(self, message: Any): + prefix = self.__get_log_prefix() + colored_message = colored( + f'{prefix} • {message}', attrs=['dark'] + ) + logger.info(colored_message) + + def log_error(self, message: Any): + prefix = self.__get_log_prefix() + colored_message = colored( + f'{prefix} • {message}', color='red', attrs=['bold'] + ) + logger.error(colored_message, exc_info=True) + + def log_critical(self, message: Any): + prefix = self.__get_log_prefix() + colored_message = colored( + f'{prefix} • {message}', color='red', attrs=['bold'] + ) + logger.critical(colored_message, exc_info=True) + + def print_out(self, message: Any, trim_message: bool = True): + prefix = self.__get_colored_print_prefix() + message_str = f'{message}'.rstrip() if trim_message else f'{message}' + print(f'🤖 ○ {prefix} • {message_str}', file=sys.stderr) + sys.stderr.flush() + + def print_err(self, message: Any, trim_message: bool = True): + prefix = self.__get_colored_print_prefix() + message_str = f'{message}'.rstrip() if trim_message else f'{message}' + print(f'🤖 △ {prefix} • {message_str}', file=sys.stderr) + sys.stderr.flush() + + def print_out_dark(self, message: Any, trim_message: bool = True): + message_str = f'{message}' + self.print_out(colored(message_str, attrs=['dark']), trim_message) + + def _print_result(self, result: Any): + if result is None: + return + if self._return_upstream_result: + # if _return_upstream_result, result is list (see: self._run_all) + upstreams = self._get_upstreams() + upstream_results = list(result) + for upstream_index, upstream_result in enumerate(upstream_results): + upstreams[upstream_index]._print_result(upstream_result) + return + self.print_result(result) + + def print_result(self, result: Any): + ''' + Print result to stdout so that it can be processed further. + e.g.: echo $(zrb explain solid) > solid-principle.txt + + You need to override this method + if you want to show the result differently. + ''' + print(result) + + def _play_bell(self): + print('\a', end='', file=sys.stderr) + + def _show_done_info(self): + elapsed_time = self._get_elapsed_time() + self.print_out_dark(f'Completed in {elapsed_time} seconds') + self._play_bell() + + def _show_env_prefix(self): + if env_prefix == '': + return + colored_env_prefix = colored(env_prefix, color='yellow') + colored_label = colored('Your current environment: ', attrs=['dark']) + print(colored(f'{colored_label}{colored_env_prefix}'), file=sys.stderr) + + def _show_run_command(self): + params: List[str] = [double_quote(arg) for arg in self.__args] + for task_input in self._get_combined_inputs(): + if task_input.is_hidden(): + continue + key = task_input.get_name() + kwarg_key = to_variable_name(key) + quoted_value = double_quote(str(self.__kwargs[kwarg_key])) + params.append(f'--{key} {quoted_value}') + run_cmd = self._get_full_cmd_name() + run_cmd_with_param = run_cmd + if len(params) > 0: + param_str = ' '.join(params) + run_cmd_with_param += ' ' + param_str + colored_command = colored(run_cmd_with_param, color='yellow') + colored_label = colored('To run again: ', attrs=['dark']) + print(colored(f'{colored_label}{colored_command}'), file=sys.stderr) + + def __get_colored_print_prefix(self) -> str: + return self.__get_colored(self.__get_print_prefix()) + + def __get_colored(self, text: str) -> str: + return colored(text, color=self.get_color()) + + def __get_print_prefix(self) -> str: + common_prefix = self.__get_common_prefix(show_time=show_time) + icon = self.get_icon() + rjust_cmd_name = self.__get_rjust_full_cmd_name() + return f'{common_prefix} {icon} {rjust_cmd_name}' + + def __get_log_prefix(self) -> str: + common_prefix = self.__get_common_prefix(show_time=False) + icon = self.get_icon() + filled_name = self.__get_rjust_full_cmd_name() + return f'{common_prefix} {icon} {filled_name}' + + def __get_common_prefix(self, show_time: bool) -> str: + attempt = self._get_attempt() + max_attempt = self._get_max_attempt() + pid = str(self._get_task_pid()).rjust(6) + if show_time: + now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + return f'◷ {now} ❁ {pid} → {attempt}/{max_attempt}' + return f'❁ {pid} → {attempt}/{max_attempt}' + + def __get_rjust_full_cmd_name(self) -> str: + if self.__rjust_full_cmd_name is not None: + return self.__rjust_full_cmd_name + complete_name = self._get_full_cmd_name() + self.__rjust_full_cmd_name = complete_name.rjust(LOG_NAME_LENGTH, ' ') + return self.__rjust_full_cmd_name + + def __get_executable_name(self) -> str: + if len(sys.argv) > 0 and sys.argv[0] != '': + return os.path.basename(sys.argv[0]) + return 'zrb' + + def _get_full_cmd_name(self) -> str: + if self.__complete_name is not None: + return self.__complete_name + executable_prefix = '' + if self.__has_cli_interface: + executable_prefix += self.__get_executable_name() + ' ' + cmd_name = self.get_cmd_name() + if self._group is None: + self.__complete_name = f'{executable_prefix}{cmd_name}' + return self.__complete_name + group_cmd_name = self._group.get_complete_name() + self.__complete_name = f'{executable_prefix}{group_cmd_name} {cmd_name}' # noqa + return self.__complete_name + + def _set_has_cli_interface(self): + self.__has_cli_interface = True diff --git a/src/zrb/task/base_task/component/common_task_model.py b/src/zrb/task/base_task/component/common_task_model.py new file mode 100644 index 00000000..89f6aac6 --- /dev/null +++ b/src/zrb/task/base_task/component/common_task_model.py @@ -0,0 +1,282 @@ +from zrb.helper.typing import ( + Any, Callable, Iterable, List, Mapping, Optional, Union +) +from zrb.helper.typecheck import typechecked +from zrb.task.any_task_event_handler import ( + OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed +) +from zrb.task.any_task import AnyTask +from zrb.helper.string.conversion import to_cmd_name +from zrb.helper.accessories.color import get_random_color +from zrb.helper.accessories.icon import get_random_icon +from zrb.helper.util import coalesce_str +from zrb.task_input.any_input import AnyInput +from zrb.task_group.group import Group +from zrb.task_env.constant import RESERVED_ENV_NAMES +from zrb.task_env.env import Env +from zrb.task_env.env_file import EnvFile + +import os + + +@typechecked +class CommonTaskModel(): + def __init__( + self, + name: str, + group: Optional[Group] = None, + description: str = '', + inputs: List[AnyInput] = [], + envs: Iterable[Env] = [], + env_files: Iterable[EnvFile] = [], + icon: Optional[str] = None, + color: Optional[str] = None, + retry: int = 2, + retry_interval: Union[float, int] = 1, + upstreams: Iterable[AnyTask] = [], + checkers: Iterable[AnyTask] = [], + checking_interval: Union[float, int] = 0, + run: Optional[Callable[..., Any]] = None, + on_triggered: Optional[OnTriggered] = None, + on_waiting: Optional[OnWaiting] = None, + on_skipped: Optional[OnSkipped] = None, + on_started: Optional[OnStarted] = None, + on_ready: Optional[OnReady] = None, + on_retry: Optional[OnRetry] = None, + on_failed: Optional[OnFailed] = None, + should_execute: Union[bool, str, Callable[..., bool]] = True, + return_upstream_result: bool = False + ): + self._name = name + self._group = group + if group is not None: + group.add_task(self) + self._description = coalesce_str(description, name) + self._inputs = inputs + self._envs = envs + self._env_files = env_files + self._icon = coalesce_str(icon, get_random_icon()) + self._color = coalesce_str(color, get_random_color()) + self._retry = retry + self._retry_interval = retry_interval + self._upstreams = upstreams + self._checkers = checkers + self._checking_interval = checking_interval + self._run_function: Optional[Callable[..., Any]] = run + self._on_triggered = on_triggered + self._on_waiting = on_waiting + self._on_skipped = on_skipped + self._on_started = on_started + self._on_ready = on_ready + self._on_retry = on_retry + self._on_failed = on_failed + self._should_execute = should_execute + self._return_upstream_result = return_upstream_result + self.__execution_id = '' + self.__allow_add_envs = True + self.__allow_add_env_files = True + self.__allow_add_inputs = True + self.__allow_add_upstreams: bool = True + self.__has_already_inject_env_files: bool = False + self.__has_already_inject_envs: bool = False + self.__has_already_inject_inputs: bool = False + self.__has_already_inject_checkers: bool = False + self.__has_already_inject_upstreams: bool = False + self.__all_inputs: Optional[List[AnyInput]] = None + + def _lock_upstreams(self): + self.__allow_add_upstreams = False + + def _set_execution_id(self, execution_id: str): + if self.__execution_id == '': + self.__execution_id = execution_id + + def _propagate_execution_id(self): + execution_id = self.get_execution_id() + for upstream_task in self._get_upstreams(): + upstream_task._set_execution_id(execution_id) + upstream_task._propagate_execution_id() + for checker_task in self._get_checkers(): + checker_task._set_execution_id(execution_id) + checker_task._propagate_execution_id() + + def get_execution_id(self) -> str: + return self.__execution_id + + def set_name(self, new_name: str): + if self._description == self._name: + self._description = new_name + self._name = new_name + + def get_cmd_name(self) -> str: + return to_cmd_name(self._name) + + def set_description(self, new_description: str): + self._description = new_description + + def get_description(self) -> str: + return self._description + + def set_icon(self, new_icon: str): + self._icon = new_icon + + def set_color(self, new_color: str): + self._color = new_color + + def set_retry(self, new_retry: int): + self._retry = new_retry + + def set_should_execute( + self, should_execute: Union[bool, str, Callable[..., bool]] + ): + self._should_execute = should_execute + + def set_retry_interval(self, new_retry_interval: Union[float, int]): + self._retry_interval = new_retry_interval + + def set_checking_interval(self, new_checking_interval: Union[float, int]): + self._checking_interval = new_checking_interval + + def insert_input(self, *inputs: AnyInput): + if not self.__allow_add_inputs: + raise Exception(f'Cannot insert inputs for `{self._name}`') + self._inputs = list(inputs) + list(self._inputs) + + def add_input(self, *inputs: AnyInput): + if not self.__allow_add_inputs: + raise Exception(f'Cannot add inputs for `{self._name}`') + self._inputs = list(self._inputs) + list(inputs) + + def inject_inputs(self): + pass + + def _get_inputs(self) -> List[AnyInput]: + if not self.__has_already_inject_inputs: + self.inject_inputs() + self.__has_already_inject_inputs = True + return list(self._inputs) + + def _get_combined_inputs(self) -> Iterable[AnyInput]: + '''' + Getting all inputs of this task and all its upstream, non-duplicated. + ''' + if self.__all_inputs is not None: + return self.__all_inputs + self.__all_inputs: List[AnyInput] = [] + existing_input_names: Mapping[str, bool] = {} + # Add task inputs + inputs = self._get_inputs() + for input_index, first_occurence_task_input in enumerate(inputs): + input_name = first_occurence_task_input.get_name() + if input_name in existing_input_names: + continue + # Look for all input with the same name in the current task + task_inputs = [ + candidate + for candidate in inputs[input_index:] + if candidate.get_name() == input_name + ] + # Get the last input, and add it to _all_inputs + task_input = task_inputs[-1] + self.__all_inputs.append(task_input) + existing_input_names[input_name] = True + # Add upstream inputs + for upstream in self._get_upstreams(): + upstream_inputs = upstream._get_combined_inputs() + for upstream_input in upstream_inputs: + if upstream_input.get_name() in existing_input_names: + continue + self.__all_inputs.append(upstream_input) + existing_input_names[upstream_input.get_name()] = True + self._lock_upstreams() + self.__allow_add_inputs = False + return self.__all_inputs + + def insert_env(self, *envs: Env): + if not self.__allow_add_envs: + raise Exception(f'Cannot insert envs to `{self._name}`') + self._envs = list(envs) + list(self._envs) + + def add_env(self, *envs: Env): + if not self.__allow_add_envs: + raise Exception(f'Cannot add envs to `{self._name}`') + self._envs = list(self._envs) + list(envs) + + def inject_envs(self): + pass + + def _get_envs(self) -> List[Env]: + if not self.__has_already_inject_envs: + self.inject_envs() + self.__has_already_inject_envs = True + return list(self._envs) + + def _get_combined_env(self) -> Mapping[str, Env]: + all_envs: Mapping[str, Env] = {} + for env_name in os.environ: + if env_name in RESERVED_ENV_NAMES: + continue + all_envs[env_name] = Env( + name=env_name, os_name=env_name, should_render=False + ) + for env_file in self._get_env_files(): + for env in env_file.get_envs(): + all_envs[env.get_name()] = env + for env in self._get_envs(): + all_envs[env.get_name()] = env + self.__allow_add_envs = False + self.__allow_add_env_files = False + return all_envs + + def insert_env_file(self, *env_files: EnvFile): + if not self.__allow_add_env_files: + raise Exception(f'Cannot insert env_files to `{self._name}`') + self._env_files = list(env_files) + list(self._env_files) + + def add_env_file(self, *env_files: EnvFile): + if not self.__allow_add_env_files: + raise Exception(f'Cannot add env_files to `{self._name}`') + self._env_files = list(self._env_files) + list(env_files) + + def inject_env_files(self): + pass + + def insert_upstream(self, *upstreams: AnyTask): + if not self.__allow_add_upstreams: + raise Exception(f'Cannot insert upstreams to `{self._name}`') + self._upstreams = list(upstreams) + list(self._upstreams) + + def add_upstream(self, *upstreams: AnyTask): + if not self.__allow_add_upstreams: + raise Exception(f'Cannot add upstreams to `{self._name}`') + self._upstreams = list(self._upstreams) + list(upstreams) + + def inject_upstreams(self): + pass + + def _get_upstreams(self) -> List[AnyTask]: + if not self.__has_already_inject_upstreams: + self.inject_upstreams() + self.__has_already_inject_upstreams = True + return list(self._upstreams) + + def get_icon(self) -> str: + return self._icon + + def get_color(self) -> str: + return self._color + + def _get_env_files(self) -> List[EnvFile]: + if not self.__has_already_inject_env_files: + self.inject_env_files() + self.__has_already_inject_env_files = True + return self._env_files + + def inject_checkers(self): + pass + + def _get_checkers(self) -> List[AnyTask]: + if not self.__has_already_inject_checkers: + self.inject_checkers() + self.__has_already_inject_checkers = True + return list(self._checkers) diff --git a/src/zrb/task/base_task/component/pid_model.py b/src/zrb/task/base_task/component/pid_model.py new file mode 100644 index 00000000..75149873 --- /dev/null +++ b/src/zrb/task/base_task/component/pid_model.py @@ -0,0 +1,17 @@ +from zrb.helper.typecheck import typechecked + +import os + + +@typechecked +class PidModel(): + + def __init__(self): + self.__task_pid: int = os.getpid() + + def _set_task_pid(self, pid: int): + self.__task_pid = pid + + def _get_task_pid(self) -> int: + return self.__task_pid + diff --git a/src/zrb/task/base_task/component/renderer.py b/src/zrb/task/base_task/component/renderer.py new file mode 100644 index 00000000..774a249a --- /dev/null +++ b/src/zrb/task/base_task/component/renderer.py @@ -0,0 +1,119 @@ +from zrb.helper.typing import Any, Mapping, Optional, Union +from zrb.helper.typecheck import typechecked +from zrb.helper.string.conversion import to_boolean +from zrb.helper.string.jinja import is_probably_jinja +from zrb.helper.render_data import DEFAULT_RENDER_DATA + +import os +import jinja2 + + +class AnyExtensionFileSystemLoader(jinja2.FileSystemLoader): + def get_source(self, environment, template): + for search_dir in self.searchpath: + file_path = os.path.join(search_dir, template) + if os.path.exists(file_path): + with open(file_path, 'r') as file: + contents = file.read() + return contents, file_path, lambda: False + raise jinja2.TemplateNotFound(template) + + +@typechecked +class Renderer(): + + def __init__(self): + self.__input_map: Mapping[str, Any] = {} + self.__env_map: Mapping[str, str] = {} + self.__render_data: Optional[Mapping[str, Any]] = None + self.__rendered_str: Mapping[str, str] = {} + + def get_input_map(self) -> Mapping[str, Any]: + # This return reference to input map, so input map can be updated + return self.__input_map + + def _set_input_map(self, key: str, val: Any): + self.__input_map[key] = val + + def get_env_map(self) -> Mapping[str, str]: + # This return reference to env map, so env map can be updated + return self.__env_map + + def _set_env_map(self, key: str, val: str): + self.__env_map[key] = val + + def render_any( + self, val: Any, data: Optional[Mapping[str, Any]] = None + ) -> Any: + if isinstance(val, str): + return self.render_str(val, data) + return val + + def render_float( + self, val: Union[str, float], data: Optional[Mapping[str, Any]] = None + ) -> float: + if isinstance(val, str): + return float(self.render_str(val, data)) + return val + + def render_int( + self, val: Union[str, int], data: Optional[Mapping[str, Any]] = None + ) -> int: + if isinstance(val, str): + return int(self.render_str(val, data)) + return val + + def render_bool( + self, val: Union[str, bool], data: Optional[Mapping[str, Any]] = None + ) -> bool: + if isinstance(val, str): + return to_boolean(self.render_str(val, data)) + return val + + def render_str( + self, val: str, data: Optional[Mapping[str, Any]] = None + ) -> str: + if val in self.__rendered_str: + return self.__rendered_str[val] + if not is_probably_jinja(val): + return val + template = jinja2.Template(val) + render_data = self.__get_render_data(additional_data=data) + try: + rendered_text = template.render(render_data) + except Exception: + raise Exception(f'Fail to render "{val}" with data: {render_data}') + self.__rendered_str[val] = rendered_text + return rendered_text + + def render_file( + self, location: str, data: Optional[Mapping[str, Any]] = None + ) -> str: + location_dir = os.path.dirname(location) + env = jinja2.Environment( + loader=AnyExtensionFileSystemLoader([location_dir]) + ) + template = env.get_template(location) + render_data = self.__get_render_data(additional_data=data) + render_data['TEMPLATE_DIR'] = location_dir + rendered_text = template.render(render_data) + return rendered_text + + def __get_render_data( + self, additional_data: Optional[Mapping[str, Any]] = None + ) -> Mapping[str, Any]: + self.__ensure_cached_render_data() + if additional_data is None: + return self.__render_data + return {**self.__render_data, **additional_data} + + def __ensure_cached_render_data(self): + if self.__render_data is not None: + return self.__render_data + render_data = dict(DEFAULT_RENDER_DATA) + render_data.update({ + 'env': self.__env_map, + 'input': self.__input_map, + }) + self.__render_data = render_data + return render_data diff --git a/src/zrb/task/base_task/component/trackers.py b/src/zrb/task/base_task/component/trackers.py new file mode 100644 index 00000000..ead2d386 --- /dev/null +++ b/src/zrb/task/base_task/component/trackers.py @@ -0,0 +1,76 @@ +from zrb.helper.typing import Optional +from zrb.helper.typecheck import typechecked + +import asyncio +import time + +LOG_NAME_LENGTH = 20 + + +@typechecked +class TimeTracker(): + + def __init__(self): + self.__start_time: float = 0 + self.__end_time: float = 0 + + def _start_timer(self): + self.__start_time = time.time() + + def _end_timer(self): + self.__end_time = time.time() + + def _get_elapsed_time(self) -> float: + return self.__end_time - self.__start_time + + +@typechecked +class AttemptTracker(): + + def __init__(self, retry: int = 2): + self.__retry = retry + self.__attempt: int = 1 + + def _get_max_attempt(self) -> int: + return self.__retry + 1 + + def _get_attempt(self) -> int: + return self.__attempt + + def _increase_attempt(self): + self.__attempt += 1 + + def _should_attempt(self) -> bool: + attempt = self._get_attempt() + max_attempt = self._get_max_attempt() + return attempt <= max_attempt + + def _is_last_attempt(self) -> bool: + attempt = self._get_attempt() + max_attempt = self._get_max_attempt() + return attempt >= max_attempt + + +@typechecked +class FinishTracker(): + + def __init__(self): + self.__execution_queue: Optional[asyncio.Queue] = None + self.__counter = 0 + + async def _mark_awaited(self): + if self.__execution_queue is None: + self.__execution_queue = asyncio.Queue() + self.__counter += 1 + + async def _mark_done(self): + # Tracker might be started several times + # However, when the execution is marked as done, it applied globally + # Thus, we need to send event as much as the counter. + for i in range(self.__counter): + await self.__execution_queue.put(True) + + async def _is_done(self) -> bool: + while self.__execution_queue is None: + await asyncio.sleep(0.05) + return await self.__execution_queue.get() diff --git a/src/zrb/task/base_task_composite.py b/src/zrb/task/base_task_composite.py deleted file mode 100644 index 24019374..00000000 --- a/src/zrb/task/base_task_composite.py +++ /dev/null @@ -1,558 +0,0 @@ -from zrb.helper.typing import ( - Any, Callable, Iterable, List, Mapping, Optional, Union -) -from zrb.helper.typecheck import typechecked -from zrb.config.config import show_time -from zrb.task.any_task import AnyTask -from zrb.helper.string.conversion import to_boolean, to_cmd_name -from zrb.helper.string.jinja import is_probably_jinja -from zrb.helper.render_data import DEFAULT_RENDER_DATA -from zrb.helper.log import logger -from zrb.helper.accessories.color import colored, get_random_color -from zrb.helper.accessories.icon import get_random_icon -from zrb.helper.util import coalesce_str -from zrb.task_input.any_input import AnyInput -from zrb.task_group.group import Group -from zrb.task_env.env import Env -from zrb.task_env.env_file import EnvFile - -import asyncio -import datetime -import os -import time -import jinja2 -import sys - -LOG_NAME_LENGTH = 20 - - -@typechecked -class CommonTaskModel(): - def __init__( - self, - name: str, - group: Optional[Group] = None, - description: str = '', - inputs: List[AnyInput] = [], - envs: Iterable[Env] = [], - env_files: Iterable[EnvFile] = [], - icon: Optional[str] = None, - color: Optional[str] = None, - retry: int = 2, - retry_interval: Union[float, int] = 1, - upstreams: Iterable[AnyTask] = [], - checkers: Iterable[AnyTask] = [], - checking_interval: Union[float, int] = 0, - run: Optional[Callable[..., Any]] = None, - should_execute: Union[bool, str, Callable[..., bool]] = True - ): - self._name = name - self._group = group - if group is not None: - group.add_task(self) - self._description = coalesce_str(description, name) - self._inputs = inputs - self._envs = envs - self._env_files = env_files - self._icon = coalesce_str(icon, get_random_icon()) - self._color = coalesce_str(color, get_random_color()) - self._retry = retry - self._retry_interval = retry_interval - self._upstreams = upstreams - self._checkers = checkers - self._checking_interval = checking_interval - self._run_function: Optional[Callable[..., Any]] = run - self._should_execute = should_execute - self._allow_add_envs = True - self._allow_add_env_files = True - self._allow_add_inputs = True - self._allow_add_upstreams: bool = True - self._has_already_inject_env_files: bool = False - self._has_already_inject_envs: bool = False - self._has_already_inject_inputs: bool = False - self._has_already_inject_checkers: bool = False - self._has_already_inject_upstreams: bool = False - self._execution_id = '' - - def _set_execution_id(self, execution_id: str): - if self._execution_id == '': - self._execution_id = execution_id - - def set_name(self, new_name: str): - if self._description == self._name: - self._description = new_name - self._name = new_name - - def set_description(self, new_description: str): - self._description = new_description - - def set_icon(self, new_icon: str): - self._icon = new_icon - - def set_color(self, new_color: str): - self._color = new_color - - def set_retry(self, new_retry: int): - self._retry = new_retry - - def set_should_execute( - self, should_execute: Union[bool, str, Callable[..., bool]] - ): - self._should_execute = should_execute - - def set_retry_interval(self, new_retry_interval: Union[float, int]): - self._retry_interval = new_retry_interval - - def set_checking_interval(self, new_checking_interval: Union[float, int]): - self._checking_interval = new_checking_interval - - def insert_input(self, *inputs: AnyInput): - if not self._allow_add_inputs: - raise Exception(f'Cannot insert inputs for `{self._name}`') - self._inputs = list(inputs) + list(self._inputs) - - def add_input(self, *inputs: AnyInput): - if not self._allow_add_inputs: - raise Exception(f'Cannot add inputs for `{self._name}`') - self._inputs = list(self._inputs) + list(inputs) - - def insert_env(self, *envs: Env): - if not self._allow_add_envs: - raise Exception(f'Cannot insert envs to `{self._name}`') - self._envs = list(envs) + list(self._envs) - - def add_env(self, *envs: Env): - if not self._allow_add_envs: - raise Exception(f'Cannot add envs to `{self._name}`') - self._envs = list(self._envs) + list(envs) - - def insert_env_file(self, *env_files: EnvFile): - if not self._allow_add_env_files: - raise Exception(f'Cannot insert env_files to `{self._name}`') - self._env_files = list(env_files) + list(self._env_files) - - def add_env_file(self, *env_files: EnvFile): - if not self._allow_add_env_files: - raise Exception(f'Cannot add env_files to `{self._name}`') - self._env_files = list(self._env_files) + list(env_files) - - def insert_upstream(self, *upstreams: AnyTask): - if not self._allow_add_upstreams: - raise Exception(f'Cannot insert upstreams to `{self._name}`') - self._upstreams = list(upstreams) + list(self._upstreams) - - def add_upstream(self, *upstreams: AnyTask): - if not self._allow_add_upstreams: - raise Exception(f'Cannot add upstreams to `{self._name}`') - self._upstreams = list(self._upstreams) + list(upstreams) - - def get_execution_id(self) -> str: - return self._execution_id - - def get_icon(self) -> str: - return self._icon - - def get_color(self) -> str: - return self._color - - def inject_env_files(self): - pass - - def _get_env_files(self) -> List[EnvFile]: - if not self._has_already_inject_env_files: - self.inject_env_files() - self._has_already_inject_env_files = True - return self._env_files - - def inject_envs(self): - pass - - def _get_envs(self) -> List[Env]: - if not self._has_already_inject_envs: - self.inject_envs() - self._has_already_inject_envs = True - return list(self._envs) - - def inject_inputs(self): - pass - - def _get_inputs(self) -> List[AnyInput]: - if not self._has_already_inject_inputs: - self.inject_inputs() - self._has_already_inject_inputs = True - return list(self._inputs) - - def inject_checkers(self): - pass - - def _get_checkers(self) -> List[AnyTask]: - if not self._has_already_inject_checkers: - self.inject_checkers() - self._has_already_inject_checkers = True - return list(self._checkers) - - def inject_upstreams(self): - pass - - def _get_upstreams(self) -> List[AnyTask]: - if not self._has_already_inject_upstreams: - self.inject_upstreams() - self._has_already_inject_upstreams = True - return list(self._upstreams) - - def get_description(self) -> str: - return self._description - - def get_cmd_name(self) -> str: - return to_cmd_name(self._name) - - -@typechecked -class TimeTracker(): - - def __init__(self): - self.__start_time: float = 0 - self.__end_time: float = 0 - - def _start_timer(self): - self.__start_time = time.time() - - def _end_timer(self): - self.__end_time = time.time() - - def _get_elapsed_time(self) -> float: - return self.__end_time - self.__start_time - - -@typechecked -class AttemptTracker(): - - def __init__(self, retry: int = 2): - self.__retry = retry - self.__attempt: int = 1 - self.__no_more_attempt: bool = False - - def _get_max_attempt(self) -> int: - return self.__retry + 1 - - def _get_attempt(self) -> int: - return self.__attempt - - def _increase_attempt(self): - self.__attempt += 1 - - def _should_attempt(self) -> bool: - attempt = self._get_attempt() - max_attempt = self._get_max_attempt() - return attempt <= max_attempt - - def _is_last_attempt(self) -> bool: - attempt = self._get_attempt() - max_attempt = self._get_max_attempt() - return attempt >= max_attempt - - -@typechecked -class FinishTracker(): - - def __init__(self): - self.__execution_queue: Optional[asyncio.Queue] = None - self.__counter = 0 - - async def _mark_awaited(self): - if self.__execution_queue is None: - self.__execution_queue = asyncio.Queue() - self.__counter += 1 - - async def _mark_done(self): - # Tracker might be started several times - # However, when the execution is marked as done, it applied globally - # Thus, we need to send event as much as the counter. - for i in range(self.__counter): - await self.__execution_queue.put(True) - - async def _is_done(self) -> bool: - while self.__execution_queue is None: - await asyncio.sleep(0.05) - return await self.__execution_queue.get() - - -@typechecked -class PidModel(): - - def __init__(self): - self.__task_pid: int = os.getpid() - - def _set_task_pid(self, pid: int): - self.__task_pid = pid - - def _get_task_pid(self) -> int: - return self.__task_pid - - -class AnyExtensionFileSystemLoader(jinja2.FileSystemLoader): - def get_source(self, environment, template): - for search_dir in self.searchpath: - file_path = os.path.join(search_dir, template) - if os.path.exists(file_path): - with open(file_path, 'r') as file: - contents = file.read() - return contents, file_path, lambda: False - raise jinja2.TemplateNotFound(template) - - -@typechecked -class Renderer(): - - def __init__(self): - self.__input_map: Mapping[str, Any] = {} - self.__env_map: Mapping[str, str] = {} - self.__render_data: Optional[Mapping[str, Any]] = None - self.__rendered_str: Mapping[str, str] = {} - - def get_input_map(self) -> Mapping[str, Any]: - # This return reference to input map, so input map can be updated - return self.__input_map - - def _set_input_map(self, key: str, val: Any): - self.__input_map[key] = val - - def get_env_map(self) -> Mapping[str, str]: - # This return reference to env map, so env map can be updated - return self.__env_map - - def _set_env_map(self, key: str, val: str): - self.__env_map[key] = val - - def render_any( - self, val: Any, data: Optional[Mapping[str, Any]] = None - ) -> Any: - if isinstance(val, str): - return self.render_str(val, data) - return val - - def render_float( - self, val: Union[str, float], data: Optional[Mapping[str, Any]] = None - ) -> float: - if isinstance(val, str): - return float(self.render_str(val, data)) - return val - - def render_int( - self, val: Union[str, int], data: Optional[Mapping[str, Any]] = None - ) -> int: - if isinstance(val, str): - return int(self.render_str(val, data)) - return val - - def render_bool( - self, val: Union[str, bool], data: Optional[Mapping[str, Any]] = None - ) -> bool: - if isinstance(val, str): - return to_boolean(self.render_str(val, data)) - return val - - def render_str( - self, val: str, data: Optional[Mapping[str, Any]] = None - ) -> str: - if val in self.__rendered_str: - return self.__rendered_str[val] - if not is_probably_jinja(val): - return val - template = jinja2.Template(val) - render_data = self._get_render_data(additional_data=data) - try: - rendered_text = template.render(render_data) - except Exception: - raise Exception(f'Fail to render "{val}" with data: {render_data}') - self.__rendered_str[val] = rendered_text - return rendered_text - - def render_file( - self, location: str, data: Optional[Mapping[str, Any]] = None - ) -> str: - location_dir = os.path.dirname(location) - env = jinja2.Environment( - loader=AnyExtensionFileSystemLoader([location_dir]) - ) - template = env.get_template(location) - render_data = self._get_render_data(additional_data=data) - render_data['TEMPLATE_DIR'] = location_dir - rendered_text = template.render(render_data) - return rendered_text - - def _get_render_data( - self, additional_data: Optional[Mapping[str, Any]] = None - ) -> Mapping[str, Any]: - self._ensure_cached_render_data() - if additional_data is None: - return self.__render_data - return {**self.__render_data, **additional_data} - - def _ensure_cached_render_data(self): - if self.__render_data is not None: - return self.__render_data - render_data = dict(DEFAULT_RENDER_DATA) - render_data.update({ - 'env': self.__env_map, - 'input': self.__input_map, - }) - self.__render_data = render_data - return render_data - - -@typechecked -class TaskModelWithPrinterAndTracker( - CommonTaskModel, PidModel, TimeTracker -): - def __init__( - self, - name: str, - group: Optional[Group] = None, - description: str = '', - inputs: List[AnyInput] = [], - envs: Iterable[Env] = [], - env_files: Iterable[EnvFile] = [], - icon: Optional[str] = None, - color: Optional[str] = None, - retry: int = 2, - retry_interval: Union[int, float] = 1, - upstreams: Iterable[AnyTask] = [], - checkers: Iterable[AnyTask] = [], - checking_interval: Union[int, float] = 0, - run: Optional[Callable[..., Any]] = None, - should_execute: Union[bool, str, Callable[..., bool]] = True - ): - self._rjust_full_cmd_name: Optional[str] = None - self._has_cli_interface = False - self._complete_name: Optional[str] = None - CommonTaskModel.__init__( - self, - name=name, - group=group, - description=description, - inputs=inputs, - envs=envs, - env_files=env_files, - icon=icon, - color=color, - retry=retry, - retry_interval=retry_interval, - upstreams=upstreams, - checkers=checkers, - checking_interval=checking_interval, - run=run, - should_execute=should_execute, - ) - PidModel.__init__(self) - TimeTracker.__init__(self) - - def log_debug(self, message: Any): - prefix = self._get_log_prefix() - colored_message = colored( - f'{prefix} • {message}', attrs=['dark'] - ) - logger.debug(colored_message) - - def log_warn(self, message: Any): - prefix = self._get_log_prefix() - colored_message = colored( - f'{prefix} • {message}', attrs=['dark'] - ) - logger.warning(colored_message) - - def log_info(self, message: Any): - prefix = self._get_log_prefix() - colored_message = colored( - f'{prefix} • {message}', attrs=['dark'] - ) - logger.info(colored_message) - - def log_error(self, message: Any): - prefix = self._get_log_prefix() - colored_message = colored( - f'{prefix} • {message}', color='red', attrs=['bold'] - ) - logger.error(colored_message, exc_info=True) - - def log_critical(self, message: Any): - prefix = self._get_log_prefix() - colored_message = colored( - f'{prefix} • {message}', color='red', attrs=['bold'] - ) - logger.critical(colored_message, exc_info=True) - - def print_out(self, message: Any, trim_message: bool = True): - prefix = self._get_colored_print_prefix() - message_str = f'{message}'.rstrip() if trim_message else f'{message}' - print(f'🤖 ○ {prefix} • {message_str}', file=sys.stderr) - sys.stderr.flush() - - def print_err(self, message: Any, trim_message: bool = True): - prefix = self._get_colored_print_prefix() - message_str = f'{message}'.rstrip() if trim_message else f'{message}' - print(f'🤖 △ {prefix} • {message_str}', file=sys.stderr) - sys.stderr.flush() - - def print_out_dark(self, message: Any, trim_message: bool = True): - message_str = f'{message}' - self.print_out(colored(message_str, attrs=['dark']), trim_message) - - def _play_bell(self): - print('\a', end='', file=sys.stderr) - - def _get_colored_print_prefix(self) -> str: - return self._get_colored(self._get_print_prefix()) - - def _get_colored(self, text: str) -> str: - return colored(text, color=self.get_color()) - - def _get_print_prefix(self) -> str: - common_prefix = self._get_common_prefix(show_time=show_time) - icon = self.get_icon() - rjust_cmd_name = self._get_rjust_full_cmd_name() - return f'{common_prefix} {icon} {rjust_cmd_name}' - - def _get_log_prefix(self) -> str: - common_prefix = self._get_common_prefix(show_time=False) - icon = self.get_icon() - filled_name = self._get_rjust_full_cmd_name() - return f'{common_prefix} {icon} {filled_name}' - - def _get_common_prefix(self, show_time: bool) -> str: - attempt = self._get_attempt() - max_attempt = self._get_max_attempt() - pid = str(self._get_task_pid()).rjust(6) - if show_time: - now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] - return f'◷ {now} ❁ {pid} → {attempt}/{max_attempt}' - return f'❁ {pid} → {attempt}/{max_attempt}' - - def _get_rjust_full_cmd_name(self) -> str: - if self._rjust_full_cmd_name is not None: - return self._rjust_full_cmd_name - complete_name = self._get_full_cmd_name() - self._rjust_full_cmd_name = complete_name.rjust(LOG_NAME_LENGTH, ' ') - return self._rjust_full_cmd_name - - def _get_full_cmd_name(self) -> str: - if self._complete_name is not None: - return self._complete_name - executable_prefix = '' - if self._has_cli_interface: - executable_prefix += self._get_executable_name() + ' ' - cmd_name = self.get_cmd_name() - if self._group is None: - self._complete_name = f'{executable_prefix}{cmd_name}' - return self._complete_name - group_cmd_name = self._group.get_complete_name() - self._complete_name = f'{executable_prefix}{group_cmd_name} {cmd_name}' - return self._complete_name - - def _get_executable_name(self) -> str: - if len(sys.argv) > 0 and sys.argv[0] != '': - return os.path.basename(sys.argv[0]) - return 'zrb' - - def _set_has_cli_interface(self): - self._has_cli_interface = True diff --git a/src/zrb/task/checker.py b/src/zrb/task/checker.py index 0b00f07c..ca7c8f0b 100644 --- a/src/zrb/task/checker.py +++ b/src/zrb/task/checker.py @@ -1,6 +1,6 @@ from zrb.helper.typing import Any, Callable, Iterable, Optional, Union from zrb.helper.typecheck import typechecked -from zrb.task.base_task import BaseTask +from zrb.task.base_task.base_task import BaseTask from zrb.task.any_task import AnyTask from zrb.task.any_task_event_handler import ( OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed diff --git a/src/zrb/task/cmd_task.py b/src/zrb/task/cmd_task.py index 375988bb..4e413327 100644 --- a/src/zrb/task/cmd_task.py +++ b/src/zrb/task/cmd_task.py @@ -2,11 +2,12 @@ Any, Callable, Iterable, List, Optional, Union, TypeVar ) from zrb.helper.typecheck import typechecked +from zrb.helper.string.conversion import to_variable_name from zrb.task.any_task import AnyTask from zrb.task.any_task_event_handler import ( OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed ) -from zrb.task.base_task import BaseTask +from zrb.task.base_task.base_task import BaseTask from zrb.task_env.env import Env from zrb.task_env.env_file import EnvFile from zrb.task_group.group import Group @@ -150,7 +151,7 @@ def __init__( max_error_line = max_error_line if max_error_line > 0 else 1 self._cmd = cmd self._cmd_path = cmd_path - self._set_cwd(cwd) + self.__set_cwd(cwd) self._max_output_size = max_output_line self._max_error_size = max_error_line self._output_buffer: Iterable[str] = [] @@ -164,7 +165,10 @@ def __init__( def copy(self) -> TCmdTask: return super().copy() - def _set_cwd( + def set_cwd(self, cwd: Union[str, pathlib.Path]): + self.__set_cwd(cwd) + + def __set_cwd( self, cwd: Optional[Union[str, pathlib.Path]] ): if cwd is None: @@ -192,7 +196,7 @@ def inject_envs(self): super().inject_envs() input_map = self.get_input_map() for task_input in self._get_combined_inputs(): - input_key = self._get_normalized_input_key(task_input.get_name()) + input_key = to_variable_name(task_input.get_name()) input_value = input_map.get(input_key) env_name = '_INPUT_' + input_key.upper() should_render = task_input.should_render() @@ -205,7 +209,7 @@ def inject_envs(self): async def run(self, *args: Any, **kwargs: Any) -> CmdResult: cmd = self.get_cmd_script(*args, **kwargs) - self.print_out_dark('Run script: ' + self._get_multiline_repr(cmd)) + self.print_out_dark('Run script: ' + self.__get_multiline_repr(cmd)) self.print_out_dark('Working directory: ' + self._cwd) self._output_buffer = [] self._error_buffer = [] @@ -225,14 +229,14 @@ async def run(self, *args: Any, **kwargs: Any) -> CmdResult: self._pids.append(process.pid) self._process = process try: - signal.signal(signal.SIGINT, self._on_kill) - signal.signal(signal.SIGTERM, self._on_kill) + signal.signal(signal.SIGINT, self.__on_kill) + signal.signal(signal.SIGTERM, self.__on_kill) except Exception as e: self.print_err(e) - atexit.register(self._on_exit) - await self._wait_process(process) + atexit.register(self.__on_exit) + await self.__wait_process(process) self.log_info('Process completed') - atexit.unregister(self._on_exit) + atexit.unregister(self.__on_exit) output = '\n'.join(self._output_buffer) error = '\n'.join(self._error_buffer) # get return code @@ -254,36 +258,42 @@ def _is_last_attempt(self) -> bool: return True return super()._is_last_attempt() - def _on_kill(self, signum: Any, frame: Any): + def __on_kill(self, signum: Any, frame: Any): self._global_state.no_more_attempt = True self._global_state.is_killed_by_signal = True self.print_out_dark(f'Getting signal {signum}') for pid in self._pids: - self._kill_by_pid(pid) - self.print_out_dark(f'Exiting with signal {signum}') + self.__kill_by_pid(pid) + tasks = asyncio.all_tasks() + for task in tasks: + try: + task.cancel() + except Exception as e: + self.print_err(e) time.sleep(0.3) + self.print_out_dark(f'Exiting with signal {signum}') sys.exit(signum) - def _on_exit(self): + def __on_exit(self): self._global_state.no_more_attempt = True - self._kill_by_pid(self._process.pid) + self.__kill_by_pid(self._process.pid) - def _kill_by_pid(self, pid: int): + def __kill_by_pid(self, pid: int): ''' Kill a pid, gracefully ''' try: process_ever_exists = False - if self._is_process_exist(pid): + if self.__is_process_exist(pid): process_ever_exists = True self.print_out_dark(f'Send SIGTERM to process {pid}') os.killpg(os.getpgid(pid), signal.SIGTERM) time.sleep(0.3) - if self._is_process_exist(pid): + if self.__is_process_exist(pid): self.print_out_dark(f'Send SIGINT to process {pid}') os.killpg(os.getpgid(pid), signal.SIGINT) time.sleep(0.3) - if self._is_process_exist(pid): + if self.__is_process_exist(pid): self.print_out_dark(f'Send SIGKILL to process {pid}') os.killpg(os.getpgid(pid), signal.SIGKILL) if process_ever_exists: @@ -291,30 +301,30 @@ def _kill_by_pid(self, pid: int): except Exception: self.log_error(f'Cannot kill process {pid}') - def _is_process_exist(self, pid: int) -> bool: + def __is_process_exist(self, pid: int) -> bool: try: os.killpg(os.getpgid(pid), 0) return True except ProcessLookupError: return False - async def _wait_process(self, process: asyncio.subprocess.Process): + async def __wait_process(self, process: asyncio.subprocess.Process): # Create queue stdout_queue = asyncio.Queue() stderr_queue = asyncio.Queue() # Read from streams and put into queue - stdout_process = asyncio.create_task(self._queue_stream( + stdout_process = asyncio.create_task(self.__queue_stream( process.stdout, stdout_queue )) - stderr_process = asyncio.create_task(self._queue_stream( + stderr_process = asyncio.create_task(self.__queue_stream( process.stderr, stderr_queue )) # Handle messages in queue - stdout_log_process = asyncio.create_task(self._log_from_queue( + stdout_log_process = asyncio.create_task(self.__log_from_queue( stdout_queue, self.print_out, self._output_buffer, self._max_output_size )) - stderr_log_process = asyncio.create_task(self._log_from_queue( + stderr_log_process = asyncio.create_task(self.__log_from_queue( stderr_queue, self.print_err, self._error_buffer, self._max_error_size )) @@ -338,13 +348,13 @@ def _create_cmd_script( ) -> str: if not isinstance(cmd_path, str) or cmd_path != '': if callable(cmd_path): - return self._get_rendered_cmd_path(cmd_path(*args, **kwargs)) - return self._get_rendered_cmd_path(cmd_path) + return self.__get_rendered_cmd_path(cmd_path(*args, **kwargs)) + return self.__get_rendered_cmd_path(cmd_path) if callable(cmd): - return self._get_rendered_cmd(cmd(*args, **kwargs)) - return self._get_rendered_cmd(cmd) + return self.__get_rendered_cmd(cmd(*args, **kwargs)) + return self.__get_rendered_cmd(cmd) - def _get_rendered_cmd_path( + def __get_rendered_cmd_path( self, cmd_path: Union[str, Iterable[str]] ) -> str: if isinstance(cmd_path, str): @@ -354,12 +364,12 @@ def _get_rendered_cmd_path( for cmd_path_str in cmd_path ]) - def _get_rendered_cmd(self, cmd: Union[str, Iterable[str]]) -> str: + def __get_rendered_cmd(self, cmd: Union[str, Iterable[str]]) -> str: if isinstance(cmd, str): return self.render_str(cmd) return self.render_str('\n'.join(list(cmd))) - async def _queue_stream(self, stream, queue: asyncio.Queue): + async def __queue_stream(self, stream, queue: asyncio.Queue): while True: try: line = await stream.readline() @@ -369,7 +379,7 @@ async def _queue_stream(self, stream, queue: asyncio.Queue): break await queue.put(line) - async def _log_from_queue( + async def __log_from_queue( self, queue: asyncio.Queue, print_log: Callable[[str], None], @@ -381,17 +391,27 @@ async def _log_from_queue( if not line: break line_str = line.decode('utf-8').rstrip() - self._add_to_buffer(buffer, max_size, line_str) + self.__add_to_buffer(buffer, max_size, line_str) _reset_stty() print_log(line_str) _reset_stty() - def _add_to_buffer( + def __add_to_buffer( self, buffer: Iterable[str], max_size: int, new_line: str ): if len(buffer) >= max_size: buffer.pop(0) buffer.append(new_line) - + + def __get_multiline_repr(self, text: str) -> str: + lines_repr: Iterable[str] = [] + lines = text.split('\n') + if len(lines) == 1: + return lines[0] + for index, line in enumerate(lines): + line_number_repr = str(index + 1).rjust(4, '0') + lines_repr.append(f' {line_number_repr} | {line}') + return '\n' + '\n'.join(lines_repr) + def __repr__(self) -> str: return f'' diff --git a/src/zrb/task/docker_compose_task.py b/src/zrb/task/docker_compose_task.py index f70d3060..c2eaa066 100644 --- a/src/zrb/task/docker_compose_task.py +++ b/src/zrb/task/docker_compose_task.py @@ -145,10 +145,10 @@ def __init__( self._compose_flags = compose_flags self._compose_args = compose_args self._compose_env_prefix = compose_env_prefix - self._compose_template_file = self._get_compose_template_file( + self._compose_template_file = self.__get_compose_template_file( compose_file ) - self._compose_runtime_file = self._get_compose_runtime_file( + self._compose_runtime_file = self.__get_compose_runtime_file( self._compose_template_file ) # Flag to make mark whether service config and compose environments @@ -160,7 +160,7 @@ def copy(self) -> TDockerComposeTask: return super().copy() async def run(self, *args, **kwargs: Any) -> CmdResult: - self._generate_compose_runtime_file() + self.__generate_compose_runtime_file() try: result = await super().run(*args, **kwargs) finally: @@ -192,7 +192,7 @@ def inject_env_files(self): for _, service_config in self._compose_service_configs.items(): self.insert_env_file(*service_config.get_env_files()) - def _generate_compose_runtime_file(self): + def __generate_compose_runtime_file(self): compose_data = read_compose_file(self._compose_template_file) for service, service_config in self._compose_service_configs.items(): envs: List[Env] = [] @@ -200,10 +200,12 @@ def _generate_compose_runtime_file(self): for env_file in env_files: envs += env_file.get_envs() envs += service_config.get_envs() - compose_data = self._apply_service_env(compose_data, service, envs) + compose_data = self.__apply_service_env( + compose_data, service, envs + ) write_compose_file(self._compose_runtime_file, compose_data) - def _apply_service_env( + def __apply_service_env( self, compose_data: Any, service: str, envs: List[Env] ) -> Any: # service not found @@ -213,12 +215,13 @@ def _apply_service_env( # service has no environment definition if 'environment' not in compose_data['services'][service]: compose_data['services'][service]['environment'] = { - env.name: self._get_env_compose_value(env) for env in envs + env.get_name(): self.__get_env_compose_value(env) + for env in envs } return compose_data # service environment is a map if isinstance(compose_data['services'][service]['environment'], dict): - new_env_map = self._get_service_new_env_map( + new_env_map = self.__get_service_new_env_map( compose_data['services'][service]['environment'], envs ) for key, value in new_env_map.items(): @@ -226,43 +229,44 @@ def _apply_service_env( return compose_data # service environment is a list if isinstance(compose_data['services'][service]['environment'], list): - new_env_list = self._get_service_new_env_list( + new_env_list = self.__get_service_new_env_list( compose_data['services'][service]['environment'], envs ) compose_data['services'][service]['environment'] += new_env_list return compose_data return compose_data - def _get_service_new_env_map( + def __get_service_new_env_map( self, service_env_map: Mapping[str, str], new_envs: List[Env] ) -> Mapping[str, str]: new_service_envs: Mapping[str, str] = {} for env in new_envs: - if env.name in service_env_map: + env_name = env.get_name() + if env_name in service_env_map: continue - new_service_envs[env.name] = self._get_env_compose_value(env) + new_service_envs[env_name] = self.__get_env_compose_value(env) return new_service_envs - def _get_service_new_env_list( + def __get_service_new_env_list( self, service_env_list: List[str], new_envs: List[Env] ) -> List[str]: new_service_envs: List[str] = [] for env in new_envs: should_be_added = 0 == len([ service_env for service_env in service_env_list - if service_env.startswith(env.name + '=') + if service_env.startswith(env.get_name() + '=') ]) if not should_be_added: continue new_service_envs.append( - env.name + '=' + self._get_env_compose_value(env) + env.get_name() + '=' + self.__get_env_compose_value(env) ) return new_service_envs - def _get_env_compose_value(self, env: Env) -> str: - return '${' + env.name + ':-' + env.default + '}' + def __get_env_compose_value(self, env: Env) -> str: + return '${' + env.get_name() + ':-' + env.get_default() + '}' - def _get_compose_runtime_file(self, compose_file_name: str) -> str: + def __get_compose_runtime_file(self, compose_file_name: str) -> str: directory, file = os.path.split(compose_file_name) prefix = '_' if file.startswith('.') else '._' runtime_prefix = self.get_cmd_name() @@ -281,7 +285,7 @@ def _get_compose_runtime_file(self, compose_file_name: str) -> str: runtime_file_name = prefix + file + runtime_prefix return os.path.join(directory, runtime_file_name) - def _get_compose_template_file(self, compose_file: Optional[str]) -> str: + def __get_compose_template_file(self, compose_file: Optional[str]) -> str: if compose_file is None: for _compose_file in [ 'compose.yml', 'compose.yaml', diff --git a/src/zrb/task/flow_task.py b/src/zrb/task/flow_task.py index 601f2e2b..21038c2c 100644 --- a/src/zrb/task/flow_task.py +++ b/src/zrb/task/flow_task.py @@ -2,7 +2,7 @@ Callable, Iterable, List, Optional, TypeVar, Union ) from zrb.helper.typecheck import typechecked -from zrb.task.base_task import BaseTask +from zrb.task.base_task.base_task import BaseTask from zrb.task.any_task import AnyTask from zrb.task.any_task_event_handler import ( OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed diff --git a/src/zrb/task/recurring_task.py b/src/zrb/task/recurring_task.py index 729068c2..2810da5a 100644 --- a/src/zrb/task/recurring_task.py +++ b/src/zrb/task/recurring_task.py @@ -2,7 +2,7 @@ Any, Callable, Iterable, Mapping, Optional, Union ) from zrb.helper.typecheck import typechecked -from zrb.task.base_task import BaseTask +from zrb.task.base_task.base_task import BaseTask from zrb.task.any_task import AnyTask from zrb.task.any_task_event_handler import ( OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed diff --git a/src/zrb/task/resource_maker.py b/src/zrb/task/resource_maker.py index daad203d..e95382df 100644 --- a/src/zrb/task/resource_maker.py +++ b/src/zrb/task/resource_maker.py @@ -2,7 +2,7 @@ Any, Callable, Iterable, Mapping, Optional, Union, TypeVar ) from zrb.helper.typecheck import typechecked -from zrb.task.base_task import BaseTask +from zrb.task.base_task.base_task import BaseTask from zrb.task.any_task import AnyTask from zrb.task.any_task_event_handler import ( OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed diff --git a/src/zrb/task/task.py b/src/zrb/task/task.py index 901dfa00..b1b4f522 100644 --- a/src/zrb/task/task.py +++ b/src/zrb/task/task.py @@ -1,4 +1,4 @@ -from zrb.task.base_task import BaseTask +from zrb.task.base_task.base_task import BaseTask from zrb.helper.typecheck import typechecked diff --git a/src/zrb/task_env/env.py b/src/zrb/task_env/env.py index 02a16679..58f826d7 100644 --- a/src/zrb/task_env/env.py +++ b/src/zrb/task_env/env.py @@ -19,10 +19,22 @@ def __init__( ): if name in RESERVED_ENV_NAMES: raise ValueError(f'Forbidden input name: {name}') - self.name: str = name - self.os_name: str = os_name if os_name is not None else name - self.default: str = default - self.should_render: bool = should_render + self.__name: str = name + self.__os_name: str = os_name if os_name is not None else name + self.__default: str = default + self.__should_render: bool = should_render + + def get_name(self) -> str: + return self.__name + + def get_os_name(self) -> str: + return self.__os_name + + def get_default(self) -> str: + return self.__default + + def should_render(self) -> bool: + return self.__should_render def get(self, prefix: str = '') -> str: ''' @@ -41,22 +53,22 @@ def get(self, prefix: str = '') -> str: print(env.get('STAG')) # will show '0.0.0.0' ``` ''' - if self.os_name == '': - return self.default - prefixed_name = self._get_prefixed_name(self.os_name, prefix) + if self.__os_name == '': + return self.__default + prefixed_name = self.__get_prefixed_name(self.__os_name, prefix) if prefixed_name in os.environ and os.environ[prefixed_name] != '': return os.environ[prefixed_name] - if self.os_name in os.environ and os.environ[self.os_name] != '': - return os.environ[self.os_name] - return self.default + if self.__os_name in os.environ and os.environ[self.__os_name] != '': + return os.environ[self.__os_name] + return self.__default - def _get_prefixed_name(self, name: str, prefix: str): + def __get_prefixed_name(self, name: str, prefix: str): if prefix is None or prefix == '': return name return prefix + '_' + name def __repr__(self) -> str: - name = self.name - os_name = self.os_name - default = self.default + name = self.__name + os_name = self.__os_name + default = self.__default return f'' diff --git a/src/zrb/task_env/env_file.py b/src/zrb/task_env/env_file.py index 8ba48d34..919e62e7 100644 --- a/src/zrb/task_env/env_file.py +++ b/src/zrb/task_env/env_file.py @@ -14,34 +14,34 @@ def __init__( prefix: Optional[str] = None, should_render: bool = False ): - self.env_file = env_file - self.prefix = prefix.upper() if prefix is not None else None - self.should_render = should_render - self._env_list: List[Env] = [] - self._env_list_fetched: bool = False + self.__env_file = env_file + self.__prefix = prefix.upper() if prefix is not None else None + self.__should_render = should_render + self.__env_list: List[Env] = [] + self.__env_list_fetched: bool = False def get_envs(self) -> List[Env]: - if self._env_list_fetched: - return self._env_list + if self.__env_list_fetched: + return self.__env_list env_list: List[Env] = [] - env_map = dotenv_values(self.env_file) + env_map = dotenv_values(self.__env_file) for key, value in env_map.items(): if key in RESERVED_ENV_NAMES: continue os_name: Optional[str] = None - if self.prefix is not None and self.prefix != '': - os_name = f'{self.prefix}_{key}' + if self.__prefix is not None and self.__prefix != '': + os_name = f'{self.__prefix}_{key}' env_list.append(Env( name=key, os_name=os_name, default=value, - should_render=self.should_render + should_render=self.__should_render )) - self._env_list = env_list - self._env_list_fetched = True + self.__env_list = env_list + self.__env_list_fetched = True return env_list def __repr__(self) -> str: - env_file = self.env_file - prefix = self.prefix + env_file = self.__env_file + prefix = self.__prefix return f'' diff --git a/src/zrb/task_input/base_input.py b/src/zrb/task_input/base_input.py index 8bb9e578..e95e0ae7 100644 --- a/src/zrb/task_input/base_input.py +++ b/src/zrb/task_input/base_input.py @@ -52,7 +52,7 @@ def __init__( self._show_choices = show_choices self._show_envvar = show_envvar self._nargs = nargs - self._should_render = should_render + self.__should_render = should_render def get_name(self) -> str: return self._name @@ -90,7 +90,7 @@ def get_options(self) -> Mapping[str, Any]: return options def should_render(self) -> bool: - return self._should_render + return self.__should_render def is_hidden(self) -> bool: return False diff --git a/test/task/task_copy/env_file.env b/test/task/task_copy/env_file.env new file mode 100644 index 00000000..3e1478e8 --- /dev/null +++ b/test/task/task_copy/env_file.env @@ -0,0 +1 @@ +HOST=localhost \ No newline at end of file diff --git a/test/task/task_copy/new_env_file.env b/test/task/task_copy/new_env_file.env new file mode 100644 index 00000000..69093ab2 --- /dev/null +++ b/test/task/task_copy/new_env_file.env @@ -0,0 +1 @@ +HOST=stalchmst.com \ No newline at end of file diff --git a/test/task/task_copy/test_task_copy.py b/test/task/task_copy/test_task_copy.py new file mode 100644 index 00000000..96a6e928 --- /dev/null +++ b/test/task/task_copy/test_task_copy.py @@ -0,0 +1,105 @@ +from zrb.task.any_task import AnyTask +from zrb.task.task import Task +from zrb.task.cmd_task import CmdTask +from zrb.task_input.str_input import StrInput +from zrb.task_env.env import Env +from zrb.task_env.env_file import EnvFile + +import os + +CURRENT_DIR = os.path.dirname(__file__) + + +def test_task_copy(): + def _run(*args, **kwargs) -> str: + name = kwargs.get('name') + task: AnyTask = kwargs.get('_task') + env_map = task.get_env_map() + environment = env_map.get('ENVIRONMENT') + host = env_map.get('HOST') + return f'hello {name} on {environment}, host: {host}' + task = Task( + name='task', + inputs=[ + StrInput(name='name', default='Nicholas Flamel') + ], + envs=[ + Env(name='ENVIRONMENT', default='dev') + ], + env_files=[ + EnvFile(env_file=os.path.join(CURRENT_DIR, 'env_file.env')) + ], + run=_run, + retry=0 + ) + new_task: Task = task.copy() + new_task.set_name('new-task') + assert task.get_cmd_name() == 'task' + assert new_task.get_cmd_name() == 'new-task' + assert task.get_description() == 'task' + assert new_task.get_description() == 'new-task' + new_task.set_description('new description') + assert task.get_cmd_name() == 'task' + assert new_task.get_cmd_name() == 'new-task' + assert task.get_description() == 'task' + assert new_task.get_description() == 'new description' + new_task.set_icon('🔥') + assert new_task.get_icon() == '🔥' + new_task.set_color('green') + assert new_task.get_color() == 'green' + new_task.set_retry(1) + assert new_task._retry == 1 + new_task.add_input(StrInput(name='name', default='Dumbledore')) + new_task.add_env(Env(name='ENVIRONMENT', default='prod')) + new_task.add_env_file( + EnvFile(env_file=os.path.join(CURRENT_DIR, 'new_env_file.env')) + ) + function = new_task.to_function() + result = function() + assert result == 'hello Dumbledore on prod, host: stalchmst.com' + + +def test_cmd_task_copy(): + task = CmdTask( + name='task', + inputs=[ + StrInput(name='name', default='Nicholas Flamel') + ], + envs=[ + Env(name='ENVIRONMENT', default='dev') + ], + env_files=[ + EnvFile(env_file=os.path.join(CURRENT_DIR, 'env_file.env')) + ], + cwd=CURRENT_DIR, + cmd='echo hello $_INPUT_NAME on $ENVIRONMENT, host: $HOST', + retry=0 + ) + new_task: CmdTask = task.copy() + new_task.set_name('new-task') + assert task.get_cmd_name() == 'task' + assert new_task.get_cmd_name() == 'new-task' + assert task.get_description() == 'task' + assert new_task.get_description() == 'new-task' + new_task.set_description('new description') + assert task.get_cmd_name() == 'task' + assert new_task.get_cmd_name() == 'new-task' + assert task.get_description() == 'task' + assert new_task.get_description() == 'new description' + new_task.set_icon('🔥') + assert new_task.get_icon() == '🔥' + new_task.set_color('green') + assert new_task.get_color() == 'green' + new_task.set_retry(1) + assert new_task._retry == 1 + new_task.set_cwd(os.path.dirname(CURRENT_DIR)) + assert task._cwd == CURRENT_DIR + assert new_task._cwd == os.path.dirname(CURRENT_DIR) + new_task.add_input(StrInput(name='name', default='Dumbledore')) + new_task.add_env(Env(name='ENVIRONMENT', default='prod')) + new_task.add_env_file( + EnvFile(env_file=os.path.join(CURRENT_DIR, 'new_env_file.env')) + ) + function = new_task.to_function() + result = function() + assert result.output == 'hello Dumbledore on prod, host: stalchmst.com' diff --git a/test/task/test_flow_task.py b/test/task/test_flow_task.py index 50602b56..0e79f527 100644 --- a/test/task/test_flow_task.py +++ b/test/task/test_flow_task.py @@ -81,4 +81,3 @@ def create_salt(*args, **kwargs): assert 'Na' in outputs assert 'Cl' in outputs assert outputs[3] == 'NaCl' - diff --git a/test/task/test_task.py b/test/task/test_task.py index 15050da3..0391128c 100644 --- a/test/task/test_task.py +++ b/test/task/test_task.py @@ -1,8 +1,6 @@ from zrb.task.task import Task -from zrb.task_env.env import Env -from zrb.task_env.env_file import EnvFile from zrb.task_input.str_input import StrInput -import os +import asyncio def test_task_with_no_runner(): @@ -28,7 +26,7 @@ def _run(*args, **kwargs) -> str: assert result == 'hello' -def test_task_with_should_execute_equal_to_false(): +def test_should_not_executed_task(): def _run(*args, **kwargs) -> str: return 'hello' task = Task( @@ -42,253 +40,65 @@ def _run(*args, **kwargs) -> str: assert result is None -def test_task_with_input(): - def _run(*args, **kwargs) -> str: - name = kwargs['name'] - favorite_drink = kwargs['favorite_drink'] - return f'hello {name}, your favorite drink is {favorite_drink}' - task = Task( - name='hello-name', - inputs=[ - StrInput(name='name'), - StrInput(name='favorite-drink') - ], - run=_run, - retry=0 - ) - function = task.to_function() - result = function(name='Dumbledore', favorite_drink='Elixir') - assert result == 'hello Dumbledore, your favorite drink is Elixir' - - -def test_task_with_default_input(): - def _run(*args, **kwargs) -> str: - name = kwargs['name'] - favorite_drink = kwargs['favorite_drink'] - return f'hello {name}, your favorite drink is {favorite_drink}' - task = Task( - name='hello-name', - inputs=[ - StrInput(name='name', default='Nicholas Flamel'), - StrInput(name='favorite-drink', default='Elixir') - ], - run=_run, - retry=0 - ) - function = task.to_function() - result = function() - assert result == 'hello Nicholas Flamel, your favorite drink is Elixir' - - -def test_task_with_templated_input_without_kwarg(): - def _run(*args, **kwargs) -> str: - name = kwargs['name'] - alias = kwargs['alias'] - return f'hello {name}, aka {alias}' - task = Task( - name='hello-name', - inputs=[ - StrInput(name='name', default='Nicholas Flamel'), - StrInput(name='alias', default='{{input.name}}') - ], - run=_run, - retry=0 - ) - function = task.to_function() - result = function() - assert result == 'hello Nicholas Flamel, aka Nicholas Flamel' - - -def test_task_with_templated_input_and_partial_kwarg(): - def _run(*args, **kwargs) -> str: - name = kwargs['name'] - alias = kwargs['alias'] - return f'hello {name}, aka {alias}' - task = Task( - name='hello-name', - inputs=[ - StrInput(name='name', default='Nicholas Flamel'), - StrInput(name='alias', default='{{input.name}}') - ], - run=_run, - retry=0 - ) - function = task.to_function() - result = function(name='Alchemist') - assert result == 'hello Alchemist, aka Alchemist' - - -def test_task_with_templated_input(): - def _run(*args, **kwargs) -> str: - name = kwargs['name'] - alias = kwargs['alias'] - return f'hello {name}, aka {alias}' - task = Task( - name='hello-name', - inputs=[ - StrInput(name='name', default='Nicholas Flamel'), - StrInput(name='alias', default='{{input.name}}') - ], - run=_run, - retry=0 +def test_task_returning_upstream_result(): + task_upstream_1 = Task( + name='task-upstream-1', + run=lambda *args, **kwargs: 'articuno' ) - # Name and alias provided - function = task.to_function() - result = function(name='Nicholas Flamel', alias='Alchemist') - assert result == 'hello Nicholas Flamel, aka Alchemist' - - -def test_task_with_templated_env(): - def _run(*args, **kwargs) -> str: - task: Task = kwargs.get('_task') - env_map = task.get_env_map() - return env_map.get('ZRB_TEST_TASK_ENV_2') - task = Task( - name='templated-env', - envs=[ - Env(name='ZRB_TEST_TASK_ENV_1', default='Elixir'), - Env( - name='ZRB_TEST_TASK_ENV_2', - default='{{env.ZRB_TEST_TASK_ENV_1}} of immortality' - ) - ], - run=_run, - retry=0 + task_upstream_2 = Task( + name='task-upstream-2', + run=lambda *args, **kwargs: 'zapdos' ) - function = task.to_function() - if 'ZRB_TEST_TASK_ENV_1' in os.environ: - del os.environ['ZRB_TEST_TASK_ENV_1'] - if 'ZRB_TEST_TASK_ENV_2' in os.environ: - del os.environ['ZRB_TEST_TASK_ENV_2'] - result = function() - assert result == 'Elixir of immortality' - - -def test_task_with_env_file(): - def _run(*args, **kwargs) -> str: - task: Task = kwargs.get('_task') - env_map = task.get_env_map() - return env_map.get('COLOR') - env_file = os.path.join(os.path.dirname(__file__), 'test_task.env') - task = Task( - name='env-file', - env_files=[EnvFile(env_file=env_file)], - run=_run, - retry=0 + task_upstream_3 = Task( + name='task-upstream-3', + run=lambda *args, **kwargs: 'moltres' ) - function = task.to_function() - result = function() - assert result == 'blue' - - -def test_task_with_env_file_and_prefix(): - def _run(*args, **kwargs) -> str: - task: Task = kwargs.get('_task') - env_map = task.get_env_map() - return env_map.get('COLOR') - env_file = os.path.join(os.path.dirname(__file__), 'test_task.env') task = Task( - name='env-file-prefixed', - env_files=[ - EnvFile( - env_file=env_file, prefix='ZRB_TEST_TASK_WITH_ENV_AND_PREFIX' - ) + name='task', + upstreams=[ + task_upstream_1, task_upstream_2, task_upstream_3 ], - run=_run, - retry=0 + return_upstream_result=True ) function = task.to_function() - if 'ZRB_TEST_TASK_WITH_ENV_AND_PREFIX_COLOR' in os.environ: - del os.environ['ZRB_TEST_TASK_WITH_ENV_AND_PREFIX_COLOR'] - os.environ['ZRB_TEST_TASK_WITH_ENV_AND_PREFIX_COLOR'] = 'red' result = function() - assert result == 'red' - del os.environ['ZRB_TEST_TASK_WITH_ENV_AND_PREFIX_COLOR'] + assert len(result) == 3 + assert 'articuno' in result + assert 'zapdos' in result + assert 'moltres' in result -def test_task_env_with_none_as_os_name(): - ''' - When env exist, it should override env_file - ''' - def _run(*args, **kwargs) -> str: - task: Task = kwargs.get('_task') - env_map = task.get_env_map() - return env_map.get('COLOR') - env_file = os.path.join(os.path.dirname(__file__), 'test_task.env') +def test_task_to_async_function(): task = Task( - name='env-file-prefixed', - env_files=[ - EnvFile(env_file=env_file, prefix='ZRB_TEST_TASK_OS_NAME_NONE') + name='task', + inputs=[ + StrInput(name='name', default='zapdos') ], - envs=[Env(name='COLOR', os_name=None, default='green')], - run=_run, - retry=0 + run=lambda *args, **kwargs: kwargs.get('name') ) - function = task.to_function() - if 'ZRB_TEST_TASK_OS_NAME_NONE_COLOR' in os.environ: - del os.environ['ZRB_TEST_TASK_OS_NAME_NONE_COLOR'] - if 'COLOR' in os.environ: - del os.environ['COLOR'] - os.environ['ZRB_TEST_TASK_OS_NAME_NONE_COLOR'] = 'red' - os.environ['COLOR'] = 'cyan' - result = function() - assert result == 'cyan' - del os.environ['ZRB_TEST_TASK_OS_NAME_NONE_COLOR'] - del os.environ['COLOR'] + function = task.to_function(is_async=True) + result = asyncio.run(function()) + assert result == 'zapdos' -def test_task_env_with_empty_string_as_os_name(): - ''' - When env exist, it should override env_file, - If env has empty os_name, the default value should always be used. - ''' - def _run(*args, **kwargs) -> str: - task: Task = kwargs.get('_task') - env_map = task.get_env_map() - return env_map.get('COLOR') - env_file = os.path.join(os.path.dirname(__file__), 'test_task.env') +def test_task_function_should_accept_kwargs_args(): task = Task( - name='env-file-prefixed', - env_files=[ - EnvFile(env_file=env_file, prefix='ZRB_TEST_TASK_OS_NAME_EMPTY') - ], - envs=[Env(name='COLOR', os_name='', default='green')], - run=_run, - retry=0 + name='task', + run=lambda *args, **kwargs: args[0] ) function = task.to_function() - if 'ZRB_TEST_TASK_OS_NAME_EMPTY_COLOR' in os.environ: - del os.environ['ZRB_TEST_TASK_OS_NAME_EMPTY_COLOR'] - if 'COLOR' in os.environ: - del os.environ['COLOR'] - os.environ['ZRB_TEST_TASK_OS_NAME_EMPTY_COLOR'] = 'red' - os.environ['COLOR'] = 'cyan' - result = function() - assert result == 'green' - del os.environ['ZRB_TEST_TASK_OS_NAME_EMPTY_COLOR'] - del os.environ['COLOR'] + # _args keyword argument should be treated as *args + result = function(_args=['moltres']) + assert result == 'moltres' -def test_task_redeclared_env(): - ''' - When env exist, it should override env_file, - If two env has the same name, the later should override the first one - ''' - def _run(*args, **kwargs) -> str: - task: Task = kwargs.get('_task') - env_map = task.get_env_map() - return env_map.get('COLOR') - env_file = os.path.join(os.path.dirname(__file__), 'test_task.env') +def test_callable_as_task_should_execute_value(): + # should execute should accept executable task = Task( - name='env-file-prefixed', - env_files=[EnvFile(env_file=env_file, prefix='ZRB_TEST_TASK')], - envs=[ - Env(name='COLOR', os_name='', default='green'), - Env(name='COLOR', os_name='', default='yellow') - ], - run=_run, - retry=0 + name='task', + should_execute=lambda *args, **kwargs: True, + run=lambda *args, **kwargs: 'articuno' ) function = task.to_function() result = function() - assert result == 'yellow' + assert result == 'articuno' diff --git a/test/task/test_task_env.py b/test/task/test_task_env.py new file mode 100644 index 00000000..026f6f76 --- /dev/null +++ b/test/task/test_task_env.py @@ -0,0 +1,151 @@ +from zrb.task.task import Task +from zrb.task_env.env import Env +from zrb.task_env.env_file import EnvFile +import os + + +def test_task_env_with_jinja_value(): + def _run(*args, **kwargs) -> str: + task: Task = kwargs.get('_task') + env_map = task.get_env_map() + return env_map.get('ZRB_TEST_TASK_ENV_2') + task = Task( + name='templated-env', + envs=[ + Env(name='ZRB_TEST_TASK_ENV_1', default='Elixir'), + Env( + name='ZRB_TEST_TASK_ENV_2', + default='{{env.ZRB_TEST_TASK_ENV_1}} of immortality' + ) + ], + run=_run, + retry=0 + ) + function = task.to_function() + if 'ZRB_TEST_TASK_ENV_1' in os.environ: + del os.environ['ZRB_TEST_TASK_ENV_1'] + if 'ZRB_TEST_TASK_ENV_2' in os.environ: + del os.environ['ZRB_TEST_TASK_ENV_2'] + result = function() + assert result == 'Elixir of immortality' + + +def test_task_env_with_should_not_be_rendered_jinja_value(): + def _run(*args, **kwargs) -> str: + task: Task = kwargs.get('_task') + env_map = task.get_env_map() + return env_map.get('ZRB_TEST_TASK_ENV_2') + task = Task( + name='templated-env', + envs=[ + Env(name='ZRB_TEST_TASK_ENV_1', default='Elixir'), + Env( + name='ZRB_TEST_TASK_ENV_2', + default='{{env.ZRB_TEST_TASK_ENV_1}} of immortality', + should_render=False + ) + ], + run=_run, + retry=0 + ) + function = task.to_function() + if 'ZRB_TEST_TASK_ENV_1' in os.environ: + del os.environ['ZRB_TEST_TASK_ENV_1'] + if 'ZRB_TEST_TASK_ENV_2' in os.environ: + del os.environ['ZRB_TEST_TASK_ENV_2'] + result = function() + assert result == '{{env.ZRB_TEST_TASK_ENV_1}} of immortality' + + +def test_task_env_with_none_as_os_name(): + ''' + When env exist, it should override env_file + ''' + def _run(*args, **kwargs) -> str: + task: Task = kwargs.get('_task') + env_map = task.get_env_map() + return env_map.get('COLOR') + env_file = os.path.join( + os.path.dirname(__file__), 'test_task_env_file.env' + ) + task = Task( + name='env-file-prefixed', + env_files=[ + EnvFile(env_file=env_file, prefix='ZRB_TEST_TASK_OS_NAME_NONE') + ], + envs=[Env(name='COLOR', os_name=None, default='green')], + run=_run, + retry=0 + ) + function = task.to_function() + if 'ZRB_TEST_TASK_OS_NAME_NONE_COLOR' in os.environ: + del os.environ['ZRB_TEST_TASK_OS_NAME_NONE_COLOR'] + if 'COLOR' in os.environ: + del os.environ['COLOR'] + os.environ['ZRB_TEST_TASK_OS_NAME_NONE_COLOR'] = 'red' + os.environ['COLOR'] = 'cyan' + result = function() + assert result == 'cyan' + del os.environ['ZRB_TEST_TASK_OS_NAME_NONE_COLOR'] + del os.environ['COLOR'] + + +def test_task_env_with_empty_string_as_os_name(): + ''' + When env exist, it should override env_file, + If env has empty os_name, the default value should always be used. + ''' + def _run(*args, **kwargs) -> str: + task: Task = kwargs.get('_task') + env_map = task.get_env_map() + return env_map.get('COLOR') + env_file = os.path.join( + os.path.dirname(__file__), 'test_task_env_file.env' + ) + task = Task( + name='env-file-prefixed', + env_files=[ + EnvFile(env_file=env_file, prefix='ZRB_TEST_TASK_OS_NAME_EMPTY') + ], + envs=[Env(name='COLOR', os_name='', default='green')], + run=_run, + retry=0 + ) + function = task.to_function() + if 'ZRB_TEST_TASK_OS_NAME_EMPTY_COLOR' in os.environ: + del os.environ['ZRB_TEST_TASK_OS_NAME_EMPTY_COLOR'] + if 'COLOR' in os.environ: + del os.environ['COLOR'] + os.environ['ZRB_TEST_TASK_OS_NAME_EMPTY_COLOR'] = 'red' + os.environ['COLOR'] = 'cyan' + result = function() + assert result == 'green' + del os.environ['ZRB_TEST_TASK_OS_NAME_EMPTY_COLOR'] + del os.environ['COLOR'] + + +def test_task_duplicate_env(): + ''' + When env exist, it should override env_file, + If two env has the same name, the later should override the first one + ''' + def _run(*args, **kwargs) -> str: + task: Task = kwargs.get('_task') + env_map = task.get_env_map() + return env_map.get('COLOR') + env_file = os.path.join( + os.path.dirname(__file__), 'test_task_env_file.env' + ) + task = Task( + name='env-file-prefixed', + env_files=[EnvFile(env_file=env_file, prefix='ZRB_TEST_TASK')], + envs=[ + Env(name='COLOR', os_name='', default='green'), + Env(name='COLOR', os_name='', default='yellow') + ], + run=_run, + retry=0 + ) + function = task.to_function() + result = function() + assert result == 'yellow' diff --git a/test/task/test_task.env b/test/task/test_task_env_file.env similarity index 100% rename from test/task/test_task.env rename to test/task/test_task_env_file.env diff --git a/test/task/test_task_env_file.py b/test/task/test_task_env_file.py new file mode 100644 index 00000000..82fdc893 --- /dev/null +++ b/test/task/test_task_env_file.py @@ -0,0 +1,49 @@ +from zrb.task.task import Task +from zrb.task_env.env_file import EnvFile +import os + + +def test_task_with_env_file(): + def _run(*args, **kwargs) -> str: + task: Task = kwargs.get('_task') + env_map = task.get_env_map() + return env_map.get('COLOR') + env_file = os.path.join( + os.path.dirname(__file__), 'test_task_env_file.env' + ) + task = Task( + name='env-file', + env_files=[EnvFile(env_file=env_file)], + run=_run, + retry=0 + ) + function = task.to_function() + result = function() + assert result == 'blue' + + +def test_task_with_env_file_and_prefix(): + def _run(*args, **kwargs) -> str: + task: Task = kwargs.get('_task') + env_map = task.get_env_map() + return env_map.get('COLOR') + env_file = os.path.join( + os.path.dirname(__file__), 'test_task_env_file.env' + ) + task = Task( + name='env-file-prefixed', + env_files=[ + EnvFile( + env_file=env_file, prefix='ZRB_TEST_TASK_WITH_ENV_AND_PREFIX' + ) + ], + run=_run, + retry=0 + ) + function = task.to_function() + if 'ZRB_TEST_TASK_WITH_ENV_AND_PREFIX_COLOR' in os.environ: + del os.environ['ZRB_TEST_TASK_WITH_ENV_AND_PREFIX_COLOR'] + os.environ['ZRB_TEST_TASK_WITH_ENV_AND_PREFIX_COLOR'] = 'red' + result = function() + assert result == 'red' + del os.environ['ZRB_TEST_TASK_WITH_ENV_AND_PREFIX_COLOR'] diff --git a/test/task/test_task_execution_id.py b/test/task/test_task_execution_id.py new file mode 100644 index 00000000..549b109a --- /dev/null +++ b/test/task/test_task_execution_id.py @@ -0,0 +1,36 @@ +from zrb.task.task import Task +from zrb.task.cmd_task import CmdTask + + +def test_task_execution_id_cannot_be_set_twice(): + task = Task( + name='task', + run=lambda *args, **kwargs: kwargs.get('_task').get_execution_id() + ) + task._set_execution_id('execution_id_1') + task._set_execution_id('execution_id_2') + task._set_execution_id('execution_id_3') + function = task.to_function() + result = function() + assert result == 'execution_id_1' + + +def test_consistent_task_upstream_execution_id(): + task_upstream_1 = Task( + name='task-upstream-1', + run=lambda *args, **kwargs: kwargs.get('_task').get_execution_id() + ) + task_upstream_2 = CmdTask( + name='task-upstream-2', + cmd='echo $_ZRB_EXECUTION_ID' + ) + task = Task( + name='task', + upstreams=[ + task_upstream_1, task_upstream_2 + ], + return_upstream_result=True + ) + function = task.to_function() + result = function() + assert result[0] == result[1].output diff --git a/test/task/test_task_input.py b/test/task/test_task_input.py new file mode 100644 index 00000000..c1ddf954 --- /dev/null +++ b/test/task/test_task_input.py @@ -0,0 +1,219 @@ +from zrb.task.task import Task +from zrb.task.cmd_task import CmdTask +from zrb.task_input.str_input import StrInput + + +def test_task_input(): + def _run(*args, **kwargs) -> str: + name = kwargs['name'] + favorite_drink = kwargs['favorite_drink'] + return f'hello {name}, your favorite drink is {favorite_drink}' + task = Task( + name='hello-name', + inputs=[ + StrInput(name='name'), + StrInput(name='favorite-drink') + ], + run=_run, + retry=0 + ) + function = task.to_function() + result = function(name='Dumbledore', favorite_drink='Elixir') + assert result == 'hello Dumbledore, your favorite drink is Elixir' + + +def test_cmd_task_input(): + task = CmdTask( + name='hello-name', + inputs=[ + StrInput(name='name'), + StrInput(name='favorite-drink') + ], + cmd='echo hello $_INPUT_NAME, your favorite drink is $_INPUT_FAVORITE_DRINK', # noqa + retry=0 + ) + function = task.to_function() + result = function(name='Dumbledore', favorite_drink='Elixir') + assert result.output == 'hello Dumbledore, your favorite drink is Elixir' + + +def test_task_input_with_default_value(): + def _run(*args, **kwargs) -> str: + name = kwargs['name'] + favorite_drink = kwargs['favorite_drink'] + return f'hello {name}, your favorite drink is {favorite_drink}' + task = Task( + name='hello-name', + inputs=[ + StrInput(name='name', default='Nicholas Flamel'), + StrInput(name='favorite-drink', default='Elixir') + ], + run=_run, + retry=0 + ) + function = task.to_function() + result = function() + assert result == 'hello Nicholas Flamel, your favorite drink is Elixir' + + +def test_cmd_task_input_with_default_value(): + task = CmdTask( + name='hello-name', + inputs=[ + StrInput(name='name', default='Nicholas Flamel'), + StrInput(name='favorite-drink', default='Elixir') + ], + cmd='echo hello $_INPUT_NAME, your favorite drink is $_INPUT_FAVORITE_DRINK', # noqa + retry=0 + ) + function = task.to_function() + result = function() + assert result.output == 'hello Nicholas Flamel, your favorite drink is Elixir' # noqa + + +def test_task_input_with_jinja_value(): + def _run(*args, **kwargs) -> str: + name = kwargs['name'] + alias = kwargs['alias'] + return f'hello {name}, aka {alias}' + task = Task( + name='hello-name', + inputs=[ + StrInput(name='name', default='Nicholas Flamel'), + StrInput(name='alias', default='{{input.name}}') + ], + run=_run, + retry=0 + ) + function = task.to_function() + result = function() + assert result == 'hello Nicholas Flamel, aka Nicholas Flamel' + + +def test_cmd_task_input_with_jinja_value(): + task = CmdTask( + name='hello-name', + inputs=[ + StrInput(name='name', default='Nicholas Flamel'), + StrInput(name='alias', default='{{input.name}}') + ], + cmd='echo hello $_INPUT_NAME, aka $_INPUT_ALIAS', + retry=0 + ) + function = task.to_function() + result = function() + assert result.output == 'hello Nicholas Flamel, aka Nicholas Flamel' + + +def test_task_input_with_should_not_be_rendered_jinja_value(): + def _run(*args, **kwargs) -> str: + name = kwargs['name'] + alias = kwargs['alias'] + return f'hello {name}, aka {alias}' + task = Task( + name='hello-name', + inputs=[ + StrInput(name='name', default='Nicholas Flamel'), + StrInput( + name='alias', default='{{input.name}}', should_render=False + ) + ], + run=_run, + retry=0 + ) + function = task.to_function() + result = function() + assert result == 'hello Nicholas Flamel, aka {{input.name}}' + + +def test_cmd_task_input_with_should_not_be_rendered_jinja_value(): + task = CmdTask( + name='hello-name', + inputs=[ + StrInput(name='name', default='Nicholas Flamel'), + StrInput( + name='alias', default='{{input.name}}', should_render=False + ) + ], + cmd='echo hello $_INPUT_NAME, aka $_INPUT_ALIAS', + retry=0 + ) + function = task.to_function() + result = function() + assert result.output == 'hello Nicholas Flamel, aka {{input.name}}' + + +def test_task_input_with_jinja_value_and_partial_custom_kwargs(): + def _run(*args, **kwargs) -> str: + name = kwargs['name'] + alias = kwargs['alias'] + return f'hello {name}, aka {alias}' + task = Task( + name='hello-name', + inputs=[ + StrInput(name='name', default='Nicholas Flamel'), + StrInput(name='alias', default='{{input.name}}') + ], + run=_run, + retry=0 + ) + function = task.to_function() + result = function(name='Alchemist') + assert result == 'hello Alchemist, aka Alchemist' + + +def test_task_input_with_jinja_value_and_custom_kwargs(): + def _run(*args, **kwargs) -> str: + name = kwargs['name'] + alias = kwargs['alias'] + return f'hello {name}, aka {alias}' + task = Task( + name='hello-name', + inputs=[ + StrInput(name='name', default='Nicholas Flamel'), + StrInput(name='alias', default='{{input.name}}') + ], + run=_run, + retry=0 + ) + # Name and alias provided + function = task.to_function() + result = function(name='Nicholas Flamel', alias='Alchemist') + assert result == 'hello Nicholas Flamel, aka Alchemist' + + +def test_task_input_with_the_same_name(): + task = Task( + name='task', + inputs=[ + StrInput(name='name', default='articuno'), + StrInput(name='name', default='zapdos'), + StrInput(name='name', default='moltres'), + ], + run=lambda *args, **kwargs: kwargs.get('name') + ) + function = task.to_function() + result = function() + assert result == 'moltres' + + +def test_task_input_with_the_same_name_on_upstream(): + task_upstream = Task( + name='task-upstream', + inputs=[ + StrInput(name='name', default='articuno') + ], + run=lambda *args, **kwargs: kwargs.get('name') + ) + task = Task( + name='task', + inputs=[ + StrInput(name='name', default='zapdos') + ], + upstreams=[task_upstream], + run=lambda *args, **kwargs: kwargs.get('name') + ) + function = task.to_function() + result = function() + assert result == 'zapdos' + diff --git a/zrb_init.py b/zrb_init.py index 7b04e760..47d2d6f7 100644 --- a/zrb_init.py +++ b/zrb_init.py @@ -297,7 +297,7 @@ 'set -e', f'cd {CURRENT_DIR}', 'echo "🤖 Perform test"', - 'pytest --ignore-glob="**/template/**/test" --ignore=playground --cov=zrb --cov-report=html --cov-report=term --cov-report=term-missing {{input.test}}' # noqa + 'pytest --ignore-glob="**/template/**/test" --ignore=playground --cov=zrb --cov-config=".coveragerc" --cov-report=html --cov-report=term --cov-report=term-missing {{input.test}}' # noqa ], retry=0, checking_interval=1