Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG: Cache before startup #155

Merged
merged 7 commits into from
Nov 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/cookbook/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ are in production or test:
config.silence_task_logging = False
config.silence_cond_check = False

.. note::

The tasks' caches (ie. status and last run/success/fail) are set after the
hooks have run. If your setup needs to run after the caches are set
and startup tasks have run, you can do it by:

.. code-block:: python

@app.setup()
def setup_app():
# Run before startup tasks and cache is set
...
yield
# Run after startup tasks and cache is set
...

You can also modify tasks in the setup. For example,
if you wish to have an environment to test only the
scheduling (without running anything):
Expand Down
1 change: 1 addition & 0 deletions docs/versions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Version history
runs can be tracked in the logs using the field ``run_id``.

- Update: ``rocketry.conds.running`` refactored to support multi-launch.
- Update: Task cache is no longer set at initiation but at session start
- Add: New config option ``timezone``
- Add: New config option ``time_func`` for testing scheduling
- API: Added config option ``execution`` (deprecated ``task_execution``)
Expand Down
7 changes: 7 additions & 0 deletions rocketry/core/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ async def startup(self):

self.logger.debug("Beginning startup sequence...")
for task in self.tasks:
try:
task.set_cached()
except TaskLoggingError:
self.logger.exception(f"Failed setting cache for task '{task.name}'")
if not self.session.config.silence_task_logging:
raise

if task.on_startup:
if isinstance(task.start_cond, AlwaysFalse) and not task.disabled:
# Make sure the tasks run if start_cond not set
Expand Down
16 changes: 11 additions & 5 deletions rocketry/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,8 @@ def __init__(self, **kwargs):
self.session._check_readable_logger()

self.register()

# Update "last_run", "last_success", etc.
self.set_cached()

self._init_cache()

# Hooks
hooker.postrun()

Expand Down Expand Up @@ -908,6 +906,14 @@ def register(self):
name = self.name
self.session.add_task(self)

def _init_cache(self):
self._last_run = None
self._last_success = None
self._last_fail = None
self._last_terminate = None
self._last_inaction = None
self._last_crash = None

def set_cached(self):
"Update cached statuses"
# We get the logger here to not flood with warnings if missing repo
Expand Down Expand Up @@ -1248,7 +1254,7 @@ def _get_last_action(self, action:str, from_logs=None, logger=None) -> float:


if allow_cache: # and getattr(self, cache_attr) is not None
value = getattr(self, cache_attr)
value = getattr(self, cache_attr, None)
else:
value = self._get_last_action_from_log(action, logger)
setattr(self, cache_attr, value)
Expand Down
43 changes: 42 additions & 1 deletion rocketry/test/app/test_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,45 @@ def do_things():
app.run()
assert calls == ['starting', 'setup 1', 'setup 2', 'startup task']
assert len(task_logger.handlers) == 1
assert task_logger.handlers[0].repo.model == LogRecord
assert task_logger.handlers[0].repo.model == LogRecord

def test_setup_cache():
app = Rocketry()
repo = MemoryRepo(model=MinimalRecord)
repo.add(MinimalRecord(created=1000, action="run", task_name="do_things"))
repo.add(MinimalRecord(created=2000, action="success", task_name="do_things"))

@app.setup()
def setup_func(logger=TaskLogger()):
assert task.status is None
logger.set_repo(repo)
yield
assert task.status == "success"

@app.task()
def do_things():
...
raise RuntimeError("This should never run")

# We double check the cache is also set before startup tasks
@app.task(on_startup=True)
def verify_cache():
assert task.status == "success"

task = app.session[do_things]
assert task.status is None
assert task._last_run is None
assert task._last_success is None
assert task._last_fail is None
assert task._last_inaction is None
assert task._last_crash is None

app.session.config.shut_cond = true
app.run()

# setup should have updated the cache
assert task.status == "success"
assert task._last_run == 1000.0
assert task._last_success == 2000.0

assert app.session[verify_cache].status == "success"
36 changes: 36 additions & 0 deletions rocketry/test/session/test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,47 @@ def emit(self, record):
session.start()
assert task.status == "fail"

session.remove_task(task)

task = FuncTask({"success": do_success, "fail": do_fail}[status], name="b task", execution=execution, session=session)
task.run()

session.config.silence_task_logging = True
session.run(task)

assert task.status == "fail"

@pytest.mark.parametrize("on", ["startup", "normal", "shutdown"])
def test_failed_set_cache(on, session):
class MyHandler(logging.Handler):
def emit(self, record):
if record.action != "run":
raise RuntimeError("Oops")

if on == "normal":
session.config.shut_cond = TaskFinished(task="a task") >= 1
else:
session.config.shut_cond = SchedulerCycles() == 1

logger = logging.getLogger("rocketry.task")
logger.handlers.insert(0, MyHandler())
task = FuncTask(do_success, name="a task", session=session)
task.log_running()
if on == "startup":
task.on_startup = True
elif on == "shutdown":
task.on_shutdown = True
with pytest.raises(TaskLoggingError):
session.start()
assert task.status == "fail"

session.remove_task(task)
task = FuncTask(do_success, name="a task", session=session)
task.log_running()
session.config.silence_task_logging = True
session.start()
assert task.status == "fail"

@pytest.mark.parametrize(
"query,expected",
[
Expand Down
7 changes: 5 additions & 2 deletions rocketry/test/task/func/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from rocketry.log.log_record import MinimalRecord
from rocketry.tasks import FuncTask
from rocketry.testing.log import create_task_record
from rocketry.conds import true

def run_success():
pass
Expand Down Expand Up @@ -55,6 +56,7 @@ def test_set_cached_in_init(session, optimized, last_status):
name="mytask",
session=session
)
task.set_cached()
for action, created in times.items():
dt = datetime.datetime.fromtimestamp(created)
last_action_value = getattr(task, f"last_{action}")
Expand Down Expand Up @@ -204,10 +206,11 @@ def test_without_handlers_status_warnings(session):
session=session
)
# Removing the handlers that were added

session.config.shut_cond = true
with pytest.warns(UserWarning) as warns:
session.start()
# Test warnings
expected_warnings = [
'Logger rocketry.task cannot be read. Logging is set to memory. To supress this warning, please set a handler that can be read (redbird.logging.RepoHandler)',
"Logger 'rocketry.task.test' for task 'task 1' does not have ability to be read. Past history of the task cannot be utilized.",
"Task 'task 1' logger is not readable. Latest run unknown.",
"Task 'task 1' logger is not readable. Latest success unknown.",
Expand Down
3 changes: 3 additions & 0 deletions rocketry/test/task/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ def test_pickle(session):

def test_crash(session):
task = DummyTask(name="mytest", session=session)
task.set_cached()
task.log_running()
assert task.status == "run"
assert task.last_crash is None
task.delete()

# Recreating and now should log crash
task = DummyTask(name="mytest", session=session)
task.set_cached()
assert task.status == "crash"
assert task.last_crash

Expand All @@ -105,6 +107,7 @@ def test_json(session):
"task": Task(),
"another_task": Task('another')
}, session=session)
task.set_cached()
j = task.json(indent=4)

dt_run = datetime.datetime.fromtimestamp(1640988000)
Expand Down