-
Can you share the code? Maybe a job for |
-
It is a bit complicated because most of the computer-side code was written for serial communication before I joined the project, but here is the code. It is not the whole program, just a simplified part (2 files, 2 classes) that I use to test the new BLE communication I'm implementing. The code runs perfectly when connection, start_notify, asyncio.sleep(), stop_notify and disconnection are all gathered in a single async function (uncomment the lines in start() of the Communication class to see this). But then there is no way to call the feedback() function (in the Communication class). Also, I really want the notifications to run continuously until I stop them with another function (stop() in the Communication class). Note that the communication part on the BLE device is working well: I tested every scenario I needed with nRF Connect.
import asyncio
import re

import numpy as np
import pandas as pd
from bleak import BleakClient

# DEVICE_ADDRESS, DEVICE_NAME, the *_CHAR_UUID and BA_* constants, State and
# is_float() are defined elsewhere in the project.

data_table = []

def callback(handle, data):
    char = bytearray.decode(data)
    data_table.append(char)

class Communication:
    def __init__(self, session_duration):
        self.session_duration = session_duration
        self.data = []
        self.client = BleakClient(DEVICE_ADDRESS, timeout=5.0)
        print("Connected to \"", DEVICE_NAME, "\" (ADD: ", DEVICE_ADDRESS, ")")

    def get_data(self):
        global data_table
        return data_table

    async def start(self):
        async with self.client as client:
            try:
                print("Checking sensor status")
                press_bytes = await client.read_gatt_char(PRESS_CHAR_UUID)
                imu_bytes = await client.read_gatt_char(IMU_CHAR_UUID)
                press_ = bytearray.decode(imu_bytes)
                imu_ = bytearray.decode(press_bytes)
                print('Press status : ', imu_)
                print('IMU status : ', press_)
                await asyncio.sleep(0.5)
                print("Begin session")
                await client.write_gatt_char(SESS_CHAR_UUID, BA_TRUE)  # Data won't be notified if this session char. is not written to true
                await asyncio.sleep(0.5)
                print("Begin notification")
                await client.start_notify(DATA_CHAR_UUID, callback)
                # await asyncio.sleep(self.session_duration)
                # await client.stop_notify(DATA_CHAR_UUID)
                # print("End notification")
                # await client.write_gatt_char(SESS_CHAR_UUID, BA_FALSE)
                # print("End session")
            except Exception as e:
                print(e)
            # finally:
            #     await client.disconnect()

    async def stop(self):
        async with self.client as client:
            try:
                await client.stop_notify(DATA_CHAR_UUID)
                print("End notification")
                await client.write_gatt_char(SESS_CHAR_UUID, BA_FALSE)
                print("End session")
            except Exception as e:
                print(e)
            finally:
                await client.disconnect()

    async def feedback(self, state):
        async with self.client as client:
            try:
                print("Feedback sending")
                if state == State.OK:
                    await client.write_gatt_char(FDBK_CHAR_UUID, BA_OK)
                elif state == State.INTERRUPTION:
                    await client.write_gatt_char(FDBK_CHAR_UUID, BA_INTERRUPTION)
                else:
                    print("ERROR: Smtg wrong with Communication.feedback() func.")
            except Exception as e:
                print(e)

class ESPReader:
    def __init__(self, session_duration, save=None):
        self.BLE = Communication(session_duration)
        self.data = []
        self.cols = ["time", "press1", "press2", "press3", "acc1", "acc2", "acc3", "gyro1", "gyro2", "gyro3"]
        self.save = save

    async def start(self):
        await self.BLE.start()

    def get_data(self):
        self.data = []
        new_data = self.BLE.get_data()
        new_data = [re.split(",", s) for s in new_data]
        new_data = [a for a in new_data if len(a) == 10 and is_float(a)]
        new_data = np.array(new_data).astype("float")
        self.data.extend(new_data)
        self.data = pd.DataFrame(self.data, columns=self.cols)
        return self.data

    async def stop(self):
        data = self.get_data()
        await self.BLE.stop()
        # if self.save is not None:
        #     data.to_csv(f'./recordings/{self.save}/data.csv')

async def main():
    reader = ESPReader(save=False, session_duration=7)
    print('Starting reader')
    acquisition = asyncio.create_task(reader.start())
    await asyncio.sleep(5)  # simulate processing
    print('Printing data (1):')
    print(reader.get_data())
    await reader.BLE.feedback(State.INTERRUPTION)  # simulate feedback to do as a result of processing
    await asyncio.sleep(5)  # simulate processing
    print('Printing data (2):')
    print(reader.get_data())
    await reader.start()
    await reader.stop()

if __name__ == '__main__':
    asyncio.run(main())
-
Using the context manager ( |
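For reference, `async with BleakClient(...)` connects when the block is entered and disconnects when it is left, so every method in the first version that opens its own `async with self.client` block also drops the connection when it returns. Below is a minimal sketch of that lifetime. `FakeClient` is a hypothetical stand-in written for this illustration (it is not Bleak and talks to no hardware); only the connect-on-enter and disconnect-on-exit behaviour is mirrored.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in: connects on __aenter__, disconnects on __aexit__."""

    def __init__(self):
        self.is_connected = False
        self.log = []

    async def connect(self):
        self.is_connected = True
        self.log.append("connect")

    async def disconnect(self):
        self.is_connected = False
        self.log.append("disconnect")

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.disconnect()


async def start_with_context_manager(client):
    # Leaving the block disconnects, so nothing set up here outlives start().
    async with client:
        pass  # start_notify(...) would be called here
    return client.is_connected


async def start_with_explicit_connect(client):
    # The link stays open until some later stop() calls disconnect().
    await client.connect()
    return client.is_connected


async def demo():
    a, b = FakeClient(), FakeClient()
    after_cm = await start_with_context_manager(a)
    after_explicit = await start_with_explicit_connect(b)
    await b.disconnect()  # what a separate stop() method would do
    return after_cm, after_explicit, a.log, b.log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the explicit variant, the client object can be shared by start(), feedback() and stop() as long as disconnect() is only called at the very end.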
-
Yes, thank you. Here is the new code:
class Communication:
    def __init__(self, session_duration):
        self.session_duration = session_duration  # Not used here
        self.data = []
        self.client = BleakClient(DEVICE_ADDRESS)
        print("Connected to \"", DEVICE_NAME, "\" (ADD: ", DEVICE_ADDRESS, ")")

    def get_data(self):
        return self.data

    async def start(self):
        try:
            await self.client.connect()
            print("Checking sensor status")
            press_bytes = await self.client.read_gatt_char(PRESS_CHAR_UUID)
            imu_bytes = await self.client.read_gatt_char(IMU_CHAR_UUID)
            press_ = bytearray.decode(imu_bytes)
            imu_ = bytearray.decode(press_bytes)
            print('Press status : ', imu_)
            print('IMU status : ', press_)
            print("Begin session")
            await self.client.write_gatt_char(SESS_CHAR_UUID, BA_TRUE)  # Data won't be notified if this session char. is not written to true
            print("Begin notification")
            await self.client.start_notify(DATA_CHAR_UUID, partial(self.callback, self))
        except Exception as e:
            print(e)

    async def stop(self):
        try:
            await self.client.stop_notify(DATA_CHAR_UUID)
            print("End notification")
            await self.client.write_gatt_char(SESS_CHAR_UUID, BA_FALSE)
            print("End session")
        except Exception as e:
            print(e)
        finally:
            await self.client.disconnect()

    async def feedback(self, state):
        try:
            print("Feedback sending")
            if state == State.OK:
                await self.client.write_gatt_char(FDBK_CHAR_UUID, BA_OK)
            elif state == State.INTERRUPTION:
                await self.client.write_gatt_char(FDBK_CHAR_UUID, BA_INTERRUPTION)
            else:
                print("ERROR: Smtg wrong with Communication.feedback() func.")
        except Exception as e:
            print(e)

def callback(self: Communication, handle: int, data: bytearray):
    char = bytearray.decode(data)
    self.data.append(char)
    print(f"{handle}: {char}")

class ESPReader:
    def __init__(self, session_duration, save=None):
        self.BLE = Communication(session_duration)
        self.data = []
        self.cols = ["time", "press1", "press2", "press3", "acc1", "acc2", "acc3", "gyro1", "gyro2", "gyro3"]
        self.save = save

    async def start(self):
        await self.BLE.start()

    def get_data(self):
        self.data = []
        new_data = self.BLE.get_data()
        new_data = [re.split(",", s) for s in new_data]
        new_data = [a for a in new_data if len(a) == 10 and is_float(a)]
        new_data = np.array(new_data).astype("float")
        self.data.extend(new_data)
        self.data = pd.DataFrame(self.data, columns=self.cols)
        return self.data

    async def stop(self):
        data = self.get_data()
        await self.BLE.stop()
        # if self.save is not None:
        #     data.to_csv(f'./recordings/{self.save}/data.csv')

async def main():
    reader = ESPReader(save=False, session_duration=7)
    print('Starting reader')
    await reader.start()
    await asyncio.sleep(5)  # simulate processing
    print('Printing data (1):')
    print(reader.get_data())
    await reader.BLE.feedback(State.INTERRUPTION)  # simulate feedback to do as a result of processing
    await asyncio.sleep(5)  # simulate processing
    print('Printing data (2):')
    print(reader.get_data())
    await reader.stop()

if __name__ == '__main__':
    asyncio.run(main())

I can run it without any issue without the await reader.BLE.feedback(State.INTERRUPTION) line, but with that line I get the following error:
-
I don't see this in your code, so I'm guessing the crash is from different code.
-
Sorry, I changed the name, but the "main_thread" attribute corresponds to the "BLE" attribute.
-
OK, I'm back at the office and the error is gone. The current test code looks like this:
class Communication:
    def __init__(self):
        self.data = []
        self.client = BleakClient(DEVICE_ADDRESS)
        print("Connected to \"", DEVICE_NAME, "\" (ADD: ", DEVICE_ADDRESS, ")")

    def get_data(self):
        return self.data

    async def start(self):
        try:
            print("LAUNCHING START")
            await self.client.connect()
            print("Checking Gloves status")
            press_bytes = await self.client.read_gatt_char(PRESS_CHAR_UUID)
            imu_bytes = await self.client.read_gatt_char(IMU_CHAR_UUID)
            press_ = bytearray.decode(imu_bytes)
            imu_ = bytearray.decode(press_bytes)
            print('Press status : ', imu_)
            print('IMU status : ', press_)
            print("Begin session")
            await self.client.write_gatt_char(SESS_CHAR_UUID, BA_TRUE)
            print("Begin notification")
            await self.client.start_notify(DATA_CHAR_UUID, partial(callback, self))
        except Exception as e:
            print(e)
        print("ENDING START")

    async def stop(self):
        try:
            await self.client.stop_notify(DATA_CHAR_UUID)
            print("End notification")
            await self.client.write_gatt_char(SESS_CHAR_UUID, BA_FALSE)
            print("End session")
        except Exception as e:
            print(e)
        finally:
            await self.client.disconnect()

    async def feedback(self, state):
        try:
            print("Feedback sending")
            if state == State.OK:
                await self.client.write_gatt_char(FDBK_CHAR_UUID, BA_OK)
            elif state == State.INTERRUPTION:
                await self.client.write_gatt_char(FDBK_CHAR_UUID, BA_INTERRUPTION)
            else:
                print("ERROR: Smtg wrong with Communication.feedback() func.")
        except Exception as e:
            print(e)

def callback(self: Communication, handle: int, data: bytearray):
    char = bytearray.decode(data)
    self.data.append(char)

class ESPReader:
    def __init__(self, save=None):
        self.BLE = Communication()
        self.data = []
        self.cols = ["time", "press1", "press2", "press3", "acc1", "acc2", "acc3", "gyro1", "gyro2", "gyro3"]
        self.save = save

    async def start(self):
        await self.BLE.start()

    def get_data(self):
        self.data = []
        new_data = self.BLE.get_data()
        new_data = [re.split(",", s) for s in new_data]
        new_data = [a for a in new_data if len(a) == 10 and is_float(a)]
        new_data = np.array(new_data).astype("float")
        self.data.extend(new_data)
        self.data = pd.DataFrame(self.data, columns=self.cols)
        return self.data

    async def stop(self):
        data = self.get_data()
        await self.BLE.stop()
        # if self.save is not None:
        #     data.to_csv(f'./recordings/{self.save}/data.csv')

async def essai():
    reader = ESPReader(save=False)
    print('Starting reader')
    await reader.start()
    print('waiting for data collection')
    await asyncio.sleep(3)
    print('printing data (1):')
    print(reader.get_data())
    await reader.BLE.feedback(State.INTERRUPTION)
    await asyncio.sleep(3)
    print('printing data (2):')
    print(reader.get_data())
    await reader.stop()

And when we call the async essai() function from the ESPReader class this way:
So the test is working fine. Now I am about to integrate the BLE communication into existing, working code. Here is a summary of it (the full code is too long and was written by someone who previously worked on the project).
async def run(self):
    await self.reader.start()
    print("DEBUG")
    # Let the user perform for a moment before giving feedback
    if self.config["session length"] is not None:
        self.ending_time = self.config["session length"] + time.time()
    else:
        self.ending_time = None
    self.state = State.OK
    # Main loop
    try:
        while True:
            await asyncio.sleep(self.config["feedback timer"])
            data = self.reader.get_data()
            self.state = self.model.estimate(data)
            # We give feedback and enable cooldown only if state is not OK
            if self.state != State.OK:
                await self.reader.BLE.feedback(self.state.value)
                await asyncio.sleep(0.5)
            if self.ending_time is not None \
                    and self.ending_time < time.time():
                break
        await self.stop()
    except KeyboardInterrupt:
        await self.stop()

async def stop(self):
    # Exit cleanup only done if the program is running currently
    if self.state != State.READY:
        # Save the main data
        await self.reader.stop()
Then there is a _main.py_ file which does the following:
async def main():
    program = Processing()
    try:
        await program.run()
    # Catch the exit signal for graceful exit and saving of current session.
    except KeyboardInterrupt:
        await program.stop()

if __name__ == '__main__':
    asyncio.run(main())

Here is the console log:
The code seems to block on the await self.client.connect() line in the start() function of the Communication class, between the print("LAUNCHING START") and print("Checking Gloves status") lines. The computer is connected to the BLE device (the onboard blue LED is on), but nothing happens, not even in the console log of the BLE device, which should display something while transmitting or once the session time has elapsed.
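One way to make a hang like this visible, sketched under assumptions: wrap the awaited connect call in `asyncio.wait_for` so it gives up after a bounded time instead of blocking forever. The helper name `connect_with_timeout` and the two stand-in coroutines are hypothetical and exist only for this illustration; with Bleak you would pass `self.client.connect`. This does not explain why the connection stalls, it only turns a silent block into an explicit result. Note also that the first version of the test code passed `timeout=5.0` to `BleakClient`, which the later versions dropped.

```python
import asyncio


async def connect_with_timeout(connect, timeout):
    """Await connect(); return False instead of waiting forever if it hangs."""
    try:
        await asyncio.wait_for(connect(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def hanging_connect():
    # Hypothetical stand-in for a connect() call that never returns.
    await asyncio.sleep(3600)


async def quick_connect():
    # Hypothetical stand-in for a connect() call that succeeds at once.
    await asyncio.sleep(0)


if __name__ == "__main__":
    print(asyncio.run(connect_with_timeout(hanging_connect, 0.1)))
    print(asyncio.run(connect_with_timeout(quick_connect, 1.0)))
```

`asyncio.wait_for` cancels the pending coroutine when the timeout expires, so the caller gets control back and can log the failure or retry.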
-
And sometimes I get this error when executing the same code:
It seems to be an error raised while the services are being retrieved, but I don't see why it works well in the test and returns this error here.
-
Hello!
My context
I'm working on a BLE device for feedback purposes. The data processing happens in Python on a computer (that work was done before I joined), using Bleak to communicate with the device. The thing is, I'm really new to BLE and not that familiar with Python and asynchronous functions.
For my project, I need to gather data continuously from a BLE characteristic on the device via notifications, and react to it (from time to time) by writing a value to another characteristic.
The problem
I have created a class to handle the BLE interaction, but I'm not able to run a function that writes to one characteristic while another function is receiving notifications on a different characteristic. Currently, the writing function only runs once the notifying function has completed (both functions are async).
So, I'm looking for a proper solution which has:
The problem may come from the way the async functions are called; I don't really know where it comes from.
Any help would be welcome. Thanks!
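A minimal sketch of the behaviour asked for, under stated assumptions: once notifications have been started, the call returns and the callback keeps firing in the background, so the same coroutine is free to write to another characteristic in the meantime. `FakeDevice` is a hypothetical stand-in that needs no hardware; only its method names (`start_notify`, `stop_notify`, `write_gatt_char`) mirror Bleak, and the characteristic names "data" and "feedback" are placeholders.

```python
import asyncio


class FakeDevice:
    """Hypothetical stand-in for a BLE client; emits a sample every 10 ms."""

    def __init__(self):
        self.written = []
        self._task = None

    async def start_notify(self, char, callback):
        # Returns immediately; samples keep arriving from a background task.
        async def pump():
            n = 0
            while True:
                callback(char, f"sample {n}".encode())
                n += 1
                await asyncio.sleep(0.01)

        self._task = asyncio.create_task(pump())

    async def stop_notify(self, char):
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass

    async def write_gatt_char(self, char, data):
        self.written.append((char, bytes(data)))


async def session():
    device = FakeDevice()
    received = []

    def on_data(_char, data):
        received.append(data.decode())

    await device.start_notify("data", on_data)
    await asyncio.sleep(0.05)          # simulate processing
    before = len(received)
    await device.write_gatt_char("feedback", b"\x01")  # feedback mid-stream
    await asyncio.sleep(0.05)          # notifications continue meanwhile
    after = len(received)
    await device.stop_notify("data")
    return before, after, device.written


if __name__ == "__main__":
    print(asyncio.run(session()))
```

The key point is that nothing here waits for the notification stream to finish before writing; the stream only ends when stop_notify is called.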