diff --git a/yapapi/log.py b/yapapi/log.py index 769b7bf06..27c857ad1 100644 --- a/yapapi/log.py +++ b/yapapi/log.py @@ -41,10 +41,9 @@ ) ``` """ -from collections import defaultdict +from collections import defaultdict, Counter from dataclasses import asdict import itertools -import json import logging import time from typing import Any, Callable, Dict, Iterator, List, Optional, Set @@ -169,10 +168,10 @@ class SummaryLogger: `wrapped_emitter`. For example, with the following setup, each event emitted by `engine` - will be logged by `log_event_json`, and additionally, certain events + will be logged by `log_event_repr`, and additionally, certain events will cause summary messages to be logged. ``` - detailed_logger = log_event_json + detailed_logger = log_event_repr summary_logger = SummaryLogger(wrapped_emitter=detailed_logger).log engine = Engine(..., event_emitter=summary_logger) ``` @@ -186,6 +185,9 @@ class SummaryLogger: # Start time of the computation start_time: float + # Maps received proposal ids to provider ids + received_proposals: Dict[str, str] + # Set of confirmed proposal ids confirmed_proposals: Set[str] @@ -204,6 +206,9 @@ class SummaryLogger: # Map a provider name to the sum of amounts in this provider's invoices provider_cost: Dict[str, float] + # Count how many times a worker failed on a provider + provider_failures: Dict[str, int] + # Has computation finished? finished: bool @@ -218,12 +223,14 @@ def _reset(self) -> None: """Reset all information aggregated by this logger.""" self.start_time = time.time() + self.received_proposals = {} self.confirmed_proposals = set() self.agreement_provider_name = {} self.confirmed_agreements = set() self.task_data = {} self.provider_tasks = defaultdict(list) self.provider_cost = {} + self.provider_failures = Counter() self.finished = False self.error_occurred = False @@ -259,10 +266,16 @@ def _handle(self, event: events.Event): if isinstance(event, events.SubscriptionCreated): self.logger.info(event_type_to_string[type(event)]) + elif isinstance(event, events.ProposalReceived): + self.received_proposals[event.prop_id] = event.provider_id + elif isinstance(event, events.ProposalConfirmed): self.confirmed_proposals.add(event.prop_id) + confirmed_providers = set( + self.received_proposals[prop_id] for prop_id in self.confirmed_proposals + ) self.logger.info( - "Received proposals from %s providers so far", len(self.confirmed_proposals) + "Received proposals from %s providers so far", len(confirmed_providers) ) elif isinstance(event, events.AgreementCreated): @@ -306,15 +319,35 @@ def _handle(self, event: events.Event): self.provider_cost[provider_name] = cost self._print_total_cost() + elif isinstance(event, events.WorkerFinished): + if event.exception is None: + return + exc_type, exc, tb = event.exception + provider_name = self.agreement_provider_name[event.agr_id] + self.provider_failures[provider_name] += 1 + self.logger.warning("Activity failed on provider '%s', reason: %r", provider_name, exc) + elif isinstance(event, events.ComputationFinished): self.finished = True total_time = time.time() - self.start_time self.logger.info(f"Computation finished in {total_time:.1f}s") + agreement_providers = { + self.agreement_provider_name[agr_id] for agr_id in self.confirmed_agreements + } self.logger.info( - "Negotiated agreements with %s providers", len(self.confirmed_agreements) + "Negotiated %s agreements with %s providers", + len(self.confirmed_agreements), + len(agreement_providers), ) for provider_name, tasks in self.provider_tasks.items(): self.logger.info("Provider '%s' computed %s tasks", provider_name, len(tasks)) + for provider_name in set(self.agreement_provider_name.values()): + if provider_name not in self.provider_tasks: + self.logger.info("Provider '%s' did not compute any tasks", provider_name) + for provider_name, count in self.provider_failures.items(): + self.logger.info( + "Activity failed %s time(s) on provider '%s'", count, provider_name + ) self._print_total_cost()