diff --git a/yapapi/engine.py b/yapapi/engine.py index e296a0f90..9c7b8dda7 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -266,20 +266,34 @@ async def _shutdown(self, *exc_info): from yapapi.log import pluralize logger.info("Golem is shutting down...") + # Wait until all computations are finished await asyncio.gather(*[job.finished.wait() for job in self._jobs]) logger.info("All jobs have finished") self._payment_closing = True + # Cancel all services except the one that processes invoices for task in self._services: if task is not self._process_invoices_job: task.cancel() - if self._process_invoices_job and not any( - True for job in self._jobs if job.agreements_pool.confirmed > 0 - ): - logger.debug("No need to wait for invoices.") + # Wait for some time for invoices for unpaid agreements, + # then cancel the invoices service + if self._process_invoices_job: + + if self._agreements_to_pay: + logger.info( + "%s still unpaid, waiting for invoices...", + pluralize(len(self._agreements_to_pay), "agreement"), + ) + try: + await asyncio.wait_for(self._process_invoices_job, timeout=30) + except asyncio.TimeoutError: + logger.debug("process_invoices_job cancelled") + if self._agreements_to_pay: + logger.warning("Unpaid agreements: %s", self._agreements_to_pay) + self._process_invoices_job.cancel() try: @@ -292,20 +306,6 @@ async def _shutdown(self, *exc_info): except Exception: logger.debug("Got error when waiting for services to finish", exc_info=True) - if self._agreements_to_pay and self._process_invoices_job: - logger.info( - "%s still unpaid, waiting for invoices...", - pluralize(len(self._agreements_to_pay), "agreement"), - ) - try: - await asyncio.wait_for(self._process_invoices_job, timeout=30) - except asyncio.TimeoutError: - logger.debug("process_invoices_job cancelled") - if self._agreements_to_pay: - logger.warning("Unpaid agreements: %s", self._agreements_to_pay) - - await asyncio.gather(*[job.finished.wait() for job in self._jobs]) - async def __aexit__(self, exc_type, exc_val, exc_tb): await self._stack.aclose()