-
Notifications
You must be signed in to change notification settings - Fork 9.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
split locationd and liblocationd tests (#24977)
* laikad: use cython version of gnss kf * fix import error * test liblocationd separate * Revert "laikad: use cython version of gnss kf" This reverts commit bdd769b.
- Loading branch information
Showing
3 changed files
with
108 additions
and
95 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
#!/usr/bin/env python3 | ||
"""This test can't be run together with other locationd tests. | ||
cffi.dlopen breaks the list of registered filters.""" | ||
import os | ||
import random | ||
import unittest | ||
|
||
from cffi import FFI | ||
|
||
import cereal.messaging as messaging | ||
from cereal import log | ||
|
||
SENSOR_DECIMATION = 1 | ||
VISION_DECIMATION = 1 | ||
|
||
LIBLOCATIOND_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '../liblocationd.so')) | ||
|
||
|
||
class TestLocationdLib(unittest.TestCase): | ||
def setUp(self): | ||
header = '''typedef ...* Localizer_t; | ||
Localizer_t localizer_init(); | ||
void localizer_get_message_bytes(Localizer_t localizer, bool inputsOK, bool sensorsOK, bool gpsOK, bool msgValid, char *buff, size_t buff_size); | ||
void localizer_handle_msg_bytes(Localizer_t localizer, const char *data, size_t size);''' | ||
|
||
self.ffi = FFI() | ||
self.ffi.cdef(header) | ||
self.lib = self.ffi.dlopen(LIBLOCATIOND_PATH) | ||
|
||
self.localizer = self.lib.localizer_init() | ||
|
||
self.buff_size = 2048 | ||
self.msg_buff = self.ffi.new(f'char[{self.buff_size}]') | ||
|
||
def localizer_handle_msg(self, msg_builder): | ||
bytstr = msg_builder.to_bytes() | ||
self.lib.localizer_handle_msg_bytes(self.localizer, self.ffi.from_buffer(bytstr), len(bytstr)) | ||
|
||
def localizer_get_msg(self, t=0, inputsOK=True, sensorsOK=True, gpsOK=True, msgValid=True): | ||
self.lib.localizer_get_message_bytes(self.localizer, inputsOK, sensorsOK, gpsOK, msgValid, self.ffi.addressof(self.msg_buff, 0), self.buff_size) | ||
return log.Event.from_bytes(self.ffi.buffer(self.msg_buff), nesting_limit=self.buff_size // 8) | ||
|
||
def test_liblocalizer(self): | ||
msg = messaging.new_message('liveCalibration') | ||
msg.liveCalibration.validBlocks = random.randint(1, 10) | ||
msg.liveCalibration.rpyCalib = [random.random() / 10 for _ in range(3)] | ||
|
||
self.localizer_handle_msg(msg) | ||
liveloc = self.localizer_get_msg() | ||
self.assertTrue(liveloc is not None) | ||
|
||
@unittest.skip("temporarily disabled due to false positives") | ||
def test_device_fell(self): | ||
msg = messaging.new_message('sensorEvents', 1) | ||
msg.sensorEvents[0].sensor = 1 | ||
msg.sensorEvents[0].timestamp = msg.logMonoTime | ||
msg.sensorEvents[0].type = 1 | ||
msg.sensorEvents[0].init('acceleration') | ||
msg.sensorEvents[0].acceleration.v = [10.0, 0.0, 0.0] # zero with gravity | ||
self.localizer_handle_msg(msg) | ||
|
||
ret = self.localizer_get_msg() | ||
self.assertTrue(ret.liveLocationKalman.deviceStable) | ||
|
||
msg = messaging.new_message('sensorEvents', 1) | ||
msg.sensorEvents[0].sensor = 1 | ||
msg.sensorEvents[0].timestamp = msg.logMonoTime | ||
msg.sensorEvents[0].type = 1 | ||
msg.sensorEvents[0].init('acceleration') | ||
msg.sensorEvents[0].acceleration.v = [50.1, 0.0, 0.0] # more than 40 m/s**2 | ||
self.localizer_handle_msg(msg) | ||
|
||
ret = self.localizer_get_msg() | ||
self.assertFalse(ret.liveLocationKalman.deviceStable) | ||
|
||
def test_posenet_spike(self): | ||
for _ in range(SENSOR_DECIMATION): | ||
msg = messaging.new_message('carState') | ||
msg.carState.vEgo = 6.0 # more than 5 m/s | ||
self.localizer_handle_msg(msg) | ||
|
||
ret = self.localizer_get_msg() | ||
self.assertTrue(ret.liveLocationKalman.posenetOK) | ||
|
||
for _ in range(20 * VISION_DECIMATION): # size of hist_old | ||
msg = messaging.new_message('cameraOdometry') | ||
msg.cameraOdometry.rot = [0.0, 0.0, 0.0] | ||
msg.cameraOdometry.rotStd = [0.1, 0.1, 0.1] | ||
msg.cameraOdometry.trans = [0.0, 0.0, 0.0] | ||
msg.cameraOdometry.transStd = [2.0, 0.1, 0.1] | ||
self.localizer_handle_msg(msg) | ||
|
||
for _ in range(20 * VISION_DECIMATION): # size of hist_new | ||
msg = messaging.new_message('cameraOdometry') | ||
msg.cameraOdometry.rot = [0.0, 0.0, 0.0] | ||
msg.cameraOdometry.rotStd = [1.0, 1.0, 1.0] | ||
msg.cameraOdometry.trans = [0.0, 0.0, 0.0] | ||
msg.cameraOdometry.transStd = [10.1, 0.1, 0.1] # more than 4 times larger | ||
self.localizer_handle_msg(msg) | ||
|
||
ret = self.localizer_get_msg() | ||
self.assertFalse(ret.liveLocationKalman.posenetOK) | ||
|
||
if __name__ == "__main__": | ||
unittest.main() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters