Skip to content

Commit

Permalink
Type propagation policy (#5081)
Browse files Browse the repository at this point in the history
* Refs #21313: Substitute auto_fill_type_information with fastdds.type_propagation policy

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Rework DynamicPubSubType.hpp doxydoc

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Add minimal bandwidth type propagation

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Add registration only bandwidth type propagation

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Only init TypeLookupManager when required

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Add entry to versions.md

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Remove unrecheable code

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Improve tests

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Remove unnecessary includes

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Add RTPSParticipantImplt::type_propagation API to group code

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Rename RTPSParticipant attr getter to get_attributes

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313. Fix doxygen reference

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Fix tests

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #21313: Update doxygen documentation

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #21313. Fix after rebase

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #21313: Apply Miguel's suggestions

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Fix build with BUILD_DOCUMENTATION=ON

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Fix build with BUILD_DOCUMENTATION=ON and Fast CDR as thirdparty

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Uncrustify

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Fix uncrustify

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Fix failing tests

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #21313: Fix potential ABBA between PDP and RTPSParticipantImpl mutexes

Signed-off-by: eduponz <eduardoponz@eprosima.com>

---------

Signed-off-by: eduponz <eduardoponz@eprosima.com>
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Co-authored-by: Ricardo González Moreno <ricardo@richiware.dev>
  • Loading branch information
EduPonz and richiware authored Jul 26, 2024
1 parent 9e4f961 commit 01cbcd1
Show file tree
Hide file tree
Showing 73 changed files with 1,373 additions and 364 deletions.
14 changes: 7 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ if(BUILD_DOCUMENTATION)
endif()

# Target to create documentation directories
add_custom_target(docdirs
add_custom_target(fastdds_docdirs
COMMAND ${CMAKE_COMMAND} -E make_directory ${PROJECT_BINARY_DIR}/doc
COMMENT "Creating documentation directory" VERBATIM)

Expand All @@ -497,12 +497,12 @@ if(BUILD_DOCUMENTATION)
# Configure the template doxyfile for or specific project
configure_file(utils/doxygen/doxyfile.in ${PROJECT_BINARY_DIR}/doxyfile @ONLY IMMEDIATE)
# Add custom target to run doxygen when ever the project is build
add_custom_target(doxygen
add_custom_target(fastdds_doxygen
COMMAND "${DOXYGEN_EXECUTABLE}" "${PROJECT_BINARY_DIR}/doxyfile"
SOURCES "${PROJECT_BINARY_DIR}/doxyfile"
COMMENT "Generating API documentation with doxygen" VERBATIM)

add_dependencies(doxygen docdirs)
add_dependencies(fastdds_doxygen fastdds_docdirs)

### README html ########################

Expand Down Expand Up @@ -552,15 +552,15 @@ if(BUILD_DOCUMENTATION)
COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_BINARY_DIR}/readthedocs_custom.cmake
)

add_dependencies(readthedocs docdirs)
add_dependencies(readthedocs fastdds_docdirs)
endif()

add_custom_target(doc ALL
add_custom_target(fastdds_doc ALL
COMMENT "Generated project documentation" VERBATIM)

add_dependencies(doc doxygen)
add_dependencies(fastdds_doc fastdds_doxygen)
if(NOT CHECK_DOCUMENTATION)
add_dependencies(doc readthedocs)
add_dependencies(fastdds_doc readthedocs)
endif()
endif()

Expand Down
2 changes: 1 addition & 1 deletion examples/cpp/xtypes/CLIParser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class CLIParser
}
else if (config.entity == CLIParser::EntityKind::SUBSCRIBER)
{
EPROSIMA_LOG_ERROR(CLI_PARSER, "--xml-type flag available only for subscriber entity");
EPROSIMA_LOG_ERROR(CLI_PARSER, "--xml-type flag available only for publisher entity");
print_help(EXIT_FAILURE);
}
else
Expand Down
3 changes: 3 additions & 0 deletions examples/cpp/xtypes/PublisherApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ void PublisherApp::run()
return is_stopped();
});
}

// Wait for acknowledgments with 500 ms timeout
writer_->wait_for_acknowledgments({0, 500000000});
}

bool PublisherApp::publish()
Expand Down
7 changes: 4 additions & 3 deletions examples/cpp/xtypes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,21 @@ This is accomplished by setting the environment variable ``FASTDDS_DEFAULT_PROFI
* Ubuntu ( / MacOS )

```shell
user@machine:example_path$ export FASTDDS_DEFAULT_PROFILES_FILE=xtypes_profile.xml
user@machine:example_path$ export FASTDDS_DEFAULT_PROFILES_FILE=xtypes_complete_profile.xml
```

* Windows

```powershell
example_path> set FASTDDS_DEFAULT_PROFILES_FILE=xtypes_profile.xml
example_path> set FASTDDS_DEFAULT_PROFILES_FILE=xtypes_complete_profile.xml
```

The example provides with an XML profiles files with certain QoS:
The example provides with two XML profiles files with certain QoS:

- Reliable reliability: avoid sample loss.
- Transient local durability: enable late-join subscriber applications to receive previous samples.
- Keep-last history with high depth: ensure certain amount of previous samples for late-joiners.
- Type propagation: Set to either complete of minimal depending on the used XML profiles file.

Applying different configurations to the entities will change to a greater or lesser extent how the application behaves in relation to sample management.
Even when these settings affect the behavior of the sample management, the applications' output will be the similar.
80 changes: 58 additions & 22 deletions examples/cpp/xtypes/SubscriberApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
#include "SubscriberApp.hpp"

#include <condition_variable>
#include <mutex>
#include <stdexcept>

#include <fastcdr/exceptions/BadParamException.h>

#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
Expand Down Expand Up @@ -55,7 +58,7 @@ SubscriberApp::SubscriberApp(
, reader_(nullptr)
, samples_(config.samples)
, received_samples_(0)
, type_discovered_(false)
, type_discovered_("")
, stop_(false)
{
// Create the participant
Expand Down Expand Up @@ -114,11 +117,36 @@ void SubscriberApp::on_subscription_matched(
void SubscriberApp::on_data_available(
DataReader* reader)
{
using eprosima::fastcdr::exception::BadParamException;

SampleInfo info;
while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&hello_, &info)))
{
// Mininal type objects are registered with autogenerated member names that may containg non-utf-8 chars.
bool is_member_name_known = false;

// Extract type kind from remote type object
TypeKind type_kind = TK_NONE;

try
{
type_kind = remote_type_object_.complete()._d();
is_member_name_known = true;
}
catch (const BadParamException&)
{
try
{
type_kind = remote_type_object_.minimal()._d();
}
catch (const BadParamException&)
{
throw std::runtime_error("Cannot get type kind from remote type object");
}
}

if (ALIVE_INSTANCE_STATE == info.instance_state && info.valid_data &&
TK_STRUCTURE == remote_type_object_.complete()._d())
TK_STRUCTURE == type_kind)
{
// Increase received samples
received_samples_++;
Expand All @@ -136,7 +164,8 @@ void SubscriberApp::on_data_available(
// Print all the members
for (auto elem : members)
{
std::cout << " - " << elem.first << ": ";
std::string member_name = is_member_name_known ? elem.first.c_str() : "Unkown member name";
std::cout << " - " << member_name << ": ";

MemberDescriptor::_ref_type member_descriptor = {traits<MemberDescriptor>::make_shared()};

Expand Down Expand Up @@ -197,49 +226,56 @@ void SubscriberApp::on_data_writer_discovery(
// We don't want to ignore the writer
should_be_ignored = false;

std::lock_guard<std::mutex> lck(mtx_);

// Check if the discovered topic is the one we are interested in
if (topic_name_ == info.topic_name.to_string())
if ((type_discovered_ == "") && (topic_name_ == info.topic_name.to_string()))
{
// Get remote type information and use it to retrieve the type object
auto type_info = info.type_information.type_information;
auto type_id = type_info.complete().typeid_with_size().type_id();

if (RETCODE_OK != DomainParticipantFactory::get_instance()->type_object_registry().get_type_object(
type_id,
remote_type_object_))
remote_type_information_ = info.type_information.type_information;
auto type_id_complete = remote_type_information_.complete().typeid_with_size().type_id();
auto type_id_minimal = remote_type_information_.minimal().typeid_with_size().type_id();

if ((RETCODE_OK != DomainParticipantFactory::get_instance()->type_object_registry().get_type_object(
type_id_complete,
remote_type_object_)) &&
(RETCODE_OK != DomainParticipantFactory::get_instance()->type_object_registry().get_type_object(
type_id_minimal,
remote_type_object_)))
{
std::cout << "Cannot get discovered type from registry:" << std::endl;
std::cout << " - Topic name: " << info.topic_name << std::endl;
std::cout << " - Type name: " << info.type_name << std::endl;
throw std::runtime_error("Error getting type object from registry");
}

// Notify run thread that type has been discovered
type_discovered_.store(true);
terminate_cv_.notify_one();
type_discovered_ = info.type_name.to_string();
cv_.notify_one();
}
}

void SubscriberApp::run()
{
// Wait for type discovery
{
std::unique_lock<std::mutex> lck(terminate_cv_mtx_);
terminate_cv_.wait(lck, [&]
std::unique_lock<std::mutex> lck(mtx_);
cv_.wait(lck, [&]
{
return is_stopped() || type_discovered_.load();
return is_stopped() || (type_discovered_ != "");
});
}

// Create entities unless we need to exit
if (type_discovered_)
if (type_discovered_ != "")
{
initialize_entities();
}

// Wait for shutdown command
{
std::unique_lock<std::mutex> lck(terminate_cv_mtx_);
terminate_cv_.wait(lck, [&]
std::unique_lock<std::mutex> lck(mtx_);
cv_.wait(lck, [&]
{
return is_stopped();
});
Expand All @@ -254,7 +290,7 @@ bool SubscriberApp::is_stopped()
void SubscriberApp::stop()
{
stop_.store(true);
terminate_cv_.notify_all();
cv_.notify_all();
}

void SubscriberApp::initialize_entities()
Expand All @@ -276,9 +312,9 @@ void SubscriberApp::initialize_entities()
throw std::runtime_error("Error building type");
}

TypeSupport dyn_type_support(new DynamicPubSubType(remote_type_));
TypeSupport dyn_type_support(new DynamicPubSubType(remote_type_, remote_type_information_));

if (RETCODE_OK != dyn_type_support.register_type(participant_))
if (RETCODE_OK != dyn_type_support.register_type(participant_, type_discovered_))
{
throw std::runtime_error("Error registering type");
}
Expand All @@ -304,7 +340,7 @@ void SubscriberApp::initialize_entities()
// Create the topic
TopicQos topic_qos = TOPIC_QOS_DEFAULT;
participant_->get_default_topic_qos(topic_qos);
topic_ = participant_->create_topic(topic_name_, dyn_type_support.get_type_name(), topic_qos);
topic_ = participant_->create_topic(topic_name_, type_discovered_, topic_qos);

if (nullptr == topic_)
{
Expand Down
10 changes: 7 additions & 3 deletions examples/cpp/xtypes/SubscriberApp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#define FASTDDS_EXAMPLES_CPP_XTYPES__SUBSCRIBERAPP_HPP

#include <condition_variable>
#include <mutex>
#include <string>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantListener.hpp>
Expand Down Expand Up @@ -82,6 +84,8 @@ class SubscriberApp : public Application, public DomainParticipantListener

::xtypes::TypeObject remote_type_object_;

::xtypes::TypeInformation remote_type_information_;

DynamicType::_ref_type remote_type_;

DomainParticipant* participant_;
Expand All @@ -98,13 +102,13 @@ class SubscriberApp : public Application, public DomainParticipantListener

uint16_t received_samples_;

std::atomic<bool> type_discovered_;
std::string type_discovered_;

std::atomic<bool> stop_;

mutable std::mutex terminate_cv_mtx_;
mutable std::mutex mtx_;

std::condition_variable terminate_cv_;
std::condition_variable cv_;

};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@
<domainId>0</domainId>
<rtps>
<name>xtypes_participant</name>
<propertiesPolicy>
<properties>
<!-- Explicitly activate complete type propagation -->
<property>
<name>fastdds.type_propagation</name>
<value>enabled</value>
</property>
</properties>
</propertiesPolicy>
</rtps>
</participant>
<data_writer profile_name="xtypes_datawriter_profile" is_default_profile="true">
Expand Down
72 changes: 72 additions & 0 deletions examples/cpp/xtypes/xtypes_minimal_profile.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8" ?>
<dds xmlns="http://www.eprosima.com" >
<profiles>
<participant profile_name="xtypes_participant_profile" is_default_profile="true">
<domainId>0</domainId>
<rtps>
<name>xtypes_participant</name>
<propertiesPolicy>
<properties>
<!-- Activate minimal type propagation only -->
<property>
<name>fastdds.type_propagation</name>
<value>minimal_bandwidth</value>
</property>
</properties>
</propertiesPolicy>
</rtps>
</participant>
<data_writer profile_name="xtypes_datawriter_profile" is_default_profile="true">
<qos>
<durability>
<kind>TRANSIENT_LOCAL</kind>
</durability>
<reliability>
<kind>RELIABLE</kind>
</reliability>
</qos>
<topic>
<historyQos>
<kind>KEEP_LAST</kind>
<depth>100</depth>
</historyQos>
<resourceLimitsQos>
<max_samples>100</max_samples>
<max_instances>1</max_instances>
<max_samples_per_instance>100</max_samples_per_instance>
</resourceLimitsQos>
</topic>
</data_writer>

<data_reader profile_name="xtypes_datareader_profile" is_default_profile="true">
<qos>
<durability>
<kind>TRANSIENT_LOCAL</kind>
</durability>
<reliability>
<kind>RELIABLE</kind>
</reliability>
</qos>
<topic>
<historyQos>
<kind>KEEP_LAST</kind>
<depth>100</depth>
</historyQos>
<resourceLimitsQos>
<max_samples>100</max_samples>
<max_instances>1</max_instances>
<max_samples_per_instance>100</max_samples_per_instance>
</resourceLimitsQos>
</topic>
</data_reader>
</profiles>

<types>
<type>
<struct name="HelloWorld">
<member name="index" type="uint32"/>
<member name="message" type="string"/>
</struct>
</type>
</types>
</dds>
7 changes: 7 additions & 0 deletions include/fastdds/dds/core/policy/ParameterTypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,13 @@ const char* const parameter_policy_physical_data_process = "fastdds.physical_dat
*/
const char* const parameter_enable_monitor_service = "fastdds.enable_monitor_service";

/**
* Parameter property value for configuring type propagation
*
* @ingroup PARAMETER_MODULE
*/
const char* const parameter_policy_type_propagation = "fastdds.type_propagation";

/**
* @ingroup PARAMETER_MODULE
*/
Expand Down
Loading

0 comments on commit 01cbcd1

Please sign in to comment.