Skip to content

Commit

Permalink
Add start/stop mechanics to FileWatcher
Browse files Browse the repository at this point in the history
The start/stop mechanics will allow users to control
when the file watcher starts and stops.
This will be useful for users who needs, for any reason,
restart the file watcher (e.g. after an actor restart).

Signed-off-by: Daniel Zullo <daniel.zullo@frequenz.com>
  • Loading branch information
daniel-zullo-frequenz committed Sep 9, 2024
1 parent 315d043 commit 8640fb2
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions src/frequenz/channels/file_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ def __init__(
self,
paths: list[pathlib.Path | str],
event_types: abc.Iterable[EventType] = frozenset(EventType),
auto_start: bool = True,
) -> None:
"""Initialize this file watcher.
Args:
paths: The paths to watch for changes.
event_types: The types of events to watch for. Defaults to watch for
all event types.
auto_start: Whether to start the file watcher immediately.
"""
self.event_types: frozenset[EventType] = frozenset(event_types)
"""The types of events to watch for."""
Expand All @@ -134,12 +136,13 @@ def __init__(
path if isinstance(path, pathlib.Path) else pathlib.Path(path)
for path in paths
]
self._awatch: abc.AsyncGenerator[set[FileChange], None] = awatch(
*self._paths, stop_event=self._stop_event, watch_filter=self._filter_events
)
self._awatch: abc.AsyncGenerator[set[FileChange], None] | None = None
self._awatch_stopped_exc: Exception | None = None
self._changes: set[FileChange] = set()

if auto_start:
self.start()

def _filter_events(
self,
change: Change,
Expand Down Expand Up @@ -177,6 +180,9 @@ async def ready(self) -> bool:
if self._changes:
return True

if self._awatch is None:
return False

# if it was already stopped, return immediately.
if self._awatch_stopped_exc is not None:
return False
Expand Down Expand Up @@ -206,6 +212,19 @@ def consume(self) -> Event:
change, path_str = self._changes.pop()
return Event(type=EventType(change), path=pathlib.Path(path_str))

def start(self) -> None:
"""Start this receiver for watching file events."""
self._awatch_stopped_exc = None
self._stop_event.clear()
self._awatch = awatch(
*self._paths, stop_event=self._stop_event, watch_filter=self._filter_events
)

def stop(self) -> None:
"""Stop this receiver from watching file events."""
self._stop_event.set()
self._awatch = None

def __str__(self) -> str:
"""Return a string representation of this receiver."""
if len(self._paths) > 3:
Expand Down

0 comments on commit 8640fb2

Please sign in to comment.