From 8640fb2c0de162b550beb162307019fc5ebaa515 Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Mon, 9 Sep 2024 13:43:26 +0200 Subject: [PATCH] Add start/stop mechanics to FileWatcher The start/stop mechanics will allow users to control when the file watcher starts and stops. This will be useful for users who needs, for any reason, restart the file watcher (e.g. after an actor restart). Signed-off-by: Daniel Zullo --- src/frequenz/channels/file_watcher.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/src/frequenz/channels/file_watcher.py b/src/frequenz/channels/file_watcher.py index 860c0bda..e9b6737a 100644 --- a/src/frequenz/channels/file_watcher.py +++ b/src/frequenz/channels/file_watcher.py @@ -118,6 +118,7 @@ def __init__( self, paths: list[pathlib.Path | str], event_types: abc.Iterable[EventType] = frozenset(EventType), + auto_start: bool = True, ) -> None: """Initialize this file watcher. @@ -125,6 +126,7 @@ def __init__( paths: The paths to watch for changes. event_types: The types of events to watch for. Defaults to watch for all event types. + auto_start: Whether to start the file watcher immediately. """ self.event_types: frozenset[EventType] = frozenset(event_types) """The types of events to watch for.""" @@ -134,12 +136,13 @@ def __init__( path if isinstance(path, pathlib.Path) else pathlib.Path(path) for path in paths ] - self._awatch: abc.AsyncGenerator[set[FileChange], None] = awatch( - *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events - ) + self._awatch: abc.AsyncGenerator[set[FileChange], None] | None = None self._awatch_stopped_exc: Exception | None = None self._changes: set[FileChange] = set() + if auto_start: + self.start() + def _filter_events( self, change: Change, @@ -177,6 +180,9 @@ async def ready(self) -> bool: if self._changes: return True + if self._awatch is None: + return False + # if it was already stopped, return immediately. if self._awatch_stopped_exc is not None: return False @@ -206,6 +212,19 @@ def consume(self) -> Event: change, path_str = self._changes.pop() return Event(type=EventType(change), path=pathlib.Path(path_str)) + def start(self) -> None: + """Start this receiver for watching file events.""" + self._awatch_stopped_exc = None + self._stop_event.clear() + self._awatch = awatch( + *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events + ) + + def stop(self) -> None: + """Stop this receiver from watching file events.""" + self._stop_event.set() + self._awatch = None + def __str__(self) -> str: """Return a string representation of this receiver.""" if len(self._paths) > 3: