Skip to content

Commit

Permalink
Fix topic interference on liveliness_changed status (#4988) (#5032)
Browse files Browse the repository at this point in the history
* Fix topic interference on `liveliness_changed` status (#4988)

* Refs #21189. Basic infrastructure for ROS2 blackbox tests.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21189. Added callback for liveliness_changed event.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21189. Added ROS2 regression test.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21189. Added blackbox regression test.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21189. Fix StatefulReader.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21189. Fix StatelessReader.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21189. Uncrustify and doxygen.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21245. Change liveliness announcement periods.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21245. Create ROS 2 builtin endpoints.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21245. Fix type and QoS on ros_discovery_info.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21245. Avoid collision with true ROS 2 topics.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21245. Uncrustify.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21245. Fix warnings on MacOS.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21245. Fix tsan reported deadlock.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21189. Fix long-standing deadlock in WLP.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21189. Fix build after rebase.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 9243ead)

# Conflicts:
#	src/cpp/rtps/reader/StatefulReader.cpp
#	src/cpp/rtps/reader/StatelessReader.cpp
#	test/blackbox/common/BlackboxTestsLivelinessQos.cpp

* Fix conflicts

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Fix build after backport.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 4a6b934)

# Conflicts:
#	src/cpp/rtps/reader/StatelessReader.cpp
  • Loading branch information
mergify[bot] committed Jul 9, 2024
1 parent 0e4e234 commit 43524ba
Show file tree
Hide file tree
Showing 13 changed files with 1,356 additions and 60 deletions.
3 changes: 2 additions & 1 deletion src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -860,10 +860,11 @@ bool WLP::remove_local_reader(

bool WLP::automatic_liveliness_assertion()
{
std::lock_guard<std::recursive_mutex> guard(*mp_builtinProtocols->mp_PDP->getMutex());
std::unique_lock<std::recursive_mutex> lock(*mp_builtinProtocols->mp_PDP->getMutex());

if (0 < automatic_writers_.size())
{
lock.unlock();
return send_liveliness_message(automatic_instance_handle_);
}

Expand Down
64 changes: 32 additions & 32 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,40 +354,11 @@ bool StatefulReader::matched_writer_remove(
const GUID_t& writer_guid,
bool removed_by_lease)
{

if (is_alive_ && liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_,
writer_liveliness_status);

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}

}
else
{
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
}
}

std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
WriterProxy* wproxy = nullptr;
if (is_alive_)
{
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);

//Remove cachechanges belonging to the unmatched writer
mp_history->writer_unmatched(writer_guid, get_last_notified(writer_guid));

Expand Down Expand Up @@ -439,7 +410,36 @@ bool StatefulReader::matched_writer_remove(
}
}

return (wproxy != nullptr);
bool ret_val = (wproxy != nullptr);
if (ret_val && liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_,
writer_liveliness_status);

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}
}
else
{
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
}
}

return ret_val;
}

bool StatefulReader::matched_writer_is_matched(
Expand Down
73 changes: 46 additions & 27 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,33 +204,8 @@ bool StatelessReader::matched_writer_remove(
const GUID_t& writer_guid,
bool removed_by_lease)
{
if (liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_,
writer_liveliness_status);
bool ret_val = false;

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}
}
else
{
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
}
}
{
std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);

Expand Down Expand Up @@ -260,11 +235,55 @@ bool StatelessReader::matched_writer_remove(
guard.unlock();
listener->on_writer_discovery(this, WriterDiscoveryInfo::REMOVED_WRITER, writer_guid, nullptr);
}
<<<<<<< HEAD
return true;
=======

#ifdef FASTDDS_STATISTICS
// notify monitor service so that the connectionlist for this entity
// could be updated
if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
{
mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
}
#endif //FASTDDS_STATISTICS

ret_val = true;
break;
>>>>>>> 4a6b93479 (Fix topic interference on `liveliness_changed` status (#4988) (#5032))
}
}
}
return false;

if (ret_val && liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_,
writer_liveliness_status);

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}
}
else
{
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
}
}

return ret_val;
}

bool StatelessReader::matched_writer_is_matched(
Expand Down
170 changes: 170 additions & 0 deletions test/blackbox/common/BlackboxTestsLivelinessQos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "BlackboxTests.hpp"

#include <string>
#include <thread>

#include "PubSubReader.hpp"
Expand Down Expand Up @@ -2039,6 +2040,175 @@ TEST(LivelinessTests, correct_liveliness_state_one_writer_multiple_readers)
ASSERT_EQ(reader.sub_wait_liveliness_lost_for(2, std::chrono::seconds(4)), 2u);
}

/**
* This is a regression test for redmine issue #21189.
*
* The test ensures that liveliness changed status is not affected by writers on a topic different from
* the one of the reader.
*
* The test creates two readers and two writers, each reader and writer pair on a different topic.
* Writing a sample on one writer should not affect the liveliness changed status of the other reader.
* Destroying the writer should not affect the liveliness changed status of the other reader.
*/
static void test_liveliness_qos_independent_topics(
const std::string& topic_name,
eprosima::fastdds::dds::ReliabilityQosPolicyKind reliability_kind)
{
const auto lease_dutation_time = std::chrono::seconds(1);
const eprosima::fastrtps::Duration_t lease_duration(1, 0);
const eprosima::fastrtps::Duration_t announcement_period(0, 250000000);

PubSubReader<HelloWorldPubSubType> reader1(topic_name + "1");
PubSubReader<HelloWorldPubSubType> reader2(topic_name + "2");

PubSubWriter<HelloWorldPubSubType> writer1(topic_name + "1");
PubSubWriter<HelloWorldPubSubType> writer2(topic_name + "2");

// Configure and start the readers
reader1.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_lease_duration(lease_duration)
.reliability(reliability_kind)
.init();
reader2.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_lease_duration(lease_duration)
.reliability(reliability_kind)
.init();

// Configure and start the writers
writer1.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_lease_duration(lease_duration)
.liveliness_announcement_period(announcement_period)
.reliability(reliability_kind)
.init();
writer2.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_lease_duration(lease_duration)
.liveliness_announcement_period(announcement_period)
.reliability(reliability_kind)
.init();

// Wait for discovery
reader1.wait_discovery();
reader2.wait_discovery();
writer1.wait_discovery();
writer2.wait_discovery();

HelloWorldPubSubType::type data;

// Write a sample on writer1 and wait for reader1 to assert writer1's liveliness
writer1.send_sample(data);
reader1.wait_liveliness_recovered();

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 0);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

// Write a sample on writer2 and wait for reader2 to assert writer2's liveliness
writer2.send_sample(data);
reader2.wait_liveliness_recovered();

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

// Destroy writer2 and wait twice the lease duration time
writer2.destroy();
std::this_thread::sleep_for(lease_dutation_time * 2);

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 0);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

// Start writer2 again and wait for reader2 to assert writer2's liveliness
writer2.init();
reader2.wait_discovery();
writer2.send_sample(data);
reader2.wait_liveliness_recovered(2);

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

// Destroy writer1 and wait twice the lease duration time
writer1.destroy();
std::this_thread::sleep_for(lease_dutation_time * 2);

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 0);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

// Destroy writer2 and wait twice the lease duration time
writer2.destroy();
std::this_thread::sleep_for(lease_dutation_time * 2);

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 0);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 0);
EXPECT_EQ(liveliness.not_alive_count, 0);
}
}

TEST_P(LivelinessQos, IndependentTopics_reliable)
{
test_liveliness_qos_independent_topics(TEST_TOPIC_NAME, eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS);
}

TEST_P(LivelinessQos, IndependentTopics_besteffort)
{
test_liveliness_qos_independent_topics(TEST_TOPIC_NAME, eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
Loading

0 comments on commit 43524ba

Please sign in to comment.