From 3263926d4d61705a7a7eead50a701b917030e034 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 22 Mar 2024 16:26:54 +0100 Subject: [PATCH] Fix on_sample_lost notification on best-effort readers for fragmented samples (#4187) * Refs #20162. Regression test. Signed-off-by: Miguel Company * Refs #20162. Notify sample lost when dropping fragmented change. Signed-off-by: Miguel Company * Refs #20167. Linters. Signed-off-by: Miguel Company * Refs #20162. Apply suggestions. Signed-off-by: Miguel Company * Refs #20162. Use constexpr for buffer size. Signed-off-by: Miguel Company * Refs #20162. Lower buffer size. Signed-off-by: Miguel Company * Refs #20351. Uncrustify. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company (cherry picked from commit 5ac198e80fb19ba8d780a0058a026b7e22ca1ef2) --- src/cpp/rtps/reader/StatelessReader.cpp | 20 +++ .../common/DDSBlackboxTestsListeners.cpp | 131 +++++++++++++++++- 2 files changed, 146 insertions(+), 5 deletions(-) diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index ee3703184fb..9bc3fe8a02c 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -639,6 +639,26 @@ bool StatelessReader::processDataFragMsg( { if (work_change->sequenceNumber < change_to_add->sequenceNumber) { + SequenceNumber_t updated_seq = work_change->sequenceNumber; + SequenceNumber_t previous_seq{ 0, 0 }; + previous_seq = update_last_notified(writer_guid, updated_seq); + + // Notify lost samples + auto listener = getListener(); + if (listener != nullptr) + { + if (SequenceNumber_t{ 0, 0 } != previous_seq) + { + assert(previous_seq < updated_seq); + uint64_t tmp = (updated_seq - previous_seq).to64long(); + int32_t lost_samples = + tmp > static_cast(std::numeric_limits::max()) ? + std::numeric_limits::max() : static_cast(tmp); + assert (0 < lost_samples); + listener->on_sample_lost(this, lost_samples); + } + } + // Pending change should be dropped. Check if it can be reused if (sampleSize <= work_change->serializedPayload.max_size) { diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 90a0d9d4bfb..4d5fda3c560 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -674,8 +674,9 @@ TEST_P(DDSStatus, DataAvailableConditions) subscriber_reader.wait_waitset_timeout(); } +template void sample_lost_test_dw_init( - PubSubWriter& writer) + PubSubWriter& writer) { auto testTransport = std::make_shared(); testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool @@ -683,7 +684,8 @@ void sample_lost_test_dw_init( uint32_t old_pos = msg.pos; // see RTPS DDS 9.4.5.3 Data Submessage - EntityId_t readerID, writerID; + EntityId_t readerID; + EntityId_t writerID; SequenceNumber_t sn; msg.pos += 2; // flags @@ -711,6 +713,43 @@ void sample_lost_test_dw_init( return false; }; + testTransport->drop_data_frag_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool + { + uint32_t old_pos = msg.pos; + + // see RTPS DDS 9.4.5.4 DataFrag Submessage + EntityId_t readerID; + EntityId_t writerID; + SequenceNumber_t sn; + uint32_t first_fragment = 0; + + msg.pos += 2; // flags + msg.pos += 2; // octets to inline quos + CDRMessage::readEntityId(&msg, &readerID); + CDRMessage::readEntityId(&msg, &writerID); + CDRMessage::readSequenceNumber(&msg, &sn); + CDRMessage::readUInt32(&msg, &first_fragment); + + // restore buffer pos + msg.pos = old_pos; + + // generate losses + if ((writerID.value[3] & 0xC0) == 0 // only user endpoints + && (1 == first_fragment) // only first fragment + && (sn == SequenceNumber_t{0, 2} || + sn == SequenceNumber_t(0, 3) || + sn == SequenceNumber_t(0, 4) || + sn == SequenceNumber_t(0, 6) || + sn == SequenceNumber_t(0, 8) || + sn == SequenceNumber_t(0, 10) || + sn == SequenceNumber_t(0, 11) || + sn == SequenceNumber_t(0, 13))) + { + return true; + } + + return false; + }; writer.disable_builtin_transport() @@ -721,19 +760,29 @@ void sample_lost_test_dw_init( } +template void sample_lost_test_dr_init( - PubSubReader& reader, + PubSubReader& reader, std::function functor) { + // We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. + // Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any + // other possible loss. + constexpr uint32_t BUFFER_SIZE = + 300ul * 1024ul // sample size + * 13ul // number of samples + * 2ul; // 2x to avoid any possible loss + reader.socket_buffer_size(BUFFER_SIZE); reader.sample_lost_status_functor(functor) .init(); ASSERT_TRUE(reader.isInitialized()); } +template void sample_lost_test_init( - PubSubReader& reader, - PubSubWriter& writer, + PubSubReader& reader, + PubSubWriter& writer, std::function functor) { sample_lost_test_dw_init(writer); @@ -802,6 +851,78 @@ TEST(DDSStatus, sample_lost_be_dw_be_dr) }); } +/*! + * \test DDS-STS-SLS-01 Test `SampleLostStatus` in a Best-Effort DataWriter and a Best-Effort DataReader communication. + * This is also a regression test for bug redmine 20162 + */ +TEST(DDSStatus, sample_lost_be_dw_be_dr_fragments) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + std::mutex test_step_mtx; + std::condition_variable test_step_cv; + uint8_t test_step = 0; + + writer.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS); + reader.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS); + + sample_lost_test_init(reader, writer, [&test_step_mtx, &test_step_cv, &test_step]( + const eprosima::fastdds::dds::SampleLostStatus& status) + { + { + std::unique_lock lock(test_step_mtx); + std::cout << status.total_count << " " << status.total_count_change << std::endl; + if (0 == test_step && 1 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (1 == test_step && 2 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (2 == test_step && 3 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (3 == test_step && 4 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (4 == test_step && 5 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (5 == test_step && 6 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (6 == test_step && 7 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else + { + test_step = 0; + } + } + + test_step_cv.notify_all(); + }); + + + auto data = default_data300kb_data_generator(13); + + reader.startReception(data); + writer.send(data, 100); + + std::unique_lock lock(test_step_mtx); + test_step_cv.wait(lock, [&test_step]() + { + return 7 == test_step; + }); +} + /*! * \test DDS-STS-SLS-02 Test `SampleLostStatus` in a Best-Effort DataWriter and a late-joiner Best-Effort DataReader * communication.