Skip to content

Commit

Permalink
SHM sending improvements (#3642)
Browse files Browse the repository at this point in the history
* Refs #18966. Refactor of try_push.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #18966. Retry try_push after regenerating output port.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #18966. Clean all output ports on every send.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #18966. Clean output ports when they have no listeners.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #18966: Applied suggestions

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

* Refs #18966: Uncrustify

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

* Refs #18966: Prevent port cleanup on Windows platforms

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

---------

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>
Co-authored-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
jsan-rt and MiguelCompany authored Jul 13, 2023
1 parent c92d2f9 commit 2f80b06
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 17 deletions.
8 changes: 8 additions & 0 deletions src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,14 @@ class SharedMemGlobal
return node_->is_port_ok;
}

/**
* Checks if a port is OK and is opened for reading with listeners active
*/
inline bool port_has_listeners() const
{
return node_->is_port_ok && node_->is_opened_for_reading && node_->num_listeners > 0;
}

inline uint32_t port_id() const
{
return node_->port_id;
Expand Down
16 changes: 15 additions & 1 deletion src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -841,13 +841,26 @@ class SharedMemManager :
return *this;
}

/**
* Checks if a port is OK and opened for reading with listeners active
*/
bool has_listeners() const
{
return global_port_->port_has_listeners();
}

/**
* Try to enqueue a buffer in the port.
* @param[in, out] buffer reference to the SHM buffer to push to
* @param[out] is_port_ok true if the port is ok
* @returns false If the port's queue is full so buffer couldn't be enqueued.
*/
bool try_push(
const std::shared_ptr<Buffer>& buffer)
const std::shared_ptr<Buffer>& buffer,
bool& is_port_ok)
{
is_port_ok = true;

assert(std::dynamic_pointer_cast<SharedMemBuffer>(buffer));

SharedMemBuffer* shared_mem_buffer = std::static_pointer_cast<SharedMemBuffer>(buffer).get();
Expand Down Expand Up @@ -881,6 +894,7 @@ class SharedMemManager :
<< e.what());

regenerate_port();
is_port_ok = false;
ret = false;
}
else
Expand Down
37 changes: 35 additions & 2 deletions src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,10 @@ bool SharedMemTransport::send(
{
using namespace eprosima::fastdds::statistics::rtps;

#if !defined(_WIN32)
cleanup_output_ports();
#endif // if !defined(_WIN32)

fastrtps::rtps::LocatorsIterator& it = *destination_locators_begin;

bool ret = true;
Expand Down Expand Up @@ -478,6 +482,22 @@ bool SharedMemTransport::send(

}

void SharedMemTransport::cleanup_output_ports()
{
auto it = opened_ports_.begin();
while (it != opened_ports_.end())
{
if (it->second->has_listeners())
{
++it;
}
else
{
it = opened_ports_.erase(it);
}
}
}

std::shared_ptr<SharedMemManager::Port> SharedMemTransport::find_port(
uint32_t port_id)
{
Expand Down Expand Up @@ -505,9 +525,22 @@ bool SharedMemTransport::push_discard(
{
try
{
if (!find_port(remote_locator.port)->try_push(buffer))
bool is_port_ok = false;
const size_t num_retries = 2;
for (size_t i = 0; i < num_retries && !is_port_ok; ++i)
{
EPROSIMA_LOG_INFO(RTPS_MSG_OUT, "Port " << remote_locator.port << " full. Buffer dropped");
if (!find_port(remote_locator.port)->try_push(buffer, is_port_ok))
{
if (is_port_ok)
{
EPROSIMA_LOG_INFO(RTPS_MSG_OUT, "Port " << remote_locator.port << " full. Buffer dropped");
}
else
{
EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "Port " << remote_locator.port << " inconsistent. Port dropped");
opened_ports_.erase(remote_locator.port);
}
}
}
}
catch (const std::exception& error)
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/transport/shared_mem/SharedMemTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class SharedMemTransport : public TransportInterface
const std::shared_ptr<SharedMemManager::Buffer>& buffer,
const Locator& remote_locator);

void cleanup_output_ports();

std::shared_ptr<SharedMemManager::Port> find_port(
uint32_t port_id);

Expand Down
77 changes: 63 additions & 14 deletions test/unittest/transport/SharedMemTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,11 @@ TEST_F(SHMTransportTests, port_listener_dead_recover)
ASSERT_TRUE(buf != nullptr);
memset(buf->data(), 0, buf->size());
*static_cast<uint8_t*>(buf->data()) = 1u;
ASSERT_TRUE(port_sender->try_push(buf));
{
bool is_port_ok = false;
ASSERT_TRUE(port_sender->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}

// Wait until message received
while (thread_listener2_state.load() < 1u)
Expand All @@ -1324,10 +1328,18 @@ TEST_F(SHMTransportTests, port_listener_dead_recover)

*static_cast<uint8_t*>(buf->data()) = 2u;
// This push must fail because port is not OK
ASSERT_FALSE(port_sender->try_push(buf));
{
bool is_port_ok = false;
ASSERT_FALSE(port_sender->try_push(buf, is_port_ok));
ASSERT_FALSE(is_port_ok);
}

// This push must success because port was regenerated in the last try_push call.
ASSERT_TRUE(port_sender->try_push(buf));
{
bool is_port_ok = false;
ASSERT_TRUE(port_sender->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}

// Wait until port is regenerated
while (thread_listener2_state.load() < 3u)
Expand Down Expand Up @@ -1458,8 +1470,16 @@ TEST_F(SHMTransportTests, port_not_ok_listener_recover)
auto buffer = data_segment->alloc_buffer(1, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
*static_cast<uint8_t*>(buffer->data()) = 6;
// Fail because port regeneration
ASSERT_FALSE(managed_port->try_push(buffer));
ASSERT_TRUE(managed_port->try_push(buffer));
{
bool is_port_ok = false;
ASSERT_FALSE(managed_port->try_push(buffer, is_port_ok));
ASSERT_FALSE(is_port_ok);
}
{
bool is_port_ok = false;
ASSERT_TRUE(managed_port->try_push(buffer, is_port_ok));
ASSERT_TRUE(is_port_ok);
}

thread_listener.join();
}
Expand Down Expand Up @@ -1529,14 +1549,25 @@ TEST_F(SHMTransportTests, buffer_recover)

// Test 1 (without port overflow)
uint32_t send_counter = 0u;

bool is_port_ok = false;

while (listener1_recv_count.load() < 16u)
{
{
// The segment should never overflow
auto buf = segment->alloc_buffer(1, std::chrono::steady_clock::time_point());

ASSERT_EQ(true, pub_sub1_write->try_push(buf));
ASSERT_EQ(true, pub_sub2_write->try_push(buf));
{
is_port_ok = false;
ASSERT_TRUE(pub_sub1_write->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}
{
is_port_ok = false;
ASSERT_TRUE(pub_sub2_write->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}
}

{
Expand Down Expand Up @@ -1571,14 +1602,22 @@ TEST_F(SHMTransportTests, buffer_recover)
// The segment should never overflow
auto buf = segment->alloc_buffer(1u, std::chrono::steady_clock::time_point());

if (!pub_sub1_write->try_push(buf))
{
port_overflows1++;
is_port_ok = false;
if (!pub_sub1_write->try_push(buf, is_port_ok))
{
EXPECT_TRUE(is_port_ok);
port_overflows1++;
}
}

if (!pub_sub2_write->try_push(buf))
{
port_overflows2++;
is_port_ok = false;
if (!pub_sub2_write->try_push(buf, is_port_ok))
{
EXPECT_TRUE(is_port_ok);
port_overflows2++;
}
}
}

Expand All @@ -1602,8 +1641,16 @@ TEST_F(SHMTransportTests, buffer_recover)

{
auto buf = segment->alloc_buffer(1u, std::chrono::steady_clock::time_point());
ASSERT_EQ(true, pub_sub1_write->try_push(buf));
ASSERT_EQ(true, pub_sub2_write->try_push(buf));
{
is_port_ok = false;
ASSERT_TRUE(pub_sub1_write->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}
{
is_port_ok = false;
ASSERT_TRUE(pub_sub2_write->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}
}

thread_listener1.join();
Expand Down Expand Up @@ -1645,7 +1692,9 @@ TEST_F(SHMTransportTests, remote_segments_free)
{
if (j != i)
{
ASSERT_TRUE(ports[j]->try_push(buf));
bool is_port_ok = false;
ASSERT_TRUE(ports[j]->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
ASSERT_TRUE(listeners[j]->pop() != nullptr);
}
}
Expand Down

0 comments on commit 2f80b06

Please sign in to comment.