Skip to content

Commit

Permalink
Improve performance on intraprocess + data-sharing (#3743)
Browse files Browse the repository at this point in the history
* Refs #19247. Some datasharing blackbox tests using intraprocess.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19247. Enable datasharing reception on intraprocess.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19247. Avoid acknacks on intraprocess.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19247. Small refactor on send_ack_if_datasharing.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

---------

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
MiguelCompany authored Jul 27, 2023
1 parent a7697c7 commit c3b4de3
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 14 deletions.
23 changes: 15 additions & 8 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,12 @@

using namespace eprosima::fastrtps::rtps;

static void send_ack_if_datasharing(
static void send_datasharing_ack(
StatefulReader* reader,
ReaderHistory* history,
WriterProxy* writer,
const SequenceNumber_t& sequence_number)
{
// If not datasharing, we are done
if (!writer || !writer->is_datasharing_writer())
{
return;
}

// This may not be the change read with highest SN,
// need to find largest SN to ACK
for (std::vector<CacheChange_t*>::iterator it = history->changesBegin(); it != history->changesEnd(); ++it)
Expand All @@ -80,6 +74,19 @@ static void send_ack_if_datasharing(
reader->send_acknack(writer, sns, writer, false);
}

static inline void send_ack_if_datasharing(
StatefulReader* reader,
ReaderHistory* history,
WriterProxy* writer,
const SequenceNumber_t& sequence_number)
{
// Shall be datasharing, and not on same process
if (writer && writer->is_datasharing_writer() && !writer->is_on_same_process())
{
send_datasharing_ack(reader, history, writer, sequence_number);
}
}

StatefulReader::~StatefulReader()
{
EPROSIMA_LOG_INFO(RTPS_READER, "StatefulReader destructor.");
Expand Down Expand Up @@ -196,7 +203,7 @@ bool StatefulReader::matched_writer_add(

listener = mp_listener;
bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());
bool is_datasharing = !is_same_process && is_datasharing_compatible_with(wdata);
bool is_datasharing = is_datasharing_compatible_with(wdata);

for (WriterProxy* it : matched_writers_)
{
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ bool StatelessReader::matched_writer_add(
}

bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());
bool is_datasharing = !is_same_process && is_datasharing_compatible_with(wdata);
bool is_datasharing = is_datasharing_compatible_with(wdata);

RemoteWriterInfo_t info;
info.guid = wdata.guid();
Expand Down
51 changes: 46 additions & 5 deletions test/blackbox/common/DDSBlackboxTestsDataSharing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,33 @@ bool check_shared_file (
return result;
}

TEST(DDSDataSharing, BasicCommunication)
class DDSDataSharing : public testing::TestWithParam<bool>
{
public:

void SetUp() override
{
if (GetParam())
{
LibrarySettingsAttributes library_settings;
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_FULL;
xmlparser::XMLProfileManager::library_settings(library_settings);
}
}

void TearDown() override
{
if (GetParam())
{
LibrarySettingsAttributes library_settings;
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF;
xmlparser::XMLProfileManager::library_settings(library_settings);
}
}

};

TEST_P(DDSDataSharing, BasicCommunication)
{
PubSubReader<FixedSizedPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<FixedSizedPubSubType> writer(TEST_TOPIC_NAME);
Expand Down Expand Up @@ -162,7 +188,7 @@ TEST(DDSDataSharing, TransientReader)
}


TEST(DDSDataSharing, BestEffortDirtyPayloads)
TEST_P(DDSDataSharing, BestEffortDirtyPayloads)
{
// The writer's pool is smaller than the reader history.
// The number of samples is larger than the pool size, so some payloads get reused
Expand Down Expand Up @@ -224,7 +250,7 @@ TEST(DDSDataSharing, BestEffortDirtyPayloads)
ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid()));
}

TEST(DDSDataSharing, ReliableDirtyPayloads)
TEST_P(DDSDataSharing, ReliableDirtyPayloads)
{
// The writer's pool is smaller than the reader history.
// The number of samples is larger than the pool size, so some payloads get rused
Expand Down Expand Up @@ -604,7 +630,7 @@ TEST(DDSDataSharing, DataSharingReader_CommonDomainWriters)
}


TEST(DDSDataSharing, DataSharingPoolError)
TEST_P(DDSDataSharing, DataSharingPoolError)
{
PubSubWriter<Data1mbPubSubType> writer_datasharing(TEST_TOPIC_NAME);
PubSubWriter<Data1mbPubSubType> writer_auto(TEST_TOPIC_NAME);
Expand Down Expand Up @@ -654,7 +680,7 @@ TEST(DDSDataSharing, DataSharingPoolError)
}


TEST(DDSDataSharing, DataSharingDefaultDirectory)
TEST_P(DDSDataSharing, DataSharingDefaultDirectory)
{
// Since the default directory heavily depends on the system,
// we are not checking the creation of the files in this case,
Expand Down Expand Up @@ -907,3 +933,18 @@ TEST(DDSDataSharing, acknack_reception_when_get_unread_count)
ASSERT_FALSE(check_shared_file(".", reader.datareader_guid()));
ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid()));
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_CASE_P(x, y, z, w)
#endif // ifdef INSTANTIATE_TEST_SUITE_P

GTEST_INSTANTIATE_TEST_MACRO(DDSDataSharing,
DDSDataSharing,
testing::Values(false, true),
[](const testing::TestParamInfo<DDSDataSharing::ParamType>& info)
{
bool intraprocess = info.param;
return intraprocess ? "Intraprocess_and_datasharing" : "Datasharing_only";
});

0 comments on commit c3b4de3

Please sign in to comment.