From 470bbe0325d9a1ed968902e7396278b8333bb710 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Tue, 23 May 2023 16:44:46 +0200 Subject: [PATCH 1/6] Network serialize/deserialize --- yapapi/network.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/yapapi/network.py b/yapapi/network.py index 36116b89d..5dce1c8b5 100644 --- a/yapapi/network.py +++ b/yapapi/network.py @@ -315,6 +315,34 @@ def _next_address(self) -> IpAddress: except StopIteration: raise NetworkError(f"No more addresses available in '{self._ip_network.with_netmask}'.") + def serialize(self) -> dict: + """Provide a complete dictionary of values allowing reconstruction of a Network object.""" + return { + "_network_id": self._network_id, + "ip": self._ip_network.with_netmask, + "gateway": self.gateway, + "owner_id": self._owner_id, + "owner_ip": self.owner_ip, + "state": self.state.value, + "nodes": {_id: n.ip for _id, n in self._nodes.items()}, + } + + @classmethod + def deserialize(cls, net_api: "yapapi.rest.net.Net", obj_dict: dict) -> "Network": + network = cls( + net_api, + ip=obj_dict.get("ip"), + owner_id=obj_dict.get("owner_id"), + owner_ip=obj_dict.get("owner_ip"), + gateway=obj_dict.get("gateway"), + ) + network._network_id = obj_dict.get("_network_id") + network._state_machine.current_state_value = obj_dict.get("state") + if obj_dict.get("nodes"): + for _id, ip in obj_dict.get("nodes").items(): + network._nodes[_id] = Node(network=network, node_id=_id, ip=ip) + return network + class NetworkError(Exception): """Exception raised by :class:`Network` when an operation is not possible.""" From 6b221142469cc93234881bbe15143c67ff48bf1b Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Tue, 20 Jun 2023 06:14:38 +0200 Subject: [PATCH 2/6] Blue/serialize dirty (#1136) * serialize service API objects * suspend / resume the Service API * skip waiting for payments on suspend * move the original webapp to `webapp.py`, add a new `webapp_suspend_resume.py` example --- examples/webapp/webapp_suspend_resume.py | 296 +++++++++++++++++++++++ yapapi/agreements_pool.py | 92 ++++--- yapapi/engine.py | 75 +++++- yapapi/golem.py | 71 ++++-- yapapi/invoice_manager.py | 10 +- yapapi/network.py | 25 +- yapapi/rest/activity.py | 24 +- yapapi/rest/market.py | 8 +- yapapi/services/__init__.py | 3 +- yapapi/services/cluster.py | 34 ++- yapapi/services/service.py | 23 +- yapapi/services/service_runner.py | 170 ++++++++++--- yapapi/services/service_state.py | 10 + 13 files changed, 711 insertions(+), 130 deletions(-) create mode 100755 examples/webapp/webapp_suspend_resume.py diff --git a/examples/webapp/webapp_suspend_resume.py b/examples/webapp/webapp_suspend_resume.py new file mode 100755 index 000000000..49aac93f0 --- /dev/null +++ b/examples/webapp/webapp_suspend_resume.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +import asyncio +import json +import pathlib +import sys +from datetime import datetime, timedelta +from decimal import Decimal + +from yapapi import Golem +from yapapi.contrib.service.http_proxy import HttpProxyService, LocalHttpProxy +from yapapi.network import Network +from yapapi.payload import vm +from yapapi.props import com +from yapapi.services import Service, ServiceState +from yapapi.strategy import ( + DecreaseScoreForUnconfirmedAgreement, + LeastExpensiveLinearPayuMS, + PROP_DEBIT_NOTE_INTERVAL_SEC, + PropValueRange, +) + +examples_dir = pathlib.Path(__file__).resolve().parent.parent +sys.path.append(str(examples_dir)) + +from utils import ( + TEXT_COLOR_CYAN, + TEXT_COLOR_DEFAULT, + 
build_parser, + print_env_info, + run_golem_example, +) + +HTTP_IMAGE_HASH = "c37c1364f637c199fe710ca62241ff486db92c875b786814c6030aa1" +DB_IMAGE_HASH = "85021afecf51687ecae8bdc21e10f3b11b82d2e3b169ba44e177340c" + +STARTING_TIMEOUT = timedelta(minutes=4) + + +DEBIT_NOTE_INTERVAL_SEC = 3600 + + +class HttpService(HttpProxyService): + def __init__(self, db_address: str, db_port: int = 4001): + super().__init__(remote_port=5000) + self._db_address = db_address + self._db_port = db_port + + @staticmethod + async def get_payload(): + return await vm.repo( + image_hash=HTTP_IMAGE_HASH, + capabilities=[vm.VM_CAPS_VPN], + ) + + async def start(self): + # perform the initialization of the Service + # (which includes sending the network details within the `deploy` command) + async for script in super().start(): + yield script + + script = self._ctx.new_script(timeout=timedelta(seconds=20)) + + script.run( + "/bin/bash", + "-c", + f"cd /webapp && python app.py " + f"--db-address {self._db_address} " + f"--db-port {self._db_port}" + f" initdb", + ) + script.run( + "/bin/bash", + "-c", + f"cd /webapp && python app.py " + f"--db-address {self._db_address} " + f"--db-port {self._db_port} " + f"run > /webapp/out 2> /webapp/err &", + ) + yield script + + def _serialize_init_params(self): + return {"db_address": self._db_address, "db_port": self._db_port, } + + +class DbService(Service): + @staticmethod + async def get_payload(): + return await vm.repo( + image_hash=DB_IMAGE_HASH, + capabilities=[vm.VM_CAPS_VPN], + ) + + async def start(self): + # perform the initialization of the Service + # (which includes sending the network details within the `deploy` command) + async for script in super().start(): + yield script + + script = self._ctx.new_script(timeout=timedelta(seconds=30)) + script.run("/bin/run_rqlite.sh") + yield script + + +class MyMarketStrategy(LeastExpensiveLinearPayuMS): + acceptable_prop_value_range_overrides = { + PROP_DEBIT_NOTE_INTERVAL_SEC: PropValueRange(DEBIT_NOTE_INTERVAL_SEC, None), + } + + +async def main(subnet_tag, payment_driver, payment_network, port): + + base_strategy = MyMarketStrategy( + max_fixed_price=Decimal("1.0"), + max_price_for={com.Counter.CPU: Decimal("0.2"), com.Counter.TIME: Decimal("0.1")}, + ) + strategy = DecreaseScoreForUnconfirmedAgreement(base_strategy, 0.5) + + golem = Golem( + budget=1.0, + subnet_tag=subnet_tag, + payment_driver=payment_driver, + payment_network=payment_network, + strategy=strategy, + ) + + golem.add_event_consumer(strategy.on_event) + + await golem.start() + + print_env_info(golem) + + network = await golem.create_network("192.168.0.1/24") + db_cluster = await golem.run_service(DbService, network=network) + db_instance = db_cluster.instances[0] + + def still_starting(cluster): + return any( + i.state in (ServiceState.pending, ServiceState.starting) for i in cluster.instances + ) + + def raise_exception_if_still_starting(cluster): + if still_starting(cluster): + raise Exception( + f"Failed to start {cluster} instances " + f"after {STARTING_TIMEOUT.total_seconds()} seconds" + ) + + commissioning_time = datetime.now() + + while still_starting(db_cluster) and datetime.now() < commissioning_time + STARTING_TIMEOUT: + print(db_cluster.instances) + await asyncio.sleep(5) + + raise_exception_if_still_starting(db_cluster) + + print(f"{TEXT_COLOR_CYAN}DB instance started, spawning the web server{TEXT_COLOR_DEFAULT}") + + web_cluster = await golem.run_service( + HttpService, + network=network, + instance_params=[{"db_address": 
db_instance.network_node.ip}], + ) + + # wait until all remote http instances are started + + while still_starting(web_cluster) and datetime.now() < commissioning_time + STARTING_TIMEOUT: + print(web_cluster.instances + db_cluster.instances) + await asyncio.sleep(5) + + raise_exception_if_still_starting(web_cluster) + + # service instances started, start the local HTTP server + + proxy = LocalHttpProxy(web_cluster, port) + await proxy.run() + + print( + f"{TEXT_COLOR_CYAN}Local HTTP server listening on:\n" + f"http://localhost:{port}{TEXT_COLOR_DEFAULT}" + ) + + secs = 7 + print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}") + await asyncio.sleep(secs) + + await proxy.stop() + print(f"{TEXT_COLOR_CYAN}HTTP server stopped{TEXT_COLOR_DEFAULT}") + + print("=================================================================== SERIALIZING AND DROPPING CURRENT STATE") + + network_serialized = network.serialize() + db_serialized = db_cluster.serialize_instances() + web_serialized = web_cluster.serialize_instances() + + web_cluster.suspend() + db_cluster.suspend() + + print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}") + await asyncio.sleep(secs) + + print("=================================================================== STOPPING GOLEM ENGINE") + + await golem.stop(wait_for_payments=False) + + print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}") + await asyncio.sleep(secs) + + + print("=================================================================== SERIALIZED STATE: ") + + print(json.dumps([network_serialized, db_serialized, web_serialized], indent=4)) + + print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}") + await asyncio.sleep(secs) + + print("=================================================================== RESTARTING THE ENGINE AND THE SERVICES") + + golem = Golem( + budget=1.0, + subnet_tag=subnet_tag, + payment_driver=payment_driver, + payment_network=payment_network, + ) + + await golem.start() + + print_env_info(golem) + + + network = Network.deserialize(golem._engine._net_api, network_serialized) + + db_cluster = await golem.resume_service(DbService, instances=db_serialized, network=network) + web_cluster = await golem.resume_service(HttpService, instances=web_serialized, network=network) + + print([i.state for i in web_cluster.instances]) + + raise_exception_if_still_starting(web_cluster) + + proxy = LocalHttpProxy(web_cluster, port) + await proxy.run() + + print( + f"{TEXT_COLOR_CYAN}Local HTTP server listening on:\n" + f"http://localhost:{port}{TEXT_COLOR_DEFAULT}" + ) + + # wait until Ctrl-C + + while True: + print(web_cluster.instances + db_cluster.instances) + try: + await asyncio.sleep(10) + except (KeyboardInterrupt, asyncio.CancelledError): + break + + # perform the shutdown of the local http server and the service cluster + + await proxy.stop() + print(f"{TEXT_COLOR_CYAN}HTTP server stopped{TEXT_COLOR_DEFAULT}") + + web_cluster.stop() + db_cluster.stop() + + cnt = 0 + while cnt < 3 and any( + s.is_available for s in web_cluster.instances + db_cluster.instances + ): + print(web_cluster.instances + db_cluster.instances) + await asyncio.sleep(5) + cnt += 1 + + await network.remove() + await golem.stop() + + +if __name__ == "__main__": + parser = build_parser("Golem simple Web app example") + parser.add_argument( + "--port", + type=int, + default=8080, + help="The local port to listen on", + ) + now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S") + 
parser.set_defaults(log_file=f"webapp-yapapi-{now}.log") + args = parser.parse_args() + + run_golem_example( + main( + subnet_tag=args.subnet_tag, + payment_driver=args.payment_driver, + payment_network=args.payment_network, + port=args.port, + ), + log_file=args.log_file, + ) diff --git a/yapapi/agreements_pool.py b/yapapi/agreements_pool.py index f3962975a..510c7fa16 100644 --- a/yapapi/agreements_pool.py +++ b/yapapi/agreements_pool.py @@ -10,7 +10,7 @@ from yapapi import events from yapapi.props import Activity, NodeInfo -from yapapi.rest.market import Agreement, AgreementDetails, ApiException, OfferProposal +from yapapi.rest.market import Agreement, AgreementDetails, ApiException, OfferProposal, Market logger = logging.getLogger(__name__) @@ -42,6 +42,7 @@ def __init__( self, emitter: Callable[..., events.Event], offer_recycler: Callable[[OfferProposal], None], + market_api: Market, ): self.emitter = emitter self.offer_recycler = offer_recycler @@ -49,6 +50,7 @@ def __init__( self._agreements: Dict[str, BufferedAgreement] = {} # agreement_id -> Agreement self._lock = asyncio.Lock() self.confirmed = 0 + self._market_api = market_api async def cycle(self): """Perform cyclic tasks. @@ -74,18 +76,26 @@ async def add_proposal(self, score: float, proposal: OfferProposal) -> None: ) async def use_agreement( - self, cbk: Callable[[Agreement], asyncio.Task] + self, cbk: Callable[[Agreement], asyncio.Task], agreement_id: Optional[str] = None ) -> Optional[asyncio.Task]: """Get an agreement and start the `cbk()` task within it.""" async with self._lock: - agreement = await self._get_agreement() + if not agreement_id: + agreement = await self._get_agreement() + else: + agreement = await self._fetch_existing_agreement(agreement_id) + if agreement is None: return None + task = cbk(agreement) - await self._set_worker(agreement.id, task) + self._set_worker(agreement.id, task) + + logger.debug("Using agreement: %s, worker task: %s", agreement, task) + return task - async def _set_worker(self, agreement_id: str, task: asyncio.Task) -> None: + def _set_worker(self, agreement_id: str, task: asyncio.Task) -> None: try: buffered_agreement = self._agreements[agreement_id] except KeyError: @@ -93,6 +103,46 @@ async def _set_worker(self, agreement_id: str, task: asyncio.Task) -> None: assert buffered_agreement.worker_task is None buffered_agreement.worker_task = task + async def _prepare_agreement( + self, + agreement: Agreement, + proposal: Optional[OfferProposal] = None, + requires_confirmation: bool = True, + ) -> Optional[Agreement]: + try: + agreement_details = await agreement.get_details() + provider_activity = agreement_details.provider_view.extract(Activity) + requestor_activity = agreement_details.requestor_view.extract(Activity) + node_info = agreement_details.provider_view.extract(NodeInfo) + logger.debug("New agreement. id: %s, provider: %s", agreement.id, node_info) + self.emitter(events.AgreementCreated, agreement=agreement) + except (ApiException, asyncio.TimeoutError, aiohttp.ClientOSError): + logger.debug("Cannot get agreement details. 
id: %s", agreement.id, exc_info=True) + self.emitter(events.AgreementRejected, agreement=agreement) + if proposal: + self.offer_recycler(proposal) + return None + if requires_confirmation and not await agreement.confirm(): + self.emitter(events.AgreementRejected, agreement=agreement) + if proposal: + self.offer_recycler(proposal) + return None + self._agreements[agreement.id] = BufferedAgreement( + agreement=agreement, + agreement_details=agreement_details, + worker_task=None, + has_multi_activity=bool( + provider_activity.multi_activity and requestor_activity.multi_activity + ), + ) + self.emitter(events.AgreementConfirmed, agreement=agreement) + self.confirmed += 1 + return agreement + + async def _fetch_existing_agreement(self, agreement_id) -> Optional[Agreement]: + agreement = Agreement(self._market_api._api, agreement_id) + return await self._prepare_agreement(agreement, requires_confirmation=False) + async def _get_agreement(self) -> Optional[Agreement]: """Return an Agreement. @@ -127,33 +177,8 @@ async def _get_agreement(self) -> Optional[Agreement]: exc_info = (type(e), e, sys.exc_info()[2]) emit(events.ProposalFailed, proposal=offer.proposal, exc_info=exc_info) raise - try: - agreement_details = await agreement.get_details() - provider_activity = agreement_details.provider_view.extract(Activity) - requestor_activity = agreement_details.requestor_view.extract(Activity) - node_info = agreement_details.provider_view.extract(NodeInfo) - logger.debug("New agreement. id: %s, provider: %s", agreement.id, node_info) - emit(events.AgreementCreated, agreement=agreement) - except (ApiException, asyncio.TimeoutError, aiohttp.ClientOSError): - logger.debug("Cannot get agreement details. id: %s", agreement.id, exc_info=True) - emit(events.AgreementRejected, agreement=agreement) - self.offer_recycler(offer.proposal) - return None - if not await agreement.confirm(): - emit(events.AgreementRejected, agreement=agreement) - self.offer_recycler(offer.proposal) - return None - self._agreements[agreement.id] = BufferedAgreement( - agreement=agreement, - agreement_details=agreement_details, - worker_task=None, - has_multi_activity=bool( - provider_activity.multi_activity and requestor_activity.multi_activity - ), - ) - emit(events.AgreementConfirmed, agreement=agreement) - self.confirmed += 1 - return agreement + + return await self._prepare_agreement(agreement, offer.proposal) async def release_agreement(self, agreement_id: str, allow_reuse: bool = True) -> None: """Mark agreement as unused. 
@@ -167,6 +192,7 @@ async def release_agreement(self, agreement_id: str, allow_reuse: bool = True) - buffered_agreement = self._agreements[agreement_id] except KeyError: return + logger.debug("Releasing agreement: %s", buffered_agreement) buffered_agreement.worker_task = None # Check whether agreement can be reused if not allow_reuse or not buffered_agreement.has_multi_activity: @@ -222,7 +248,7 @@ async def terminate_all(self, reason: dict) -> None: """Terminate all agreements.""" async with self._lock: - for agreement_id in frozenset(self._agreements): + for agreement_id, agreement in self._agreements.items(): await self._terminate_agreement(agreement_id, reason) async def on_agreement_terminated(self, agr_id: str, reason: dict) -> None: diff --git a/yapapi/engine.py b/yapapi/engine.py index 1b4d0e04d..9b932b992 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -165,6 +165,7 @@ def __init__( self._num_payable_debit_notes: Dict[ActivityId, int] = defaultdict(int) self._activity_created_at: Dict[ActivityId, datetime] = dict() self._payment_closing: bool = False + self._await_payments: bool = True self._process_invoices_job: Optional[asyncio.Task] = None @@ -229,11 +230,12 @@ def emit(self, event_class: Type[events.EventType], **kwargs) -> events.EventTyp def _emit_event(self, event: events.Event) -> None: self._event_consumer(event) - async def stop(self, *exc_info) -> Optional[bool]: + async def stop(self, *exc_info, wait_for_payments: bool = True) -> Optional[bool]: """Stop the engine. This *must* be called at the end of the work, by the Engine user. """ + self._await_payments = wait_for_payments if exc_info[0] is not None: self.emit(events.ExecutionInterrupted, exc_info=exc_info) return await self._stack.__aexit__(None, None, None) @@ -303,6 +305,7 @@ async def _shutdown(self, *exc_info): # Some generators created by `execute_tasks` may still have elements; # if we don't close them now, their jobs will never be marked as finished. 
+ for gen in self._generators: await gen.aclose() @@ -320,7 +323,7 @@ async def _shutdown(self, *exc_info): # Wait for some time for invoices for unpaid agreements, # then cancel the invoices service - if self._process_invoices_job: + if self._process_invoices_job and self._await_payments: unpaid_agreements = self._invoice_manager.payable_unpaid_agreement_ids if unpaid_agreements: logger.info( @@ -386,11 +389,15 @@ def _get_allocation( async def _process_invoices(self) -> None: """Process incoming invoices.""" + invoice_manager = self._invoice_manager async for invoice in self._payment_api.incoming_invoices(): invoice_manager.add_invoice(invoice) await self._agreement_payment_attempt(invoice.agreement_id) - if self._payment_closing and not invoice_manager.has_payable_unpaid_agreements: + if self._payment_closing and not ( + self._await_payments and invoice_manager.has_payable_unpaid_agreements + ): + break async def accept_payments_for_agreement(self, job_id: str, agreement_id: str) -> None: @@ -629,11 +636,19 @@ async def create_activity(self, agreement_id: str) -> Activity: agreement_id, stream_events=self._stream_output ) + async def fetch_activity(self, activity_id: str) -> Activity: + """Create an activity for given `agreement_id`.""" + return await self._activity_api.use_activity( + activity_id, stream_events=self._stream_output + ) + async def start_worker( self, job: "Job", - run_worker: Callable[[WorkContext], Awaitable], + run_worker: Callable[[WorkContext], Awaitable[bool]], on_agreement_ready: Optional[Callable[[Agreement], None]] = None, + existing_agreement_id: Optional[str] = None, + existing_activity_id: Optional[str] = None, ) -> Optional[asyncio.Task]: loop = asyncio.get_event_loop() @@ -645,6 +660,9 @@ async def worker_task(agreement: Agreement): """ if on_agreement_ready: on_agreement_ready(agreement) + + logger.debug("Starting worker task on agreement %s", agreement) + self._all_agreements[agreement.id] = agreement self._invoice_manager.add_agreement(job, agreement) @@ -653,8 +671,17 @@ async def worker_task(agreement: Agreement): activity_start_time = datetime.now() try: - activity = await self.create_activity(agreement.id) - except Exception: + if existing_activity_id: + activity = await self.fetch_activity(existing_activity_id) + else: + activity = await self.create_activity(agreement.id) + except Exception as e: + logger.error( + "Activity init failed with error: %s. agreement: %s, existing activity id: %s", + e, + agreement.id, + existing_activity_id + ) job.emit(events.ActivityCreateFailed, agreement=agreement, exc_info=sys.exc_info()) raise @@ -663,15 +690,34 @@ async def worker_task(agreement: Agreement): self._activity_created_at[activity.id] = activity_start_time - async with activity: + allow_agreement_reuse = True + keep_activity = False + + try: self.accept_debit_notes_for_agreement(job.id, agreement.id) - await run_worker(work_context) + keep_activity = await run_worker(work_context) + except Exception: + logger.debug( + "Error while working on activity %s : [%s]", + activity.id, + sys.exc_info(), + ) + allow_agreement_reuse = False + finally: + logger.debug("Finished working with activity %s", activity.id) + + if not keep_activity: + await activity.destroy() # Providers may issue debit notes after activity ends. # This will prevent terminating agreements when this happens. 
self._activity_created_at.pop(activity.id, None) + # and release the agreement + await job.agreements_pool.release_agreement(agreement.id, allow_agreement_reuse) + return await job.agreements_pool.use_agreement( - lambda agreement: loop.create_task(worker_task(agreement)) + lambda agreement: loop.create_task(worker_task(agreement)), + agreement_id=existing_agreement_id, ) async def process_batches( @@ -683,7 +729,10 @@ async def process_batches( ) -> None: """Send command batches produced by `batch_generator` to `activity`.""" - script: Script = await batch_generator.__anext__() + try: + script: Script = await batch_generator.__anext__() + except Exception as e: + raise while True: batch_deadline = ( @@ -694,7 +743,7 @@ async def process_batches( await script._before() batch: List[BatchCommand] = script._evaluate() remote = await activity.send(batch, deadline=batch_deadline) - except Exception: + except Exception as e: script = await batch_generator.athrow(*sys.exc_info()) continue @@ -737,7 +786,7 @@ async def get_batch_results() -> List[events.CommandEvent]: future_results = loop.create_task(get_batch_results()) script = await batch_generator.asend(future_results) - def recycle_offer(self, offer: OfferProposal) -> None: + def recycle_offer(self, offer: OfferProposal) -> None: """Mark given offer as a fresh one, regardless of its previous processing. This offer was already processed, but something happened and we should treat it as a @@ -819,7 +868,7 @@ def __init__( self.expiration_time: datetime = expiration_time self.payload: Payload = payload - self.agreements_pool = AgreementsPool(self.emit, self.engine.recycle_offer) + self.agreements_pool = AgreementsPool(self.emit, self.engine.recycle_offer, market_api=self.engine._market_api) self.finished = asyncio.Event() self._demand_builder: Optional[DemandBuilder] = None diff --git a/yapapi/golem.py b/yapapi/golem.py index d87f81e04..da5e841f2 100644 --- a/yapapi/golem.py +++ b/yapapi/golem.py @@ -39,7 +39,7 @@ from yapapi.payload import Payload from yapapi.props import com from yapapi.script import Script -from yapapi.services import Cluster, ServiceType +from yapapi.services import Cluster, ServiceType, ServiceSerialization from yapapi.strategy import DecreaseScoreForUnconfirmedAgreement, LeastExpensiveLinearPayuMS if TYPE_CHECKING: @@ -285,12 +285,12 @@ async def start(self) -> None: await self._stop_with_exc_info(*sys.exc_info()) raise - async def stop(self) -> None: + async def stop(self, wait_for_payments: bool = True) -> None: """Stop the Golem engine after it was started in non-contextmanager mode. Details: :func:`Golem.start()` """ - await self._stop_with_exc_info(None, None, None) + await self._stop_with_exc_info(None, None, None, wait_for_payments=wait_for_payments) async def __aenter__(self) -> "Golem": await self.start() @@ -299,9 +299,9 @@ async def __aenter__(self) -> "Golem": async def __aexit__(self, *exc_info) -> Optional[bool]: return await self._stop_with_exc_info(*exc_info) - async def _stop_with_exc_info(self, *exc_info) -> Optional[bool]: + async def _stop_with_exc_info(self, *exc_info, wait_for_payments: bool = True) -> Optional[bool]: async with self._engine_state_lock: - res = await self._engine.stop(*exc_info) + res = await self._engine.stop(*exc_info, wait_for_payments=wait_for_payments) await self._event_dispatcher.stop() # Engine that was stopped is not usable anymore, there is no "full" cleanup. 
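As a rough usage sketch (assuming an already running `golem` engine plus existing `network`, `db_cluster` and `web_cluster` objects, executed inside an async function), the new `wait_for_payments` flag is meant to be combined with cluster suspension, as in `examples/webapp/webapp_suspend_resume.py`:

    # capture enough state to re-attach to the providers later
    network_serialized = network.serialize()
    db_serialized = db_cluster.serialize_instances()
    web_serialized = web_cluster.serialize_instances()

    # suspend instead of stopping, so agreements and activities stay alive on the providers
    web_cluster.suspend()
    db_cluster.suspend()

    # shut the engine down without blocking on outstanding invoices
    await golem.stop(wait_for_payments=False)
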
@@ -379,6 +379,50 @@ async def worker(context: WorkContext, tasks: AsyncIterable[Task]): async for t in executor.submit(worker, data, job_id=job_id): yield t + async def _init_cluster( + self, + service_class: Type[ServiceType], + payload: Optional[Payload] = None, + expiration: Optional[datetime] = None, + network: Optional[Network] = None, + ) -> Cluster[ServiceType]: + payload = payload or await service_class.get_payload() + + if not payload: + raise ValueError( + f"No payload returned from {service_class.__name__}.get_payload()" + " nor given in the `payload` argument." + ) + + return Cluster( + engine=self._engine, + service_class=service_class, + payload=payload, + expiration=expiration, + network=network, + ) + + async def resume_service( + self, + service_class: Type[ServiceType], + instances: List[ServiceSerialization], + payload: Optional[Payload] = None, + expiration: Optional[datetime] = None, + network: Optional[Network] = None, + ) -> Cluster[ServiceType]: + cluster = await self._init_cluster( + service_class=service_class, + payload=payload, + expiration=expiration, + network=network, + ) + + await self._engine.add_to_async_context(cluster) + + cluster.resume_instances(instances) + + return cluster + async def run_service( self, service_class: Type[ServiceType], @@ -474,25 +518,16 @@ async def main(): await asyncio.sleep(REFRESH_INTERVAL_SEC) """ # noqa: E501 - payload = payload or await service_class.get_payload() - - if not payload: - raise ValueError( - f"No payload returned from {service_class.__name__}.get_payload()" - " nor given in the `payload` argument." - ) - - if network_addresses and not network: - raise ValueError("`network_addresses` provided without a `network`.") - - cluster = Cluster( - engine=self._engine, + cluster = await self._init_cluster( service_class=service_class, payload=payload, expiration=expiration, network=network, ) + if network_addresses and not network: + raise ValueError("`network_addresses` provided without a `network`.") + await self._engine.add_to_async_context(cluster) cluster.spawn_instances(num_instances, instance_params, network_addresses) diff --git a/yapapi/invoice_manager.py b/yapapi/invoice_manager.py index 4a75b55c0..1d1cf7a73 100644 --- a/yapapi/invoice_manager.py +++ b/yapapi/invoice_manager.py @@ -43,15 +43,7 @@ def add_agreement(self, job: "Job", agreement: "Agreement") -> None: """Inform the InvoiceManager about a new agreement (so that we can use the agreement_id in \ the future).""" - ad = self._agreement_data.get(agreement.id) - if ad: - # Currently possible if we're having more than one activity for a single agreement - # (We could make some effort to ensure this method is called only once, when the - # agreement is created, but it will make the code more complex as we'll have to reach - # here from the AgreeementsPool) - assert ad.job is job and ad.agreement is agreement - else: - self._agreement_data[agreement.id] = AgreementData(agreement, job) + self._agreement_data[agreement.id] = AgreementData(agreement, job) def agreement_job(self, agreement_id: str) -> "Job": # NOTE: this has nothing to do with InvoiceManaging and is supposed to disappear diff --git a/yapapi/network.py b/yapapi/network.py index 5dce1c8b5..58aae3af7 100644 --- a/yapapi/network.py +++ b/yapapi/network.py @@ -1,7 +1,7 @@ import asyncio import logging from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network, ip_address, ip_network -from typing import Dict, Optional, Union +from typing import Dict, Optional, TypedDict, Union from 
urllib.parse import urlparse from dataclasses import dataclass @@ -315,20 +315,27 @@ def _next_address(self) -> IpAddress: except StopIteration: raise NetworkError(f"No more addresses available in '{self._ip_network.with_netmask}'.") - def serialize(self) -> dict: + def serialize(self) -> "NetworkSerialization": """Provide a complete dictionary of values allowing reconstruction of a Network object.""" + assert self._network_id + assert self.state + return { - "_network_id": self._network_id, + "_network_id": self._network_id, # noqa "ip": self._ip_network.with_netmask, "gateway": self.gateway, "owner_id": self._owner_id, "owner_ip": self.owner_ip, - "state": self.state.value, + "state": self.state.value, # noqa "nodes": {_id: n.ip for _id, n in self._nodes.items()}, } @classmethod - def deserialize(cls, net_api: "yapapi.rest.net.Net", obj_dict: dict) -> "Network": + def deserialize( + cls, + net_api: "yapapi.rest.net.Net", + obj_dict: "NetworkSerialization", + ) -> "Network": network = cls( net_api, ip=obj_dict.get("ip"), @@ -343,6 +350,14 @@ def deserialize(cls, net_api: "yapapi.rest.net.Net", obj_dict: dict) -> "Network network._nodes[_id] = Node(network=network, node_id=_id, ip=ip) return network +class NetworkSerialization(TypedDict): + _network_id: str + ip: str + gateway: Optional[str] + owner_id: str + owner_ip: str + state: str + nodes: Dict[str, str] class NetworkError(Exception): """Exception raised by :class:`Network` when an operation is not possible.""" diff --git a/yapapi/rest/activity.py b/yapapi/rest/activity.py index b5a99319f..91d71f994 100644 --- a/yapapi/rest/activity.py +++ b/yapapi/rest/activity.py @@ -40,6 +40,16 @@ async def new_activity(self, agreement_id: str, stream_events: bool = False) -> activity_id = await self._api.create_activity(agreement_id) return Activity(self._api, self._state, activity_id, stream_events) + async def use_activity(self, activity_id: str, stream_events: bool = False) -> "Activity": + """Instantiate an Activity object based on an existing activity_id. 
+ + :return: the object that represents the Activity + and allows to query and control its state + :rtype: Activity + """ + await self._state.get_activity_state(activity_id) + return Activity(self._api, self._state, activity_id, stream_events) + class Activity(AsyncContextManager["Activity"]): """Mid-level wrapper for REST's Activity endpoint.""" @@ -82,6 +92,14 @@ async def send(self, script: List[dict], deadline: Optional[datetime] = None) -> return StreamingBatch(self, batch_id, len(script), deadline) return PollingBatch(self, batch_id, len(script), deadline) + async def destroy(self): + """Destroy the Activity and free the execution unit.""" + try: + await self._api.destroy_activity(self._id) + _log.debug("Activity %s destroyed successfully", self._id) + except yexc.ApiException: + _log.debug("Got API Exception when destroying activity %s", self._id, exc_info=True) + async def __aenter__(self) -> "Activity": return self @@ -93,11 +111,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ) else: _log.debug("Destroying activity %s", self._id) - try: - await self._api.destroy_activity(self._id) - _log.debug("Activity %s destroyed successfully", self._id) - except yexc.ApiException: - _log.debug("Got API Exception when destroying activity %s", self._id, exc_info=True) + await self.destroy() @dataclass diff --git a/yapapi/rest/market.py b/yapapi/rest/market.py index e70a646f1..ffb27f233 100644 --- a/yapapi/rest/market.py +++ b/yapapi/rest/market.py @@ -58,7 +58,11 @@ def __init__(self, *, _ref: models.Agreement): class Agreement(object): """Mid-level interface to the REST's Agreement model.""" - def __init__(self, api: RequestorApi, subscription: "Subscription", agreement_id: str): + def __init__( + self, api: RequestorApi, + agreement_id: str, + subscription: Optional["Subscription"] = None + ): self._api = api self._subscription = subscription self._id = agreement_id @@ -177,7 +181,7 @@ async def create_agreement(self, timeout=timedelta(hours=1)) -> Agreement: ) api: RequestorApi = self._subscription._api agreement_id = await api.create_agreement(proposal) - return Agreement(api, self._subscription, agreement_id) + return Agreement(api, agreement_id, self._subscription) def __str__(self): proposal = self._proposal.proposal diff --git a/yapapi/services/__init__.py b/yapapi/services/__init__.py index a6dd40b73..5ca7e835c 100644 --- a/yapapi/services/__init__.py +++ b/yapapi/services/__init__.py @@ -1,5 +1,5 @@ from .cluster import Cluster -from .service import Service, ServiceInstance, ServiceType +from .service import Service, ServiceInstance, ServiceType, ServiceSerialization from .service_runner import ServiceRunner from .service_state import ServiceState @@ -9,5 +9,6 @@ "ServiceInstance", "ServiceType", "ServiceRunner", + "ServiceSerialization", "ServiceState", ) diff --git a/yapapi/services/cluster.py b/yapapi/services/cluster.py index d15d9eed2..9b7860660 100644 --- a/yapapi/services/cluster.py +++ b/yapapi/services/cluster.py @@ -15,7 +15,7 @@ from yapapi.network import Network from yapapi.payload import Payload -from .service import ServiceType +from .service import ServiceType, ServiceSerialization from .service_runner import ServiceRunner DEFAULT_SERVICE_EXPIRATION: Final[timedelta] = timedelta(minutes=180) @@ -69,6 +69,12 @@ def stop(self): for instance in self.instances: self.stop_instance(instance) + def suspend(self): + """Suspend all services in this :class:`Cluster`.""" + self.service_runner.suspend() + for instance in self.instances: + 
self.suspend_instance(instance) + async def _terminate(self, exc_type, exc_val, exc_tb): # NOTE: this might be called more then once (e.g. by `terminate()` followed by `__aexit__`), # but it's harmless, so we don't care @@ -113,9 +119,13 @@ def instances(self) -> List[ServiceType]: return self.service_runner.instances.copy() def stop_instance(self, service: ServiceType): - """Stop the specific :class:`Service` instance belonging to this :class:`Cluster`.""" + """Stop the specific :class:`Service` instance.""" self.service_runner.stop_instance(service) + def suspend_instance(self, service: ServiceType): + """Suspend the specific :class:`Service` instance.""" + self.service_runner.suspend_instance(service) + def spawn_instances( self, num_instances: Optional[int] = None, @@ -156,6 +166,23 @@ def spawn_instances( self.service_runner.add_instance(service, self.network, network_address) service._set_cluster(self) + def resume_instances(self, + serialized_instances: List[ServiceSerialization], + ): + for service_obj in serialized_instances: + service = self.service_class(**service_obj.get('params', dict())) + self.service_runner.add_existing_instance( + service, + service_obj.get("state"), + service_obj.get("agreement_id"), + service_obj.get("activity_id"), + self.network, + service_obj.get("network_node", dict()), + ) + service._set_cluster(self) + + + def _resolve_instance_params( self, num_instances: Optional[int], @@ -180,3 +207,6 @@ def _resolve_instance_params( def _default_expiration(self): return datetime.now(timezone.utc) + DEFAULT_SERVICE_EXPIRATION + + def serialize_instances(self) -> List[ServiceSerialization]: + return [i.serialize() for i in self.instances] diff --git a/yapapi/services/service.py b/yapapi/services/service.py index 756711d16..49dca2d05 100644 --- a/yapapi/services/service.py +++ b/yapapi/services/service.py @@ -12,6 +12,7 @@ Optional, Tuple, Type, + TypedDict, TypeVar, Union, ) @@ -62,8 +63,8 @@ class Service: _ctx: Optional["WorkContext"] = None _network_node: Optional[Node] = None - def __init__(self): - self.__id = str(uuid.uuid4()) + def __init__(self, _id: Optional[str] = None): + self.__id = _id or str(uuid.uuid4()) self.__inqueue: asyncio.Queue[ServiceSignal] = asyncio.Queue() self.__outqueue: asyncio.Queue[ServiceSignal] = asyncio.Queue() @@ -426,6 +427,24 @@ async def is_activity_responsive(self) -> bool: logger.error("Couldn't retrieve the activity state (%s)", e) return False + def _serialize_init_params(self) -> Dict[str, Any]: + return {"_id": self.id} + + def serialize(self) -> "ServiceSerialization": + return { + "params": self._serialize_init_params(), + "activity_id": self._ctx._activity.id, + "agreement_id": self._ctx._agreement.id, + "state": self.state.value, + "network_node": {"network_id": self._network_node.network.network_id, "node_id": self._network_node.node_id, "ip": self._network_node.ip, } + } + +class ServiceSerialization(TypedDict): + params: Dict[str, Any] + activity_id: str + agreement_id: str + state: str + network_node: Dict[str, str] ServiceType = TypeVar("ServiceType", bound=Service) diff --git a/yapapi/services/service_runner.py b/yapapi/services/service_runner.py index 461572c3c..9b0445b32 100644 --- a/yapapi/services/service_runner.py +++ b/yapapi/services/service_runner.py @@ -4,16 +4,19 @@ import logging import sys from types import TracebackType -from typing import TYPE_CHECKING, AsyncContextManager, List, Optional, Set, Tuple, Type, Union +from typing import TYPE_CHECKING, AsyncContextManager, Dict, List, Optional, Set, 
Tuple, Type, Union from typing_extensions import Final +import statemachine +import statemachine.exceptions + if TYPE_CHECKING: from yapapi.engine import Job from yapapi import events from yapapi.ctx import WorkContext -from yapapi.network import Network +from yapapi.network import Network, Node from yapapi.rest.activity import BatchError from yapapi.rest.market import Agreement @@ -41,7 +44,18 @@ class ControlSignal(enum.Enum): SeviceRunner.""" stop = "stop" + suspend = "suspend" + + +class ServiceRunnerState(statemachine.StateMachine): + """The state of a :class:`ServiceRunner`.""" + active = statemachine.State("active", initial=True) + stopped = statemachine.State("stopped") + suspended = statemachine.State("suspended") + + stop: statemachine.Transition = active.to(stopped) + suspend: statemachine.Transition = active.to(suspended) class ServiceRunner(AsyncContextManager): def __init__( @@ -61,7 +75,7 @@ def __init__( self._job = job self._instances: List[Service] = [] self._instance_tasks: List[asyncio.Task] = [] - self._stopped = False + self._state: ServiceRunnerState = ServiceRunnerState() self._health_check_interval = health_check_interval self._health_check_retries = health_check_retries @@ -73,6 +87,18 @@ def id(self) -> str: def instances(self): return self._instances.copy() + @property + def state(self): + return self._state.current_state + + @property + def stopped(self): + return self.state == ServiceRunnerState.stopped + + @property + def suspended(self): + return self.state == ServiceRunnerState.suspended + def add_instance( self, service: ServiceType, @@ -89,10 +115,49 @@ def add_instance( task = loop.create_task(self.spawn_instance(service, network, network_address)) self._instance_tasks.append(task) + def add_existing_instance( + self, + service: ServiceType, + state: str, + agreement_id: str, + activity_id: str, + network: Optional[Network] = None, + network_node_dict = Optional[Dict[str, str]], + ) -> None: + """Add an existing service to the collection of services managed by this ServiceRunner. + + The same object should never be managed by more than one ServiceRunner. + """ + + service.service_instance.service_state.current_state_value = state + + if network_node_dict: + service._set_network_node( + Node( + network=network, + node_id=network_node_dict.get("node_id"), + ip=network_node_dict.get("ip") + ) + ) + + self._instances.append(service) + + loop = asyncio.get_event_loop() + task = loop.create_task(self.spawn_instance(service, network, existing_agreement_id=agreement_id, existing_activity_id=activity_id)) + self._instance_tasks.append(task) + def stop_instance(self, service: Service): - """Stop the specific :class:`Service` instance belonging to this :class:`ServiceRunner`.""" + """Stop the specific :class:`Service` instance.""" service.service_instance.control_queue.put_nowait(ControlSignal.stop) + def suspend_instance(self, service: Service): + """Suspend the specific :class:`Service` instance.""" + service.service_instance.control_queue.put_nowait(ControlSignal.suspend) + + def suspend(self): + """Mark this runner suspended, so that its agreements are not killed when it exits.""" + self._state.suspend() + async def __aenter__(self): """Post a Demand and start collecting provider Offers for running service instances.""" @@ -113,19 +178,15 @@ def raise_if_failed(task): task.add_done_callback(raise_if_failed) self.__services.add(task) - async def agreements_pool_cycler(): - # shouldn't this be part of the Agreement pool itself? (or a task within Job?) 
- while True: - await asyncio.sleep(2) - await self._job.agreements_pool.cycle() - - self.__services.add(loop.create_task(agreements_pool_cycler())) - async def __aexit__(self, exc_type, exc_val, exc_tb): """Release resources used by this ServiceRunner.""" - self._stopped = True + try: + self._state.stop() + except statemachine.exceptions.TransitionNotAllowed: + """The ServiceRunner is not running, """ + pass - logger.debug("%s is shutting down...", self) + logger.debug("%s is shutting down... state: %s", self, self.state) if exc_type is not None: self._job.set_exc_info((exc_type, exc_val, exc_tb)) @@ -133,7 +194,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): # Give the instance tasks some time to terminate gracefully. # Then cancel them without mercy! if self._instance_tasks: - logger.debug("Waiting for service instances to terminate...") + logger.debug("Waiting for service instances to terminate... %s", self._instance_tasks) _, still_running = await asyncio.wait(self._instance_tasks, timeout=10) if still_running: for task in still_running: @@ -141,21 +202,22 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): task.cancel() await asyncio.gather(*still_running, return_exceptions=True) - # TODO: should be different if we stop due to an error - termination_reason = { - "message": "Successfully finished all work", - "golem.requestor.code": "Success", - } + if self.stopped: + # TODO: should be different if we stop due to an error + termination_reason = { + "message": "Successfully finished all work", + "golem.requestor.code": "Success", + } - try: - logger.debug("Terminating agreements...") - await self._job.agreements_pool.terminate_all(reason=termination_reason) - except Exception: - logger.debug("Couldn't terminate agreements", exc_info=True) + try: + logger.debug("Terminating agreements on %s", self) + await self._job.agreements_pool.terminate_all(reason=termination_reason) + except Exception: + logger.debug("Couldn't terminate agreements", exc_info=True) for task in self.__services: if not task.done(): - logger.debug("Cancelling task: %s", task) + logger.debug("Cancelling task: %s on %s", task, self) task.cancel() await asyncio.gather(*self.__services, return_exceptions=True) @@ -167,6 +229,7 @@ def _get_handler(instance: ServiceInstance): ServiceState.starting: instance.service.start, ServiceState.running: instance.service.run, ServiceState.stopping: instance.service.shutdown, + ServiceState.suspended: None, } handler = _handlers.get(instance.state, None) if handler: @@ -202,6 +265,8 @@ def _change_state( elif isinstance(event, tuple) or event == ControlSignal.stop: # Transition on error or `stop` signal instance.service_state.error_or_stop() + elif event == ControlSignal.suspend: + instance.service_state.suspend() else: # Unhandled signal, don't change the state assert isinstance(event, ControlSignal) @@ -244,7 +309,10 @@ async def _ensure_alive(self, service: Service): async def _run_instance(self, instance: ServiceInstance): loop = asyncio.get_event_loop() - logger.info("%s commissioned", instance.service) + if instance.state == ServiceState.starting: + logger.info("%s commissioned", instance.service) + else: + logger.info("%s resumed", instance.service) handler = None batch_task: Optional[asyncio.Task] = None @@ -362,17 +430,21 @@ def change_state(event: Union[ControlSignal, ExcInfo] = (None, None, None)) -> N except asyncio.CancelledError: pass - logger.info("%s decommissioned", instance.service) + if instance.state == ServiceState.terminated: + logger.info("%s 
decommissioned", instance.service) async def spawn_instance( self, service: ServiceType, network: Optional[Network] = None, network_address: Optional[str] = None, + existing_agreement_id: Optional[str] = None, + existing_activity_id: Optional[str] = None, + ) -> None: """Lifecycle the service within this :class:`ServiceRunner`. - :param service: instance of the service class, expected to be in a pending state + :param service: instance of the service class :param network: a :class:`~yapapi.network.Network` this service should be attached to :param network_address: the address withing the network, ignored if network is None determining whether service should be reset and lifecycle should restart @@ -385,20 +457,25 @@ async def spawn_instance( instance = service.service_instance - async def _worker(work_context: WorkContext) -> None: + async def _worker(work_context: WorkContext) -> bool: nonlocal instance assert agreement is not None + logger.debug("`spawn_instance` worker starting for %s on %s", instance, self) + activity = work_context._activity service._set_ctx(work_context) - self._change_state(instance) # pending -> starting + + if instance.state == ServiceState.pending: + self._change_state(instance) # pending -> starting + try: - if network: + if network and not service.network_node: service._set_network_node( await network.add_node(work_context.provider_id, network_address) ) - if not self._stopped: + if not self.stopped: instance_batches = self._run_instance(instance) try: await self._job.engine.process_batches( @@ -413,20 +490,33 @@ async def _worker(work_context: WorkContext) -> None: work_context.emit(events.WorkerFinished, exc_info=sys.exc_info()) raise finally: - if network and service.network_node: - await network.remove_node(work_context.provider_id) - service._clear_network_node() - await self._job.engine.accept_payments_for_agreement(self._job.id, agreement.id) - await self._job.agreements_pool.release_agreement(agreement.id, allow_reuse=False) + if service.state != ServiceState.suspended: + if network and service.network_node: + await network.remove_node(work_context.provider_id) + service._clear_network_node() + await self._job.engine.accept_payments_for_agreement(self._job.id, agreement.id) + await self._job.agreements_pool.release_agreement(agreement.id, allow_reuse=False) + + # keep activity? + return False + + # keep activity? + return True def on_agreement_ready(agreement_ready: Agreement) -> None: nonlocal agreement agreement = agreement_ready - while not self._stopped: + while not self.stopped: agreement = None await asyncio.sleep(1.0) - task = await self._job.engine.start_worker(self._job, _worker, on_agreement_ready) + task = await self._job.engine.start_worker( + self._job, + _worker, + on_agreement_ready, + existing_agreement_id=existing_agreement_id, + existing_activity_id=existing_activity_id + ) if not task: continue try: diff --git a/yapapi/services/service_state.py b/yapapi/services/service_state.py index 4e3cb2138..15adbf28b 100644 --- a/yapapi/services/service_state.py +++ b/yapapi/services/service_state.py @@ -45,6 +45,13 @@ class ServiceState(statemachine.StateMachine): """ unresponsive = statemachine.State("unresponsive") + suspended = statemachine.State("suspended") + """This service instance has been suspended. + + Its handlers should not be processed by the ServiceRunner anymore but no resultant changes + to the activity itself should be made. 
+ """ + # transitions start: statemachine.Transition = pending.to(starting) ready: statemachine.Transition = starting.to(running) @@ -65,6 +72,9 @@ class ServiceState(statemachine.StateMachine): error_or_stop = stop | terminate """transition performed on error or `ControlSignal.stop`""" + suspend: statemachine.Transition = suspended.from_(starting, running, stopping) + """transition performed on `ControlSignal.suspend`""" + AVAILABLE = (starting, running, stopping) """A helper set of states in which the service instance is bound to an activity and can be interacted with.""" From 9366cfc16632b70bb3f8ca03f5db38b273518e7d Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Tue, 20 Jun 2023 16:54:25 +0200 Subject: [PATCH 3/6] fixes --- examples/webapp/webapp_suspend_resume.py | 22 +++++++------- yapapi/agreements_pool.py | 2 +- yapapi/engine.py | 37 ++++++++++++++++-------- yapapi/executor/__init__.py | 4 ++- yapapi/golem.py | 6 ++-- yapapi/network.py | 17 ++++++----- yapapi/rest/market.py | 4 +-- yapapi/services/__init__.py | 2 +- yapapi/services/cluster.py | 15 ++++------ yapapi/services/service.py | 22 +++++++++----- yapapi/services/service_runner.py | 36 ++++++++++++++--------- yapapi/services/service_state.py | 6 ++-- 12 files changed, 101 insertions(+), 72 deletions(-) diff --git a/examples/webapp/webapp_suspend_resume.py b/examples/webapp/webapp_suspend_resume.py index 49aac93f0..c3eb76725 100755 --- a/examples/webapp/webapp_suspend_resume.py +++ b/examples/webapp/webapp_suspend_resume.py @@ -13,9 +13,9 @@ from yapapi.props import com from yapapi.services import Service, ServiceState from yapapi.strategy import ( + PROP_DEBIT_NOTE_INTERVAL_SEC, DecreaseScoreForUnconfirmedAgreement, LeastExpensiveLinearPayuMS, - PROP_DEBIT_NOTE_INTERVAL_SEC, PropValueRange, ) @@ -79,7 +79,10 @@ async def start(self): yield script def _serialize_init_params(self): - return {"db_address": self._db_address, "db_port": self._db_port, } + return { + "db_address": self._db_address, + "db_port": self._db_port, + } class DbService(Service): @@ -108,7 +111,6 @@ class MyMarketStrategy(LeastExpensiveLinearPayuMS): async def main(subnet_tag, payment_driver, payment_network, port): - base_strategy = MyMarketStrategy( max_fixed_price=Decimal("1.0"), max_price_for={com.Counter.CPU: Decimal("0.2"), com.Counter.TIME: Decimal("0.1")}, @@ -186,7 +188,7 @@ def raise_exception_if_still_starting(cluster): await proxy.stop() print(f"{TEXT_COLOR_CYAN}HTTP server stopped{TEXT_COLOR_DEFAULT}") - print("=================================================================== SERIALIZING AND DROPPING CURRENT STATE") + print("=============================================== SERIALIZING AND DROPPING CURRENT STATE") network_serialized = network.serialize() db_serialized = db_cluster.serialize_instances() @@ -198,22 +200,21 @@ def raise_exception_if_still_starting(cluster): print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}") await asyncio.sleep(secs) - print("=================================================================== STOPPING GOLEM ENGINE") + print("=============================================== STOPPING GOLEM ENGINE") await golem.stop(wait_for_payments=False) print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}") await asyncio.sleep(secs) - - print("=================================================================== SERIALIZED STATE: ") + print("=============================================== SERIALIZED STATE: ") print(json.dumps([network_serialized, db_serialized, web_serialized], indent=4)) 
print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}") await asyncio.sleep(secs) - print("=================================================================== RESTARTING THE ENGINE AND THE SERVICES") + print("=============================================== RESTARTING THE ENGINE AND THE SERVICES") golem = Golem( budget=1.0, @@ -226,7 +227,6 @@ def raise_exception_if_still_starting(cluster): print_env_info(golem) - network = Network.deserialize(golem._engine._net_api, network_serialized) db_cluster = await golem.resume_service(DbService, instances=db_serialized, network=network) @@ -262,9 +262,7 @@ def raise_exception_if_still_starting(cluster): db_cluster.stop() cnt = 0 - while cnt < 3 and any( - s.is_available for s in web_cluster.instances + db_cluster.instances - ): + while cnt < 3 and any(s.is_available for s in web_cluster.instances + db_cluster.instances): print(web_cluster.instances + db_cluster.instances) await asyncio.sleep(5) cnt += 1 diff --git a/yapapi/agreements_pool.py b/yapapi/agreements_pool.py index 510c7fa16..17dc1465e 100644 --- a/yapapi/agreements_pool.py +++ b/yapapi/agreements_pool.py @@ -10,7 +10,7 @@ from yapapi import events from yapapi.props import Activity, NodeInfo -from yapapi.rest.market import Agreement, AgreementDetails, ApiException, OfferProposal, Market +from yapapi.rest.market import Agreement, AgreementDetails, ApiException, Market, OfferProposal logger = logging.getLogger(__name__) diff --git a/yapapi/engine.py b/yapapi/engine.py index 9b932b992..86fec177a 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -395,9 +395,8 @@ async def _process_invoices(self) -> None: invoice_manager.add_invoice(invoice) await self._agreement_payment_attempt(invoice.agreement_id) if self._payment_closing and not ( - self._await_payments and invoice_manager.has_payable_unpaid_agreements + self._await_payments and invoice_manager.has_payable_unpaid_agreements ): - break async def accept_payments_for_agreement(self, job_id: str, agreement_id: str) -> None: @@ -638,9 +637,7 @@ async def create_activity(self, agreement_id: str) -> Activity: async def fetch_activity(self, activity_id: str) -> Activity: """Create an activity for given `agreement_id`.""" - return await self._activity_api.use_activity( - activity_id, stream_events=self._stream_output - ) + return await self._activity_api.use_activity(activity_id, stream_events=self._stream_output) async def start_worker( self, @@ -650,6 +647,22 @@ async def start_worker( existing_agreement_id: Optional[str] = None, existing_activity_id: Optional[str] = None, ) -> Optional[asyncio.Task]: + """Start a single worker (activity) within a Job. + + :param job: :class:`Job` within which the worker is launched. + :param run_worker: an async function which receives the work context and performs + any neccessary operations on it. If the function returns `True`, the activity + respective :class:`WorkContext` won't be terminated after the worker finishes. + :param on_agreement_ready: an optional callable to be called when the agreement is + created or initialized + :param existing_agreement_id: optional identifier of an existing agreement. + if given, the engine will attempt to use this agreement to launch the activity, + instead of signing a new one. + :param existing_activity_id: optional identifier of an existing activity. + if given, the engine won't launch a new activity and will try to reuse an existing one + instead. 
+ """ + loop = asyncio.get_event_loop() async def worker_task(agreement: Agreement): @@ -680,7 +693,7 @@ async def worker_task(agreement: Agreement): "Activity init failed with error: %s. agreement: %s, existing activity id: %s", e, agreement.id, - existing_activity_id + existing_activity_id, ) job.emit(events.ActivityCreateFailed, agreement=agreement, exc_info=sys.exc_info()) raise @@ -729,10 +742,7 @@ async def process_batches( ) -> None: """Send command batches produced by `batch_generator` to `activity`.""" - try: - script: Script = await batch_generator.__anext__() - except Exception as e: - raise + script: Script = await batch_generator.__anext__() while True: batch_deadline = ( @@ -744,6 +754,7 @@ async def process_batches( batch: List[BatchCommand] = script._evaluate() remote = await activity.send(batch, deadline=batch_deadline) except Exception as e: + logger.error("Error executing script %s: %s(%s).", script, type(e), str(e)) script = await batch_generator.athrow(*sys.exc_info()) continue @@ -786,7 +797,7 @@ async def get_batch_results() -> List[events.CommandEvent]: future_results = loop.create_task(get_batch_results()) script = await batch_generator.asend(future_results) - def recycle_offer(self, offer: OfferProposal) -> None: + def recycle_offer(self, offer: OfferProposal) -> None: """Mark given offer as a fresh one, regardless of its previous processing. This offer was already processed, but something happened and we should treat it as a @@ -868,7 +879,9 @@ def __init__( self.expiration_time: datetime = expiration_time self.payload: Payload = payload - self.agreements_pool = AgreementsPool(self.emit, self.engine.recycle_offer, market_api=self.engine._market_api) + self.agreements_pool = AgreementsPool( + self.emit, self.engine.recycle_offer, market_api=self.engine._market_api + ) self.finished = asyncio.Event() self._demand_builder: Optional[DemandBuilder] = None diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index e69d4f5c7..a85eb1680 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -177,7 +177,7 @@ async def input_tasks() -> AsyncIterator[Task[D, R]]: work_queue = SmartQueue(input_tasks()) - async def run_worker(work_context: WorkContext) -> None: + async def run_worker(work_context: WorkContext) -> bool: """Run an instance of `worker` for the particular work context.""" agreement = work_context._agreement activity = work_context._activity @@ -237,6 +237,8 @@ async def task_generator() -> AsyncGenerator[Task[D, R], None]: if consumer.finished: raise StopAsyncIteration() + return False + async def worker_starter() -> None: while True: await asyncio.sleep(2) diff --git a/yapapi/golem.py b/yapapi/golem.py index da5e841f2..f1aefdb45 100644 --- a/yapapi/golem.py +++ b/yapapi/golem.py @@ -39,7 +39,7 @@ from yapapi.payload import Payload from yapapi.props import com from yapapi.script import Script -from yapapi.services import Cluster, ServiceType, ServiceSerialization +from yapapi.services import Cluster, ServiceSerialization, ServiceType from yapapi.strategy import DecreaseScoreForUnconfirmedAgreement, LeastExpensiveLinearPayuMS if TYPE_CHECKING: @@ -299,7 +299,9 @@ async def __aenter__(self) -> "Golem": async def __aexit__(self, *exc_info) -> Optional[bool]: return await self._stop_with_exc_info(*exc_info) - async def _stop_with_exc_info(self, *exc_info, wait_for_payments: bool = True) -> Optional[bool]: + async def _stop_with_exc_info( + self, *exc_info, wait_for_payments: bool = True + ) -> Optional[bool]: async with 
self._engine_state_lock: res = await self._engine.stop(*exc_info, wait_for_payments=wait_for_payments) await self._event_dispatcher.stop() diff --git a/yapapi/network.py b/yapapi/network.py index 58aae3af7..efe50cb95 100644 --- a/yapapi/network.py +++ b/yapapi/network.py @@ -338,18 +338,18 @@ def deserialize( ) -> "Network": network = cls( net_api, - ip=obj_dict.get("ip"), - owner_id=obj_dict.get("owner_id"), - owner_ip=obj_dict.get("owner_ip"), + ip=obj_dict["ip"], + owner_id=obj_dict["owner_id"], + owner_ip=obj_dict["owner_ip"], gateway=obj_dict.get("gateway"), ) - network._network_id = obj_dict.get("_network_id") - network._state_machine.current_state_value = obj_dict.get("state") - if obj_dict.get("nodes"): - for _id, ip in obj_dict.get("nodes").items(): - network._nodes[_id] = Node(network=network, node_id=_id, ip=ip) + network._network_id = obj_dict["_network_id"] + network._state_machine.current_state_value = obj_dict["state"] + for _id, ip in obj_dict["nodes"].items(): + network._nodes[_id] = Node(network=network, node_id=_id, ip=ip) return network + class NetworkSerialization(TypedDict): _network_id: str ip: str @@ -359,5 +359,6 @@ class NetworkSerialization(TypedDict): state: str nodes: Dict[str, str] + class NetworkError(Exception): """Exception raised by :class:`Network` when an operation is not possible.""" diff --git a/yapapi/rest/market.py b/yapapi/rest/market.py index ffb27f233..4732ac383 100644 --- a/yapapi/rest/market.py +++ b/yapapi/rest/market.py @@ -59,9 +59,7 @@ class Agreement(object): """Mid-level interface to the REST's Agreement model.""" def __init__( - self, api: RequestorApi, - agreement_id: str, - subscription: Optional["Subscription"] = None + self, api: RequestorApi, agreement_id: str, subscription: Optional["Subscription"] = None ): self._api = api self._subscription = subscription diff --git a/yapapi/services/__init__.py b/yapapi/services/__init__.py index 5ca7e835c..4b5e036df 100644 --- a/yapapi/services/__init__.py +++ b/yapapi/services/__init__.py @@ -1,5 +1,5 @@ from .cluster import Cluster -from .service import Service, ServiceInstance, ServiceType, ServiceSerialization +from .service import Service, ServiceInstance, ServiceSerialization, ServiceType from .service_runner import ServiceRunner from .service_state import ServiceState diff --git a/yapapi/services/cluster.py b/yapapi/services/cluster.py index 9b7860660..e1e9f22cc 100644 --- a/yapapi/services/cluster.py +++ b/yapapi/services/cluster.py @@ -15,7 +15,7 @@ from yapapi.network import Network from yapapi.payload import Payload -from .service import ServiceType, ServiceSerialization +from .service import ServiceSerialization, ServiceType from .service_runner import ServiceRunner DEFAULT_SERVICE_EXPIRATION: Final[timedelta] = timedelta(minutes=180) @@ -166,14 +166,12 @@ def spawn_instances( self.service_runner.add_instance(service, self.network, network_address) service._set_cluster(self) - def resume_instances(self, - serialized_instances: List[ServiceSerialization], - ): + def resume_instances(self, serialized_instances: List[ServiceSerialization]): for service_obj in serialized_instances: - service = self.service_class(**service_obj.get('params', dict())) + service = self.service_class(**service_obj.get("params", dict())) self.service_runner.add_existing_instance( service, - service_obj.get("state"), + service_obj["state"], service_obj.get("agreement_id"), service_obj.get("activity_id"), self.network, @@ -181,8 +179,6 @@ def resume_instances(self, ) service._set_cluster(self) - - def 
_resolve_instance_params( self, num_instances: Optional[int], @@ -205,7 +201,8 @@ def _resolve_instance_params( f"`instance_params` iterable depleted after {i} spawned instances." ) - def _default_expiration(self): + @staticmethod + def _default_expiration(): return datetime.now(timezone.utc) + DEFAULT_SERVICE_EXPIRATION def serialize_instances(self) -> List[ServiceSerialization]: diff --git a/yapapi/services/service.py b/yapapi/services/service.py index 49dca2d05..35ee7b769 100644 --- a/yapapi/services/service.py +++ b/yapapi/services/service.py @@ -64,7 +64,7 @@ class Service: _network_node: Optional[Node] = None def __init__(self, _id: Optional[str] = None): - self.__id = _id or str(uuid.uuid4()) + self.__id = _id or str(uuid.uuid4()) self.__inqueue: asyncio.Queue[ServiceSignal] = asyncio.Queue() self.__outqueue: asyncio.Queue[ServiceSignal] = asyncio.Queue() @@ -433,18 +433,26 @@ def _serialize_init_params(self) -> Dict[str, Any]: def serialize(self) -> "ServiceSerialization": return { "params": self._serialize_init_params(), - "activity_id": self._ctx._activity.id, - "agreement_id": self._ctx._agreement.id, + "activity_id": self._ctx._activity.id if self._ctx else None, + "agreement_id": self._ctx._agreement.id if self._ctx else None, "state": self.state.value, - "network_node": {"network_id": self._network_node.network.network_id, "node_id": self._network_node.node_id, "ip": self._network_node.ip, } + "network_node": { + "network_id": self._network_node.network.network_id, + "node_id": self._network_node.node_id, + "ip": self._network_node.ip, + } + if self._network_node + else None, } + class ServiceSerialization(TypedDict): params: Dict[str, Any] - activity_id: str - agreement_id: str + activity_id: Optional[str] + agreement_id: Optional[str] state: str - network_node: Dict[str, str] + network_node: Optional[Dict[str, str]] + ServiceType = TypeVar("ServiceType", bound=Service) diff --git a/yapapi/services/service_runner.py b/yapapi/services/service_runner.py index 9b0445b32..4399dc2b8 100644 --- a/yapapi/services/service_runner.py +++ b/yapapi/services/service_runner.py @@ -6,10 +6,9 @@ from types import TracebackType from typing import TYPE_CHECKING, AsyncContextManager, Dict, List, Optional, Set, Tuple, Type, Union -from typing_extensions import Final - import statemachine import statemachine.exceptions +from typing_extensions import Final if TYPE_CHECKING: from yapapi.engine import Job @@ -57,6 +56,7 @@ class ServiceRunnerState(statemachine.StateMachine): stop: statemachine.Transition = active.to(stopped) suspend: statemachine.Transition = active.to(suspended) + class ServiceRunner(AsyncContextManager): def __init__( self, @@ -119,10 +119,10 @@ def add_existing_instance( self, service: ServiceType, state: str, - agreement_id: str, - activity_id: str, + agreement_id: Optional[str] = None, + activity_id: Optional[str] = None, network: Optional[Network] = None, - network_node_dict = Optional[Dict[str, str]], + network_node_dict: Optional[Dict[str, str]] = None, ) -> None: """Add an existing service to the collection of services managed by this ServiceRunner. 
@@ -131,19 +131,26 @@ def add_existing_instance(
 
         service.service_instance.service_state.current_state_value = state
 
-        if network_node_dict:
+        if network and network_node_dict:
             service._set_network_node(
                 Node(
                     network=network,
-                    node_id=network_node_dict.get("node_id"),
-                    ip=network_node_dict.get("ip")
+                    node_id=network_node_dict["node_id"],
+                    ip=network_node_dict["ip"],
                 )
             )
 
         self._instances.append(service)
 
         loop = asyncio.get_event_loop()
-        task = loop.create_task(self.spawn_instance(service, network, existing_agreement_id=agreement_id, existing_activity_id=activity_id))
+        task = loop.create_task(
+            self.spawn_instance(
+                service,
+                network,
+                existing_agreement_id=agreement_id,
+                existing_activity_id=activity_id,
+            )
+        )
         self._instance_tasks.append(task)
 
     def stop_instance(self, service: Service):
@@ -183,7 +190,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
         try:
             self._state.stop()
         except statemachine.exceptions.TransitionNotAllowed:
-            """The ServiceRunner is not running, """
+            """The ServiceRunner is not running."""
             pass
 
         logger.debug("%s is shutting down... state: %s", self, self.state)
@@ -440,7 +447,6 @@ async def spawn_instance(
         network_address: Optional[str] = None,
         existing_agreement_id: Optional[str] = None,
         existing_activity_id: Optional[str] = None,
-
     ) -> None:
         """Lifecycle the service within this :class:`ServiceRunner`.
 
@@ -448,6 +454,8 @@ async def spawn_instance(
         :param network: a :class:`~yapapi.network.Network` this service should be attached to
         :param network_address: the address withing the network, ignored if network is None
             determining whether service should be reset and lifecycle should restart
+        :param existing_agreement_id: id of an existing agreement to attach the Service to.
+        :param existing_activity_id: id of an existing activity to attach the Service instance to.
         """
 
         await self._ensure_payload_matches(service)
@@ -495,7 +503,9 @@ async def _worker(work_context: WorkContext) -> bool:
                 await network.remove_node(work_context.provider_id)
                 service._clear_network_node()
             await self._job.engine.accept_payments_for_agreement(self._job.id, agreement.id)
-            await self._job.agreements_pool.release_agreement(agreement.id, allow_reuse=False)
+            await self._job.agreements_pool.release_agreement(
+                agreement.id, allow_reuse=False
+            )
 
             # keep activity?
             return False
@@ -515,7 +525,7 @@ def on_agreement_ready(agreement_ready: Agreement) -> None:
                 _worker,
                 on_agreement_ready,
                 existing_agreement_id=existing_agreement_id,
-                existing_activity_id=existing_activity_id
+                existing_activity_id=existing_activity_id,
             )
             if not task:
                 continue
diff --git a/yapapi/services/service_state.py b/yapapi/services/service_state.py
index 15adbf28b..cc19a07c5 100644
--- a/yapapi/services/service_state.py
+++ b/yapapi/services/service_state.py
@@ -47,9 +47,9 @@ class ServiceState(statemachine.StateMachine):
     suspended = statemachine.State("suspended")
     """This service instance has been suspended.
-    
-    Its handlers should not be processed by the ServiceRunner anymore but no resultant changes 
-    to the activity itself should be made. 
+
+    Its handlers should not be processed by the ServiceRunner anymore but no resultant changes
+    to the activity itself should be made.
""" # transitions From 81383db441bb3803f7abed099c24d2777def389f Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Tue, 20 Jun 2023 17:11:59 +0200 Subject: [PATCH 4/6] fix the agreements pool tests --- tests/test_agreements_pool.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/test_agreements_pool.py b/tests/test_agreements_pool.py index 69070853a..340b1cabb 100644 --- a/tests/test_agreements_pool.py +++ b/tests/test_agreements_pool.py @@ -28,6 +28,12 @@ async def create_agreement(): return create_agreement +def get_agreements_pool() -> agreements_pool.AgreementsPool: + return agreements_pool.AgreementsPool( + lambda _event, **kwargs: None, lambda _offer: None, mock.Mock() + ) + + @pytest.mark.asyncio async def test_use_agreement_chooses_max_score(): """Test that a proposal with the largest score is chosen in AgreementsPool.use_agreement().""" @@ -40,7 +46,7 @@ async def test_use_agreement_chooses_max_score(): mock_score = random.random() proposals[n] = (mock_score, mock_proposal) - pool = agreements_pool.AgreementsPool(lambda _event, **kwargs: None, lambda _offer: None) + pool = get_agreements_pool() for score, proposal in proposals.values(): await pool.add_proposal(score, proposal) @@ -76,7 +82,7 @@ async def test_use_agreement_shuffles_proposals(): mock_score = 42.0 if n != 0 else 41.0 proposals.append((mock_score, mock_proposal)) - pool = agreements_pool.AgreementsPool(lambda _event, **kwargs: None, lambda _offer: None) + pool = get_agreements_pool() for score, proposal in proposals: await pool.add_proposal(score, proposal) @@ -95,7 +101,7 @@ def use_agreement_cb(agreement): async def test_use_agreement_no_proposals(): """Test that `AgreementPool.use_agreement()` returns `None` when there are no proposals.""" - pool = agreements_pool.AgreementsPool(lambda _event, **kwargs: None, lambda _offer: None) + pool = get_agreements_pool() def use_agreement_cb(_agreement): assert False, "use_agreement callback called" @@ -120,7 +126,7 @@ async def test_terminate_agreement(multi_activity, simulate_race, event_emitted) events = [] pool = agreements_pool.AgreementsPool( - lambda event, **kwargs: events.append(event), lambda _offer: None # noqa + lambda event, **kwargs: events.append(event), lambda _offer: None, mock.Mock() # noqa ) agreement: BufferedAgreement = BufferedAgreementFactory(has_multi_activity=multi_activity) pool._agreements[agreement.agreement.id] = agreement From 4bb740aeafda6a6b53337a6d6fefb2a38b593c1a Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 21 Jun 2023 15:46:05 +0200 Subject: [PATCH 5/6] fix the agreement termination --- yapapi/agreements_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yapapi/agreements_pool.py b/yapapi/agreements_pool.py index 17dc1465e..2b08be0e5 100644 --- a/yapapi/agreements_pool.py +++ b/yapapi/agreements_pool.py @@ -248,7 +248,7 @@ async def terminate_all(self, reason: dict) -> None: """Terminate all agreements.""" async with self._lock: - for agreement_id, agreement in self._agreements.items(): + for agreement_id in frozenset(self._agreements): await self._terminate_agreement(agreement_id, reason) async def on_agreement_terminated(self, agr_id: str, reason: dict) -> None: From 2ffd14506fb51be4a991b10f1bb6540ed7d7b618 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 21 Jun 2023 15:54:25 +0200 Subject: [PATCH 6/6] fix test --- tests/goth_tests/test_power_outage.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/goth_tests/test_power_outage.py 
b/tests/goth_tests/test_power_outage.py index 2ad515091..9189c085f 100644 --- a/tests/goth_tests/test_power_outage.py +++ b/tests/goth_tests/test_power_outage.py @@ -11,7 +11,7 @@ from goth.runner.log import configure_logging from goth.runner.probe import ProviderProbe, RequestorProbe -from .assertions import assert_no_errors, assert_tasks_processed +from .assertions import assert_tasks_processed logger = logging.getLogger("goth.test.power_outage") @@ -67,7 +67,6 @@ async def test_power_outage( env=os.environ, ) as (_cmd_task, cmd_monitor, _process_monitor): # Add assertions to the command output monitor `cmd_monitor`: - cmd_monitor.add_assertion(assert_no_errors) all_sent = cmd_monitor.add_assertion(assert_all_tasks_started) all_computed = cmd_monitor.add_assertion(assert_all_tasks_computed)
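
Taken together, the pieces introduced in this series -- Network.deserialize(), Cluster.serialize_instances(), Golem.resume_service() and the wait_for_payments flag on engine shutdown -- let a requestor process be restarted without tearing down its activities. The sketch below is only an illustration under stated assumptions: it assumes a serialize() counterpart to Network.deserialize(), and it assumes the public suspend path amounts to serializing state and stopping the engine with wait_for_payments=False (mirroring _stop_with_exc_info above); the exact entrypoints may differ in the final API.

    from yapapi import Golem
    from yapapi.network import Network
    from yapapi.services import Cluster


    async def suspend(golem: Golem, network: Network, cluster: Cluster):
        # Capture everything needed to re-attach later: the VPN layout and the
        # per-instance agreement/activity/state data.
        network_serialized = network.serialize()              # NetworkSerialization dict
        instances_serialized = cluster.serialize_instances()  # List[ServiceSerialization]
        # Assumption: stopping without waiting for payments is what "suspend" means here,
        # mirroring _stop_with_exc_info(wait_for_payments=False).
        await golem.stop(wait_for_payments=False)
        return network_serialized, instances_serialized


    async def resume(golem: Golem, service_cls, network_serialized, instances_serialized):
        # A fresh engine re-creates the Network object and re-attaches the previously
        # running service instances instead of negotiating new agreements.
        await golem.start()
        network = Network.deserialize(golem._engine._net_api, network_serialized)
        return await golem.resume_service(
            service_cls, instances=instances_serialized, network=network
        )

This is the same flow exercised by examples/webapp/webapp_suspend_resume.py, just condensed into two helpers.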