Skip to content

Commit

Permalink
Enable polling in file watcher
Browse files Browse the repository at this point in the history
`inotify` does not work reliably with network file systems
(e.g., NFS, CIFS) commonly used in cloud environments. These
file systems may not propagate file system events correctly,
causing `inotify` to miss changes. To ensure consistent file
monitoring across these environments, polling is enabled by
default in FileWatcher.

Signed-off-by: Daniel Zullo <daniel.zullo@frequenz.com>
  • Loading branch information
daniel-zullo-frequenz committed Sep 19, 2024
1 parent 1a262ef commit 0334cb9
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 5 deletions.
10 changes: 9 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
# Frequenz channels Release Notes

## Upgrading

- `FileWatcher`: The file polling mechanism is now enabled by default. This provides reliable and consistent file monitoring on network file systems (e.g., CIFS). However, it may have a performance impact on local file systems or when monitoring a large number of files.
- To disable file polling, set the `force_polling` parameter to `False`.
- The `polling_interval` parameter defines the interval for polling changes. This is relevant only when polling is enabled and defaults to 1 second.

## New Features

- `Timer.reset()` now supports setting the interval and will restart the timer with the new interval.

## Bug Fixes

- `FileWatcher`: Fixed `ready()` method to return False when an error occurs. Before this fix, `select()` (and other code using `ready()`) never detected the `FileWatcher` was stopped and the `select()` loop was continuously waking up to inform the receiver was ready.
- `FileWatcher`:
- Fixed `ready()` method to return False when an error occurs. Before this fix, `select()` (and other code using `ready()`) never detected the `FileWatcher` was stopped and the `select()` loop was continuously waking up to inform the receiver was ready.
- Reports file events correctly on network file systems like CIFS.

- `Timer.stop()` and `Timer.reset()` now immediately stop the timer if it is running. Before this fix, the timer would continue to run until the next interval.
16 changes: 15 additions & 1 deletion src/frequenz/channels/file_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pathlib
from collections import abc
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum

from watchfiles import Change, awatch
Expand Down Expand Up @@ -127,13 +128,22 @@ def __init__(
self,
paths: list[pathlib.Path | str],
event_types: abc.Iterable[EventType] = frozenset(EventType),
*,
force_polling: bool = True,
polling_interval: timedelta = timedelta(seconds=1),
) -> None:
"""Initialize this file watcher.
Args:
paths: The paths to watch for changes.
event_types: The types of events to watch for. Defaults to watch for
all event types.
force_polling: Whether to explicitly force file polling to check for
changes. Note that even if set to False, file polling will still
be used as a fallback when the underlying file system does not
support event-based notifications.
polling_interval: The interval to poll for changes. Only relevant if
polling is enabled.
"""
self.event_types: frozenset[EventType] = frozenset(event_types)
"""The types of events to watch for."""
Expand All @@ -144,7 +154,11 @@ def __init__(
for path in paths
]
self._awatch: abc.AsyncGenerator[set[FileChange], None] = awatch(
*self._paths, stop_event=self._stop_event, watch_filter=self._filter_events
*self._paths,
stop_event=self._stop_event,
watch_filter=self._filter_events,
force_polling=force_polling,
poll_delay_ms=int(polling_interval.total_seconds() * 1_000),
)
self._awatch_stopped_exc: Exception | None = None
self._changes: set[FileChange] = set()
Expand Down
6 changes: 5 additions & 1 deletion tests/test_file_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ async def test_file_watcher_filter_events(

assert awatch_mock.mock_calls == [
mock.call(
pathlib.Path(good_path), stop_event=mock.ANY, watch_filter=filter_events
pathlib.Path(good_path),
stop_event=mock.ANY,
watch_filter=filter_events,
force_polling=True,
poll_delay_ms=1_000,
)
]
for event_type in EventType:
Expand Down
12 changes: 10 additions & 2 deletions tests/test_file_watcher_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None:
tmp_path: A tmp directory to run the file watcher on. Created by pytest.
"""
filename = tmp_path / "test-file"
file_watcher = FileWatcher(paths=[str(tmp_path)], event_types={EventType.DELETE})
file_watcher = FileWatcher(
paths=[str(tmp_path)],
event_types={EventType.DELETE},
force_polling=False,
)
write_timer = Timer(timedelta(seconds=0.1), SkipMissedAndDrift())
deletion_timer = Timer(timedelta(seconds=0.25), SkipMissedAndDrift())

Expand Down Expand Up @@ -113,7 +117,11 @@ async def test_file_watcher_exit_iterator(tmp_path: pathlib.Path) -> None:
number_of_writes = 0
expected_number_of_writes = 3

file_watcher = FileWatcher(paths=[str(tmp_path)])
file_watcher = FileWatcher(
paths=[str(tmp_path)],
force_polling=True,
polling_interval=timedelta(seconds=0.05),
)
timer = Timer(timedelta(seconds=0.1), SkipMissedAndDrift())

async for selected in select(file_watcher, timer):
Expand Down

0 comments on commit 0334cb9

Please sign in to comment.