Skip to content

Commit

Permalink
Feature: topic keys (#23)
Browse files Browse the repository at this point in the history
* Refs #20596: Cherry pick Keys Feature from iron

* Refs #20164: rmw_fastrtps_shared: prepare api for keys support

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20164: rmw_fastrtps_shared: getKey() implementation

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20164: rmw_fastrtps* interface methods update for keys support

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20164: rmw_fastrtps_cpp empty implementation

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20164: rmw_fastrtps_dynamic_cpp Message & Service Typesupport construction updates

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20164: rmw_fastrtps_dynamic_cpp type_support_proxy updates implementation

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20164: rmw_fastrtps_dynamic_cpp type_support implementation

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20164: Move typesupport key vars to rmw_fastrtps_shared_cpp

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20164: Initialize Typesupport key members in rmw_fastrtps_cpp

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: rmw_fastrtps_shared_cpp: add abi_version enum and type support member

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: rmw_fastrtps_cpp: adopt abi version v2

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: rmw_fastrtps_dynamic_cpp: adopt abi version v2

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: rmw_fastrtps_shared_cpp: apply_qos_resource_limits_for_keys() free function

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: rmw_fastrtps_cpp: apply qos for keys

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: rmw_fastrtps_dynamic_cpp: apply qos for keys

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: Review suggestions

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: fix: include checking for the v2 identifier when trying to register the typesupport_introspection. This is useful when builtin types are also built with v2 abi and rmw_fastrtps_cpp is used

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: rmw_fastrtps_cpp review 2 suggestion

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: Updates according latest message_type_support structure (no initial alignment)

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: Compile for Fast CDR 1 only

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20310: Comment typo

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20596: Adapt rmw_fastrtps_shared_cpp: adapt getKey() method to humble

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL committed Jun 21, 2024
1 parent 6440cf7 commit d2b84e9
Show file tree
Hide file tree
Showing 28 changed files with 716 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace rmw_fastrtps_cpp
class MessageTypeSupport : public TypeSupport
{
public:
explicit MessageTypeSupport(const message_type_support_callbacks_t * members);
explicit MessageTypeSupport(const message_type_support_callbacks_t * members, uint8_t abi_version);
};

} // namespace rmw_fastrtps_cpp
Expand Down
10 changes: 9 additions & 1 deletion rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
namespace rmw_fastrtps_cpp
{

uint8_t get_type_support_abi_version(const char * identifier);

class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
{
public:
Expand All @@ -42,13 +44,19 @@ class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
bool deserializeROSmessage(
eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override;

protected:
bool get_key_hash_from_ros_message(
void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, const void * impl) const override;

TypeSupport();

protected:
void set_members(const message_type_support_callbacks_t * members);

void set_members_v2(const message_type_support_callbacks_t * members);

private:
const message_type_support_callbacks_t * members_;
const message_type_support_key_callbacks_t* key_callbacks_;
bool has_data_;
};

Expand Down
13 changes: 12 additions & 1 deletion rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ rmw_fastrtps_cpp::create_publisher(
}
}

uint8_t abi_version = get_type_support_abi_version(type_support->typesupport_identifier);

std::lock_guard<std::mutex> lck(participant_info->entity_creation_mutex_);

/////
Expand Down Expand Up @@ -180,7 +182,7 @@ rmw_fastrtps_cpp::create_publisher(
/////
// Create the Type Support struct
if (!fastdds_type) {
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks, abi_version);
if (!tsupport) {
RMW_SET_ERROR_MSG("create_publisher() failed to allocate MessageTypeSupport");
return nullptr;
Expand Down Expand Up @@ -269,6 +271,15 @@ rmw_fastrtps_cpp::create_publisher(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->data_writer_ = publisher->create_datawriter(
topic.topic,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,15 @@ rmw_create_client(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (response_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

// Creates DataReader
info->response_reader_ = subscriber->create_datareader(
response_topic_desc,
Expand Down Expand Up @@ -374,6 +383,15 @@ rmw_create_client(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (request_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->request_writer_ = publisher->create_datawriter(
request_topic.topic,
Expand Down
8 changes: 6 additions & 2 deletions rmw_fastrtps_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ rmw_serialize(
}

auto callbacks = static_cast<const message_type_support_callbacks_t *>(ts->data);
auto tss = MessageTypeSupport_cpp(callbacks);

uint8_t abi_version = rmw_fastrtps_cpp::get_type_support_abi_version(type_support->typesupport_identifier);
auto tss = MessageTypeSupport_cpp(callbacks, abi_version);
auto data_length = tss.getEstimatedSerializedSize(ros_message, callbacks);
if (serialized_message->buffer_capacity < data_length) {
if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) {
Expand Down Expand Up @@ -78,7 +80,9 @@ rmw_deserialize(
}

auto callbacks = static_cast<const message_type_support_callbacks_t *>(ts->data);
auto tss = MessageTypeSupport_cpp(callbacks);

uint8_t abi_version = rmw_fastrtps_cpp::get_type_support_abi_version(type_support->typesupport_identifier);
auto tss = MessageTypeSupport_cpp(callbacks, abi_version);
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,15 @@ rmw_create_service(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (request_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

// Creates DataReader
info->request_reader_ = subscriber->create_datareader(
request_topic_desc,
Expand Down Expand Up @@ -377,6 +386,15 @@ rmw_create_service(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (response_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->response_writer_ = publisher->create_datawriter(
response_topic.topic,
Expand Down
13 changes: 12 additions & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ create_subscription(
}
}

uint8_t abi_version = get_type_support_abi_version(type_support->typesupport_identifier);

std::lock_guard<std::mutex> lck(participant_info->entity_creation_mutex_);

/////
Expand Down Expand Up @@ -178,7 +180,7 @@ create_subscription(
/////
// Create the Type Support struct
if (!fastdds_type) {
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks, abi_version);
if (!tsupport) {
RMW_SET_ERROR_MSG("create_subscription() failed to allocate MessageTypeSupport");
return nullptr;
Expand Down Expand Up @@ -280,6 +282,15 @@ create_subscription(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

info->datareader_qos_ = reader_qos;

// create_datareader
Expand Down
117 changes: 115 additions & 2 deletions rmw_fastrtps_cpp/src/type_support_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,30 @@
namespace rmw_fastrtps_cpp
{

uint8_t get_type_support_abi_version(const char * identifier)
{
uint8_t abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1;

if (strcmp(identifier, RMW_FASTRTPS_CPP_TYPESUPPORT_C_V2) == 0 ||
strcmp(identifier, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP_V2) == 0)
{
abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V2;
}

return abi_version;
}

TypeSupport::TypeSupport()
{
abi_version_ = AbiVersion::ABI_V1;
m_isGetKeyDefined = false;
max_size_bound_ = false;
is_plain_ = false;
key_is_unbounded_ = false;
key_max_serialized_size_ = 0;
members_ = nullptr;
key_callbacks_ = nullptr;
has_data_ = false;
}

void TypeSupport::set_members(const message_type_support_callbacks_t * members)
Expand Down Expand Up @@ -59,6 +78,24 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members)
m_typeSize = (m_typeSize + 3) & ~3;
}

void TypeSupport::set_members_v2(const message_type_support_callbacks_t * members)
{

set_members(members);

if (nullptr != members->key_callbacks)
{
key_callbacks_ = members->key_callbacks;
m_isGetKeyDefined = true;

key_max_serialized_size_ = key_callbacks_->max_serialized_size_key(key_is_unbounded_);
if (!key_is_unbounded_)
{
key_buffer_.reserve(key_max_serialized_size_);
}
}
}

size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const
{
if (is_plain_) {
Expand Down Expand Up @@ -129,14 +166,90 @@ bool TypeSupport::deserializeROSmessage(
return true;
}

MessageTypeSupport::MessageTypeSupport(const message_type_support_callbacks_t * members)
bool TypeSupport::get_key_hash_from_ros_message(
void * ros_message,
eprosima::fastrtps::rtps::InstanceHandle_t * ihandle,
bool force_md5,
const void * impl) const
{
assert(ros_message);
(void)impl;

// retrieve estimated serialized size in case key is unbounded
if (key_is_unbounded_)
{
key_max_serialized_size_ = (std::max) (
key_max_serialized_size_,
key_callbacks_->get_serialized_size_key(ros_message));
key_buffer_.reserve(key_max_serialized_size_);
}

eprosima::fastcdr::FastBuffer fast_buffer(
reinterpret_cast<char *>(key_buffer_.data()),
key_max_serialized_size_);

eprosima::fastcdr::Cdr ser(
fast_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);

key_callbacks_->cdr_serialize_key(ros_message, ser);

const size_t max_serialized_key_length = 16;

auto ser_length = ser.getSerializedDataLength();

// check for md5
if (force_md5 || key_max_serialized_size_ > max_serialized_key_length)
{
md5_.init();
md5_.update(key_buffer_.data(), static_cast<unsigned int>(ser_length));
md5_.finalize();

for (uint8_t i = 0; i < max_serialized_key_length; ++i)
{
ihandle->value[i] = md5_.digest[i];
}
}
else
{
memset(ihandle->value, 0, max_serialized_key_length);
for (uint8_t i = 0; i < ser_length; ++i)
{
ihandle->value[i] = key_buffer_[i];
}
}

return true;
}

MessageTypeSupport::MessageTypeSupport(
const message_type_support_callbacks_t * members,
uint8_t abi_version)
{
assert(members);

abi_version_ = abi_version;

std::string name = _create_type_name(members);
this->setName(name.c_str());

set_members(members);
switch (abi_version)
{
case TypeSupport::AbiVersion::ABI_V1:
{
set_members(members);
break;
}
case TypeSupport::AbiVersion::ABI_V2:
{
set_members_v2(members);
break;
}
default:
{
set_members(members);
break;
}
}
}

ServiceTypeSupport::ServiceTypeSupport()
Expand Down
2 changes: 2 additions & 0 deletions rmw_fastrtps_cpp/src/type_support_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "rosidl_typesupport_fastrtps_cpp/service_type_support.h"
#define RMW_FASTRTPS_CPP_TYPESUPPORT_C rosidl_typesupport_fastrtps_c__identifier
#define RMW_FASTRTPS_CPP_TYPESUPPORT_CPP rosidl_typesupport_fastrtps_cpp::typesupport_identifier
#define RMW_FASTRTPS_CPP_TYPESUPPORT_C_V2 rosidl_typesupport_fastrtps_c__identifier_v2
#define RMW_FASTRTPS_CPP_TYPESUPPORT_CPP_V2 rosidl_typesupport_fastrtps_cpp::typesupport_identifier_v2

using MessageTypeSupport_cpp = rmw_fastrtps_cpp::MessageTypeSupport;
using TypeSupport_cpp = rmw_fastrtps_cpp::TypeSupport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ template<typename MembersType>
class MessageTypeSupport : public TypeSupport<MembersType>
{
public:
MessageTypeSupport(const MembersType * members, const void * ros_type_support);
MessageTypeSupport(const MembersType * members, const void * ros_type_support, uint8_t abi_version);
};

} // namespace rmw_fastrtps_dynamic_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ namespace rmw_fastrtps_dynamic_cpp

template<typename MembersType>
MessageTypeSupport<MembersType>::MessageTypeSupport(
const MembersType * members, const void * ros_type_support)
const MembersType * members, const void * ros_type_support, uint8_t abi_version)
: TypeSupport<MembersType>(ros_type_support)
{
assert(members);
this->members_ = members;
this->abi_version_ = abi_version;

std::ostringstream ss;
std::string message_namespace(this->members_->message_namespace_);
Expand All @@ -56,10 +57,17 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(members, 0));
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(members, 0, this->key_max_serialized_size_));
} else {
this->m_typeSize++;
}

if (this->key_max_serialized_size_ != 0)
{
this->m_isGetKeyDefined = true;
this->key_buffer_.reserve(this->key_max_serialized_size_);
}

// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}
Expand Down
Loading

0 comments on commit d2b84e9

Please sign in to comment.