Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20351] Fix on_sample_lost notification on best-effort readers for framented samples (backport #4187) #4608

Merged
merged 6 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/fastdds/rtps/transport/TransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ namespace rtps {
constexpr uint32_t s_maximumMessageSize = 65500;
//! Default maximum initial peers range
constexpr uint32_t s_maximumInitialPeersRange = 4;
//! Default minimum socket buffer
// Default minimum socket buffer
FASTDDS_DEPRECATED_UNTIL(3, s_minimumSocketBuffer, "Minimum socket buffer is now taken from the maximum msg size")
constexpr uint32_t s_minimumSocketBuffer = 65536;
//! Default IPv4 address
static const std::string s_IPv4AddressAny = "0.0.0.0";
Expand Down
3 changes: 2 additions & 1 deletion include/fastrtps/transport/TransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ using TransportInterface = fastdds::rtps::TransportInterface;

static const uint32_t s_maximumMessageSize = fastdds::rtps::s_maximumMessageSize;
static const uint32_t s_maximumInitialPeersRange = fastdds::rtps::s_maximumInitialPeersRange;
static const uint32_t s_minimumSocketBuffer = fastdds::rtps::s_minimumSocketBuffer;
FASTDDS_DEPRECATED_UNTIL(3, s_minimumSocketBuffer, "Minimum socket buffer is now taken from the maximum msg size")
static const uint32_t s_minimumSocketBuffer = 65536;
static const std::string s_IPv4AddressAny = fastdds::rtps::s_IPv4AddressAny;
static const std::string s_IPv6AddressAny = fastdds::rtps::s_IPv6AddressAny;

Expand Down
20 changes: 20 additions & 0 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,26 @@ bool StatelessReader::processDataFragMsg(
{
if (work_change->sequenceNumber < change_to_add->sequenceNumber)
{
SequenceNumber_t updated_seq = work_change->sequenceNumber;
SequenceNumber_t previous_seq{ 0, 0 };
previous_seq = update_last_notified(writer_guid, updated_seq);

// Notify lost samples
auto listener = getListener();
if (listener != nullptr)
{
if (SequenceNumber_t{ 0, 0 } != previous_seq)
{
assert(previous_seq < updated_seq);
uint64_t tmp = (updated_seq - previous_seq).to64long();
int32_t lost_samples =
tmp > static_cast<uint64_t>(std::numeric_limits<int32_t>::max()) ?
std::numeric_limits<int32_t>::max() : static_cast<int32_t>(tmp);
assert (0 < lost_samples);
listener->on_sample_lost(this, lost_samples);
}
}

// Pending change should be dropped. Check if it can be reused
if (sampleSize <= work_change->serializedPayload.max_size)
{
Expand Down
49 changes: 48 additions & 1 deletion src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <thread>

#include <fastrtps/utils/IPLocator.h>
#include <rtps/transport/asio_helpers.hpp>
#include <rtps/transport/TCPTransportInterface.h>

namespace eprosima {
Expand Down Expand Up @@ -370,6 +371,52 @@ bool TCPChannelResource::check_socket_send_buffer(
return true;
}

void TCPChannelResource::set_socket_options(
asio::basic_socket<asio::ip::tcp>& socket,
const TCPTransportDescriptor* options)
{
uint32_t minimum_value = options->maxMessageSize;

// Set the send buffer size
{
uint32_t desired_value = options->sendBufferSize;
uint32_t configured_value = 0;
if (!asio_helpers::try_setting_buffer_size<asio::socket_base::send_buffer_size>(
socket, desired_value, minimum_value, configured_value))
{
EPROSIMA_LOG_ERROR(TCP_TRANSPORT,
"Couldn't set send buffer size to minimum value: " << minimum_value);
}
else if (desired_value != configured_value)
{
EPROSIMA_LOG_WARNING(TCP_TRANSPORT,
"Couldn't set send buffer size to desired value. "
<< "Using " << configured_value << " instead of " << desired_value);
}
}

// Set the receive buffer size
{
uint32_t desired_value = options->receiveBufferSize;
uint32_t configured_value = 0;
if (!asio_helpers::try_setting_buffer_size<asio::socket_base::receive_buffer_size>(
socket, desired_value, minimum_value, configured_value))
{
EPROSIMA_LOG_ERROR(TCP_TRANSPORT,
"Couldn't set receive buffer size to minimum value: " << minimum_value);
}
else if (desired_value != configured_value)
{
EPROSIMA_LOG_WARNING(TCP_TRANSPORT,
"Couldn't set receive buffer size to desired value. "
<< "Using " << configured_value << " instead of " << desired_value);
}
}

// Set the TCP_NODELAY option
socket.set_option(asio::ip::tcp::no_delay(options->enable_tcp_nodelay));
}

} // namespace rtps
} // namespace fastrtps
} // namespace fastdds
} // namespace eprosima
10 changes: 10 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ class TCPChannelResource : public ChannelResource
const size_t& msg_size,
const asio::ip::tcp::socket::native_handle_type& socket_native_handle);

/**
* @brief Set descriptor options on a socket.
*
* @param socket Socket on which to set the options.
* @param options Descriptor with the options to set.
*/
static void set_socket_options(
asio::basic_socket<asio::ip::tcp>& socket,
const TCPTransportDescriptor* options);

TCPConnectionType tcp_connection_type_;

friend class TCPTransportInterface;
Expand Down
4 changes: 1 addition & 3 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,7 @@ asio::ip::tcp::endpoint TCPChannelResourceBasic::local_endpoint(
void TCPChannelResourceBasic::set_options(
const TCPTransportDescriptor* options)
{
socket_->set_option(socket_base::receive_buffer_size(options->receiveBufferSize));
socket_->set_option(socket_base::send_buffer_size(options->sendBufferSize));
socket_->set_option(ip::tcp::no_delay(options->enable_tcp_nodelay));
TCPChannelResource::set_socket_options(*socket_, options);
}

void TCPChannelResourceBasic::cancel()
Expand Down
4 changes: 1 addition & 3 deletions src/cpp/rtps/transport/TCPChannelResourceSecure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,7 @@ asio::ip::tcp::endpoint TCPChannelResourceSecure::local_endpoint(
void TCPChannelResourceSecure::set_options(
const TCPTransportDescriptor* options)
{
secure_socket_->lowest_layer().set_option(socket_base::receive_buffer_size(options->receiveBufferSize));
secure_socket_->lowest_layer().set_option(socket_base::send_buffer_size(options->sendBufferSize));
secure_socket_->lowest_layer().set_option(ip::tcp::no_delay(options->enable_tcp_nodelay));
TCPChannelResource::set_socket_options(secure_socket_->lowest_layer(), options);
}

void TCPChannelResourceSecure::set_tls_verify_mode(
Expand Down
86 changes: 53 additions & 33 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
#include <cassert>
#include <chrono>
#include <cstring>
#include <limits>
#include <map>
#include <set>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <utility>
Expand Down Expand Up @@ -53,6 +54,7 @@
#include <fastrtps/utils/IPLocator.h>
#include <fastrtps/utils/System.h>

#include <rtps/transport/asio_helpers.hpp>
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>
#include <utils/SystemInfo.hpp>
#include <utils/thread.hpp>
Expand Down Expand Up @@ -442,6 +444,42 @@ bool TCPTransportInterface::DoInputLocatorsMatch(
bool TCPTransportInterface::init(
const fastrtps::rtps::PropertyPolicy* properties)
{
uint32_t maximumMessageSize = s_maximumMessageSize;
uint32_t cfg_max_msg_size = configuration()->maxMessageSize;
uint32_t cfg_send_size = configuration()->sendBufferSize;
uint32_t cfg_recv_size = configuration()->receiveBufferSize;
uint32_t max_int_value = static_cast<uint32_t>(std::numeric_limits<int32_t>::max());

if (cfg_max_msg_size > maximumMessageSize)
{
EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than " << maximumMessageSize);
return false;
}

if (cfg_send_size > max_int_value)
{
EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "sendBufferSize cannot be greater than " << max_int_value);
return false;
}

if (cfg_recv_size > max_int_value)
{
EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "receiveBufferSize cannot be greater than " << max_int_value);
return false;
}

if ((cfg_send_size > 0) && (cfg_max_msg_size > cfg_send_size))
{
EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than sendBufferSize");
return false;
}

if ((cfg_recv_size > 0) && (cfg_max_msg_size > cfg_recv_size))
{
EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than receiveBufferSize");
return false;
}

if (!apply_tls_config())
{
// TODO decide wether the Transport initialization should keep working after this error
Expand Down Expand Up @@ -474,48 +512,30 @@ bool TCPTransportInterface::init(
}

// Check system buffer sizes.
if (configuration()->sendBufferSize == 0)
{
socket_base::send_buffer_size option;
initial_peer_local_locator_socket_->get_option(option);
set_send_buffer_size(option.value());

if (configuration()->sendBufferSize < s_minimumSocketBuffer)
{
set_send_buffer_size(s_minimumSocketBuffer);
}
}

if (configuration()->receiveBufferSize == 0)
uint32_t send_size = 0;
uint32_t recv_size = 0;
if (!asio_helpers::configure_buffer_sizes(
*initial_peer_local_locator_socket_, *configuration(), send_size, recv_size))
{
socket_base::receive_buffer_size option;
initial_peer_local_locator_socket_->get_option(option);
set_receive_buffer_size(option.value());

if (configuration()->receiveBufferSize < s_minimumSocketBuffer)
{
set_receive_buffer_size(s_minimumSocketBuffer);
}
}

if (configuration()->maxMessageSize > s_maximumMessageSize)
{
EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than 65000");
EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "Couldn't set buffer sizes to minimum value: " << cfg_max_msg_size);
return false;
}

if (configuration()->maxMessageSize > configuration()->sendBufferSize)
if (cfg_send_size > 0 && send_size != cfg_send_size)
{
EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than send_buffer_size");
return false;
EPROSIMA_LOG_WARNING(TRANSPORT_TCP, "UDPTransport sendBufferSize could not be set to the desired value. "
<< "Using " << send_size << " instead of " << cfg_send_size);
}

if (configuration()->maxMessageSize > configuration()->receiveBufferSize)
if (cfg_recv_size > 0 && recv_size != cfg_recv_size)
{
EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than receive_buffer_size");
return false;
EPROSIMA_LOG_WARNING(TRANSPORT_TCP, "UDPTransport receiveBufferSize could not be set to the desired value. "
<< "Using " << recv_size << " instead of " << cfg_recv_size);
}

set_send_buffer_size(send_size);
set_receive_buffer_size(recv_size);

if (!rtcp_message_manager_)
{
rtcp_message_manager_ = std::make_shared<RTCPMessageManager>(this);
Expand Down
Loading
Loading