Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate unnecessary delay with inotify events #477

Merged
merged 1 commit into from
Jan 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions src/watchdog/observers/inotify_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,33 @@ def close(self):
self.stop()
self.join()

def _group_events(self, event_list):
"""Group any matching move events"""
grouped = []
for inotify_event in event_list:
logger.debug("in-event %s", inotify_event)
def matching_from_event(event):
return (not isinstance(event, tuple) and event.is_moved_from
and event.cookie == inotify_event.cookie)

if inotify_event.is_moved_to:
# Check if move_from is already in the buffer
for index, event in enumerate(grouped):
if matching_from_event(event):
grouped[index] = (event, inotify_event)
break
else:
# Check if move_from is in delayqueue already
from_event = self._queue.remove(matching_from_event)
if from_event is not None:
grouped.append((from_event, inotify_event))
else:
logger.debug("could not find matching move_from event")
grouped.append(inotify_event)
else:
grouped.append(inotify_event)
return grouped

def run(self):
"""Read event from `inotify` and add them to `queue`. When reading a
IN_MOVE_TO event, remove the previous added matching IN_MOVE_FROM event
Expand All @@ -58,24 +85,13 @@ def run(self):
deleted_self = False
while self.should_keep_running() and not deleted_self:
inotify_events = self._inotify.read_events()
for inotify_event in inotify_events:
logger.debug("in-event %s", inotify_event)
if inotify_event.is_moved_to:

def matching_from_event(event):
return (not isinstance(event, tuple) and event.is_moved_from
and event.cookie == inotify_event.cookie)

from_event = self._queue.remove(matching_from_event)
if from_event is not None:
self._queue.put((from_event, inotify_event))
else:
logger.debug("could not find matching move_from event")
self._queue.put(inotify_event)
else:
self._queue.put(inotify_event)
grouped_events = self._group_events(inotify_events)
for inotify_event in grouped_events:
# Only add delay for unmatched move_from events
delay = not isinstance(inotify_event, tuple) and inotify_event.is_moved_from
self._queue.put(inotify_event, delay)

if inotify_event.is_delete_self and \
if not isinstance(inotify_event, tuple) and inotify_event.is_delete_self and \
inotify_event.src_path == self._inotify.path:
# Deleted the watched directory, stop watching for events
deleted_self = True
31 changes: 13 additions & 18 deletions src/watchdog/utils/delayed_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@
class DelayedQueue(object):

def __init__(self, delay):
self.delay = delay
self.delay_sec = delay
self._lock = threading.Lock()
self._not_empty = threading.Condition(self._lock)
self._queue = deque()
self._closed = False

def put(self, element):
def put(self, element, delay=False):
"""Add element to queue."""
self._lock.acquire()
self._queue.append((element, time.time()))
self._queue.append((element, time.time(), delay))
self._not_empty.notify()
self._lock.release()

Expand All @@ -56,33 +56,28 @@ def get(self):
if self._closed:
self._not_empty.release()
return None
head, insert_time = self._queue[0]
head, insert_time, delay = self._queue[0]
self._not_empty.release()

# wait for delay
time_left = insert_time + self.delay - time.time()
while time_left > 0:
time.sleep(time_left)
time_left = insert_time + self.delay - time.time()
# wait for delay if required
if delay:
time_left = insert_time + self.delay_sec - time.time()
while time_left > 0:
time.sleep(time_left)
time_left = insert_time + self.delay_sec - time.time()

# return element if it's still in the queue
self._lock.acquire()
try:
with self._lock:
if len(self._queue) > 0 and self._queue[0][0] is head:
self._queue.popleft()
return head
finally:
self._lock.release()

def remove(self, predicate):
"""Remove and return the first items for which predicate is True,
ignoring delay."""
try:
self._lock.acquire()
for i, (elem, t) in enumerate(self._queue):
with self._lock:
for i, (elem, t, delay) in enumerate(self._queue):
if predicate(elem):
del self._queue[i]
return elem
finally:
self._lock.release()
return None
15 changes: 13 additions & 2 deletions tests/test_delayed_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,22 @@
from watchdog.utils.delayed_queue import DelayedQueue


def test_get():
def test_delayed_get():
q = DelayedQueue(2)
q.put("")
q.put("", True)
inserted = time()
q.get()
elapsed = time() - inserted
# 2.10 instead of 2.05 for slow macOS slaves on Travis
assert 2.10 > elapsed > 1.99

def test_nondelayed_get():
q = DelayedQueue(2)
q.put("", False)
inserted = time()
q.get()
elapsed = time() - inserted
# Far less than 1 second
assert elapsed < 1