Skip to content

Commit

Permalink
[mac] Fix callback exceptions when the watcher is deleted but still r…
Browse files Browse the repository at this point in the history
…eceiving events (#786)

The watcher might be cleared on thread stop but the C callback would still send events,
ending in unhandled exceptions later:

    AttributeError: 'NoneType' object has no attribute 'is_recursive'
      File "watchdog/observers/fsevents.py", line 299, in callback
        emitter.queue_events(emitter.timeout, events)
      File "watchdog/observers/fsevents.py", line 261, in queue_events
        self._queue_created_event(event, src_path, src_dirname)
      File "watchdog/observers/fsevents.py", line 124, in _queue_created_event
        self.queue_event(DirModifiedEvent(dirname))
      File "watchdog/observers/fsevents.py", line 97, in queue_event
        if self._watch.is_recursive :

Or even:

    AttributeError: 'NoneType' object has no attribute 'path'
      File "watchdog/observers/fsevents.py", line 299, in callback
        emitter.queue_events(emitter.timeout, events)
      File "watchdog/observers/fsevents.py", line 174, in queue_events
        src_path = self._encode_path(event.path)
      File "watchdog/observers/fsevents.py", line 323, in _encode_path
        if isinstance(self.watch.path, bytes):

The C extension wasn't great at dealing with some of the edge cases when watches are removed.
This manifested itself in a couple of different errors. Some of them were worked around in Python
land, most notably with checks for `self._watch` and setting it to `None` when it was removed.
This masked an issue where we assumed that a watch was always found in the internal lookup
dictionary when it was removed. With this issue fixed we can remove those checks, which is
also done here.

Co-authored-by: Romain Grasland <rgrasland@nuxeo.com>
Co-authored-by: Thomas <thomas@ccpgames.com>
  • Loading branch information
3 people committed May 10, 2021
1 parent 8ebb408 commit a70b350
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 28 deletions.
4 changes: 2 additions & 2 deletions changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ Changelog

2021-0x-xx • `full history <https://github.com/gorakhargosh/watchdog/compare/v2.1.0...master>`__

-
- Thanks to our beloved contributors:
- [mac] Fix callback exceptions when the watcher is deleted but still receiving events
- Thanks to our beloved contributors: @rom1win, @BoboTiG, @CCP-Aporia


2.1.0
Expand Down
41 changes: 22 additions & 19 deletions src/watchdog/observers/fsevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,8 @@ def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, suppress
self._lock = threading.Lock()

def on_thread_stop(self):
if self.watch:
_fsevents.remove_watch(self.watch)
_fsevents.stop(self)
self._watch = None
_fsevents.remove_watch(self.watch)
_fsevents.stop(self)

def queue_event(self, event):
# fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop
Expand Down Expand Up @@ -287,23 +285,28 @@ def queue_events(self, timeout, events):

self._fs_view.clear()

def events_callback(self, paths, inodes, flags, ids):
"""Callback passed to FSEventStreamCreate(), it will receive all
FS events and queue them.
"""
cls = _fsevents.NativeEvent
try:
events = [
cls(path, inode, event_flags, event_id)
for path, inode, event_flags, event_id in zip(
paths, inodes, flags, ids
)
]
with self._lock:
self.queue_events(self.timeout, events)
except Exception:
logger.exception("Unhandled exception in fsevents callback")

def run(self):
self.pathnames = [self.watch.path]
self._start_time = time.monotonic()
try:
def callback(paths, inodes, flags, ids, emitter=self):
try:
with emitter._lock:
events = [
_fsevents.NativeEvent(path, inode, event_flags, event_id)
for path, inode, event_flags, event_id in zip(paths, inodes, flags, ids)
]
emitter.queue_events(emitter.timeout, events)
except Exception:
logger.exception("Unhandled exception in fsevents callback")

self.pathnames = [self.watch.path]
self._start_time = time.monotonic()

_fsevents.add_watch(self, self.watch, callback, self.pathnames)
_fsevents.add_watch(self, self.watch, self.events_callback, self.pathnames)
_fsevents.read_events(self)
except Exception:
logger.exception("Unhandled exception in FSEventsEmitter")
Expand Down
11 changes: 7 additions & 4 deletions src/watchdog_fsevents.c
Original file line number Diff line number Diff line change
Expand Up @@ -784,17 +784,20 @@ static PyObject *
watchdog_remove_watch(PyObject *self, PyObject *watch)
{
UNUSED(self);
PyObject *value = PyDict_GetItem(watch_to_stream, watch);
PyObject *streamref_capsule = PyDict_GetItem(watch_to_stream, watch);
if (!streamref_capsule) {
// A watch might have been removed explicitly before, in which case we can simply early out.
Py_RETURN_NONE;
}
PyDict_DelItem(watch_to_stream, watch);

FSEventStreamRef stream_ref = PyCapsule_GetPointer(value, NULL);
FSEventStreamRef stream_ref = PyCapsule_GetPointer(streamref_capsule, NULL);

FSEventStreamStop(stream_ref);
FSEventStreamInvalidate(stream_ref);
FSEventStreamRelease(stream_ref);

Py_INCREF(Py_None);
return Py_None;
Py_RETURN_NONE;
}

PyDoc_STRVAR(watchdog_stop__doc__,
Expand Down
88 changes: 85 additions & 3 deletions tests/test_fsevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from functools import partial
from os import mkdir, rmdir
from queue import Queue
from random import random
from threading import Thread
from time import sleep
from unittest.mock import patch

import _watchdog_fsevents as _fsevents
from watchdog.events import FileSystemEventHandler
Expand Down Expand Up @@ -100,6 +104,86 @@ def callback(path, inodes, flags, ids):
rmdir(a)


def test_watcher_deletion_while_receiving_events_1(caplog, observer):
"""
When the watcher is stopped while there are events, such exception could happen:
Traceback (most recent call last):
File "observers/fsevents.py", line 327, in events_callback
self.queue_events(self.timeout, events)
File "observers/fsevents.py", line 187, in queue_events
src_path = self._encode_path(event.path)
File "observers/fsevents.py", line 352, in _encode_path
if isinstance(self.watch.path, bytes):
AttributeError: 'NoneType' object has no attribute 'path'
"""
tmpdir = p()

orig = FSEventsEmitter.events_callback

def cb(*args):
FSEventsEmitter.stop(emitter)
orig(*args)

with caplog.at_level(logging.ERROR), patch.object(FSEventsEmitter, "events_callback", new=cb):
start_watching(tmpdir)
# Less than 100 is not enough events to trigger the error
for n in range(100):
touch(p("{}.txt".format(n)))
emitter.stop()
assert not caplog.records


def test_watcher_deletion_while_receiving_events_2(caplog):
"""Note: that test takes about 20 seconds to complete.
Quite similar test to prevent another issue
when the watcher is stopped while there are events, such exception could happen:
Traceback (most recent call last):
File "observers/fsevents.py", line 327, in events_callback
self.queue_events(self.timeout, events)
File "observers/fsevents.py", line 235, in queue_events
self._queue_created_event(event, src_path, src_dirname)
File "observers/fsevents.py", line 132, in _queue_created_event
self.queue_event(cls(src_path))
File "observers/fsevents.py", line 104, in queue_event
if self._watch.is_recursive:
AttributeError: 'NoneType' object has no attribute 'is_recursive'
"""

def try_to_fail():
tmpdir = p()
start_watching(tmpdir)

def create_files():
# Less than 2000 is not enough events to trigger the error
for n in range(2000):
touch(p(str(n) + ".txt"))

def stop(em):
sleep(random())
em.stop()

th1 = Thread(target=create_files)
th2 = Thread(target=stop, args=(emitter,))

try:
with caplog.at_level(logging.ERROR):
th1.start()
th2.start()
th1.join()
th2.join()
assert not caplog.records
finally:
emitter.stop()

# 20 attempts to make the random failure happen
for _ in range(20):
try_to_fail()
sleep(random())


def test_remove_watch_twice():
"""
ValueError: PyCapsule_GetPointer called with invalid PyCapsule object
Expand Down Expand Up @@ -143,9 +227,7 @@ def on_thread_stop(self):
w = observer.schedule(FileSystemEventHandler(), a, recursive=False)
rmdir(a)
time.sleep(0.1)
with pytest.raises(KeyError):
# watch no longer exists!
observer.unschedule(w)
observer.unschedule(w)


def test_converting_cfstring_to_pyunicode():
Expand Down

0 comments on commit a70b350

Please sign in to comment.