Skip to content

Commit

Permalink
dev2sh: group all devices with the same name into a single config
Browse files Browse the repository at this point in the history
  • Loading branch information
XPhyro committed Aug 4, 2024
1 parent fb82660 commit 25c0726
Showing 1 changed file with 82 additions and 57 deletions.
139 changes: 82 additions & 57 deletions src/py/udev/dev2sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import argparse
import asyncio as aio
import importlib.util
import multiprocessing as mp
import os
Expand Down Expand Up @@ -425,84 +426,98 @@ def update_callback_loop(config_path, dev_name, initial_wm_props):
last_wm_props = wm_props


def dev_read_loop_impl(args, dev_path, config_path):
def dev_read_loop_impl(args, dev_name: str, dev_paths: [str], config_path: str):
global callback_func

dev = ev.InputDevice(dev_path)

initial_wm_props = get_wm_props(default="")
dev_to_selector = get_config(config_path).dev_to_selector
if dev.name not in dev_to_selector:
if dev_name not in dev_to_selector:
return

callback_func = dev_to_selector[dev.name](*initial_wm_props)
callback_func = dev_to_selector[dev_name](*initial_wm_props)

thread = tp.Thread(
target=update_callback_loop,
args=(config_path, dev.name, initial_wm_props),
args=(config_path, dev_name, initial_wm_props),
)
thread.start()

with dev.grab_context():
ui = ev.UInput.from_device(dev, name=f"{PROGRAM_NAME} {dev.name}")
last_timestamps = {}
last_inverse_timestamps = {}
i = 0
for event in dev.read_loop():
button, event_type, value = event.code, event.value, event.type
if button == event_type == value == 0:
continue
global last_timestamps
global last_inverse_timestamps
global event_counter
last_timestamps = {}
last_inverse_timestamps = {}
event_counter = 0

def dev_read_loop_impl_impl_impl(ui, event):
global last_timestamps
global last_inverse_timestamps
global event_counter

button, event_type, value = event.code, event.value, event.type
if button == event_type == value == 0:
return

last_timestamp = last_timestamps.get((button, event_type, value), (0, 0))
last_inverse_timestamp = last_inverse_timestamps.get(
(button, event_type), (0, 0)
)

last_timestamp = last_timestamps.get((button, event_type, value), (0, 0))
last_inverse_timestamp = last_inverse_timestamps.get(
(button, event_type), (0, 0)
)
sec = event.sec + event.usec / 1e6
delta_sec = sec - (last_timestamp[0] + last_timestamp[1] / 1e6)
hold_sec = sec - (last_inverse_timestamp[0] + last_inverse_timestamp[1] / 1e6)
callback_data = CallbackData(
button, event_type, value, sec, delta_sec, hold_sec
)

sec = event.sec + event.usec / 1e6
delta_sec = sec - (last_timestamp[0] + last_timestamp[1] / 1e6)
hold_sec = sec - (
last_inverse_timestamp[0] + last_inverse_timestamp[1] / 1e6
)
callback_data = CallbackData(
button, event_type, value, sec, delta_sec, hold_sec
if args.verbose:
print(f"{event_counter}: received event {callback_data}")

if callback_func is not None:
grab = callback_func(callback_data)

if grab:
if API._print_buffer or args.verbose:
if not args.verbose:
print(f"{event_counter}: received event {callback_data}")
print("caught event")
else:
if args.verbose:
print("passed event through")
ui.write_event(event)
ui.syn()

while API._print_buffer:
string, file = API._print_buffer.pop(0)
print(string, file=file)

last_timestamps[(button, event_type, value)] = (event.sec, event.usec)
if event_type in [CallbackData.EVENT_TYPE_DOWN, CallbackData.EVENT_TYPE_UP]:
last_inverse_timestamps[(button, int(not event_type))] = (
event.sec,
event.usec,
)

if args.verbose:
print(f"{i}: received event {callback_data}")
event_counter += 1

if callback_func is not None:
grab = callback_func(callback_data)

if grab:
if API._print_buffer or args.verbose:
if not args.verbose:
print(f"{i}: received event {callback_data}")
print("caught event")
else:
if args.verbose:
print("passed event through")
ui.write_event(event)
ui.syn()

while API._print_buffer:
string, file = API._print_buffer.pop(0)
print(string, file=file)
async def dev_read_loop_impl_impl(dev):
ui = ev.UInput.from_device(dev, name=f"{PROGRAM_NAME} {dev.name}")
with dev.grab_context():
async for event in dev.async_read_loop():
dev_read_loop_impl_impl_impl(ui, event)

last_timestamps[(button, event_type, value)] = (event.sec, event.usec)
if event_type in [CallbackData.EVENT_TYPE_DOWN, CallbackData.EVENT_TYPE_UP]:
last_inverse_timestamps[(button, int(not event_type))] = (
event.sec,
event.usec,
)
for dev_path in dev_paths:
dev = ev.InputDevice(dev_path)
aio.ensure_future(dev_read_loop_impl_impl(dev))

i += 1
aio.get_event_loop().run_forever()


def dev_read_loop(args, dev_path, config_path):
os.environ["DEV2SH_DEVICE_PATH"] = dev_path
def dev_read_loop(args, dev_name: str, dev_paths: [str], config_path: str):
os.environ["DEV2SH_DEVICE_PATHS"] = "\n".join(dev_paths)
while True:
try:
dev_read_loop_impl(args, dev_path, config_path)
dev_read_loop_impl(args, dev_name, dev_paths, config_path)
except OSError as e:
if e.errno != 19:
raise e
Expand All @@ -514,11 +529,21 @@ def main() -> int:

sys.path.append(os.path.dirname(args.config_path))

devs = ev.list_devices()

grouped_devs = {}
for dev in devs:
dev = ev.InputDevice(dev)
if dev.name in grouped_devs:
grouped_devs[dev.name].append(dev.path)
else:
grouped_devs[dev.name] = [dev.path]

processes = []
for dev_path in ev.list_devices():
for dev_name, dev_paths in grouped_devs.items():
process = mp.Process(
target=dev_read_loop,
args=(args, dev_path, args.config_path),
args=(args, dev_name, dev_paths, args.config_path),
)
processes.append(process)
process.start()
Expand Down

0 comments on commit 25c0726

Please sign in to comment.