Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support coalesced fsevents #734

Merged
merged 17 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 83 additions & 56 deletions src/watchdog/observers/fsevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
:platforms: Mac OS X
"""

import logging
import os
import threading
import unicodedata
Expand All @@ -46,6 +47,9 @@
DEFAULT_OBSERVER_TIMEOUT
)

logger = logging.getLogger('fsevents')
logger.addHandler(logging.NullHandler())


class FSEventsEmitter(EventEmitter):

Expand Down Expand Up @@ -74,79 +78,102 @@ def on_thread_stop(self):
_fsevents.stop(self)
self._watch = None

def queue_events(self, timeout):
with self._lock:
events = self.native_events
i = 0
while i < len(events):
event = events[i]
src_path = self._encode_path(event.path)

# For some reason the create and remove flags are sometimes also
# set for rename and modify type events, so let those take
# precedence.
if event.is_renamed:
# Internal moves appears to always be consecutive in the same
# buffer and have IDs differ by exactly one (while others
# don't) making it possible to pair up the two events coming
# from a singe move operation. (None of this is documented!)
# Otherwise, guess whether file was moved in or out.
# TODO: handle id wrapping
if (i + 1 < len(events) and events[i + 1].is_renamed
and events[i + 1].event_id == event.event_id + 1):
cls = DirMovedEvent if event.is_directory else FileMovedEvent
dst_path = self._encode_path(events[i + 1].path)
self.queue_event(cls(src_path, dst_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
self.queue_event(DirModifiedEvent(os.path.dirname(dst_path)))
i += 1
elif os.path.exists(event.path):
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
else:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
# TODO: generate events for tree

elif event.is_modified or event.is_inode_meta_mod or event.is_xattr_mod:
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
def queue_event(self, event):
    """Log ``event`` and hand it over to the base emitter's queue.

    This override only adds a log record (INFO level, on the module
    logger) in front of the regular ``EventEmitter.queue_event`` call, so
    every event this emitter produces can be traced.

    :param event: the file system event about to be queued.
    """
    logger.info("queue_event %s", event)
    # Delegate explicitly to the base class implementation.
    EventEmitter.queue_event(self, event)

def queue_events(self, timeout, events):
i = 0
while i < len(events):
event = events[i]
logger.info(event)
src_path = self._encode_path(event.path)

"""
FIXME: It is not enough to just de-duplicate the events based on
whether they are coalesced or not. We must also take into
account old and new state. As such we need to track all
events that occurred in order to make a correct decision
about which events should be generated.
It is worth noting that `DirSnapshot` is _not_ the right
way of doing it since that traverses _everything_.
"""

# For some reason the create and remove flags are sometimes also
# set for rename and modify type events, so let those take
# precedence.
if event.is_renamed:
# Internal moves appear to always be consecutive in the same
# buffer and have IDs that differ by exactly one (while others
# don't), making it possible to pair up the two events coming
# from a single move operation. (None of this is documented!)
CCP-Aporia marked this conversation as resolved.
Show resolved Hide resolved
# Otherwise, guess whether file was moved in or out.
# TODO: handle id wrapping
if (i + 1 < len(events) and events[i + 1].is_renamed
and events[i + 1].event_id == event.event_id + 1):
logger.info("Next event for rename is %s", events[i + 1])
cls = DirMovedEvent if event.is_directory else FileMovedEvent
dst_path = self._encode_path(events[i + 1].path)
self.queue_event(cls(src_path, dst_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
self.queue_event(DirModifiedEvent(os.path.dirname(dst_path)))
i += 1
CCP-Aporia marked this conversation as resolved.
Show resolved Hide resolved
elif os.path.exists(event.path):
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
CCP-Aporia marked this conversation as resolved.
Show resolved Hide resolved
else:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
CCP-Aporia marked this conversation as resolved.
Show resolved Hide resolved
CCP-Aporia marked this conversation as resolved.
Show resolved Hide resolved
# TODO: generate events for tree

elif event.is_created:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
if event.is_created:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
if not event.is_coalesced or (
event.is_coalesced and not event.is_renamed and os.path.exists(event.path)
):
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))

elif event.is_removed:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
if event.is_modified or event.is_inode_meta_mod or event.is_xattr_mod:
# NB: in the scenario of touch(file) -> rm(file) we can trigger this twice
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
self.queue_event(cls(src_path))

if event.is_removed:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
if not event.is_coalesced or (event.is_coalesced and not os.path.exists(event.path)):
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))

if src_path == self.watch.path:
# this should not really occur, instead we expect
# is_root_changed to be set
logger.info("Stopping because root path was removed")
self.stop()

elif event.is_root_changed:
# This will be set if root or any if its parents is renamed or
# deleted.
# TODO: find out new path and generate DirMovedEvent?
self.queue_event(DirDeletedEvent(self.watch.path))
self.stop()
if event.is_root_changed:
# This will be set if root or any of its parents is renamed or
# deleted.
# TODO: find out new path and generate DirMovedEvent?
self.queue_event(DirDeletedEvent(self.watch.path))
logger.info("Stopping because root path was changed")
self.stop()

i += 1
i += 1

def run(self):
try:
def callback(pathnames, flags, ids, emitter=self):
with emitter._lock:
emitter.native_events = [
_fsevents.NativeEvent(event_path, event_flags, event_id)
for event_path, event_flags, event_id in zip(pathnames, flags, ids)
]
emitter.queue_events(emitter.timeout)
try:
with emitter._lock:
emitter.queue_events(emitter.timeout, [
_fsevents.NativeEvent(event_path, event_flags, event_id)
for event_path, event_flags, event_id in zip(pathnames, flags, ids)
])
except Exception:
logger.exception("Unhandled exception in fsevents callback")

# for pathname, flag in zip(pathnames, flags):
# if emitter.watch.is_recursive: # and pathname != emitter.watch.path:
Expand Down Expand Up @@ -175,7 +202,7 @@ def callback(pathnames, flags, ids, emitter=self):
self.pathnames)
_fsevents.read_events(self)
except Exception:
pass
logger.exception("Unhandled exception in FSEventsEmitter")

def _encode_path(self, path):
"""Encode path only if bytes were passed to this emitter. """
Expand Down
44 changes: 30 additions & 14 deletions src/watchdog_fsevents.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,15 @@ typedef struct {
FSEventStreamEventId id;
} NativeEventObject;

PyObject* NativeEventTypeString(PyObject* instance, void* closure)
{
UNUSED(closure);
PyObject* NativeEventRepr(PyObject* instance) {
NativeEventObject *self = (NativeEventObject*)instance;
if (self->flags & kFSEventStreamEventFlagItemCreated)
return PyUnicode_FromString("Created");
if (self->flags & kFSEventStreamEventFlagItemRemoved)
return PyUnicode_FromString("Removed");
if (self->flags & kFSEventStreamEventFlagItemRenamed)
return PyUnicode_FromString("Renamed");
if (self->flags & kFSEventStreamEventFlagItemModified)
return PyUnicode_FromString("Modified");

return PyUnicode_FromString("Unknown");

return PyUnicode_FromFormat(
"NativeEvent(path=\"%s\", flags=%x, id=%llu)",
self->path,
self->flags,
self->id
);
}

PyObject* NativeEventTypeFlags(PyObject* instance, void* closure)
Expand All @@ -150,6 +145,26 @@ PyObject* NativeEventTypeID(PyObject* instance, void* closure)
return PyLong_FromLong(self->id);
}

/**
 * Getter for the ``is_coalesced`` property of a native event.
 *
 * An event counts as coalesced when its flag mask carries at least two of
 * the "item created", "item removed" and "item renamed" bits at once. In
 * that case the flags alone are ambiguous and the caller has to do sys
 * calls to figure out what actually happened to the path.
 *
 * :param instance: the NativeEventObject whose flags are inspected.
 * :param closure:  unused.
 * :returns: a new reference to ``Py_True`` when the event is coalesced,
 *           otherwise a new reference to ``Py_False``.
 */
PyObject* NativeEventTypeIsCoalesced(PyObject* instance, void* closure)
{
    UNUSED(closure);
    const FSEventStreamEventFlags flags = ((NativeEventObject*)instance)->flags;

    const int was_created = (flags & kFSEventStreamEventFlagItemCreated) != 0;
    const int was_removed = (flags & kFSEventStreamEventFlagItemRemoved) != 0;
    const int was_renamed = (flags & kFSEventStreamEventFlagItemRenamed) != 0;

    /* Any pair of the three bits being set together (created+removed,
     * created+renamed or removed+renamed) marks the event as coalesced,
     * which is the same as at least two of them being set. */
    if (was_created + was_removed + was_renamed >= 2) {
        Py_RETURN_TRUE;
    }

    Py_RETURN_FALSE;
}

#define FLAG_PROPERTY(suffix, flag) \
PyObject* NativeEventType##suffix(PyObject* instance, void* closure) \
{ \
Expand Down Expand Up @@ -197,10 +212,10 @@ static int NativeEventInit(NativeEventObject *self, PyObject *args, PyObject *kw
}

static PyGetSetDef NativeEventProperties[] = {
{"_event_type", NativeEventTypeString, NULL, "Textual representation of the native event that occurred", NULL},
{"flags", NativeEventTypeFlags, NULL, "The raw mask of flags as returend by FSEvents", NULL},
{"path", NativeEventTypePath, NULL, "The path for which this event was generated", NULL},
{"event_id", NativeEventTypeID, NULL, "The id of the generated event", NULL},
{"is_coalesced", NativeEventTypeIsCoalesced, NULL, "True if multiple ambiguous changes to the monitored path happened", NULL},
{"must_scan_subdirs", NativeEventTypeIsMustScanSubDirs, NULL, "True if application must rescan all subdirectories", NULL},
{"is_user_dropped", NativeEventTypeIsUserDropped, NULL, "True if a failure during event buffering occured", NULL},
{"is_kernel_dropped", NativeEventTypeIsKernelDropped, NULL, "True if a failure during event buffering occured", NULL},
Expand Down Expand Up @@ -238,6 +253,7 @@ static PyTypeObject NativeEventType = {
.tp_new = PyType_GenericNew,
.tp_getset = NativeEventProperties,
.tp_init = (initproc) NativeEventInit,
.tp_repr = (reprfunc) NativeEventRepr,
};


Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ def no_thread_leaks():
Fail on thread leak.
We do not use pytest-threadleak because it is not reliable.
"""
old_thread_count = threading.active_count()
yield
gc.collect() # Clear the stuff from other function-level fixtures
assert threading.active_count() == 1 # Only the main thread
assert threading.active_count() == old_thread_count # Only previously existing threads


@pytest.fixture(autouse=True)
Expand Down
Loading