diff --git a/rmw_fastrtps_shared_cpp/CMakeLists.txt b/rmw_fastrtps_shared_cpp/CMakeLists.txt index ac20a63d0..eb92fd63c 100644 --- a/rmw_fastrtps_shared_cpp/CMakeLists.txt +++ b/rmw_fastrtps_shared_cpp/CMakeLists.txt @@ -47,6 +47,8 @@ find_package(rmw REQUIRED) include_directories(include) add_library(rmw_fastrtps_shared_cpp + src/custom_publisher_info.cpp + src/custom_subscriber_info.cpp src/demangle.cpp src/namespace_prefix.cpp src/qos.cpp diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp new file mode 100644 index 000000000..d2cbe3e49 --- /dev/null +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp @@ -0,0 +1,72 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ +#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "fastcdr/FastBuffer.h" + +#include "fastrtps/subscriber/SampleInfo.h" +#include "fastrtps/subscriber/Subscriber.h" +#include "fastrtps/subscriber/SubscriberListener.h" +#include "fastrtps/participant/Participant.h" +#include "fastrtps/publisher/Publisher.h" +#include "fastrtps/publisher/PublisherListener.h" + +#include "rmw/event.h" + +#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" + + +class EventListenerInterface +{ +public: + /// Connect a condition variable so a waiter can be notified of new data. + virtual void attachCondition( + std::mutex * conditionMutex, + std::condition_variable * conditionVariable) = 0; + + /// Unset the information from attachCondition. + virtual void detachCondition() = 0; + + /// Check if there is new data available for a specific event type. + /** + * \param event_type The event type to check on. + * \return `true` if new data is available. + */ + virtual bool hasEvent(rmw_event_type_t event_type) const = 0; + + /// Take ready data for an event type. + /** + * \param event_type The event type to get data for. + * \param event_data A preallocated event information (from rmw/types.h) to fill with data + * \return `true` if data was successfully taken. + * \return `false` if data was not available, in this case nothing was written to event_data. + */ + virtual bool takeNextEvent(rmw_event_type_t event_type, void * event_data) = 0; +}; + +typedef struct CustomEventInfo +{ + virtual EventListenerInterface * getListener() = 0; +} CustomEventInfo; + +#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index ceb902fc2..e1c6ec8e2 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -16,6 +16,7 @@ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ #include +#include #include #include "fastrtps/publisher/Publisher.h" @@ -25,19 +26,25 @@ #include "rmw/rmw.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" +#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp" + class PubListener; -typedef struct CustomPublisherInfo +typedef struct CustomPublisherInfo : public CustomEventInfo { + virtual ~CustomPublisherInfo() = default; + eprosima::fastrtps::Publisher * publisher_; PubListener * listener_; rmw_fastrtps_shared_cpp::TypeSupport * type_support_; rmw_gid_t publisher_gid; const char * typesupport_identifier_; + + EventListenerInterface * getListener(); } CustomPublisherInfo; -class PubListener : public eprosima::fastrtps::PublisherListener +class PubListener : public EventListenerInterface, public eprosima::fastrtps::PublisherListener { public: explicit PubListener(CustomPublisherInfo * info) @@ -45,9 +52,10 @@ class PubListener : public eprosima::fastrtps::PublisherListener (void) info; } + // PublisherListener implementation void onPublicationMatched( - eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info) + eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info) override { (void) pub; std::lock_guard lock(internalMutex_); @@ -58,16 +66,55 @@ class PubListener : public eprosima::fastrtps::PublisherListener } } + void on_offered_deadline_missed( + eprosima::fastrtps::Publisher * publisher, + const eprosima::fastrtps::OfferedDeadlineMissedStatus & status) override; + + void on_liveliness_lost( + eprosima::fastrtps::Publisher * publisher, + const eprosima::fastrtps::LivelinessLostStatus & status) override; + + + // EventListenerInterface implementation + bool + hasEvent(rmw_event_type_t /* event_type */) const override; + + bool + takeNextEvent(rmw_event_type_t /* event_type */, void * /* event_data */) override; + + // PubListener API size_t subscriptionCount() { std::lock_guard lock(internalMutex_); return subscriptions_.size(); } + void + attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) + { + std::lock_guard lock(internalMutex_); + conditionMutex_ = conditionMutex; + conditionVariable_ = conditionVariable; + } + + void + detachCondition() + { + std::lock_guard lock(internalMutex_); + conditionMutex_ = nullptr; + conditionVariable_ = nullptr; + } + private: std::mutex internalMutex_; - std::set - subscriptions_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::set subscriptions_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + eprosima::fastrtps::OfferedDeadlineMissedStatus offered_deadline_missed_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + eprosima::fastrtps::LivelinessLostStatus liveliness_lost_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index 2aba576a7..2eb65fbdd 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -26,19 +26,27 @@ #include "rcpputils/thread_safety_annotations.hpp" +#include "rmw/impl/cpp/macros.hpp" + #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" +#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp" + class SubListener; -typedef struct CustomSubscriberInfo +typedef struct CustomSubscriberInfo : public CustomEventInfo { + virtual ~CustomSubscriberInfo() = default; + eprosima::fastrtps::Subscriber * subscriber_; SubListener * listener_; rmw_fastrtps_shared_cpp::TypeSupport * type_support_; const char * typesupport_identifier_; + + EventListenerInterface * getListener() override; } CustomSubscriberInfo; -class SubListener : public eprosima::fastrtps::SubscriberListener +class SubListener : public EventListenerInterface, public eprosima::fastrtps::SubscriberListener { public: explicit SubListener(CustomSubscriberInfo * info) @@ -49,9 +57,10 @@ class SubListener : public eprosima::fastrtps::SubscriberListener (void)info; } + // SubscriberListener implementation void onSubscriptionMatched( - eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info) + eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info) override { (void)sub; @@ -64,7 +73,7 @@ class SubListener : public eprosima::fastrtps::SubscriberListener } void - onNewDataMessage(eprosima::fastrtps::Subscriber * sub) + onNewDataMessage(eprosima::fastrtps::Subscriber * sub) override { (void)sub; std::lock_guard lock(internalMutex_); @@ -81,6 +90,24 @@ class SubListener : public eprosima::fastrtps::SubscriberListener } } + void on_requested_deadline_missed( + eprosima::fastrtps::Subscriber *, + const eprosima::fastrtps::RequestedDeadlineMissedStatus &) override; + + void on_liveliness_changed( + eprosima::fastrtps::Subscriber *, + const eprosima::fastrtps::LivelinessChangedStatus &) override; + + // EventListenerInterface implementation + bool + hasEvent(rmw_event_type_t event_type) const + override; + + bool + takeNextEvent(rmw_event_type_t event_type, void * event_data) + override; + + // SubListener API void attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) { @@ -98,7 +125,7 @@ class SubListener : public eprosima::fastrtps::SubscriberListener } bool - hasData() + hasData() const { return data_ > 0; } @@ -125,6 +152,11 @@ class SubListener : public eprosima::fastrtps::SubscriberListener private: std::mutex internalMutex_; std::atomic_size_t data_; + eprosima::fastrtps::LivelinessChangedStatus liveliness_changed_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + eprosima::fastrtps::RequestedDeadlineMissedStatus requested_deadline_missed_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp index 90f77f24a..fce19d99f 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp @@ -42,12 +42,6 @@ get_datawriter_qos( RMW_FASTRTPS_SHARED_CPP_PUBLIC bool -is_time_default( - const rmw_time_t & time); - -RMW_FASTRTPS_SHARED_CPP_PUBLIC -bool -is_valid_qos( - const rmw_qos_profile_t & qos_policies); +is_valid_qos(const rmw_qos_profile_t & qos_policies); #endif // RMW_FASTRTPS_SHARED_CPP__QOS_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp new file mode 100644 index 000000000..70b78ac3b --- /dev/null +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -0,0 +1,110 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp" + +EventListenerInterface * +CustomPublisherInfo::getListener() +{ + return listener_; +} + +void +PubListener::on_offered_deadline_missed( + eprosima::fastrtps::Publisher * /* publisher */, + const eprosima::fastrtps::OfferedDeadlineMissedStatus & status) +{ + std::lock_guard lock(internalMutex_); + if (conditionMutex_ != nullptr) { + { + // the change to offered_deadline_missed_count_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + std::unique_lock clock(*conditionMutex_); + // Assign absolute values + offered_deadline_missed_status_.total_count = status.total_count; + // Accumulate deltas + offered_deadline_missed_status_.total_count_change += status.total_count_change; + } + conditionVariable_->notify_one(); + } else { + // Assign absolute values + offered_deadline_missed_status_.total_count = status.total_count; + // Accumulate deltas + offered_deadline_missed_status_.total_count_change += status.total_count_change; + } +} + +void PubListener::on_liveliness_lost( + eprosima::fastrtps::Publisher * /* publisher */, + const eprosima::fastrtps::LivelinessLostStatus & status) +{ + std::lock_guard lock(internalMutex_); + if (conditionMutex_ != nullptr) { + { + // the change to liveliness_lost_count_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + std::unique_lock clock(*conditionMutex_); + // Assign absolute values + liveliness_lost_status_.total_count = status.total_count; + // Accumulate deltas + liveliness_lost_status_.total_count_change += status.total_count_change; + } + conditionVariable_->notify_one(); + } else { + // Assign absolute values + liveliness_lost_status_.total_count = status.total_count; + // Accumulate deltas + liveliness_lost_status_.total_count_change += status.total_count_change; + } +} + +bool PubListener::hasEvent(rmw_event_type_t event_type) const +{ + switch (event_type) { + case RMW_EVENT_LIVELINESS_LOST: + return liveliness_lost_status_.total_count_change != 0; + case RMW_EVENT_OFFERED_DEADLINE_MISSED: + return offered_deadline_missed_status_.total_count_change != 0; + default: + break; + } + return false; +} + +bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_data) +{ + if (!hasEvent(event_type)) { + return false; + } + std::lock_guard lock(internalMutex_); + switch (event_type) { + case RMW_EVENT_LIVELINESS_LOST: { + rmw_liveliness_lost_status_t * rmw_data = + static_cast(event_data); + rmw_data->total_count = liveliness_lost_status_.total_count; + rmw_data->total_count_change = liveliness_lost_status_.total_count_change; + liveliness_lost_status_.total_count_change = 0; + } break; + case RMW_EVENT_OFFERED_DEADLINE_MISSED: { + rmw_offered_deadline_missed_status_t * rmw_data = + static_cast(event_data); + rmw_data->total_count = offered_deadline_missed_status_.total_count; + rmw_data->total_count_change = offered_deadline_missed_status_.total_count_change; + offered_deadline_missed_status_.total_count_change = 0; + } break; + default: + return false; + } + return true; +} diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp new file mode 100644 index 000000000..bdd53685b --- /dev/null +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -0,0 +1,118 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp" + +EventListenerInterface * +CustomSubscriberInfo::getListener() +{ + return listener_; +} + +void +SubListener::on_requested_deadline_missed( + eprosima::fastrtps::Subscriber * /* subscriber */, + const eprosima::fastrtps::RequestedDeadlineMissedStatus & status) +{ + std::lock_guard lock(internalMutex_); + if (conditionMutex_ != nullptr) { + { + // the change to requested_deadline_missed_count_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + std::unique_lock clock(*conditionMutex_); + // Assign absolute values + requested_deadline_missed_status_.total_count = status.total_count; + // Accumulate deltas + requested_deadline_missed_status_.total_count_change += status.total_count_change; + } + conditionVariable_->notify_one(); + } else { + // Assign absolute values + requested_deadline_missed_status_.total_count = status.total_count; + // Accumulate deltas + requested_deadline_missed_status_.total_count_change += status.total_count_change; + } +} + +void SubListener::on_liveliness_changed( + eprosima::fastrtps::Subscriber * /* subscriber */, + const eprosima::fastrtps::LivelinessChangedStatus & status) +{ + std::lock_guard lock(internalMutex_); + if (conditionMutex_ != nullptr) { + { + // the change to liveliness_changed_count_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + std::unique_lock clock(*conditionMutex_); + // Assign absolute values + liveliness_changed_status_.alive_count = status.alive_count; + liveliness_changed_status_.not_alive_count = status.not_alive_count; + // Accumulate deltas + liveliness_changed_status_.alive_count_change += status.alive_count_change; + liveliness_changed_status_.not_alive_count_change += status.not_alive_count_change; + } + conditionVariable_->notify_one(); + } else { + // Assign absolute values + liveliness_changed_status_.alive_count = status.alive_count; + liveliness_changed_status_.not_alive_count = status.not_alive_count; + // Accumulate deltas + liveliness_changed_status_.alive_count_change += status.alive_count_change; + liveliness_changed_status_.not_alive_count_change += status.not_alive_count_change; + } +} + +bool SubListener::hasEvent(rmw_event_type_t event_type) const +{ + switch (event_type) { + case RMW_EVENT_LIVELINESS_CHANGED: + return liveliness_changed_status_.alive_count_change + + liveliness_changed_status_.not_alive_count_change != 0; + case RMW_EVENT_REQUESTED_DEADLINE_MISSED: + return requested_deadline_missed_status_.total_count_change != 0; + default: + break; + } + return false; +} + +bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_data) +{ + if (!hasEvent(event_type)) { + return false; + } + std::lock_guard lock(internalMutex_); + switch (event_type) { + case RMW_EVENT_LIVELINESS_CHANGED: { + rmw_liveliness_changed_status_t * rmw_data = + static_cast(event_data); + rmw_data->alive_count = liveliness_changed_status_.alive_count; + rmw_data->not_alive_count = liveliness_changed_status_.not_alive_count; + rmw_data->alive_count_change = liveliness_changed_status_.alive_count_change; + rmw_data->not_alive_count_change = liveliness_changed_status_.not_alive_count_change; + liveliness_changed_status_.alive_count_change = 0; + liveliness_changed_status_.not_alive_count_change = 0; + } break; + case RMW_EVENT_REQUESTED_DEADLINE_MISSED: { + rmw_requested_deadline_missed_status_t * rmw_data = + static_cast(event_data); + rmw_data->total_count = requested_deadline_missed_status_.total_count; + rmw_data->total_count_change = requested_deadline_missed_status_.total_count_change; + requested_deadline_missed_status_.total_count_change = 0; + } break; + default: + return false; + } + return true; +} diff --git a/rmw_fastrtps_shared_cpp/src/qos.cpp b/rmw_fastrtps_shared_cpp/src/qos.cpp index 4228b24bb..551b75456 100644 --- a/rmw_fastrtps_shared_cpp/src/qos.cpp +++ b/rmw_fastrtps_shared_cpp/src/qos.cpp @@ -21,84 +21,33 @@ #include "rmw/error_handling.h" -bool -get_datareader_qos( - const rmw_qos_profile_t & qos_policies, - eprosima::fastrtps::SubscriberAttributes & sattr) +static +eprosima::fastrtps::Duration_t +rmw_time_to_fastrtps(const rmw_time_t & time) { - switch (qos_policies.history) { - case RMW_QOS_POLICY_HISTORY_KEEP_LAST: - sattr.topic.historyQos.kind = eprosima::fastrtps::KEEP_LAST_HISTORY_QOS; - break; - case RMW_QOS_POLICY_HISTORY_KEEP_ALL: - sattr.topic.historyQos.kind = eprosima::fastrtps::KEEP_ALL_HISTORY_QOS; - break; - case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: - break; - default: - RMW_SET_ERROR_MSG("Unknown QoS history policy"); - return false; - } - - switch (qos_policies.reliability) { - case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT: - sattr.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; - break; - case RMW_QOS_POLICY_RELIABILITY_RELIABLE: - sattr.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - break; - case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: - break; - default: - RMW_SET_ERROR_MSG("Unknown QoS reliability policy"); - return false; - } - - switch (qos_policies.durability) { - case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: - sattr.qos.m_durability.kind = eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS; - break; - case RMW_QOS_POLICY_DURABILITY_VOLATILE: - sattr.qos.m_durability.kind = eprosima::fastrtps::VOLATILE_DURABILITY_QOS; - break; - case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT: - break; - default: - RMW_SET_ERROR_MSG("Unknown QoS durability policy"); - return false; - } - - if (qos_policies.depth != RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT) { - sattr.topic.historyQos.depth = static_cast(qos_policies.depth); - } - - // ensure the history depth is at least the requested queue size - assert(sattr.topic.historyQos.depth >= 0); - if ( - eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == sattr.topic.historyQos.kind && - static_cast(sattr.topic.historyQos.depth) < qos_policies.depth) - { - if (qos_policies.depth > (std::numeric_limits::max)()) { - RMW_SET_ERROR_MSG( - "failed to set history depth since the requested queue size exceeds the DDS type"); - return false; - } - sattr.topic.historyQos.depth = static_cast(qos_policies.depth); - } - - return true; + return eprosima::fastrtps::Duration_t(time.sec, time.nsec); } +static bool -get_datawriter_qos( - const rmw_qos_profile_t & qos_policies, eprosima::fastrtps::PublisherAttributes & pattr) +is_time_default( + const rmw_time_t & time) +{ + return time.sec == 0 && time.nsec == 0; +} + +template +bool fill_entity_qos_from_profile( + const rmw_qos_profile_t & qos_policies, + DDSEntityQos & entity_qos, + eprosima::fastrtps::HistoryQosPolicy & history_qos) { switch (qos_policies.history) { case RMW_QOS_POLICY_HISTORY_KEEP_LAST: - pattr.topic.historyQos.kind = eprosima::fastrtps::KEEP_LAST_HISTORY_QOS; + history_qos.kind = eprosima::fastrtps::KEEP_LAST_HISTORY_QOS; break; case RMW_QOS_POLICY_HISTORY_KEEP_ALL: - pattr.topic.historyQos.kind = eprosima::fastrtps::KEEP_ALL_HISTORY_QOS; + history_qos.kind = eprosima::fastrtps::KEEP_ALL_HISTORY_QOS; break; case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: break; @@ -109,10 +58,10 @@ get_datawriter_qos( switch (qos_policies.durability) { case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: - pattr.qos.m_durability.kind = eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS; + entity_qos.m_durability.kind = eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS; break; case RMW_QOS_POLICY_DURABILITY_VOLATILE: - pattr.qos.m_durability.kind = eprosima::fastrtps::VOLATILE_DURABILITY_QOS; + entity_qos.m_durability.kind = eprosima::fastrtps::VOLATILE_DURABILITY_QOS; break; case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT: break; @@ -123,10 +72,10 @@ get_datawriter_qos( switch (qos_policies.reliability) { case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT: - pattr.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; + entity_qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; break; case RMW_QOS_POLICY_RELIABILITY_RELIABLE: - pattr.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + entity_qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; break; case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: break; @@ -136,49 +85,86 @@ get_datawriter_qos( } if (qos_policies.depth != RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT) { - pattr.topic.historyQos.depth = static_cast(qos_policies.depth); + history_qos.depth = static_cast(qos_policies.depth); } // ensure the history depth is at least the requested queue size - assert(pattr.topic.historyQos.depth >= 0); + assert(history_qos.depth >= 0); if ( - eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == pattr.topic.historyQos.kind && - static_cast(pattr.topic.historyQos.depth) < qos_policies.depth) + eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == history_qos.kind && + static_cast(history_qos.depth) < qos_policies.depth) { if (qos_policies.depth > (std::numeric_limits::max)()) { RMW_SET_ERROR_MSG( "failed to set history depth since the requested queue size exceeds the DDS type"); return false; } - pattr.topic.historyQos.depth = static_cast(qos_policies.depth); + history_qos.depth = static_cast(qos_policies.depth); + } + + if (!is_time_default(qos_policies.lifespan)) { + entity_qos.m_lifespan.duration = rmw_time_to_fastrtps(qos_policies.lifespan); + } + + if (!is_time_default(qos_policies.deadline)) { + entity_qos.m_deadline.period = rmw_time_to_fastrtps(qos_policies.deadline); + } + + switch (qos_policies.liveliness) { + case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: + entity_qos.m_liveliness.kind = eprosima::fastrtps::AUTOMATIC_LIVELINESS_QOS; + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE: + entity_qos.m_liveliness.kind = eprosima::fastrtps::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS; + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC: + entity_qos.m_liveliness.kind = eprosima::fastrtps::MANUAL_BY_TOPIC_LIVELINESS_QOS; + break; + case RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT: + break; + default: + RMW_SET_ERROR_MSG("Unknown QoS Liveliness policy"); + return false; + } + if (!is_time_default(qos_policies.liveliness_lease_duration)) { + entity_qos.m_liveliness.lease_duration = + rmw_time_to_fastrtps(qos_policies.liveliness_lease_duration); + + // Docs suggest setting no higher than 0.7 * lease_duration, choosing 2/3 to give safe buffer. + uint64_t total_nanos = entity_qos.m_liveliness.lease_duration.to_ns() * 2 / 3; + uint32_t seconds = RCUTILS_NS_TO_S(total_nanos); + uint32_t remainder_nanos = total_nanos - RCUTILS_S_TO_NS(seconds); + fprintf(stderr, "WE DOING IT %d %d\n", seconds, remainder_nanos); + entity_qos.m_liveliness.announcement_period = + eprosima::fastrtps::Duration_t(seconds, remainder_nanos); } return true; } bool -is_time_default( - const rmw_time_t & time) +get_datareader_qos( + const rmw_qos_profile_t & qos_policies, + eprosima::fastrtps::SubscriberAttributes & sattr) { - return time.sec == 0 && time.nsec == 0; + return fill_entity_qos_from_profile(qos_policies, sattr.qos, sattr.topic.historyQos); +} + +bool +get_datawriter_qos( + const rmw_qos_profile_t & qos_policies, eprosima::fastrtps::PublisherAttributes & pattr) +{ + return fill_entity_qos_from_profile(qos_policies, pattr.qos, pattr.topic.historyQos); } bool is_valid_qos( const rmw_qos_profile_t & qos_policies) { - if (!is_time_default(qos_policies.deadline)) { - RMW_SET_ERROR_MSG("Deadline unsupported for fastrtps"); - return false; - } - if (!is_time_default(qos_policies.lifespan)) { - RMW_SET_ERROR_MSG("Lifespan unsupported for fastrtps"); - return false; - } if (qos_policies.liveliness == RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE || qos_policies.liveliness == RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC) { - RMW_SET_ERROR_MSG("Liveliness unsupported for fastrtps"); + RMW_SET_ERROR_MSG("Manual liveliness unsupported for fastrtps"); return false; } return true; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index 56bb50f26..2d5e97634 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -89,21 +89,27 @@ __rmw_take_event( void * event_info, bool * taken) { - RCUTILS_CHECK_FOR_NULL_WITH_MSG( - event_handle, "event_handle pointer is null", return RMW_RET_ERROR); - RCUTILS_CHECK_FOR_NULL_WITH_MSG( - event_info, "event info output pointer is null", return RMW_RET_ERROR); - RCUTILS_CHECK_FOR_NULL_WITH_MSG(taken, "boolean flag for taken is null", return RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(event_handle, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(event_info, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); *taken = false; - if (event_handle->implementation_identifier != identifier) { - RMW_SET_ERROR_MSG("event handle not from this implementation"); - return RMW_RET_ERROR; + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + event handle, + event_handle->implementation_identifier, + identifier, + return RMW_RET_ERROR); + + rmw_ret_t ret = RMW_RET_ERROR; + + auto event = static_cast(event_handle->data); + if (event->getListener()->takeNextEvent(event_handle->event_type, event_info)) { + *taken = true; + ret = RMW_RET_OK; } - RMW_SET_ERROR_MSG("take_event is not yet implemented by rmw_fastrtps yet"); - return RMW_RET_UNSUPPORTED; + return ret; } rmw_ret_t diff --git a/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp b/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp index ea784a21a..324abb514 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp @@ -30,7 +30,8 @@ check_wait_set_for_data( const rmw_subscriptions_t * subscriptions, const rmw_guard_conditions_t * guard_conditions, const rmw_services_t * services, - const rmw_clients_t * clients) + const rmw_clients_t * clients, + const rmw_events_t * events) { if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { @@ -63,6 +64,16 @@ check_wait_set_for_data( } } + if (events) { + for (size_t i = 0; i < events->event_count; ++i) { + auto event = static_cast(events->events[i]); + auto custom_event_info = static_cast(event->data); + if (!custom_event_info->getListener()->hasEvent(event->event_type)) { + return true; + } + } + } + if (guard_conditions) { for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { void * data = guard_conditions->guard_conditions[i]; @@ -131,14 +142,13 @@ __rmw_wait( } } - // TODO(mm318): implement attachCondition for events when feature becomes available in fastrtps - // if (events) { - // for (size_t i = 0; i < events->event_count; ++i) { - // void * data = events->events[i]; - // auto custom_event_info = static_cast(data); - // custom_event_info->getListener()->attachCondition(conditionMutex, conditionVariable); - // } - // } + if (events) { + for (size_t i = 0; i < events->event_count; ++i) { + auto event = static_cast(events->events[i]); + auto custom_event_info = static_cast(event->data); + custom_event_info->getListener()->attachCondition(conditionMutex, conditionVariable); + } + } if (guard_conditions) { for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { @@ -154,9 +164,10 @@ __rmw_wait( // otherwise the decision to wait might be incorrect std::unique_lock lock(*conditionMutex); - bool hasData = check_wait_set_for_data(subscriptions, guard_conditions, services, clients); - auto predicate = [subscriptions, guard_conditions, services, clients]() { - return check_wait_set_for_data(subscriptions, guard_conditions, services, clients); + bool hasData = check_wait_set_for_data( + subscriptions, guard_conditions, services, clients, events); + auto predicate = [subscriptions, guard_conditions, services, clients, events]() { + return check_wait_set_for_data(subscriptions, guard_conditions, services, clients, events); }; bool timeout = false; @@ -213,17 +224,14 @@ __rmw_wait( } } - // TODO(mm318): implement detachCondition for events when feature becomes available in fastrtps - // For now, set all to NULL because data is not ready if (events) { for (size_t i = 0; i < events->event_count; ++i) { - events->events[i] = 0; - // void * data = events->events[i]; - // auto custom_event_info = static_cast(data); - // custom_event_info->getListener()->detachCondition(); - // if (!custom_event_info->getListener()->hasEvent()) { - // events->events[i] = 0; - // } + auto event = static_cast(events->events[i]); + auto custom_event_info = static_cast(event->data); + custom_event_info->getListener()->detachCondition(); + if (!custom_event_info->getListener()->hasEvent(event->event_type)) { + events->events[i] = nullptr; + } } }