Skip to content

Commit

Permalink
Add a keyed fragmented change to the reader data instance only when i…
Browse files Browse the repository at this point in the history
…ts completed (#4261)

* Refs #20257: Add regression test

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* #Refs #20257: Fix

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20239: Second rev suggestions

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20257: Linter

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20257: Retrieve instance handle condition before for avoid being nullptr

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL authored Jan 27, 2024
1 parent 06c8695 commit 9558ce4
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 19 deletions.
35 changes: 16 additions & 19 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ DataReaderHistory::DataReaderHistory(
compute_key_for_change_fn_ =
[this](CacheChange_t* a_change)
{
if (a_change->instanceHandle.isDefined())
if (!a_change->is_fully_assembled())
{
return true;
return false;
}

if (!a_change->is_fully_assembled())
if (a_change->instanceHandle.isDefined())
{
return false;
return true;
}

if (type_ != nullptr)
Expand Down Expand Up @@ -736,27 +736,24 @@ bool DataReaderHistory::completed_change(
size_t unknown_missing_changes_up_to,
SampleRejectedStatusKind& rejection_reason)
{
bool ret_value = true;
rejection_reason = NOT_REJECTED;
bool ret_value = false;
rejection_reason = REJECTED_BY_INSTANCES_LIMIT;

if (!change->instanceHandle.isDefined())
if (compute_key_for_change_fn_(change))
{
ret_value = false;
if (compute_key_for_change_fn_(change))
InstanceCollection::iterator vit;
if (find_key(change->instanceHandle, vit))
{
InstanceCollection::iterator vit;
if (find_key(change->instanceHandle, vit))
{
ret_value = !change->instanceHandle.isDefined() ||
complete_fn_(change, *vit->second, unknown_missing_changes_up_to, rejection_reason);
}
else
{
rejection_reason = REJECTED_BY_INSTANCES_LIMIT;
}
ret_value = !change->instanceHandle.isDefined() ||
complete_fn_(change, *vit->second, unknown_missing_changes_up_to, rejection_reason);
}
}

if (ret_value)
{
rejection_reason = NOT_REJECTED;
}

return ret_value;
}

Expand Down
7 changes: 7 additions & 0 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ bool StatefulReader::processDataFragMsg(
{
work_change->copy_not_memcpy(change_to_add);
work_change->serializedPayload.length = sampleSize;
work_change->instanceHandle.clear();
work_change->setFragmentSize(change_to_add->getFragmentSize(), true);
change_created = work_change;
}
Expand All @@ -731,6 +732,12 @@ bool StatefulReader::processDataFragMsg(

if (work_change != nullptr)
{
// Set the instanceHandle only when fragment number 1 is received
if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1)
{
work_change->instanceHandle = change_to_add->instanceHandle;
}

work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum,
fragmentsInSubmessage);
}
Expand Down
8 changes: 8 additions & 0 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ bool StatelessReader::processDataFragMsg(
// Sample fits inside pending change. Reuse it.
work_change->copy_not_memcpy(change_to_add);
work_change->serializedPayload.length = sampleSize;
work_change->instanceHandle.clear();
work_change->setFragmentSize(change_to_add->getFragmentSize(), true);
}
else
Expand All @@ -739,6 +740,7 @@ bool StatelessReader::processDataFragMsg(
{
work_change->copy_not_memcpy(change_to_add);
work_change->serializedPayload.length = sampleSize;
work_change->instanceHandle.clear();
work_change->setFragmentSize(change_to_add->getFragmentSize(), true);
}
}
Expand All @@ -748,6 +750,12 @@ bool StatelessReader::processDataFragMsg(
CacheChange_t* change_completed = nullptr;
if (work_change != nullptr)
{
// Set the instanceHandle only when fragment number 1 is received
if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1)
{
work_change->instanceHandle = change_to_add->instanceHandle;
}

if (work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum,
fragmentsInSubmessage))
{
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,13 @@ class PubSubReader
return *this;
}

PubSubReader& expect_inline_qos(
bool expect)
{
datareader_qos_.expects_inline_qos(expect);
return *this;
}

PubSubReader& heartbeatResponseDelay(
const int32_t secs,
const int32_t frac)
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,13 @@ class PubSubReader
return *this;
}

PubSubReader& expect_inline_qos(
bool expect)
{
subscriber_attr_.expectsInlineQos = expect;
return *this;
}

PubSubReader& heartbeatResponseDelay(
const int32_t secs,
const int32_t frac)
Expand Down
60 changes: 60 additions & 0 deletions test/blackbox/common/BlackboxTestsPubSubFragments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,66 @@ TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableKeyedData300kbKeepLast1InLos
testTransport->dropLogLength);
}

// Regression test for 20257
// When a non existing change is removed, the change is also removed from the data instance changes sequence
TEST(PubSubFragmentsLimited,
AsyncPubSubAsReliableKeyedData300kbKeepLast1LoosyConditionsSmallFragmentsCorrectlyBehavesWhenInlineQoSAreForced)
{
PubSubReader<KeyedData1mbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<KeyedData1mbPubSubType> writer(TEST_TOPIC_NAME);

reader.history_depth(2)
.expect_inline_qos(true)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.init();

ASSERT_TRUE(reader.isInitialized());

// To simulate lossy conditions, we are going to remove the default
// builtin transport, and instead use a lossy shim layer variant.
auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();
testTransport->maxMessageSize = 1024;
// We drop 20% of all data frags
testTransport->dropDataFragMessagesPercentage = 20;
testTransport->dropLogLength = 1;
writer.disable_builtin_transport();
writer.add_user_transport_to_pparams(testTransport);

// When doing fragmentation, it is necessary to have some degree of
// flow control not to overrun the receive buffer.
uint32_t bytesPerPeriod = 153601;
uint32_t periodInMs = 100;
writer.add_throughput_controller_descriptor_to_pparams(
eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::HIGH_PRIORITY, bytesPerPeriod, periodInMs)
.heartbeat_period_seconds(0)
.heartbeat_period_nanosec(1000000)
.history_depth(1)
.asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE).init();

ASSERT_TRUE(writer.isInitialized());

// Because its volatile the durability
// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_keyeddata300kb_data_generator(5);

reader.startReception(data);

// Send data
writer.send(data, 100);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_seq({ 0, 5 });

// Sanity check. Make sure we have dropped a few packets
ASSERT_EQ(
test_UDPv4Transport::test_UDPv4Transport_DropLog.size(),
testTransport->dropLogLength);
}

TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableVolatileData300kbInLossyConditionsSmallFragments)
{
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
Expand Down

0 comments on commit 9558ce4

Please sign in to comment.