Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance improvements for the fsevents module #680

Merged
merged 3 commits into from
Oct 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ Changelog

- Add logger parameter for the LoggingEventHandler (`#676 <https://github.com/gorakhargosh/watchdog/pull/676>`_)
- Replace mutable default arguments with ``if None`` implementation (`#677 <https://github.com/gorakhargosh/watchdog/pull/677>`_)
- Thanks to our beloved contributors: @Sraw
- [mac] Performance improvements for the ``fsevents`` module (`#680 <https://github.com/gorakhargosh/watchdog/pull/680>`_)
- Thanks to our beloved contributors: @Sraw, @CCP-Aporia


0.10.3
Expand Down
83 changes: 53 additions & 30 deletions src/watchdog/observers/fsevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from __future__ import with_statement

import os
import sys
import threading
import unicodedata
Expand All @@ -41,7 +42,6 @@
DirMovedEvent
)

from watchdog.utils.dirsnapshot import DirectorySnapshot
from watchdog.observers.api import (
BaseObserver,
EventEmitter,
Expand Down Expand Up @@ -70,7 +70,6 @@ class FSEventsEmitter(EventEmitter):
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
EventEmitter.__init__(self, event_queue, watch, timeout)
self._lock = threading.Lock()
self.snapshot = DirectorySnapshot(watch.path, watch.is_recursive)

def on_thread_stop(self):
if self.watch:
Expand All @@ -80,37 +79,61 @@ def on_thread_stop(self):

def queue_events(self, timeout):
with self._lock:
if (not self.watch.is_recursive
and self.watch.path not in self.pathnames):
return
new_snapshot = DirectorySnapshot(self.watch.path,
self.watch.is_recursive)
events = new_snapshot - self.snapshot
self.snapshot = new_snapshot

# Files.
for src_path in events.files_deleted:
self.queue_event(FileDeletedEvent(src_path))
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))

# Directories.
for src_path in events.dirs_deleted:
self.queue_event(DirDeletedEvent(src_path))
for src_path in events.dirs_modified:
self.queue_event(DirModifiedEvent(src_path))
for src_path in events.dirs_created:
self.queue_event(DirCreatedEvent(src_path))
for src_path, dest_path in events.dirs_moved:
self.queue_event(DirMovedEvent(src_path, dest_path))
events = self.native_events
i = 0
while i < len(events):
event = events[i]

# For some reason the create and remove flags are sometimes also
# set for rename and modify type events, so let those take
# precedence.
if event.is_renamed:
# Internal moves appear to always be consecutive in the same
# buffer and have IDs that differ by exactly one (while others
# don't), making it possible to pair up the two events coming
# from a single move operation. (None of this is documented!)
# Otherwise, guess whether file was moved in or out.
# TODO: handle id wrapping
if (i + 1 < len(events) and events[i + 1].is_renamed
and events[i + 1].event_id == event.event_id + 1):
cls = DirMovedEvent if event.is_directory else FileMovedEvent
self.queue_event(cls(event.path, events[i + 1].path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
self.queue_event(DirModifiedEvent(os.path.dirname(events[i + 1].path)))
i += 1
elif os.path.exists(event.path):
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
else:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
# TODO: generate events for tree

elif event.is_modified or event.is_inode_meta_mod or event.is_xattr_mod :
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
self.queue_event(cls(event.path))

elif event.is_created:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))

elif event.is_removed:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
i += 1

def run(self):
try:
def callback(pathnames, flags, emitter=self):
def callback(pathnames, flags, ids, emitter=self):
with emitter._lock:
emitter.native_events = [
_fsevents.NativeEvent(event_path, event_flags, event_id)
for event_path, event_flags, event_id in zip(pathnames, flags, ids)
]
emitter.queue_events(emitter.timeout)

# for pathname, flag in zip(pathnames, flags):
Expand Down
159 changes: 149 additions & 10 deletions src/watchdog_fsevents.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
#define G_RETURN_IF_NOT(condition) do { if (!condition) { return; } } while (0)
#define UNUSED(x) (void)x

#if PY_MAJOR_VERSION < 3
#define AS_PYTHON_STRING(x) PyString_FromString(x)
#else /* PY_MAJOR_VERSION < 3 */
#define AS_PYTHON_STRING(x) PyUnicode_FromString(x)
#endif /* PY_MAJOR_VERSION < 3 */

/* Error message definitions. */
#define ERROR_CANNOT_CALL_CALLBACK "Unable to call Python callback."

Expand All @@ -56,7 +62,7 @@ typedef struct {
* function must accept 3 arguments, all of which
* are Python lists::
*
* def python_callback(event_paths, event_flags):
* def python_callback(event_paths, event_flags, event_ids):
* pass
*/
PyObject *python_callback;
Expand All @@ -77,6 +83,116 @@ typedef struct {
} StreamCallbackInfo;


/**
* NativeEvent type so that we don't need to expose the FSEvents constants to Python land
*/
typedef struct {
PyObject_HEAD
const char *path;
FSEventStreamEventFlags flags;
FSEventStreamEventId id;
} NativeEventObject;

PyObject* NativeEventTypeString(PyObject* instance, void* closure)
{
UNUSED(closure);
NativeEventObject *self = (NativeEventObject*)instance;
if (self->flags & kFSEventStreamEventFlagItemCreated)
return AS_PYTHON_STRING("Created");
if (self->flags & kFSEventStreamEventFlagItemRemoved)
return AS_PYTHON_STRING("Removed");
if (self->flags & kFSEventStreamEventFlagItemRenamed)
return AS_PYTHON_STRING("Renamed");
if (self->flags & kFSEventStreamEventFlagItemModified)
return AS_PYTHON_STRING("Modified");

return AS_PYTHON_STRING("Unknown");
}

PyObject* NativeEventTypeFlags(PyObject* instance, void* closure)
{
UNUSED(closure);
NativeEventObject *self = (NativeEventObject*)instance;
#if PY_MAJOR_VERSION < 3
return PyInt_FromLong(self->flags);
#else /* PY_MAJOR_VERSION < 3 */
return PyLong_FromLong(self->flags);
#endif /* PY_MAJOR_VERSION < 3 */
}

PyObject* NativeEventTypePath(PyObject* instance, void* closure)
{
UNUSED(closure);
NativeEventObject *self = (NativeEventObject*)instance;
return AS_PYTHON_STRING(self->path);
}

PyObject* NativeEventTypeID(PyObject* instance, void* closure)
{
UNUSED(closure);
NativeEventObject *self = (NativeEventObject*)instance;
#if PY_MAJOR_VERSION < 3
return PyInt_FromLong(self->id);
#else /* PY_MAJOR_VERSION < 3 */
return PyLong_FromLong(self->id);
#endif /* PY_MAJOR_VERSION < 3 */
}

#define FLAG_PROPERTY(suffix, flag) \
PyObject* NativeEventType##suffix(PyObject* instance, void* closure) \
{ \
UNUSED(closure); \
NativeEventObject *self = (NativeEventObject*)instance; \
if (self->flags & flag) { \
Py_RETURN_TRUE; \
} \
Py_RETURN_FALSE; \
}

FLAG_PROPERTY(IsCreated, kFSEventStreamEventFlagItemCreated)
FLAG_PROPERTY(IsRemoved, kFSEventStreamEventFlagItemRemoved)
FLAG_PROPERTY(IsRenamed, kFSEventStreamEventFlagItemRenamed)
FLAG_PROPERTY(IsModified, kFSEventStreamEventFlagItemModified)
FLAG_PROPERTY(IsDirectory, kFSEventStreamEventFlagItemIsDir)

static int NativeEventInit(NativeEventObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"path", "flags", "id", NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwds, "|sIL", kwlist, &self->path, &self->flags, &self->id)) {
return -1;
}

return 0;
}

static PyGetSetDef NativeEventProperties[] = {
{"_event_type", NativeEventTypeString, NULL, "Textual representation of the native event that occurred", NULL},
{"flags", NativeEventTypeFlags, NULL, "The raw mask of flags as returend by FSEvents", NULL},
{"path", NativeEventTypePath, NULL, "The path for which this event was generated", NULL},
{"id", NativeEventTypeID, NULL, "The id of the generated event", NULL},
{"is_created", NativeEventTypeIsCreated, NULL, "True if self.path was created on the filesystem", NULL},
{"is_removed", NativeEventTypeIsRemoved, NULL, "True if self.path was removed from the filesystem", NULL},
{"is_renamed", NativeEventTypeIsRenamed, NULL, "True if self.path was renamed on the filesystem", NULL},
{"is_modified", NativeEventTypeIsModified, NULL, "True if self.path was modified", NULL},
{"is_directory", NativeEventTypeIsDirectory, NULL, "True if self.path is a directory", NULL},
{NULL, NULL, NULL, NULL, NULL},
};


static PyTypeObject NativeEventType = {
PyVarObject_HEAD_INIT(NULL, 0)
.tp_name = "_watchdog_fsevents.NativeEvent",
.tp_doc = "A wrapper around native FSEvents events",
.tp_basicsize = sizeof(NativeEventObject),
.tp_itemsize = 0,
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
.tp_new = PyType_GenericNew,
.tp_getset = NativeEventProperties,
.tp_init = (initproc) NativeEventInit,
};


/**
* Dictionary to keep track of which run loop
* belongs to which emitter thread.
Expand Down Expand Up @@ -136,12 +252,13 @@ watchdog_FSEventStreamCallback(ConstFSEventStreamRef stream_ref,
const FSEventStreamEventId event_ids[])
{
UNUSED(stream_ref);
UNUSED(event_ids);
size_t i = 0;
PyObject *callback_result = NULL;
PyObject *path = NULL;
PyObject *id = NULL;
PyObject *flags = NULL;
PyObject *py_event_flags = NULL;
PyObject *py_event_ids = NULL;
PyObject *py_event_paths = NULL;
PyThreadState *saved_thread_state = NULL;

Expand All @@ -152,41 +269,46 @@ watchdog_FSEventStreamCallback(ConstFSEventStreamRef stream_ref,
/* Convert event flags and paths to Python ints and strings. */
py_event_paths = PyList_New(num_events);
py_event_flags = PyList_New(num_events);
if (G_NOT(py_event_paths && py_event_flags))
py_event_ids = PyList_New(num_events);
if (G_NOT(py_event_paths && py_event_flags && py_event_ids))
{
Py_DECREF(py_event_paths);
Py_DECREF(py_event_ids);
Py_DECREF(py_event_flags);
return /*NULL*/;
}
for (i = 0; i < num_events; ++i)
{
id = PyLong_FromLongLong(event_flags[i]);
#if PY_MAJOR_VERSION >= 3
path = PyUnicode_FromString(event_paths[i]);
flags = PyLong_FromLong(event_flags[i]);
#else
path = PyString_FromString(event_paths[i]);
flags = PyInt_FromLong(event_flags[i]);
#endif
if (G_NOT(path && flags))
if (G_NOT(path && flags && id))
{
Py_DECREF(py_event_paths);
Py_DECREF(py_event_flags);
Py_DECREF(py_event_ids);
return /*NULL*/;
}
PyList_SET_ITEM(py_event_paths, i, path);
PyList_SET_ITEM(py_event_flags, i, flags);
PyList_SET_ITEM(py_event_ids, i, id);
}

/* Call the Python callback function supplied by the stream information
* struct. The Python callback function should accept three arguments,
* all being Python lists:
*
* def python_callback(event_paths, event_flags):
* def python_callback(event_paths, event_flags, event_ids):
* pass
*/
callback_result = \
PyObject_CallFunction(stream_callback_info_ref->python_callback,
"OO", py_event_paths, py_event_flags);
"OOO", py_event_paths, py_event_flags, py_event_ids);
if (G_IS_NULL(callback_result))
{
if (G_NOT(PyErr_Occurred()))
Expand Down Expand Up @@ -306,7 +428,7 @@ watchdog_FSEventStreamCreate(StreamCallbackInfo *stream_callback_info_ref,
paths,
kFSEventStreamEventIdSinceNow,
stream_latency,
kFSEventStreamCreateFlagNoDefer);
kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents);
CFRelease(paths);
return stream_ref;
}
Expand All @@ -322,9 +444,9 @@ PyDoc_STRVAR(watchdog_add_watch__doc__,
:param callback:\n\
The callback function to call when an event occurs.\n\n\
Example::\n\n\
def callback(paths, flags):\n\
for path, flag in zip(paths, flags):\n\
print(\"%s=%ul\" % (path, flag))\n\
def callback(paths, flags, ids):\n\
for path, flag, event_id in zip(paths, flags, ids):\n\
print(\"%d: %s=%ul\" % (event_id, path, flag))\n\
:param paths:\n\
A list of paths to monitor.\n");
static PyObject *
Expand Down Expand Up @@ -591,9 +713,18 @@ void initwatchdog_fsevents(void);
/*
 * Python 2 module initialisation: readies the NativeEvent type, creates the
 * module, publishes the type as `NativeEvent` and then runs the shared
 * attribute/initialisation helpers. Returns early if any step fails.
 */
void
init_watchdog_fsevents(void)
{
    PyObject *module = NULL;

    NativeEventType.tp_new = PyType_GenericNew;
    G_RETURN_IF(PyType_Ready(&NativeEventType) < 0);
    module = Py_InitModule3(MODULE_NAME,
                            watchdog_fsevents_methods,
                            watchdog_fsevents_module__doc__);
    G_RETURN_IF(module == NULL);

    /* PyModule_AddObject steals a reference only on success. */
    Py_INCREF(&NativeEventType);
    if (PyModule_AddObject(module, "NativeEvent", (PyObject*)&NativeEventType) < 0) {
        Py_DECREF(&NativeEventType);
        /* Py_InitModule3 returns a borrowed reference, so the module must
         * not be released here. */
        return;
    }
    watchdog_module_add_attributes(module);
    watchdog_module_init();
}
Expand All @@ -617,7 +748,15 @@ static struct PyModuleDef watchdog_fsevents_module = {
*/
PyMODINIT_FUNC
PyInit__watchdog_fsevents(void){
G_RETURN_NULL_IF(PyType_Ready(&NativeEventType) < 0);
PyObject *module = PyModule_Create(&watchdog_fsevents_module);
G_RETURN_NULL_IF_NULL(module);
Py_INCREF(&NativeEventType);
if (PyModule_AddObject(module, "NativeEvent", (PyObject*)&NativeEventType) < 0) {
Py_DECREF(&NativeEventType);
Py_DECREF(module);
return NULL;
}
watchdog_module_add_attributes(module);
watchdog_module_init();
return module;
Expand Down