Skip to content

Commit

Permalink
Have node_helpers more closely follow the template
Browse files Browse the repository at this point in the history
  • Loading branch information
apockill committed Dec 8, 2024
1 parent 6f50797 commit 5b84222
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkgs/node_helpers/node_helpers/nodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
TransformsFile,
)
from .sound_player import SoundPlayer
from .node_helpers_node import ExampleNode

__all__ = [
"HelpfulNode",
"ExampleNode",
"InteractiveTransformPublisher",
"TransformModel",
"TransformsFile",
Expand Down
65 changes: 65 additions & 0 deletions pkgs/node_helpers/node_helpers/nodes/node_helpers_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""This is a total example node to show off some simple node_helpers features. It's not
meant to be a comprehensive example, but rather a simple one to show off some of the
features of the node_helpers package.
"""

from typing import Any

from sensor_msgs.msg import JointState

from node_helpers.nodes import HelpfulNode
from pydantic import BaseModel
from node_helpers.spinning import create_spin_function
from rclpy.qos import qos_profile_services_default

from node_helpers.example_urdf import ForkliftURDF

class ExampleNode(HelpfulNode):

class Parameters(BaseModel):
# Define your ROS parameters here
forklift_speed: float # m/s
forklift_max_extent: float

def __init__(self, **kwargs: Any):
super().__init__("ExampleNode", **kwargs)
# Load parameters from the ROS parameter server
self.params = self.declare_from_pydantic_model(self.Parameters, "root_config")
self.urdf = ForkliftURDF.with_namespace(self.get_namespace())

# Track the forks position and direction, so we can move them up and down
self.forklift_position = 0.0
self.forklift_direction = 1

# Create publishers
self.joint_state_publisher = self.create_publisher(
JointState,
"desired_joint_states",
qos_profile_services_default
)

# Create a timer to publish joint values
self._publish_hz = 20
self.create_timer(1 / self._publish_hz, self.on_publish_joints)


def on_publish_joints(self) -> None:
if self.forklift_position >= self.params.forklift_max_extent:
self.forklift_direction = -1
elif self.forklift_position <= 0:
self.forklift_direction = 1

self.forklift_position += self.forklift_direction * self.params.forklift_speed / self._publish_hz

joint_positions = {
self.urdf.joints.FORKS: self.forklift_position
}

self.joint_state_publisher.publish(
JointState(
name=list(joint_positions.keys()),
position = list(joint_positions.values())
)
)

main = create_spin_function(ExampleNode)
1 change: 1 addition & 0 deletions pkgs/node_helpers/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ transforms3d = "^0.4.2"
interactive_transform_publisher = "node_helpers.nodes.interactive_transform_publisher:main"
sound_player = "node_helpers.nodes.sound_player:main"
placeholder = "node_helpers.nodes.placeholder:main"
run_ExampleNode = "node_helpers.nodes.node_helpers_node:main"

[tool.colcon-poetry-ros.data-files]
"share/ament_index/resource_index/packages" = ["resource/node_helpers"]
Expand Down

0 comments on commit 5b84222

Please sign in to comment.