Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #22: Support for low-level key suppression #38

Merged
merged 15 commits into from Jan 19, 2017
Merged
34 changes: 32 additions & 2 deletions keyboard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
from threading import Lock as _Lock
from threading import Thread as _Thread
from ._keyboard_event import KeyboardEvent
from ._suppress import KeyTable

try:
_basestring = basestring
Expand Down Expand Up @@ -111,6 +112,9 @@ def pre_process_event(self, event):
else:
_pressed_events[event.scan_code] = event

if not _pressed_events:
_os_keyboard.allowed_keys.complete_sequence()

return True

def listen(self):
Expand Down Expand Up @@ -198,7 +202,16 @@ def call_later(fn, args=(), delay=0.001):
"""
_Thread(target=lambda: _time.sleep(delay) or fn(*args)).start()

def _suppress_hotkey(steps, timeout):
"""
Adds a hotkey to the list of keys to be suppressed.
To unsuppress all hotkeys use `clear_all_hotkeys()`.
"""
_os_keyboard.allowed_keys.suppress_sequence(steps, timeout)


_hotkeys = {}
_hotkeys_suppressed = {}
def clear_all_hotkeys():
"""
Removes all hotkey handlers. Note some functions such as 'wait' and 'record'
Expand All @@ -210,11 +223,13 @@ def clear_all_hotkeys():
for handler in _hotkeys.values():
unhook(handler)
_hotkeys.clear()
_os_keyboard.allowed_keys.suppress_none()
_hotkeys_suppressed.clear()

# Alias.
remove_all_hotkeys = clear_all_hotkeys

def add_hotkey(hotkey, callback, args=(), blocking=True, timeout=1):
def add_hotkey(hotkey, callback, args=(), blocking=True, suppress=False, timeout=1):
"""
Invokes a callback every time a key combination is pressed. The hotkey must
be in the format "ctrl+shift+a, s". This would trigger when the user holds
Expand Down Expand Up @@ -274,6 +289,10 @@ def handler(event):
return blocking

_hotkeys[hotkey] = handler
if suppress:
_suppress_hotkey(steps, timeout)
_hotkeys_suppressed[hotkey] = timeout

return hook(handler)

# Alias.
Expand Down Expand Up @@ -365,13 +384,22 @@ def _remove_named_hook(name_or_handler, names):
unhook(names[name])
del names[name]

return name

def remove_hotkey(hotkey_or_handler):
"""
Removes a previously registered hotkey. Accepts either the hotkey used
during registration (exact string) or the event handler returned by the
`add_hotkey` or `hook_key` functions.
"""
_remove_named_hook(hotkey_or_handler, _hotkeys)
name = _remove_named_hook(hotkey_or_handler, _hotkeys)
if name in _hotkeys_suppressed:
del _hotkeys_suppressed[name]
# because the table structure is optimized for runtime, we must recompile
_os_keyboard.allowed_keys.suppress_none()
for current_name in _hotkeys_suppressed:
_suppress_hotkey(canonicalize(current_name), _hotkeys_suppressed[current_name])


# Alias.
unhook_key = remove_hotkey
Expand Down Expand Up @@ -673,3 +701,5 @@ def get_typed_strings(events, allow_backspace=True):
yield string
string = ''
yield string

_os_keyboard.allowed_keys = KeyTable(press, release)
44 changes: 42 additions & 2 deletions keyboard/_keyboard_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import keyboard

from ._keyboard_event import KeyboardEvent, canonical_names, KEY_DOWN, KEY_UP
from ._suppress import KeyTable

# Fake events with fake scan codes for a totally deterministic test.
all_names = set(canonical_names.values()) | set(string.ascii_lowercase) | set(string.ascii_uppercase) | {'shift'}
Expand All @@ -24,6 +25,7 @@ def __init__(self):
self.listening = False
self.append = None
self.queue = None
self.allowed_keys = KeyTable(keyboard.press, keyboard.release)

def listen(self, queue):
self.listening = True
Expand Down Expand Up @@ -75,16 +77,21 @@ def tearDown(self):
self.wait_for_events_queue()

def press(self, name, scan_code=None):
is_allowed = keyboard._os_keyboard.allowed_keys.is_allowed(name, False)
keyboard._os_keyboard.queue.put(FakeEvent(KEY_DOWN, name, scan_code))
self.wait_for_events_queue()

return is_allowed

def release(self, name, scan_code=None):
is_allowed = keyboard._os_keyboard.allowed_keys.is_allowed(name, True)
keyboard._os_keyboard.queue.put(FakeEvent(KEY_UP, name, scan_code))
self.wait_for_events_queue()

return is_allowed

def click(self, name, scan_code=None):
self.press(name, scan_code)
self.release(name, scan_code)
return self.press(name, scan_code) and self.release(name, scan_code)

def flush_events(self):
self.wait_for_events_queue()
Expand Down Expand Up @@ -552,6 +559,39 @@ def trigger(): self.triggered = True
time.sleep(0.2)
self.assertTrue(self.triggered)

def test_suppression(self):
def dummy():
pass

keyboard.add_hotkey('a+b+c', dummy, suppress=True)
keyboard.add_hotkey('a+g+h', dummy, suppress=True, timeout=0.01)

for key in ['a', 'b', 'c']:
assert not self.press(key)
for key in ['a', 'b', 'c']:
assert not self.release(key)

assert self.click('d')

for key in ['a', 'b']:
assert not self.press(key)
for key in ['a', 'b']:
assert not self.release(key)

assert self.click('c')

for key in ['a', 'g']:
assert not self.press(key)
for key in ['a', 'g']:
assert not self.release(key)

time.sleep(0.03)
assert self.click('h')

keyboard.remove_hotkey('a+g+h')
keyboard.remove_hotkey('a+b+c')

assert self.click('a')

if __name__ == '__main__':
unittest.main()
1 change: 1 addition & 0 deletions keyboard/_nixkeyboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections import namedtuple
from ._keyboard_event import KeyboardEvent, KEY_DOWN, KEY_UP, normalize_name
from ._nixcommon import EV_KEY, aggregate_devices, ensure_root
from ._suppress import KeyTable

# TODO: start by reading current keyboard state, as to not missing any already pressed keys.
# See: http://stackoverflow.com/questions/3649874/how-to-get-keyboard-state-in-linux
Expand Down
175 changes: 175 additions & 0 deletions keyboard/_suppress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
from threading import Lock, Thread
from timeit import default_timer as timer
from keyboard._keyboard_event import normalize_name


class KeyTable(object):
_keys = {}
_write = Lock() # Required to edit keys
_table = {}
_time = -1
_elapsed = 0 # Maximum time that has elapsed so far in the sequence
_read = Lock() # Required to edit table
_in_sequence = False
_keys_suppressed = [] # List of keys that have been suppressed so far in the sequence
_disable = False # Disables key suppression during replay to avoid infinite loop
SEQUENCE_END = 2 # Delimeter that signifies the end of the sequence

def __init__(self, press_key, release_key):
self.press_key = press_key
self.release_key = release_key

def is_allowed(self, key, is_up, advance=True):
"""
The goal of this function is to be very fast. This is accomplished
through the table structure, which ensures that we only need to
check whether `key is in self._table` and change what variable
is referenced by `self._table`.

Unfortunately, handling timeouts properly has added significantly to
the logic required, but the function should still be well within required
time limits.
"""
if self._disable:
return True

if key != self.SEQUENCE_END:
key = normalize_name(key.split(' ')[-1])

time = timer()
if self._time == -1:
elapsed = 0
else:
elapsed = time - self._time
if self._elapsed > elapsed:
elapsed = self._elapsed

if is_up:
if self._in_sequence:
if key != self.SEQUENCE_END:
self._keys_suppressed.append((key, is_up))
return False
else:
advance = False

in_sequence = key in self._table and elapsed < self._table[key][0]
in_keys = key in self._keys
suppress = in_sequence or in_keys
if advance:
self._read.acquire()
if in_sequence and self._table[key][2]:
self._keys_suppressed.clear()
if in_sequence and self._table[key][1]:
self._table = self._table[key][1]
if self._time != -1:
self._elapsed = elapsed
self._time = -1
elif in_keys and self._keys[key][1]:
self._table = self._keys[key][1]
if self._time != -1:
self._elapsed = elapsed
self._time = -1
self._replay_keys()
self._keys_suppressed.clear()
else:
self._table = self._keys
self._time = -1
self._elapsed = -1
self._replay_keys()
self._keys_suppressed.clear()
self._in_sequence = in_sequence
self._read.release()

if key != self.SEQUENCE_END and suppress:
self._keys_suppressed.append((key, is_up))

return not suppress

def complete_sequence(self):
if self.SEQUENCE_END in self._table:
self.is_allowed(self.SEQUENCE_END, False)
self._read.acquire()
self._time = timer()
self._read.release()
else:
self._read.acquire()
self._time = -1
self._elapsed = 0
self._table = self._keys
self._replay_keys()
self._keys_suppressed.clear()
self._read.release()

def _replay_keys(self):
self._disable = True
for key, is_up in self._keys_suppressed:
if is_up:
self.release_key(key)
else:
self.press_key(key)
self._disable = False

def _refresh(self):
self._read.acquire()
self._disable = False
self._table = self._keys
self._read.release()

def _acquire_table(self, sequence, table, timeout):
"""
Returns a flat (single level) dictionary
:param sequence:
:param table:
:return:
"""
el = sequence.pop(0)
if el not in table:
table[el] = (timeout, {}, False)
if table[el][0] < timeout:
table[el][0] = timeout

if sequence:
return self._acquire_table(sequence, table[el][1], timeout)
else:
return table

def suppress_sequence(self, sequence, timeout):
"""
Adds keys to the suppress_keys table
:param sequence: List of scan codes
:param timeout: Time allowed to elapse before resetting
"""

# the suppress_keys table is organized
# as a dict of dicts so that the critical
# path is only checking whether the
# scan code is 'in current_dict'
flat = []
for subsequence in sequence:
flat.extend(subsequence)
flat.append(self.SEQUENCE_END)

print(flat)
last_index = flat[-1]
self._write.acquire()
table = self._acquire_table(flat, self._keys, timeout)
table[last_index] = (table[last_index][0], table[last_index][1], True)
self._refresh()
self._write.release()

print(self._keys)

def suppress_none(self):
"""
Clears the suppress_keys table and disables
key suppression
:return:
"""
self._write.acquire()
self._keys = {}
self._refresh()
self._write.release()

self._read.acquire()
self._disable = True
self._read.release()
Loading