Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always cancel process_invoices_job when shutting down Golem #452

Merged
merged 2 commits into from
Jun 10, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions yapapi/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,20 +266,34 @@ async def _shutdown(self, *exc_info):
from yapapi.log import pluralize

logger.info("Golem is shutting down...")

# Wait until all computations are finished
await asyncio.gather(*[job.finished.wait() for job in self._jobs])
logger.info("All jobs have finished")

self._payment_closing = True

# Cancel all services except the one that processes invoices
for task in self._services:
if task is not self._process_invoices_job:
task.cancel()

if self._process_invoices_job and not any(
True for job in self._jobs if job.agreements_pool.confirmed > 0
):
logger.debug("No need to wait for invoices.")
# Wait for some time for invoices for unpaid agreements,
# then cancel the invoices service
if self._process_invoices_job:

if self._agreements_to_pay:
logger.info(
"%s still unpaid, waiting for invoices...",
pluralize(len(self._agreements_to_pay), "agreement"),
)
try:
await asyncio.wait_for(self._process_invoices_job, timeout=30)
except asyncio.TimeoutError:
logger.debug("process_invoices_job cancelled")
if self._agreements_to_pay:
logger.warning("Unpaid agreements: %s", self._agreements_to_pay)

self._process_invoices_job.cancel()
Copy link
Contributor Author

@azawlocki azawlocki Jun 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one line was misplaced, it should always be executed


try:
Expand All @@ -292,20 +306,6 @@ async def _shutdown(self, *exc_info):
except Exception:
logger.debug("Got error when waiting for services to finish", exc_info=True)

if self._agreements_to_pay and self._process_invoices_job:
logger.info(
"%s still unpaid, waiting for invoices...",
pluralize(len(self._agreements_to_pay), "agreement"),
)
try:
await asyncio.wait_for(self._process_invoices_job, timeout=30)
except asyncio.TimeoutError:
logger.debug("process_invoices_job cancelled")
if self._agreements_to_pay:
logger.warning("Unpaid agreements: %s", self._agreements_to_pay)

await asyncio.gather(*[job.finished.wait() for job in self._jobs])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line is a duplicate, we already do this at the beginning of this method


async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._stack.aclose()

Expand Down