From 2987b8814e1149ecf6981028359668a67426b151 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 28 May 2024 08:11:08 +0200 Subject: [PATCH] Fix on_sample_lost notification on best-effort readers for fragmented samples (#4187) (#4608) * Handle errors when setting socket buffer sizes (#4760) (#4796) * Refs #20972. Method socket_buffer_size in DDS_PIM helpers sets also sending buffer. Signed-off-by: Miguel Company * Refs #20972. Method socket_buffer_size in fastrtps_deprecated helpers sets also sending buffer. Signed-off-by: Miguel Company * Refs #20972. Improvements in on_sample_lost blackbox tests. Signed-off-by: Miguel Company * Refs #20972. Move code into new private methods. Signed-off-by: Miguel Company * Refs #20972. Refactor on configure_send_buffer_size. Signed-off-by: Miguel Company * Refs #20972. Refactor on configure_receive_buffer_size. Signed-off-by: Miguel Company * Refs #20972. Check user configuration at the beginning of init method. Signed-off-by: Miguel Company * Refs #20972. Use maxMessageSize as minimum possible value. Signed-off-by: Miguel Company * Refs #20972. Applying changes on OpenAndBindUnicastOutputSocket. Signed-off-by: Miguel Company * Refs #20972. Applying changes on CreateInputChannelResource. Signed-off-by: Miguel Company * Revert "Refs #20972. Applying changes on CreateInputChannelResource." This reverts commit ed848e9de267fcfdbe1cb7294b2e408d85a33575. * Refs #20972. Add helper header with template method. Signed-off-by: Miguel Company * Refs #20972. Configure methods return boolean. Signed-off-by: Miguel Company * Refs #20972. Configure methods use new template method. Signed-off-by: Miguel Company * Refs #20972. OpenAndBindUnicastOutputSocket uses new template method. Signed-off-by: Miguel Company * Refs #20972. Changes in OpenAndBindInputSocket. Signed-off-by: Miguel Company * Refs #20972.Setting options on TCP channels. Signed-off-by: Miguel Company * Refs #20972. Doxygen. Signed-off-by: Miguel Company * Refs #20972. Check limits of configured sizes. Signed-off-by: Miguel Company * Refs #20972. Add UDP unit tests. Signed-off-by: Miguel Company * Refs #20972. Add TCP unit tests. Signed-off-by: Miguel Company * Refs #20972. Move checks in TCP to beginning of init. Signed-off-by: Miguel Company * Refs #20972. Refactor for common code in UDP. Signed-off-by: Miguel Company * Refs #20972. Refactor for common code in TCP. Signed-off-by: Miguel Company * Refs #20972. Remove unused constants in UDP tests. Signed-off-by: Miguel Company * Refs #20972. Check final configuration on unit tests. Signed-off-by: Miguel Company * Refs #20972. Uncrustify. Signed-off-by: Miguel Company * Refs #20972. Less strict tests. Signed-off-by: Miguel Company * Refs #20972. Remove `s_minimumSocketBuffer` from tests. Signed-off-by: Miguel Company * Refs #20972. Deprecate `s_minimumSocketBuffer`. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company (cherry picked from commit 53cd211a8449bac59c13e18912ebd7a9262de51c) # Conflicts: # src/cpp/rtps/transport/TCPTransportInterface.cpp # src/cpp/rtps/transport/UDPTransportInterface.cpp # src/cpp/rtps/transport/UDPv4Transport.cpp # src/cpp/rtps/transport/UDPv6Transport.cpp # test/blackbox/common/DDSBlackboxTestsListeners.cpp * Refs #21036. Fix conflicts. Signed-off-by: Miguel Company * Refs #21036. Update for non-backported changes Signed-off-by: Miguel Company * Fix on_sample_lost notification on best-effort readers for fragmented samples (#4187) * Refs #20162. Regression test. Signed-off-by: Miguel Company * Refs #20162. Notify sample lost when dropping fragmented change. Signed-off-by: Miguel Company * Refs #20167. Linters. Signed-off-by: Miguel Company * Refs #20162. Apply suggestions. Signed-off-by: Miguel Company * Refs #20162. Use constexpr for buffer size. Signed-off-by: Miguel Company * Refs #20162. Lower buffer size. Signed-off-by: Miguel Company * Refs #20351. Uncrustify. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company (cherry picked from commit 5ac198e80fb19ba8d780a0058a026b7e22ca1ef2) * Make sample_lost_be_dw_be_dr_fragments test less flaky (#4620) * Refs #20692. Make sample_lost_be_dw_be_dr_fragments test less flakey. Signed-off-by: Miguel Company * Refs #20692. Uncrustify. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company * Refs #20972. Improvements in on_sample_lost blackbox tests. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company Co-authored-by: Miguel Company --- src/cpp/rtps/reader/StatelessReader.cpp | 20 +++ .../common/DDSBlackboxTestsListeners.cpp | 137 +++++++++++++++++- 2 files changed, 152 insertions(+), 5 deletions(-) diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index d39eb0176ea..6d36964b398 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -708,6 +708,26 @@ bool StatelessReader::processDataFragMsg( { if (work_change->sequenceNumber < change_to_add->sequenceNumber) { + SequenceNumber_t updated_seq = work_change->sequenceNumber; + SequenceNumber_t previous_seq{ 0, 0 }; + previous_seq = update_last_notified(writer_guid, updated_seq); + + // Notify lost samples + auto listener = getListener(); + if (listener != nullptr) + { + if (SequenceNumber_t{ 0, 0 } != previous_seq) + { + assert(previous_seq < updated_seq); + uint64_t tmp = (updated_seq - previous_seq).to64long(); + int32_t lost_samples = + tmp > static_cast(std::numeric_limits::max()) ? + std::numeric_limits::max() : static_cast(tmp); + assert (0 < lost_samples); + listener->on_sample_lost(this, lost_samples); + } + } + // Pending change should be dropped. Check if it can be reused if (sampleSize <= work_change->serializedPayload.max_size) { diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 1d7c40933bb..873537a8da3 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -674,16 +674,29 @@ TEST_P(DDSStatus, DataAvailableConditions) subscriber_reader.wait_waitset_timeout(); } +// We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. +// Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any +// other possible loss. +static constexpr uint32_t SAMPLE_LOST_TEST_BUFFER_SIZE = + 300ul * 1024ul // sample size + * 13ul // number of samples + * 2ul; // 2x to avoid any possible loss + +template void sample_lost_test_dw_init( - PubSubWriter& writer) + PubSubWriter& writer) { auto testTransport = std::make_shared(); + testTransport->sendBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; + testTransport->receiveBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; + testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool { uint32_t old_pos = msg.pos; // see RTPS DDS 9.4.5.3 Data Submessage - EntityId_t readerID, writerID; + EntityId_t readerID; + EntityId_t writerID; SequenceNumber_t sn; msg.pos += 2; // flags @@ -711,6 +724,43 @@ void sample_lost_test_dw_init( return false; }; + testTransport->drop_data_frag_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool + { + uint32_t old_pos = msg.pos; + + // see RTPS DDS 9.4.5.4 DataFrag Submessage + EntityId_t readerID; + EntityId_t writerID; + SequenceNumber_t sn; + uint32_t first_fragment = 0; + + msg.pos += 2; // flags + msg.pos += 2; // octets to inline quos + CDRMessage::readEntityId(&msg, &readerID); + CDRMessage::readEntityId(&msg, &writerID); + CDRMessage::readSequenceNumber(&msg, &sn); + CDRMessage::readUInt32(&msg, &first_fragment); + + // restore buffer pos + msg.pos = old_pos; + + // generate losses + if ((writerID.value[3] & 0xC0) == 0 // only user endpoints + && (1 == first_fragment) // only first fragment + && (sn == SequenceNumber_t{0, 2} || + sn == SequenceNumber_t(0, 3) || + sn == SequenceNumber_t(0, 4) || + sn == SequenceNumber_t(0, 6) || + sn == SequenceNumber_t(0, 8) || + sn == SequenceNumber_t(0, 10) || + sn == SequenceNumber_t(0, 11) || + sn == SequenceNumber_t(0, 13))) + { + return true; + } + + return false; + }; writer.disable_builtin_transport() @@ -721,8 +771,9 @@ void sample_lost_test_dw_init( } +template void sample_lost_test_dr_init( - PubSubReader& reader, + PubSubReader& reader, std::function functor) { reader.sample_lost_status_functor(functor) @@ -731,11 +782,15 @@ void sample_lost_test_dr_init( ASSERT_TRUE(reader.isInitialized()); } +template void sample_lost_test_init( - PubSubReader& reader, - PubSubWriter& writer, + PubSubReader& reader, + PubSubWriter& writer, std::function functor) { + reader.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); + writer.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); + sample_lost_test_dw_init(writer); sample_lost_test_dr_init(reader, functor); @@ -802,6 +857,78 @@ TEST(DDSStatus, sample_lost_be_dw_be_dr) }); } +/*! + * \test DDS-STS-SLS-01 Test `SampleLostStatus` in a Best-Effort DataWriter and a Best-Effort DataReader communication. + * This is also a regression test for bug redmine 20162 + */ +TEST(DDSStatus, sample_lost_be_dw_be_dr_fragments) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + std::mutex test_step_mtx; + std::condition_variable test_step_cv; + uint8_t test_step = 0; + + writer.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS); + reader.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS); + + sample_lost_test_init(reader, writer, [&test_step_mtx, &test_step_cv, &test_step]( + const eprosima::fastdds::dds::SampleLostStatus& status) + { + { + std::unique_lock lock(test_step_mtx); + std::cout << status.total_count << " " << status.total_count_change << std::endl; + if (0 == test_step && 1 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (1 == test_step && 2 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (2 == test_step && 3 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (3 == test_step && 4 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (4 == test_step && 5 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (5 == test_step && 6 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (6 == test_step && 7 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else + { + test_step = 0; + } + } + + test_step_cv.notify_all(); + }); + + + auto data = default_data300kb_data_generator(13); + + reader.startReception(data); + writer.send(data, 100); + + std::unique_lock lock(test_step_mtx); + test_step_cv.wait(lock, [&test_step]() + { + return 7 == test_step; + }); +} + /*! * \test DDS-STS-SLS-02 Test `SampleLostStatus` in a Best-Effort DataWriter and a late-joiner Best-Effort DataReader * communication.