Skip to content

Commit

Permalink
Fix total_unread_ consistent with reader's history after get_first_un…
Browse files Browse the repository at this point in the history
…taken_info (#3203)

* Refs 16608: Added BlackBoxTest

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs 16608: Fix. Make get_first_untaken_info() a read-only API call

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #16608: Fix tsan warning

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #16608: Applied suggested changes

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #16608: Applied second-reviewed suggested changes

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #16608: Skip test for Data-Sharing

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
(cherry picked from commit de5cd9c)
Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL committed Jan 18, 2023
1 parent b38ade0 commit 242ff1c
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 14 deletions.
15 changes: 3 additions & 12 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,21 +350,12 @@ bool DataReaderHistory::get_first_untaken_info(
{
std::lock_guard<RecursiveTimedMutex> lock(*getMutex());

CacheChange_t* change = nullptr;
WriterProxy* wp = nullptr;
if (mp_reader->nextUntakenCache(&change, &wp))
if (!data_available_instances_.empty())
{
auto it = data_available_instances_.find(change->instanceHandle);
assert(it != data_available_instances_.end());
auto it = data_available_instances_.begin();
auto& instance_changes = it->second->cache_changes;
auto item =
std::find_if(instance_changes.cbegin(), instance_changes.cend(),
[change](const DataReaderCacheChange& v)
{
return v == change;
});
auto item = instance_changes.cbegin();
ReadTakeCommand::generate_info(info, *(it->second), *item);
mp_reader->change_read_by_user(change, wp, false);
return true;
}

Expand Down
34 changes: 32 additions & 2 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ class PubSubReader
PubSubReader(
const std::string& topic_name,
bool take = true,
bool statistics = false)
bool statistics = false,
bool read = true)
: participant_listener_(*this)
, listener_(*this)
, participant_(nullptr)
Expand All @@ -293,10 +294,12 @@ class PubSubReader
, receiving_(false)
, current_processed_count_(0)
, number_samples_expected_(0)
, current_unread_count_(0)
, discovery_result_(false)
, onDiscovery_(nullptr)
, onEndpointDiscovery_(nullptr)
, take_(take)
, read_(read)
, statistics_(statistics)
#if HAVE_SECURITY
, authorized_(0)
Expand Down Expand Up @@ -548,6 +551,16 @@ class PubSubReader
return current_processed_count_;
}

size_t block_for_unread_count_of(
size_t n_unread)
{
block([this, n_unread]() -> bool
{
return current_unread_count_ >= n_unread;
});
return current_unread_count_;
}

void block(
std::function<bool()> checker)
{
Expand Down Expand Up @@ -1669,6 +1682,14 @@ class PubSubReader
type data;
eprosima::fastdds::dds::SampleInfo info;

if (!take_ && !read_)
{
current_unread_count_ = datareader->get_unread_count();
std::cout << "Total unread count " << current_unread_count_ << std::endl;
cv_.notify_one();
return;
}

ReturnCode_t success = take_ ?
datareader->take_next_sample((void*)&data, &info) :
datareader->read_next_sample((void*)&data, &info);
Expand Down Expand Up @@ -1835,8 +1856,14 @@ class PubSubReader
eprosima::fastdds::dds::TypeSupport type_;
using LastSeqInfo = std::pair<eprosima::fastrtps::rtps::InstanceHandle_t, eprosima::fastrtps::rtps::GUID_t>;
std::map<LastSeqInfo, eprosima::fastrtps::rtps::SequenceNumber_t> last_seq;
<<<<<<< HEAD
size_t current_processed_count_;
size_t number_samples_expected_;
=======
std::atomic<size_t> current_processed_count_;
std::atomic<size_t> number_samples_expected_;
std::atomic<size_t> current_unread_count_;
>>>>>>> de5cd9c2e (Fix total_unread_ consistent with reader's history after get_first_untaken_info (#3203))
bool discovery_result_;
std::string xml_file_ = "";
Expand All @@ -1846,9 +1873,12 @@ class PubSubReader
std::function<bool(const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info)> onDiscovery_;
std::function<bool(const eprosima::fastrtps::rtps::WriterDiscoveryInfo& info)> onEndpointDiscovery_;
//! True to take data from history. False to read
//! True to take data from history. On False, read_ is checked.
bool take_;
//! True to read data from history. False, do nothing on data reception.
bool read_;
//! True if the class is called from the statistics blackbox (specific topic name and domain id).
bool statistics_;
Expand Down
1 change: 1 addition & 0 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ class PubSubWriter
bool send_sample(
type& msg)
{
default_send_print(msg);
return datawriter_->write((void*)&msg);
}

Expand Down
93 changes: 93 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

#include <gtest/gtest.h>

Expand All @@ -34,9 +35,12 @@
#include <fastdds/dds/topic/Topic.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastrtps/types/TypesBase.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>

#include "BlackboxTests.hpp"
#include "../types/HelloWorldPubSubTypes.h"
#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -184,6 +188,95 @@ TEST(DDSBasic, MultithreadedPublisherCreation)
ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant));
}

<<<<<<< HEAD
=======
TEST(DDSBasic, MultithreadedReaderCreationDoesNotDeadlock)
{
// Get factory
DomainParticipantFactory* factory = DomainParticipantFactory::get_instance();
ASSERT_NE(nullptr, factory);

// Create participant
DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT);
ASSERT_NE(nullptr, participant);

// Register type
TypeSupport type_support;
type_support.reset(new FixedSizedPubSubType());
type_support.register_type(participant);
ASSERT_NE(nullptr, type_support);

// Create subscriber
Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
ASSERT_NE(nullptr, subscriber);

// Create publisher
Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT);
ASSERT_NE(nullptr, publisher);

// Create Topic
Topic* topic = participant->create_topic(TEST_TOPIC_NAME, type_support.get_type_name(), TOPIC_QOS_DEFAULT);
ASSERT_NE(nullptr, topic);

// Set QoS
DataSharingQosPolicy dsp;
dsp.off();

DataWriterQos dw_qos;
DataReaderQos dr_qos;
dw_qos.data_sharing(dsp);
dr_qos.data_sharing(dsp);

// Create DataWriter
DataWriter* writer = publisher->create_datawriter(topic, dw_qos);
ASSERT_NE(nullptr, writer);

std::mutex mtx;
std::condition_variable cv;
bool should_finish = false;

auto thread_run = [subscriber, topic, &mtx, &cv, &should_finish, &dr_qos]()
{
// Create reader
DataReader* reader = subscriber->create_datareader(topic, dr_qos);
ASSERT_NE(nullptr, reader);

// Wait for test completion request
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&should_finish]()
{
return should_finish;
});

ASSERT_EQ(ReturnCode_t::RETCODE_OK, subscriber->delete_datareader(reader));
};

{
std::vector<std::thread> threads;
for (size_t i = 0; i < 10; ++i)
{
threads.push_back(std::thread(thread_run));
}

{
std::lock_guard<std::mutex> guard(mtx);
should_finish = true;
cv.notify_all();
}

for (std::thread& thr : threads)
{
thr.join();
}
}

ASSERT_EQ(ReturnCode_t::RETCODE_OK, publisher->delete_datawriter(writer));
ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_publisher(publisher));
ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_subscriber(subscriber));
ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_topic(topic));
ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant));
}
>>>>>>> de5cd9c2e (Fix total_unread_ consistent with reader's history after get_first_untaken_info (#3203))
} // namespace dds
} // namespace fastdds
} // namespace eprosima
67 changes: 67 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,73 @@ TEST_P(DDSDataReader, LivelinessChangedStatusGet)

}

// Regression test of Refs #16608, Github #3203. Checks that total_unread_ variable is consistent with
// unread changes in reader's history after performing a get_first_untaken_info() on a change with no writer matched.
TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo)
{
if (enable_datasharing)
{
//! TODO: Datasharing changes the behavior of this test. Changes are
//! instantly removed on removePublisher() call and on the PUBListener callback
GTEST_SKIP() << "Data-sharing removes the changes instantly changing the behavior of this test. Skipping";
}

//! Spawn a couple of participants writer/reader
auto pubsub_writer = std::make_shared<PubSubWriter<HelloWorldPubSubType>>(TEST_TOPIC_NAME);
//! Create a reader that does nothing when new data is available. Neither take nor read it.
auto pubsub_reader = std::make_shared<PubSubReader<HelloWorldPubSubType>>(TEST_TOPIC_NAME, false, false, false);

// Initialization of all the participants
std::cout << "Initializing PubSubs for topic " << TEST_TOPIC_NAME << std::endl;

//! Participant Writer configuration and qos
pubsub_writer->reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS);
pubsub_writer->durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS);
pubsub_writer->history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS);
pubsub_writer->init();
ASSERT_EQ(pubsub_writer->isInitialized(), true);

//! Participant Reader configuration and qos
pubsub_reader->reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS);
pubsub_reader->durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS);
pubsub_reader->history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS);
pubsub_reader->init();
ASSERT_EQ(pubsub_reader->isInitialized(), true);

// Wait for discovery.
pubsub_reader->wait_discovery();
pubsub_writer->wait_discovery();

auto data = default_helloworld_data_generator();

pubsub_reader->startReception(data);

pubsub_writer->send(data);
EXPECT_TRUE(data.empty());

pubsub_reader->block_for_unread_count_of(3);
pubsub_writer->removePublisher();
pubsub_reader->wait_writer_undiscovery();

eprosima::fastdds::dds::DataReader& reader = pubsub_reader->get_native_reader();
eprosima::fastdds::dds::SampleInfo info;

//! Try reading the first untaken info.
//! Checks whether total_unread_ is consistent with
//! the number of unread changes in history
//! This API call should NOT modify the history
reader.get_first_untaken_info(&info);

HelloWorld msg;
eprosima::fastdds::dds::SampleInfo sinfo;

//! Try getting a sample
auto result = reader.take_next_sample((void*)&msg, &sinfo);

//! Assert last operation
ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit 242ff1c

Please sign in to comment.