Skip to content

Commit

Permalink
WIP support Deadline, Lifespan, Liveliness QoS Policy types and event…
Browse files Browse the repository at this point in the history
… callbacks in Fast-RTPS 1.8

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
Emerson Knapp authored and mm318 committed May 8, 2019
1 parent 167383b commit b1bde54
Show file tree
Hide file tree
Showing 10 changed files with 513 additions and 138 deletions.
2 changes: 2 additions & 0 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ find_package(rmw REQUIRED)
include_directories(include)

add_library(rmw_fastrtps_shared_cpp
src/custom_publisher_info.cpp
src/custom_subscriber_info.cpp
src/demangle.cpp
src/namespace_prefix.cpp
src/qos.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_

#include <atomic>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
#include <utility>

#include "fastcdr/FastBuffer.h"

#include "fastrtps/subscriber/SampleInfo.h"
#include "fastrtps/subscriber/Subscriber.h"
#include "fastrtps/subscriber/SubscriberListener.h"
#include "fastrtps/participant/Participant.h"
#include "fastrtps/publisher/Publisher.h"
#include "fastrtps/publisher/PublisherListener.h"

#include "rmw/event.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"


class EventListenerInterface
{
public:
/// Connect a condition variable so a waiter can be notified of new data.
virtual void attachCondition(
std::mutex * conditionMutex,
std::condition_variable * conditionVariable) = 0;

/// Unset the information from attachCondition.
virtual void detachCondition() = 0;

/// Check if there is new data available for a specific event type.
/**
* \param event_type The event type to check on.
* \return `true` if new data is available.
*/
virtual bool hasEvent(rmw_event_type_t event_type) const = 0;

/// Take ready data for an event type.
/**
* \param event_type The event type to get data for.
* \param event_data A preallocated event information (from rmw/types.h) to fill with data
* \return `true` if data was successfully taken.
* \return `false` if data was not available, in this case nothing was written to event_data.
*/
virtual bool takeNextEvent(rmw_event_type_t event_type, void * event_data) = 0;
};

typedef struct CustomEventInfo
{
virtual EventListenerInterface * getListener() = 0;
} CustomEventInfo;

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_

#include <mutex>
#include <condition_variable>
#include <set>

#include "fastrtps/publisher/Publisher.h"
Expand All @@ -25,29 +26,36 @@
#include "rmw/rmw.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"


class PubListener;

typedef struct CustomPublisherInfo
typedef struct CustomPublisherInfo : public CustomEventInfo
{
virtual ~CustomPublisherInfo() = default;

eprosima::fastrtps::Publisher * publisher_;
PubListener * listener_;
rmw_fastrtps_shared_cpp::TypeSupport * type_support_;
rmw_gid_t publisher_gid;
const char * typesupport_identifier_;

EventListenerInterface * getListener();
} CustomPublisherInfo;

class PubListener : public eprosima::fastrtps::PublisherListener
class PubListener : public EventListenerInterface, public eprosima::fastrtps::PublisherListener
{
public:
explicit PubListener(CustomPublisherInfo * info)
{
(void) info;
}

// PublisherListener implementation
void
onPublicationMatched(
eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info)
eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info) override
{
(void) pub;
std::lock_guard<std::mutex> lock(internalMutex_);
Expand All @@ -58,16 +66,55 @@ class PubListener : public eprosima::fastrtps::PublisherListener
}
}

void on_offered_deadline_missed(
eprosima::fastrtps::Publisher * publisher,
const eprosima::fastrtps::OfferedDeadlineMissedStatus & status) override;

void on_liveliness_lost(
eprosima::fastrtps::Publisher * publisher,
const eprosima::fastrtps::LivelinessLostStatus & status) override;


// EventListenerInterface implementation
bool
hasEvent(rmw_event_type_t /* event_type */) const override;

bool
takeNextEvent(rmw_event_type_t /* event_type */, void * /* event_data */) override;

// PubListener API
size_t subscriptionCount()
{
std::lock_guard<std::mutex> lock(internalMutex_);
return subscriptions_.size();
}

void
attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
{
std::lock_guard<std::mutex> lock(internalMutex_);
conditionMutex_ = conditionMutex;
conditionVariable_ = conditionVariable;
}

void
detachCondition()
{
std::lock_guard<std::mutex> lock(internalMutex_);
conditionMutex_ = nullptr;
conditionVariable_ = nullptr;
}

private:
std::mutex internalMutex_;
std::set<eprosima::fastrtps::rtps::GUID_t>
subscriptions_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
eprosima::fastrtps::OfferedDeadlineMissedStatus offered_deadline_missed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
eprosima::fastrtps::LivelinessLostStatus liveliness_lost_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,27 @@

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/impl/cpp/macros.hpp"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"


class SubListener;

typedef struct CustomSubscriberInfo
typedef struct CustomSubscriberInfo : public CustomEventInfo
{
virtual ~CustomSubscriberInfo() = default;

eprosima::fastrtps::Subscriber * subscriber_;
SubListener * listener_;
rmw_fastrtps_shared_cpp::TypeSupport * type_support_;
const char * typesupport_identifier_;

EventListenerInterface * getListener() override;
} CustomSubscriberInfo;

class SubListener : public eprosima::fastrtps::SubscriberListener
class SubListener : public EventListenerInterface, public eprosima::fastrtps::SubscriberListener
{
public:
explicit SubListener(CustomSubscriberInfo * info)
Expand All @@ -49,9 +57,10 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
(void)info;
}

// SubscriberListener implementation
void
onSubscriptionMatched(
eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info)
eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info) override
{
(void)sub;

Expand All @@ -64,7 +73,7 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
}

void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
onNewDataMessage(eprosima::fastrtps::Subscriber * sub) override
{
(void)sub;
std::lock_guard<std::mutex> lock(internalMutex_);
Expand All @@ -81,6 +90,24 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
}
}

void on_requested_deadline_missed(
eprosima::fastrtps::Subscriber *,
const eprosima::fastrtps::RequestedDeadlineMissedStatus &) override;

void on_liveliness_changed(
eprosima::fastrtps::Subscriber *,
const eprosima::fastrtps::LivelinessChangedStatus &) override;

// EventListenerInterface implementation
bool
hasEvent(rmw_event_type_t event_type) const
override;

bool
takeNextEvent(rmw_event_type_t event_type, void * event_data)
override;

// SubListener API
void
attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
{
Expand All @@ -98,7 +125,7 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
}

bool
hasData()
hasData() const
{
return data_ > 0;
}
Expand All @@ -125,6 +152,11 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
private:
std::mutex internalMutex_;
std::atomic_size_t data_;
eprosima::fastrtps::LivelinessChangedStatus liveliness_changed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
eprosima::fastrtps::RequestedDeadlineMissedStatus requested_deadline_missed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ get_datawriter_qos(

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
is_time_default(
const rmw_time_t & time);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
is_valid_qos(
const rmw_qos_profile_t & qos_policies);
is_valid_qos(const rmw_qos_profile_t & qos_policies);

#endif // RMW_FASTRTPS_SHARED_CPP__QOS_HPP_
Loading

0 comments on commit b1bde54

Please sign in to comment.