From 57f3a1a96e83dc335a5a7ba1cbda0a24043503a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Thu, 6 May 2021 10:35:25 +0200 Subject: [PATCH 1/6] [mac] Fix callback exceptions when the watcher is deleted but still receiving events The watcher might be cleared on thread stop but the C callback would still send events. Let's ignore them to prevent unhandled exceptions later: AttributeError: 'NoneType' object has no attribute 'is_recursive' File "watchdog/observers/fsevents.py", line 299, in callback emitter.queue_events(emitter.timeout, events) File "watchdog/observers/fsevents.py", line 261, in queue_events self._queue_created_event(event, src_path, src_dirname) File "watchdog/observers/fsevents.py", line 124, in _queue_created_event self.queue_event(DirModifiedEvent(dirname)) File "watchdog/observers/fsevents.py", line 97, in queue_event if self._watch.is_recursive : Or even: AttributeError: 'NoneType' object has no attribute 'path' File "watchdog/observers/fsevents.py", line 299, in callback emitter.queue_events(emitter.timeout, events) File "watchdog/observers/fsevents.py", line 174, in queue_events src_path = self._encode_path(event.path) File "watchdog/observers/fsevents.py", line 323, in _encode_path if isinstance(self.watch.path, bytes): Co-authored-by: Romain Grasland --- changelog.rst | 4 +- src/watchdog/observers/fsevents.py | 40 ++++++++------ tests/test_fsevents.py | 84 ++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 19 deletions(-) diff --git a/changelog.rst b/changelog.rst index 8c1e7b21b..245b91d9f 100644 --- a/changelog.rst +++ b/changelog.rst @@ -8,8 +8,8 @@ Changelog 2021-0x-xx • `full history `__ -- -- Thanks to our beloved contributors: +- [mac] Fix callback exceptions when the watcher is deleted but still receiving events +- Thanks to our beloved contributors: @rom1win, @BoboTiG 2.1.0 diff --git a/src/watchdog/observers/fsevents.py b/src/watchdog/observers/fsevents.py index 01b6bf72c..b2eb52759 100644 --- a/src/watchdog/observers/fsevents.py +++ b/src/watchdog/observers/fsevents.py @@ -83,13 +83,14 @@ def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, suppress self.suppress_history = suppress_history self._start_time = 0.0 self._starting_state = None - self._lock = threading.Lock() + self._lock = threading.RLock() def on_thread_stop(self): if self.watch: _fsevents.remove_watch(self.watch) _fsevents.stop(self) - self._watch = None + with self._lock: + self._watch = None def queue_event(self, event): # fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop @@ -287,23 +288,28 @@ def queue_events(self, timeout, events): self._fs_view.clear() + def events_callback(self, paths, inodes, flags, ids): + try: + events = [ + _fsevents.NativeEvent(path, inode, event_flags, event_id) + for path, inode, event_flags, event_id in zip( + paths, inodes, flags, ids + ) + ] + if not self.watch: + # The watcher might be cleared on thread stop but the C callback would + # still send events. Let's ignore them to prevent unhandled exceptions later. + return + with self._lock: + self.queue_events(self.timeout, events) + except Exception: + logger.exception("Unhandled exception in fsevents callback") + def run(self): + self.pathnames = [self.watch.path] + self._start_time = time.monotonic() try: - def callback(paths, inodes, flags, ids, emitter=self): - try: - with emitter._lock: - events = [ - _fsevents.NativeEvent(path, inode, event_flags, event_id) - for path, inode, event_flags, event_id in zip(paths, inodes, flags, ids) - ] - emitter.queue_events(emitter.timeout, events) - except Exception: - logger.exception("Unhandled exception in fsevents callback") - - self.pathnames = [self.watch.path] - self._start_time = time.monotonic() - - _fsevents.add_watch(self, self.watch, callback, self.pathnames) + _fsevents.add_watch(self, self.watch, self.events_callback, self.pathnames) _fsevents.read_events(self) except Exception: logger.exception("Unhandled exception in FSEventsEmitter") diff --git a/tests/test_fsevents.py b/tests/test_fsevents.py index b850a519e..4a0430c7a 100644 --- a/tests/test_fsevents.py +++ b/tests/test_fsevents.py @@ -12,6 +12,10 @@ from functools import partial from os import mkdir, rmdir from queue import Queue +from random import random +from threading import Thread +from time import sleep +from unittest.mock import patch import _watchdog_fsevents as _fsevents from watchdog.events import FileSystemEventHandler @@ -100,6 +104,86 @@ def callback(path, inodes, flags, ids): rmdir(a) +def test_watcher_deletion_while_receiving_events_1(caplog, observer): + """ + When the watcher is stopped while there are events, such exception could happen: + + Traceback (most recent call last): + File "observers/fsevents.py", line 327, in events_callback + self.queue_events(self.timeout, events) + File "observers/fsevents.py", line 187, in queue_events + src_path = self._encode_path(event.path) + File "observers/fsevents.py", line 352, in _encode_path + if isinstance(self.watch.path, bytes): + AttributeError: 'NoneType' object has no attribute 'path' + """ + tmpdir = p() + + orig = FSEventsEmitter.events_callback + + def cb(*args): + FSEventsEmitter.stop(emitter) + orig(*args) + + with caplog.at_level(logging.ERROR), patch.object(FSEventsEmitter, "events_callback", new=cb): + start_watching(tmpdir) + # Less than #00 is not enough events to trigger the error + for n in range(100): + touch(p("{}.txt".format(n))) + emitter.stop() + assert not caplog.records + + +def test_watcher_deletion_while_receiving_events_2(caplog): + """Note: that test takes about 20 seconds to complete. + + Quite similar test to prevent another issue + when the watcher is stopped while there are events, such exception could happen: + + Traceback (most recent call last): + File "observers/fsevents.py", line 327, in events_callback + self.queue_events(self.timeout, events) + File "observers/fsevents.py", line 235, in queue_events + self._queue_created_event(event, src_path, src_dirname) + File "observers/fsevents.py", line 132, in _queue_created_event + self.queue_event(cls(src_path)) + File "observers/fsevents.py", line 104, in queue_event + if self._watch.is_recursive: + AttributeError: 'NoneType' object has no attribute 'is_recursive' + """ + + def try_to_fail(): + tmpdir = p() + start_watching(tmpdir) + + def create_files(): + # Less than 2000 is not enough events to trigger the error + for n in range(2000): + touch(p(str(n) + ".txt")) + + def stop(em): + sleep(random()) + em.stop() + + th1 = Thread(target=create_files) + th2 = Thread(target=stop, args=(emitter,)) + + try: + with caplog.at_level(logging.ERROR): + th1.start() + th2.start() + th1.join() + th2.join() + assert not caplog.records + finally: + emitter.stop() + + # 20 attempts to make the random failure happen + for _ in range(20): + try_to_fail() + sleep(random()) + + def test_remove_watch_twice(): """ ValueError: PyCapsule_GetPointer called with invalid PyCapsule object From 6dde6edb5ab13dab5ac0bf1c1a7e4a8ed987e463 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Thu, 6 May 2021 15:52:51 +0200 Subject: [PATCH 2/6] [skip ci] Add docstring + fix comment --- src/watchdog/observers/fsevents.py | 3 +++ tests/test_fsevents.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/watchdog/observers/fsevents.py b/src/watchdog/observers/fsevents.py index b2eb52759..33d2e0ed6 100644 --- a/src/watchdog/observers/fsevents.py +++ b/src/watchdog/observers/fsevents.py @@ -289,6 +289,9 @@ def queue_events(self, timeout, events): self._fs_view.clear() def events_callback(self, paths, inodes, flags, ids): + """Callback passed to FSEventStreamCreate(), it will receive all + FS events and queue them. + """ try: events = [ _fsevents.NativeEvent(path, inode, event_flags, event_id) diff --git a/tests/test_fsevents.py b/tests/test_fsevents.py index 4a0430c7a..e4ddb1f1d 100644 --- a/tests/test_fsevents.py +++ b/tests/test_fsevents.py @@ -127,7 +127,7 @@ def cb(*args): with caplog.at_level(logging.ERROR), patch.object(FSEventsEmitter, "events_callback", new=cb): start_watching(tmpdir) - # Less than #00 is not enough events to trigger the error + # Less than 100 is not enough events to trigger the error for n in range(100): touch(p("{}.txt".format(n))) emitter.stop() From 8c4629cc58e88e15f5e066307108b0dc11f61d7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Fri, 7 May 2021 14:01:54 +0200 Subject: [PATCH 3/6] Patch from #788 --- src/watchdog/observers/fsevents.py | 7 ++----- src/watchdog_fsevents.c | 11 +++++++---- tests/test_fsevents.py | 4 +--- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/watchdog/observers/fsevents.py b/src/watchdog/observers/fsevents.py index 33d2e0ed6..5267b05c6 100644 --- a/src/watchdog/observers/fsevents.py +++ b/src/watchdog/observers/fsevents.py @@ -86,11 +86,8 @@ def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, suppress self._lock = threading.RLock() def on_thread_stop(self): - if self.watch: - _fsevents.remove_watch(self.watch) - _fsevents.stop(self) - with self._lock: - self._watch = None + _fsevents.remove_watch(self.watch) + _fsevents.stop(self) def queue_event(self, event): # fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop diff --git a/src/watchdog_fsevents.c b/src/watchdog_fsevents.c index 7deecdc71..c2c762680 100644 --- a/src/watchdog_fsevents.c +++ b/src/watchdog_fsevents.c @@ -784,17 +784,20 @@ static PyObject * watchdog_remove_watch(PyObject *self, PyObject *watch) { UNUSED(self); - PyObject *value = PyDict_GetItem(watch_to_stream, watch); + PyObject *streamref_capsule = PyDict_GetItem(watch_to_stream, watch); + if (!streamref_capsule) { + // A watch might have been removed explicitly before, in which case we can simply early out. + Py_RETURN_NONE; + } PyDict_DelItem(watch_to_stream, watch); - FSEventStreamRef stream_ref = PyCapsule_GetPointer(value, NULL); + FSEventStreamRef stream_ref = PyCapsule_GetPointer(streamref_capsule, NULL); FSEventStreamStop(stream_ref); FSEventStreamInvalidate(stream_ref); FSEventStreamRelease(stream_ref); - Py_INCREF(Py_None); - return Py_None; + Py_RETURN_NONE; } PyDoc_STRVAR(watchdog_stop__doc__, diff --git a/tests/test_fsevents.py b/tests/test_fsevents.py index e4ddb1f1d..4dac7c17b 100644 --- a/tests/test_fsevents.py +++ b/tests/test_fsevents.py @@ -227,9 +227,7 @@ def on_thread_stop(self): w = observer.schedule(FileSystemEventHandler(), a, recursive=False) rmdir(a) time.sleep(0.1) - with pytest.raises(KeyError): - # watch no longer exists! - observer.unschedule(w) + observer.unschedule(w) def test_converting_cfstring_to_pyunicode(): From 3007b17258f17b094cc8b2c245231a24e47e76b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Fri, 7 May 2021 14:03:04 +0200 Subject: [PATCH 4/6] Revert some changes --- src/watchdog/observers/fsevents.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/watchdog/observers/fsevents.py b/src/watchdog/observers/fsevents.py index 5267b05c6..5ea99c7c0 100644 --- a/src/watchdog/observers/fsevents.py +++ b/src/watchdog/observers/fsevents.py @@ -83,7 +83,7 @@ def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, suppress self.suppress_history = suppress_history self._start_time = 0.0 self._starting_state = None - self._lock = threading.RLock() + self._lock = threading.Lock() def on_thread_stop(self): _fsevents.remove_watch(self.watch) @@ -296,10 +296,6 @@ def events_callback(self, paths, inodes, flags, ids): paths, inodes, flags, ids ) ] - if not self.watch: - # The watcher might be cleared on thread stop but the C callback would - # still send events. Let's ignore them to prevent unhandled exceptions later. - return with self._lock: self.queue_events(self.timeout, events) except Exception: From b2539a1258d118139931c9f269bc2de5bebca92d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Fri, 7 May 2021 14:04:16 +0200 Subject: [PATCH 5/6] Ad the good contrinutor Co-authored-by: Thomas --- changelog.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.rst b/changelog.rst index 245b91d9f..0ec47b018 100644 --- a/changelog.rst +++ b/changelog.rst @@ -9,7 +9,7 @@ Changelog 2021-0x-xx • `full history `__ - [mac] Fix callback exceptions when the watcher is deleted but still receiving events -- Thanks to our beloved contributors: @rom1win, @BoboTiG +- Thanks to our beloved contributors: @rom1win, @BoboTiG, @CCP-Aporia 2.1.0 From 3e73f566c3beef7471f25c507ef7e0be14826b9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Fri, 7 May 2021 14:10:32 +0200 Subject: [PATCH 6/6] [skip ci] Minor perf improvement --- src/watchdog/observers/fsevents.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/watchdog/observers/fsevents.py b/src/watchdog/observers/fsevents.py index 5ea99c7c0..e7593f449 100644 --- a/src/watchdog/observers/fsevents.py +++ b/src/watchdog/observers/fsevents.py @@ -289,9 +289,10 @@ def events_callback(self, paths, inodes, flags, ids): """Callback passed to FSEventStreamCreate(), it will receive all FS events and queue them. """ + cls = _fsevents.NativeEvent try: events = [ - _fsevents.NativeEvent(path, inode, event_flags, event_id) + cls(path, inode, event_flags, event_id) for path, inode, event_flags, event_id in zip( paths, inodes, flags, ids )