Skip to content

Commit

Permalink
kdeconnector module: replace pydbus with dbus-python (#2264)
Browse files Browse the repository at this point in the history
  • Loading branch information
valdur55 authored Oct 22, 2024
1 parent 7240e8e commit cf7ad84
Showing 1 changed file with 133 additions and 97 deletions.
230 changes: 133 additions & 97 deletions py3status/modules/kdeconnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
color_good: Connected and battery charging
Requires:
pydbus: pythonic d-bus library
kdeconnect: adds communication between kde and your smartphone
dbus-python: Python bindings for dbus
PyGObject: Python bindings for GObject Introspectiom
Examples:
```
Expand Down Expand Up @@ -71,21 +72,27 @@
import sys
from threading import Thread

from dbus import Interface, SessionBus
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
from pydbus import SessionBus

STRING_NOT_INSTALLED = "{} not installed"

MOD_BATTERY = "battery"
MOD_NOTIFICATIONS = "notifications"
MOD_CONNECTIVITY_REPORT = "connectivity_report"

SERVICE_BUS = "org.kde.kdeconnect"
INTERFACE = SERVICE_BUS + ".device"
INTERFACE_DAEMON = SERVICE_BUS + ".daemon"
INTERFACE_BATTERY = INTERFACE + ".battery"
INTERFACE_NOTIFICATIONS = INTERFACE + ".notifications"
INTERFACE_CONN_REPORT = INTERFACE + ".connectivity_report"
INTERFACE_BATTERY = INTERFACE + "." + MOD_BATTERY
INTERFACE_NOTIFICATIONS = INTERFACE + "." + MOD_NOTIFICATIONS
INTERFACE_CONN_REPORT = INTERFACE + "." + MOD_CONNECTIVITY_REPORT
PATH = "/modules/kdeconnect"
DEVICE_PATH = PATH + "/devices"
BATTERY_SUBPATH = "/battery"
CONN_REPORT_SUBPATH = "/connectivity_report"
NOTIFICATIONS_SUBPATH = "/notifications"
BATTERY_SUBPATH = "/" + MOD_BATTERY
NOTIFICATIONS_SUBPATH = "/" + MOD_NOTIFICATIONS
CONN_REPORT_SUBPATH = "/" + MOD_CONNECTIVITY_REPORT
UNKNOWN = "Unknown"
UNKNOWN_DEVICE = "unknown device"
UNKNOWN_SYMBOL = "?"
Expand All @@ -108,6 +115,9 @@ class Py3status:
status_notif = " ✉"

def post_config_hook(self):
if not self.py3.check_commands("kdeconnect-cli"):
raise Exception(STRING_NOT_INSTALLED.format("kdeconnect"))

self._bat = None
self._con = None
self._dev = None
Expand All @@ -131,7 +141,7 @@ def post_config_hook(self):
# start last
self._kill = False
self._dbus_loop = DBusGMainLoop()
self._dbus = SessionBus()
self._dbus = SessionBus(mainloop=self._dbus_loop)
self._bus_initialized = self._init_dbus()
self._start_listener()

Expand All @@ -150,98 +160,131 @@ def _start_loop(self):
self._kill = True

def _start_listener(self):
self._signal_reachable_changed = self._dbus.con.signal_subscribe(
sender=None,
interface_name=INTERFACE,
member="reachableChanged",
object_path=None,
arg0=None,
flags=0,
callback=self._reachable_on_change,
self._signal_reachable_changed = self._dbus.add_signal_receiver(
dbus_interface=INTERFACE,
signal_name="reachableChanged",
handler_function=self._reachable_on_change,
path_keyword='path',
interface_keyword='iface',
member_keyword='member',
)

self._signal_battery = self._dbus.con.signal_subscribe(
sender=None,
interface_name=INTERFACE_BATTERY,
member=None,
object_path=None,
arg0=None,
flags=0,
callback=self._battery_on_change,
self._signal_battery = self._dbus.add_signal_receiver(
dbus_interface=INTERFACE_BATTERY,
signal_name=None,
handler_function=self._battery_on_change,
path_keyword='path',
interface_keyword='iface',
member_keyword='member',
)

if self._format_contains_notifications:
self._signal_notifications = self._dbus.con.signal_subscribe(
sender=None,
interface_name=INTERFACE_NOTIFICATIONS,
member=None,
object_path=None,
arg0=None,
flags=0,
callback=self._notifications_on_change,
self._signal_notifications = self._dbus.add_signal_receiver(
dbus_interface=INTERFACE_NOTIFICATIONS,
signal_name=None,
handler_function=self._notifications_on_change,
path_keyword='path',
interface_keyword='iface',
member_keyword='member',
)

if self._format_contains_connection_status:
self._signal_conn_report = self._dbus.con.signal_subscribe(
sender=None,
interface_name=INTERFACE_CONN_REPORT,
member=None,
object_path=None,
arg0=None,
flags=0,
callback=self._conn_report_on_change,
self._signal_conn_report = self._dbus.add_signal_receiver(
dbus_interface=INTERFACE_CONN_REPORT,
signal_name=None,
handler_function=self._conn_report_on_change,
path_keyword='path',
interface_keyword='iface',
member_keyword='member',
)

t = Thread(target=self._start_loop)
t.daemon = True
t.start()

def _notifications_on_change(
self, connection, owner, object_path, interface_name, event, new_value
):
if self._is_current_device(object_path):
def _notifications_on_change(self, *args, **kwargs):
if self._is_current_device(kwargs['path']):
self._update_notif_info()
self.py3.update()

def _reachable_on_change(
self, connection, owner, object_path, interface_name, event, new_value
):
if self._is_current_device(object_path):
def _reachable_on_change(self, *args, **kwargs):
if self._is_current_device(kwargs['path']):
# Update only when device is connected
if new_value[0]:
if args[0]:
self._update_battery_info()
self._update_notif_info()
self._update_conn_info()
self.py3.update()

def _battery_on_change(self, connection, owner, object_path, interface_name, event, new_value):
if self._is_current_device(object_path):
def _battery_on_change(self, *args, **kwargs):
event = kwargs.get("member")
if self._is_current_device(kwargs['path']):
if event == "refreshed":
if new_value[1] != -1:
self._set_battery_status(isCharging=new_value[0], charge=new_value[1])
if args[1] != -1:
self._set_battery_status(isCharging=args[0], charge=args[1])
elif event == "stateChanged":
self._set_battery_status(isCharging=new_value[0], charge=None)
self._set_battery_status(isCharging=args[0], charge=None)
elif event == "chargeChanged":
self._set_battery_status(isCharging=None, charge=new_value[0])
self._set_battery_status(isCharging=None, charge=args[0])
else:
self._update_battery_info()
self.py3.update()

def _conn_report_on_change(
self, connection, owner, object_path, interface_name, event, new_value
):
if self._is_current_device(object_path):
def _conn_report_on_change(self, *args, **kwargs):
if self._is_current_device(kwargs['path']):
event = kwargs.get("member")
if event == "refreshed":
if (
self._result["net_type"] != new_value[0]
or self._result["net_strength_raw"] != new_value[1]
self._result["net_type"] != args[0]
or self._result["net_strength_raw"] != args[1]
):
self._set_conn_status(net_type=new_value[0], net_strength=new_value[1])
self._set_conn_status(net_type=args[0], net_strength=args[1])
self.py3.update()
else:
self._update_conn_info()
self.py3.update()

# Get DBus method up to 2 arguments
def _dbus_method(self, method, mod=None, value1=None, value2=None):
path = DEVICE_PATH + f"/{self.device_id}"
iface = INTERFACE
if mod:
path = path + '/' + mod
iface = iface + '.' + mod
elif method in ["charge", "isCharging"]:
iface += "." + MOD_BATTERY
elif method == "activeNotifications":
iface += "." + MOD_NOTIFICATIONS

dbus_object = self._dbus.get_object(SERVICE_BUS, path)
if not value2:
answer = dbus_object.get_dbus_method(method, iface)(True)
else:
answer = dbus_object.get_dbus_method(method, iface)(value1, value2)
return answer

# Get DBus property
def _dbus_property(self, prop, mod=None, device_id=None):
if not device_id:
device_id = self.device_id
path = DEVICE_PATH + f"/{device_id}"
iface = INTERFACE
if mod:
path = path + '/' + mod
iface = iface + '.' + mod
dbus_object = self._dbus.get_object(SERVICE_BUS, path)
dbus_interface = Interface(dbus_object, 'org.freedesktop.DBus.Properties')
propertie = dbus_interface.Get(iface, prop)
return propertie

def _dbus_introspect_exists(self, mod=None):
path = DEVICE_PATH + f"/{self.device_id}"
if mod:
path += "/" + mod
dbus_object = self._dbus.get_object(SERVICE_BUS, path)
result = dbus_object.Introspect(dbus_interface="org.freedesktop.DBus.Introspectable")
return len(result) > 0

def _is_current_device(self, object_path):
return self.device_id in object_path

Expand All @@ -260,17 +303,12 @@ def _init_dbus(self):
return False

try:
self._dev = self._dbus.get(SERVICE_BUS, DEVICE_PATH + f"/{self.device_id}")
self._dev = self._dbus_introspect_exists()
try:
self._bat = self._dbus.get(
SERVICE_BUS, DEVICE_PATH + f"/{self.device_id}" + BATTERY_SUBPATH
)
self._bat = self._dbus_introspect_exists(MOD_BATTERY)

if self._format_contains_notifications:
self._not = self._dbus.get(
SERVICE_BUS,
DEVICE_PATH + f"/{self.device_id}" + NOTIFICATIONS_SUBPATH,
)
self._not = self._dbus_introspect_exists(MOD_NOTIFICATIONS)
else:
self._not = None
except Exception:
Expand All @@ -280,10 +318,7 @@ def _init_dbus(self):

try: # This plugin is released after kdeconnect version Mar 13, 2021
if self._format_contains_connection_status:
self._con = self._dbus.get(
SERVICE_BUS,
DEVICE_PATH + f"/{self.device_id}" + CONN_REPORT_SUBPATH,
)
self._con = self._dbus_introspect_exists(MOD_CONNECTIVITY_REPORT)
else:
self._con = None
except Exception:
Expand All @@ -298,15 +333,15 @@ def _get_device_id(self):
"""
Find the device id
"""
_bus = self._dbus.get(SERVICE_BUS, PATH)
_bus = self._dbus.get_object(SERVICE_BUS, PATH)
devices = _bus.devices()

if self.device is None and self.device_id is None and len(devices) == 1:
return devices[0]

for id in devices:
self._dev = self._dbus.get(SERVICE_BUS, DEVICE_PATH + f"/{id}")
if self.device == self._dev.name:
name = self._dbus_property("name", device_id=id)
if self.device == name:
return id

return None
Expand All @@ -316,13 +351,13 @@ def _get_isTrusted(self):
return False

try:
# New method which replaced 'isPaired' in version 1.0
return self._dev.isTrusted()
except AttributeError:
# New method which replaced 'isTrusted' in version 1.0
return self._dbus_method('isPaired')
except Exception:
try:
# Deprecated since version 1.0
return self._dev.isPaired()
except AttributeError:
return self._dbus_method("isTrusted")
except Exception:
return False

def _get_device(self):
Expand All @@ -331,8 +366,8 @@ def _get_device(self):
"""
try:
device = {
"name": self._dev.name,
"isReachable": self._dev.isReachable,
"name": self._dbus_property('name'),
"isReachable": self._dbus_property('isReachable'),
"isTrusted": self._get_isTrusted(),
}
except Exception:
Expand All @@ -346,11 +381,12 @@ def _get_battery(self):
"""
try:
if self._bat:
charge = self._bat.charge
isCharging = self._bat.isCharging
charge = self._dbus_property("charge", MOD_BATTERY)
isCharging = self._dbus_property("isCharging", MOD_BATTERY)
else:
charge = self._dev.charge()
isCharging = self._dev.isCharging()
charge = self._dbus_method("charge")
isCharging = self._dbus_method("isCharging")

battery = {
"charge": charge,
"isCharging": isCharging == 1,
Expand All @@ -370,8 +406,8 @@ def _get_conn(self):
try:
if self._con:
# Possible values are -1 - 4
strength = self._con.cellularNetworkStrength
type = self._con.cellularNetworkType
strength = self._dbus_property("cellularNetworkStrength", MOD_CONNECTIVITY_REPORT)
type = self._dbus_property("cellularNetworkType", MOD_CONNECTIVITY_REPORT)

con_info = {
"strength": strength,
Expand All @@ -396,9 +432,9 @@ def _get_notifications(self):
"""
try:
if self._not:
notifications = self._not.activeNotifications()
notifications = self._dbus_method("activeNotifications", MOD_NOTIFICATIONS)
else:
notifications = self._dev.activeNotifications()
notifications = self._dbus_method("activeNotifications")
except Exception:
return []

Expand Down Expand Up @@ -491,16 +527,16 @@ def _update_battery_info(self):
def kill(self):
self._kill = True
if self._signal_reachable_changed:
self._dbus.con.signal_unsubscribe(self._signal_reachable_changed)
self._signal_reachable_changed.remove()

if self._signal_battery:
self._dbus.con.signal_unsubscribe(self._signal_battery)
self._signal_battery.remove()

if self._signal_notifications:
self._dbus.con.signal_unsubscribe(self._signal_notifications)
self._signal_notifications.remove()

if self._signal_conn_report:
self._dbus.con.signal_unsubscribe(self._signal_conn_report)
self._signal_conn_report.remove()

def kdeconnector(self):
"""
Expand Down

0 comments on commit cf7ad84

Please sign in to comment.