From 8f32849f6eafb6ec3ab3af719a3b308e7ff31f69 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Wed, 30 Sep 2020 09:43:52 +0200 Subject: [PATCH 1/4] Yield tasks from `Engine.map()`; move `Task` class to `yapapi.runner.task`. --- examples/blender/blender.py | 10 +- yapapi/runner/__init__.py | 208 ++++++++---------------------------- yapapi/runner/events.py | 2 +- yapapi/runner/task.py | 137 ++++++++++++++++++++++++ 4 files changed, 186 insertions(+), 171 deletions(-) create mode 100644 yapapi/runner/task.py diff --git a/examples/blender/blender.py b/examples/blender/blender.py index c0b4eff68..4d6be53b3 100755 --- a/examples/blender/blender.py +++ b/examples/blender/blender.py @@ -45,10 +45,12 @@ async def worker(ctx: WorkContext, tasks): }, ) ctx.run("/golem/entrypoints/run-blender.sh") - ctx.download_file(f"/golem/output/out{frame:04d}.png", f"output_{frame}.png") + output_file = f"output_{frame}.png" + ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file) yield ctx.commit() # TODO: Check if job results are valid - task.accept_task() + # and reject by: task.reject_task(reason = 'invalid file') + task.accept_task(result=output_file) ctx.log("no more frames to render") @@ -70,8 +72,8 @@ async def worker(ctx: WorkContext, tasks): event_emitter=log_summary(), ) as engine: - async for progress in engine.map(worker, [Task(data=frame) for frame in frames]): - print("progress=", progress) + async for task in engine.map(worker, [Task(data=frame) for frame in frames]): + print(f"\033[36;1mTask computed: {task}, result: {task.output}\033[0m") if __name__ == "__main__": diff --git a/yapapi/runner/__init__.py b/yapapi/runner/__init__.py index 306488979..969068851 100644 --- a/yapapi/runner/__init__.py +++ b/yapapi/runner/__init__.py @@ -2,42 +2,35 @@ """ import abc -import sys -import os -import asyncio from datetime import datetime, timedelta, timezone from decimal import Decimal -from enum import Enum, auto -import itertools +import os +import sys from types import MappingProxyType from typing import ( - Optional, - TypeVar, - Generic, AsyncContextManager, + AsyncIterator, Callable, - Union, - cast, Dict, - NamedTuple, + Iterable, Mapping, - AsyncIterator, + NamedTuple, + Optional, Set, - Tuple, - Iterator, - Iterable, - ClassVar, + TypeVar, + Union, + cast, ) import traceback -from dataclasses import dataclass, asdict, field +from dataclasses import dataclass, field from typing_extensions import Final, AsyncGenerator from .ctx import WorkContext, CommandContainer, Work from .events import Event from . import events - +from .task import Task from .utils import AsyncWrapper from .. import rest from ..props import com, Activity, Identification, IdentificationKeys @@ -153,6 +146,10 @@ class _BufferItem(NamedTuple): proposal: rest.market.OfferProposal +D = TypeVar("D") # Type var for task data +R = TypeVar("R") # Type var for task result + + class Engine(AsyncContextManager): """Requestor engine. Used to run tasks based on a common package on providers.""" @@ -203,9 +200,9 @@ def __init__( async def map( self, - worker: Callable[[WorkContext, AsyncIterator["Task"]], AsyncGenerator[Work, None]], - data: Iterable["Task"], - ): + worker: Callable[[WorkContext, AsyncIterator[Task[D, R]]], AsyncGenerator[Work, None]], + data: Iterable[Task[D, R]], + ) -> AsyncIterator[Task[D, R]]: """Run computations on providers. :param worker: a callable that takes a WorkContext object and a list o tasks, @@ -231,11 +228,6 @@ async def map( ), ) - yield { - "allocation": self._budget_allocation.id, - **asdict(await self._budget_allocation.details()), - } - emit(events.ComputationStarted()) # Building offer @@ -252,6 +244,8 @@ async def map( activity_api: rest.Activity = self._activity_api strategy = self._strategy work_queue = SmartQueue(data) + done_queue: asyncio.Queue[Task[D, R]] = asyncio.Queue() + workers: Set[asyncio.Task[None]] = set() last_wid = 0 @@ -259,7 +253,7 @@ async def map( invoices: Dict[str, rest.payment.Invoice] = dict() payment_closing: bool = False - async def process_invoices(): + async def process_invoices() -> None: assert self._budget_allocation allocation: rest.payment.Allocation = self._budget_allocation async for invoice in self._payment_api.incoming_invoices(): @@ -278,7 +272,7 @@ async def process_invoices(): if payment_closing and not agreements_to_pay: break - async def accept_payment_for_agreement(agreement_id: str, *, partial=False) -> bool: + async def accept_payment_for_agreement(agreement_id: str, *, partial: bool = False) -> bool: assert self._budget_allocation allocation: rest.payment.Allocation = self._budget_allocation emit(events.PaymentPrepared(agr_id=agreement_id)) @@ -296,7 +290,7 @@ async def accept_payment_for_agreement(agreement_id: str, *, partial=False) -> b await inv.accept(amount=inv.amount, allocation=allocation) return True - async def find_offers(): + async def find_offers() -> None: try: subscription = await builder.subscribe(market_api) except Exception as ex: @@ -339,7 +333,7 @@ async def find_offers(): # ) storage_manager = await self._stack.enter_async_context(gftp.provider()) - async def start_worker(agreement: rest.market.Agreement): + async def start_worker(agreement: rest.market.Agreement) -> None: nonlocal last_wid wid = last_wid last_wid += 1 @@ -412,7 +406,7 @@ async def start_worker(agreement: rest.market.Agreement): await accept_payment_for_agreement(agreement.id) emit(events.WorkerFinished(agr_id=agreement.id)) - async def worker_starter(): + async def worker_starter() -> None: while True: await asyncio.sleep(2) if offer_buffer and len(workers) < self._conf.max_workers: @@ -443,22 +437,30 @@ async def worker_starter(): wait_until_done = loop.create_task(work_queue.wait_until_done()) # Py38: find_offers_task.set_name('find_offers_task') try: + get_done_task: Optional[asyncio.Task] = None services = { find_offers_task, loop.create_task(worker_starter()), process_invoices_job, wait_until_done, } - while wait_until_done in services: + + while wait_until_done in services or not done_queue.empty(): if datetime.now(timezone.utc) > self._expires: raise TimeoutError(f"task timeout exceeded. timeout={self._conf.timeout}") + + if not get_done_task: + get_done_task = loop.create_task(done_queue.get()) + services.add(get_done_task) + done, pending = await asyncio.wait( services.union(workers), timeout=10, return_when=asyncio.FIRST_COMPLETED ) + for task in done: # if an exception occurred when a service task was running if task in services and not task.cancelled() and task.exception(): - raise cast(Exception, task.exception()) + raise cast(BaseException, task.exception()) if task in workers: try: await task @@ -469,7 +471,12 @@ async def worker_starter(): workers -= done services -= done - yield {"stage": "all work done"} + assert get_done_task + if get_done_task.done(): + yield get_done_task.result() + assert get_done_task not in services + get_done_task = None + emit(events.ComputationFinished()) except Exception as e: if ( @@ -498,16 +505,13 @@ async def worker_starter(): timeout=5, return_when=asyncio.ALL_COMPLETED, ) - yield {"stage": "wait for invoices", "agreements_to_pay": agreements_to_pay} payment_closing = True if agreements_to_pay: await asyncio.wait( {process_invoices_job}, timeout=15, return_when=asyncio.ALL_COMPLETED ) - yield {"done": True} - - async def __aenter__(self): + async def __aenter__(self) -> "Engine": stack = self._stack # TODO: Cleanup on exception here. @@ -531,134 +535,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._stack.aclose() -class TaskStatus(Enum): - WAITING = auto() - RUNNING = auto() - ACCEPTED = auto() - REJECTED = auto() - - -TaskData = TypeVar("TaskData") -TaskResult = TypeVar("TaskResult") -TaskEvents = Union[events.TaskAccepted, events.TaskRejected] - - -class Task(Generic[TaskData, TaskResult], object): - """One computation unit. - - Represents one computation unit that will be run on the provider - (e.g. rendering of one frame). - """ - - ids: ClassVar[Iterator[int]] = itertools.count(1) - - def __init__( - self, - data: TaskData, - *, - expires: Optional[datetime] = None, - timeout: Optional[timedelta] = None, - ): - """Create a new Task object. - - :param data: contains information needed to prepare command list for the provider - :param expires: expiration datetime - :param timeout: timeout from now; overrides expires parameter if provided - """ - self.id: str = str(next(Task.ids)) - self._started = datetime.now() - self._expires: Optional[datetime] - self._emit: Optional[Callable[[TaskEvents], None]] = None - self._callbacks: Set[Callable[["Task", str], None]] = set() - self._handle: Optional[ - Tuple[Handle["Task[TaskData, TaskResult]"], SmartQueue["Task[TaskData, TaskResult]"]] - ] = None - if timeout: - self._expires = self._started + timeout - else: - self._expires = expires - - self._result: Optional[TaskResult] = None - self._data = data - self._status: TaskStatus = TaskStatus.WAITING - - def _add_callback(self, callback): - self._callbacks.add(callback) - - def __repr__(self): - return f"Task(id={self.id}, data={self._data})" - - def _start(self, emitter: Callable[[TaskEvents], None]) -> None: - self._status = TaskStatus.RUNNING - self._emit = emitter - - def _stop(self, retry: bool = False): - if self._handle: - (handle, queue) = self._handle - loop = asyncio.get_event_loop() - if retry: - loop.create_task(queue.reschedule(handle)) - else: - loop.create_task(queue.mark_done(handle)) - - @staticmethod - def for_handle( - handle: Handle["Task[TaskData, TaskResult]"], - queue: SmartQueue["Task[TaskData, TaskResult]"], - emitter: Callable[[Event], None], - ) -> "Task[TaskData, TaskResult]": - task = handle.data - task._handle = (handle, queue) - task._start(emitter) - return task - - @property - def data(self) -> TaskData: - return self._data - - @property - def output(self) -> Optional[TaskResult]: - return self._result - - @property - def expires(self): - return self._expires - - def accept_task(self, result: Optional[TaskResult] = None): - """Accept task that was completed. - - Must be called when the results of a task are correct and it shouldn't be retried. - - :param result: computation result (optional) - :return: None - """ - if self._emit: - self._emit(events.TaskAccepted(task_id=self.id, result=result)) - assert self._status == TaskStatus.RUNNING, "Task not running" - self._status = TaskStatus.ACCEPTED - self._stop() - for cb in self._callbacks: - cb(self, "accept") - - def reject_task(self, reason: Optional[str] = None, retry: bool = False): - """Reject task. - - Must be called when the results of the task - are not correct and it should be retried. - - :param reason: task rejection description (optional) - :return: None - """ - if self._emit: - self._emit(events.TaskRejected(task_id=self.id, reason=reason)) - assert self._status == TaskStatus.RUNNING, "Rejected task was not running" - self._status = TaskStatus.REJECTED - self._stop(retry) - - for cb in self._callbacks: - cb(self, "reject") - - class Package(abc.ABC): @abc.abstractmethod async def resolve_url(self) -> str: diff --git a/yapapi/runner/events.py b/yapapi/runner/events.py index 5acfbcb96..83d13e13f 100644 --- a/yapapi/runner/events.py +++ b/yapapi/runner/events.py @@ -145,7 +145,7 @@ class TaskEvent(Event): @dataclass class TaskStarted(AgreementEvent, TaskEvent): - task_data: str + task_data: Any @dataclass diff --git a/yapapi/runner/task.py b/yapapi/runner/task.py new file mode 100644 index 000000000..b7bd3ac1d --- /dev/null +++ b/yapapi/runner/task.py @@ -0,0 +1,137 @@ +import asyncio +from datetime import datetime, timedelta +from enum import Enum, auto +import itertools +from typing import Callable, ClassVar, Iterator, Generic, Optional, Set, Tuple, TypeVar, Union + +import yapapi.runner.events as events +from yapapi.runner._smartq import SmartQueue, Handle + + +class TaskStatus(Enum): + WAITING = auto() + RUNNING = auto() + ACCEPTED = auto() + REJECTED = auto() + + +TaskData = TypeVar("TaskData") +TaskResult = TypeVar("TaskResult") +TaskEvents = Union[events.TaskAccepted, events.TaskRejected] + + +class Task(Generic[TaskData, TaskResult]): + """One computation unit. + + Represents one computation unit that will be run on the provider + (e.g. rendering of one frame). + """ + + ids: ClassVar[Iterator[int]] = itertools.count(1) + + def __init__( + self, + data: TaskData, + *, + expires: Optional[datetime] = None, + timeout: Optional[timedelta] = None, + ): + """Create a new Task object. + + :param data: contains information needed to prepare command list for the provider + :param expires: expiration datetime + :param timeout: timeout from now; overrides expires parameter if provided + """ + self.id: str = str(next(Task.ids)) + self._started = datetime.now() + self._expires: Optional[datetime] + self._emit: Optional[Callable[[TaskEvents], None]] = None + self._callbacks: Set[Callable[["Task[TaskData, TaskResult]", str], None]] = set() + self._handle: Optional[ + Tuple[Handle["Task[TaskData, TaskResult]"], SmartQueue["Task[TaskData, TaskResult]"]] + ] = None + if timeout: + self._expires = self._started + timeout + else: + self._expires = expires + + self._result: Optional[TaskResult] = None + self._data = data + self._status: TaskStatus = TaskStatus.WAITING + + def _add_callback(self, callback: Callable[["Task[TaskData, TaskResult]", str], None]) -> None: + self._callbacks.add(callback) + + def __repr__(self) -> str: + return f"Task(id={self.id}, data={self._data})" + + def _start(self, emitter: Callable[[TaskEvents], None]) -> None: + self._status = TaskStatus.RUNNING + self._emit = emitter + + def _stop(self, retry: bool = False): + if self._handle: + (handle, queue) = self._handle + loop = asyncio.get_event_loop() + if retry: + loop.create_task(queue.reschedule(handle)) + else: + loop.create_task(queue.mark_done(handle)) + + @staticmethod + def for_handle( + handle: Handle["Task[TaskData, TaskResult]"], + queue: SmartQueue["Task[TaskData, TaskResult]"], + emitter: Callable[[Event], None], + ) -> "Task[TaskData, TaskResult]": + task = handle.data + task._handle = (handle, queue) + task._start(emitter) + return task + + @property + def data(self) -> TaskData: + return self._data + + @property + def output(self) -> Optional[TaskResult]: + return self._result + + @property + def expires(self) -> Optional[datetime]: + return self._expires + + def accept_task(self, result: Optional[TaskResult] = None) -> None: + """Accept task that was completed. + + Must be called when the results of a task are correct and it shouldn't be retried. + + :param result: computation result (optional) + :return: None + """ + if self._emit: + self._emit(events.TaskAccepted(task_id=self.id, result=result)) + assert self._status == TaskStatus.RUNNING, "Accepted task not running" + self._status = TaskStatus.ACCEPTED + self._result = result + self._stop() + for cb in self._callbacks: + cb(self, "accept") + + def reject_task(self, reason: Optional[str] = None, retry: bool = False) -> None: + """Reject task. + + Must be called when the results of the task + are not correct and it should be retried. + + :param reason: task rejection description (optional) + :return: None + """ + if self._emit: + self._emit(events.TaskRejected(task_id=self.id, reason=reason)) + assert self._status == TaskStatus.RUNNING, "Rejected task not running" + self._status = TaskStatus.REJECTED + self._stop(retry) + + for cb in self._callbacks: + cb(self, "reject") From 1a77d8af6bc6720acdb0b961d36625c54f94e912 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Wed, 30 Sep 2020 17:12:20 +0200 Subject: [PATCH 2/4] Add accepted tasks to `done_queue` --- yapapi/runner/__init__.py | 16 ++++++++++++++-- yapapi/runner/task.py | 13 ++++++++----- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/yapapi/runner/__init__.py b/yapapi/runner/__init__.py index 969068851..5a46f9a20 100644 --- a/yapapi/runner/__init__.py +++ b/yapapi/runner/__init__.py @@ -30,7 +30,7 @@ from .ctx import WorkContext, CommandContainer, Work from .events import Event from . import events -from .task import Task +from .task import Task, TaskStatus from .utils import AsyncWrapper from .. import rest from ..props import com, Activity, Identification, IdentificationKeys @@ -243,9 +243,21 @@ async def map( market_api = self._market_api activity_api: rest.Activity = self._activity_api strategy = self._strategy - work_queue = SmartQueue(data) + done_queue: asyncio.Queue[Task[D, R]] = asyncio.Queue() + def on_task_done(task: Task[D, R], status: TaskStatus) -> None: + """Callback run when `task` is accepted or rejected.""" + if status == TaskStatus.ACCEPTED: + done_queue.put_nowait(task) + + def input_tasks() -> Iterable[Task[D, R]]: + for task in data: + task._add_callback(on_task_done) + yield task + + work_queue = SmartQueue(input_tasks()) + workers: Set[asyncio.Task[None]] = set() last_wid = 0 diff --git a/yapapi/runner/task.py b/yapapi/runner/task.py index b7bd3ac1d..2a2c9da37 100644 --- a/yapapi/runner/task.py +++ b/yapapi/runner/task.py @@ -46,7 +46,7 @@ def __init__( self._started = datetime.now() self._expires: Optional[datetime] self._emit: Optional[Callable[[TaskEvents], None]] = None - self._callbacks: Set[Callable[["Task[TaskData, TaskResult]", str], None]] = set() + self._callbacks: Set[Callable[["Task[TaskData, TaskResult]", TaskStatus], None]] = set() self._handle: Optional[ Tuple[Handle["Task[TaskData, TaskResult]"], SmartQueue["Task[TaskData, TaskResult]"]] ] = None @@ -59,7 +59,10 @@ def __init__( self._data = data self._status: TaskStatus = TaskStatus.WAITING - def _add_callback(self, callback: Callable[["Task[TaskData, TaskResult]", str], None]) -> None: + def _add_callback( + self, + callback: Callable[["Task[TaskData, TaskResult]", TaskStatus], None] + ) -> None: self._callbacks.add(callback) def __repr__(self) -> str: @@ -82,7 +85,7 @@ def _stop(self, retry: bool = False): def for_handle( handle: Handle["Task[TaskData, TaskResult]"], queue: SmartQueue["Task[TaskData, TaskResult]"], - emitter: Callable[[Event], None], + emitter: Callable[[events.Event], None], ) -> "Task[TaskData, TaskResult]": task = handle.data task._handle = (handle, queue) @@ -116,7 +119,7 @@ def accept_task(self, result: Optional[TaskResult] = None) -> None: self._result = result self._stop() for cb in self._callbacks: - cb(self, "accept") + cb(self, TaskStatus.ACCEPTED) def reject_task(self, reason: Optional[str] = None, retry: bool = False) -> None: """Reject task. @@ -134,4 +137,4 @@ def reject_task(self, reason: Optional[str] = None, retry: bool = False) -> None self._stop(retry) for cb in self._callbacks: - cb(self, "reject") + cb(self, TaskStatus.REJECTED) From fc2ed6ea63af52ce97c47d1a6b246903cb7b46c0 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Wed, 30 Sep 2020 17:15:19 +0200 Subject: [PATCH 3/4] Replace possibly buggy `log_event_json` with `log_event_repr` --- examples/blender/blender.py | 2 +- yapapi/log.py | 15 +++++++-------- yapapi/runner/task.py | 3 +-- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/examples/blender/blender.py b/examples/blender/blender.py index 4d6be53b3..c664530f9 100755 --- a/examples/blender/blender.py +++ b/examples/blender/blender.py @@ -3,7 +3,7 @@ import pathlib import sys -from yapapi.log import enable_default_logger, log_summary, log_event_json # noqa +from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa from yapapi.runner import Engine, Task, vm from yapapi.runner.ctx import WorkContext from datetime import timedelta diff --git a/yapapi/log.py b/yapapi/log.py index 0db300842..769b7bf06 100644 --- a/yapapi/log.py +++ b/yapapi/log.py @@ -24,9 +24,9 @@ ``` Engine(..., event_emitter=yapapi.log.log_event) ``` -For even more detailed machine-readable output use `log_event_json`: +For even more detailed machine-readable output use `log_event_repr`: ``` - Engine(..., event_emitter=yapapi.log.log_event_json) + Engine(..., event_emitter=yapapi.log.log_event_repr) ``` For summary human-readable output use `log_summary()`: ``` @@ -37,7 +37,7 @@ ``` Engine( ... - event_emitter=yapapi.log.log_summary(yapapi.log.log_event_json) + event_emitter=yapapi.log.log_summary(yapapi.log.log_event_repr) ) ``` """ @@ -151,11 +151,10 @@ def _format(obj: Any, max_len: int = 200) -> str: logger.log(loglevel, msg) -def log_event_json(event: events.Event) -> None: - """Log an event as a tag with attributes in JSON format.""" - (exc_info, event) = event.extract_exc_info() - info = {name: str(value) for name, value in asdict(event).items()} - logger.debug("%s %s", type(event).__name__, json.dumps(info) if info else "", exc_info=exc_info) +def log_event_repr(event: events.Event) -> None: + """Log the result of calling `__repr__()` for the `event`.""" + exc_info, _ = event.extract_exc_info() + logger.debug("%r", event, exc_info=exc_info) class SummaryLogger: diff --git a/yapapi/runner/task.py b/yapapi/runner/task.py index 2a2c9da37..60d23b509 100644 --- a/yapapi/runner/task.py +++ b/yapapi/runner/task.py @@ -60,8 +60,7 @@ def __init__( self._status: TaskStatus = TaskStatus.WAITING def _add_callback( - self, - callback: Callable[["Task[TaskData, TaskResult]", TaskStatus], None] + self, callback: Callable[["Task[TaskData, TaskResult]", TaskStatus], None] ) -> None: self._callbacks.add(callback) From 93d3d9b66c9cb5b262d38851a1fb17b22162f936 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Wed, 30 Sep 2020 17:27:20 +0200 Subject: [PATCH 4/4] Use relative imports in `task.py` to avoid import errors in Python 3.6 --- yapapi/runner/task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yapapi/runner/task.py b/yapapi/runner/task.py index 60d23b509..68c4be87e 100644 --- a/yapapi/runner/task.py +++ b/yapapi/runner/task.py @@ -4,8 +4,8 @@ import itertools from typing import Callable, ClassVar, Iterator, Generic, Optional, Set, Tuple, TypeVar, Union -import yapapi.runner.events as events -from yapapi.runner._smartq import SmartQueue, Handle +from . import events +from ._smartq import SmartQueue, Handle class TaskStatus(Enum):