Skip to content

Commit

Permalink
Made tests check received packets
Browse files Browse the repository at this point in the history
  • Loading branch information
alexoj46 committed Nov 4, 2024
1 parent 4260119 commit ffc9263
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""
The :mod:`navigator_drone_comm` module is used to communicate with the drone from Navigator.
Packets received from the drone are published to /navigator_drone_comm/target, /navigator_drone_comm/gps,
and /navigator_drone_comm/target.
Can send packets to the drone by calling the services /navigator_drone_comm/estop, /navigator_drone_comm/stop,
/navigator_drone_comm/start "example mission name".
"""

from .driver import DroneCommDevice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,12 @@ def heartbeat_check(self, _):
pass

def estop_send(self):
# rospy.loginfo("sending EStop")
self.send_packet(EStopPacket())

def start_send(self, mission_name: str):
# rospy.loginfo(f"sending Start for mission: %s", mission_name)
self.send_packet(StartPacket(name=mission_name))

def stop_send(self):
# rospy.loginfo("sending Stop")
self.send_packet(StopPacket())

def on_packet_received(
Expand All @@ -94,6 +91,7 @@ def on_packet_received(
hbt_msg = Header(self.received_seq_num, rospy.Time.now(), "drone_heartbeat")
self.drone_heartbeat_pub.publish(hbt_msg)
self.received_seq_num += 1
self.drone_heartbeat_pub.publish(hbt_msg)
elif isinstance(packet, GPSDronePacket):
point_msg = Point(packet.lat, packet.lon, packet.alt)
self.gps_pub.publish(point_msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import rospy
import rostest
from geometry_msgs.msg import Point
from navigator_drone_comm.driver import DroneCommDevice
from navigator_drone_comm.packets import (
Color,
Expand All @@ -18,14 +19,28 @@
StopPacket,
TargetPacket,
)
from navigator_msgs.msg import DroneTarget
from navigator_msgs.srv import DroneMission, DroneMissionRequest
from std_msgs.msg import Header
from std_srvs.srv import Empty, EmptyRequest


class SimulatedBasicTest(unittest.TestCase):
def __init__(self, *args):
super().__init__(*args)

@classmethod
def gps_callback(cls, data):
cls.drone_gps = data

@classmethod
def target_callback(cls, data):
cls.target = data

@classmethod
def heartbeat_drone_callback(cls, data):
cls.last_drone_heartbeat = data

@classmethod
def setUpClass(cls):
cls.master, cls.slave = pty.openpty()
Expand All @@ -46,6 +61,12 @@ def setUpClass(cls):
DroneMission,
)
cls.start_proxy.wait_for_service()
cls.drone_gps = None
cls.target = None
cls.last_drone_heartbeat = None
rospy.Subscriber("~gps", Point, cls.gps_callback)
rospy.Subscriber("~target", DroneTarget, cls.target_callback)
rospy.Subscriber("~drone_heartbeat", Header, cls.heartbeat_drone_callback)

def test_device_initialization(self):
self.assertIsNotNone(self.device)
Expand Down Expand Up @@ -83,20 +104,21 @@ def test_start_mission(self):
break
except RuntimeError:
# ignore heartbeat packets here
rospy.logerr()
pass
self.assertIsInstance(packet, StartPacket)

# TODO add asserts
def test_gps_drone_receive(self):
gps_packet = GPSDronePacket(lat=37.7749, lon=-122.4194, alt=30.0)
self.assertEqual(self.drone_gps, None)
gps_packet = GPSDronePacket(lat=37.77, lon=-122.4194, alt=30.0)
os.write(self.master, bytes(gps_packet))
rospy.sleep(0.5)
self.assertAlmostEqual(round(self.drone_gps.x, 3), 37.77)

def test_target_receive(self):
target_packet = TargetPacket(lat=-67.7745, lon=12.654, color=Color.BLUE)
os.write(self.master, bytes(target_packet))
rospy.sleep(0.5)
self.assertEqual(self.target.color, "BLUE")

# tests that a heartbeat is sent from the boat every second
def test_z_sending_heartbeats(self):
Expand All @@ -106,14 +128,14 @@ def test_z_sending_heartbeats(self):
self.assertIsInstance(packet, HeartbeatSetPacket)
self.assertLess(time.time() - start_time, 1 * i + 0.1)

# TODO
# test that if a heartbeat is received every second, no error sounds
# test that the boat can receive drone heartbeats
def test_z_heartbeat_receive(self):
rospy.loginfo("Testing receiving 2 heartbeats...")
for i in range(3):
heartbeat_packet = HeartbeatReceivePacket()
self.device.on_packet_received(heartbeat_packet)
time.sleep(1)
self.assertEqual(self.last_drone_heartbeat.frame_id, "drone_heartbeat")
self.assertEqual(self.last_drone_heartbeat.seq, i)

@classmethod
def tearDownClass(cls):
Expand Down

0 comments on commit ffc9263

Please sign in to comment.