Skip to content

Commit

Permalink
Clock publisher in Player
Browse files Browse the repository at this point in the history
Add a `rosgraph_msgs/Clock` publisher to the `Player` - that uses `PlayerClock::now()` to publish current time.

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
Emerson Knapp committed Apr 2, 2021
1 parent daadf5b commit e69696b
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 14 deletions.
11 changes: 11 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
from rosbag2_py import StorageOptions
import yaml

def positive_float(arg: str) -> float:
value = float(arg)
if value <= 0:
raise ValueError(f'Value {value} is less than or equal to zero.')
return value


class PlayVerb(VerbExtension):
"""Play back ROS data from a bag."""
Expand Down Expand Up @@ -71,6 +77,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102
' pragmas: [\"<setting_name>\" = <setting_value>]'
'Note that applicable settings are limited to read-only for ros2 bag play.'
'For a list of sqlite3 settings, refer to sqlite3 documentation')
parser.add_argument(
'--clock', type=positive_float, default=0,
help='Publish to /clock at a specific frequency in Hz, to act as a ROS Time Source. '
'Value must be positive. Defaults to not publishing.')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
Expand Down Expand Up @@ -104,6 +114,7 @@ def main(self, *, args): # noqa: D102
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = args.loop
play_options.topic_remapping_options = topic_remapping
play_options.clock_publish_frequency = args.clock

player = Player()
player.play(storage_options, play_options)
5 changes: 3 additions & 2 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ class PlayerClock
/**
* Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock
*
* Return true if the time has been reached, false if it was not successfully reached after sleeping
* for the appropriate duration.
* The user should not take action based on this sleep until it returns true.
*
* \param until: The ROS time to sleep until
* \return true if the `until` has been reached, false if timeout or awakened early.
*/
ROSBAG2_CPP_PUBLIC
virtual bool sleep_until(rcutils_time_point_value_t until) = 0;
Expand Down
10 changes: 8 additions & 2 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class TimeControllerClock : public PlayerClock
*
* \param starting_time: provides an initial offset for managing time
* This will likely be the timestamp of the first message in the bag
* \param sleep_timeout: maximum amount of time to sleep even if time is not reached
* This helps dictate spin rate when clock is paused, as well as allowing periodic return
* of control even if messages are far apart.
* Value <= 0 means that sleep_until could theoretically wait forever without returning.
* \param rate: Rate of playback, a unit-free ratio. 1.0 is real-time
* \param now_fn: Function used to get the current steady time
* defaults to std::chrono::steady_clock::now
Expand All @@ -45,6 +49,7 @@ class TimeControllerClock : public PlayerClock
ROSBAG2_CPP_PUBLIC
TimeControllerClock(
rcutils_time_point_value_t starting_time,
std::chrono::nanoseconds sleep_timeout,
double rate = 1.0,
NowFunction now_fn = std::chrono::steady_clock::now);

Expand All @@ -60,9 +65,10 @@ class TimeControllerClock : public PlayerClock
/**
* Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock
*
* Return true if the time has been reached, false if it was not successfully reached after sleeping
* for the appropriate duration.
* The user should not take action based on this sleep until it returns true.
*
* \param until: The ROS time to sleep until
* \return true if the `until` has been reached, false if timeout or awakened early.
*/
ROSBAG2_CPP_PUBLIC
bool sleep_until(rcutils_time_point_value_t until) override;
Expand Down
11 changes: 9 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,22 @@ class TimeControllerClockImpl

std::mutex mutex;
std::condition_variable cv;
std::chrono::nanoseconds sleep_timeout;
PlayerClock::NowFunction now_fn RCPPUTILS_TSA_GUARDED_BY(mutex);
double rate RCPPUTILS_TSA_GUARDED_BY(mutex) = 1.0;
TimeReference reference RCPPUTILS_TSA_GUARDED_BY(mutex);
};

TimeControllerClock::TimeControllerClock(
rcutils_time_point_value_t starting_time,
std::chrono::nanoseconds sleep_timeout,
double rate,
NowFunction now_fn)
: impl_(std::make_unique<TimeControllerClockImpl>())
{
std::lock_guard<std::mutex> lock(impl_->mutex);
impl_->now_fn = now_fn;
impl_->sleep_timeout = sleep_timeout;
impl_->reference.ros = starting_time;
impl_->reference.steady = impl_->now_fn();
impl_->rate = rate;
Expand All @@ -92,8 +95,12 @@ rcutils_time_point_value_t TimeControllerClock::now() const
bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until)
{
{
std::unique_lock<std::mutex> lock(impl_->mutex);
const auto steady_until = impl_->ros_to_steady(until);
std::lock_guard<std::mutex> lock(impl_->mutex);
SteadyTimePoint until_timeout = impl_->now_fn() + impl_->sleep_timeout;
steady_until = impl_->ros_to_steady(until);
if (until_timeout < steady_until) {
steady_until = until_timeout;
}
impl_->cv.wait_until(lock, steady_until);
}
return now() >= until;
Expand Down
9 changes: 5 additions & 4 deletions rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ class TimeControllerClockTest : public Test

NowFunction now_fn;
SteadyTimePoint return_time; // defaults to 0
std::chrono::nanoseconds sleep_timeout{25000000}; // 40Hz
rcutils_time_point_value_t ros_start_time = 0;
};

TEST_F(TimeControllerClockTest, steadytime_precision)
{
const double playback_rate = 1.0;
rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn);
rosbag2_cpp::TimeControllerClock pclock(ros_start_time, sleep_timeout, playback_rate, now_fn);

const SteadyTimePoint begin_time(std::chrono::seconds(0));
return_time = begin_time;
Expand Down Expand Up @@ -75,7 +76,7 @@ TEST_F(TimeControllerClockTest, nonzero_start_time)
{
ros_start_time = 1234567890LL;
const double playback_rate = 1.0;
rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn);
rosbag2_cpp::TimeControllerClock pclock(ros_start_time, sleep_timeout, playback_rate, now_fn);

const SteadyTimePoint begin_time(std::chrono::seconds(0));
return_time = begin_time;
Expand All @@ -88,7 +89,7 @@ TEST_F(TimeControllerClockTest, nonzero_start_time)
TEST_F(TimeControllerClockTest, fast_rate)
{
const double playback_rate = 2.5;
rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn);
rosbag2_cpp::TimeControllerClock pclock(ros_start_time, sleep_timeout, playback_rate, now_fn);

const SteadyTimePoint begin_time(std::chrono::seconds(0));
return_time = begin_time;
Expand All @@ -102,7 +103,7 @@ TEST_F(TimeControllerClockTest, fast_rate)
TEST_F(TimeControllerClockTest, slow_rate)
{
const double playback_rate = 0.4;
rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn);
rosbag2_cpp::TimeControllerClock pclock(ros_start_time, sleep_timeout, playback_rate, now_fn);

const SteadyTimePoint begin_time(std::chrono::seconds(0));
return_time = begin_time;
Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ PYBIND11_MODULE(_transport, m) {
&PlayOptions::setTopicQoSProfileOverrides)
.def_readwrite("loop", &PlayOptions::loop)
.def_readwrite("topic_remapping_options", &PlayOptions::topic_remapping_options)
.def_readwrite("clock_publish_frequency", &PlayOptions::clock_publish_frequency)
;

py::class_<RecordOptions>(m, "RecordOptions")
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ struct PlayOptions
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides = {};
bool loop = false;
std::vector<std::string> topic_remapping_options = {};

// Rate in Hz at which to publish to /clock.
// 0 (or negative) means that no publisher will be created
float clock_publish_frequency = 0.f;
};

} // namespace rosbag2_transport
Expand Down
26 changes: 24 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,14 @@ void Player::play_messages_from_queue()
void Player::play_messages_until_queue_empty()
{
rosbag2_storage::SerializedBagMessageSharedPtr message;
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(rosbag2_transport_);
while (message_queue_.try_dequeue(message) && rclcpp::ok()) {
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {}
do {
exec.spin_some();
} while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp));
if (rclcpp::ok()) {
auto publisher_iter = publishers_.find(message->topic_name);
if (publisher_iter != publishers_.end()) {
Expand Down Expand Up @@ -217,7 +221,25 @@ void Player::prepare_publishers(const PlayOptions & options)
void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time)
{
float rate = options.rate > 0.0 ? options.rate : 1.0;
clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(starting_time, rate);
std::chrono::nanoseconds sleep_timeout{25000000}; // default to 40Hz maximum spin

// Create /clock publisher
if (options.clock_publish_frequency > 0.f) {
// NOTE: PlayerClock does not own this publisher because rosbag2_cpp
// should not own transport-based functionality
std::chrono::duration<double> clock_period{1.0 / options.clock_publish_frequency};
sleep_timeout = std::chrono::duration_cast<std::chrono::nanoseconds>(clock_period);
clock_publisher_ = rosbag2_transport_->create_publisher<rosgraph_msgs::msg::Clock>(
"/clock", rclcpp::ClockQoS());
clock_publish_timer_ = rosbag2_transport_->create_wall_timer(
sleep_timeout, [this]() {
auto msg = rosgraph_msgs::msg::Clock();
msg.clock = rclcpp::Time(clock_->now());
clock_publisher_->publish(msg);
});
}

clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(starting_time, sleep_timeout, rate);
}

} // namespace rosbag2_transport
6 changes: 4 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
#include <unordered_map>

#include "moodycamel/readerwriterqueue.h"

#include "rclcpp/publisher.hpp"
#include "rclcpp/qos.hpp"

#include "rosbag2_cpp/clocks/player_clock.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_transport/play_options.hpp"
#include "rosgraph_msgs/msg/clock.hpp"

namespace rosbag2_cpp
{
Expand Down Expand Up @@ -69,6 +69,8 @@ class Player
std::unordered_map<std::string, std::shared_ptr<GenericPublisher>> publishers_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr clock_publisher_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
};

} // namespace rosbag2_transport
Expand Down

0 comments on commit e69696b

Please sign in to comment.