Skip to content

Commit

Permalink
Refactoring. Do full deserialization and only once
Browse files Browse the repository at this point in the history
- Rationale: We can't rely on assumption that we can safely partially
deserialize service event to the ServiceEventInfo structure.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov committed Mar 31, 2024
1 parent 5f324c1 commit 6397df6
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 48 deletions.
2 changes: 2 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/service_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ size_t get_serialization_size_for_service_metadata_event()
static_cast<const rosidl_typesupport_introspection_cpp::MessageMembers *>(
type_support_handle->data);

// TODO(morlov): We shall not rely on this arithmetic!!! It is up to the serialization
// implementation
// endian type (4 size) + service event info size + empty request (4 bytes)
// + emtpy response (4 bytes)
size = 4 + service_event_info->size_of_ + 4 + 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class PlayerServiceClientManager;
class PlayerServiceClient final
{
public:
using ServiceEventType = service_msgs::msg::ServiceEventInfo::_event_type_type;
using ClientGidType = service_msgs::msg::ServiceEventInfo::_client_gid_type;

explicit
PlayerServiceClient(
std::shared_ptr<rclcpp::GenericClient> generic_client,
Expand All @@ -46,7 +49,22 @@ class PlayerServiceClient final
rclcpp::Logger logger,
std::shared_ptr<PlayerServiceClientManager> player_service_client_manager);

// Note: Call this function only if is_include_request_message() return true
const std::string & get_service_name();

/// \brief Deserialize message to the type erased service event
/// \param message - Serialized message
/// \return Shared pointer to the byte array with deserialized service event if success,
/// otherwise nullptr
std::shared_ptr<uint8_t[]> deserialize_service_event(const rcl_serialized_message_t & message);

std::tuple<PlayerServiceClient::ServiceEventType, PlayerServiceClient::ClientGidType>
get_service_event_type_and_client_gid(const std::shared_ptr<uint8_t[]> type_erased_service_event);

bool is_service_event_include_request_message(
const std::shared_ptr<uint8_t[]> type_erased_service_event);

void async_send_request(const std::shared_ptr<uint8_t[]> type_erased_service_event);

void async_send_request(const rcl_serialized_message_t & message);

std::shared_ptr<rclcpp::GenericClient> generic_client()
Expand All @@ -73,16 +91,15 @@ class PlayerServiceClient final
};
bool service_set_introspection_content_ = false;

using client_id = service_msgs::msg::ServiceEventInfo::_client_gid_type;
// Info on request data from service or client
std::unordered_map<client_id, request_info_from, rosbag2_cpp::client_id_hash> request_info_;
std::unordered_map<ClientGidType, request_info_from, rosbag2_cpp::client_id_hash> request_info_;

const rosidl_message_type_support_t * service_event_type_ts_;
const rosidl_typesupport_introspection_cpp::MessageMembers * service_event_members_;

rcutils_allocator_t allocator_ = rcutils_get_default_allocator();

std::tuple<uint8_t, client_id, int64_t>
std::tuple<uint8_t, ClientGidType, int64_t>
get_msg_event_type(const rcl_serialized_message_t & message);
};

Expand Down
64 changes: 53 additions & 11 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>
#include <thread>

Expand All @@ -36,7 +35,6 @@
#include "rosbag2_cpp/clocks/time_controller_clock.hpp"
#include "rosbag2_cpp/reader.hpp"
#include "rosbag2_cpp/service_utils.hpp"
#include "rosbag2_cpp/typesupport_helpers.hpp"

#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/qos.hpp"
Expand Down Expand Up @@ -1170,13 +1168,11 @@ void PlayerImpl::run_play_msg_post_callbacks(

bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message)
{
bool message_published = false;

auto pub_iter = publishers_.find(message->topic_name);
if (pub_iter != publishers_.end()) {
// Calling on play message pre-callbacks
run_play_msg_pre_callbacks(message);

bool message_published = false;
try {
pub_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data));
message_published = true;
Expand All @@ -1191,17 +1187,61 @@ bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr
return message_published;
}

// Try to publish message as service request
auto client_iter = service_clients_.find(message->topic_name);
if (client_iter != service_clients_.end()) {
if (!client_iter->second->is_include_request_message(*message->serialized_data)) {
return message_published;
const auto & service_client = client_iter->second;
// TODO(morlov):
// Wrap deserialize_service_event and get_service_event_type_and_client_gid in try-catch
auto service_event = service_client->deserialize_service_event(*message->serialized_data);
if (!service_event) {
RCLCPP_ERROR_STREAM(
owner_->get_logger(), "Failed to deserialize service event message for '" <<
service_client->get_service_name() << "' service!\n");
return false;
}

auto [service_event_type, client_gid] =
service_client->get_service_event_type_and_client_gid(service_event);
// Ignore response message
if (service_event_type == service_msgs::msg::ServiceEventInfo::RESPONSE_SENT ||
service_event_type == service_msgs::msg::ServiceEventInfo::RESPONSE_RECEIVED)
{
// TODO(morlov): Shall we ignore REQUEST_RECEIVED as well?
return false;
}

if (!service_client->generic_client()->service_is_ready()) {
RCLCPP_ERROR(
owner_->get_logger(), "Service request hasn't been sent. The '%s' service isn't ready !",
service_client->get_service_name().c_str());
return false;
}

if (!service_client->is_service_event_include_request_message(service_event)) {
if (service_event_type == service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED) {
RCUTILS_LOG_WARN_ONCE_NAMED(
ROSBAG2_TRANSPORT_PACKAGE_NAME,
"Can't send service request. "
"The configuration of introspection for '%s' was metadata only on service side!",
service_client->get_service_name().c_str());
} else if (service_event_type == service_msgs::msg::ServiceEventInfo::REQUEST_SENT) {
RCUTILS_LOG_WARN_ONCE_NAMED(
ROSBAG2_TRANSPORT_PACKAGE_NAME,
"Can't send service request. "
"The configuration of introspection for '%s' client [ID: %s]` was metadata only!",
service_client->get_service_name().c_str(),
rosbag2_cpp::client_id_to_string(client_gid).c_str());
}
return false;
}

// Calling on play message pre-callbacks
run_play_msg_pre_callbacks(message);

bool message_published = false;
try {
client_iter->second->async_send_request(*message->serialized_data);
client_iter->second->async_send_request(service_event);
message_published = true;
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
Expand All @@ -1215,9 +1255,11 @@ bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr
return message_published;
}

RCLCPP_WARN_STREAM(
owner_->get_logger(), "Not find sender for topic '" << message->topic_name << "' topic.");
return message_published;
RCUTILS_LOG_WARN_ONCE_NAMED(
ROSBAG2_TRANSPORT_PACKAGE_NAME,
"Publisher for topic '%s' not found", message->topic_name.c_str());

return false;
}

void PlayerImpl::add_key_callback(
Expand Down
123 changes: 90 additions & 33 deletions rosbag2_transport/src/rosbag2_transport/player_service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ PlayerServiceClient::PlayerServiceClient(
throw std::invalid_argument(ss.str());
}

if (service_event_members_->members_[1].get_function == nullptr) {
if (!service_event_members_->members_[1].is_array_) {
std::stringstream ss;
ss << "get_function() for service request '" << service_name_ << "' is not set.\n";
ss << "The service request for '" << service_name_ << "' is not array.\n";
throw std::invalid_argument(ss.str());
}

Expand All @@ -75,6 +75,12 @@ PlayerServiceClient::PlayerServiceClient(
throw std::invalid_argument(ss.str());
}

if (service_event_members_->members_[1].get_function == nullptr) {
std::stringstream ss;
ss << "get_function() for service request '" << service_name_ << "' is not set.\n";
throw std::invalid_argument(ss.str());
}

if (service_event_members_->init_function == nullptr) {
std::stringstream ss;
ss << "service_event_members_->init_function for '" << service_name_ << "' is not set.\n";
Expand Down Expand Up @@ -194,61 +200,115 @@ bool PlayerServiceClient::is_include_request_message(const rcl_serialized_messag
return ret;
}

void PlayerServiceClient::async_send_request(const rcl_serialized_message_t & message)
const std::string & PlayerServiceClient::get_service_name()
{
if (!client_->service_is_ready()) {
RCLCPP_ERROR(
logger_, "Service request hasn't been sent. The '%s' service isn't ready !",
service_name_.c_str());
return;
}
return service_name_;
}

auto type_erased_ros_message = std::shared_ptr<uint8_t[]>(
std::shared_ptr<uint8_t[]>
PlayerServiceClient::deserialize_service_event(const rcl_serialized_message_t & message)
{
auto type_erased_service_event = std::shared_ptr<uint8_t[]>(
new uint8_t[service_event_members_->size_of_],
[fini_function = this->service_event_members_->fini_function](uint8_t * msg) {
fini_function(msg);
delete[] msg;
});

service_event_members_->init_function(
type_erased_ros_message.get(), rosidl_runtime_cpp::MessageInitialization::ZERO);
type_erased_service_event.get(), rosidl_runtime_cpp::MessageInitialization::ZERO);

rmw_ret_t ret =
rmw_deserialize(&message, service_event_type_ts_, type_erased_service_event.get());
if (ret != RMW_RET_OK) { // Failed to deserialize service event message
type_erased_service_event.reset();
}
return type_erased_service_event;
}

std::tuple<PlayerServiceClient::ServiceEventType, PlayerServiceClient::ClientGidType>
PlayerServiceClient::get_service_event_type_and_client_gid(
const std::shared_ptr<uint8_t[]> type_erased_service_event)
{
if (type_erased_service_event) {
// members_[0]: service_info, members_[1]: request[<=1], members_[2]: response[<=1]
const auto & info_member = service_event_members_->members_[0];

auto service_event_info_ptr = reinterpret_cast<service_msgs::msg::ServiceEventInfo *>(
type_erased_service_event.get() + info_member.offset_);
if (service_event_info_ptr == nullptr) {
throw std::runtime_error("Error: The service_event_info_ptr is nullptr");
}
return {service_event_info_ptr->event_type, service_event_info_ptr->client_gid};
} else {
throw std::invalid_argument("Error: The type_erased_service_event is nullptr");
}
}

rmw_ret_t ret = rmw_deserialize(&message, service_event_type_ts_, type_erased_ros_message.get());
if (ret == RMW_RET_OK) {
bool PlayerServiceClient::is_service_event_include_request_message(
const std::shared_ptr<uint8_t[]> type_erased_service_event)
{
if (type_erased_service_event) {
// members_[0]: service_info, members_[1]: request[<=1], members_[2]: response[<=1]
const auto & request_member = service_event_members_->members_[1];
void * request_sequence_ptr = type_erased_ros_message.get() + request_member.offset_;
if (request_member.is_array_ && request_member.size_function(request_sequence_ptr) > 0) {
void * request_ptr = request_member.get_function(request_sequence_ptr, 0);
auto future_and_request_id = client_->async_send_request(request_ptr);
player_service_client_manager_->register_request_future(future_and_request_id, client_);
} // else { /* No request in the service event */ }
void * request_sequence_ptr = type_erased_service_event.get() + request_member.offset_;
if (request_member.size_function(request_sequence_ptr) > 0) {
return true;
} // else { /* No service request */ }
} else {
throw std::invalid_argument("Error: The type_erased_service_event is nullptr");
}
return false;
}

void PlayerServiceClient::async_send_request(
const std::shared_ptr<uint8_t[]> type_erased_service_event)
{
// members_[0]: service_info, members_[1]: request[<=1], members_[2]: response[<=1]
const auto & request_member = service_event_members_->members_[1];
void * request_sequence_ptr = type_erased_service_event.get() + request_member.offset_;
if (request_member.size_function(request_sequence_ptr) > 0) {
void * request_ptr = request_member.get_function(request_sequence_ptr, 0);
auto future_and_request_id = client_->async_send_request(request_ptr);
player_service_client_manager_->register_request_future(future_and_request_id, client_);
} // else { /* No service request in the service event. Do nothing, just skip it. */ }
}

void PlayerServiceClient::async_send_request(const rcl_serialized_message_t & message)
{
if (!client_->service_is_ready()) {
RCLCPP_ERROR(
logger_, "Service request hasn't been sent. The '%s' service isn't ready !",
service_name_.c_str());
return;
}

if (ret != RMW_RET_OK) {
auto type_erased_ros_message = deserialize_service_event(message);

if (type_erased_ros_message) {
async_send_request(type_erased_ros_message);
} else {
throw std::runtime_error(
"Failed to deserialize service event message for " + service_name_ + " !");
}
}

std::tuple<uint8_t, PlayerServiceClient::client_id, int64_t>
std::tuple<uint8_t, PlayerServiceClient::ClientGidType, int64_t>
PlayerServiceClient::get_msg_event_type(const rcl_serialized_message_t & message)
{
auto msg = service_msgs::msg::ServiceEventInfo();

const rosidl_message_type_support_t * type_support_info =
rosidl_typesupport_cpp::
get_message_type_support_handle<service_msgs::msg::ServiceEventInfo>();
rosidl_typesupport_cpp::get_message_type_support_handle<service_msgs::msg::ServiceEventInfo>();
if (type_support_info == nullptr) {
throw std::runtime_error(
"Failed to get message type support handle of service event info !");
throw std::runtime_error("Failed to get message type support handle of service event info !");
}

auto ret = rmw_deserialize(
&message,
type_support_info,
reinterpret_cast<void *>(&msg));
if (ret != RMW_RET_OK) {
// Partially deserialize service event message. Deserializing only first member ServiceEventInfo
// with assumption that it is going to be the first in serialized message.
// TODO(morlov): We can't rely on this assumption. It is up to the underlying RMW and
// serialization format implementation!
if (rmw_deserialize(&message, type_support_info, reinterpret_cast<void *>(&msg)) != RMW_RET_OK) {
throw std::runtime_error("Failed to deserialize message !");
}

Expand All @@ -273,10 +333,7 @@ bool PlayerServiceClientManager::request_future_queue_is_full()
return false;
}

// Remove complete request future
remove_complete_request_future();

// Remove all timeout request future
remove_all_timeout_request_future();

if (request_futures_list_.size() == maximum_request_future_queue_) {
Expand Down

0 comments on commit 6397df6

Please sign in to comment.