Skip to content

Commit

Permalink
Merge pull request #48 from golemfactory/feature/47-report-worker-errors
Browse files: browse the repository at this point in the history
Report exceptions raised in worker tasks
  • Loading branch information
azawlocki committed Sep 30, 2020
2 parents 40f1474 + c15ae1d commit b4b80ca
Showing 1 changed file with 39 additions and 6 deletions.
45 changes: 39 additions & 6 deletions yapapi/log.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -41,10 +41,9 @@
)
```
"""
from collections import defaultdict
from collections import defaultdict, Counter
from dataclasses import asdict
import itertools
import json
import logging
import time
from typing import Any, Callable, Dict, Iterator, List, Optional, Set
Expand Down Expand Up @@ -169,10 +168,10 @@ class SummaryLogger:
`wrapped_emitter`.
For example, with the following setup, each event emitted by `engine`
will be logged by `log_event_json`, and additionally, certain events
will be logged by `log_event_repr`, and additionally, certain events
will cause summary messages to be logged.
```
detailed_logger = log_event_json
detailed_logger = log_event_repr
summary_logger = SummaryLogger(wrapped_emitter=detailed_logger).log
engine = Engine(..., event_emitter=summary_logger)
```
Expand All @@ -186,6 +185,9 @@ class SummaryLogger:
# Start time of the computation
start_time: float

# Maps received proposal ids to provider ids
received_proposals: Dict[str, str]

# Set of confirmed proposal ids
confirmed_proposals: Set[str]

Expand All @@ -204,6 +206,9 @@ class SummaryLogger:
# Map a provider name to the sum of amounts in this provider's invoices
provider_cost: Dict[str, float]

# Count how many times a worker failed on a provider
provider_failures: Dict[str, int]

# Has computation finished?
finished: bool

Expand All @@ -218,12 +223,14 @@ def _reset(self) -> None:
"""Reset all information aggregated by this logger."""

self.start_time = time.time()
self.received_proposals = {}
self.confirmed_proposals = set()
self.agreement_provider_name = {}
self.confirmed_agreements = set()
self.task_data = {}
self.provider_tasks = defaultdict(list)
self.provider_cost = {}
self.provider_failures = Counter()
self.finished = False
self.error_occurred = False

Expand Down Expand Up @@ -259,10 +266,16 @@ def _handle(self, event: events.Event):
if isinstance(event, events.SubscriptionCreated):
self.logger.info(event_type_to_string[type(event)])

elif isinstance(event, events.ProposalReceived):
self.received_proposals[event.prop_id] = event.provider_id

elif isinstance(event, events.ProposalConfirmed):
self.confirmed_proposals.add(event.prop_id)
confirmed_providers = set(
self.received_proposals[prop_id] for prop_id in self.confirmed_proposals
)
self.logger.info(
"Received proposals from %s providers so far", len(self.confirmed_proposals)
"Received proposals from %s providers so far", len(confirmed_providers)
)

elif isinstance(event, events.AgreementCreated):
Expand Down Expand Up @@ -306,15 +319,35 @@ def _handle(self, event: events.Event):
self.provider_cost[provider_name] = cost
self._print_total_cost()

elif isinstance(event, events.WorkerFinished):
if event.exception is None:
return
exc_type, exc, tb = event.exception
provider_name = self.agreement_provider_name[event.agr_id]
self.provider_failures[provider_name] += 1
self.logger.warning("Activity failed on provider '%s', reason: %r", provider_name, exc)

elif isinstance(event, events.ComputationFinished):
self.finished = True
total_time = time.time() - self.start_time
self.logger.info(f"Computation finished in {total_time:.1f}s")
agreement_providers = {
self.agreement_provider_name[agr_id] for agr_id in self.confirmed_agreements
}
self.logger.info(
"Negotiated agreements with %s providers", len(self.confirmed_agreements)
"Negotiated %s agreements with %s providers",
len(self.confirmed_agreements),
len(agreement_providers),
)
for provider_name, tasks in self.provider_tasks.items():
self.logger.info("Provider '%s' computed %s tasks", provider_name, len(tasks))
for provider_name in set(self.agreement_provider_name.values()):
if provider_name not in self.provider_tasks:
self.logger.info("Provider '%s' did not compute any tasks", provider_name)
for provider_name, count in self.provider_failures.items():
self.logger.info(
"Activity failed %s time(s) on provider '%s'", count, provider_name
)
self._print_total_cost()


Expand Down

0 comments on commit b4b80ca

Please sign in to comment.