Skip to content

Commit

Permalink
Merge pull request #44 from golemfactory/refactor/yield-completed-tasks
Browse files Browse the repository at this point in the history
Yield completed tasks from `Engine.map()`
  • Loading branch information
azawlocki committed Sep 30, 2020
2 parents 4029eb2 + 93d3d9b commit 40f1474
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 181 deletions.
12 changes: 7 additions & 5 deletions examples/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pathlib
import sys

from yapapi.log import enable_default_logger, log_summary, log_event_json # noqa
from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa
from yapapi.runner import Engine, Task, vm
from yapapi.runner.ctx import WorkContext
from datetime import timedelta
Expand Down Expand Up @@ -45,10 +45,12 @@ async def worker(ctx: WorkContext, tasks):
},
)
ctx.run("/golem/entrypoints/run-blender.sh")
ctx.download_file(f"/golem/output/out{frame:04d}.png", f"output_{frame}.png")
output_file = f"output_{frame}.png"
ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file)
yield ctx.commit()
# TODO: Check if job results are valid
task.accept_task()
# and reject by: task.reject_task(reason = 'invalid file')
task.accept_task(result=output_file)

ctx.log("no more frames to render")

Expand All @@ -70,8 +72,8 @@ async def worker(ctx: WorkContext, tasks):
event_emitter=log_summary(),
) as engine:

async for progress in engine.map(worker, [Task(data=frame) for frame in frames]):
print("progress=", progress)
async for task in engine.map(worker, [Task(data=frame) for frame in frames]):
print(f"\033[36;1mTask computed: {task}, result: {task.output}\033[0m")


if __name__ == "__main__":
Expand Down
15 changes: 7 additions & 8 deletions yapapi/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
```
Engine(..., event_emitter=yapapi.log.log_event)
```
For even more detailed machine-readable output use `log_event_json`:
For even more detailed machine-readable output use `log_event_repr`:
```
Engine(..., event_emitter=yapapi.log.log_event_json)
Engine(..., event_emitter=yapapi.log.log_event_repr)
```
For summary human-readable output use `log_summary()`:
```
Expand All @@ -37,7 +37,7 @@
```
Engine(
...
event_emitter=yapapi.log.log_summary(yapapi.log.log_event_json)
event_emitter=yapapi.log.log_summary(yapapi.log.log_event_repr)
)
```
"""
Expand Down Expand Up @@ -151,11 +151,10 @@ def _format(obj: Any, max_len: int = 200) -> str:
logger.log(loglevel, msg)


def log_event_json(event: events.Event) -> None:
"""Log an event as a tag with attributes in JSON format."""
(exc_info, event) = event.extract_exc_info()
info = {name: str(value) for name, value in asdict(event).items()}
logger.debug("%s %s", type(event).__name__, json.dumps(info) if info else "", exc_info=exc_info)
def log_event_repr(event: events.Event) -> None:
"""Log the result of calling `__repr__()` for the `event`."""
exc_info, _ = event.extract_exc_info()
logger.debug("%r", event, exc_info=exc_info)


class SummaryLogger:
Expand Down
Loading

0 comments on commit 40f1474

Please sign in to comment.