Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20119] TCP socket send buffer limit (backport #4237) #4327

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ static void setup_transports_large_data(

auto tcp_transport = create_tcpv4_transport(att);
att.userTransports.push_back(tcp_transport);
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");

Locator_t tcp_loc;
tcp_loc.kind = LOCATOR_KIND_TCPv4;
Expand Down Expand Up @@ -234,6 +235,7 @@ static void setup_transports_large_datav6(

auto tcp_transport = create_tcpv6_transport(att);
att.userTransports.push_back(tcp_transport);
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");

Locator_t tcp_loc;
tcp_loc.kind = LOCATOR_KIND_TCPv6;
Expand Down
24 changes: 24 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,30 @@ bool TCPChannelResource::remove_logical_port(
return true;
}

bool TCPChannelResource::check_socket_send_buffer(
const size_t& msg_size,
const asio::ip::tcp::socket::native_handle_type& socket_native_handle)
{
int bytesInSendQueue = 0;

#ifndef _WIN32
if (ioctl(socket_native_handle, TIOCOUTQ, &bytesInSendQueue) == -1)
{
bytesInSendQueue = 0;
}
#else // ifdef _WIN32
static_cast<void>(socket_native_handle);
#endif // ifndef _WIN32


size_t future_queue_size = size_t(bytesInSendQueue) + msg_size;
if (future_queue_size > size_t(parent_->configuration()->sendBufferSize))
{
return false;
}
return true;
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
4 changes: 4 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ class TCPChannelResource : public ChannelResource
const std::vector<uint16_t>& availablePorts,
RTCPMessageManager* rtcp_manager);

bool check_socket_send_buffer(
const size_t& msg_size,
const asio::ip::tcp::socket::native_handle_type& socket_native_handle);

TCPConnectionType tcp_connection_type_;

friend class TCPTransportInterface;
Expand Down
7 changes: 7 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ size_t TCPChannelResourceBasic::send(
if (eConnecting < connection_status_)
{
std::lock_guard<std::mutex> send_guard(send_mutex_);

if (parent_->get_non_blocking_send() &&
!check_socket_send_buffer(header_size + size, socket_->native_handle()))
{
return 0;
}

if (header_size > 0)
{
std::array<asio::const_buffer, 2> buffers;
Expand Down
7 changes: 7 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResourceSecure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ size_t TCPChannelResourceSecure::send(

if (eConnecting < connection_status_)
{
if (parent_->get_non_blocking_send() &&
!check_socket_send_buffer(header_size + size,
secure_socket_->lowest_layer().native_handle()))
{
return 0;
}

std::vector<asio::const_buffer> buffers;
if (header_size > 0)
{
Expand Down
11 changes: 10 additions & 1 deletion src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ TCPTransportInterface::TCPTransportInterface(
int32_t transport_kind)
: TransportInterface(transport_kind)
, alive_(true)
, non_blocking_send_(false)
#if TLS_FOUND
, ssl_context_(asio::ssl::context::sslv23)
#endif // if TLS_FOUND
Expand Down Expand Up @@ -363,7 +364,7 @@ bool TCPTransportInterface::DoInputLocatorsMatch(
}

bool TCPTransportInterface::init(
const fastrtps::rtps::PropertyPolicy*)
const fastrtps::rtps::PropertyPolicy* properties)
{
if (!apply_tls_config())
{
Expand All @@ -388,6 +389,14 @@ bool TCPTransportInterface::init(
ip::tcp::endpoint local_endpoint = initial_peer_local_locator_socket_->local_endpoint();
initial_peer_local_locator_port_ = local_endpoint.port();

// Get non_blocking_send property
if (properties)
{
auto s_non_blocking_send = eprosima::fastrtps::rtps::PropertyPolicyHelper::find_property(*properties,
"fastdds.tcp_transport.non_blocking_send");
non_blocking_send_ = s_non_blocking_send && *s_non_blocking_send == "true"? true : false;
}

// Check system buffer sizes.
if (configuration()->sendBufferSize == 0)
{
Expand Down
19 changes: 19 additions & 0 deletions src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ class TCPTransportInterface : public TransportInterface
asio::io_service io_service_timers_;
std::unique_ptr<asio::ip::tcp::socket> initial_peer_local_locator_socket_;
uint16_t initial_peer_local_locator_port_;
/**
* Whether to use non-blocking calls to send().
*
* When set to true, calls to send() will return immediately if the send buffer is full.
* This may happen when receive buffer on reader's side is full. No error will be returned
* to the upper layer. This means that the application will behave
* as if the datagram is sent but lost (i.e. throughput may be reduced). This value is
* specially useful on high-frequency writers.
*
* When set to false, calls to send() will block until the send buffer has space for the
* datagram. This may cause application lock.
*/
bool non_blocking_send_;

#if TLS_FOUND
asio::ssl::context ssl_context_;
Expand Down Expand Up @@ -447,6 +460,12 @@ class TCPTransportInterface : public TransportInterface
*/
void fill_local_physical_port(
Locator& locator) const;

bool get_non_blocking_send() const
{
return non_blocking_send_;
}

};

} // namespace rtps
Expand Down
205 changes: 205 additions & 0 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "mock/MockTCPChannelResource.h"
#include "mock/MockTCPv4Transport.h"
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastrtps/transport/TCPv4TransportDescriptor.h>
#include <fastrtps/utils/Semaphore.h>
#include <fastrtps/utils/IPFinder.h>
Expand Down Expand Up @@ -1265,6 +1266,126 @@ TEST_F(TCPv4Tests, send_and_receive_between_both_secure_ports_with_sni)
}
}

#ifndef _WIN32
// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a
// destination that does not read or does it so slowly.
TEST_F(TCPv4Tests, secure_non_blocking_send)
{
uint16_t port = g_default_port;
uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer;
// Create a TCP Server transport
using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode;
using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole;
TCPv4TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.sendBufferSize = msg_size;
senderDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT;
senderDescriptor.tls_config.verify_file = "ca.crt";
senderDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER;
senderDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
senderDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE);
senderDescriptor.tls_config.add_option(TLSOptions::NO_SSLV2);
senderDescriptor.tls_config.add_option(TLSOptions::NO_COMPRESSION);
MockTCPv4Transport senderTransportUnderTest(senderDescriptor);
eprosima::fastrtps::rtps::RTPSParticipantAttributes att;
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");
senderTransportUnderTest.init(&att.properties);

// Create a TCP Client socket.
// The creation of a reception transport for testing this functionality is not
// feasible. For the saturation of the sending socket, it's necessary first to
// saturate the reception socket of the datareader. This saturation requires
// preventing the datareader from reading from the socket, what inevitably
// happens continuously if instantiating and connecting the receiver transport.
// Hence, a raw socket is opened and connected to the server. There won't be read
// calls on that socket.
Locator_t serverLoc;
serverLoc.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(serverLoc, 127, 0, 0, 1);
serverLoc.port = port;
IPLocator::setLogicalPort(serverLoc, 7410);

// Socket TLS config
asio::ssl::context ssl_context(asio::ssl::context::sslv23);
ssl_context.set_verify_callback([](bool preverified, asio::ssl::verify_context&)
{
return preverified;
});
ssl_context.set_password_callback([](std::size_t, asio::ssl::context_base::password_purpose)
{
return "fastddspwd";
});
ssl_context.use_certificate_chain_file("fastdds.crt");
ssl_context.use_private_key_file("fastdds.key", asio::ssl::context::pem);
ssl_context.use_tmp_dh_file("dh_params.pem");

uint32_t options = 0;
options |= asio::ssl::context::default_workarounds;
options |= asio::ssl::context::single_dh_use;
options |= asio::ssl::context::no_sslv2;
options |= asio::ssl::context::no_compression;
ssl_context.set_options(options);

// TCPChannelResourceSecure::connect() like connection
asio::io_service io_service;
asio::ip::tcp::resolver resolver(io_service);
auto endpoints = resolver.resolve(
IPLocator::ip_to_string(serverLoc),
std::to_string(IPLocator::getPhysicalPort(serverLoc)));

auto secure_socket = std::make_shared<asio::ssl::stream<asio::ip::tcp::socket>>(io_service, ssl_context);
asio::ssl::verify_mode vm = 0x00;
vm |= asio::ssl::verify_peer;
secure_socket->set_verify_mode(vm);

asio::async_connect(secure_socket->lowest_layer(), endpoints,
[secure_socket](const std::error_code& ec
#if ASIO_VERSION >= 101200
, asio::ip::tcp::endpoint
#else
, const tcp::resolver::iterator& /*endpoint*/
#endif // if ASIO_VERSION >= 101200
)
{
ASSERT_TRUE(!ec);
asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::server;
secure_socket->async_handshake(role,
[](const std::error_code& ec)
{
ASSERT_TRUE(!ec);
});
});

std::this_thread::sleep_for(std::chrono::milliseconds(300));

/*
Get server's accepted channel. This is retrieved from the unbound_channel_resources_,
which is a vector where client channels are pushed immediately after the server accepts
a connection. This channel will not be present in the server's channel_resources_ map
as communication lacks most of the discovery messages using a raw socket as participant.
*/
auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources();
ASSERT_TRUE(sender_unbound_channel_resources.size() == 1);
auto sender_channel_resource =
std::static_pointer_cast<TCPChannelResourceBasic>(sender_unbound_channel_resources[0]);

// Prepare the message
asio::error_code ec;
std::vector<octet> message(msg_size, 0);
const octet* data = message.data();
size_t size = message.size();

// Send the message with no header
for (int i = 0; i < 5; i++)
{
sender_channel_resource->send(nullptr, 0, data, size, ec);
}

secure_socket->lowest_layer().close(ec);
}
#endif // ifndef _WIN32

#endif //TLS_FOUND

TEST_F(TCPv4Tests, send_and_receive_between_allowed_localhost_interfaces_ports)
Expand Down Expand Up @@ -1698,6 +1819,90 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2);
}

#ifndef _WIN32
// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a
// destination that does not read or does it so slowly.
TEST_F(TCPv4Tests, non_blocking_send)
{
uint16_t port = g_default_port;
uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer;
// Create a TCP Server transport
TCPv4TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.sendBufferSize = msg_size;
MockTCPv4Transport senderTransportUnderTest(senderDescriptor);
eprosima::fastrtps::rtps::RTPSParticipantAttributes att;
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");
senderTransportUnderTest.init(&att.properties);

// Create a TCP Client socket.
// The creation of a reception transport for testing this functionality is not
// feasible. For the saturation of the sending socket, it's necessary first to
// saturate the reception socket of the datareader. This saturation requires
// preventing the datareader from reading from the socket, what inevitably
// happens continuously if instantiating and connecting the receiver transport.
// Hence, a raw socket is opened and connected to the server. There won't be read
// calls on that socket.
Locator_t serverLoc;
serverLoc.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(serverLoc, 127, 0, 0, 1);
serverLoc.port = port;
IPLocator::setLogicalPort(serverLoc, 7410);

// TCPChannelResourceBasic::connect() like connection
asio::io_service io_service;
asio::ip::tcp::resolver resolver(io_service);
auto endpoints = resolver.resolve(
IPLocator::ip_to_string(serverLoc),
std::to_string(IPLocator::getPhysicalPort(serverLoc)));

asio::ip::tcp::socket socket = asio::ip::tcp::socket (io_service);
asio::async_connect(
socket,
endpoints,
[](std::error_code ec
#if ASIO_VERSION >= 101200
, asio::ip::tcp::endpoint
#else
, asio::ip::tcp::resolver::iterator
#endif // if ASIO_VERSION >= 101200
)
{
ASSERT_TRUE(!ec);
}
);

std::this_thread::sleep_for(std::chrono::milliseconds(100));

/*
Get server's accepted channel. This is retrieved from the unbound_channel_resources_,
which is a vector where client channels are pushed immediately after the server accepts
a connection. This channel will not be present in the server's channel_resources_ map
as communication lacks most of the discovery messages using a raw socket as participant.
*/
auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources();
ASSERT_TRUE(sender_unbound_channel_resources.size() == 1);
auto sender_channel_resource =
std::static_pointer_cast<TCPChannelResourceBasic>(sender_unbound_channel_resources[0]);

// Prepare the message
asio::error_code ec;
std::vector<octet> message(msg_size, 0);
const octet* data = message.data();
size_t size = message.size();

// Send the message with no header
for (int i = 0; i < 5; i++)
{
sender_channel_resource->send(nullptr, 0, data, size, ec);
}

socket.shutdown(asio::ip::tcp::socket::shutdown_both);
socket.cancel();
socket.close();
}
#endif // ifndef _WIN32

void TCPv4Tests::HELPER_SetDescriptorDefaults()
{
descriptor.add_listener_port(g_default_port);
Expand Down
Loading
Loading