From 9db08590eed128531b23b22a5d71a7fe442208d4 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 May 2024 07:45:22 +0200 Subject: [PATCH 1/6] Handle errors when setting socket buffer sizes (#4760) (#4796) * Refs #20972. Method socket_buffer_size in DDS_PIM helpers sets also sending buffer. Signed-off-by: Miguel Company * Refs #20972. Method socket_buffer_size in fastrtps_deprecated helpers sets also sending buffer. Signed-off-by: Miguel Company * Refs #20972. Improvements in on_sample_lost blackbox tests. Signed-off-by: Miguel Company * Refs #20972. Move code into new private methods. Signed-off-by: Miguel Company * Refs #20972. Refactor on configure_send_buffer_size. Signed-off-by: Miguel Company * Refs #20972. Refactor on configure_receive_buffer_size. Signed-off-by: Miguel Company * Refs #20972. Check user configuration at the beginning of init method. Signed-off-by: Miguel Company * Refs #20972. Use maxMessageSize as minimum possible value. Signed-off-by: Miguel Company * Refs #20972. Applying changes on OpenAndBindUnicastOutputSocket. Signed-off-by: Miguel Company * Refs #20972. Applying changes on CreateInputChannelResource. Signed-off-by: Miguel Company * Revert "Refs #20972. Applying changes on CreateInputChannelResource." This reverts commit ed848e9de267fcfdbe1cb7294b2e408d85a33575. * Refs #20972. Add helper header with template method. Signed-off-by: Miguel Company * Refs #20972. Configure methods return boolean. Signed-off-by: Miguel Company * Refs #20972. Configure methods use new template method. Signed-off-by: Miguel Company * Refs #20972. OpenAndBindUnicastOutputSocket uses new template method. Signed-off-by: Miguel Company * Refs #20972. Changes in OpenAndBindInputSocket. Signed-off-by: Miguel Company * Refs #20972.Setting options on TCP channels. Signed-off-by: Miguel Company * Refs #20972. Doxygen. Signed-off-by: Miguel Company * Refs #20972. Check limits of configured sizes. Signed-off-by: Miguel Company * Refs #20972. Add UDP unit tests. Signed-off-by: Miguel Company * Refs #20972. Add TCP unit tests. Signed-off-by: Miguel Company * Refs #20972. Move checks in TCP to beginning of init. Signed-off-by: Miguel Company * Refs #20972. Refactor for common code in UDP. Signed-off-by: Miguel Company * Refs #20972. Refactor for common code in TCP. Signed-off-by: Miguel Company * Refs #20972. Remove unused constants in UDP tests. Signed-off-by: Miguel Company * Refs #20972. Check final configuration on unit tests. Signed-off-by: Miguel Company * Refs #20972. Uncrustify. Signed-off-by: Miguel Company * Refs #20972. Less strict tests. Signed-off-by: Miguel Company * Refs #20972. Remove `s_minimumSocketBuffer` from tests. Signed-off-by: Miguel Company * Refs #20972. Deprecate `s_minimumSocketBuffer`. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company (cherry picked from commit 53cd211a8449bac59c13e18912ebd7a9262de51c) # Conflicts: # src/cpp/rtps/transport/TCPTransportInterface.cpp # src/cpp/rtps/transport/UDPTransportInterface.cpp # src/cpp/rtps/transport/UDPv4Transport.cpp # src/cpp/rtps/transport/UDPv6Transport.cpp # test/blackbox/common/DDSBlackboxTestsListeners.cpp --- .../rtps/transport/TransportInterface.h | 3 +- .../fastrtps/transport/TransportInterface.h | 3 +- src/cpp/rtps/transport/TCPChannelResource.cpp | 49 +++++- src/cpp/rtps/transport/TCPChannelResource.h | 10 ++ .../transport/TCPChannelResourceBasic.cpp | 4 +- .../transport/TCPChannelResourceSecure.cpp | 4 +- .../rtps/transport/TCPTransportInterface.cpp | 64 +++++++- .../rtps/transport/UDPTransportInterface.cpp | 112 ++++++++++--- .../rtps/transport/UDPTransportInterface.h | 1 + src/cpp/rtps/transport/UDPv4Transport.cpp | 22 ++- src/cpp/rtps/transport/UDPv6Transport.cpp | 22 ++- src/cpp/rtps/transport/asio_helpers.hpp | 155 ++++++++++++++++++ test/blackbox/api/dds-pim/PubSubReader.hpp | 1 + test/blackbox/api/dds-pim/PubSubWriter.hpp | 1 + .../api/fastrtps_deprecated/PubSubReader.hpp | 1 + .../api/fastrtps_deprecated/PubSubWriter.hpp | 1 + .../common/DDSBlackboxTestsListeners.cpp | 21 +++ test/unittest/transport/TCPv4Tests.cpp | 73 ++++++++- test/unittest/transport/TCPv6Tests.cpp | 71 +++++++- test/unittest/transport/UDPv4Tests.cpp | 74 ++++++++- test/unittest/transport/UDPv6Tests.cpp | 73 ++++++++- 21 files changed, 716 insertions(+), 49 deletions(-) create mode 100644 src/cpp/rtps/transport/asio_helpers.hpp diff --git a/include/fastdds/rtps/transport/TransportInterface.h b/include/fastdds/rtps/transport/TransportInterface.h index 60dec2d299..26d713b9c5 100644 --- a/include/fastdds/rtps/transport/TransportInterface.h +++ b/include/fastdds/rtps/transport/TransportInterface.h @@ -34,7 +34,8 @@ namespace rtps { constexpr uint32_t s_maximumMessageSize = 65500; //! Default maximum initial peers range constexpr uint32_t s_maximumInitialPeersRange = 4; -//! Default minimum socket buffer +// Default minimum socket buffer +FASTDDS_DEPRECATED_UNTIL(3, s_minimumSocketBuffer, "Minimum socket buffer is now taken from the maximum msg size") constexpr uint32_t s_minimumSocketBuffer = 65536; //! Default IPv4 address static const std::string s_IPv4AddressAny = "0.0.0.0"; diff --git a/include/fastrtps/transport/TransportInterface.h b/include/fastrtps/transport/TransportInterface.h index 9f0a5f8876..22c1386803 100644 --- a/include/fastrtps/transport/TransportInterface.h +++ b/include/fastrtps/transport/TransportInterface.h @@ -29,7 +29,8 @@ using TransportInterface = fastdds::rtps::TransportInterface; static const uint32_t s_maximumMessageSize = fastdds::rtps::s_maximumMessageSize; static const uint32_t s_maximumInitialPeersRange = fastdds::rtps::s_maximumInitialPeersRange; -static const uint32_t s_minimumSocketBuffer = fastdds::rtps::s_minimumSocketBuffer; +FASTDDS_DEPRECATED_UNTIL(3, s_minimumSocketBuffer, "Minimum socket buffer is now taken from the maximum msg size") +static const uint32_t s_minimumSocketBuffer = 65536; static const std::string s_IPv4AddressAny = fastdds::rtps::s_IPv4AddressAny; static const std::string s_IPv6AddressAny = fastdds::rtps::s_IPv6AddressAny; diff --git a/src/cpp/rtps/transport/TCPChannelResource.cpp b/src/cpp/rtps/transport/TCPChannelResource.cpp index efc7a6903b..2fbaa3118c 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.cpp +++ b/src/cpp/rtps/transport/TCPChannelResource.cpp @@ -18,6 +18,7 @@ #include #include +#include #include namespace eprosima { @@ -370,6 +371,52 @@ bool TCPChannelResource::check_socket_send_buffer( return true; } +void TCPChannelResource::set_socket_options( + asio::basic_socket& socket, + const TCPTransportDescriptor* options) +{ + uint32_t minimum_value = options->maxMessageSize; + + // Set the send buffer size + { + uint32_t desired_value = options->sendBufferSize; + uint32_t configured_value = 0; + if (!asio_helpers::try_setting_buffer_size( + socket, desired_value, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TCP_TRANSPORT, + "Couldn't set send buffer size to minimum value: " << minimum_value); + } + else if (desired_value != configured_value) + { + EPROSIMA_LOG_WARNING(TCP_TRANSPORT, + "Couldn't set send buffer size to desired value. " + << "Using " << configured_value << " instead of " << desired_value); + } + } + + // Set the receive buffer size + { + uint32_t desired_value = options->receiveBufferSize; + uint32_t configured_value = 0; + if (!asio_helpers::try_setting_buffer_size( + socket, desired_value, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TCP_TRANSPORT, + "Couldn't set receive buffer size to minimum value: " << minimum_value); + } + else if (desired_value != configured_value) + { + EPROSIMA_LOG_WARNING(TCP_TRANSPORT, + "Couldn't set receive buffer size to desired value. " + << "Using " << configured_value << " instead of " << desired_value); + } + } + + // Set the TCP_NODELAY option + socket.set_option(asio::ip::tcp::no_delay(options->enable_tcp_nodelay)); +} + } // namespace rtps -} // namespace fastrtps +} // namespace fastdds } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPChannelResource.h b/src/cpp/rtps/transport/TCPChannelResource.h index c0a9e97f7b..589ae6221e 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.h +++ b/src/cpp/rtps/transport/TCPChannelResource.h @@ -234,6 +234,16 @@ class TCPChannelResource : public ChannelResource const size_t& msg_size, const asio::ip::tcp::socket::native_handle_type& socket_native_handle); + /** + * @brief Set descriptor options on a socket. + * + * @param socket Socket on which to set the options. + * @param options Descriptor with the options to set. + */ + static void set_socket_options( + asio::basic_socket& socket, + const TCPTransportDescriptor* options); + TCPConnectionType tcp_connection_type_; friend class TCPTransportInterface; diff --git a/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp b/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp index 51a28913c1..7e3f53789e 100644 --- a/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp +++ b/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp @@ -200,9 +200,7 @@ asio::ip::tcp::endpoint TCPChannelResourceBasic::local_endpoint( void TCPChannelResourceBasic::set_options( const TCPTransportDescriptor* options) { - socket_->set_option(socket_base::receive_buffer_size(options->receiveBufferSize)); - socket_->set_option(socket_base::send_buffer_size(options->sendBufferSize)); - socket_->set_option(ip::tcp::no_delay(options->enable_tcp_nodelay)); + TCPChannelResource::set_socket_options(*socket_, options); } void TCPChannelResourceBasic::cancel() diff --git a/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp b/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp index deddd7ffd3..7e42431537 100644 --- a/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp +++ b/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp @@ -280,9 +280,7 @@ asio::ip::tcp::endpoint TCPChannelResourceSecure::local_endpoint( void TCPChannelResourceSecure::set_options( const TCPTransportDescriptor* options) { - secure_socket_->lowest_layer().set_option(socket_base::receive_buffer_size(options->receiveBufferSize)); - secure_socket_->lowest_layer().set_option(socket_base::send_buffer_size(options->sendBufferSize)); - secure_socket_->lowest_layer().set_option(ip::tcp::no_delay(options->enable_tcp_nodelay)); + TCPChannelResource::set_socket_options(secure_socket_->lowest_layer(), options); } void TCPChannelResourceSecure::set_tls_verify_mode( diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index b550d9dece..6e2bd10c2f 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -18,10 +18,11 @@ #include #include #include +#include #include -#include #include #include +#include #include #include #include @@ -53,6 +54,7 @@ #include #include +#include #include #include #include @@ -442,6 +444,42 @@ bool TCPTransportInterface::DoInputLocatorsMatch( bool TCPTransportInterface::init( const fastrtps::rtps::PropertyPolicy* properties) { + uint32_t maximumMessageSize = max_msg_size_no_frag == 0 ? s_maximumMessageSize : max_msg_size_no_frag; + uint32_t cfg_max_msg_size = configuration()->maxMessageSize; + uint32_t cfg_send_size = configuration()->sendBufferSize; + uint32_t cfg_recv_size = configuration()->receiveBufferSize; + uint32_t max_int_value = static_cast(std::numeric_limits::max()); + + if (cfg_max_msg_size > maximumMessageSize) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than " << maximumMessageSize); + return false; + } + + if (cfg_send_size > max_int_value) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "sendBufferSize cannot be greater than " << max_int_value); + return false; + } + + if (cfg_recv_size > max_int_value) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "receiveBufferSize cannot be greater than " << max_int_value); + return false; + } + + if ((cfg_send_size > 0) && (cfg_max_msg_size > cfg_send_size)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than sendBufferSize"); + return false; + } + + if ((cfg_recv_size > 0) && (cfg_max_msg_size > cfg_recv_size)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than receiveBufferSize"); + return false; + } + if (!apply_tls_config()) { // TODO decide wether the Transport initialization should keep working after this error @@ -474,8 +512,12 @@ bool TCPTransportInterface::init( } // Check system buffer sizes. - if (configuration()->sendBufferSize == 0) + uint32_t send_size = 0; + uint32_t recv_size = 0; + if (!asio_helpers::configure_buffer_sizes( + *initial_peer_local_locator_socket_, *configuration(), send_size, recv_size)) { +<<<<<<< HEAD socket_base::send_buffer_size option; initial_peer_local_locator_socket_->get_option(option); set_send_buffer_size(option.value()); @@ -501,21 +543,27 @@ bool TCPTransportInterface::init( if (configuration()->maxMessageSize > s_maximumMessageSize) { EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than 65000"); +======= + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "Couldn't set buffer sizes to minimum value: " << cfg_max_msg_size); +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) return false; } - if (configuration()->maxMessageSize > configuration()->sendBufferSize) + if (cfg_send_size > 0 && send_size != cfg_send_size) { - EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than send_buffer_size"); - return false; + EPROSIMA_LOG_WARNING(TRANSPORT_TCP, "UDPTransport sendBufferSize could not be set to the desired value. " + << "Using " << send_size << " instead of " << cfg_send_size); } - if (configuration()->maxMessageSize > configuration()->receiveBufferSize) + if (cfg_recv_size > 0 && recv_size != cfg_recv_size) { - EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than receive_buffer_size"); - return false; + EPROSIMA_LOG_WARNING(TRANSPORT_TCP, "UDPTransport receiveBufferSize could not be set to the desired value. " + << "Using " << recv_size << " instead of " << cfg_recv_size); } + set_send_buffer_size(send_size); + set_receive_buffer_size(recv_size); + if (!rtcp_message_manager_) { rtcp_message_manager_ = std::make_shared(this); diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 5c5feaa400..6290505a2f 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -14,15 +14,18 @@ #include -#include -#include #include #include +#include +#include +#include +#include #include #include -#include #include + +#include #include #include @@ -119,6 +122,7 @@ bool UDPTransportInterface::DoInputLocatorsMatch( bool UDPTransportInterface::init( const fastrtps::rtps::PropertyPolicy*) { +<<<<<<< HEAD if (configuration()->sendBufferSize == 0 || configuration()->receiveBufferSize == 0) { // Check system buffer sizes. @@ -155,25 +159,84 @@ bool UDPTransportInterface::init( if (configuration()->maxMessageSize > s_maximumMessageSize) { EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than 65000"); +======= + uint32_t maximumMessageSize = max_msg_size_no_frag == 0 ? s_maximumMessageSize : max_msg_size_no_frag; + uint32_t cfg_max_msg_size = configuration()->maxMessageSize; + uint32_t cfg_send_size = configuration()->sendBufferSize; + uint32_t cfg_recv_size = configuration()->receiveBufferSize; + uint32_t max_int_value = static_cast(std::numeric_limits::max()); + + if (cfg_max_msg_size > maximumMessageSize) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than " << maximumMessageSize); +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) return false; } - if (configuration()->maxMessageSize > configuration()->sendBufferSize) + if (cfg_send_size > max_int_value) { - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than send_buffer_size"); + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "sendBufferSize cannot be greater than " << max_int_value); return false; } - if (configuration()->maxMessageSize > configuration()->receiveBufferSize) + if (cfg_recv_size > max_int_value) { - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than receive_buffer_size"); + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "receiveBufferSize cannot be greater than " << max_int_value); return false; } +<<<<<<< HEAD // TODO(Ricardo) Create an event that update this list. get_ips(currentInterfaces); return true; +======= + if ((cfg_send_size > 0) && (cfg_max_msg_size > cfg_send_size)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than sendBufferSize"); + return false; + } + + if ((cfg_recv_size > 0) && (cfg_max_msg_size > cfg_recv_size)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than receiveBufferSize"); + return false; + } + + asio::error_code ec; + ip::udp::socket socket(io_service_); + socket.open(generate_protocol(), ec); + if (!!ec) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "Error creating socket: " << ec.message()); + return false; + } + + bool ret = asio_helpers::configure_buffer_sizes(socket, *configuration(), mSendBufferSize, mReceiveBufferSize); + if (ret) + { + if (cfg_send_size > 0 && mSendBufferSize != cfg_send_size) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport sendBufferSize could not be set to the desired value. " + << "Using " << mSendBufferSize << " instead of " << cfg_send_size); + } + + if (cfg_recv_size > 0 && mReceiveBufferSize != cfg_recv_size) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport receiveBufferSize could not be set to the desired value. " + << "Using " << mReceiveBufferSize << " instead of " << cfg_recv_size); + } + + set_send_buffer_size(mSendBufferSize); + set_receive_buffer_size(mReceiveBufferSize); + } + else + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "Couldn't set buffer sizes to minimum value: " << cfg_max_msg_size); + } + + return ret; +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) } bool UDPTransportInterface::IsInputChannelOpen( @@ -211,9 +274,8 @@ bool UDPTransportInterface::OpenAndBindInputSockets( catch (asio::system_error const& e) { (void)e; - EPROSIMA_LOG_INFO(RTPS_MSG_OUT, "UDPTransport Error binding at port: (" << IPLocator::getPhysicalPort( - locator) << ")" - << " with msg: " << e.what()); + EPROSIMA_LOG_INFO(TRANSPORT_UDP, "UDPTransport Error binding at port: (" + << IPLocator::getPhysicalPort(locator) << ")" << " with msg: " << e.what()); mInputSockets.erase(IPLocator::getPhysicalPort(locator)); return false; } @@ -243,7 +305,18 @@ eProsimaUDPSocket UDPTransportInterface::OpenAndBindUnicastOutputSocket( getSocketPtr(socket)->open(generate_protocol()); if (mSendBufferSize != 0) { - getSocketPtr(socket)->set_option(socket_base::send_buffer_size(static_cast(mSendBufferSize))); + uint32_t configured_value = 0; + if (!asio_helpers::try_setting_buffer_size( + socket, mSendBufferSize, configuration()->maxMessageSize, configured_value)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, + "Couldn't set send buffer size to minimum value: " << configuration()->maxMessageSize); + } + else if (configured_value != mSendBufferSize) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport sendBufferSize could not be set to the desired value. " + << "Using " << configured_value << " instead of " << mSendBufferSize); + } } getSocketPtr(socket)->set_option(ip::multicast::hops(configuration()->TTL)); getSocketPtr(socket)->bind(endpoint); @@ -301,7 +374,7 @@ bool UDPTransportInterface::OpenOutputChannel( catch (asio::system_error const& e) { (void)e; - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "UDPTransport Error binding interface " + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport Error binding interface " << localhost_name() << " (skipping) with msg: " << e.what()); } } @@ -325,7 +398,7 @@ bool UDPTransportInterface::OpenOutputChannel( catch (asio::system_error const& e) { (void)e; - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "UDPTransport Error binding interface " + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport Error binding interface " << (*locIt).name << " (skipping) with msg: " << e.what()); } } @@ -357,7 +430,7 @@ bool UDPTransportInterface::OpenOutputChannel( { (void)e; /* TODO Que hacer? - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "UDPTransport Error binding at port: (" << IPLocator::getPhysicalPort(locator) << ")" + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "UDPTransport Error binding at port: (" << IPLocator::getPhysicalPort(locator) << ")" << " with msg: " << e.what()); for (auto& socket : mOutputSockets) { @@ -515,23 +588,24 @@ bool UDPTransportInterface::send( if ((ec.value() == asio::error::would_block) || (ec.value() == asio::error::try_again)) { - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "UDP send would have blocked. Packet is dropped."); + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDP send would have blocked. Packet is dropped."); return true; } - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, ec.message()); + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, ec.message()); return false; } } catch (const std::exception& error) { - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, error.what()); + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, error.what()); return false; } (void)bytesSent; - EPROSIMA_LOG_INFO(RTPS_MSG_OUT, "UDPTransport: " << bytesSent << " bytes TO endpoint: " << destinationEndpoint - << " FROM " << getSocketPtr(socket)->local_endpoint()); + EPROSIMA_LOG_INFO(TRANSPORT_UDP, + "UDPTransport: " << bytesSent << " bytes TO endpoint: " << destinationEndpoint << + " FROM " << getSocketPtr(socket)->local_endpoint()); success = true; } diff --git a/src/cpp/rtps/transport/UDPTransportInterface.h b/src/cpp/rtps/transport/UDPTransportInterface.h index bbeeafed88..2e4585e9c9 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.h +++ b/src/cpp/rtps/transport/UDPTransportInterface.h @@ -292,6 +292,7 @@ class UDPTransportInterface : public TransportInterface bool return_loopback = false); std::atomic_bool rescan_interfaces_ = {true}; + }; } // namespace rtps diff --git a/src/cpp/rtps/transport/UDPv4Transport.cpp b/src/cpp/rtps/transport/UDPv4Transport.cpp index 2d1b1e0e4e..fc5be0411b 100644 --- a/src/cpp/rtps/transport/UDPv4Transport.cpp +++ b/src/cpp/rtps/transport/UDPv4Transport.cpp @@ -26,6 +26,13 @@ #include #include +<<<<<<< HEAD +======= +#include +#include + +#include +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) using namespace std; using namespace asio; @@ -297,7 +304,20 @@ eProsimaUDPSocket UDPv4Transport::OpenAndBindInputSocket( getSocketPtr(socket)->open(generate_protocol()); if (mReceiveBufferSize != 0) { - getSocketPtr(socket)->set_option(socket_base::receive_buffer_size(mReceiveBufferSize)); + uint32_t configured_value = 0; + uint32_t minimum_value = configuration()->maxMessageSize; + if (!asio_helpers::try_setting_buffer_size( + socket, mReceiveBufferSize, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDPV4, + "Couldn't set receive buffer size to minimum value: " << minimum_value); + } + else if (mReceiveBufferSize != configured_value) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDPV4, + "Receive buffer size could not be set to the desired value. " + << "Using " << configured_value << " instead of " << mReceiveBufferSize); + } } if (is_multicast) diff --git a/src/cpp/rtps/transport/UDPv6Transport.cpp b/src/cpp/rtps/transport/UDPv6Transport.cpp index 385ee12e52..60191afd7e 100644 --- a/src/cpp/rtps/transport/UDPv6Transport.cpp +++ b/src/cpp/rtps/transport/UDPv6Transport.cpp @@ -23,6 +23,13 @@ #include #include #include +<<<<<<< HEAD +======= + +#include +#include +#include +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) using namespace std; using namespace asio; @@ -301,7 +308,20 @@ eProsimaUDPSocket UDPv6Transport::OpenAndBindInputSocket( getSocketPtr(socket)->open(generate_protocol()); if (mReceiveBufferSize != 0) { - getSocketPtr(socket)->set_option(socket_base::receive_buffer_size(mReceiveBufferSize)); + uint32_t configured_value = 0; + uint32_t minimum_value = configuration()->maxMessageSize; + if (!asio_helpers::asio_helpers::try_setting_buffer_size( + socket, mReceiveBufferSize, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDPV6, + "Couldn't set receive buffer size to minimum value: " << minimum_value); + } + else if (mReceiveBufferSize != configured_value) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDPV6, + "Receive buffer size could not be set to the desired value. " + << "Using " << configured_value << " instead of " << mReceiveBufferSize); + } } if (is_multicast) diff --git a/src/cpp/rtps/transport/asio_helpers.hpp b/src/cpp/rtps/transport/asio_helpers.hpp new file mode 100644 index 0000000000..f9ba81b84e --- /dev/null +++ b/src/cpp/rtps/transport/asio_helpers.hpp @@ -0,0 +1,155 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RTPS_TRANSPORT__ASIO_HELPERS_HPP_ +#define RTPS_TRANSPORT__ASIO_HELPERS_HPP_ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +/// Helper functions for asio. +// NOTE: using a struct instead of a namespace to avoid linker errors when using inline free functions. +struct asio_helpers +{ + /** + * @brief Try to set a buffer size on a socket, trying to set the initial value and then halving it until it is + * possible to set it or the minimum value is reached. + * + * @tparam BufferOptionType Type of the buffer option to set. + * @tparam SocketType Type of socket on which to set the buffer size option. + * + * @param socket Socket on which to set the buffer size option. + * @param initial_buffer_value Initial value to try to set. + * @param minimum_buffer_value Minimum value to set. + * @param final_buffer_value Output parameter where the final value set will be stored. + * + * @return true if the buffer size was successfully set, false otherwise. + */ + template + static inline bool try_setting_buffer_size( + SocketType& socket, + const uint32_t initial_buffer_value, + const uint32_t minimum_buffer_value, + uint32_t& final_buffer_value) + { + asio::error_code ec; + + final_buffer_value = initial_buffer_value; + while (final_buffer_value >= minimum_buffer_value) + { + socket.set_option(BufferOptionType(static_cast(final_buffer_value)), ec); + if (!ec) + { + return true; + } + + final_buffer_value /= 2; + } + + final_buffer_value = minimum_buffer_value; + socket.set_option(BufferOptionType(final_buffer_value), ec); + return !ec; + } + + /** + * @brief Configure a buffer size on a socket, using the system default value if the initial value is 0. + * Ensures that the final buffer size is at least the minimum value. + * + * @tparam BufferOptionType Type of the buffer option to set. + * @tparam SocketType Type of socket on which to set the buffer size option. + * + * @param socket Socket on which to set the buffer size option. + * @param initial_buffer_value Initial value to try to set. + * @param minimum_buffer_value Minimum value to set. + * @param final_buffer_value Output parameter where the final value set will be stored. + * + * @return true if the buffer size was successfully set, false otherwise. + */ + template + static inline bool configure_buffer_size( + SocketType& socket, + const uint32_t initial_buffer_value, + const uint32_t minimum_buffer_value, + uint32_t& final_buffer_value) + { + final_buffer_value = initial_buffer_value; + + // If the initial value is 0, try using the system default value + if (initial_buffer_value == 0) + { + asio::error_code ec; + BufferOptionType option; + socket.get_option(option, ec); + if (!ec) + { + final_buffer_value = option.value(); + } + } + + // Ensure the minimum value is used + if (final_buffer_value < minimum_buffer_value) + { + final_buffer_value = minimum_buffer_value; + } + + // Try to set the highest possible value the system allows + return try_setting_buffer_size(socket, final_buffer_value, minimum_buffer_value, + final_buffer_value); + } + + /** + * @brief Configure the send and receive buffer sizes on a socket, using the system default value if the initial + * values are 0. Ensures that the final buffer sizes are at least the minimum value. + * + * @tparam SocketType Type of socket on which to set the buffer size options. + * + * @param socket Socket on which to set the buffer size options. + * @param descriptor Transport descriptor with the buffer sizes to set. + * @param final_send_buffer_size Output parameter where the final send buffer size will be stored. + * @param final_receive_buffer_size Output parameter where the final receive buffer size will be stored. + * + * @return true if the buffer sizes were successfully set, false otherwise. + */ + template + static inline bool configure_buffer_sizes( + SocketType& socket, + const SocketTransportDescriptor& descriptor, + uint32_t& final_send_buffer_size, + uint32_t& final_receive_buffer_size) + { + uint32_t minimum_socket_buffer = descriptor.maxMessageSize; + uint32_t send_buffer_size = descriptor.sendBufferSize; + uint32_t receive_buffer_size = descriptor.receiveBufferSize; + + bool send_buffer_size_set = configure_buffer_size( + socket, send_buffer_size, minimum_socket_buffer, final_send_buffer_size); + bool receive_buffer_size_set = configure_buffer_size( + socket, receive_buffer_size, minimum_socket_buffer, final_receive_buffer_size); + + return send_buffer_size_set && receive_buffer_size_set; + } + +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // RTPS_TRANSPORT__ASIO_HELPERS_HPP_ + diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 699f2ab3ef..fb0945f4e6 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -1302,6 +1302,7 @@ class PubSubReader uint32_t sockerBufferSize) { participant_qos_.transport().listen_socket_buffer_size = sockerBufferSize; + participant_qos_.transport().send_socket_buffer_size = sockerBufferSize; return *this; } diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 5a37df5497..0e358e4fc2 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -1485,6 +1485,7 @@ class PubSubWriter uint32_t sockerBufferSize) { participant_qos_.transport().listen_socket_buffer_size = sockerBufferSize; + participant_qos_.transport().send_socket_buffer_size = sockerBufferSize; return *this; } diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 3031ade192..be99ec6903 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -1069,6 +1069,7 @@ class PubSubReader uint32_t sockerBufferSize) { participant_attr_.rtps.listenSocketBufferSize = sockerBufferSize; + participant_attr_.rtps.sendSocketBufferSize = sockerBufferSize; return *this; } diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp index c5ee2a1bd7..1fdd2beb51 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp @@ -1249,6 +1249,7 @@ class PubSubWriter uint32_t sockerBufferSize) { participant_attr_.rtps.listenSocketBufferSize = sockerBufferSize; + participant_attr_.rtps.sendSocketBufferSize = sockerBufferSize; return *this; } diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 1d7c40933b..d443811747 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -674,10 +674,25 @@ TEST_P(DDSStatus, DataAvailableConditions) subscriber_reader.wait_waitset_timeout(); } +<<<<<<< HEAD +======= +// We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. +// Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any +// other possible loss. +static constexpr uint32_t SAMPLE_LOST_TEST_BUFFER_SIZE = + 300ul * 1024ul // sample size + * 13ul // number of samples + * 2ul; // 2x to avoid any possible loss + +template +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) void sample_lost_test_dw_init( PubSubWriter& writer) { auto testTransport = std::make_shared(); + testTransport->sendBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; + testTransport->receiveBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; + testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool { uint32_t old_pos = msg.pos; @@ -736,6 +751,12 @@ void sample_lost_test_init( PubSubWriter& writer, std::function functor) { +<<<<<<< HEAD +======= + reader.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); + writer.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); + +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) sample_lost_test_dw_init(writer); sample_lost_test_dr_init(reader, functor); diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 36ee6f139e..cfb9064ec1 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -82,6 +83,74 @@ class TCPv4Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(TCPv4Tests, wrong_configuration_values) +{ + // Too big sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(TCPv4Tests, locators_with_kind_1_supported) { // Given @@ -1334,7 +1403,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); uint16_t port = g_default_port; - uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; + uint32_t msg_size = 64ul * 1024ul; // Create a TCP Server transport using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; @@ -1910,7 +1979,7 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) TEST_F(TCPv4Tests, non_blocking_send) { uint16_t port = g_default_port; - uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; + uint32_t msg_size = 64ul * 1024ul; // Create a TCP Server transport TCPv4TransportDescriptor senderDescriptor; senderDescriptor.add_listener_port(port); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index fa9791f8bd..4cad23e28e 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -83,6 +84,74 @@ class TCPv6Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(TCPv6Tests, wrong_configuration_values) +{ + // Too big sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(TCPv6Tests, conversion_to_ip6_string) { Locator_t locator; @@ -323,7 +392,7 @@ TEST_F(TCPv6Tests, client_announced_local_port_uniqueness) TEST_F(TCPv6Tests, non_blocking_send) { uint16_t port = g_default_port; - uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; + uint32_t msg_size = 64ul * 1024ul; // Create a TCP Server transport TCPv6TransportDescriptor senderDescriptor; senderDescriptor.add_listener_port(port); diff --git a/test/unittest/transport/UDPv4Tests.cpp b/test/unittest/transport/UDPv4Tests.cpp index 430e58feab..3f24c0a034 100644 --- a/test/unittest/transport/UDPv4Tests.cpp +++ b/test/unittest/transport/UDPv4Tests.cpp @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include #include +#include #include #include #include @@ -30,10 +32,6 @@ using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; using UDPv4Transport = eprosima::fastdds::rtps::UDPv4Transport; -#ifndef __APPLE__ -const uint32_t ReceiveBufferCapacity = 65536; -#endif // ifndef __APPLE__ - #if defined(_WIN32) #define GET_PID _getpid #else @@ -75,6 +73,74 @@ class UDPv4Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(UDPv4Tests, wrong_configuration) +{ + // Too big sendBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(UDPv4Tests, locators_with_kind_1_supported) { // Given diff --git a/test/unittest/transport/UDPv6Tests.cpp b/test/unittest/transport/UDPv6Tests.cpp index 805518934c..6009c7b593 100644 --- a/test/unittest/transport/UDPv6Tests.cpp +++ b/test/unittest/transport/UDPv6Tests.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -30,10 +31,6 @@ using namespace eprosima::fastrtps::rtps; using namespace eprosima::fastrtps; using UDPv6Transport = eprosima::fastdds::rtps::UDPv6Transport; -#ifndef __APPLE__ -const uint32_t ReceiveBufferCapacity = 65536; -#endif // ifndef __APPLE__ - #if defined(_WIN32) #define GET_PID _getpid #else @@ -83,6 +80,74 @@ class UDPv6Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(UDPv6Tests, wrong_configuration) +{ + // Too big sendBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(UDPv6Tests, conversion_to_ip6_string) { Locator_t locator; From 17e304100362924de17583573b4b9f4860d879aa Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 May 2024 10:16:18 +0200 Subject: [PATCH 2/6] Refs #21036. Fix conflicts. Signed-off-by: Miguel Company --- .../rtps/transport/TCPTransportInterface.cpp | 28 ----------- .../rtps/transport/UDPTransportInterface.cpp | 49 ++----------------- src/cpp/rtps/transport/UDPv4Transport.cpp | 6 --- src/cpp/rtps/transport/UDPv6Transport.cpp | 5 -- .../common/DDSBlackboxTestsListeners.cpp | 21 -------- 5 files changed, 3 insertions(+), 106 deletions(-) diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 6e2bd10c2f..7a6a54c6e7 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -517,35 +517,7 @@ bool TCPTransportInterface::init( if (!asio_helpers::configure_buffer_sizes( *initial_peer_local_locator_socket_, *configuration(), send_size, recv_size)) { -<<<<<<< HEAD - socket_base::send_buffer_size option; - initial_peer_local_locator_socket_->get_option(option); - set_send_buffer_size(option.value()); - - if (configuration()->sendBufferSize < s_minimumSocketBuffer) - { - set_send_buffer_size(s_minimumSocketBuffer); - } - } - - if (configuration()->receiveBufferSize == 0) - { - socket_base::receive_buffer_size option; - initial_peer_local_locator_socket_->get_option(option); - set_receive_buffer_size(option.value()); - - if (configuration()->receiveBufferSize < s_minimumSocketBuffer) - { - set_receive_buffer_size(s_minimumSocketBuffer); - } - } - - if (configuration()->maxMessageSize > s_maximumMessageSize) - { - EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than 65000"); -======= EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "Couldn't set buffer sizes to minimum value: " << cfg_max_msg_size); ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) return false; } diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 6290505a2f..41d1f75e82 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -122,44 +122,6 @@ bool UDPTransportInterface::DoInputLocatorsMatch( bool UDPTransportInterface::init( const fastrtps::rtps::PropertyPolicy*) { -<<<<<<< HEAD - if (configuration()->sendBufferSize == 0 || configuration()->receiveBufferSize == 0) - { - // Check system buffer sizes. - ip::udp::socket socket(io_service_); - socket.open(generate_protocol()); - - if (configuration()->sendBufferSize == 0) - { - socket_base::send_buffer_size option; - socket.get_option(option); - set_send_buffer_size(static_cast(option.value())); - - if (configuration()->sendBufferSize < s_minimumSocketBuffer) - { - set_send_buffer_size(s_minimumSocketBuffer); - mSendBufferSize = s_minimumSocketBuffer; - } - } - - if (configuration()->receiveBufferSize == 0) - { - socket_base::receive_buffer_size option; - socket.get_option(option); - set_receive_buffer_size(static_cast(option.value())); - - if (configuration()->receiveBufferSize < s_minimumSocketBuffer) - { - set_receive_buffer_size(s_minimumSocketBuffer); - mReceiveBufferSize = s_minimumSocketBuffer; - } - } - } - - if (configuration()->maxMessageSize > s_maximumMessageSize) - { - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than 65000"); -======= uint32_t maximumMessageSize = max_msg_size_no_frag == 0 ? s_maximumMessageSize : max_msg_size_no_frag; uint32_t cfg_max_msg_size = configuration()->maxMessageSize; uint32_t cfg_send_size = configuration()->sendBufferSize; @@ -169,7 +131,6 @@ bool UDPTransportInterface::init( if (cfg_max_msg_size > maximumMessageSize) { EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than " << maximumMessageSize); ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) return false; } @@ -185,12 +146,6 @@ bool UDPTransportInterface::init( return false; } -<<<<<<< HEAD - // TODO(Ricardo) Create an event that update this list. - get_ips(currentInterfaces); - - return true; -======= if ((cfg_send_size > 0) && (cfg_max_msg_size > cfg_send_size)) { EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than sendBufferSize"); @@ -203,6 +158,9 @@ bool UDPTransportInterface::init( return false; } + // TODO(Ricardo) Create an event that update this list. + get_ips(currentInterfaces); + asio::error_code ec; ip::udp::socket socket(io_service_); socket.open(generate_protocol(), ec); @@ -236,7 +194,6 @@ bool UDPTransportInterface::init( } return ret; ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) } bool UDPTransportInterface::IsInputChannelOpen( diff --git a/src/cpp/rtps/transport/UDPv4Transport.cpp b/src/cpp/rtps/transport/UDPv4Transport.cpp index fc5be0411b..1f4f0416d9 100644 --- a/src/cpp/rtps/transport/UDPv4Transport.cpp +++ b/src/cpp/rtps/transport/UDPv4Transport.cpp @@ -26,14 +26,8 @@ #include #include -<<<<<<< HEAD -======= -#include #include -#include ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) - using namespace std; using namespace asio; diff --git a/src/cpp/rtps/transport/UDPv6Transport.cpp b/src/cpp/rtps/transport/UDPv6Transport.cpp index 60191afd7e..c825a180be 100644 --- a/src/cpp/rtps/transport/UDPv6Transport.cpp +++ b/src/cpp/rtps/transport/UDPv6Transport.cpp @@ -23,13 +23,8 @@ #include #include #include -<<<<<<< HEAD -======= -#include #include -#include ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) using namespace std; using namespace asio; diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index d443811747..1d7c40933b 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -674,25 +674,10 @@ TEST_P(DDSStatus, DataAvailableConditions) subscriber_reader.wait_waitset_timeout(); } -<<<<<<< HEAD -======= -// We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. -// Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any -// other possible loss. -static constexpr uint32_t SAMPLE_LOST_TEST_BUFFER_SIZE = - 300ul * 1024ul // sample size - * 13ul // number of samples - * 2ul; // 2x to avoid any possible loss - -template ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) void sample_lost_test_dw_init( PubSubWriter& writer) { auto testTransport = std::make_shared(); - testTransport->sendBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; - testTransport->receiveBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; - testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool { uint32_t old_pos = msg.pos; @@ -751,12 +736,6 @@ void sample_lost_test_init( PubSubWriter& writer, std::function functor) { -<<<<<<< HEAD -======= - reader.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); - writer.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); - ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) sample_lost_test_dw_init(writer); sample_lost_test_dr_init(reader, functor); From a4b492892f7977d68419b6ad63e3c00397f1707d Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 24 May 2024 08:48:52 +0200 Subject: [PATCH 3/6] Refs #21036. Update for non-backported changes Signed-off-by: Miguel Company --- src/cpp/rtps/transport/TCPTransportInterface.cpp | 2 +- src/cpp/rtps/transport/UDPTransportInterface.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 7a6a54c6e7..de753416c2 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -444,7 +444,7 @@ bool TCPTransportInterface::DoInputLocatorsMatch( bool TCPTransportInterface::init( const fastrtps::rtps::PropertyPolicy* properties) { - uint32_t maximumMessageSize = max_msg_size_no_frag == 0 ? s_maximumMessageSize : max_msg_size_no_frag; + uint32_t maximumMessageSize = s_maximumMessageSize; uint32_t cfg_max_msg_size = configuration()->maxMessageSize; uint32_t cfg_send_size = configuration()->sendBufferSize; uint32_t cfg_recv_size = configuration()->receiveBufferSize; diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 41d1f75e82..613851509f 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -122,7 +122,7 @@ bool UDPTransportInterface::DoInputLocatorsMatch( bool UDPTransportInterface::init( const fastrtps::rtps::PropertyPolicy*) { - uint32_t maximumMessageSize = max_msg_size_no_frag == 0 ? s_maximumMessageSize : max_msg_size_no_frag; + uint32_t maximumMessageSize = s_maximumMessageSize; uint32_t cfg_max_msg_size = configuration()->maxMessageSize; uint32_t cfg_send_size = configuration()->sendBufferSize; uint32_t cfg_recv_size = configuration()->receiveBufferSize; From 918ba5ea191dfbb764d840978c3c988d51767f88 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 22 Mar 2024 16:26:54 +0100 Subject: [PATCH 4/6] Fix on_sample_lost notification on best-effort readers for fragmented samples (#4187) * Refs #20162. Regression test. Signed-off-by: Miguel Company * Refs #20162. Notify sample lost when dropping fragmented change. Signed-off-by: Miguel Company * Refs #20167. Linters. Signed-off-by: Miguel Company * Refs #20162. Apply suggestions. Signed-off-by: Miguel Company * Refs #20162. Use constexpr for buffer size. Signed-off-by: Miguel Company * Refs #20162. Lower buffer size. Signed-off-by: Miguel Company * Refs #20351. Uncrustify. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company (cherry picked from commit 5ac198e80fb19ba8d780a0058a026b7e22ca1ef2) --- src/cpp/rtps/reader/StatelessReader.cpp | 20 +++ .../common/DDSBlackboxTestsListeners.cpp | 131 +++++++++++++++++- 2 files changed, 146 insertions(+), 5 deletions(-) diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index d39eb0176e..6d36964b39 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -708,6 +708,26 @@ bool StatelessReader::processDataFragMsg( { if (work_change->sequenceNumber < change_to_add->sequenceNumber) { + SequenceNumber_t updated_seq = work_change->sequenceNumber; + SequenceNumber_t previous_seq{ 0, 0 }; + previous_seq = update_last_notified(writer_guid, updated_seq); + + // Notify lost samples + auto listener = getListener(); + if (listener != nullptr) + { + if (SequenceNumber_t{ 0, 0 } != previous_seq) + { + assert(previous_seq < updated_seq); + uint64_t tmp = (updated_seq - previous_seq).to64long(); + int32_t lost_samples = + tmp > static_cast(std::numeric_limits::max()) ? + std::numeric_limits::max() : static_cast(tmp); + assert (0 < lost_samples); + listener->on_sample_lost(this, lost_samples); + } + } + // Pending change should be dropped. Check if it can be reused if (sampleSize <= work_change->serializedPayload.max_size) { diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 1d7c40933b..29cdd197be 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -674,8 +674,9 @@ TEST_P(DDSStatus, DataAvailableConditions) subscriber_reader.wait_waitset_timeout(); } +template void sample_lost_test_dw_init( - PubSubWriter& writer) + PubSubWriter& writer) { auto testTransport = std::make_shared(); testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool @@ -683,7 +684,8 @@ void sample_lost_test_dw_init( uint32_t old_pos = msg.pos; // see RTPS DDS 9.4.5.3 Data Submessage - EntityId_t readerID, writerID; + EntityId_t readerID; + EntityId_t writerID; SequenceNumber_t sn; msg.pos += 2; // flags @@ -711,6 +713,43 @@ void sample_lost_test_dw_init( return false; }; + testTransport->drop_data_frag_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool + { + uint32_t old_pos = msg.pos; + + // see RTPS DDS 9.4.5.4 DataFrag Submessage + EntityId_t readerID; + EntityId_t writerID; + SequenceNumber_t sn; + uint32_t first_fragment = 0; + + msg.pos += 2; // flags + msg.pos += 2; // octets to inline quos + CDRMessage::readEntityId(&msg, &readerID); + CDRMessage::readEntityId(&msg, &writerID); + CDRMessage::readSequenceNumber(&msg, &sn); + CDRMessage::readUInt32(&msg, &first_fragment); + + // restore buffer pos + msg.pos = old_pos; + + // generate losses + if ((writerID.value[3] & 0xC0) == 0 // only user endpoints + && (1 == first_fragment) // only first fragment + && (sn == SequenceNumber_t{0, 2} || + sn == SequenceNumber_t(0, 3) || + sn == SequenceNumber_t(0, 4) || + sn == SequenceNumber_t(0, 6) || + sn == SequenceNumber_t(0, 8) || + sn == SequenceNumber_t(0, 10) || + sn == SequenceNumber_t(0, 11) || + sn == SequenceNumber_t(0, 13))) + { + return true; + } + + return false; + }; writer.disable_builtin_transport() @@ -721,19 +760,29 @@ void sample_lost_test_dw_init( } +template void sample_lost_test_dr_init( - PubSubReader& reader, + PubSubReader& reader, std::function functor) { + // We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. + // Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any + // other possible loss. + constexpr uint32_t BUFFER_SIZE = + 300ul * 1024ul // sample size + * 13ul // number of samples + * 2ul; // 2x to avoid any possible loss + reader.socket_buffer_size(BUFFER_SIZE); reader.sample_lost_status_functor(functor) .init(); ASSERT_TRUE(reader.isInitialized()); } +template void sample_lost_test_init( - PubSubReader& reader, - PubSubWriter& writer, + PubSubReader& reader, + PubSubWriter& writer, std::function functor) { sample_lost_test_dw_init(writer); @@ -802,6 +851,78 @@ TEST(DDSStatus, sample_lost_be_dw_be_dr) }); } +/*! + * \test DDS-STS-SLS-01 Test `SampleLostStatus` in a Best-Effort DataWriter and a Best-Effort DataReader communication. + * This is also a regression test for bug redmine 20162 + */ +TEST(DDSStatus, sample_lost_be_dw_be_dr_fragments) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + std::mutex test_step_mtx; + std::condition_variable test_step_cv; + uint8_t test_step = 0; + + writer.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS); + reader.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS); + + sample_lost_test_init(reader, writer, [&test_step_mtx, &test_step_cv, &test_step]( + const eprosima::fastdds::dds::SampleLostStatus& status) + { + { + std::unique_lock lock(test_step_mtx); + std::cout << status.total_count << " " << status.total_count_change << std::endl; + if (0 == test_step && 1 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (1 == test_step && 2 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (2 == test_step && 3 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (3 == test_step && 4 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (4 == test_step && 5 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (5 == test_step && 6 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (6 == test_step && 7 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else + { + test_step = 0; + } + } + + test_step_cv.notify_all(); + }); + + + auto data = default_data300kb_data_generator(13); + + reader.startReception(data); + writer.send(data, 100); + + std::unique_lock lock(test_step_mtx); + test_step_cv.wait(lock, [&test_step]() + { + return 7 == test_step; + }); +} + /*! * \test DDS-STS-SLS-02 Test `SampleLostStatus` in a Best-Effort DataWriter and a late-joiner Best-Effort DataReader * communication. From 008ee700183fe31628436cd7d336ae27b18c6540 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 27 Mar 2024 08:28:26 +0100 Subject: [PATCH 5/6] Make sample_lost_be_dw_be_dr_fragments test less flaky (#4620) * Refs #20692. Make sample_lost_be_dw_be_dr_fragments test less flakey. Signed-off-by: Miguel Company * Refs #20692. Uncrustify. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company --- .../common/DDSBlackboxTestsListeners.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 29cdd197be..5926bb01ef 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -765,14 +765,6 @@ void sample_lost_test_dr_init( PubSubReader& reader, std::function functor) { - // We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. - // Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any - // other possible loss. - constexpr uint32_t BUFFER_SIZE = - 300ul * 1024ul // sample size - * 13ul // number of samples - * 2ul; // 2x to avoid any possible loss - reader.socket_buffer_size(BUFFER_SIZE); reader.sample_lost_status_functor(functor) .init(); @@ -785,6 +777,16 @@ void sample_lost_test_init( PubSubWriter& writer, std::function functor) { + // We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. + // Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any + // other possible loss. + constexpr uint32_t BUFFER_SIZE = + 300ul * 1024ul // sample size + * 13ul // number of samples + * 2ul; // 2x to avoid any possible loss + reader.socket_buffer_size(BUFFER_SIZE); + writer.socket_buffer_size(BUFFER_SIZE); + sample_lost_test_dw_init(writer); sample_lost_test_dr_init(reader, functor); From 050be91292af202a97fd2df148dbf9358ce5a77a Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 24 May 2024 10:37:15 +0200 Subject: [PATCH 6/6] Refs #20972. Improvements in on_sample_lost blackbox tests. Signed-off-by: Miguel Company --- .../common/DDSBlackboxTestsListeners.cpp | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 5926bb01ef..873537a8da 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -674,11 +674,22 @@ TEST_P(DDSStatus, DataAvailableConditions) subscriber_reader.wait_waitset_timeout(); } +// We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. +// Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any +// other possible loss. +static constexpr uint32_t SAMPLE_LOST_TEST_BUFFER_SIZE = + 300ul * 1024ul // sample size + * 13ul // number of samples + * 2ul; // 2x to avoid any possible loss + template void sample_lost_test_dw_init( PubSubWriter& writer) { auto testTransport = std::make_shared(); + testTransport->sendBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; + testTransport->receiveBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; + testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool { uint32_t old_pos = msg.pos; @@ -777,15 +788,8 @@ void sample_lost_test_init( PubSubWriter& writer, std::function functor) { - // We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. - // Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any - // other possible loss. - constexpr uint32_t BUFFER_SIZE = - 300ul * 1024ul // sample size - * 13ul // number of samples - * 2ul; // 2x to avoid any possible loss - reader.socket_buffer_size(BUFFER_SIZE); - writer.socket_buffer_size(BUFFER_SIZE); + reader.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); + writer.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); sample_lost_test_dw_init(writer); sample_lost_test_dr_init(reader, functor);