-
Notifications
You must be signed in to change notification settings - Fork 15
/
example-gatt-client
executable file
·224 lines (162 loc) · 5.72 KB
/
example-gatt-client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-or-later
import dbus
from gi.repository import GLib
import sys
from dbus.mainloop.glib import DBusGMainLoop
bus = None
mainloop = None
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
HR_SVC_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
# The objects that we interact with.
hr_service = None
hr_msrmt_chrc = None
body_snsr_loc_chrc = None
hr_ctrl_pt_chrc = None
def generic_error_cb(error):
print('D-Bus call failed: ' + str(error))
mainloop.quit()
def body_sensor_val_to_str(val):
if val == 0:
return 'Other'
if val == 1:
return 'Chest'
if val == 2:
return 'Wrist'
if val == 3:
return 'Finger'
if val == 4:
return 'Hand'
if val == 5:
return 'Ear Lobe'
if val == 6:
return 'Foot'
return 'Reserved value'
def sensor_contact_val_to_str(val):
if val == 0 or val == 1:
return 'not supported'
if val == 2:
return 'no contact detected'
if val == 3:
return 'contact detected'
return 'invalid value'
def body_sensor_val_cb(value):
if len(value) != 1:
print('Invalid body sensor location value: ' + repr(value))
return
print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))
def hr_msrmt_start_notify_cb():
print('HR Measurement notifications enabled')
def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
if iface != GATT_CHRC_IFACE:
return
if not len(changed_props):
return
value = changed_props.get('Value', None)
if not value:
return
print('New HR Measurement')
flags = value[0]
value_format = flags & 0x01
sc_status = (flags >> 1) & 0x03
ee_status = flags & 0x08
if value_format == 0x00:
hr_msrmt = value[1]
next_ind = 2
else:
hr_msrmt = value[1] | (value[2] << 8)
next_ind = 3
print('\tHR: ' + str(int(hr_msrmt)))
print('\tSensor Contact status: ' +
sensor_contact_val_to_str(sc_status))
if ee_status:
print('\tEnergy Expended: ' + str(int(value[next_ind])))
def start_client():
# Read the Body Sensor Location value and print it asynchronously.
body_snsr_loc_chrc[0].ReadValue({}, reply_handler=body_sensor_val_cb,
error_handler=generic_error_cb,
dbus_interface=GATT_CHRC_IFACE)
# Listen to PropertiesChanged signals from the Heart Measurement
# Characteristic.
hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
hr_msrmt_changed_cb)
# Subscribe to Heart Rate Measurement notifications.
hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
error_handler=generic_error_cb,
dbus_interface=GATT_CHRC_IFACE)
def process_chrc(chrc_path):
chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
dbus_interface=DBUS_PROP_IFACE)
uuid = chrc_props['UUID']
if uuid == HR_MSRMT_UUID:
global hr_msrmt_chrc
hr_msrmt_chrc = (chrc, chrc_props)
elif uuid == BODY_SNSR_LOC_UUID:
global body_snsr_loc_chrc
body_snsr_loc_chrc = (chrc, chrc_props)
elif uuid == HR_CTRL_PT_UUID:
global hr_ctrl_pt_chrc
hr_ctrl_pt_chrc = (chrc, chrc_props)
else:
print('Unrecognized characteristic: ' + uuid)
return True
def process_hr_service(service_path, chrc_paths):
service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
service_props = service.GetAll(GATT_SERVICE_IFACE,
dbus_interface=DBUS_PROP_IFACE)
uuid = service_props['UUID']
if uuid != HR_SVC_UUID:
return False
print('Heart Rate Service found: ' + service_path)
# Process the characteristics.
for chrc_path in chrc_paths:
process_chrc(chrc_path)
global hr_service
hr_service = (service, service_props, service_path)
return True
def interfaces_removed_cb(object_path, interfaces):
if not hr_service:
return
if object_path == hr_service[2]:
print('Service was removed')
mainloop.quit()
def main():
# Set up the main loop.
DBusGMainLoop(set_as_default=True)
global bus
bus = dbus.SystemBus()
global mainloop
mainloop = GLib.MainLoop()
om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
print('Getting objects...')
objects = om.GetManagedObjects()
chrcs = []
# List characteristics found
for path, interfaces in objects.items():
if GATT_CHRC_IFACE not in interfaces.keys():
continue
chrcs.append(path)
# List sevices found
for path, interfaces in objects.items():
if GATT_SERVICE_IFACE not in interfaces.keys():
continue
chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
if process_hr_service(path, chrc_paths):
break
if not hr_service:
print('No Heart Rate Service found')
sys.exit(1)
start_client()
mainloop.run()
if __name__ == '__main__':
main()