Skip to content

Commit

Permalink
Misc findings and improvements 1
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov committed Apr 10, 2024
1 parent b87c944 commit bede854
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 23 deletions.
2 changes: 2 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ class Player : public rclcpp::Node
void delete_on_play_message_callback(const callback_handle_t & handle);

/// Wait until sent service requests will receive responses from service servers.
/// \note The player node shall be spun in the executor in a parallel thread to be able to wait
/// for responses.
/// \param service_name - Name of the service or service event from what to wait responses.
/// \note is service_name is empty the function will wait until all service requests sent to all
/// service servers will finish. Timeout in this cases will be used for each service name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,17 @@ class PlayerServiceClientManager final
bool request_future_queue_is_full();

bool register_request_future(
rclcpp::GenericClient::FutureAndRequestId & request_future,
rclcpp::GenericClient::FutureAndRequestId && request_future,
std::weak_ptr<rclcpp::GenericClient> client);

bool wait_for_all_futures(std::chrono::duration<double> timeout = std::chrono::seconds(5));

private:
using time_point = std::chrono::steady_clock::time_point;
using ptr_future_and_request_id = std::unique_ptr<rclcpp::GenericClient::FutureAndRequestId>;
using request_id_and_client_t =
using future_request_id_and_client_t =
std::pair<ptr_future_and_request_id, std::weak_ptr<rclcpp::GenericClient>>;
std::map<time_point, request_id_and_client_t> request_futures_list_;
std::map<time_point, future_request_id_and_client_t> request_futures_list_;
std::mutex request_futures_list_mutex_;

std::chrono::seconds request_future_timeout_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,11 @@ PlayOptions get_play_options_from_node_params(rclcpp::Node & node)
} else if (service_requests_source == "CLIENT_INTROSPECTION") {
play_options.service_requests_source = ServiceRequestsSource::CLIENT_INTROSPECTION;
} else {
play_options.service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION;
RCLCPP_ERROR(
node.get_logger(),
"play.service_requests_source doesn't support %s. It must be one of SERVICE_INTROSPECTION"
" and CLIENT_INTROSPECTION. Change it by default value SERVICE_INTROSPECTION.",
" and CLIENT_INTROSPECTION. Changed it to default value SERVICE_INTROSPECTION.",
service_requests_source.c_str());
}

Expand Down
13 changes: 6 additions & 7 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ namespace
{
bool allow_topic(
bool is_service,
const std::string topic_name,
const std::string & topic_name,
const rosbag2_storage::StorageFilter & storage_filter)
{
auto & include_topics = storage_filter.topics;
Expand Down Expand Up @@ -1103,11 +1103,11 @@ void PlayerImpl::prepare_publishers()
auto service_name = rosbag2_cpp::service_event_topic_name_to_service_name(topic.name);
auto service_type = rosbag2_cpp::service_event_topic_type_to_service_type(topic.type);
try {
auto cli = owner_->create_generic_client(service_name, service_type);
auto player_cli = std::make_shared<PlayerServiceClient>(
std::move(cli), service_name, topic.type, owner_->get_logger(),
auto generic_client = owner_->create_generic_client(service_name, service_type);
auto player_client = std::make_shared<PlayerServiceClient>(
std::move(generic_client), service_name, topic.type, owner_->get_logger(),
player_service_client_manager_);
service_clients_.insert(std::make_pair(topic.name, player_cli));
service_clients_.insert(std::make_pair(topic.name, player_client));
} catch (const std::runtime_error & e) {
RCLCPP_WARN(
owner_->get_logger(),
Expand Down Expand Up @@ -1238,7 +1238,6 @@ bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr
if (service_event_type == service_msgs::msg::ServiceEventInfo::RESPONSE_SENT ||
service_event_type == service_msgs::msg::ServiceEventInfo::RESPONSE_RECEIVED)
{
// TODO(morlov): Shall we ignore REQUEST_RECEIVED as well?
return false;
}

Expand Down Expand Up @@ -1284,7 +1283,7 @@ bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr

bool message_published = false;
try {
client_iter->second->async_send_request(service_event);
service_client->async_send_request(service_event);
message_published = true;
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
Expand Down
24 changes: 12 additions & 12 deletions rosbag2_transport/src/rosbag2_transport/player_service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ void PlayerServiceClient::async_send_request(
if (request_member.size_function(request_sequence_ptr) > 0) {
void * request_ptr = request_member.get_function(request_sequence_ptr, 0);
auto future_and_request_id = client_->async_send_request(request_ptr);
player_service_client_manager_->register_request_future(future_and_request_id, client_);
player_service_client_manager_->register_request_future(
std::move(future_and_request_id), client_);
} // else { /* No service request in the service event. Do nothing, just skip it. */ }
}

Expand Down Expand Up @@ -245,23 +246,22 @@ bool PlayerServiceClientManager::request_future_queue_is_full()
}

bool PlayerServiceClientManager::register_request_future(
rclcpp::GenericClient::FutureAndRequestId & request_future,
rclcpp::GenericClient::FutureAndRequestId && request_future,
std::weak_ptr<rclcpp::GenericClient> client)
{
auto future_and_request_id =
std::make_unique<rclcpp::GenericClient::FutureAndRequestId>(std::move(request_future));

if (!request_future_queue_is_full()) {
std::lock_guard<std::mutex> lock(request_futures_list_mutex_);
request_futures_list_[std::chrono::steady_clock::now()] =
{std::move(future_and_request_id), client};
return true;
auto emplace_result = request_futures_list_.emplace(
std::chrono::steady_clock::now(), std::make_pair(std::move(future_and_request_id), client));
return emplace_result.second;
} else {
ROSBAG2_TRANSPORT_LOG_WARN(
"Client request queue is full. "
"Please consider increasing the length of the queue.");
}

return false;
}

Expand Down Expand Up @@ -301,13 +301,12 @@ bool PlayerServiceClientManager::wait_for_all_futures(std::chrono::duration<doub
void PlayerServiceClientManager::remove_complete_request_future()
{
std::vector<time_point> remove_keys;
for (auto & [timestamp, request_id_and_client] : request_futures_list_) {
if (request_id_and_client.first->wait_for(std::chrono::seconds(0)) ==
std::future_status::ready)
{
auto client = request_id_and_client.second.lock();
std::lock_guard<std::mutex> lock(request_futures_list_mutex_);
for (auto & [timestamp, future_request_id_and_client] : request_futures_list_) {
if (future_request_id_and_client.first->wait_for(0s) == std::future_status::ready) {
auto client = future_request_id_and_client.second.lock();
if (client) {
client->remove_pending_request(request_id_and_client.first->request_id);
client->remove_pending_request(future_request_id_and_client.first->request_id);
}
remove_keys.emplace_back(timestamp);
}
Expand All @@ -319,6 +318,7 @@ void PlayerServiceClientManager::remove_complete_request_future()

void PlayerServiceClientManager::remove_all_timeout_request_future()
{
std::lock_guard<std::mutex> lock(request_futures_list_mutex_);
auto current_time = std::chrono::steady_clock::now();
auto first_iter_without_timeout =
request_futures_list_.lower_bound(current_time - request_future_timeout_);
Expand Down

0 comments on commit bede854

Please sign in to comment.