Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[16608] Fix total_unread_ consistent with reader's history upon get_first_untaken_info() #3203

Merged
merged 6 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,21 +353,12 @@ bool DataReaderHistory::get_first_untaken_info(
{
std::lock_guard<RecursiveTimedMutex> lock(*getMutex());

CacheChange_t* change = nullptr;
WriterProxy* wp = nullptr;
if (mp_reader->nextUntakenCache(&change, &wp))
if (!data_available_instances_.empty())
{
auto it = data_available_instances_.find(change->instanceHandle);
assert(it != data_available_instances_.end());
auto it = data_available_instances_.begin();
auto& instance_changes = it->second->cache_changes;
auto item =
std::find_if(instance_changes.cbegin(), instance_changes.cend(),
[change](const DataReaderCacheChange& v)
{
return v == change;
});
auto item = instance_changes.cbegin();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this item could be NULL? see ros2/rmw_fastrtps#659

ReadTakeCommand::generate_info(info, *(it->second), *item);
mp_reader->change_read_by_user(change, wp, false);
return true;
}

Expand Down
29 changes: 27 additions & 2 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ class PubSubReader
PubSubReader(
const std::string& topic_name,
bool take = true,
bool statistics = false)
bool statistics = false,
bool read = true)
: participant_listener_(*this)
, listener_(*this)
, participant_(nullptr)
Expand All @@ -293,10 +294,12 @@ class PubSubReader
, receiving_(false)
, current_processed_count_(0)
, number_samples_expected_(0)
, current_unread_count_(0)
, discovery_result_(false)
, onDiscovery_(nullptr)
, onEndpointDiscovery_(nullptr)
, take_(take)
, read_(read)
, statistics_(statistics)
#if HAVE_SECURITY
, authorized_(0)
Expand Down Expand Up @@ -550,6 +553,16 @@ class PubSubReader
return current_processed_count_;
}

size_t block_for_unread_count_of(
size_t n_unread)
{
block([this, n_unread]() -> bool
{
return current_unread_count_ >= n_unread;
});
return current_unread_count_;
}

void block(
std::function<bool()> checker)
{
Expand Down Expand Up @@ -1677,6 +1690,14 @@ class PubSubReader
type data;
eprosima::fastdds::dds::SampleInfo info;

if (!take_ && !read_)
{
current_unread_count_ = datareader->get_unread_count();
std::cout << "Total unread count " << current_unread_count_ << std::endl;
cv_.notify_one();
return;
}

ReturnCode_t success = take_ ?
datareader->take_next_sample((void*)&data, &info) :
datareader->read_next_sample((void*)&data, &info);
Expand Down Expand Up @@ -1845,6 +1866,7 @@ class PubSubReader
std::map<LastSeqInfo, eprosima::fastrtps::rtps::SequenceNumber_t> last_seq;
std::atomic<size_t> current_processed_count_;
std::atomic<size_t> number_samples_expected_;
std::atomic<size_t> current_unread_count_;
bool discovery_result_;

std::string xml_file_ = "";
Expand All @@ -1854,9 +1876,12 @@ class PubSubReader
std::function<bool(const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info)> onDiscovery_;
std::function<bool(const eprosima::fastrtps::rtps::WriterDiscoveryInfo& info)> onEndpointDiscovery_;

//! True to take data from history. False to read
//! True to take data from history. On False, read_ is checked.
bool take_;

//! True to read data from history. False, do nothing on data reception.
bool read_;

//! True if the class is called from the statistics blackbox (specific topic name and domain id).
bool statistics_;

Expand Down
1 change: 1 addition & 0 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ class PubSubWriter
bool send_sample(
type& msg)
{
default_send_print(msg);
return datawriter_->write((void*)&msg);
}

Expand Down
5 changes: 4 additions & 1 deletion test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

#include <gtest/gtest.h>

Expand All @@ -34,9 +35,12 @@
#include <fastdds/dds/topic/Topic.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastrtps/types/TypesBase.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>

#include "BlackboxTests.hpp"
#include "../types/HelloWorldPubSubTypes.h"
#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -270,7 +274,6 @@ TEST(DDSBasic, MultithreadedReaderCreationDoesNotDeadlock)
ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_topic(topic));
ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant));
}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
85 changes: 85 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,91 @@ TEST_P(DDSDataReader, LivelinessChangedStatusGet)

}

// Regression test of Refs #16608, Github #3203. Checks that total_unread_ variable is consistent with
// unread changes in reader's history after performing a get_first_untaken_info() on a change with no writer matched.
TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo)
{
//! Spawn a couple of participants writer/reader
auto pubsub_writer = std::make_shared<PubSubWriter<HelloWorldPubSubType>>(TEST_TOPIC_NAME);
//! Create a reader that does nothing when new data is available. Neither take nor read it.
auto pubsub_reader = std::make_shared<PubSubReader<HelloWorldPubSubType>>(TEST_TOPIC_NAME, false, false, false);

// Initialization of all the participants
std::cout << "Initializing PubSubs for topic " << TEST_TOPIC_NAME << std::endl;

//! Participant Writer configuration and qos
pubsub_writer->reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS);
pubsub_writer->durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS);
pubsub_writer->history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS);
pubsub_writer->init();
ASSERT_EQ(pubsub_writer->isInitialized(), true);

//! Participant Reader configuration and qos
pubsub_reader->reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS);
pubsub_reader->durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS);
pubsub_reader->history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS);
pubsub_reader->init();
ASSERT_EQ(pubsub_reader->isInitialized(), true);

// Wait for discovery.
pubsub_reader->wait_discovery();
pubsub_writer->wait_discovery();

auto data = default_helloworld_data_generator();

pubsub_reader->startReception(data);

std::mutex mtx;
bool should_stop(false);
auto ret = std::async(std::launch::async, [&pubsub_writer, &data, &should_stop, &mtx]
{
for (auto sample : data)
{
std::unique_lock<std::mutex> lock(mtx);
if (!should_stop)
{
lock.unlock();
pubsub_writer->send_sample(sample);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
else
{
break;
}
}
//! drop publisher
pubsub_writer->removePublisher();
}
);

pubsub_reader->block_for_unread_count_of(3);

{
std::unique_lock<std::mutex> lock(mtx);
should_stop = true;
}

pubsub_reader->wait_writer_undiscovery();
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved

eprosima::fastdds::dds::DataReader& reader = pubsub_reader->get_native_reader();
eprosima::fastdds::dds::SampleInfo info;

//! Try reading the first untaken info.
//! Checks whether total_unread_ is consistent with
//! the number of unread changes in history
//! This API call should NOT modify the history
reader.get_first_untaken_info(&info);

HelloWorld msg;
eprosima::fastdds::dds::SampleInfo sinfo;

//! Try getting a sample
auto result = reader.take_next_sample((void*)&msg, &sinfo);

//! Assert last operation
ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down