Skip to content

Commit

Permalink
Finish third iteration of suppression code
Browse files Browse the repository at this point in the history
The first version that could suppress events was introduced by @xoviat
via #38. It was a separate module, using nested dictionaries to simulate
a state machine that had to be updated on every hotkey registered. It
also handled the entire hotkey process, including multi-steps and
timeouts. It was very complex, and had some bugs that we weren't sure
were fixable with that architecture.

The second version was developed in the `suppress` branch for over an
year. It used an explicit state machine for each key, divided by several
attributes (is it a modifier, was it processed by a key, etc). It
processed only single-step hotkeys. Multi-step hotkeys were simulated by
adding and removing hotkeys as necessary, and keeping track separately of
which events were suppressed from previous steps. This version worked
better, with fewer bugs and somewhat simpler, but the execution was
still hard to understand and some bugs were creeping in.

This commit introduces the third attempt at making a resiliant and
simple to understand key event suppression system. It borrows the
concept of multi-step hotkeys as being sequences of single-step hotkeys,
but revamps everything else.

The meat of the code is a set of global variables and a decision tree.

The decision tree is executed for each incoming event, and uses the
global state to decide if a given event should be:
1) Suppressed. It's part of a blocking hotkey and at least one callback
returned False. The event is blocked with no hope of recovery. For key
down events, the corresponding key up event will also be suppressed.
2) Delayed. This event is part of a *subset* of one or more blocking
hotkeys. We block it now, but may decide to resend it later.
3) Allowed. The event is passed along normally. If there were any
pending events, they are resent before allowing this one.

The global variables keep track of which keys the OS reports as
currently pressed (i.e. physically pressed keys), which keys were
allowed to be passed (i.e. logically pressed keys, from the point of
view of other applications), which key presses have been tentatively
suppressed, and which key presses were definitely suppressed by hotkeys.

It's still complex, but much easier to understand than the two previous
versions, doesn't have the same bugs (I hope it has no bugs at all), and
is much easier to debug and amend.

Third time is the charm, wish me luck.
  • Loading branch information
boppreh committed Apr 2, 2018
1 parent e53822f commit d3439be
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 107 deletions.
203 changes: 128 additions & 75 deletions keyboard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,98 +160,153 @@ def init(self):
self.nonblocking_hotkeys = _collections.defaultdict(list)
self.is_replaying = False

# Supporting hotkey suppression is harder than it looks. See
# https://github.com/boppreh/keyboard/issues/22
self.modifier_states = {} # "alt" -> "allowed"

# TODO: this is a hack to populate the _modifier_scan_codes set.
is_modifier(0)

def pre_process_event(self, event):
for key_hook in self.nonblocking_key_hooks[event.scan_code]:
key_hook(event)

with _state_lock:
hotkey = frozenset(_physically_pressed_events)
for callback in self.nonblocking_hotkeys[hotkey]:
for callback in self.nonblocking_hotkeys[_active_keys]:
callback(event)

return event.scan_code or (event.name and event.name != 'unknown')

def update_active(self):
global _active_keys
global _active_modifiers
_active_keys = frozenset(_physically_pressed_events)
_active_modifiers = frozenset(key for key in _physically_pressed_events if is_modifier(key))


def decide_event(self, event):
"""
This function is called for every OS keyboard event and decides if the
event should be blocked or not, and passes a copy of the event to
other, non-blocking, listeners.
Decides if a given event should be:
1) Suppressed. It's part of a blocking hotkey and at least one callback
returned False. The event is blocked with no hope of recovery. For key
down events, the corresponding key up event will also be suppressed.
2) Delayed. This event is part of a *subset* of one or more blocking
hotkeys. We block it now, but may decide to resend it later.
3) Allowed. The event is passed along normally. If there were any
pending events, they are resent before allowing this one.
Note that suppression treats modifiers differently. `b+a` will trigger
the hotkey "a+b", but `a+shift` will not trigger "shift+a". This is done
to avoid delaying every press of `a`, which is very jarring, just
because `modifiers+a` is registered.
"""
# Pass through all fake key events, don't even report to other handlers.
if self.is_replaying:
# TODO: update _logically_pressed_events
return True

event_type = event.event_type
scan_code = event.scan_code
test_all_callbacks = lambda callbacks: all([c(event) for c in callbacks])

if not test_all_callbacks(self.blocking_hooks) or not test_all_callbacks(self.blocking_key_hooks[event.scan_code]):
def suppress():
_suppressed_presses.add(scan_code)
_suppressed_presses.update(_pending_presses)
del _pending_presses[:]
return False

# Queue for handlers that won't block the event.
self.queue.put(event)
def delay():
_pending_presses.append(scan_code)
return False

if event_type == KEY_DOWN:
with _state_lock:
_physically_pressed_events[scan_code] = event
self.update_active()
def allow():
for modifier in _active_modifiers & _suppressed_presses:
press(modifier)
_suppressed_presses.remove(modifier)
for pending_scan_code in _pending_presses:
press(pending_scan_code)
del _pending_presses[:]
return True

if is_modifier(scan_code):
if any(_active_modifiers.issubset(modifiers) for modifiers in self.blocking_hotkeys):
suppress()
# TODO: decide what to do when "shift+a" is blocked and the user presses
# "shift+b+a" (i.e. didn't release "b" quickly enough).
if event.event_type == KEY_DOWN:
if self.blocking_hotkeys[_active_modifiers][_active_keys]:
if self.test_all_callbacks(event, self.blocking_hotkeys[_active_modifiers][_active_keys]):
#print('JUDGEMENT: Accepted press by callback return')
return allow()
else:
open_the_flood_gates()
elif self.blocking_hotkeys[_active_modifiers][_active_keys]:
if test_all_callbacks(self.blocking_hotkeys[_active_modifiers][_active_keys]):
open_the_flood_gates()
#print('JUDGEMENT: Suppressed press by hotkey')
return suppress()
elif is_modifier(scan_code):
if any(_active_modifiers.issubset(modifiers) and sum(subdict.values(), []) for modifiers, subdict in self.blocking_hotkeys.items()):
#print('JUDGEMENT: Pending modifier press')
return delay()
else:
suppress()
elif any(_active_keys.issubset(keys) for keys in self.blocking_hotkeys[_active_modifiers]):
suppress()
#print('JUDGEMENT: Accepted modifier press')
return allow()
elif any(_active_keys.issubset(keys) and callbacks for keys, callbacks in self.blocking_hotkeys[_active_modifiers].items()):
#print('JUDGEMENT: Pending press by hotkey subset')
return delay()
else:
open_the_flood_gates()

elif event_type == KEY_UP:
with _state_lock:
_physically_pressed_events.pop(scan_code, None)
self.update_active()
#print('JUDGEMENT: Accepted press as last option')
return allow()

elif event.event_type == KEY_UP:
# Keep track of what key combination was just released.
releasing_modifiers = _active_modifiers | {scan_code} if is_modifier(scan_code) else _active_modifiers
releasing_keys = _active_keys | {scan_code}

try:
if self.blocking_hotkeys[_active_modifiers][_active_keys]:
if test_all_callbacks(self.blocking_hotkeys[releasing_modifiers][releasing_keys]):
open_the_flood_gates()
else:
suppress()
elif scan_code in _suppressed_presses:
suppress()
# TODO: remove assert after testing
assert scan_code not in _pending_presses
if self.blocking_hotkeys[releasing_modifiers][releasing_keys]:
if self.test_all_callbacks(event, self.blocking_hotkeys[releasing_modifiers][releasing_keys]):
#print('JUDGEMENT: Accepted release by callback return')
return allow()
else:
open_the_flood_gates()
finally:
#print('JUDGEMENT: Suppressed release by hotkey')
#assert scan_code not in _pending_presses
_suppressed_presses.discard(scan_code)
return suppress()
elif scan_code in _suppressed_presses:
#assert scan_code not in _pending_presses
#print('JUDGEMENT: Suppressed release associated with suppressed press')
_suppressed_presses.discard(scan_code)
_pending_presses.discard(scan_code)
return suppress()
else:
#print('JUDGEMENT: Accepted release as last option')
return allow()


def test_all_callbacks(self, event, callbacks):
"""
Returns True if all callbacks returned True when called with the given
event.
"""
# Make sure we always process all callbacks.
results = [callback(event) for callback in callbacks]
return all(results)

def process_event(self, event):
"""
This function is called for every OS keyboard event. It's responsible
for calling hotkeys/hooks, deciding if the event should be allowed or
blocked (by return value), and bookkeeping of active events.
"""
if self.is_replaying:
# Pass through all fake key events and don't report them to other
# handlers.
accept = True
else:
if not self.test_all_callbacks(event, self.blocking_hooks) or not self.test_all_callbacks(event, self.blocking_key_hooks[event.scan_code]):
return False

with _state_lock:
if event.event_type == KEY_DOWN:
_physically_pressed_events[event.scan_code] = event
else:
_physically_pressed_events.pop(event.scan_code, None)
global _active_keys
global _active_modifiers
_active_keys = frozenset(_physically_pressed_events)
_active_modifiers = frozenset(key for key in _physically_pressed_events if is_modifier(key))

# Queue for handlers that won't block the event.
self.queue.put(event)

accept = self.decide_event(event)

if accept:
with _state_lock:
if event.event_type == KEY_DOWN:
_logically_pressed_events[event.scan_code] = event
else:
_logically_pressed_events.pop(event.scan_code, None)


return accept

def listen(self):
_os_keyboard.listen(self.decide_event)
_os_keyboard.listen(self.process_event)

_listener = _KeyboardListener()

Expand Down Expand Up @@ -361,7 +416,8 @@ def release(hotkey):

def is_pressed(hotkey):
"""
Returns True if the key is pressed.
Returns True if the key is physically pressed. Accepts scan codes, key
names, or single-step hotkeys (i.e. no commas).
is_pressed(57) #-> True
is_pressed('space') #-> True
Expand All @@ -376,13 +432,10 @@ def is_pressed(hotkey):

steps = parse_hotkey(hotkey)
if len(steps) > 1:
raise ValueError("Impossible to check if multi-step hotkeys are pressed (`a+b` is ok, `a, b` isn't).")
raise ValueError("Cannot check if multi-step hotkeys are pressed (`a+b` is ok, `a, b` isn't).")

# Convert _physically_pressed_events into a set
with _state_lock:
pressed_scan_codes = set(_physically_pressed_events)
for scan_codes in steps[0]:
if not any(scan_code in pressed_scan_codes for scan_code in scan_codes):
if not any(scan_code in _physically_pressed_events for scan_code in scan_codes):
return False
return True

Expand Down Expand Up @@ -528,7 +581,7 @@ def combine_step(step):
# event delays, we list all possible combinations of scan codes for these
# keys. Hotkeys are usually small, and there are not many combinations, so
# this is not as insane as it sounds.
return (tuple(sorted(scan_codes)) for scan_codes in _itertools.product(*step))
return (frozenset(scan_codes) for scan_codes in _itertools.product(*step))

return tuple(tuple(combine_step(step)) for step in parse_hotkey(hotkey))

Expand All @@ -546,8 +599,8 @@ def _add_hotkey_step(handler, combinations, suppress):

# Register the scan codes of every possible combination of
# modfiier + main key.
for container in containers:
for scan_codes in combinations:
for scan_codes in combinations:
for container in containers:
container[scan_codes].append(handler)

def remove():
Expand Down Expand Up @@ -659,7 +712,7 @@ def set_index(new_index):
state.remove_catch_misses()
# Must be `suppress=True` to ensure `send` has priority.
state.remove_catch_misses = hook(catch_misses, suppress=True)

if new_index == len(steps) - 1:
def handler(event):
if event.event_type == KEY_UP:
Expand Down Expand Up @@ -732,11 +785,11 @@ def remap_hotkey(src, dst, suppress=True, trigger_on_release=False):
remap('alt+w', 'ctrl+up')
"""
def handler():
active_modifiers = sorted(modifier for modifier, state in _listener.modifier_states.items() if state == 'allowed')
for modifier in active_modifiers:
modifiers = sorted(key for key in _logically_pressed_events if is_modifier(key))
for modifier in modifiers:
release(modifier)
send(dst)
for modifier in reversed(active_modifiers):
for modifier in reversed(modifiers):
press(modifier)
return False
return add_hotkey(src, handler, suppress=suppress, trigger_on_release=trigger_on_release)
Expand Down Expand Up @@ -816,7 +869,7 @@ def write(text, delay=0, restore_state_after=True, exact=None):
except (KeyError, ValueError):
_os_keyboard.type_unicode(letter)
continue

for modifier in modifiers:
press(modifier)

Expand Down
Loading

0 comments on commit d3439be

Please sign in to comment.