From 43f76560154db9099dc986c9767e20e8ba236ae0 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 30 Aug 2024 10:05:16 +0200 Subject: [PATCH 1/5] Refs #20866. Regression test. Signed-off-by: Miguel Company --- .../common/DDSBlackboxTestsOwnershipQos.cpp | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp b/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp index 375c6bad2b..aafbf196b6 100644 --- a/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp +++ b/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp @@ -2165,6 +2165,83 @@ TEST_P(OwnershipQos, exclusive_kind_keyed_besteffort_disposing_instance) exclusive_kind_keyed_disposing_instance(false); } +/*! + * This is a regression test for redmine issue 20866. + * + * This test checks that a reader with a KEEP_ALL history and an exclusive ownership policy only returns the data + * from the writer with the highest strength. + */ +TEST(OwnershipQos, exclusive_kind_keep_all_reliable) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter low_strength_writer(TEST_TOPIC_NAME); + PubSubWriter high_strength_writer(TEST_TOPIC_NAME); + + // Prepare data. + std::list generated_data = default_keyedhelloworld_data_generator(20); + auto middle = std::next(generated_data.begin(), 10); + std::list low_strength_data(generated_data.begin(), middle); + std::list high_strength_data(middle, generated_data.end()); + auto expected_data = high_strength_data; + + // Initialize writers. + low_strength_writer.ownership_strength(3) + .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .init(); + ASSERT_TRUE(low_strength_writer.isInitialized()); + + // High strength writer will use a custom transport to ensure its data is received after the low strength data. + auto test_transport = std::make_shared(); + std::atomic drop_messages(false); + test_transport->messages_filter_ = [&drop_messages](eprosima::fastdds::rtps::CDRMessage_t&) + { + return drop_messages.load(); + }; + high_strength_writer.ownership_strength(4) + .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .disable_builtin_transport() + .add_user_transport_to_pparams(test_transport) + .init(); + ASSERT_TRUE(high_strength_writer.isInitialized()); + + // Initialize reader. + reader.ownership_exclusive() + .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .init(); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery. + low_strength_writer.wait_discovery(); + high_strength_writer.wait_discovery(); + reader.wait_discovery(std::chrono::seconds::zero(), 2); + + // Send high strength data first, so it has the lowest source timestamps, but drop the messages, so they arrive + // later to the reader. + drop_messages.store(true); + high_strength_writer.send(high_strength_data); + EXPECT_TRUE(high_strength_data.empty()); + + // Send low strength data, so it has the highest source timestamps. + low_strength_writer.send(low_strength_data); + EXPECT_TRUE(low_strength_data.empty()); + // Wait for the reader to receive the data. + EXPECT_TRUE(low_strength_writer.waitForAllAcked(std::chrono::seconds(1))); + + // Let high strength writer send the data. + drop_messages.store(false); + + // Wait for the reader to receive the high strength data. + EXPECT_TRUE(high_strength_writer.waitForAllAcked(std::chrono::seconds(1))); + + // Make the reader process the data, expecting only the high strength data. + // The issue was reproduced by the reader complaining about reception of unexpected data. + reader.startReception(expected_data); + reader.block_for_all(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else From 7cd8ed4a040f6349b8b5d7cea1957641c5d6e4e7 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 3 Sep 2024 09:06:10 +0200 Subject: [PATCH 2/5] Refs #20866. Additional regression test. Signed-off-by: Miguel Company --- .../common/DDSBlackboxTestsOwnershipQos.cpp | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp b/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp index aafbf196b6..32291e2d52 100644 --- a/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp +++ b/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp @@ -2242,6 +2242,88 @@ TEST(OwnershipQos, exclusive_kind_keep_all_reliable) reader.block_for_all(); } +/*! + * This is a regression test for redmine issue 20866. + * + * This test checks that a reader with a KEEP_ALL history and an exclusive ownership policy only does not return + * data from the writer with the lowest strength after returning data from the highest one. + */ +TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter low_strength_writer(TEST_TOPIC_NAME); + PubSubWriter high_strength_writer(TEST_TOPIC_NAME); + + // Prepare data. + std::list generated_data = default_keyedhelloworld_data_generator(20); + auto middle = std::next(generated_data.begin(), 10); + std::list low_strength_data(generated_data.begin(), middle); + std::list high_strength_data(middle, generated_data.end()); + auto expected_data = high_strength_data; + auto it = low_strength_data.begin(); + // Expect reception of the first two samples from the low strength writer (one per instance). + expected_data.push_front(*it++); + expected_data.push_front(*it); + + // Initialize writers. + low_strength_writer.ownership_strength(3) + .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .init(); + ASSERT_TRUE(low_strength_writer.isInitialized()); + + // High strength writer will use a custom transport to ensure its data is received after the low strength data. + auto test_transport = std::make_shared(); + std::atomic drop_messages(false); + test_transport->messages_filter_ = [&drop_messages](eprosima::fastdds::rtps::CDRMessage_t&) + { + return drop_messages.load(); + }; + high_strength_writer.ownership_strength(4) + .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .disable_builtin_transport() + .add_user_transport_to_pparams(test_transport) + .init(); + ASSERT_TRUE(high_strength_writer.isInitialized()); + + // Initialize reader. + reader.ownership_exclusive() + .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .init(); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery. + low_strength_writer.wait_discovery(); + high_strength_writer.wait_discovery(); + reader.wait_discovery(std::chrono::seconds::zero(), 2); + + // Drop the messages from the high strength writer, so they arrive later to the reader. + drop_messages.store(true); + + // Send one sample from each writer, with low strength data first. + while (!low_strength_data.empty() && !high_strength_data.empty()) + { + EXPECT_TRUE(low_strength_writer.send_sample(low_strength_data.front())); + EXPECT_TRUE(high_strength_writer.send_sample(high_strength_data.front())); + low_strength_data.pop_front(); + high_strength_data.pop_front(); + } + + // Wait for the reader to receive the low strength data. + EXPECT_TRUE(low_strength_writer.waitForAllAcked(std::chrono::seconds(1))); + + // Let high strength writer send the data, and wait for the reader to receive it. + drop_messages.store(false); + EXPECT_TRUE(high_strength_writer.waitForAllAcked(std::chrono::seconds(1))); + + // Make the reader process the data, expecting only the high strength data. + // The issue was reproduced by the reader complaining about reception of unexpected data. + reader.startReception(expected_data); + reader.block_for_all(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else From 56c49002dd54ba3f7f21f7959b5a0266493f96d6 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 3 Sep 2024 09:08:02 +0200 Subject: [PATCH 3/5] Refs #20866. Fix issue. Signed-off-by: Miguel Company --- .../subscriber/history/DataReaderHistory.cpp | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index 0dcd18a45d..1a209ad230 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -853,6 +853,7 @@ bool DataReaderHistory::update_instance_nts( assert(vit != instances_.end()); assert(false == change->isRead); + auto previous_owner = vit->second->current_owner.first; ++counters_.samples_unread; bool ret = vit->second->update_state(counters_, change->kind, change->writerGUID, @@ -860,6 +861,31 @@ bool DataReaderHistory::update_instance_nts( change->reader_info.disposed_generation_count = vit->second->disposed_generation_count; change->reader_info.no_writers_generation_count = vit->second->no_writers_generation_count; + auto current_owner = vit->second->current_owner.first; + if (current_owner != previous_owner) + { + assert(current_owner == change->writerGUID); + + // Remove all changes from different owners after the change. + DataReaderInstance::ChangeCollection& changes = vit->second->cache_changes; + auto it = std::lower_bound(changes.begin(), changes.end(), change, rtps::history_order_cmp); + assert(it != changes.end()); + assert(*it == change); + ++it; + while (it != changes.end()) + { + if ((*it)->writerGUID != current_owner) + { + // Remove from history + remove_change_sub(*it, it); + + // Current iterator will point to change next to the one removed. Avoid incrementing. + continue; + } + ++it; + } + } + return ret; } From 3548b2fdfe83c95bdd27b8fc8f46fbcf71e2e677 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 3 Sep 2024 15:39:38 +0200 Subject: [PATCH 4/5] Refs #20866. Fix unit tests. Signed-off-by: Miguel Company --- .../dds/subscriber/DataReaderHistoryTests.cpp | 127 +++++++----------- 1 file changed, 46 insertions(+), 81 deletions(-) diff --git a/test/unittest/dds/subscriber/DataReaderHistoryTests.cpp b/test/unittest/dds/subscriber/DataReaderHistoryTests.cpp index 1b69e5672d..2fea658022 100644 --- a/test/unittest/dds/subscriber/DataReaderHistoryTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderHistoryTests.cpp @@ -51,6 +51,23 @@ class TestType : public TopicDataType (override)); }; +bool add_test_change( + eprosima::fastdds::dds::detail::DataReaderHistory& history, + eprosima::fastdds::rtps::CacheChange_t& change, + std::vector>& test_changes) +{ + ++change.sequenceNumber; + eprosima::fastdds::rtps::Time_t::now(change.sourceTimestamp); + eprosima::fastdds::rtps::CacheChange_t* new_change = new eprosima::fastdds::rtps::CacheChange_t(); + new_change->copy(&change); + new_change->reader_info.writer_ownership_strength = change.reader_info.writer_ownership_strength; + + EXPECT_TRUE(history.received_change(new_change, 0)); + bool ret = history.update_instance_nts(new_change); + test_changes.push_back(std::unique_ptr(new_change)); + return ret; +} + /*! * \test DDS-OWN-HIST-01 Tests `DataReaderInstance` handles successfully the reception of Non-Keyed samples with * different Ownership's strength. @@ -65,6 +82,7 @@ TEST(DataReaderHistory, exclusive_ownership_non_keyed_sample_reception) DataReaderHistory history(type, topic, qos); eprosima::fastdds::RecursiveTimedMutex mutex; eprosima::fastdds::rtps::StatelessReader reader(&history, &mutex); + std::vector> changes; eprosima::fastdds::rtps::CacheChange_t dw1_change; dw1_change.writerGUID = {{}, 1}; @@ -77,50 +95,32 @@ TEST(DataReaderHistory, exclusive_ownership_non_keyed_sample_reception) dw3_change.reader_info.writer_ownership_strength = 3; // Receives a sample with seq 1 from DW1 and update instance with strength 1. - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw1_change)); + ASSERT_TRUE(add_test_change(history, dw1_change, changes)); // Receives a sample with seq 1 from DW2 and update instance with strength 2. - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); // Receives a sample with seq 2 from DW1 and update instance with strength 1. - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives a sample with seq 2 from DW2 and update instance with strength 2. - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); // Receives a sample with seq 1 from DW3 and update instance with strength 3. - ++dw3_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw3_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw3_change)); + ASSERT_TRUE(add_test_change(history, dw3_change, changes)); // Receives a sample with seq 3 from DW1 and update instance with strength 1. - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives a sample with seq 3 from DW2 and update instance with strength 2. - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw2_change)); + ASSERT_FALSE(add_test_change(history, dw2_change, changes)); // Receives a sample with seq 2 from DW3 and update instance with strength 1. - ++dw3_change.sequenceNumber; dw3_change.reader_info.writer_ownership_strength = 1; - ASSERT_TRUE(history.received_change(&dw3_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw3_change)); + ASSERT_TRUE(add_test_change(history, dw3_change, changes)); // Receives a sample with seq 4 from DW2 and update instance with strength 2. - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); ASSERT_EQ(9u, history.getHistorySize()); } @@ -142,6 +142,7 @@ TEST(DataReaderHistory, exclusive_ownership_keyed_sample_reception) DataReaderHistory history(type, topic, qos); eprosima::fastdds::RecursiveTimedMutex mutex; eprosima::fastdds::rtps::StatelessReader reader(&history, &mutex); + std::vector> changes; const InstanceHandle_t instance_1 = eprosima::fastdds::rtps::GUID_t{{}, 1}; const InstanceHandle_t instance_2 = eprosima::fastdds::rtps::GUID_t{{}, 2}; @@ -158,112 +159,76 @@ TEST(DataReaderHistory, exclusive_ownership_keyed_sample_reception) // Receives instance 1 with seq 1 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_1; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw1_change)); + ASSERT_TRUE(add_test_change(history, dw1_change, changes)); // Receives instance 2 with seq 2 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_2; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw1_change)); + ASSERT_TRUE(add_test_change(history, dw1_change, changes)); // Receives instance 1 with seq 1 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_1; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); // Receives instance 1 with seq 3 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_1; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 2 with seq 4 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_2; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw1_change)); + ASSERT_TRUE(add_test_change(history, dw1_change, changes)); // Receives instance 2 with seq 1 from DW3 and update instance with strength 3. dw3_change.instanceHandle = instance_2; - ++dw3_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw3_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw3_change)); + ASSERT_TRUE(add_test_change(history, dw3_change, changes)); // Receives instance 1 with seq 5 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_1; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 2 with seq 6 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_2; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 3 with seq 2 from DW3 and update instance with strength 3. dw3_change.instanceHandle = instance_3; - ++dw3_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw3_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw3_change)); + ASSERT_TRUE(add_test_change(history, dw3_change, changes)); // Receives instance 3 with seq 2 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_3; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw2_change)); + ASSERT_FALSE(add_test_change(history, dw2_change, changes)); // Receives instance 3 with seq 3 from DW3 and update instance with strength 1. dw3_change.instanceHandle = instance_3; dw3_change.reader_info.writer_ownership_strength = 1; - ++dw3_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw3_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw3_change)); + ASSERT_TRUE(add_test_change(history, dw3_change, changes)); // Receives instance 3 with seq 3 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_3; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); // Receives instance 1 with seq 7 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_1; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 2 with seq 8 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_2; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 3 with seq 9 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_3; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 1 with seq 4 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_1; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); // Receives instance 2 with seq 5 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_2; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw2_change)); + ASSERT_FALSE(add_test_change(history, dw2_change, changes)); // Receives instance 3 with seq 6 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_3; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); ASSERT_EQ(18u, history.getHistorySize()); } From b886d8643b8c84f291f6210d41a5310aa1c6286b Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 6 Sep 2024 12:46:56 +0200 Subject: [PATCH 5/5] Refs #20866. Refactor test to run several cases. Signed-off-by: Miguel Company --- .../common/DDSBlackboxTestsOwnershipQos.cpp | 163 ++++++++---------- 1 file changed, 70 insertions(+), 93 deletions(-) diff --git a/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp b/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp index 32291e2d52..6ee263351c 100644 --- a/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp +++ b/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp @@ -2168,106 +2168,51 @@ TEST_P(OwnershipQos, exclusive_kind_keyed_besteffort_disposing_instance) /*! * This is a regression test for redmine issue 20866. * - * This test checks that a reader with a KEEP_ALL history and an exclusive ownership policy only returns the data - * from the writer with the highest strength. - */ -TEST(OwnershipQos, exclusive_kind_keep_all_reliable) -{ - PubSubReader reader(TEST_TOPIC_NAME); - PubSubWriter low_strength_writer(TEST_TOPIC_NAME); - PubSubWriter high_strength_writer(TEST_TOPIC_NAME); - - // Prepare data. - std::list generated_data = default_keyedhelloworld_data_generator(20); - auto middle = std::next(generated_data.begin(), 10); - std::list low_strength_data(generated_data.begin(), middle); - std::list high_strength_data(middle, generated_data.end()); - auto expected_data = high_strength_data; - - // Initialize writers. - low_strength_writer.ownership_strength(3) - .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) - .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) - .init(); - ASSERT_TRUE(low_strength_writer.isInitialized()); - - // High strength writer will use a custom transport to ensure its data is received after the low strength data. - auto test_transport = std::make_shared(); - std::atomic drop_messages(false); - test_transport->messages_filter_ = [&drop_messages](eprosima::fastdds::rtps::CDRMessage_t&) - { - return drop_messages.load(); - }; - high_strength_writer.ownership_strength(4) - .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) - .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) - .disable_builtin_transport() - .add_user_transport_to_pparams(test_transport) - .init(); - ASSERT_TRUE(high_strength_writer.isInitialized()); - - // Initialize reader. - reader.ownership_exclusive() - .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) - .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) - .init(); - ASSERT_TRUE(reader.isInitialized()); - - // Wait for discovery. - low_strength_writer.wait_discovery(); - high_strength_writer.wait_discovery(); - reader.wait_discovery(std::chrono::seconds::zero(), 2); - - // Send high strength data first, so it has the lowest source timestamps, but drop the messages, so they arrive - // later to the reader. - drop_messages.store(true); - high_strength_writer.send(high_strength_data); - EXPECT_TRUE(high_strength_data.empty()); - - // Send low strength data, so it has the highest source timestamps. - low_strength_writer.send(low_strength_data); - EXPECT_TRUE(low_strength_data.empty()); - // Wait for the reader to receive the data. - EXPECT_TRUE(low_strength_writer.waitForAllAcked(std::chrono::seconds(1))); - - // Let high strength writer send the data. - drop_messages.store(false); - - // Wait for the reader to receive the high strength data. - EXPECT_TRUE(high_strength_writer.waitForAllAcked(std::chrono::seconds(1))); - - // Make the reader process the data, expecting only the high strength data. - // The issue was reproduced by the reader complaining about reception of unexpected data. - reader.startReception(expected_data); - reader.block_for_all(); -} - -/*! - * This is a regression test for redmine issue 20866. + * This test checks that a reader keeping a long number of samples and with an exclusive ownership policy only + * returns the data from the writer with the highest strength. * - * This test checks that a reader with a KEEP_ALL history and an exclusive ownership policy only does not return - * data from the writer with the lowest strength after returning data from the highest one. + * @param use_keep_all_history Whether to use KEEP_ALL history or KEEP_LAST(20). + * @param mixed_data Whether to send data from both writers in an interleaved way. */ -TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed) +static void test_exclusive_kind_big_history( + bool use_keep_all_history, + bool mixed_data) { PubSubReader reader(TEST_TOPIC_NAME); PubSubWriter low_strength_writer(TEST_TOPIC_NAME); PubSubWriter high_strength_writer(TEST_TOPIC_NAME); + // Configure history QoS. + if (use_keep_all_history) + { + reader.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS); + low_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS); + high_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS); + } + else + { + reader.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20); + low_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20); + high_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20); + } + // Prepare data. std::list generated_data = default_keyedhelloworld_data_generator(20); auto middle = std::next(generated_data.begin(), 10); std::list low_strength_data(generated_data.begin(), middle); std::list high_strength_data(middle, generated_data.end()); auto expected_data = high_strength_data; - auto it = low_strength_data.begin(); - // Expect reception of the first two samples from the low strength writer (one per instance). - expected_data.push_front(*it++); - expected_data.push_front(*it); + + if (mixed_data) + { + // Expect reception of the first two samples from the low strength writer (one per instance). + auto it = low_strength_data.begin(); + expected_data.push_front(*it++); + expected_data.push_front(*it); + } // Initialize writers. low_strength_writer.ownership_strength(3) - .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) .init(); ASSERT_TRUE(low_strength_writer.isInitialized()); @@ -2280,7 +2225,6 @@ TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed) return drop_messages.load(); }; high_strength_writer.ownership_strength(4) - .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) .disable_builtin_transport() .add_user_transport_to_pparams(test_transport) @@ -2289,7 +2233,6 @@ TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed) // Initialize reader. reader.ownership_exclusive() - .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) .init(); ASSERT_TRUE(reader.isInitialized()); @@ -2302,13 +2245,27 @@ TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed) // Drop the messages from the high strength writer, so they arrive later to the reader. drop_messages.store(true); - // Send one sample from each writer, with low strength data first. - while (!low_strength_data.empty() && !high_strength_data.empty()) + if (mixed_data) { - EXPECT_TRUE(low_strength_writer.send_sample(low_strength_data.front())); - EXPECT_TRUE(high_strength_writer.send_sample(high_strength_data.front())); - low_strength_data.pop_front(); - high_strength_data.pop_front(); + // Send one sample from each writer, with low strength data first. + while (!low_strength_data.empty() && !high_strength_data.empty()) + { + EXPECT_TRUE(low_strength_writer.send_sample(low_strength_data.front())); + EXPECT_TRUE(high_strength_writer.send_sample(high_strength_data.front())); + low_strength_data.pop_front(); + high_strength_data.pop_front(); + } + } + else + { + // Send high strength data first, so it has the lowest source timestamps, but drop the messages, so they arrive + // later to the reader. + high_strength_writer.send(high_strength_data); + EXPECT_TRUE(high_strength_data.empty()); + + // Send low strength data, so it has the highest source timestamps. + low_strength_writer.send(low_strength_data); + EXPECT_TRUE(low_strength_data.empty()); } // Wait for the reader to receive the low strength data. @@ -2318,12 +2275,32 @@ TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed) drop_messages.store(false); EXPECT_TRUE(high_strength_writer.waitForAllAcked(std::chrono::seconds(1))); - // Make the reader process the data, expecting only the high strength data. + // Make the reader process the data, expecting only the required data. // The issue was reproduced by the reader complaining about reception of unexpected data. reader.startReception(expected_data); reader.block_for_all(); } +TEST(OwnershipQos, exclusive_kind_keep_all_reliable) +{ + test_exclusive_kind_big_history(true, false); +} + +TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed) +{ + test_exclusive_kind_big_history(true, true); +} + +TEST(OwnershipQos, exclusive_kind_keep_last_reliable) +{ + test_exclusive_kind_big_history(false, false); +} + +TEST(OwnershipQos, exclusive_kind_keep_last_reliable_mixed) +{ + test_exclusive_kind_big_history(false, true); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else