-
Notifications
You must be signed in to change notification settings - Fork 0
/
turtle_spawner.py
109 lines (90 loc) · 3.79 KB
/
turtle_spawner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python3
from functools import partial
import random
import math
import rclpy
from rclpy.node import Node
from turtlesim.srv import Spawn
from turtlesim.srv import Kill
from my_robot_interfaces.msg import Turtle
from my_robot_interfaces.msg import TurtleArray
from my_robot_interfaces.srv import CatchTurtle
class TurtleSpawner(Node):
def __init__(self):
super().__init__("turtle_spawner")
self.declare_parameter("spawn_frequency", 1.0)
self.declare_parameter("turtle_name_prefix", "turtle")
self.spawn_frequency_ = self.get_parameter("spawn_frequency").value
self.turtle_name_prefix_ = self.get_parameter("turtle_name_prefix").value
self.turtle_counter_ = 0
self.alive_turtles_ = []
self.alive_turtles_publisher_ = self.create_publisher(TurtleArray, "alive_turtles", 10)
self.spawn_turtle_timer_ = self.create_timer(1.0/self.spawn_frequency_, self.spawn_new_turtle)
self.catch_turtle_service_ = self.create_service(CatchTurtle, "catch_turtle", self.callback_catch_turtle)
def callback_catch_turtle(self, request, response):
self.call_kill_server(request.name)
response.success = True
return response
def publish_alive_turtles(self):
msg = TurtleArray()
msg.turtles = self.alive_turtles_
self.alive_turtles_publisher_.publish(msg)
def spawn_new_turtle(self):
self.turtle_counter_ += 1
name = self.turtle_name_prefix_ + str(self.turtle_counter_)
x = random.uniform(0.0, 11.0)
y = random.uniform(0.0, 11.0)
theta = random.uniform(0.0, 2*math.pi)
self.call_spawn_server(name, x, y, theta)
def call_spawn_server(self, turtle_name, x, y, theta):
client = self.create_client(Spawn, "spawn")
while not client.wait_for_service(1.0):
self.get_logger().warn("Waiting for Server...")
request = Spawn.Request()
request.x = x
request.y = y
request.theta = theta
request.name = turtle_name
future = client.call_async(request)
future.add_done_callback(
partial(self.callback_call_spawn, turtle_name=turtle_name, x=x, y=y, theta=theta))
def callback_call_spawn(self, future, turtle_name, x, y, theta):
try:
response = future.result()
if response.name != "":
self.get_logger().info("Turtle " + response.name + " is now alive")
new_turtle = Turtle()
new_turtle.name = response.name
new_turtle.x = x
new_turtle.y = y
new_turtle.theta = theta
self.alive_turtles_.append(new_turtle)
self.publish_alive_turtles()
except Exception as e:
self.get_logger().error("Service call failed %r" % (e,))
def call_kill_server(self, turtle_name):
client = self.create_client(Kill, "kill")
while not client.wait_for_service(1.0):
self.get_logger().warn("Waiting for Server...")
request = Kill.Request()
request.name = turtle_name
future = client.call_async(request)
future.add_done_callback(
partial(self.callback_call_kill, turtle_name=turtle_name))
def callback_call_kill(self, future, turtle_name):
try:
future.result()
for (i, turtle) in enumerate(self.alive_turtles_):
if turtle.name == turtle_name:
del self.alive_turtles_[i]
self.publish_alive_turtles()
break
except Exception as e:
self.get_logger().error("Service call failed %r" % (e,))
def main(args=None):
rclpy.init(args=args)
node = TurtleSpawner()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()