Skip to content

Commit

Permalink
reorganized arm function
Browse files Browse the repository at this point in the history
  • Loading branch information
neuenfeldttj committed Apr 17, 2024
1 parent b75e258 commit 5891cb1
Showing 1 changed file with 140 additions and 151 deletions.
291 changes: 140 additions & 151 deletions src/teleoperation/backend/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,167 +250,156 @@ def to_velocity(self, input: int, joint_name: str, brushless: bool = True) -> fl
* (self.brushed_motors[joint_name]["max_velocity"] - self.brushed_motors[joint_name]["min_velocity"])
/ 2
)

def publish_ik(self, axes):
left_trigger = self.filter_xbox_axis(axes[self.xbox_mappings["left_trigger"]])
right_trigger = self.filter_xbox_axis(axes[self.xbox_mappings["right_trigger"]])
ee_in_map = SE3.from_tf_tree(self.tf_buffer, "base_link", "arm_e_link")

ee_in_map.position[0] += (
self.ik_names["x"] * self.filter_xbox_axis(axes[self.xbox_mappings["left_js_x"]]),
)
ee_in_map.position[1] += (
self.ik_names["y"] * self.filter_xbox_axis(axes[self.xbox_mappings["left_js_y"]]),
)

ee_in_map.position[2] += self.ik_names["z"] * (left_trigger - right_trigger)

arm_ik_cmd = IK(target=PoseStamped(
header=Header(stamp=rospy.Time.now(), frame_id="base_link"),
pose=Pose(
position=Point(*ee_in_map.position),
orientation=Quaternion(*ee_in_map.rotation.quaternion),
)
)
)
self.send(text_data=json.dumps({
"type": "ik",
"target": {
"position": ee_in_map.position.tolist(),
"quaternion": ee_in_map.rotation.quaternion.tolist()
}
}))
self.arm_ik_pub.publish(arm_ik_cmd)

def publish_position(self, type, names, positions):
position_cmd = Position(
names=names,
positions=positions,
)
if type == "arm_values":
self.arm_position_cmd_pub.publish(position_cmd)
elif type == "sa_arm_values":
self.sa_position_cmd_pub.publish(position_cmd)
elif type == "cache_values":
self.cache_position_cmd_pub.publish(position_cmd)

def publish_velocity(self, type, names, axes, buttons):
left_trigger = self.filter_xbox_axis(axes[self.xbox_mappings["left_trigger"]])
right_trigger = self.filter_xbox_axis(axes[self.xbox_mappings["right_trigger"]])
velocity_cmd = Velocity()
velocity_cmd.names = names
if type == "cache_values":
velocity_cmd.velocities = [
self.sa_config["cache"]["multiplier"]
* self.filter_xbox_button(buttons, "right_bumper", "left_bumper")
]
self.cache_velocity_cmd_pub.publish(velocity_cmd)
elif type == "sa_arm_values":
velocity_cmd.velocities = [
self.to_velocity(
self.filter_xbox_axis(axes[self.sa_config["sa_x"]["xbox_index"]]), "sa_x", False
),
self.to_velocity(
self.filter_xbox_axis(axes[self.sa_config["sa_y"]["xbox_index"]]), "sa_y", False
),
self.to_velocity(
self.filter_xbox_axis(axes[self.sa_config["sa_z"]["xbox_index"]]), "sa_z", True
),
self.sa_config["sampler"]["multiplier"] * (right_trigger - left_trigger),
self.sa_config["sensor_actuator"]["multiplier"]
* self.filter_xbox_button(buttons, "right_bumper", "left_bumper"),
]
self.sa_velocity_cmd_pub.publish(velocity_cmd)
elif type == "arm_values":
velocity_cmd.velocities = [
self.to_velocity(
self.filter_xbox_axis(axes[self.ra_config["joint_a"]["xbox_index"]]), "joint_a"
),
self.to_velocity(
self.filter_xbox_axis(axes[self.ra_config["joint_b"]["xbox_index"]]), "joint_b", False
),
self.to_velocity(
self.filter_xbox_axis(axes[self.ra_config["joint_c"]["xbox_index"]]), "joint_c"
),
self.to_velocity(
self.filter_xbox_axis(axes[self.ra_config["joint_de_pitch"]["xbox_index"]]), "joint_de_0"
),
self.to_velocity(
self.filter_xbox_axis(axes[self.ra_config["joint_de_roll"]["xbox_index"]]), "joint_de_1"
),
self.ra_config["allen_key"]["multiplier"] * self.filter_xbox_button(buttons, "y", "a"),
self.ra_config["gripper"]["multiplier"] * self.filter_xbox_button(buttons, "b", "x"),
]
self.arm_velocity_cmd_pub.publish(velocity_cmd)

def publish_throttle(self, type, names, axes, buttons):
left_trigger = self.filter_xbox_axis(axes[self.xbox_mappings["left_trigger"]])
right_trigger = self.filter_xbox_axis(axes[self.xbox_mappings["right_trigger"]])
throttle_cmd = Throttle()
throttle_cmd.names = names
if type == "cache_values":
throttle_cmd.throttles = [
self.sa_config["cache"]["multiplier"]
* self.filter_xbox_button(buttons, "right_bumper", "left_bumper")
]
self.cache_throttle_cmd_pub.publish(throttle_cmd)
elif type == "arm_values":
throttle_cmd.throttles = [
self.filter_xbox_axis(axes[self.ra_config["joint_a"]["xbox_index"]]),
self.filter_xbox_axis(axes[self.ra_config["joint_b"]["xbox_index"]]),
self.filter_xbox_axis(axes[self.ra_config["joint_c"]["xbox_index"]]),
self.filter_xbox_axis(
axes[self.ra_config["joint_de_pitch"]["xbox_index_right"]]
- axes[self.ra_config["joint_de_pitch"]["xbox_index_left"]]
),
self.filter_xbox_axis(
buttons[self.ra_config["joint_de_roll"]["xbox_index_right"]]
- buttons[self.ra_config["joint_de_roll"]["xbox_index_left"]]
),
self.ra_config["allen_key"]["multiplier"] * self.filter_xbox_button(buttons, "y", "a"),
self.ra_config["gripper"]["multiplier"] * self.filter_xbox_button(buttons, "b", "x"),
]
self.arm_throttle_cmd_pub.publish(throttle_cmd)
elif type == "sa_arm_values":
throttle_cmd.throttles = [
self.filter_xbox_axis(axes[self.sa_config["sa_x"]["xbox_index"]]),
self.filter_xbox_axis(axes[self.sa_config["sa_y"]["xbox_index"]]),
self.filter_xbox_axis(axes[self.sa_config["sa_z"]["xbox_index"]]),
self.sa_config["sampler"]["multiplier"] * (right_trigger - left_trigger),
self.sa_config["sensor_actuator"]["multiplier"]
* self.filter_xbox_button(buttons, "right_bumper", "left_bumper"),
]
self.sa_throttle_cmd_pub.publish(throttle_cmd)

def handle_controls_message(self, msg):
CACHE = ["cache"]
SA_NAMES = ["sa_x", "sa_y", "sa_z", "sampler", "sensor_actuator"]
RA_NAMES = self.RA_NAMES
ra_slow_mode = False
left_trigger = self.filter_xbox_axis(msg["axes"][self.xbox_mappings["left_trigger"]])
right_trigger = self.filter_xbox_axis(msg["axes"][self.xbox_mappings["right_trigger"]])
arm_pubs = [self.arm_position_cmd_pub, self.arm_velocity_cmd_pub, self.arm_throttle_cmd_pub, self.arm_ik_pub]
sa_pubs = [self.sa_position_cmd_pub, self.sa_velocity_cmd_pub, self.sa_throttle_cmd_pub]
cache_pubs = [self.cache_position_cmd_pub, self.cache_velocity_cmd_pub, self.cache_throttle_cmd_pub]
publishers = []
controls_names = []
if msg["type"] == "cache_values":
controls_names = CACHE
publishers = cache_pubs
elif msg["type"] == "arm_values":
controls_names = RA_NAMES
publishers = arm_pubs
elif msg["type"] == "sa_arm_values":
controls_names = SA_NAMES
publishers = sa_pubs
names = ["joint_b", "joint_c", "joint_de_pitch", "joint_de_roll"]
if msg["type"] == "sa_arm_values":
names = ["sa_x", "sa_y", "sa_z", "sampler", "sensor_actuator"]
elif msg["type"] == "cache_values":
names = ["cache"]

if msg["arm_mode"] == "ik":
ee_in_map = SE3.from_tf_tree(self.tf_buffer, "base_link", "arm_e_link")

ee_in_map.position[0] += (
self.ik_names["x"] * self.filter_xbox_axis(msg["axes"][self.xbox_mappings["left_js_x"]]),
)
ee_in_map.position[1] += (
self.ik_names["y"] * self.filter_xbox_axis(msg["axes"][self.xbox_mappings["left_js_y"]]),
)
ee_in_map.position[2] += self.ik_names["z"] * (left_trigger - right_trigger)

arm_ik_cmd = IK(target=PoseStamped(
header=Header(stamp=rospy.Time.now(), frame_id="base_link"),
pose=Pose(
position=Point(*ee_in_map.position),
orientation=Quaternion(*ee_in_map.rotation.quaternion),
)
)
)
self.send(text_data=json.dumps({
"type": "ik",
"target": {
"position": ee_in_map.position.tolist(),
"quaternion": ee_in_map.rotation.quaternion.tolist()
}
}))
publishers[3].publish(arm_ik_cmd)
self.publish_ik(axes=msg["axes"])

elif msg["arm_mode"] == "position":
position_names = controls_names
if msg["type"] == "arm_values":
position_names = ["joint_b", "joint_c", "joint_de_pitch", "joint_de_roll"]
position_cmd = Position(
names=position_names,
positions=msg["positions"],
)
publishers[0].publish(position_cmd)
self.publish_position(type=msg["type"], names=names, positions=msg["positions"])

elif msg["arm_mode"] == "velocity":
velocity_cmd = Velocity()
velocity_cmd.names = controls_names
if msg["type"] == "cache_values":
velocity_cmd.velocities = [
self.sa_config["cache"]["multiplier"]
* self.filter_xbox_button(msg["buttons"], "right_bumper", "left_bumper")
]
elif msg["type"] == "sa_arm_values":
velocity_cmd.velocities = [
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.sa_config["sa_x"]["xbox_index"]]), "sa_x", False
),
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.sa_config["sa_y"]["xbox_index"]]), "sa_y", False
),
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.sa_config["sa_z"]["xbox_index"]]), "sa_z", True
),
self.sa_config["sampler"]["multiplier"] * (right_trigger - left_trigger),
self.sa_config["sensor_actuator"]["multiplier"]
* self.filter_xbox_button(msg["buttons"], "right_bumper", "left_bumper"),
]
elif msg["type"] == "arm_values":
velocity_cmd.velocities = [
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_a"]["xbox_index"]]), "joint_a"
),
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_b"]["xbox_index"]]), "joint_b", False
),
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_c"]["xbox_index"]]), "joint_c"
),
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_de_pitch"]["xbox_index"]]), "joint_de_0"
),
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_de_roll"]["xbox_index"]]), "joint_de_1"
),
self.ra_config["allen_key"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "y", "a"),
self.ra_config["gripper"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "b", "x"),
]
publishers[1].publish(velocity_cmd)
self.publish_velocity(type=msg["type"], names=names, axes=msg["axes"], buttons=msg["buttons"])

elif msg["arm_mode"] == "throttle":
throttle_cmd = Throttle()
throttle_cmd.names = controls_names
if msg["type"] == "cache_values":
throttle_cmd.throttles = [
self.sa_config["cache"]["multiplier"]
* self.filter_xbox_button(msg["buttons"], "right_bumper", "left_bumper")
]
elif msg["type"] == "arm_values":
# print(msg["buttons"])

# d_pad_x = msg["axes"][self.xbox_mappings["d_pad_x"]]
# if d_pad_x > 0.5:
# ra_slow_mode = True
# elif d_pad_x < -0.5:
# ra_slow_mode = False

throttle_cmd.throttles = [
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_a"]["xbox_index"]]),
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_b"]["xbox_index"]]),
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_c"]["xbox_index"]]),
self.filter_xbox_axis(
msg["axes"][self.ra_config["joint_de_pitch"]["xbox_index_right"]]
- msg["axes"][self.ra_config["joint_de_pitch"]["xbox_index_left"]]
),
self.filter_xbox_axis(
msg["buttons"][self.ra_config["joint_de_roll"]["xbox_index_right"]]
- msg["buttons"][self.ra_config["joint_de_roll"]["xbox_index_left"]]
),
self.ra_config["allen_key"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "y", "a"),
self.ra_config["gripper"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "b", "x"),
]

for i, name in enumerate(self.RA_NAMES):
if ra_slow_mode:
throttle_cmd.throttles[i] *= self.ra_config[name]["slow_mode_multiplier"]
if self.ra_config[name]["invert"]:
throttle_cmd.throttles[i] *= -1
elif msg["type"] == "sa_arm_values":
throttle_cmd.throttles = [
self.filter_xbox_axis(msg["axes"][self.sa_config["sa_x"]["xbox_index"]]),
self.filter_xbox_axis(msg["axes"][self.sa_config["sa_y"]["xbox_index"]]),
self.filter_xbox_axis(msg["axes"][self.sa_config["sa_z"]["xbox_index"]]),
self.sa_config["sampler"]["multiplier"] * (right_trigger - left_trigger),
self.sa_config["sensor_actuator"]["multiplier"]
* self.filter_xbox_button(msg["buttons"], "right_bumper", "left_bumper"),
]

fast_mode_activated = msg["buttons"][self.xbox_mappings["a"]] or msg["buttons"][self.xbox_mappings["b"]]
if not fast_mode_activated:
for i, name in enumerate(SA_NAMES):
# When going up (vel > 0) with SA joint 2, we DON'T want slow mode.
if not (name == "sa_y" and throttle_cmd.throttles[i] > 0):
throttle_cmd.throttles[i] *= self.sa_config[name]["slow_mode_multiplier"]
publishers[2].publish(throttle_cmd)
self.publish_throttle(type=msg["type"], names=names, axes=msg["axes"], buttons=msg["buttons"])

def handle_joystick_message(self, msg):
# Tiny deadzone so we can safely e-stop with dampen switch
Expand Down

0 comments on commit 5891cb1

Please sign in to comment.