Skip to content

Commit

Permalink
Redesign Player class with PIMPL idiom (#1447)
Browse files Browse the repository at this point in the history
* Player PIMPL

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* Fix

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* Fix

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* Fix

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* Fix

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* Fix

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* Fix

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* WIP: make test compile again

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* WIP: make test compile again

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* WIP: make test compile again

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* WIP: make test compile again

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* WIP: make test compile again

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* Uncrustify

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* WIP: make test compile again

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* WIP: make test compile again

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* Clean

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* Revert wrong reformat

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>

* Some renames and code formatting

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Fixes for failing keyboard_handler tests

Use upper level public API from owner class for callbacks to facilitate
unit tests

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Patrick Roncagliolo <ronca.pat@gmail.com>
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Co-authored-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
roncapat and MichaelOrlov authored Sep 20, 2023
1 parent 64e2972 commit 96c11b0
Show file tree
Hide file tree
Showing 3 changed files with 558 additions and 267 deletions.
136 changes: 27 additions & 109 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class Reader;

namespace rosbag2_transport
{

class PlayerImpl;

class Player : public rclcpp::Node
{
public:
Expand Down Expand Up @@ -200,118 +203,33 @@ class Player : public rclcpp::Node
void delete_on_play_message_callback(const callback_handle_t & handle);

protected:
struct play_msg_callback_data
{
callback_handle_t handle;
play_msg_callback_t callback;
};

std::mutex on_play_msg_callbacks_mutex_;
std::forward_list<play_msg_callback_data> on_play_msg_pre_callbacks_;
std::forward_list<play_msg_callback_data> on_play_msg_post_callbacks_;

class PlayerPublisher final
{
public:
explicit PlayerPublisher(
std::shared_ptr<rclcpp::GenericPublisher> pub,
bool disable_loan_message)
: publisher_(std::move(pub))
{
using std::placeholders::_1;
if (disable_loan_message || !publisher_->can_loan_messages()) {
publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1);
} else {
publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1);
}
}

~PlayerPublisher() {}

void publish(const rclcpp::SerializedMessage & message)
{
publish_func_(message);
}

std::shared_ptr<rclcpp::GenericPublisher> generic_publisher()
{
return publisher_;
}
/// \brief Getter for publishers corresponding to each topic
/// \return Hashtable representing topic to publisher map excluding inner clock_publisher
ROSBAG2_TRANSPORT_PUBLIC
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericPublisher>> get_publishers();

private:
std::shared_ptr<rclcpp::GenericPublisher> publisher_;
std::function<void(const rclcpp::SerializedMessage &)> publish_func_;
};
bool is_ready_to_play_from_queue_{false};
std::mutex ready_to_play_from_queue_mutex_;
std::condition_variable ready_to_play_from_queue_cv_;
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr clock_publisher_;
std::unordered_map<std::string, std::shared_ptr<PlayerPublisher>> publishers_;
/// \brief Getter for inner clock_publisher
/// \return Shared pointer to the inner clock_publisher
ROSBAG2_TRANSPORT_PUBLIC
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr get_clock_publisher();

/// \brief Blocks and wait on condition variable until first message will be taken from read
/// queue
ROSBAG2_TRANSPORT_PUBLIC
void wait_for_playback_to_start();

/// \brief Getter for the number of registered on_play_msg_pre_callbacks
/// \return Number of registered on_play_msg_pre_callbacks
ROSBAG2_TRANSPORT_PUBLIC
size_t get_number_of_registered_on_play_msg_pre_callbacks();

/// \brief Getter for the number of registered on_play_msg_post_callbacks
/// \return Number of registered on_play_msg_post_callbacks
ROSBAG2_TRANSPORT_PUBLIC
size_t get_number_of_registered_on_play_msg_post_callbacks();

private:
rosbag2_storage::SerializedBagMessageSharedPtr peek_next_message_from_queue();
void load_storage_content();
bool is_storage_completely_loaded() const;
void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_);
void wait_for_filled_queue() const;
void play_messages_from_queue();
void prepare_publishers();
bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message);
static callback_handle_t get_new_on_play_msg_callback_handle();
void add_key_callback(
KeyboardHandler::KeyCode key,
const std::function<void()> & cb,
const std::string & op_name);
void add_keyboard_callbacks();
void create_control_services();
void configure_play_until_timestamp();
bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const;

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;
std::atomic_bool cancel_wait_for_next_message_{false};
std::atomic_bool stop_playback_{false};

std::mutex reader_mutex_;
std::unique_ptr<rosbag2_cpp::Reader> reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_);

void publish_clock_update();
void publish_clock_update(const rclcpp::Time & time);

rosbag2_storage::StorageOptions storage_options_;
rosbag2_transport::PlayOptions play_options_;
rcutils_time_point_value_t play_until_timestamp_ = -1;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
mutable std::future<void> storage_loading_future_;
std::atomic_bool load_storage_content_{true};
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
std::mutex skip_message_in_main_play_loop_mutex_;
bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY(
skip_message_in_main_play_loop_mutex_) = false;
std::atomic_bool is_in_playback_{false};

rcutils_time_point_value_t starting_time_;

// control services
rclcpp::Service<rosbag2_interfaces::srv::Pause>::SharedPtr srv_pause_;
rclcpp::Service<rosbag2_interfaces::srv::Resume>::SharedPtr srv_resume_;
rclcpp::Service<rosbag2_interfaces::srv::TogglePaused>::SharedPtr srv_toggle_paused_;
rclcpp::Service<rosbag2_interfaces::srv::IsPaused>::SharedPtr srv_is_paused_;
rclcpp::Service<rosbag2_interfaces::srv::GetRate>::SharedPtr srv_get_rate_;
rclcpp::Service<rosbag2_interfaces::srv::SetRate>::SharedPtr srv_set_rate_;
rclcpp::Service<rosbag2_interfaces::srv::Play>::SharedPtr srv_play_;
rclcpp::Service<rosbag2_interfaces::srv::PlayNext>::SharedPtr srv_play_next_;
rclcpp::Service<rosbag2_interfaces::srv::Burst>::SharedPtr srv_burst_;
rclcpp::Service<rosbag2_interfaces::srv::Seek>::SharedPtr srv_seek_;
rclcpp::Service<rosbag2_interfaces::srv::Stop>::SharedPtr srv_stop_;

rclcpp::Publisher<rosbag2_interfaces::msg::ReadSplitEvent>::SharedPtr split_event_pub_;

// defaults
std::shared_ptr<KeyboardHandler> keyboard_handler_;
std::vector<KeyboardHandler::callback_handle_t> keyboard_callbacks_;
std::unique_ptr<PlayerImpl> pimpl_;
};

} // namespace rosbag2_transport
Expand Down
Loading

0 comments on commit 96c11b0

Please sign in to comment.