diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index 7fd67d478..8bfbf012c 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -241,7 +241,7 @@ rmw_create_service( delete info->pub_listener_; } }); - info->pub_listener_ = new (std::nothrow) ServicePubListener(); + info->pub_listener_ = new (std::nothrow) PatchedServicePubListener(); if (!info->pub_listener_) { RMW_SET_ERROR_MSG("failed to create service response publisher listener"); return nullptr; diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index 20c2ef6e8..eddec68e0 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -231,7 +231,7 @@ rmw_create_service( RMW_SET_ERROR_MSG("failed to get datawriter qos"); goto fail; } - info->pub_listener_ = new ServicePubListener(); + info->pub_listener_ = new PatchedServicePubListener(); info->response_publisher_ = Domain::createPublisher(participant, publisherParam, info->pub_listener_); if (!info->response_publisher_) { diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index e2e7ff86a..1839e5a77 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "fastcdr/FastBuffer.h" @@ -37,6 +38,15 @@ class ServiceListener; class ServicePubListener; +class PatchedServicePubListener; + +enum class client_present_t +{ + FAILURE, // an error occurred when checking + MAYBE, // reader not matched, writer still present + YES, // reader matched + GONE // neither reader nor writer +}; typedef struct CustomServiceInfo { @@ -62,6 +72,132 @@ typedef struct CustomServiceRequest : buffer_(nullptr) {} } CustomServiceRequest; +class ServicePubListener : public eprosima::fastrtps::PublisherListener +{ +public: + ServicePubListener() = default; + + template + bool wait_for_subscription( + const eprosima::fastrtps::rtps::GUID_t & guid, + const std::chrono::duration & rel_time) + { + auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool + { + return subscriptions_.find(guid) != subscriptions_.end(); + }; + + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, rel_time, guid_is_present); + } + + void onPublicationMatched( + eprosima::fastrtps::Publisher * pub, + eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) + { + (void) pub; + std::lock_guard lock(mutex_); + if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { + subscriptions_.insert(matchingInfo.remoteEndpointGuid); + } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { + subscriptions_.erase(matchingInfo.remoteEndpointGuid); + } else { + return; + } + cv_.notify_all(); + } + +protected: + std::mutex & getMutex() + { + return mutex_; + } + + std::unordered_set< + eprosima::fastrtps::rtps::GUID_t, + rmw_fastrtps_shared_cpp::hash_fastrtps_guid> & + getSubscriptions() + { + return subscriptions_; + } + + std::condition_variable & getConditionVariable() + { + return cv_; + } + +private: + using subscriptions_set_t = + std::unordered_set; + + std::mutex mutex_; + subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::condition_variable cv_; +}; + +// Wrapper around ServicePubListener to fix issue when matching clients in an ABI compatible way +// See original patch: https://github.com/ros2/rmw_fastrtps/pull/467 +class PatchedServicePubListener : public ServicePubListener +{ +public: + using clients_endpoints_map_t = + std::unordered_map; + + void onPublicationMatched( + eprosima::fastrtps::Publisher * pub, + eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) + { + (void) pub; + std::lock_guard lock(getMutex()); + std::unordered_set< + eprosima::fastrtps::rtps::GUID_t, + rmw_fastrtps_shared_cpp::hash_fastrtps_guid> & subscriptions_ = getSubscriptions(); + std::condition_variable & cv_ = getConditionVariable(); + if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { + subscriptions_.insert(matchingInfo.remoteEndpointGuid); + } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { + subscriptions_.erase(matchingInfo.remoteEndpointGuid); + auto endpoint = clients_endpoints_.find(matchingInfo.remoteEndpointGuid); + if (endpoint != clients_endpoints_.end()) { + clients_endpoints_.erase(endpoint->second); + clients_endpoints_.erase(matchingInfo.remoteEndpointGuid); + } + } else { + return; + } + cv_.notify_all(); + } + + client_present_t + check_for_subscription( + const eprosima::fastrtps::rtps::GUID_t & guid) + { + // Check if the guid is still in the map + if (clients_endpoints_.find(guid) == clients_endpoints_.end()) { + // Client is gone + return client_present_t::GONE; + } + // Wait for subscription + if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) { + return client_present_t::MAYBE; + } + return client_present_t::YES; + } + + // Accesors + clients_endpoints_map_t & clients_endpoints() + { + std::lock_guard lock(getMutex()); + return clients_endpoints_; + } + +private: + clients_endpoints_map_t clients_endpoints_; +}; + class ServiceListener : public eprosima::fastrtps::SubscriberListener { public: @@ -72,6 +208,23 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener (void)info_; } + void + onSubscriptionMatched( + eprosima::fastrtps::Subscriber * sub, + eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) + { + (void) sub; + PatchedServicePubListener * pub_listener = static_cast( + info_->pub_listener_); + if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { + auto endpoint = pub_listener->clients_endpoints().find( + matchingInfo.remoteEndpointGuid); + if (endpoint != pub_listener->clients_endpoints().end()) { + pub_listener->clients_endpoints().erase(endpoint->second); + pub_listener->clients_endpoints().erase(matchingInfo.remoteEndpointGuid); + } + } + } void onNewDataMessage(eprosima::fastrtps::Subscriber * sub) @@ -95,6 +248,14 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener request.sample_identity_.writer_guid() = reader_guid; } + // Save both guids in the clients_endpoints map + PatchedServicePubListener * pub_listener = + static_cast(info_->pub_listener_); + const eprosima::fastrtps::rtps::GUID_t & writer_guid = + request.sample_info_.sample_identity.writer_guid(); + pub_listener->clients_endpoints().emplace(reader_guid, writer_guid); + pub_listener->clients_endpoints().emplace(writer_guid, reader_guid); + std::lock_guard lock(internalMutex_); if (conditionMutex_ != nullptr) { @@ -169,49 +330,4 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); }; -class ServicePubListener : public eprosima::fastrtps::PublisherListener -{ -public: - ServicePubListener() = default; - - template - bool wait_for_subscription( - const eprosima::fastrtps::rtps::GUID_t & guid, - const std::chrono::duration & rel_time) - { - auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool - { - return subscriptions_.find(guid) != subscriptions_.end(); - }; - - std::unique_lock lock(mutex_); - return cv_.wait_for(lock, rel_time, guid_is_present); - } - - void onPublicationMatched( - eprosima::fastrtps::Publisher * pub, - eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) - { - (void) pub; - std::lock_guard lock(mutex_); - if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { - subscriptions_.insert(matchingInfo.remoteEndpointGuid); - } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { - subscriptions_.erase(matchingInfo.remoteEndpointGuid); - } else { - return; - } - cv_.notify_all(); - } - -private: - using subscriptions_set_t = - std::unordered_set; - - std::mutex mutex_; - subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - std::condition_variable cv_; -}; - #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp index 4ee96af24..d4a4d14ef 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp @@ -118,10 +118,13 @@ __rmw_send_response( if ((related_guid.entityId.value[3] & entity_id_is_reader_bit) != 0) { // Related guid is a reader, so it is the response subscription guid. // Wait for the response writer to be matched with it. - auto listener = info->pub_listener_; - if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) { + auto listener = static_cast(info->pub_listener_); + client_present_t ret = listener->check_for_subscription(related_guid); + if (ret == client_present_t::GONE) { + return RMW_RET_OK; + } else if (ret == client_present_t::MAYBE) { RMW_SET_ERROR_MSG("client will not receive response"); - return RMW_RET_ERROR; + return RMW_RET_TIMEOUT; } }