Skip to content

Commit

Permalink
Fix total_unread_ consistent with reader's history upon get_first_unt…
Browse files Browse the repository at this point in the history
…aken_info() (#3217)

* Fix total_unread_ consistent with reader's history after get_first_untaken_info (#3203)

* Refs 16608: Added BlackBoxTest

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs 16608: Fix. Make get_first_untaken_info() a read-only API call

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #16608: Fix tsan warning

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #16608: Applied suggested changes

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #16608: Applied second-reviewed suggested changes

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #16608: Skip test for Data-Sharing

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
(cherry picked from commit de5cd9c)
Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Resolved conflicts

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #16608. Reversed unnecessary changes on DDSBlackboxTestsBasic

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #16608. Not using shared pointer for PubSubReader and PubSubWriter.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #16608. Improved test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #16608. Improved solution.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Linter

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Co-authored-by: Mario Domínguez López <116071334+Mario-DL@users.noreply.github.com>
Co-authored-by: Mario Dominguez <mariodominguez@eprosima.com>
Co-authored-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
4 people authored Jan 25, 2023
1 parent 9bbcd31 commit cb48e90
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 20 deletions.
24 changes: 8 additions & 16 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,22 +353,14 @@ bool DataReaderHistory::get_first_untaken_info(
{
std::lock_guard<RecursiveTimedMutex> lock(*getMutex());

CacheChange_t* change = nullptr;
WriterProxy* wp = nullptr;
if (mp_reader->nextUntakenCache(&change, &wp))
{
auto it = data_available_instances_.find(change->instanceHandle);
assert(it != data_available_instances_.end());
auto& instance_changes = it->second->cache_changes;
auto item =
std::find_if(instance_changes.cbegin(), instance_changes.cend(),
[change](const DataReaderCacheChange& v)
{
return v == change;
});
ReadTakeCommand::generate_info(info, *(it->second), *item);
mp_reader->change_read_by_user(change, wp, false);
return true;
for (auto& it : data_available_instances_)
{
auto& instance_changes = it.second->cache_changes;
if (!instance_changes.empty())
{
ReadTakeCommand::generate_info(info, *(it.second), instance_changes.front());
return true;
}
}

return false;
Expand Down
33 changes: 29 additions & 4 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ class PubSubReader
PubSubReader(
const std::string& topic_name,
bool take = true,
bool statistics = false)
bool statistics = false,
bool read = true)
: participant_listener_(*this)
, listener_(*this)
, participant_(nullptr)
Expand All @@ -293,10 +294,12 @@ class PubSubReader
, receiving_(false)
, current_processed_count_(0)
, number_samples_expected_(0)
, current_unread_count_(0)
, discovery_result_(false)
, onDiscovery_(nullptr)
, onEndpointDiscovery_(nullptr)
, take_(take)
, read_(read)
, statistics_(statistics)
#if HAVE_SECURITY
, authorized_(0)
Expand Down Expand Up @@ -548,6 +551,16 @@ class PubSubReader
return current_processed_count_;
}

size_t block_for_unread_count_of(
size_t n_unread)
{
block([this, n_unread]() -> bool
{
return current_unread_count_ >= n_unread;
});
return current_unread_count_;
}

void block(
std::function<bool()> checker)
{
Expand Down Expand Up @@ -1675,6 +1688,14 @@ class PubSubReader
type data;
eprosima::fastdds::dds::SampleInfo info;

if (!take_ && !read_)
{
current_unread_count_ = datareader->get_unread_count();
std::cout << "Total unread count " << current_unread_count_ << std::endl;
cv_.notify_one();
return;
}

ReturnCode_t success = take_ ?
datareader->take_next_sample((void*)&data, &info) :
datareader->read_next_sample((void*)&data, &info);
Expand Down Expand Up @@ -1841,8 +1862,9 @@ class PubSubReader
eprosima::fastdds::dds::TypeSupport type_;
using LastSeqInfo = std::pair<eprosima::fastrtps::rtps::InstanceHandle_t, eprosima::fastrtps::rtps::GUID_t>;
std::map<LastSeqInfo, eprosima::fastrtps::rtps::SequenceNumber_t> last_seq;
size_t current_processed_count_;
size_t number_samples_expected_;
std::atomic<size_t> current_processed_count_;
std::atomic<size_t> number_samples_expected_;
std::atomic<size_t> current_unread_count_;
bool discovery_result_;

std::string xml_file_ = "";
Expand All @@ -1852,9 +1874,12 @@ class PubSubReader
std::function<bool(const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info)> onDiscovery_;
std::function<bool(const eprosima::fastrtps::rtps::WriterDiscoveryInfo& info)> onEndpointDiscovery_;

//! True to take data from history. False to read
//! True to take data from history. On False, read_ is checked.
bool take_;

//! True to read data from history. False, do nothing on data reception.
bool read_;

//! True if the class is called from the statistics blackbox (specific topic name and domain id).
bool statistics_;

Expand Down
1 change: 1 addition & 0 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ class PubSubWriter
bool send_sample(
type& msg)
{
default_send_print(msg);
return datawriter_->write((void*)&msg);
}

Expand Down
69 changes: 69 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,75 @@ TEST_P(DDSDataReader, LivelinessChangedStatusGet)

}

// Regression test of Refs #16608, Github #3203. Checks that total_unread_ variable is consistent with
// unread changes in reader's history after performing a get_first_untaken_info() on a change with no writer matched.
TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo)
{
if (enable_datasharing)
{
//! TODO: Datasharing changes the behavior of this test. Changes are
//! instantly removed on removePublisher() call and on the PUBListener callback
GTEST_SKIP() << "Data-sharing removes the changes instantly changing the behavior of this test. Skipping";
}

//! Spawn a couple of participants writer/reader
PubSubWriter<HelloWorldPubSubType> pubsub_writer(TEST_TOPIC_NAME);
//! Create a reader that does nothing when new data is available. Neither take nor read it.
PubSubReader<HelloWorldPubSubType> pubsub_reader(TEST_TOPIC_NAME, false, false, false);

// Initialization of all the participants
std::cout << "Initializing PubSubs for topic " << TEST_TOPIC_NAME << std::endl;

//! Participant Writer configuration and qos
pubsub_writer.reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS)
.history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS)
.init();
ASSERT_EQ(pubsub_writer.isInitialized(), true);

//! Participant Reader configuration and qos
pubsub_reader.reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS)
.history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS)
.init();
ASSERT_EQ(pubsub_reader.isInitialized(), true);

eprosima::fastdds::dds::DataReader& reader = pubsub_reader.get_native_reader();
eprosima::fastdds::dds::SampleInfo info;

EXPECT_EQ(ReturnCode_t::RETCODE_NO_DATA, reader.get_first_untaken_info(&info));

// Wait for discovery.
pubsub_reader.wait_discovery();
pubsub_writer.wait_discovery();

auto data = default_helloworld_data_generator();

pubsub_reader.startReception(data);

pubsub_writer.send(data);
EXPECT_TRUE(data.empty());

pubsub_reader.block_for_unread_count_of(3);
pubsub_writer.removePublisher();
pubsub_reader.wait_writer_undiscovery();

//! Try reading the first untaken info.
//! Checks whether total_unread_ is consistent with
//! the number of unread changes in history
//! This API call should NOT modify the history
EXPECT_EQ(ReturnCode_t::RETCODE_OK, reader.get_first_untaken_info(&info));

HelloWorld msg;
eprosima::fastdds::dds::SampleInfo sinfo;

//! Try getting a sample
auto result = reader.take_next_sample((void*)&msg, &sinfo);

//! Assert last operation
ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit cb48e90

Please sign in to comment.