Skip to content

Commit

Permalink
fixup: change end of events sentinels
Browse files Browse the repository at this point in the history
  • Loading branch information
JHolba committed Aug 30, 2024
1 parent 3acdf62 commit 851a2e1
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
12 changes: 7 additions & 5 deletions src/_ert_forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import threading
from datetime import datetime, timedelta
from pathlib import Path
from typing import Type, Union
from typing import Final, Union

from _ert import events
from _ert.events import (
Expand Down Expand Up @@ -37,6 +37,10 @@
logger = logging.getLogger(__name__)


class EventSentinel:
pass


class Event(Reporter):
"""
The Event reporter forwards events, coming from the running job, added with
Expand All @@ -53,7 +57,7 @@ class Event(Reporter):
before stopping the reporter. Any remaining events will not be sent.
"""

_sentinel: None = None
_sentinel: Final = EventSentinel()

def __init__(self, evaluator_url, token=None, cert_path=None):
self._evaluator_url = evaluator_url
Expand All @@ -72,9 +76,7 @@ def __init__(self, evaluator_url, token=None, cert_path=None):

self._ens_id = None
self._real_id = None
self._event_queue: queue.Queue[events.Event | Type[Event._sentinel]] = (
queue.Queue()
)
self._event_queue: queue.Queue[events.Event | EventSentinel] = queue.Queue()
self._event_publisher_thread = ErtThread(target=self._event_publisher)
self._timeout_timestamp = None
self._timestamp_lock = threading.Lock()
Expand Down
18 changes: 9 additions & 9 deletions src/ert/ensemble_evaluator/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import ssl
import uuid
from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional, Union
from typing import TYPE_CHECKING, Any, AsyncGenerator, Final, Optional, Union

from aiohttp import ClientError
from websockets import ConnectionClosed, Headers, WebSocketClientProtocol
Expand All @@ -25,17 +25,17 @@
logger = logging.getLogger(__name__)


class CloseTrackerEvent:
class EventSentinel:
pass


class Monitor:
_sentinel: Final = EventSentinel()

def __init__(self, ee_con_info: "EvaluatorConnectionInfo") -> None:
self._ee_con_info = ee_con_info
self._id = str(uuid.uuid1()).split("-", maxsplit=1)[0]
self._event_queue: asyncio.Queue[Union[Event, CloseTrackerEvent]] = (
asyncio.Queue()
)
self._event_queue: asyncio.Queue[Union[Event, EventSentinel]] = asyncio.Queue()
self._connection: Optional[WebSocketClientProtocol] = None
self._receiver_task: Optional[asyncio.Task[None]] = None
self._connected: asyncio.Event = asyncio.Event()
Expand Down Expand Up @@ -71,7 +71,7 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None
async def signal_cancel(self) -> None:
if not self._connection:
return
await self._event_queue.put(CloseTrackerEvent())
await self._event_queue.put(Monitor._sentinel)
logger.debug(f"monitor-{self._id} asking server to cancel...")

cancel_event = EEUserCancel(monitor=self._id)
Expand All @@ -81,7 +81,7 @@ async def signal_cancel(self) -> None:
async def signal_done(self) -> None:
if not self._connection:
return
await self._event_queue.put(CloseTrackerEvent())
await self._event_queue.put(Monitor._sentinel)
logger.debug(f"monitor-{self._id} informing server monitor is done...")

done_event = EEUserDone(monitor=self._id)
Expand All @@ -108,12 +108,12 @@ async def track(
logger.error("Evaluator did not send the TERMINATED event!")
break
event = None
if isinstance(event, CloseTrackerEvent):
if isinstance(event, EventSentinel):
closetracker_received = True
_heartbeat_interval = self._receiver_timeout
else:
yield event
if event is not None and type(event) is EETerminated:
if type(event) is EETerminated:
logger.debug(f"monitor-{self._id} client received terminated")
break
if event is not None:
Expand Down

0 comments on commit 851a2e1

Please sign in to comment.