Skip to content

Commit

Permalink
Merge pull request #48 from Capstone-Projects-2022-Fall/device-pairin…
Browse files Browse the repository at this point in the history
…g-data-removal

Pairing data is removed when device disconnects.
  • Loading branch information
calin-pescaru authored Dec 4, 2022
2 parents e86f3ca + e2a8dd3 commit add168a
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 37 deletions.
13 changes: 11 additions & 2 deletions rear_rider_device/bluetooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,18 @@ def is_strobe_on(self):
self.writeline('is_strobe_on')
return bool(self.readline_sync())

def discoverable_changed(self, value: str):
def discoverable_changed(self, value: bool):
'''
value
If this value is True, then the device is in a discoverable state.
'''
timeout = self.rear_rider_bt.get_discoverable_timeout()
self.writeline(f'discoverable\n{value} {timeout}')
if value:
# report that the discoverability lights should be ON = '1'
self.writeline(f'discoverable\n1 {timeout}')
else:
# report that the discoverability lights should be OFF = '0'
self.writeline(f'discoverable\n0 {timeout}')

def read_accelerometer(self) -> tuple[float, float, float]:
'''
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from typing import TypeVar
import dbus
from rear_rider_device.rear_rider_bluetooth_server.src.bluez.example_gatt_server import find_adapter
from rear_rider_device.rear_rider_bluetooth_server.src.bluetooth_device import BluetoothDevice

BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_PROPS_IFACE = 'org.freedesktop.DBus.Properties'
OBJECT_MANAGER_IFACE = 'org.freedesktop.DBus.ObjectManager'
ADAPTER_IFACE = 'org.bluez.Adapter1'
DEVICE_IFACE = 'org.bluez.Device1'

class BluetoothAdapter:
'''
Reference:
https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/adapter-api.txt
'''
def __init__(self, bus: dbus.SystemBus):
self._bus = bus
self._adapter_path = _find_adapter(bus)
self._adapter = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME, self._adapter_path),
ADAPTER_IFACE)
self._adapter_props = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME, self._adapter_path),
DBUS_PROPS_IFACE)
self._manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
OBJECT_MANAGER_IFACE)

def __adapter_get(self, prop: str):
'''
prop
This is the name of the property as defined by the bluez docs.
'''
return self._adapter_props.Get(ADAPTER_IFACE, prop)

def __adapter_set(self, prop: str, value):
'''
prop
This is the name of the property as defined by the bluez docs.
value
The property named `prop` is set to this value. The value must be one of the dbus types.
'''
return self._adapter_props.Set(ADAPTER_IFACE, prop, value)

def get_device_list(self, only_if_paired = False):
'''
Get the Bluetooth devices managed by this adapter.
'''
objects = self._manager.GetManagedObjects()
all_devices = (str(path) for path, interfaces in objects.items() if
DEVICE_IFACE in interfaces.keys())
device_list = [BluetoothDevice(self._bus, d)
for d in all_devices if d.startswith(self.get_adapter_path() + '/')]
if only_if_paired:
paired_device_list = [pd for pd in device_list if pd.paired()]
return device_list

def get_adapter_path(self):
return str(self._adapter_path)

def get_discoverable(self):
return bool(self.__adapter_get('Discoverable'))

def get_discoverable_timeout(self):
return int(self.__adapter_get('DiscoverableTimeout'))

def set_pairable(self, value: bool):
self.__adapter_set('Pairable', dbus.Boolean(value))

def remove_device(self, device: BluetoothDevice):
self._adapter.RemoveDevice(device.get_path())

def _find_adapter(bus: dbus.SystemBus) -> dbus.ObjectPath:
adapter = find_adapter(bus)
if adapter is None:
raise Exception('Expected the adapter to not be None.')
return adapter
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,30 @@ def get_address(self):
'''
return str(self._props.Get(BLUEZ_DEVICE_1, 'Address'))

def connected(self):
'''
Check if the devices is paired.
'''
return bool(self._props.Get(BLUEZ_DEVICE_1, 'Connected'))

def disconnect(self):
'''
Disconnect this device.
'''
self._device.Disconnect()

def get_path(self):
return str(self._device.object_path)


def paired(self):
'''
Check if the devices is paired.
'''
return bool(self._props.Get(BLUEZ_DEVICE_1, 'Paired'))

def __str__(self) -> str:
return (
f'Address: {self.get_address()}'
f'Path: {self.get_path()}'
)
107 changes: 72 additions & 35 deletions rear_rider_device/rear_rider_bluetooth_server/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Callable, Literal, Union

from rear_rider_device.rear_rider_bluetooth_server.src.advertisement.rear_rider_adv import RearRiderAdvertisement
from rear_rider_device.rear_rider_bluetooth_server.src.bluetooth_adapter import BluetoothAdapter

from rear_rider_device.rear_rider_bluetooth_server.src.bluez.example_advertisement import LE_ADVERTISING_MANAGER_IFACE
from rear_rider_device.rear_rider_bluetooth_server.src.bluez.example_gatt_server import find_adapter, dbus, BLUEZ_SERVICE_NAME, GATT_MANAGER_IFACE
Expand Down Expand Up @@ -41,16 +42,11 @@ def get_object_interface(INTERFACE: Literal):
return get_object_interface

class RearRiderBluetooth:
_discoverable: str
"""
"0" or "1"
"""
_on_discoverable_changed: Union[None, Callable[[str], None]]
_discoverable: bool
_on_discoverable_changed: Union[None, Callable[[bool], None]]
def __init__(self, bus: dbus.SystemBus, hello_world_svc: HelloWorldService, sensors_svc: SensorsService):
self._bus = bus
self._adapter = find_adapter(self._bus)
get_object_interface = get_object_interface_getter(bus, self._adapter)
self._adapter_props = get_object_interface(DBUS_PROPS_IFACE)
self._adapter = BluetoothAdapter(bus)
self._connected_device: Union[None, BluetoothDevice] = None
self.hello_world_svc = hello_world_svc
self.sensors_svc = sensors_svc
Expand All @@ -64,7 +60,7 @@ def bt_properties_changed(interface, changed, invalidated, path):
"""
changed_props = changed.keys()
if 'Discoverable' in changed_props:
self.set_discoverable(str(changed.get('Discoverable')))
self.__set_discoverable_state(bool(changed.get('Discoverable')))
if 'Connected' in changed_props:
self._on_device_connection_change(
bool(changed.get('Connected')),
Expand All @@ -75,36 +71,44 @@ def bt_properties_changed(interface, changed, invalidated, path):
signal_name="PropertiesChanged",
path_keyword="path")
listen_to_bluetooth_property_changes()

##
# Discovery
##

def set_discoverable(self, value: str):
if value != '0' and value != '1':
raise Exception('discoverable value must be `0` or `1`')
self._discoverable = value
def sync_discoverable_state(self):
'''
Synchronize the discoverable state by checking with the adapter properties.
'''
self.__set_discoverable_state(self._adapter.get_discoverable())

def __set_discoverable_state(self, value: bool):
'''
Update the discoverability state.
Calls the callback function set by `self.set_on_discoverable_changed(...)`. Takes into
account that no device is connected.
'''
self._discoverable = value and self.allowing_connections()
if self._on_discoverable_changed is not None:
self._on_discoverable_changed(self._discoverable)
def set_on_discoverable_changed(self, callback: Callable[[str], None]):

def set_on_discoverable_changed(self, callback: Callable[[bool], None]):
"""
Callback should expect that value be "0" or "1"
"""
self._on_discoverable_changed = callback
self._on_discoverable_changed(self._discoverable)

def get_discoverable_timeout(self):
return int(self._adapter_props.Get('org.bluez.Adapter1',
'DiscoverableTimeout'))
return self._adapter.get_discoverable_timeout()

##
# Pairing
##

def _set_pairable(self, value: bool):
self._adapter_props.Set('org.bluez.Adapter1', 'Pairable', dbus.Boolean(value))
self._adapter.set_pairable(value)

def _on_device_connection_change(self, connected: bool, device_path: str):
device = BluetoothDevice(self._bus, device_path)
if connected:
Expand All @@ -115,9 +119,8 @@ def _on_device_connection_change(self, connected: bool, device_path: str):
raise Exception('We only want one device at a time to be connected.')
# Disable pairing since we only want one device at a time to be connected.
self._set_pairable(False)
# Turn off discoverability
self.set_discoverable('0')
self._connected_device = device
self.sync_discoverable_state()
return
# connect == False, therefore this device was just disconnected.
if (self._connected_device is not None and
Expand All @@ -127,13 +130,51 @@ def _on_device_connection_change(self, connected: bool, device_path: str):
'Expected _connected_device to be not None and the object paths to be the same.\n'
f'{self._connected_device.get_address()} {device.get_address()}')
self._connected_device = None
self.__remove_devices(self._adapter.get_device_list())
self._set_pairable(True)
self.set_discoverable('1')

def has_connected_device(self):
return self._connected_device is not None
self.sync_discoverable_state()

def __remove_devices(self, devices: list[BluetoothDevice]):
for device in devices:
self._adapter.remove_device(device)

def allowing_connections(self):
'''
Returns true or false denoting if new device connections are allowed.
'''
return self._connected_device is None

def kick_and_remove_devices_if_not_alone(self):
'''
Remove/Forget all paired devices under the following conditions:
- Not currently connected.
- Is connected, but is also not the only connected devices.
This effectively ensures at most one device is connected, and implements a work around for
having to manually "remove" a device via `bluetoothctl`.
'''
devices = self._adapter.get_device_list(only_if_paired=True)
connected_devices: list[BluetoothDevice] = []
disconnected_devices: list[BluetoothDevice] = []
for device in devices:
if device.connected():
connected_devices.append(device)
continue
disconnected_devices.append(device)

len_connected_devices = len(connected_devices)
if len_connected_devices == 1:
# connected
self._connected_device = connected_devices[0]
else:
# connected devices are 0: skip for ... in ...:
for connected_device in connected_devices:
# connected devices are > 1
connected_device.disconnect()
disconnected_devices.append(connected_device)
self.__remove_devices(disconnected_devices)


def main(print, on_ready: Union[None, Callable[[RearRiderBluetooth], None]], strobe_light: StrobeLight):
"""
"""
Expand Down Expand Up @@ -205,14 +246,10 @@ def register_advertisement():
def register_ad_cb():
print('Advertisement registered')
print(rear_rider_adv.get_properties())
# rear_rider_adv.Powe
if on_ready is not None:
rear_rider_bt = RearRiderBluetooth(bus, app.hello_world_service, app.sensors_service)
# Get initial value
# discoverable = '0'
discoverable = adapter_props.Get("org.bluez.Adapter1", "Discoverable")
rear_rider_bt.set_discoverable(str(discoverable))
# print(f'Initial "discoverable" value: {discoverable}')
rear_rider_bt.kick_and_remove_devices_if_not_alone()
rear_rider_bt.sync_discoverable_state()
on_ready(rear_rider_bt)
else:
stdout.write('ready\n')
Expand Down

0 comments on commit add168a

Please sign in to comment.