Skip to content

Commit

Permalink
Revert "Fix total_unread_ consistent with reader's history after get_…
Browse files Browse the repository at this point in the history
…first_untaken_info (#3203)"

This reverts commit de5cd9c.

Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
  • Loading branch information
fujitatomoya committed Jan 17, 2023
1 parent 6d0c397 commit 28072ba
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 102 deletions.
15 changes: 12 additions & 3 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,21 @@ bool DataReaderHistory::get_first_untaken_info(
{
std::lock_guard<RecursiveTimedMutex> lock(*getMutex());

if (!data_available_instances_.empty())
CacheChange_t* change = nullptr;
WriterProxy* wp = nullptr;
if (mp_reader->nextUntakenCache(&change, &wp))
{
auto it = data_available_instances_.begin();
auto it = data_available_instances_.find(change->instanceHandle);
assert(it != data_available_instances_.end());
auto& instance_changes = it->second->cache_changes;
auto item = instance_changes.cbegin();
auto item =
std::find_if(instance_changes.cbegin(), instance_changes.cend(),
[change](const DataReaderCacheChange& v)
{
return v == change;
});
ReadTakeCommand::generate_info(info, *(it->second), *item);
mp_reader->change_read_by_user(change, wp, false);
return true;
}

Expand Down
29 changes: 2 additions & 27 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,7 @@ class PubSubReader
PubSubReader(
const std::string& topic_name,
bool take = true,
bool statistics = false,
bool read = true)
bool statistics = false)
: participant_listener_(*this)
, listener_(*this)
, participant_(nullptr)
Expand All @@ -294,12 +293,10 @@ class PubSubReader
, receiving_(false)
, current_processed_count_(0)
, number_samples_expected_(0)
, current_unread_count_(0)
, discovery_result_(false)
, onDiscovery_(nullptr)
, onEndpointDiscovery_(nullptr)
, take_(take)
, read_(read)
, statistics_(statistics)
#if HAVE_SECURITY
, authorized_(0)
Expand Down Expand Up @@ -553,16 +550,6 @@ class PubSubReader
return current_processed_count_;
}

size_t block_for_unread_count_of(
size_t n_unread)
{
block([this, n_unread]() -> bool
{
return current_unread_count_ >= n_unread;
});
return current_unread_count_;
}

void block(
std::function<bool()> checker)
{
Expand Down Expand Up @@ -1690,14 +1677,6 @@ class PubSubReader
type data;
eprosima::fastdds::dds::SampleInfo info;

if (!take_ && !read_)
{
current_unread_count_ = datareader->get_unread_count();
std::cout << "Total unread count " << current_unread_count_ << std::endl;
cv_.notify_one();
return;
}

ReturnCode_t success = take_ ?
datareader->take_next_sample((void*)&data, &info) :
datareader->read_next_sample((void*)&data, &info);
Expand Down Expand Up @@ -1866,7 +1845,6 @@ class PubSubReader
std::map<LastSeqInfo, eprosima::fastrtps::rtps::SequenceNumber_t> last_seq;
std::atomic<size_t> current_processed_count_;
std::atomic<size_t> number_samples_expected_;
std::atomic<size_t> current_unread_count_;
bool discovery_result_;

std::string xml_file_ = "";
Expand All @@ -1876,12 +1854,9 @@ class PubSubReader
std::function<bool(const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info)> onDiscovery_;
std::function<bool(const eprosima::fastrtps::rtps::WriterDiscoveryInfo& info)> onEndpointDiscovery_;

//! True to take data from history. On False, read_ is checked.
//! True to take data from history. False to read
bool take_;

//! True to read data from history. False, do nothing on data reception.
bool read_;

//! True if the class is called from the statistics blackbox (specific topic name and domain id).
bool statistics_;

Expand Down
1 change: 0 additions & 1 deletion test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ class PubSubWriter
bool send_sample(
type& msg)
{
default_send_print(msg);
return datawriter_->write((void*)&msg);
}

Expand Down
5 changes: 1 addition & 4 deletions test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

#include <gtest/gtest.h>

Expand All @@ -35,12 +34,9 @@
#include <fastdds/dds/topic/Topic.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastrtps/types/TypesBase.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>

#include "BlackboxTests.hpp"
#include "../types/HelloWorldPubSubTypes.h"
#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -274,6 +270,7 @@ TEST(DDSBasic, MultithreadedReaderCreationDoesNotDeadlock)
ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_topic(topic));
ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant));
}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
67 changes: 0 additions & 67 deletions test/blackbox/common/DDSBlackboxTestsDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,73 +159,6 @@ TEST_P(DDSDataReader, LivelinessChangedStatusGet)

}

// Regression test of Refs #16608, Github #3203. Checks that total_unread_ variable is consistent with
// unread changes in reader's history after performing a get_first_untaken_info() on a change with no writer matched.
TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo)
{
if (enable_datasharing)
{
//! TODO: Datasharing changes the behavior of this test. Changes are
//! instantly removed on removePublisher() call and on the PUBListener callback
GTEST_SKIP() << "Data-sharing removes the changes instantly changing the behavior of this test. Skipping";
}

//! Spawn a couple of participants writer/reader
auto pubsub_writer = std::make_shared<PubSubWriter<HelloWorldPubSubType>>(TEST_TOPIC_NAME);
//! Create a reader that does nothing when new data is available. Neither take nor read it.
auto pubsub_reader = std::make_shared<PubSubReader<HelloWorldPubSubType>>(TEST_TOPIC_NAME, false, false, false);

// Initialization of all the participants
std::cout << "Initializing PubSubs for topic " << TEST_TOPIC_NAME << std::endl;

//! Participant Writer configuration and qos
pubsub_writer->reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS);
pubsub_writer->durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS);
pubsub_writer->history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS);
pubsub_writer->init();
ASSERT_EQ(pubsub_writer->isInitialized(), true);

//! Participant Reader configuration and qos
pubsub_reader->reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS);
pubsub_reader->durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS);
pubsub_reader->history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS);
pubsub_reader->init();
ASSERT_EQ(pubsub_reader->isInitialized(), true);

// Wait for discovery.
pubsub_reader->wait_discovery();
pubsub_writer->wait_discovery();

auto data = default_helloworld_data_generator();

pubsub_reader->startReception(data);

pubsub_writer->send(data);
EXPECT_TRUE(data.empty());

pubsub_reader->block_for_unread_count_of(3);
pubsub_writer->removePublisher();
pubsub_reader->wait_writer_undiscovery();

eprosima::fastdds::dds::DataReader& reader = pubsub_reader->get_native_reader();
eprosima::fastdds::dds::SampleInfo info;

//! Try reading the first untaken info.
//! Checks whether total_unread_ is consistent with
//! the number of unread changes in history
//! This API call should NOT modify the history
reader.get_first_untaken_info(&info);

HelloWorld msg;
eprosima::fastdds::dds::SampleInfo sinfo;

//! Try getting a sample
auto result = reader.take_next_sample((void*)&msg, &sinfo);

//! Assert last operation
ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit 28072ba

Please sign in to comment.