Skip to content

Commit

Permalink
Redo "Discriminate when the Client has gone from when the Client has …
Browse files Browse the repository at this point in the history
…not completely matched" (#490)

* Revert "Revert "Discriminate when the Client has gone from when the Client has not completely matched (#479)" (#489)"

This reverts commit 18483c2.

* Use PatchedServicePubListener in rmw_fastrtps_dynamic_cpp

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
  • Loading branch information
jacobperron authored Dec 8, 2020
1 parent 18483c2 commit 95c2f6a
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 50 deletions.
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ rmw_create_service(
delete info->pub_listener_;
}
});
info->pub_listener_ = new (std::nothrow) ServicePubListener();
info->pub_listener_ = new (std::nothrow) PatchedServicePubListener();
if (!info->pub_listener_) {
RMW_SET_ERROR_MSG("failed to create service response publisher listener");
return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ rmw_create_service(
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
}
info->pub_listener_ = new ServicePubListener();
info->pub_listener_ = new PatchedServicePubListener();
info->response_publisher_ =
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->response_publisher_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <list>
#include <mutex>
#include <unordered_set>
#include <unordered_map>

#include "fastcdr/FastBuffer.h"

Expand All @@ -37,6 +38,15 @@

class ServiceListener;
class ServicePubListener;
class PatchedServicePubListener;

enum class client_present_t
{
FAILURE, // an error occurred when checking
MAYBE, // reader not matched, writer still present
YES, // reader matched
GONE // neither reader nor writer
};

typedef struct CustomServiceInfo
{
Expand All @@ -62,6 +72,132 @@ typedef struct CustomServiceRequest
: buffer_(nullptr) {}
} CustomServiceRequest;

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
public:
ServicePubListener() = default;

template<class Rep, class Period>
bool wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
} else {
return;
}
cv_.notify_all();
}

protected:
std::mutex & getMutex()
{
return mutex_;
}

std::unordered_set<
eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid> &
getSubscriptions()
{
return subscriptions_;
}

std::condition_variable & getConditionVariable()
{
return cv_;
}

private:
using subscriptions_set_t =
std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

std::mutex mutex_;
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::condition_variable cv_;
};

// Wrapper around ServicePubListener to fix issue when matching clients in an ABI compatible way
// See original patch: https://github.com/ros2/rmw_fastrtps/pull/467
class PatchedServicePubListener : public ServicePubListener
{
public:
using clients_endpoints_map_t =
std::unordered_map<eprosima::fastrtps::rtps::GUID_t,
eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(getMutex());
std::unordered_set<
eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid> & subscriptions_ = getSubscriptions();
std::condition_variable & cv_ = getConditionVariable();
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
auto endpoint = clients_endpoints_.find(matchingInfo.remoteEndpointGuid);
if (endpoint != clients_endpoints_.end()) {
clients_endpoints_.erase(endpoint->second);
clients_endpoints_.erase(matchingInfo.remoteEndpointGuid);
}
} else {
return;
}
cv_.notify_all();
}

client_present_t
check_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid)
{
// Check if the guid is still in the map
if (clients_endpoints_.find(guid) == clients_endpoints_.end()) {
// Client is gone
return client_present_t::GONE;
}
// Wait for subscription
if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) {
return client_present_t::MAYBE;
}
return client_present_t::YES;
}

// Accesors
clients_endpoints_map_t & clients_endpoints()
{
std::lock_guard<std::mutex> lock(getMutex());
return clients_endpoints_;
}

private:
clients_endpoints_map_t clients_endpoints_;
};

class ServiceListener : public eprosima::fastrtps::SubscriberListener
{
public:
Expand All @@ -72,6 +208,23 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
(void)info_;
}

void
onSubscriptionMatched(
eprosima::fastrtps::Subscriber * sub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) sub;
PatchedServicePubListener * pub_listener = static_cast<PatchedServicePubListener *>(
info_->pub_listener_);
if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
auto endpoint = pub_listener->clients_endpoints().find(
matchingInfo.remoteEndpointGuid);
if (endpoint != pub_listener->clients_endpoints().end()) {
pub_listener->clients_endpoints().erase(endpoint->second);
pub_listener->clients_endpoints().erase(matchingInfo.remoteEndpointGuid);
}
}
}

void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
Expand All @@ -95,6 +248,14 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
request.sample_identity_.writer_guid() = reader_guid;
}

// Save both guids in the clients_endpoints map
PatchedServicePubListener * pub_listener =
static_cast<PatchedServicePubListener *>(info_->pub_listener_);
const eprosima::fastrtps::rtps::GUID_t & writer_guid =
request.sample_info_.sample_identity.writer_guid();
pub_listener->clients_endpoints().emplace(reader_guid, writer_guid);
pub_listener->clients_endpoints().emplace(writer_guid, reader_guid);

std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
Expand Down Expand Up @@ -169,49 +330,4 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
public:
ServicePubListener() = default;

template<class Rep, class Period>
bool wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
} else {
return;
}
cv_.notify_all();
}

private:
using subscriptions_set_t =
std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

std::mutex mutex_;
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::condition_variable cv_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
9 changes: 6 additions & 3 deletions rmw_fastrtps_shared_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ __rmw_send_response(
if ((related_guid.entityId.value[3] & entity_id_is_reader_bit) != 0) {
// Related guid is a reader, so it is the response subscription guid.
// Wait for the response writer to be matched with it.
auto listener = info->pub_listener_;
if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) {
auto listener = static_cast<PatchedServicePubListener *>(info->pub_listener_);
client_present_t ret = listener->check_for_subscription(related_guid);
if (ret == client_present_t::GONE) {
return RMW_RET_OK;
} else if (ret == client_present_t::MAYBE) {
RMW_SET_ERROR_MSG("client will not receive response");
return RMW_RET_ERROR;
return RMW_RET_TIMEOUT;
}
}

Expand Down

0 comments on commit 95c2f6a

Please sign in to comment.