Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ready() method in FileWatcher #318

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
- `FileWatcher`: Fixed `ready()` method to return False when an error occurs. Before this fix, `select()` (and other code using `ready()`) never detected the `FileWatcher` was stopped and the `select()` loop was continuously waking up to inform the receiver was ready.
10 changes: 10 additions & 0 deletions src/frequenz/channels/file_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ class FileWatcher(Receiver[Event]):
the [`path`][frequenz.channels.file_watcher.Event.path] where the change was
observed.

Note:
The owner of the [`FileWatcher`][frequenz.channels.file_watcher.FileWatcher]
receiver is responsible for recreating the `FileWatcher` after it has been
cancelled or stopped.
For example, if a [`Task`][asyncio.Task] uses an asynchronous iterator to consume
events from the `FileWatcher` and the task is cancelled, the `FileWatcher` will
also stop. Therefore, the same `FileWatcher` instance cannot be reused for a new
task to consume events. In this case, a new FileWatcher instance must be created.

# Event Types

The following event types are available:
Expand Down Expand Up @@ -185,6 +194,7 @@ async def ready(self) -> bool:
self._changes = await anext(self._awatch)
except StopAsyncIteration as err:
self._awatch_stopped_exc = err
return False

return True

Expand Down
35 changes: 34 additions & 1 deletion tests/test_file_watcher_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import pytest

from frequenz.channels import select, selected_from
from frequenz.channels import ReceiverStoppedError, select, selected_from
from frequenz.channels.file_watcher import Event, EventType, FileWatcher
from frequenz.channels.timer import SkipMissedAndDrift, Timer

Expand Down Expand Up @@ -99,3 +99,36 @@ async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None:
# Can be more because the watcher could take some time to trigger
assert number_of_write >= 3
assert number_of_events == 2


@pytest.mark.integration
async def test_file_watcher_exit_iterator(tmp_path: pathlib.Path) -> None:
"""Test breaking the file watcher iterator.

Args:
tmp_path: A tmp directory to run the file watcher on. Created by pytest.
"""
filename = tmp_path / "test-file"

number_of_writes = 0
expected_number_of_writes = 3

file_watcher = FileWatcher(paths=[str(tmp_path)])
timer = Timer(timedelta(seconds=0.1), SkipMissedAndDrift())

async for selected in select(file_watcher, timer):
if selected_from(selected, timer):
filename.write_text(f"{selected.message}")
elif selected_from(selected, file_watcher):
number_of_writes += 1
if number_of_writes == expected_number_of_writes:
file_watcher._stop_event.set() # pylint: disable=protected-access
break

ready = await file_watcher.ready()
assert ready is False

with pytest.raises(ReceiverStoppedError):
file_watcher.consume()

assert number_of_writes == expected_number_of_writes
Loading