
Add monitor task decorator to create tasks that poll some state #235

Merged: 8 commits, merged Aug 19, 2024

@superstar54 (Member) commented Aug 15, 2024

This PR introduces a monitor task decorator. It is particularly useful for tasks that need to poll some state (e.g., the presence of a file, or the state of another WorkGraph) at given intervals until a success criterion is met.

Possible use cases:

  • Time trigger: delay a task until a given time.
  • File trigger: start a task once a file exists.
  • Task monitor: monitor the state of a task and decide what to do when a specific event occurs.
  • Cross-WorkGraph dependencies: add a task that checks the state of a task in a different WorkGraph.

Note: while polling the state, the task sleeps for a given interval (default 1.0 second, configurable by the user through the interval input) and relinquishes control to the WorkGraph engine, so that the engine can run other tasks in the meantime.
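The polling behaviour described in the note can be sketched in plain asyncio. This is a minimal illustration, assuming a monitor is just a predicate that is called repeatedly; poll_until and heartbeat are hypothetical names, not the WorkGraph engine's actual implementation. While the monitor sleeps between checks, the event loop is free to run the other coroutine.

```python
import asyncio


async def poll_until(check, interval=1.0):
    """Call check() until it returns True, sleeping `interval` seconds between calls.

    Hypothetical helper: `await asyncio.sleep` hands control back to the event
    loop, so other coroutines can make progress while the monitor waits.
    """
    while not check():
        await asyncio.sleep(interval)
    return True


async def heartbeat(log, n=3, delay=0.05):
    """Stand-in for 'other tasks' that keep running while the monitor polls."""
    for i in range(n):
        log.append(f"heartbeat {i}")
        await asyncio.sleep(delay)


async def main():
    log = []
    state = {"ready": False}

    async def make_ready():
        # Some external event flips the state after 0.2 s.
        await asyncio.sleep(0.2)
        state["ready"] = True
        log.append("state became ready")

    async def monitor():
        await poll_until(lambda: state["ready"], interval=0.05)
        log.append("monitor succeeded")

    await asyncio.gather(monitor(), heartbeat(log), make_ready())
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

All three heartbeats are logged while the monitor is still waiting, which is the point of sleeping cooperatively instead of blocking.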

Example Usage:

Time monitor

A task that waits until a given time.

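import datetime
from aiida_workgraph import WorkGraph, task

@task.calcfunction()
def add(x, y):  # assumed example task; any task can wait on a monitor
    return x + y

@task.monitor()
def time_monitor(time):
    """Return True if the current time is greater than the given time."""
    return datetime.datetime.now() > time

wg = WorkGraph(name="time_monitor_example")
# The task will wait until 2024-08-16, 10:54
monitor1 = wg.add_task(time_monitor, time=datetime.datetime(2024, 8, 16, 10, 54, 0))
add1 = wg.add_task(add, "add1", x=1, y=2)
# add1 only starts once monitor1 has returned True
add1.waiting_on.add(monitor1)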

File monitor

Start a task once a file exists.

@task.monitor()
def file_monitor(filepath):
    """Return True once the file exists."""
    import os
    return os.path.exists(filepath)

# Assumes wg (a WorkGraph) and add (any task) are already defined.
# The task will wait until the file exists, checking every 60 seconds.
monitor1 = wg.add_task(file_monitor, filepath="/tmp/test.txt", interval=60.0)
add1 = wg.add_task(add, "add1", x=1, y=2)
add1.waiting_on.add(monitor1)
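The file trigger can be sketched without AiiDA, assuming the monitor is simply os.path.exists polled at a fixed interval. The writer coroutine (a hypothetical stand-in for whatever external process creates the file) and monitored_add (playing the role of add1 waiting on monitor1) are illustrative only. It uses a temporary directory instead of /tmp/test.txt so it can run anywhere.

```python
import asyncio
import os
import tempfile


def file_monitor(filepath):
    """Return True once the file exists (same check as the monitor task)."""
    return os.path.exists(filepath)


async def wait_for_file(filepath, interval):
    # Poll the predicate; asyncio.sleep lets other coroutines run meanwhile.
    while not file_monitor(filepath):
        await asyncio.sleep(interval)


async def writer(filepath, delay):
    # Hypothetical external producer that creates the file after `delay` seconds.
    await asyncio.sleep(delay)
    with open(filepath, "w") as f:
        f.write("done\n")


async def pipeline(filepath, interval=0.05, delay=0.2):
    events = []

    async def monitored_add(x, y):
        # Mimics add1.waiting_on.add(monitor1): the addition only runs
        # after the monitor condition is satisfied.
        await wait_for_file(filepath, interval)
        events.append("file found")
        return x + y

    result, _ = await asyncio.gather(monitored_add(1, 2), writer(filepath, delay))
    events.append(f"add1 -> {result}")
    return events


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(asyncio.run(pipeline(os.path.join(tmp, "test.txt"))))
```

A larger interval (such as the 60 seconds above) trades reaction latency for fewer checks.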

Built-in tasks

I also created some built-in tasks: TaskMonitor, TimeMonitor and FileMonitor.

Task monitor

# Wait for task "add2" in the WorkGraph "test_task_monitor2"; for a running WorkGraph you can also pass workgraph_pk.
wg1.add_task("workgraph.task_monitor", workgraph_name="test_task_monitor2", task_name="add2")

Time monitor

# Wait until 10 seconds from now.
wg.add_task("workgraph.time_monitor", datetime=datetime.datetime.now() + datetime.timedelta(seconds=10))

File monitor

# Wait until /tmp/test.txt exists.
wg.add_task("workgraph.file_monitor", filepath="/tmp/test.txt")

General awaitable task

I also created an awaitable decorator that gives the user full control over the asyncio function.

import asyncio
from aiida_workgraph import WorkGraph, task
from aiida import load_profile

# Load the AiiDA profile
load_profile()

# Define an awaitable task using the new decorator
@task.awaitable()
async def awaitable_func(x, y):
    await asyncio.sleep(0.5)  # Simulate a delay for polling
    return x + y

# Create a WorkGraph and add the awaitable task to it
wg = WorkGraph(name="test_awaitable")
awaitable1 = wg.add_task(awaitable_func, "awaitable_func1", x=1, y=2)

# Run the WorkGraph
wg.run()
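To see why an awaitable task does not block its siblings, here is a plain-asyncio sketch (no AiiDA required). It uses asyncio.gather as an assumed stand-in for the engine scheduling two awaitable tasks at once. Because each await asyncio.sleep(0.5) yields to the event loop, the total wall time is about 0.5 s rather than 1.0 s.

```python
import asyncio
import time


async def awaitable_func(x, y):
    await asyncio.sleep(0.5)  # yields control to the event loop while "waiting"
    return x + y


async def run_two():
    start = time.perf_counter()
    # Both coroutines sleep concurrently, so their delays overlap.
    results = await asyncio.gather(awaitable_func(1, 2), awaitable_func(3, 4))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_two())
    print(results, f"{elapsed:.2f} s")
```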

About asyncio

The awaitable task lets the WorkGraph go to the Waiting state: the task relinquishes control to the asyncio event loop, so the WorkGraph can run other tasks.
However, a calcfunction runs synchronously and blocks the event loop. If a long-running calcfunction is executing, the awaitable task has to wait for it to finish before it regains control and can run its next step.
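The calcfunction caveat follows from how asyncio works: synchronous code never yields to the event loop. In this sketch (no AiiDA involved), time.sleep stands in for a long-running calcfunction, an assumption made purely for illustration. The awaitable step only asks to sleep 0.1 s, but it cannot resume until the 0.5 s blocking call has returned.

```python
import asyncio
import time


async def awaitable_step(log, start):
    await asyncio.sleep(0.1)  # wants to resume after ~0.1 s
    log.append(("awaitable resumed", time.perf_counter() - start))


async def blocking_calc(log, start):
    # Stand-in for a long-running calcfunction: time.sleep blocks the whole
    # event loop, so no other coroutine can run until it returns.
    time.sleep(0.5)
    log.append(("calc finished", time.perf_counter() - start))


async def main():
    log = []
    start = time.perf_counter()
    await asyncio.gather(awaitable_step(log, start), blocking_calc(log, start))
    return log


if __name__ == "__main__":
    for label, t in asyncio.run(main()):
        print(f"{label}: {t:.2f} s")
```

The awaitable step resumes only after roughly 0.5 s, not 0.1 s, because the event loop was busy inside the blocking call.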

TODO

  • How to kill a running monitor task, or trigger it manually?
  • How to keep track of the monitor task, e.g. by creating a database record with a FunctionNode?

@superstar54 changed the title from "Add awaitable task decorator" to "Add Awaitable task decorator" on Aug 15, 2024
@superstar54 linked an issue on Aug 15, 2024 that may be closed by this pull request
@codecov-commenter commented Aug 15, 2024

Codecov Report

Attention: Patch coverage is 83.73016% with 41 lines in your changes missing coverage. Please review.

Project coverage is 79.59%. Comparing base (5937b88) to head (b9a77b4).
Report is 36 commits behind head on main.

File                                     Patch %   Missing lines
aiida_workgraph/engine/workgraph.py      65.88%    29 ⚠️
aiida_workgraph/decorator.py             65.00%    7 ⚠️
aiida_workgraph/executors/monitors.py    84.37%    5 ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #235      +/-   ##
==========================================
+ Coverage   75.75%   79.59%   +3.83%     
==========================================
  Files          70       65       -5     
  Lines        4615     4802     +187     
==========================================
+ Hits         3496     3822     +326     
+ Misses       1119      980     -139     
Flag          Coverage Δ
python-3.11   79.46% <83.73%> (+3.80%) ⬆️
python-3.12   79.48% <83.73%> (?)
python-3.9    79.52% <83.66%> (+3.78%) ⬆️

Flags with carried forward coverage won't be shown.


@superstar54 changed the title from "Add Awaitable task decorator" to "Add monitor task decorator to create tasks that poll some state" on Aug 16, 2024
@edan-bainglass (Member) left a comment

Extensive! Great to have the monitor. I take it this supports monitoring in the AiiDA sense? So currently in AiiDA, a monitor function can be assigned to a CalcJob (see docs). Though initially intended to terminate the job if some condition is met, the monitor function can in principle do more.

Is this PR's feature meant to replicate this via aiida-workgraph methodology? Could you please comment a bit on what it can('t) do? For example, can it kill the job or attach (tracked) outputs to the job? These are currently supported. One interesting future (current?) task for a monitor would be to trigger other jobs if some condition is met. This is a feature request from the W&C community. The trick is how we should keep track of these actions in the provenance.

As for the awaitable decorator (maybe it should be in a separate PR?), no comments there. I don't play with async all that much. However, it's good to support it, as concurrency is a powerful tool in general!

@superstar54 (Member, Author):

@edan-bainglass, thanks for the comment!

The CalcJob monitor and the WorkGraph monitor differ in the scope of what they monitor.

The monitor in AiiDA currently works on a CalcJob. It is called within the CalcJob's process, so it can modify the CalcJob's outputs. This is intra-process monitoring.

The monitor in WorkGraph is a task. A monitor task can monitor any event (e.g. time, file changes, etc.) as long as there is an API for it. It can of course also monitor AiiDA processes through the AiiDA API: it can fetch the state of other AiiDA processes and kill/pause/play them, but it cannot modify their data (outputs). This is inter-process monitoring.

One interesting future (current?) task for a monitor would be to trigger other jobs if some condition is met.

This is exactly what the monitor task does.

The trick is how we should keep track of these actions in the provenance.

Provenance is not tracked, because the monitor task is not an AiiDA process. It also should not be one, otherwise it would store too much unnecessary data. Since AiiDA provenance requires AiiDA data, I believe we need to create a special data type for the monitor task to store its provenance.

The monitor task is actually implemented with the same awaitable mechanism as the awaitable decorator, but the asyncio part is hidden from the user: the user only needs to write the monitor task as a normal function, and the WorkGraph takes care of the rest.

@edan-bainglass (Member):

@superstar54 thanks for clarifying the various parts. Let's discuss next week the submission of other jobs via a monitor. This is quite interesting.

As for the PR, good to merge when you're ready.

@agoscinski (Contributor) left a comment

Thanks for the PR, looks good to me. Only minor suggestions.

aiida_workgraph/engine/workgraph.py: 8 resolved review threads (6 outdated)
loop=self.loop,
)
awaitable = self.construct_awaitable_function(name, awaitable_target)
self.set_task_state_info(name, "state", "RUNNING")
Contributor:

If we report the finished state, should we not also log the running state somewhere?

Suggested change:
- self.set_task_state_info(name, "state", "RUNNING")
+ self.set_task_state_info(name, "state", "RUNNING")
+ self.report(f"Task: {name} is running.")

Maybe on a different log level, like debug. For that I think one would need to write a new function, but the report function is fairly easy to implement: https://github.com/aiidateam/aiida-core/blob/fb3686271fcdeb5506838a5a3069955546b05460/src/aiida/engine/processes/process.py#L589-L600
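As a sketch of the "different log level" idea, here is a plain logging version in which a hypothetical report helper takes an optional level, so the running-state message goes to DEBUG while other reports stay at INFO. This is not AiiDA's Process.report (linked above), which logs at AiiDA's own REPORT level; the helper, the logger name and set_task_running are assumptions for illustration.

```python
import logging

# Hypothetical logger name; AiiDA processes use their own logger.
logger = logging.getLogger("workgraph_sketch")


def report(msg, *args, level=logging.INFO):
    """Hypothetical report helper whose log level can be chosen per call."""
    logger.log(level, msg, *args)


def set_task_running(name, states):
    """Mark a task as RUNNING and log it at DEBUG, visible only when DEBUG output is enabled."""
    states[name] = "RUNNING"
    report("Task: %s is running.", name, level=logging.DEBUG)


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    states = {}
    set_task_running("add1", states)
    print(states)
```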

@superstar54 (Member, Author):
The run_tasks function already reports at its start that the task is being run, including the task type. Thus I prefer not to report the running state as well.

aiida_workgraph/engine/workgraph.py: resolved review thread
@agoscinski (Contributor) left a comment

Thanks looks good!

@superstar54 superstar54 merged commit 05ffc34 into main Aug 19, 2024
8 checks passed
@superstar54 superstar54 deleted the feature/custom_awaitable branch August 19, 2024 17:36
Linked issue that this pull request may close: Add Awaitable task decorator
4 participants