Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional event debouncing for auto-restart #940

Merged
merged 12 commits into from
Jan 29, 2023
4 changes: 2 additions & 2 deletions changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ Changelog

2023-xx-xx • `full history <https://github.com/gorakhargosh/watchdog/compare/v2.2.1...HEAD>`__

-
- Thanks to our beloved contributors: @BoboTiG
- [watchmedo] Add optional event debouncing for ``auto-restart``, only restarting once if many events happen in quick succession (`#940 <https://github.com/gorakhargosh/watchdog/pull/940>`__)
- Thanks to our beloved contributors: @BoboTiG, @taleinat

2.2.1
~~~~~
Expand Down
112 changes: 81 additions & 31 deletions src/watchdog/tricks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import os
import signal
import subprocess
import threading
import time

from watchdog.events import PatternMatchingEventHandler
from watchdog.utils import echo
from watchdog.utils.event_debouncer import EventDebouncer
from watchdog.utils.process_watcher import ProcessWatcher

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -177,60 +179,108 @@ class AutoRestartTrick(Trick):

def __init__(self, command, patterns=None, ignore_patterns=None,
ignore_directories=False, stop_signal=signal.SIGINT,
kill_after=10):
kill_after=10, debounce_interval_seconds=0):
if kill_after < 0:
BoboTiG marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError("kill_after must be non-negative.")
if debounce_interval_seconds < 0:
raise ValueError("debounce_interval_seconds must be non-negative.")

super().__init__(
patterns=patterns, ignore_patterns=ignore_patterns,
ignore_directories=ignore_directories)

self.command = command
self.stop_signal = stop_signal
self.kill_after = kill_after
self.debounce_interval_seconds = debounce_interval_seconds

self.process = None
self.process_watcher = None
self.event_debouncer = None
self.restart_count = 0

self._is_process_stopping = False
self._is_trick_stopping = False
self._stopping_lock = threading.RLock()

def start(self):
# windows doesn't have setsid
self.process = subprocess.Popen(self.command, preexec_fn=getattr(os, 'setsid', None))
self.process_watcher = ProcessWatcher(self.process, self._restart)
self.process_watcher.start()
self.event_debouncer = EventDebouncer(self.debounce_interval_seconds, lambda events: self._restart_process())
self.event_debouncer.start()
self._start_process()

def stop(self):
if self.process is None:
# Ensure the body of the function is only run once.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to ensure that it runs only once? It seems like it should behave OK even if called multiple times.

with self._stopping_lock:
if self._is_trick_stopping:
return
self._is_trick_stopping = True

process_watcher = self.process_watcher
self.event_debouncer.stop()
self._stop_process()

# Don't leak threads: Wait for background threads to stop.
self.event_debouncer.join()
process_watcher.join()

def _start_process(self):
if self._is_trick_stopping:
return

if self.process_watcher is not None:
self.process_watcher.stop()
self.process_watcher = None
# windows doesn't have setsid
self.process = subprocess.Popen(self.command, preexec_fn=getattr(os, 'setsid', None))
self.process_watcher = ProcessWatcher(self.process, self._restart_process)
self.process_watcher.start()

def kill_process(stop_signal):
if hasattr(os, 'getpgid') and hasattr(os, 'killpg'):
os.killpg(os.getpgid(self.process.pid), stop_signal)
else:
os.kill(self.process.pid, self.stop_signal)
def _stop_process(self):
# Ensure the body of the function is not run in parallel in different threads.
with self._stopping_lock:
if self._is_process_stopping:
return
self._is_process_stopping = True

try:
kill_process(self.stop_signal)
except OSError:
# Process is already gone
pass
else:
kill_time = time.time() + self.kill_after
while time.time() < kill_time:
if self.process.poll() is not None:
break
time.sleep(0.25)
else:
if self.process_watcher is not None:
self.process_watcher.stop()
self.process_watcher = None

if self.process is not None:
try:
kill_process(9)
kill_process(self.process.pid, self.stop_signal)
except OSError:
# Process is already gone
pass
self.process = None
else:
kill_time = time.time() + self.kill_after
while time.time() < kill_time:
if self.process.poll() is not None:
break
time.sleep(0.25)
else:
try:
kill_process(self.process.pid, 9)
except OSError:
# Process is already gone
pass
self.process = None
finally:
self._is_process_stopping = False

@echo_events
def on_any_event(self, event):
self._restart()
self.event_debouncer.handle_event(event)

def _restart_process(self):
if self._is_trick_stopping:
return
self._stop_process()
self._start_process()
self.restart_count += 1


def _restart(self):
self.stop()
self.start()
if hasattr(os, 'getpgid') and hasattr(os, 'killpg'):
def kill_process(pid, stop_signal):
os.killpg(os.getpgid(pid), stop_signal)
else:
def kill_process(pid, stop_signal):
os.kill(pid, stop_signal)
47 changes: 47 additions & 0 deletions src/watchdog/utils/event_debouncer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import logging
import queue

from watchdog.utils import BaseThread


logger = logging.getLogger(__name__)


class EventDebouncer(BaseThread):
"""Background thread for debouncing event handling.

When an event is received, wait until the configured debounce interval
passes before calling the callback. If additional events are received
before the interval passes, reset the timer and keep waiting. When the
debouncing interval passes, the callback will be called with a list of
events in the order in which they were received.
"""
def __init__(self, debounce_interval_seconds, events_callback):
super().__init__()
self.debounce_interval_seconds = debounce_interval_seconds
self.events_callback = events_callback

self._event_queue = queue.Queue()

def handle_event(self, event):
self._event_queue.put(event)

def run(self):
while self.should_keep_running():
events = []

# Get first event.
try:
events.append(self._event_queue.get(timeout=0.2))
except queue.Empty:
continue

if self.debounce_interval_seconds:
# Collect additional events until the debounce interval passes.
while self.should_keep_running():
try:
events.append(self._event_queue.get(timeout=self.debounce_interval_seconds))
except queue.Empty:
break

self.events_callback(events)
Copy link
Contributor

@oprypin oprypin Jan 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a different implementation, like I did it in MkDocs.

It has the benefit of being instantly stoppable, rather than having a 0 - 0.2 sec delay before being able to stop.

It also ensures to instantly stop firing events when it's stopped, maybe eliminating the need for

The more involved start and stop mechanisms in AutoRestartTrick are needed to avoid restarting the debouncer when restart the process

Suggested change
import logging
import queue
from watchdog.utils import BaseThread
logger = logging.getLogger(__name__)
class EventDebouncer(BaseThread):
"""Background thread for debouncing event handling.
When an event is received, wait until the configured debounce interval
passes before calling the callback. If additional events are received
before the interval passes, reset the timer and keep waiting. When the
debouncing interval passes, the callback will be called with a list of
events in the order in which they were received.
"""
def __init__(self, debounce_interval_seconds, events_callback):
super().__init__()
self.debounce_interval_seconds = debounce_interval_seconds
self.events_callback = events_callback
self._event_queue = queue.Queue()
def handle_event(self, event):
self._event_queue.put(event)
def run(self):
while self.should_keep_running():
events = []
# Get first event.
try:
events.append(self._event_queue.get(timeout=0.2))
except queue.Empty:
continue
if self.debounce_interval_seconds:
# Collect additional events until the debounce interval passes.
while self.should_keep_running():
try:
events.append(self._event_queue.get(timeout=self.debounce_interval_seconds))
except queue.Empty:
break
self.events_callback(events)
import logging
import threading
logger = logging.getLogger(__name__)
class EventDebouncer(threading.Thread):
"""Background thread for debouncing event handling.
When an event is received, wait until the configured debounce interval
passes before calling the callback. If additional events are received
before the interval passes, reset the timer and keep waiting. When the
debouncing interval passes, the callback will be called with a list of
events in the order in which they were received.
"""
def __init__(self, debounce_interval_seconds, events_callback):
super().__init__()
self.debounce_interval_seconds = debounce_interval_seconds
self.events_callback = events_callback
self._cond = threading.Condition()
self._shutdown = False
self._events = []
def handle_event(self, event):
with self._cond:
self._events.append(event)
self._cond.notify_all()
def run(self):
while True:
with self._cond:
self._cond.wait_for(lambda: self._events or self._shutdown)
if self._shutdown:
break
if self.debounce_interval_seconds:
while self._cond.wait(timeout=self.debounce_interval_seconds):
# Wait for events to stop happening.
pass
events = list(self._events)
self._events.clear()
self.events_callback(events)
def stop(self):
with self._cond:
self._shutdown = True
self._cond.notify_all()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this @oprypin!

Using a condition like this is indeed better for instant stopping, I'll work that into the implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@taleinat Well you can just "accept suggestion", I think that will also end up simpler than, for example, keeping it a queue. It's a fully functioning replacement, just in 1 of the new tests will need to add a microscopic sleep to make it pass.

Copy link
Contributor Author

@taleinat taleinat Jan 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, and your work to make it such a drop-in replacement was helpful and is appreciated!

This version however does not handle well the case where a shutdown signal is given during the debouncing interval. Also, the use of Condition.wait_for() is more complex than necessary.

So instead I've integrated the approach while addressing these two concerns.

11 changes: 9 additions & 2 deletions src/watchdog/watchmedo.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,13 @@ def shell_command(args):
default=10.0,
type=float,
help='When stopping, kill the subprocess after the specified timeout '
'in seconds (default 10.0).')])
'in seconds (default 10.0).'),
argument('--debounce-interval',
dest='debounce_interval',
default=0.0,
type=float,
help='After a file change, Wait until the specified interval (in '
'seconds) passes with no file changes, and only then restart.')])
def auto_restart(args):
"""
Command to start a long-running subprocess and restart it on matched events.
Expand Down Expand Up @@ -633,7 +639,8 @@ def handler_termination_signal(_signum, _frame):
ignore_patterns=ignore_patterns,
ignore_directories=args.ignore_directories,
stop_signal=stop_signal,
kill_after=args.kill_after)
kill_after=args.kill_after,
debounce_interval_seconds=args.debounce_interval)
handler.start()
observer = Observer(timeout=args.timeout)
try:
Expand Down
62 changes: 58 additions & 4 deletions tests/test_0_watchmedo.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,68 @@ def test_shell_command_subprocess_termination_nowait(tmpdir):
assert not trick.is_process_running()


def test_auto_restart_on_file_change(tmpdir, capfd):
"""Simulate changing 3 files.

Expect 3 restarts.
"""
from watchdog.tricks import AutoRestartTrick
import sys
import time
script = make_dummy_script(tmpdir, n=2)
trick = AutoRestartTrick([sys.executable, script])
trick.start()
time.sleep(1)
trick.on_any_event("foo/bar.baz")
trick.on_any_event("foo/bar2.baz")
trick.on_any_event("foo/bar3.baz")
time.sleep(1)
trick.stop()
cap = capfd.readouterr()
assert cap.out.splitlines(keepends=False).count('+++++ 0') >= 2
assert trick.restart_count == 3


def test_auto_restart_on_file_change_debounce(tmpdir, capfd):
"""Simulate changing 3 files quickly and then another change later.

Expect 2 restarts due to debouncing.
"""
from watchdog.tricks import AutoRestartTrick
import sys
import time
script = make_dummy_script(tmpdir, n=2)
trick = AutoRestartTrick([sys.executable, script], debounce_interval_seconds=0.5)
trick.start()
time.sleep(1)
trick.on_any_event("foo/bar.baz")
trick.on_any_event("foo/bar2.baz")
time.sleep(0.1)
trick.on_any_event("foo/bar3.baz")
time.sleep(1)
trick.on_any_event("foo/bar.baz")
time.sleep(1)
trick.stop()
cap = capfd.readouterr()
assert cap.out.splitlines(keepends=False).count('+++++ 0') == 3
BoboTiG marked this conversation as resolved.
Show resolved Hide resolved
taleinat marked this conversation as resolved.
Show resolved Hide resolved


def test_auto_restart_subprocess_termination(tmpdir, capfd):
"""Run auto-restart with a script that terminates in about 2 seconds.

After 5 seconds, expect it to have been restarted at least once.
"""
from watchdog.tricks import AutoRestartTrick
import sys
import time
script = make_dummy_script(tmpdir, n=2)
a = AutoRestartTrick([sys.executable, script])
a.start()
trick = AutoRestartTrick([sys.executable, script])
trick.start()
time.sleep(5)
a.stop()
trick.stop()
cap = capfd.readouterr()
assert cap.out.splitlines(keepends=False).count('+++++ 0') > 1
assert trick.restart_count >= 1


def test_auto_restart_arg_parsing_basic():
Expand All @@ -129,11 +180,14 @@ def test_auto_restart_arg_parsing_basic():


def test_auto_restart_arg_parsing():
args = watchmedo.cli.parse_args(["auto-restart", "-d", ".", "--kill-after", "12.5", "cmd"])
args = watchmedo.cli.parse_args(
["auto-restart", "-d", ".", "--kill-after", "12.5", "--debounce-interval=0.2", "cmd"]
)
assert args.func is watchmedo.auto_restart
assert args.command == "cmd"
assert args.directories == ["."]
assert args.kill_after == pytest.approx(12.5)
assert args.debounce_interval == pytest.approx(0.2)


def test_shell_command_arg_parsing():
Expand Down