Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of unique locators request feature for readers [10497] #1768

105 changes: 85 additions & 20 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ static bool should_be_intraprocess_only(
(ParticipantFilteringFlags::FILTER_DIFFERENT_HOST | ParticipantFilteringFlags::FILTER_DIFFERENT_PROCESS);
}

static bool get_unique_flows_parameters(
const RTPSParticipantAttributes& part_att,
const EndpointAttributes& att,
bool& unique_flows,
uint16_t& initial_port,
uint16_t& final_port)
{
const std::string* value = PropertyPolicyHelper::find_property(att.properties, "fastdds.unique_network_flows");

unique_flows = (nullptr != value);
if (unique_flows)
{
// TODO (Miguel C): parse value to get port range
final_port = part_att.port.portBase;
initial_port = part_att.port.portBase - 400;
}

return true;
}

Locator_t& RTPSParticipantImpl::applyLocatorAdaptRule(
Locator_t& loc)
{
Expand Down Expand Up @@ -675,13 +695,6 @@ bool RTPSParticipantImpl::create_reader(
return false;
}

// Check for unique_network_flows feature
if (nullptr != PropertyPolicyHelper::find_property(param.endpoint.properties, "fastdds.unique_network_flows"))
{
logError(RTPS_PARTICIPANT, "Unique network flows not supported on readers");
return false;
}

// Special case for DiscoveryProtocol::BACKUP, which abuses persistence guid
GUID_t former_persistence_guid = param.endpoint.persistence_guid;
if (param.endpoint.persistence_guid == c_Guid_Unknown)
Expand All @@ -702,6 +715,15 @@ bool RTPSParticipantImpl::create_reader(
return false;
}

// Check for unique_network_flows feature
bool request_unique_flows = false;
uint16_t initial_port = 0;
uint16_t final_port = 0;
if (!get_unique_flows_parameters(m_att, param.endpoint, request_unique_flows, initial_port, final_port))
{
return false;
}

normalize_endpoint_locators(param.endpoint);

RTPSReader* SReader = nullptr;
Expand Down Expand Up @@ -750,7 +772,7 @@ bool RTPSParticipantImpl::create_reader(

if (enable)
{
if (!createAndAssociateReceiverswithEndpoint(SReader))
if (!createAndAssociateReceiverswithEndpoint(SReader, request_unique_flows, initial_port, final_port))
{
delete(SReader);
return false;
Expand Down Expand Up @@ -1091,26 +1113,64 @@ bool RTPSParticipantImpl::assignEndpointListenResources(
}

bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint(
Endpoint* pend)
Endpoint* pend,
bool unique_flows,
uint16_t initial_unique_port,
uint16_t final_unique_port)
{
/* This function...
- Asks the network factory for new resources
- Encapsulates the new resources within the ReceiverControlBlock list
- Associated the endpoint to the new elements in the list
- Launches the listener thread
*/
// 1 - Ask the network factory to generate the elements that do still not exist
std::vector<ReceiverResource> newItems; //Store the newly created elements
std::vector<ReceiverResource> newItemsBuffer; //Store intermediate results
//Iterate through the list of unicast and multicast locators the endpoint has... unless its empty
//In that case, just use the standard
if (pend->getAttributes().unicastLocatorList.empty() && pend->getAttributes().multicastLocatorList.empty())
{
//Default unicast

if (unique_flows)
{
pend->getAttributes().multicastLocatorList.clear();
pend->getAttributes().unicastLocatorList = m_att.defaultUnicastLocatorList;

uint16_t port = initial_unique_port;
while (port < final_unique_port)
{
// Set port on unicast locators
for (Locator_t& loc : pend->getAttributes().unicastLocatorList)
{
loc.port = port;
}

// Try creating receiver resources
if (createReceiverResources(pend->getAttributes().unicastLocatorList, false, true))
{
break;
}

// Try with next port
++port;
}

// Fail when unique ports are exhausted
if (port >= final_unique_port)
{
logError(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
<< initial_unique_port << "-" << final_unique_port);
return false;
}
}
else
{
// 1 - Ask the network factory to generate the elements that do still not exist
//Iterate through the list of unicast and multicast locators the endpoint has... unless its empty
//In that case, just use the standard
if (pend->getAttributes().unicastLocatorList.empty() && pend->getAttributes().multicastLocatorList.empty())
{
// Take default locators from the participant.
pend->getAttributes().unicastLocatorList = m_att.defaultUnicastLocatorList;
pend->getAttributes().multicastLocatorList = m_att.defaultMulticastLocatorList;
}
createReceiverResources(pend->getAttributes().unicastLocatorList, false, true);
createReceiverResources(pend->getAttributes().multicastLocatorList, false, true);
}
createReceiverResources(pend->getAttributes().unicastLocatorList, false, true);
createReceiverResources(pend->getAttributes().multicastLocatorList, false, true);

// Associate the Endpoint with ReceiverControlBlock
assignEndpointListenResources(pend);
Expand Down Expand Up @@ -1177,12 +1237,13 @@ bool RTPSParticipantImpl::createSendResources(
return true;
}

void RTPSParticipantImpl::createReceiverResources(
bool RTPSParticipantImpl::createReceiverResources(
LocatorList_t& Locator_list,
bool ApplyMutation,
bool RegisterReceiver)
{
std::vector<std::shared_ptr<ReceiverResource>> newItemsBuffer;
bool ret_val = Locator_list.empty();

#if HAVE_SECURITY
// An auxilary buffer is needed in the ReceiverResource to to decrypt the message,
Expand All @@ -1207,6 +1268,8 @@ void RTPSParticipantImpl::createReceiverResources(
}
}

ret_val |= !newItemsBuffer.empty();

for (auto it_buffer = newItemsBuffer.begin(); it_buffer != newItemsBuffer.end(); ++it_buffer)
{
std::lock_guard<std::mutex> lock(m_receiverResourcelistMutex);
Expand All @@ -1223,6 +1286,8 @@ void RTPSParticipantImpl::createReceiverResources(
}
newItemsBuffer.clear();
}

return ret_val;
}

void RTPSParticipantImpl::createSenderResources(
Expand Down
12 changes: 9 additions & 3 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -562,10 +562,16 @@ class RTPSParticipantImpl

/** Create the new ReceiverResources needed for a new Locator, contains the calls to assignEndpointListenResources
and consequently assignEndpoint2LocatorList
@param pend - Pointer to the endpoint which triggered the creation of the Receivers
@param pend - Pointer to the endpoint which triggered the creation of the Receivers.
@param unique_flows - Whether unique listening ports should be created for this endpoint.
@param initial_unique_port - First unique listening port to try.
@param final_unique_port - Unique listening port that will not be tried.
*/
bool createAndAssociateReceiverswithEndpoint(
Endpoint* pend);
Endpoint* pend,
bool unique_flows = false,
uint16_t initial_unique_port = 0,
uint16_t final_unique_port = 0);

/** Create non-existent SendResources based on the Locator list of the entity
@param pend - Pointer to the endpoint whose SenderResources are to be created
Expand Down Expand Up @@ -859,7 +865,7 @@ class RTPSParticipantImpl
* @param ApplyMutation - True if we want to create a Resource with a "similar" locator if the one we provide is unavailable
* @param RegisterReceiver - True if we want the receiver to be registered. Useful for receivers created after participant is enabled.
*/
void createReceiverResources(
bool createReceiverResources(
LocatorList_t& Locator_list,
bool ApplyMutation,
bool RegisterReceiver);
Expand Down
62 changes: 59 additions & 3 deletions test/blackbox/api/dds-pim/PubSubParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@
template<class TypeSupport>
class PubSubParticipant
{
public:

typedef TypeSupport type_support;
typedef typename type_support::type type;

private:

class PubListener : public eprosima::fastdds::dds::DataWriterListener
{
friend class PubSubParticipant;
Expand Down Expand Up @@ -117,6 +124,18 @@ class PubSubParticipant

}

void on_data_available(
eprosima::fastdds::dds::DataReader* reader) override
{
type data;
eprosima::fastdds::dds::SampleInfo info;

while (ReturnCode_t::RETCODE_OK == reader->take_next_sample(&data, &info))
{
participant_->data_received();
}
}

private:

SubListener& operator =(
Expand All @@ -127,9 +146,6 @@ class PubSubParticipant

public:

typedef TypeSupport type_support;
typedef typename type_support::type type;

PubSubParticipant(
unsigned int num_publishers,
unsigned int num_subscribers,
Expand Down Expand Up @@ -310,6 +326,18 @@ class PubSubParticipant
return false;
}

eprosima::fastdds::dds::DataWriter& get_native_writer(
unsigned int index)
{
return *(std::get<2>(publishers_[index]));
}

eprosima::fastdds::dds::DataReader& get_native_reader(
unsigned int index)
{
return *(std::get<2>(subscribers_[index]));
}

bool send_sample(
type& msg,
unsigned int index = 0)
Expand Down Expand Up @@ -467,6 +495,20 @@ class PubSubParticipant
return *this;
}

PubSubParticipant& pub_property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
datawriter_qos_.properties() = property_policy;
return *this;
}

PubSubParticipant& sub_property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
datareader_qos_.properties() = property_policy;
return *this;
}

PubSubParticipant& pub_topic_name(
std::string topicName)
{
Expand Down Expand Up @@ -585,6 +627,13 @@ class PubSubParticipant
sub_liveliness_cv_.notify_one();
}

void data_received()
{
std::unique_lock<std::mutex> lock(sub_data_mutex_);
sub_times_data_received_++;
sub_data_cv_.notify_one();
}

unsigned int pub_times_liveliness_lost()
{
std::unique_lock<std::mutex> lock(pub_liveliness_mutex_);
Expand Down Expand Up @@ -691,6 +740,13 @@ class PubSubParticipant
//! A condition variable for liveliness of publisher
std::condition_variable pub_liveliness_cv_;

//! A mutex protecting received data
std::mutex sub_data_mutex_;
//! A condition variable for received data
std::condition_variable sub_data_cv_;
//! Number of times a subscriber received data
size_t sub_times_data_received_ = 0;

eprosima::fastdds::dds::TypeSupport type_;
};

Expand Down
26 changes: 26 additions & 0 deletions test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ class PubSubParticipant
return false;
}

eprosima::fastrtps::Publisher& get_native_writer(
unsigned int index)
{
return *(publishers_[index]);
}

eprosima::fastrtps::Subscriber& get_native_reader(
unsigned int index)
{
return *(subscribers_[index]);
}

bool send_sample(
type& msg,
unsigned int index = 0)
Expand Down Expand Up @@ -395,6 +407,20 @@ class PubSubParticipant
return *this;
}

PubSubParticipant& pub_property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
publisher_attr_.properties = property_policy;
return *this;
}

PubSubParticipant& sub_property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
subscriber_attr_.properties = property_policy;
return *this;
}

PubSubParticipant& pub_topic_name(
std::string topicName)
{
Expand Down
Loading