Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20596] Feature: topic keys with non breaking ABI (humble backport) #23

Merged
merged 2 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace rmw_fastrtps_cpp
class MessageTypeSupport : public TypeSupport
{
public:
explicit MessageTypeSupport(const message_type_support_callbacks_t * members);
explicit MessageTypeSupport(const message_type_support_callbacks_t * members, uint8_t abi_version);
};

} // namespace rmw_fastrtps_cpp
Expand Down
10 changes: 9 additions & 1 deletion rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
namespace rmw_fastrtps_cpp
{

uint8_t get_type_support_abi_version(const char * identifier);

class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
{
public:
Expand All @@ -42,13 +44,19 @@ class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
bool deserializeROSmessage(
eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override;

protected:
bool get_key_hash_from_ros_message(
void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, const void * impl) const override;

TypeSupport();

protected:
void set_members(const message_type_support_callbacks_t * members);

void set_members_v2(const message_type_support_callbacks_t * members);

private:
const message_type_support_callbacks_t * members_;
const message_type_support_key_callbacks_t* key_callbacks_;
bool has_data_;
};

Expand Down
13 changes: 12 additions & 1 deletion rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ rmw_fastrtps_cpp::create_publisher(
}
}

uint8_t abi_version = get_type_support_abi_version(type_support->typesupport_identifier);

std::lock_guard<std::mutex> lck(participant_info->entity_creation_mutex_);

/////
Expand Down Expand Up @@ -180,7 +182,7 @@ rmw_fastrtps_cpp::create_publisher(
/////
// Create the Type Support struct
if (!fastdds_type) {
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks, abi_version);
if (!tsupport) {
RMW_SET_ERROR_MSG("create_publisher() failed to allocate MessageTypeSupport");
return nullptr;
Expand Down Expand Up @@ -269,6 +271,15 @@ rmw_fastrtps_cpp::create_publisher(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->data_writer_ = publisher->create_datawriter(
topic.topic,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,15 @@ rmw_create_client(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (response_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

// Creates DataReader
info->response_reader_ = subscriber->create_datareader(
response_topic_desc,
Expand Down Expand Up @@ -374,6 +383,15 @@ rmw_create_client(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (request_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->request_writer_ = publisher->create_datawriter(
request_topic.topic,
Expand Down
8 changes: 6 additions & 2 deletions rmw_fastrtps_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ rmw_serialize(
}

auto callbacks = static_cast<const message_type_support_callbacks_t *>(ts->data);
auto tss = MessageTypeSupport_cpp(callbacks);

uint8_t abi_version = rmw_fastrtps_cpp::get_type_support_abi_version(type_support->typesupport_identifier);
auto tss = MessageTypeSupport_cpp(callbacks, abi_version);
auto data_length = tss.getEstimatedSerializedSize(ros_message, callbacks);
if (serialized_message->buffer_capacity < data_length) {
if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) {
Expand Down Expand Up @@ -78,7 +80,9 @@ rmw_deserialize(
}

auto callbacks = static_cast<const message_type_support_callbacks_t *>(ts->data);
auto tss = MessageTypeSupport_cpp(callbacks);

uint8_t abi_version = rmw_fastrtps_cpp::get_type_support_abi_version(type_support->typesupport_identifier);
auto tss = MessageTypeSupport_cpp(callbacks, abi_version);
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,15 @@ rmw_create_service(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (request_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

// Creates DataReader
info->request_reader_ = subscriber->create_datareader(
request_topic_desc,
Expand Down Expand Up @@ -377,6 +386,15 @@ rmw_create_service(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (response_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->response_writer_ = publisher->create_datawriter(
response_topic.topic,
Expand Down
13 changes: 12 additions & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ create_subscription(
}
}

uint8_t abi_version = get_type_support_abi_version(type_support->typesupport_identifier);

std::lock_guard<std::mutex> lck(participant_info->entity_creation_mutex_);

/////
Expand Down Expand Up @@ -178,7 +180,7 @@ create_subscription(
/////
// Create the Type Support struct
if (!fastdds_type) {
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks, abi_version);
if (!tsupport) {
RMW_SET_ERROR_MSG("create_subscription() failed to allocate MessageTypeSupport");
return nullptr;
Expand Down Expand Up @@ -280,6 +282,15 @@ create_subscription(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

info->datareader_qos_ = reader_qos;

// create_datareader
Expand Down
117 changes: 115 additions & 2 deletions rmw_fastrtps_cpp/src/type_support_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,30 @@
namespace rmw_fastrtps_cpp
{

uint8_t get_type_support_abi_version(const char * identifier)
{
uint8_t abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1;

if (strcmp(identifier, RMW_FASTRTPS_CPP_TYPESUPPORT_C_V2) == 0 ||
strcmp(identifier, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP_V2) == 0)
{
abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V2;
}

return abi_version;
}

TypeSupport::TypeSupport()
{
abi_version_ = AbiVersion::ABI_V1;
m_isGetKeyDefined = false;
max_size_bound_ = false;
is_plain_ = false;
key_is_unbounded_ = false;
key_max_serialized_size_ = 0;
members_ = nullptr;
key_callbacks_ = nullptr;
has_data_ = false;
}

void TypeSupport::set_members(const message_type_support_callbacks_t * members)
Expand Down Expand Up @@ -59,6 +78,24 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members)
m_typeSize = (m_typeSize + 3) & ~3;
}

void TypeSupport::set_members_v2(const message_type_support_callbacks_t * members)
{

set_members(members);

if (nullptr != members->key_callbacks)
{
key_callbacks_ = members->key_callbacks;
m_isGetKeyDefined = true;

key_max_serialized_size_ = key_callbacks_->max_serialized_size_key(key_is_unbounded_);
if (!key_is_unbounded_)
{
key_buffer_.reserve(key_max_serialized_size_);
}
}
}

size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const
{
if (is_plain_) {
Expand Down Expand Up @@ -129,14 +166,90 @@ bool TypeSupport::deserializeROSmessage(
return true;
}

MessageTypeSupport::MessageTypeSupport(const message_type_support_callbacks_t * members)
bool TypeSupport::get_key_hash_from_ros_message(
void * ros_message,
eprosima::fastrtps::rtps::InstanceHandle_t * ihandle,
bool force_md5,
const void * impl) const
{
assert(ros_message);
(void)impl;

// retrieve estimated serialized size in case key is unbounded
if (key_is_unbounded_)
{
key_max_serialized_size_ = (std::max) (
key_max_serialized_size_,
key_callbacks_->get_serialized_size_key(ros_message));
key_buffer_.reserve(key_max_serialized_size_);
}

eprosima::fastcdr::FastBuffer fast_buffer(
reinterpret_cast<char *>(key_buffer_.data()),
key_max_serialized_size_);

eprosima::fastcdr::Cdr ser(
fast_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);

key_callbacks_->cdr_serialize_key(ros_message, ser);

const size_t max_serialized_key_length = 16;

auto ser_length = ser.getSerializedDataLength();

// check for md5
if (force_md5 || key_max_serialized_size_ > max_serialized_key_length)
{
md5_.init();
md5_.update(key_buffer_.data(), static_cast<unsigned int>(ser_length));
md5_.finalize();

for (uint8_t i = 0; i < max_serialized_key_length; ++i)
{
ihandle->value[i] = md5_.digest[i];
}
}
else
{
memset(ihandle->value, 0, max_serialized_key_length);
for (uint8_t i = 0; i < ser_length; ++i)
{
ihandle->value[i] = key_buffer_[i];
}
}

return true;
}

MessageTypeSupport::MessageTypeSupport(
const message_type_support_callbacks_t * members,
uint8_t abi_version)
{
assert(members);

abi_version_ = abi_version;

std::string name = _create_type_name(members);
this->setName(name.c_str());

set_members(members);
switch (abi_version)
{
case TypeSupport::AbiVersion::ABI_V1:
{
set_members(members);
break;
}
case TypeSupport::AbiVersion::ABI_V2:
{
set_members_v2(members);
break;
}
default:
{
set_members(members);
break;
}
}
}

ServiceTypeSupport::ServiceTypeSupport()
Expand Down
2 changes: 2 additions & 0 deletions rmw_fastrtps_cpp/src/type_support_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "rosidl_typesupport_fastrtps_cpp/service_type_support.h"
#define RMW_FASTRTPS_CPP_TYPESUPPORT_C rosidl_typesupport_fastrtps_c__identifier
#define RMW_FASTRTPS_CPP_TYPESUPPORT_CPP rosidl_typesupport_fastrtps_cpp::typesupport_identifier
#define RMW_FASTRTPS_CPP_TYPESUPPORT_C_V2 rosidl_typesupport_fastrtps_c__identifier_v2
#define RMW_FASTRTPS_CPP_TYPESUPPORT_CPP_V2 rosidl_typesupport_fastrtps_cpp::typesupport_identifier_v2

using MessageTypeSupport_cpp = rmw_fastrtps_cpp::MessageTypeSupport;
using TypeSupport_cpp = rmw_fastrtps_cpp::TypeSupport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ template<typename MembersType>
class MessageTypeSupport : public TypeSupport<MembersType>
{
public:
MessageTypeSupport(const MembersType * members, const void * ros_type_support);
MessageTypeSupport(const MembersType * members, const void * ros_type_support, uint8_t abi_version);
};

} // namespace rmw_fastrtps_dynamic_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ namespace rmw_fastrtps_dynamic_cpp

template<typename MembersType>
MessageTypeSupport<MembersType>::MessageTypeSupport(
const MembersType * members, const void * ros_type_support)
const MembersType * members, const void * ros_type_support, uint8_t abi_version)
: TypeSupport<MembersType>(ros_type_support)
{
assert(members);
this->members_ = members;
this->abi_version_ = abi_version;

std::ostringstream ss;
std::string message_namespace(this->members_->message_namespace_);
Expand All @@ -56,10 +57,17 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(members, 0));
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(members, 0, this->key_max_serialized_size_));
} else {
this->m_typeSize++;
}

if (this->key_max_serialized_size_ != 0)
{
this->m_isGetKeyDefined = true;
this->key_buffer_.reserve(this->key_max_serialized_size_);
}

// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}
Expand Down
Loading