Skip to content

Commit

Permalink
New max_message_size property to limit output datagrams size (#4777) (
Browse files Browse the repository at this point in the history
#4807)

* Refs #20489. Parse property in RTPS writer.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20849. Parse property in RTPS participant.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20849: Add test for RTPS writer

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Add test for participant

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Apply suggestions

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Uncrustify

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Apply suggestions

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Add tests in DDS layer

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Apply suggestions

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 6d20b64)

Co-authored-by: elianalf <62831776+elianalf@users.noreply.github.com>
(cherry picked from commit 0571391)

# Conflicts:
#	src/cpp/rtps/participant/RTPSParticipantImpl.cpp
#	test/blackbox/common/DDSBlackboxTestsBasic.cpp
  • Loading branch information
mergify[bot] committed Jun 5, 2024
1 parent 0103a39 commit 41bd5a0
Show file tree
Hide file tree
Showing 6 changed files with 728 additions and 13 deletions.
3 changes: 3 additions & 0 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <chrono>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <vector>
Expand Down Expand Up @@ -472,6 +473,8 @@ class RTPSWriter

//! Flow controller.
fastdds::rtps::FlowController* flow_controller_;
//! Maximum number of bytes allowed for an RTPS datagram generated by this writer.
uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max();

//!WriterHistory
WriterHistory* mp_history = nullptr;
Expand Down
26 changes: 24 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ RTPSParticipantImpl::RTPSParticipantImpl(
});
}

<<<<<<< HEAD
// Creation of user locator and receiver resources
//If no default locators are defined we define some.
/* The reasoning here is the following.
Expand Down Expand Up @@ -423,6 +424,24 @@ RTPSParticipantImpl::RTPSParticipantImpl(
logWarning(RTPS_PARTICIPANT,
"Metatraffic multicast port " << meta_multicast_port_for_check << " cannot be opened."
" It may is opened by another application. Discovery may fail.");
=======
void RTPSParticipantImpl::setup_output_traffic()
{
{
const std::string* max_size_property =
PropertyPolicyHelper::find_property(m_att.properties, "fastdds.max_message_size");
if (max_size_property != nullptr)
{
try
{
max_output_message_size_ = std::stoul(*max_size_property);
}
catch (const std::exception& e)
{
EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what());
}
}
>>>>>>> 0571391b5 (New `max_message_size` property to limit output datagrams size (#4777) (#4807))
}

bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic;
Expand Down Expand Up @@ -2109,8 +2128,11 @@ uint32_t RTPSParticipantImpl::getMaxMessageSize() const
#endif // if HAVE_SECURITY

return (std::min)(
m_network_Factory.get_max_message_size_between_transports(),
max_receiver_buffer_size);
{
max_output_message_size_,
m_network_Factory.get_max_message_size_between_transports(),
max_receiver_buffer_size
});
}

uint32_t RTPSParticipantImpl::getMaxDataSize()
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <limits>
#include <list>
#include <mutex>
#include <sys/types.h>
Expand Down Expand Up @@ -574,6 +575,8 @@ class RTPSParticipantImpl
std::function<bool(const std::string&)> type_check_fn_;
//!Pool of send buffers
std::unique_ptr<SendBuffersManager> send_buffers_;
//! Maximum number of bytes allowed for an RTPS datagram generated by this writer.
uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max();

/**
* Client override flag: SIMPLE participant that has been overriden with the environment variable and transformed
Expand Down
40 changes: 29 additions & 11 deletions src/cpp/rtps/writer/RTPSWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@
*
*/

#include <mutex>

#include <rtps/history/BasicPayloadPool.hpp>
#include <rtps/history/CacheChangePool.h>

#include <rtps/DataSharing/DataSharingNotifier.hpp>
#include <rtps/DataSharing/WriterPool.hpp>
#include <fastdds/rtps/writer/RTPSWriter.h>

#include <rtps/participant/RTPSParticipantImpl.h>
#include <mutex>

#include <fastdds/dds/log/Log.hpp>

#include <fastdds/rtps/writer/RTPSWriter.h>

#include <fastdds/rtps/attributes/PropertyPolicy.h>
#include <fastdds/rtps/history/WriterHistory.h>

#include <fastdds/rtps/messages/RTPSMessageCreator.h>

#include <rtps/DataSharing/DataSharingNotifier.hpp>
#include <rtps/DataSharing/WriterPool.hpp>
#include <rtps/history/BasicPayloadPool.hpp>
#include <rtps/history/CacheChangePool.h>
#include <rtps/participant/RTPSParticipantImpl.h>

#include <statistics/rtps/StatisticsBase.hpp>
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>

Expand Down Expand Up @@ -109,6 +107,22 @@ void RTPSWriter::init(
const std::shared_ptr<IChangePool>& change_pool,
const WriterAttributes& att)
{
{
const std::string* max_size_property =
PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.max_message_size");
if (max_size_property != nullptr)
{
try
{
max_output_message_size_ = std::stoul(*max_size_property);
}
catch (const std::exception& e)
{
EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what());
}
}
}

payload_pool_ = payload_pool;
change_pool_ = change_pool;
fixed_payload_size_ = 0;
Expand Down Expand Up @@ -306,6 +320,10 @@ uint32_t RTPSWriter::getMaxDataSize()
uint32_t flow_max = flow_controller_->get_max_payload();
uint32_t part_max = mp_RTPSParticipant->getMaxMessageSize();
uint32_t max_size = flow_max > part_max ? part_max : flow_max;
if (max_output_message_size_ < max_size)
{
max_size = max_output_message_size_;
}

max_size = calculateMaxDataSize(max_size);
return max_size &= ~3;
Expand Down
Loading

0 comments on commit 41bd5a0

Please sign in to comment.