Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docs/update #63

Merged
merged 14 commits into from
Jul 23, 2022
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -78,6 +78,7 @@ Core functionalities:
- Task parametrization
- Task pipelining
- Modifiable session also in runtime
- Async support

Links:

@@ -188,6 +189,10 @@ def do_second(arg=Return(do_first)):
def do_unparallel():
...

@app.task("daily", execution="async")
async def do_async():
...

@app.task("daily", execution="thread")
def do_on_separate_thread():
...
132 changes: 132 additions & 0 deletions docs/code/demos/fast_api/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from typing import List, Literal, Optional
from redbird.oper import in_, between, greater_equal, less_equal
from fastapi import APIRouter, FastAPI, Query

from scheduler import app as app_rocketry

# FastAPI application on which the routers defined below are mounted.
app = FastAPI()
# Session of the Rocketry app imported from scheduler.py; the routes below read and mutate it.
session = app_rocketry.session


# Session Config
# --------------

# Router for reading and patching the session configuration (OpenAPI tag "config").
router_config = APIRouter(tags=["config"])

@router_config.get("/session/config")
async def get_session_config():
    # Expose the configuration object of the shared Rocketry session.
    config = session.config
    return config

@router_config.patch("/session/config")
async def patch_session_config(values: dict):
    # Apply every key of the request body as an attribute on the session config.
    config = session.config
    for option, new_value in values.items():
        setattr(config, option, new_value)


# Session Parameters
# ------------------

# Router for the session-level parameters (OpenAPI tag "session parameters").
router_params = APIRouter(tags=["session parameters"])

@router_params.get("/session/parameters")
async def get_session_parameters():
    # Return all session-level parameters.
    parameters = session.parameters
    return parameters

@router_params.get("/session/parameters/{name}")
async def get_session_parameter(name: str):
    """Return the value of a single session parameter.

    Named in the singular (like the PUT and DELETE handlers of the same
    path) so that it does not redefine the handler that lists all session
    parameters.

    Args:
        name: Key of the parameter in ``session.parameters``.

    Returns:
        Whatever is stored behind ``name`` in ``session.parameters``.
    """
    return session.parameters[name]

@router_params.put("/session/parameters/{name}")
async def put_session_parameter(name: str, value):
    # Create or overwrite the session parameter stored under `name`.
    parameters = session.parameters
    parameters[name] = value

@router_params.delete("/session/parameters/{name}")
async def delete_session_parameter(name: str):
    # Remove the session parameter stored under `name`.
    parameters = session.parameters
    del parameters[name]


# Session Actions
# ---------------

# Router for actions on the session itself (OpenAPI tag "session").
router_session = APIRouter(tags=["session"])

@router_session.post("/session/shut_down")
async def shut_down_session():
    # Ask the Rocketry session to shut down.
    rocketry_session = session
    rocketry_session.shut_down()


# Task
# ----

# Router for reading, patching and triggering tasks (OpenAPI tag "task").
router_task = APIRouter(tags=["task"])

@router_task.get("/tasks")
async def get_tasks():
    # Materialize the session's tasks into a list for the response.
    return [task for task in session.tasks]

@router_task.get("/tasks/{task_name}")
async def get_task(task_name: str):
    # Look the task up from the session by its name.
    task = session[task_name]
    return task

@router_task.patch("/tasks/{task_name}")
async def patch_task(task_name: str, values: dict):
    # Apply every key of the request body as an attribute on the named task.
    target = session[task_name]
    for attribute, new_value in values.items():
        setattr(target, attribute, new_value)


# Task Actions
# ------------

@router_task.post("/tasks/{task_name}/disable")
async def disable_task(task_name: str):
    # Flag the named task as disabled.
    session[task_name].disabled = True

@router_task.post("/tasks/{task_name}/run")
async def run_task(task_name: str):
    # Set the force-run flag on the named task.
    session[task_name].force_run = True


# Logging
# -------

# Router for querying task logs (OpenAPI tag "logs").
router_logs = APIRouter(tags=["logs"])

@router_logs.get("/logs", description="Get task logs")
async def get_task_logs(
    action: Optional[List[Literal['run', 'success', 'fail', 'terminate', 'crash', 'inaction']]] = Query(default=[]),
    min_created: Optional[int] = Query(default=None),
    max_created: Optional[int] = Query(default=None),
    task: Optional[List[str]] = Query(default=None),
):
    """Return log records from the session's log repository.

    Every query parameter is optional; only the ones given narrow the result.

    Args:
        action: Keep only records whose ``action`` is one of these.
        min_created: Lower bound for the ``created`` field (open if omitted).
        max_created: Upper bound for the ``created`` field (open if omitted).
            NOTE(review): unit of ``created`` is not visible here -- confirm.
        task: Keep only records whose ``task_name`` is one of these.

    Returns:
        The matching records of ``session.get_repo()``.
    """
    filters = {}
    if action:
        filters['action'] = in_(action)
    # Compare against None explicitly: a bound of 0 is falsy but is still a
    # bound the caller asked for.
    if min_created is not None or max_created is not None:
        filters['created'] = between(min_created, max_created, none_as_open=True)
    if task:
        filters['task_name'] = in_(task)

    repo = session.get_repo()
    return repo.filter_by(**filters).all()

@router_logs.get("/task/{task_name}/logs", description="Get logs of a task")
async def get_logs_of_task(
    task_name: str,
    action: Optional[List[Literal['run', 'success', 'fail', 'terminate', 'crash', 'inaction']]] = Query(default=[]),
    min_created: Optional[int] = Query(default=None),
    max_created: Optional[int] = Query(default=None),
):
    """Return the log records of a single task.

    Has its own name so that it does not redefine the handler of the
    ``/logs`` route.

    Args:
        task_name: Name of the task whose logger is queried.
        action: Keep only records whose ``action`` is one of these.
        min_created: Lower bound for the ``created`` field (open if omitted).
        max_created: Upper bound for the ``created`` field (open if omitted).
            NOTE(review): unit of ``created`` is not visible here -- confirm.

    Returns:
        The matching records from ``session[task_name].logger``.
    """
    filters = {}
    if action:
        filters['action'] = in_(action)
    # Compare against None explicitly: a bound of 0 is falsy but is still a
    # bound the caller asked for.
    if min_created is not None or max_created is not None:
        filters['created'] = between(min_created, max_created, none_as_open=True)

    return session[task_name].logger.filter_by(**filters).all()


# Add routers
# -----------

# Mount every router defined above on the FastAPI application.
app.include_router(router_config)
app.include_router(router_params)
app.include_router(router_session)
app.include_router(router_task)
app.include_router(router_logs)
30 changes: 30 additions & 0 deletions docs/code/demos/fast_api/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
import logging

import uvicorn

from api import app as app_fastapi
from scheduler import app as app_rocketry


class Server(uvicorn.Server):
    """uvicorn.Server that also shuts down Rocketry on exit signals.

    Uvicorn overrides the signal handlers, so the Rocketry session has to
    be told to shut down from within Uvicorn's own exit handler.
    """

    def handle_exit(self, sig: int, frame) -> None:
        # Shut down the scheduler first, then let Uvicorn handle the signal.
        rocketry_session = app_rocketry.session
        rocketry_session.shut_down()
        return super().handle_exit(sig, frame)


async def main():
    """Run the API server and the scheduler in the same event loop."""
    config = uvicorn.Config(app_fastapi, workers=1, loop="asyncio")
    server = Server(config=config)

    # Schedule both apps as tasks: the API first, then the scheduler.
    api = asyncio.create_task(server.serve())
    sched = asyncio.create_task(app_rocketry.serve())

    # Block until both tasks have finished.
    await asyncio.wait([sched, api])


if __name__ == "__main__":
    asyncio.run(main())
14 changes: 14 additions & 0 deletions docs/code/demos/fast_api/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from rocketry import Rocketry

# Rocketry app; "task_execution" makes async the default execution method of its tasks.
app = Rocketry(config={"task_execution": "async"})

@app.task('every 20 seconds')
async def do_things():
    # Placeholder task: put the work to run every 20 seconds here.
    ...

@app.task('every 5 seconds')
async def do_stuff():
    # Placeholder task: put the work to run every 5 seconds here.
    ...

if __name__ == "__main__":
    # Run only the scheduler when this file is executed directly.
    app.run()
17 changes: 17 additions & 0 deletions docs/code/demos/minimal_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import asyncio
from rocketry import Rocketry

# Rocketry app; "task_execution" makes async the default execution method of its tasks.
app = Rocketry(config={'task_execution': 'async'})

@app.task()
async def do_things():
    # Placeholder task: put the async work here.
    ...

async def main():
    """Launch the Rocketry app (and possibly other async apps) in one event loop."""
    scheduler = asyncio.create_task(app.serve())
    ...  # Start possibly other async apps
    await scheduler


if __name__ == "__main__":
    asyncio.run(main())
10 changes: 7 additions & 3 deletions docs/code/execution.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
@app.task("daily", execution="main")
@app.task(execution="main")
def do_main():
...

@app.task("daily", execution="thread")
@app.task(execution="async")
async def do_async():
...

@app.task(execution="thread")
def do_thread():
...

@app.task("daily", execution="process")
@app.task(execution="process")
def do_process():
...
35 changes: 35 additions & 0 deletions docs/examples/demos/fastapi.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

Scheduler with API (FastAPI)
============================

This is an example to integrate FastAPI with Rocketry.
Copy the files to your project and modify as you need.

This example contains three files:

- **scheduler.py**: Rocketry app. Put your tasks here.
- **api.py**: FastAPI app. Contains the API routes.
- **main.py**: Combines the apps. Run this file.

**scheduler.py**

.. literalinclude:: /code/demos/fast_api/scheduler.py
:language: py

**api.py**

.. literalinclude:: /code/demos/fast_api/api.py
:language: py

**main.py**

.. literalinclude:: /code/demos/fast_api/main.py
:language: py

The system works via *async*: the execution constantly alternates
between the API and the scheduler.

.. note::

Uvicorn modifies signal handlers. In order to make a keyboard interrupt
(``KeyboardInterrupt``) shut down the scheduler as well, we have to subclass the Uvicorn server.
6 changes: 6 additions & 0 deletions docs/examples/index.rst
Original file line number Diff line number Diff line change
@@ -2,6 +2,12 @@
Examples
========

.. toctree::
:maxdepth: 3
:caption: Contents:

demos/fastapi

**Minimal example:**

.. literalinclude:: /code/demos/minimal.py
14 changes: 13 additions & 1 deletion docs/handbooks/config.rst
Original file line number Diff line number Diff line change
@@ -95,6 +95,18 @@ Options

By default it is set to ``0.1``.

.. _config_instant_shutdown:

**instant_shutdown**: Whether to terminate all tasks on shutdown.

If set to ``False``, the scheduler will wait until all tasks finish on shutdown.
Running tasks that are past their timeout, or whose ``end_cond`` is true,
are still terminated normally. If set to ``True``, the scheduler will always
terminate all running tasks on shutdown. Shutdown will still wait until all
threads are finished.

By default, ``False``.

**param_materialize**: When to turn arguments to actual values.

Whether to turn the arguments to actual values before or after
@@ -105,4 +117,4 @@ Options
- ``post``: After thread/process creation. (default)

Only applicable for some argument types and materialization type
specified in the argument itself overrides configuration setting.
specified in the argument itself overrides configuration setting.
1 change: 1 addition & 0 deletions docs/handbooks/index.rst
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ operates.
:maxdepth: 3
:caption: Contents:

task/index
conditions/index
config
logging
216 changes: 216 additions & 0 deletions docs/handbooks/task/execution.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@

.. _handbook-execution:

Execution
=========

There are four methods to execute Rocketry tasks:

- ``main``: Run synchronously in the main thread and process
- ``async``: Run asynchronously
- ``thread``: Run on separate thread
- ``process``: Run on separate process

Example usage:

.. literalinclude:: /code/execution.py
:language: py

Here is a summary of the methods:

=========== ============= ===================== ========================
Execution Parallelized? Can be terminated? Can modify the session?
=========== ============= ===================== ========================
``process`` Yes Yes No
``thread`` Partially Yes if task supports Yes
``async`` Partially Yes if awaits Yes
``main`` No No Yes
=========== ============= ===================== ========================

If your problem is CPU bound (uses a lot of computational resources), you
should use ``process`` as the execution type. If your problem
is IO bound, then you should use ``async`` if your IO libraries support async and
``thread`` if they don't. If you wish to run your code completely
synchronously, use ``main``.

Note that ``process`` cannot share the memory with the scheduler engine thus
it cannot modify its content on runtime. Therefore if you wish to build APIs
or other ways modify the scheduler on runtime you should use either ``main``,
``async`` or ``thread`` as the execution type.

``main`` and ``async`` are the least expensive in terms of the time and resources it
takes to initiate such a task. ``thread`` has some overhead and ``process`` has
significantly more.

You can also set the default execution method that is used if task does not specify
it:

.. code-block:: python

    app = Rocketry(config={'task_execution': 'main'})

    @app.task()
    def do_main():
        ...

Main
----

This execution method runs the task unparallelized on the main thread and
process. The executed task can utilize async but a task with
this execution type blocks the scheduler from doing anything else
while the task is running. In other words, if a task with
this execution type is running, the scheduler cannot check
and launch other tasks in the meantime or do anything else.

.. note::

The task can utilize async but the scheduler will not continue
scheduling until the task is completed (failed or succeeded).

For example, this task blocks the scheduler:

.. code-block:: python

    @app.task(execution="main")
    async def do_async():
        ...

.. warning::

Tasks with execution ``main`` cannot be terminated. If they get stuck,
the scheduler will get stuck.

Useful if you wish to run synchronously and block all other execution
in the scheduler in the meantime.


Async
-----

This execution method runs the task asynchronously using `asyncio library <https://docs.python.org/3/library/asyncio.html>`_.
The task should utilize async in order to gain benefits over
the ``main`` execution method. Otherwise the task will block the scheduler
from doing other things such as launching other tasks, terminating tasks
or checking shutdown conditions.

.. code-block:: python

    import asyncio

    @app.task(execution="async")
    async def do_async():
        await asyncio.sleep(10) # Do something async

.. warning::

Your task should also use async. Otherwise the task
may completely block the scheduler and the end result is the same
as running with execution as ``main``. The task also cannot be
terminated if it does not ``await``.

This does not benefit from async:

.. code-block:: python

    @app.task(execution="async")
    def do_sync():
        ...

And neither does this (as it calls the blocking ``time.sleep`` instead of awaiting ``asyncio.sleep``):

.. code-block:: python

    import time

    @app.task(execution="async")
    async def do_sync():
        time.sleep(10) # Do something without await

.. warning::

If a task with execution ``async`` gets stuck and it cannot call ``await``,
the scheduler will get stuck as well.

.. note::

Due to `GIL (Global Interpreter Lock) <https://wiki.python.org/moin/GlobalInterpreterLock>`_,
only one Python line can be executed at once.
Therefore pure Python code without any IO operations won't have
any performance benefits. However, if there are IO operations, there could
be improvements in performance.

In order for an ``async`` task to be terminable, the task should await at some point.

Useful for IO bound problems or to integrate APIs.


Thread
------

This execution method runs the task on a separate thread using
`threading library <https://docs.python.org/3/library/threading.html>`_.

.. note::

Due to `GIL (Global Interpreter Lock) <https://wiki.python.org/moin/GlobalInterpreterLock>`_,
only one Python line can be executed at once.
Therefore pure Python code without any IO operations won't have
any performance benefits. However, if there are IO operations, there could
be improvements in performance.

In order for a ``thread`` task to be terminable, the task should listen to the termination flag:

.. code-block:: python

    from rocketry.args import TerminationFlag
    from rocketry.exc import TaskTerminationException

    @app.task(execution="thread")
    def do_thread(flag=TerminationFlag()):
        while True:
            ... # Do something
            if flag.is_set():
                raise TaskTerminationException()

.. warning::

If a task with execution ``thread`` never finishes and it does not respect termination,
the scheduler cannot shut down itself.

.. note::

Terminated thread tasks should raise ``TaskTerminationException`` to signal
they finished prematurely due to termination. Without an exception, the task
was considered to run successfully.

Useful for IO bound problems where there is no async support.


Process
-------

This execution method runs the task on a separate process using
`multiprocessing library <https://docs.python.org/3/library/multiprocessing.html>`_.

.. warning::

The process cannot share memory with the main scheduler thus you cannot
modify the state of the scheduler from tasks parallelized with ``process``.
However, you may still pass parameters to the task.

.. warning::

You cannot pass parameters that cannot be pickled to tasks with
``process`` as the execution type.

.. warning::

Especially using ``process`` execution type, running the application **must be**
wrapped with ``if __name__ == "__main__":`` block or otherwise launching a process
task may launch another instance of the scheduler. Also, you should not wrap the
task function with decorators with execution type as ``process`` as otherwise
serialization of the function fails in initiating the process.

Useful for CPU bound problems or for problems in which the code has tendency to get stuck.
15 changes: 15 additions & 0 deletions docs/handbooks/task/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

Task
====

In this handbook we go through some technical details of
how tasks work and how to change their behaviour.


.. toctree::
:maxdepth: 2
:caption: Contents:

termination
execution
parameterization
244 changes: 244 additions & 0 deletions docs/handbooks/task/parameterization.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@

Parameterization
================

Parameters can be set on task level or on session level
where they can be used by multiple tasks or conditions.

Here are short definitions of the relevant terms:

.. glossary::

parameter
Key-value pair that can be passed to a task
to utilize.

argument
Value of the key-value pair or parameter.

Here is an example to initiate a session level parameter:

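.. code-block:: python

    app.params(my_arg="Hello world")

This sets a parameter called ``"my_arg"`` to the session.
When this parameter is used, its value (``"Hello world"``) is passed
as the argument. If the parameter is set as a function instead
(see ``FuncArg`` below), the function will be executed
and the return value will be used as the parameter value.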

Here is an example to use it in a task:

.. code-block:: python
from rocketry.args import Arg
@app.task()
def do_things(arg=Arg("my_arg")):
assert arg == "Hello world"
...
Here is an example to use it in a custom condition:

.. code-block:: python
from rocketry.args import Arg
@app.cond("is foo")
def is_foo(arg=Arg("my_arg")):
assert arg == "Hello world"
...
return True
You can also use this argument in another argument:

.. code-block:: python
@app.param("my_arg_2")
def get_my_arg(arg=Arg("my_arg")):
assert arg == "Hello world"
return "Hello world 2"
.. warning::

The arguments may end up in infinite recursion if
argument *x* requests argument *y* and argument
*y* requests argument *x*.


Arg
---

The value of ``rocketry.args.Arg`` is acquired from the
session level parameters. The value is whatever is stored
behind a given key in the session parameters.

A simple example:

.. code-block:: python
from rocketry.args import Arg
app.params(my_arg="Hello")
@app.task()
def do_things(arg=Arg("my_arg")):
...
return "Hello world"
The argument in session parameters can be a plain Python object
or another argument such as ``FuncArg``.

You can access the parameters using ``app.session.parameters``.


Return
------

Argument ``rocketry.args.Return`` represents a return
value of a task. It can be used to pipeline input-output
of tasks.

.. code-block:: python

    from rocketry.args import Return

    @app.task()
    def do_things():
        ...
        return "Hello world"

    @app.task()
    def do_other_things(arg=Return(do_things)):
        ...
        return "Hello world"

Alternatively, you can also refer to the task name using string:

.. code-block:: python

    from rocketry.args import Return

    @app.task()
    def do_things():
        ...
        return "Hello world"

    @app.task()
    def do_other_things(arg=Return("do_things")):
        ...
        return "Hello world"

FuncArg
-------

Function argument (``rocketry.args.FuncArg``) is an argument
which value represents the return value of a function. The
function is run every time the argument value is evaluated.

A simple example:

.. code-block:: python
from rocketry.args import FuncArg
def get_my_arg():
...
return "Hello world"
@app.task()
def do_things(arg=FuncArg(get_my_arg)):
...
return "Hello world"
You can also set the ``FuncArg`` to the session parameters
using a wrapper in the application and pass the ``FuncArg``
using ``Arg`` to a task:

.. code-block:: python
from rocketry.args import Arg
@app.param("my_arg")
def get_my_arg():
...
return "Hello world"
@app.task()
def do_things(arg=Arg("my_arg")):
...
Alternatively, you can use the function:

.. code-block:: python
from rocketry.args import Arg
@app.param("my_arg")
def get_my_arg():
...
return "Hello world"
@app.task()
def do_things(arg=Arg(get_my_arg)):
...
Special Arguments
-----------------

There are also arguments which represent
a component in Rocketry's ecosystem.

Here is a list of those:

.. glossary::

``rocketry.args.Task``
The value is a task instance.

Example:

.. code-block:: python
from rocketry.args import Task
@app.task()
def do_things(task=Task()):
if task.last_success:
...
``rocketry.args.Session``
The value is the session instance.

Example:

.. code-block:: python
from rocketry.args import Session
@app.task()
def do_things(session=Session()):
session.shut_down()
``rocketry.args.TerminationFlag``
The value is a threading event that indicates when
the task has been requested to terminate. Should
be used in tasks with execution as ``thread`` and those
tasks should check the value of this flag periodically
(``.is_set()``) and raise ``rocketry.exc.TaskTerminationException``
if the flag is set. Otherwise, the threaded task cannot
be terminated.

Example:

.. code-block:: python
from rocketry.args import TerminationFlag
from rocketry.exc import TaskTerminationException
@app.task(execution="thread")
def do_things(flag=TerminationFlag()):
while True:
if flag.is_set():
raise TaskTerminationException()
...
83 changes: 83 additions & 0 deletions docs/handbooks/task/termination.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@

Termination
===========

Tasks can be terminated if any of the
following are met:

- Task has run longer than its timeout
- Task's ``end_cond`` is true
- Scheduler is immediately shutting down

.. warning::

Only ``async``, ``thread`` and ``process`` tasks can be terminated.
Read more about the execution methods:
:ref:`execution method handbook <handbook-execution>`.

Timeout
-------

Timeout defines how long the task may run. It can be set on session level
and on task level. Task level timeout overrides the session setting so
if a task does not have timeout specified the session setting is used
instead.

To set it on task level:

.. code-block:: python
@app.task(timeout="1 hour", execution="process")
def do_things():
...
To set it on session level:

.. code-block:: python
from rocketry import Rocketry
app = Rocketry(config={"timeout": 0.1})
@app.task(timeout="1 hour", execution="process")
def do_things():
...
The timeout can be as:

- int or float (number of seconds)
- string (timedelta string)
- ``datetime.timedelta``

End Condition
-------------

End condition is a condition that, when true, causes the task to be terminated.
This is useful to prevent some tasks from running outside their intended
period. For example, you may want to kill all running less important
tasks when the actual production starts.

For example, this task will be terminated if it is running between
08:00 and 18:00 (8 am to 6 pm):

.. code-block:: python
from rocketry.conds import time_of_day
@app.task(end_cond=time_of_day.between("08:00", "18:00"))
def do_things():
...
Scheduler Shutdown
------------------

If scheduler shuts down with no errors (either shut_down was called or
the scheduler shut condition was reached), the scheduler waits for the
running tasks to finish or to reach their timeout or end condition.

However, the scheduler will terminate all tasks during shutdown if:

- Scheduler encountered a fatal error
- The configuration :ref:`instant_shutdown <config_instant_shutdown>` is ``True``
- ``session.shut_down(force=True)`` was called
- ``session.shut_down()`` was called twice
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ applications. It is simple, clean and extensive.
- Task parametrization
- Task pipelining
- Modifiable session also in runtime
- Async support

**It looks like this:**

11 changes: 2 additions & 9 deletions docs/tutorial/basic.rst
Original file line number Diff line number Diff line change
@@ -91,6 +91,7 @@ There are three options for how tasks are executed:

- ``process``: Run the task in a separate process
- ``thread``: Run the task in a separate thread
- ``async``: Run the task in async
- ``main``: Run the task in the main process and thread (default)

Here is a quick example of each:
@@ -109,15 +110,7 @@ if there is one that you prefer in your project:
def do_main():
...
There are pros and cons in each option. In short:

=========== ============= ===================== ========================
Execution Parallerized? Can be terminated? Can modify the session?
=========== ============= ===================== ========================
``process`` Yes Yes No
``thread`` Yes Yes if task supports Yes
``main`` No No Yes
=========== ============= ===================== ========================
Read more about the execution types in :ref:`execution handbook <handbook-execution>`.


Changing Logging Destination
11 changes: 11 additions & 0 deletions docs/tutorial/intermediate.rst
Original file line number Diff line number Diff line change
@@ -9,6 +9,17 @@ that you might not come across in very
simple applications but you eventually
need to know.

Running as Async
----------------

By default, ``app.run`` starts a new event loop.
If you wish to integrate other async apps, such
as FastAPI, you can also call ``app.serve`` method
which is an async method to start the scheduler:

.. literalinclude:: /code/demos/minimal_async.py
:language: py

Session Configurations
----------------------

3 changes: 2 additions & 1 deletion rocketry/test/app/test_app.py
Original file line number Diff line number Diff line change
@@ -159,7 +159,8 @@ def my_func():
return 'func arg value'

@app.cond('is foo')
def is_foo():
def is_foo(arg=Arg("my_func_arg")):
assert arg == "func arg value"
return True

# Creating some tasks
22 changes: 22 additions & 0 deletions rocketry/test/session/params/test_func.py
Original file line number Diff line number Diff line change
@@ -59,6 +59,28 @@ def test_session(session, execution):

assert "success" == task.status

@pytest.mark.parametrize("execution", ["main", "thread", "process"])
def test_session_with_arg(session, execution):
    """Run a task whose parameter is an ``Arg`` that points to a session-level ``FuncArg``.

    The session parameter ``a_param`` is a ``FuncArg`` and the task receives
    it through ``Arg('a_param')``. The scheduler is shut down after the task
    has started once and the final status of the task is checked.
    """
    session.parameters["a_param"] = FuncArg(get_x)

    task = FuncTask(
        func_x_with_arg,
        execution=execution,
        name="a task",
        parameters={"myparam": Arg('a_param')},
        start_cond=AlwaysTrue()
    )
    session.config.shut_cond = (TaskStarted(task="a task") >= 1)

    assert task.status is None
    session.start()
    # This test is not parametrized with ``materialize``/``config_mater`` so
    # those names must not be referenced here (they raised NameError on
    # Windows with process execution). No materialization is set on the task
    # or in the config, so the default ("post") applies.
    # NOTE(review): expected status on Windows carried over from the original
    # condition evaluated with the defaults -- confirm on a Windows runner.
    if platform.system() == "Windows" and execution == "process":
        # Windows cannot pickle the session but apparently Linux can
        assert "fail" == task.status
    else:
        assert "success" == task.status

@pytest.mark.parametrize("config_mater", ['pre', 'post', None])
@pytest.mark.parametrize("materialize", ['pre', 'post', None])
@pytest.mark.parametrize("execution", ["main", "thread", "process"])