Skip to content

Commit

Permalink
Revert "Add listener callbacks to newly added events"
Browse files Browse the repository at this point in the history
This reverts commit 67b20ae.
  • Loading branch information
Jeffery Hsu committed Sep 1, 2022
1 parent c805abe commit af7155c
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@

#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

class ClientListener;
Expand Down Expand Up @@ -129,7 +128,13 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
list_has_data_.store(true);
}

on_data_available_.call();
std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);

if (on_new_response_cb_) {
on_new_response_cb_(user_data_, 1);
} else {
unread_count_++;
}
}
}
}
Expand Down Expand Up @@ -193,7 +198,20 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
const void * user_data,
rmw_event_callback_t callback)
{
on_data_available_.set_callback(user_data, callback);
std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);

if (callback) {
// Push events arrived before setting the the executor callback
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
on_new_response_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_response_cb_ = nullptr;
}
}

private:
Expand All @@ -216,8 +234,10 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;

// Callback to call when the listener detects events
EventTypeCallback on_data_available_;
rmw_event_callback_t on_new_response_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_response_m_;
uint64_t unread_count_ = 0;
};

class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,14 @@ class EventListenerInterface

// Provide handlers to perform an action when a
// new event from this listener has ocurred
virtual rmw_ret_t set_on_new_event_callback(
rmw_event_type_t event_type,
virtual void set_on_new_event_callback(
const void * user_data,
rmw_event_callback_t callback) = 0;

rmw_event_callback_t on_new_event_cb_{nullptr};
const void * user_data_{nullptr};
uint64_t unread_events_count_ = 0;
std::mutex on_new_event_m_;
};

class EventListenerInterface::ConditionalScopedLock
Expand Down Expand Up @@ -101,51 +105,4 @@ struct CustomEventInfo
virtual EventListenerInterface * getListener() const = 0;
};

class EventTypeCallback
{
public:
EventTypeCallback() = default;

EventTypeCallback(size_t depth)
{
history_depth_ = (depth > 0) ? depth : std::numeric_limits<size_t>::max();
}

void set_callback(const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::mutex> lock(mutex_);

if (callback) {
if (unread_count_) {
size_t count = std::min(unread_count_, history_depth_);
callback(user_data, count);
unread_count_ = 0;
}
user_data_ = user_data;
callback_ = callback;
} else {
user_data_ = nullptr;
callback_ = nullptr;
}
}

void call()
{
std::lock_guard<std::mutex> lock(mutex_);

if (callback_) {
callback_(user_data_, 1);
} else {
unread_count_++;
}
}

private:
std::mutex mutex_;
rmw_event_callback_t callback_{nullptr};
const void * user_data_{nullptr};
size_t unread_count_{0};
size_t history_depth_ = std::numeric_limits<size_t>::max();
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
set_on_new_event_callback(
rmw_event_type_t event_type,
void set_on_new_event_callback(
const void * user_data,
rmw_event_callback_t callback) final;

Expand Down Expand Up @@ -160,11 +158,6 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

// The callbacks to call when the listener detects events
EventTypeCallback on_liveliness_lost_;
EventTypeCallback on_offered_deadline_missed_;
EventTypeCallback on_offered_incompatible_qos_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@

#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"
#include "rmw_fastrtps_shared_cpp/guid_utils.hpp"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

Expand Down Expand Up @@ -250,7 +249,13 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
list_has_data_.store(true);
}

on_data_available_.call();
std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);

if (on_new_request_cb_) {
on_new_request_cb_(user_data_, 1);
} else {
unread_count_++;
}
}
}
}
Expand Down Expand Up @@ -308,7 +313,20 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
const void * user_data,
rmw_event_callback_t callback)
{
on_data_available_.set_callback(user_data, callback);
std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);

if (callback) {
// Push events arrived before setting the the executor callback
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
on_new_request_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_request_cb_ = nullptr;
}
}

private:
Expand All @@ -319,8 +337,10 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

// Callback to call when the listener detects events
EventTypeCallback on_data_available_;
rmw_event_callback_t on_new_request_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_request_m_;
uint64_t unread_count_ = 0;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
sample_lost_changes_(false),
incompatible_qos_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr),
on_data_available_(qos_depth)
conditionVariable_(nullptr)
{
qos_depth_ = (qos_depth > 0) ? qos_depth : std::numeric_limits<size_t>::max();
// Field is not used right now
(void)info;
}
Expand All @@ -115,13 +115,19 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
update_has_data(reader);
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_data_available(eprosima::fastdds::dds::DataReader * reader) final;
on_data_available(eprosima::fastdds::dds::DataReader * reader) final
{
update_has_data(reader);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
set_on_data_available_callback(const void * user_data, rmw_event_callback_t callback);
std::unique_lock<std::mutex> lock_mutex(on_new_message_m_);

if (on_new_message_cb_) {
on_new_message_cb_(user_data_, 1);
} else {
new_data_unread_count_++;
}
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
Expand Down Expand Up @@ -153,9 +159,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
set_on_new_event_callback(
rmw_event_type_t event_type,
void set_on_new_event_callback(
const void * user_data,
rmw_event_callback_t callback) final;

Expand Down Expand Up @@ -205,6 +209,30 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
return publishers_.size();
}

// Provide handlers to perform an action when a
// new event from this listener has ocurred
void
set_on_new_message_callback(
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_message_m_);

if (callback) {
// Push events arrived before setting the executor's callback
if (new_data_unread_count_) {
auto unread_count = std::min(new_data_unread_count_, qos_depth_);
callback(user_data, unread_count);
new_data_unread_count_ = 0;
}
user_data_ = user_data;
on_new_message_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_message_cb_ = nullptr;
}
}

private:
mutable std::mutex internalMutex_;

Expand All @@ -231,12 +259,10 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds

std::set<eprosima::fastrtps::rtps::GUID_t> publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

// The callbacks to call when the listener detects events
EventTypeCallback on_sample_lost_;
EventTypeCallback on_data_available_;
EventTypeCallback on_liveliness_changed_;
EventTypeCallback on_requested_deadline_missed_;
EventTypeCallback on_requested_incompatible_qos_;
rmw_event_callback_t on_new_message_cb_{nullptr};
std::mutex on_new_message_m_;
size_t qos_depth_;
size_t new_data_unread_count_ = 0;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
52 changes: 28 additions & 24 deletions rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rmw/error_handling.h"

#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp"

#include "fastdds/dds/core/status/BaseStatus.hpp"
Expand Down Expand Up @@ -46,7 +44,13 @@ PubListener::on_offered_deadline_missed(

deadline_changes_.store(true, std::memory_order_relaxed);

on_offered_deadline_missed_.call();
std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);

if (on_new_event_cb_) {
on_new_event_cb_(user_data_, 1);
} else {
unread_events_count_++;
}
}

void PubListener::on_liveliness_lost(
Expand All @@ -66,7 +70,13 @@ void PubListener::on_liveliness_lost(

liveliness_changes_.store(true, std::memory_order_relaxed);

on_liveliness_lost_.call();
std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);

if (on_new_event_cb_) {
on_new_event_cb_(user_data_, 1);
} else {
unread_events_count_++;
}
}

void PubListener::on_offered_incompatible_qos(
Expand All @@ -86,8 +96,6 @@ void PubListener::on_offered_incompatible_qos(
incompatible_qos_status_.total_count_change += status.total_count_change;

incompatible_qos_changes_.store(true, std::memory_order_relaxed);

on_offered_incompatible_qos_.call();
}

bool PubListener::hasEvent(rmw_event_type_t event_type) const
Expand All @@ -106,28 +114,24 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const
return false;
}

rmw_ret_t PubListener::set_on_new_event_callback(
rmw_event_type_t event_type,
void PubListener::set_on_new_event_callback(
const void * user_data,
rmw_event_callback_t callback)
{
switch (event_type)
{
case RMW_EVENT_LIVELINESS_LOST:
on_liveliness_lost_.set_callback(user_data, callback);
break;
case RMW_EVENT_OFFERED_DEADLINE_MISSED:
on_offered_deadline_missed_.set_callback(user_data, callback);
break;
case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE:
on_offered_incompatible_qos_.set_callback(user_data, callback);
break;
default:
RMW_SET_ERROR_MSG("provided event_type is not supported");
return RMW_RET_UNSUPPORTED;
break;
std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);

if (callback) {
// Push events arrived before setting the executor's callback
if (unread_events_count_) {
callback(user_data, unread_events_count_);
unread_events_count_ = 0;
}
user_data_ = user_data;
on_new_event_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_event_cb_ = nullptr;
}
return RMW_RET_OK;
}

bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
Expand Down
Loading

0 comments on commit af7155c

Please sign in to comment.