[mac] Support coalesced fsevents (#734)
* Add `is_coalesced` property to `NativeEvent`

So that we can effectively decide if we need to perform additional
system calls to figure out what really happened.

* Replace `NativeEvent._event_type` with `repr()` support

It's more pythonic, and the `_event_type` implementation wasn't quite
usable anyway.

NB: the representation is not truly copy/paste python code if there is
a double quote inside event.path, but that should be a rare case so we
don't add the expensive special case handling there.

* Allow running tests with debugger attached

Some Python debuggers create additional threads, so we shouldn't assume that there is only one.

* Request notifications for watched root

* Expect events on macOS instead of using `time.sleep()`

It might be even better to check for the emitter class, as opposed to the platform.

* Add exception handling to FSEventsEmitter

Reduce the amount of 'silent breakage'

* Use sentinel event when setting up tests on macOS

So that we can avoid a race between test setup and fseventsd

* Improve handling of coalesced events

* Revert accidental platform check change

* Fix renaming_top_level_directory test on macOS

* Generate sub events for move operations

* Remove `filesystem_view` again

While the `filesystem_view` helps with filtering out additional
`FileCreatedEvent`+`DirModifiedEvent` pairs, it also introduces
a large number of edge cases for synthetic events caused by move and
rename operations. On top of that, in order to properly resolve
those edge cases we'd have to go back to a solution very similar to
the old directory snapshots, with all the performance penalties they
suffered from...

As such I think it's better to acknowledge the behaviour for coalesced
events instead, and thus remove the `filesystem_view` again.

* Update Changelog
CCP-Aporia committed Jan 19, 2021
1 parent 2fab7c2 commit fc1242e
Showing 6 changed files with 240 additions and 180 deletions.
3 changes: 2 additions & 1 deletion changelog.rst
@@ -10,7 +10,8 @@ Changelog

- Avoid deprecated ``PyEval_InitThreads`` on Python 3.7+ (`#746 <https://github.com/gorakhargosh/watchdog/pull/746>`_)
- [inotify] Add support for ``IN_CLOSE_WRITE`` events. A ``FileCloseEvent`` event will be fired. Note that ``IN_CLOSE_NOWRITE`` events are not handled to prevent much noise. (`#184 <https://github.com/gorakhargosh/watchdog/pull/184>`_, `#245 <https://github.com/gorakhargosh/watchdog/pull/245>`_, `#280 <https://github.com/gorakhargosh/watchdog/pull/280>`_, `#313 <https://github.com/gorakhargosh/watchdog/pull/313>`_, `#690 <https://github.com/gorakhargosh/watchdog/pull/690>`_)
- Thanks to our beloved contributors: @bstaletic, @lukassup, @ysard
- [mac] Support coalesced filesystem events (`#734 <https://github.com/gorakhargosh/watchdog/pull/734>`_)
- Thanks to our beloved contributors: @bstaletic, @lukassup, @ysard, @SamSchott, @CCP-Aporia


1.0.2
139 changes: 82 additions & 57 deletions src/watchdog/observers/fsevents.py
@@ -23,6 +23,7 @@
:platforms: Mac OS X
"""

import logging
import os
import threading
import unicodedata
@@ -36,7 +37,9 @@
DirDeletedEvent,
DirModifiedEvent,
DirCreatedEvent,
DirMovedEvent
DirMovedEvent,
generate_sub_created_events,
generate_sub_moved_events
)

from watchdog.observers.api import (
@@ -46,6 +49,8 @@
DEFAULT_OBSERVER_TIMEOUT
)

logger = logging.getLogger('fsevents')


class FSEventsEmitter(EventEmitter):

@@ -74,79 +79,99 @@ def on_thread_stop(self):
_fsevents.stop(self)
self._watch = None

def queue_events(self, timeout):
with self._lock:
events = self.native_events
i = 0
while i < len(events):
event = events[i]
src_path = self._encode_path(event.path)

# For some reason the create and remove flags are sometimes also
# set for rename and modify type events, so let those take
# precedence.
if event.is_renamed:
# Internal moves appears to always be consecutive in the same
# buffer and have IDs differ by exactly one (while others
# don't) making it possible to pair up the two events coming
# from a singe move operation. (None of this is documented!)
# Otherwise, guess whether file was moved in or out.
# TODO: handle id wrapping
if (i + 1 < len(events) and events[i + 1].is_renamed
and events[i + 1].event_id == event.event_id + 1):
cls = DirMovedEvent if event.is_directory else FileMovedEvent
dst_path = self._encode_path(events[i + 1].path)
self.queue_event(cls(src_path, dst_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
self.queue_event(DirModifiedEvent(os.path.dirname(dst_path)))
i += 1
elif os.path.exists(event.path):
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
else:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
# TODO: generate events for tree

elif event.is_modified or event.is_inode_meta_mod or event.is_xattr_mod:
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
def queue_event(self, event):
logger.info("queue_event %s", event)
EventEmitter.queue_event(self, event)

def queue_events(self, timeout, events):
i = 0
while i < len(events):
event = events[i]
logger.info(event)
src_path = self._encode_path(event.path)

if event.is_renamed:
# Internal moves appears to always be consecutive in the same
# buffer and have IDs differ by exactly one (while others
# don't) making it possible to pair up the two events coming
# from a singe move operation. (None of this is documented!)
# Otherwise, guess whether file was moved in or out.
# TODO: handle id wrapping
if (i + 1 < len(events) and events[i + 1].is_renamed
and events[i + 1].event_id == event.event_id + 1):
logger.info("Next event for rename is %s", events[i + 1])
cls = DirMovedEvent if event.is_directory else FileMovedEvent
dst_path = self._encode_path(events[i + 1].path)
self.queue_event(cls(src_path, dst_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
self.queue_event(DirModifiedEvent(os.path.dirname(dst_path)))
for sub_event in generate_sub_moved_events(src_path, dst_path):
logger.info("Generated sub event: %s", sub_event)
self.queue_event(sub_event)
i += 1
elif os.path.exists(event.path):
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
for sub_event in generate_sub_created_events(src_path):
self.queue_event(sub_event)
else:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))

elif event.is_created:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
if event.is_created:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
if not event.is_coalesced or (
event.is_coalesced and not event.is_renamed and not event.is_modified and not
event.is_inode_meta_mod and not event.is_xattr_mod
):
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))

elif event.is_removed:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
if event.is_modified and not event.is_coalesced and os.path.exists(src_path):
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
self.queue_event(cls(src_path))

if event.is_inode_meta_mod or event.is_xattr_mod:
if os.path.exists(src_path) and not event.is_coalesced:
# NB: in the scenario of touch(file) -> rm(file) we can trigger this twice
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
self.queue_event(cls(src_path))

if event.is_removed:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
if not event.is_coalesced or (event.is_coalesced and not os.path.exists(event.path)):
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))

if src_path == self.watch.path:
# this should not really occur, instead we expect
# is_root_changed to be set
logger.info("Stopping because root path was removed")
self.stop()

elif event.is_root_changed:
# This will be set if root or any if its parents is renamed or
# deleted.
# TODO: find out new path and generate DirMovedEvent?
self.queue_event(DirDeletedEvent(self.watch.path))
self.stop()
if event.is_root_changed:
# This will be set if root or any of its parents is renamed or
# deleted.
# TODO: find out new path and generate DirMovedEvent?
self.queue_event(DirDeletedEvent(self.watch.path))
logger.info("Stopping because root path was changed")
self.stop()

i += 1
i += 1

def run(self):
try:
def callback(pathnames, flags, ids, emitter=self):
with emitter._lock:
emitter.native_events = [
_fsevents.NativeEvent(event_path, event_flags, event_id)
for event_path, event_flags, event_id in zip(pathnames, flags, ids)
]
emitter.queue_events(emitter.timeout)
try:
with emitter._lock:
emitter.queue_events(emitter.timeout, [
_fsevents.NativeEvent(event_path, event_flags, event_id)
for event_path, event_flags, event_id in zip(pathnames, flags, ids)
])
except Exception:
logger.exception("Unhandled exception in fsevents callback")

# for pathname, flag in zip(pathnames, flags):
# if emitter.watch.is_recursive: # and pathname != emitter.watch.path:
@@ -175,7 +200,7 @@ def callback(pathnames, flags, ids, emitter=self):
self.pathnames)
_fsevents.read_events(self)
except Exception:
pass
logger.exception("Unhandled exception in FSEventsEmitter")
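
The rename handling in `queue_events` relies on the observation, noted in the code comments as undocumented, that the two halves of an internal move arrive consecutively with event IDs that differ by exactly one. A minimal sketch of just that pairing step follows. `FakeEvent` and `pair_renames` are hypothetical names introduced for illustration and are not part of watchdog; the real emitter additionally checks `os.path.exists()` for unpaired renames and queues the corresponding watchdog events.

```python
from collections import namedtuple

# Hypothetical stand-in for _fsevents.NativeEvent with only the fields used here.
FakeEvent = namedtuple("FakeEvent", "path event_id is_renamed")


def pair_renames(events):
    """Yield (src, dst) for paired renames and (path, None) for unpaired ones."""
    i = 0
    while i < len(events):
        event = events[i]
        if event.is_renamed:
            nxt = events[i + 1] if i + 1 < len(events) else None
            if nxt is not None and nxt.is_renamed and nxt.event_id == event.event_id + 1:
                # Consecutive IDs: treat the two events as one move operation.
                yield (event.path, nxt.path)
                i += 1  # skip the second half of the pair
            else:
                # Unpaired rename: the item moved into or out of the watched tree.
                yield (event.path, None)
        i += 1


events = [
    FakeEvent("/tmp/a", 10, True),
    FakeEvent("/tmp/b", 11, True),
    FakeEvent("/tmp/c", 20, True),
]
pairs = list(pair_renames(events))
```

Like the original code, this sketch does not handle event ID wrapping.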

def _encode_path(self, path):
"""Encode path only if bytes were passed to this emitter. """
44 changes: 30 additions & 14 deletions src/watchdog_fsevents.c
@@ -113,20 +113,15 @@ typedef struct {
FSEventStreamEventId id;
} NativeEventObject;

PyObject* NativeEventTypeString(PyObject* instance, void* closure)
{
UNUSED(closure);
PyObject* NativeEventRepr(PyObject* instance) {
NativeEventObject *self = (NativeEventObject*)instance;
if (self->flags & kFSEventStreamEventFlagItemCreated)
return PyUnicode_FromString("Created");
if (self->flags & kFSEventStreamEventFlagItemRemoved)
return PyUnicode_FromString("Removed");
if (self->flags & kFSEventStreamEventFlagItemRenamed)
return PyUnicode_FromString("Renamed");
if (self->flags & kFSEventStreamEventFlagItemModified)
return PyUnicode_FromString("Modified");

return PyUnicode_FromString("Unknown");

return PyUnicode_FromFormat(
"NativeEvent(path=\"%s\", flags=%x, id=%llu)",
self->path,
self->flags,
self->id
);
}

PyObject* NativeEventTypeFlags(PyObject* instance, void* closure)
@@ -150,6 +145,26 @@ PyObject* NativeEventTypeID(PyObject* instance, void* closure)
return PyLong_FromLong(self->id);
}

PyObject* NativeEventTypeIsCoalesced(PyObject* instance, void* closure)
{
UNUSED(closure);
NativeEventObject *self = (NativeEventObject*)instance;

// if any of these bitmasks match then we have a coalesced event and need to do sys calls to figure out what happened
FSEventStreamEventFlags coalesced_masks[] = {
kFSEventStreamEventFlagItemCreated | kFSEventStreamEventFlagItemRemoved,
kFSEventStreamEventFlagItemCreated | kFSEventStreamEventFlagItemRenamed,
kFSEventStreamEventFlagItemRemoved | kFSEventStreamEventFlagItemRenamed,
};
for (size_t i = 0; i < sizeof(coalesced_masks) / sizeof(FSEventStreamEventFlags); ++i) {
if ((self->flags & coalesced_masks[i]) == coalesced_masks[i]) {
Py_RETURN_TRUE;
}
}

Py_RETURN_FALSE;
}

#define FLAG_PROPERTY(suffix, flag) \
PyObject* NativeEventType##suffix(PyObject* instance, void* closure) \
{ \
@@ -197,10 +212,10 @@ static int NativeEventInit(NativeEventObject *self, PyObject *args, PyObject *kw
}

static PyGetSetDef NativeEventProperties[] = {
{"_event_type", NativeEventTypeString, NULL, "Textual representation of the native event that occurred", NULL},
{"flags", NativeEventTypeFlags, NULL, "The raw mask of flags as returend by FSEvents", NULL},
{"path", NativeEventTypePath, NULL, "The path for which this event was generated", NULL},
{"event_id", NativeEventTypeID, NULL, "The id of the generated event", NULL},
{"is_coalesced", NativeEventTypeIsCoalesced, NULL, "True if multiple ambiguous changes to the monitored path happened", NULL},
{"must_scan_subdirs", NativeEventTypeIsMustScanSubDirs, NULL, "True if application must rescan all subdirectories", NULL},
{"is_user_dropped", NativeEventTypeIsUserDropped, NULL, "True if a failure during event buffering occured", NULL},
{"is_kernel_dropped", NativeEventTypeIsKernelDropped, NULL, "True if a failure during event buffering occured", NULL},
@@ -238,6 +253,7 @@ static PyTypeObject NativeEventType = {
.tp_new = PyType_GenericNew,
.tp_getset = NativeEventProperties,
.tp_init = (initproc) NativeEventInit,
.tp_repr = (reprfunc) NativeEventRepr,
};
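
For readers who do not want to trace the C, the `is_coalesced` check added above can be approximated in Python as follows. This is a sketch, not code from the commit: the constant names are made up for illustration, and the numeric values are the ones assumed from Apple's `FSEvents.h` for the corresponding `kFSEventStreamEventFlagItem*` flags.

```python
# Assumed values of kFSEventStreamEventFlagItemCreated/Removed/Renamed.
ITEM_CREATED = 0x00000100
ITEM_REMOVED = 0x00000200
ITEM_RENAMED = 0x00000800

# An event counts as coalesced when both flags of any one pair are set.
COALESCED_MASKS = (
    ITEM_CREATED | ITEM_REMOVED,
    ITEM_CREATED | ITEM_RENAMED,
    ITEM_REMOVED | ITEM_RENAMED,
)


def is_coalesced(flags):
    """Return True if `flags` fully contains any of the ambiguous flag pairs."""
    return any((flags & mask) == mask for mask in COALESCED_MASKS)
```

Note that a single flag, or a combination such as created plus modified, does not match any of the masks, so only the create/remove/rename pairings are reported as coalesced.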


3 changes: 2 additions & 1 deletion tests/conftest.py
@@ -28,9 +28,10 @@ def no_thread_leaks():
Fail on thread leak.
We do not use pytest-threadleak because it is not reliable.
"""
old_thread_count = threading.active_count()
yield
gc.collect() # Clear the stuff from other function-level fixtures
assert threading.active_count() == 1 # Only the main thread
assert threading.active_count() == old_thread_count # Only previously existing threads


@pytest.fixture(autouse=True)
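
The `no_thread_leaks` change compares the thread count against a baseline taken before the test, rather than against the constant 1, so that threads created by an attached debugger do not cause false failures. The same idea can be sketched outside of pytest as a context manager. `no_new_threads` is a hypothetical helper written for this illustration and is not part of the test suite.

```python
import contextlib
import threading


@contextlib.contextmanager
def no_new_threads():
    """Fail if more threads are alive on exit than were alive on entry."""
    baseline = threading.active_count()  # may be > 1 under a debugger
    yield
    leaked = threading.active_count() - baseline
    assert leaked == 0, f"{leaked} thread(s) leaked"


with no_new_threads():
    worker = threading.Thread(target=lambda: None)
    worker.start()
    worker.join()  # joined threads no longer count as active
checked = True
```

A limitation shared with the fixture: if a pre-existing thread exits while a new one leaks, the counts still match and the leak goes unnoticed.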