Skip to content

Commit

Permalink
/clock publisher in Player (ros2#695)
Browse files Browse the repository at this point in the history
* Add /clock publishing to Player

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
emersonknapp authored and Marcel Zeilinger committed Apr 9, 2021
1 parent 942564d commit ca5c1e5
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 2 deletions.
12 changes: 12 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
import yaml


def positive_float(arg: str) -> float:
value = float(arg)
if value <= 0:
raise ValueError(f'Value {value} is less than or equal to zero.')
return value


class PlayVerb(VerbExtension):
"""Play back ROS data from a bag."""

Expand Down Expand Up @@ -71,6 +78,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102
' pragmas: [\"<setting_name>\" = <setting_value>]'
'Note that applicable settings are limited to read-only for ros2 bag play.'
'For a list of sqlite3 settings, refer to sqlite3 documentation')
parser.add_argument(
'--clock', type=positive_float, nargs='?', const=40, default=0,
help='Publish to /clock at a specific frequency in Hz, to act as a ROS Time Source. '
'Value must be positive. Defaults to not publishing.')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
Expand Down Expand Up @@ -104,6 +115,7 @@ def main(self, *, args): # noqa: D102
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = args.loop
play_options.topic_remapping_options = topic_remapping
play_options.clock_publish_frequency = args.clock

player = Player()
player.play(storage_options, play_options)
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ PYBIND11_MODULE(_transport, m) {
&PlayOptions::setTopicQoSProfileOverrides)
.def_readwrite("loop", &PlayOptions::loop)
.def_readwrite("topic_remapping_options", &PlayOptions::topic_remapping_options)
.def_readwrite("clock_publish_frequency", &PlayOptions::clock_publish_frequency)
;

py::class_<RecordOptions>(m, "RecordOptions")
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ function(create_tests_for_rmw_implementation)
${SKIP_TEST}
LINK_LIBS rosbag2_transport
AMENT_DEPS rosbag2_cpp rosbag2_storage test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_play_publish_clock
test/rosbag2_transport/test_play_publish_clock.cpp
${SKIP_TEST}
LINK_LIBS rosbag2_transport
AMENT_DEPS rosbag2_cpp rosbag2_storage test_msgs rosbag2_test_common)
endfunction()

if(BUILD_TESTING)
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ struct PlayOptions
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides = {};
bool loop = false;
std::vector<std::string> topic_remapping_options = {};

// Rate in Hz at which to publish to /clock.
// 0 (or negative) means that no publisher will be created
double clock_publish_frequency = 0.0;
};

} // namespace rosbag2_transport
Expand Down
16 changes: 16 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,22 @@ void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value
{
double rate = options.rate > 0.0 ? options.rate : 1.0;
clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(starting_time, rate);

// Create /clock publisher
if (options.clock_publish_frequency > 0.f) {
const auto publish_period = std::chrono::nanoseconds(
static_cast<uint64_t>(RCUTILS_S_TO_NS(1) / options.clock_publish_frequency));
// NOTE: PlayerClock does not own this publisher because rosbag2_cpp
// should not own transport-based functionality
clock_publisher_ = transport_node_->create_publisher<rosgraph_msgs::msg::Clock>(
"/clock", rclcpp::ClockQoS());
clock_publish_timer_ = transport_node_->create_wall_timer(
publish_period, [this]() {
auto msg = rosgraph_msgs::msg::Clock();
msg.clock = rclcpp::Time(clock_->now());
clock_publisher_->publish(msg);
});
}
}

} // namespace rosbag2_transport
4 changes: 4 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
#include "moodycamel/readerwriterqueue.h"

#include "rclcpp/node.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/qos.hpp"

#include "rosbag2_cpp/clocks/player_clock.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_transport/play_options.hpp"
#include "rosgraph_msgs/msg/clock.hpp"

namespace rosbag2_cpp
{
Expand Down Expand Up @@ -70,6 +72,8 @@ class Player
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericPublisher>> publishers_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr clock_publisher_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
};

} // namespace rosbag2_transport
Expand Down
16 changes: 14 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "rclcpp/rclcpp.hpp"

#include "rcpputils/scope_exit.hpp"
#include "rcutils/time.h"

#include "rosbag2_cpp/reader.hpp"
Expand Down Expand Up @@ -89,9 +90,20 @@ std::shared_ptr<rclcpp::Node> Rosbag2Transport::setup_node(
void Rosbag2Transport::play(
const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
auto transport_node =
setup_node(play_options.node_prefix, play_options.topic_remapping_options);
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(transport_node);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
auto exit = rcpputils::scope_exit(
[&exec, &spin_thread]() {
exec.cancel();
spin_thread.join();
});
try {
auto transport_node =
setup_node(play_options.node_prefix, play_options.topic_remapping_options);
Player player(reader_, transport_node);
do {
reader_->open(storage_options, {"", rmw_get_serialization_format()});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <memory>

#include "rosbag2_test_common/subscription_manager.hpp"
#include "rosbag2_transport_test_fixture.hpp"

class RosBag2PlayTestFixture : public Rosbag2TransportTestFixture
Expand Down
115 changes: 115 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gmock/gmock.h>

#include <memory>
#include <utility>
#include <vector>

#include "rosbag2_transport/rosbag2_transport.hpp"
#include "rosgraph_msgs/msg/clock.hpp"
#include "test_msgs/message_fixtures.hpp"

#include "rosbag2_play_test_fixture.hpp"

using namespace ::testing; // NOLINT

namespace
{
rcutils_duration_value_t period_for_frequency(double frequency)
{
return static_cast<rcutils_duration_value_t>(RCUTILS_S_TO_NS(1) / frequency);
}
} // namespace

class ClockPublishFixture : public RosBag2PlayTestFixture
{
public:
ClockPublishFixture()
: RosBag2PlayTestFixture()
{
// Fake bag setup
auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", "", ""},
};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages;
for (size_t i = 0; i < messages_to_play_; i++) {
auto message = get_messages_basic_types()[0];
message->int32_value = static_cast<int32_t>(i);
messages.push_back(
serialize_test_message("topic1", milliseconds_between_messages_ * i, message));
}

// Player setup
auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

sub_->add_subscription<rosgraph_msgs::msg::Clock>(
"/clock", expected_clock_messages_, rclcpp::ClockQoS());
}

void run_test()
{
auto await_received_messages = sub_->spin_subscriptions();
rosbag2_transport::Rosbag2Transport rosbag2_transport(reader_, writer_);
rosbag2_transport.play(storage_options_, play_options_);
await_received_messages.get();

// Check that we got enough messages
auto received_clock = sub_->get_received_messages<rosgraph_msgs::msg::Clock>("/clock");
EXPECT_THAT(received_clock, SizeIs(Ge(expected_clock_messages_)));

// Check time deltas between messages
const auto expect_clock_delta =
period_for_frequency(play_options_.clock_publish_frequency) * play_options_.rate;
const auto allowed_error = static_cast<rcutils_duration_value_t>(
expect_clock_delta * error_tolerance_);
// On Windows, publishing seems to take time to "warm up", ignore the first couple messages
const auto start_message = 2;
for (size_t i = start_message; i < expected_clock_messages_ - 1; i++) {
auto current = rclcpp::Time(received_clock[i]->clock).nanoseconds();
auto next = rclcpp::Time(received_clock[i + 1]->clock).nanoseconds();
auto delta = next - current;
auto error = std::abs(delta - expect_clock_delta);
EXPECT_LE(error, allowed_error) << "Message was too far from next: " << i;
}
}

// Number of bag messages to publish
size_t messages_to_play_ = 10;
const int64_t milliseconds_between_messages_ = 50;

// Amount of error allowed between expected and actual clock deltas (expected to be much smaller)
const double error_tolerance_ = 0.3;

// Wait for just a few clock messages, we're checking the time between them, not the total count
size_t expected_clock_messages_ = 6;
};

TEST_F(ClockPublishFixture, clock_is_published_at_chosen_frequency)
{
play_options_.clock_publish_frequency = 20;
run_test();
}

TEST_F(ClockPublishFixture, clock_respects_playback_rate)
{
play_options_.clock_publish_frequency = 20;
play_options_.rate = 0.5;
messages_to_play_ = 5;
run_test();
}

0 comments on commit ca5c1e5

Please sign in to comment.