Skip to content

Commit

Permalink
Implement 'rmw_subscription_set_on_new_message_callback' (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
mossmaurice committed Jan 4, 2024
1 parent 4936431 commit 7f8c9d3
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 9 deletions.
47 changes: 43 additions & 4 deletions rmw_iceoryx_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <string>

#include "iceoryx_posh/capro/service_description.hpp"
#include "iceoryx_posh/popo/listener.hpp"

#include "rcutils/error_handling.h"

Expand Down Expand Up @@ -188,11 +189,49 @@ rmw_ret_t rmw_subscription_set_on_new_message_callback(
rmw_event_callback_t callback,
const void * user_data)
{
RCUTILS_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RCUTILS_CHECK_ARGUMENT_FOR_NULL(callback, RMW_RET_INVALID_ARGUMENT);
RCUTILS_CHECK_ARGUMENT_FOR_NULL(user_data, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);

return RMW_RET_UNSUPPORTED;
auto iceoryx_subscription = static_cast<IceoryxSubscription *>(subscription->data);
if (!iceoryx_subscription) {
RMW_SET_ERROR_MSG("subscription data is null");
return RMW_RET_ERROR;
}

auto iceoryx_receiver = iceoryx_subscription->iceoryx_receiver_;
if (!iceoryx_receiver) {
RMW_SET_ERROR_MSG("iceoryx_receiver is null");
return RMW_RET_ERROR;
}
const std::lock_guard<std::mutex> lock(iceoryx_subscription->mutex_);
rmw_ret_t ret = RMW_RET_ERROR;

if (callback == nullptr) {
iceoryx_subscription->listener_.detachEvent(
*(iceoryx_subscription->iceoryx_receiver_),
iox::popo::SubscriberEvent::DATA_RECEIVED);
ret = RMW_RET_OK;
return ret;
}

iceoryx_subscription->user_callback_ = callback;
iceoryx_subscription->user_data_ = user_data;
iceoryx_subscription->listener_
.attachEvent(
*(iceoryx_subscription->iceoryx_receiver_),
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(
IceoryxSubscription::onSampleReceivedCallback,
*iceoryx_subscription))
.or_else(
[&](auto) {
RMW_SET_ERROR_MSG(
"rmw_subscription_get_content_filter: Unable to attach subscriber to listener");
ret = RMW_RET_ERROR;
});

ret = RMW_RET_OK;

return ret;
}

rmw_ret_t
Expand Down
31 changes: 26 additions & 5 deletions rmw_iceoryx_cpp/src/types/iceoryx_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
#ifndef TYPES__ICEORYX_SUBSCRIPTION_HPP_
#define TYPES__ICEORYX_SUBSCRIPTION_HPP_

#include <mutex>

#include "iceoryx_posh/popo/untyped_subscriber.hpp"
#include "iceoryx_posh/popo/listener.hpp"

#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/types.h"

Expand All @@ -31,12 +35,29 @@ struct IceoryxSubscription
iceoryx_receiver_(iceoryx_receiver),
is_fixed_size_(rmw_iceoryx_cpp::iceoryx_is_fixed_size(type_supports)),
message_size_(rmw_iceoryx_cpp::iceoryx_get_message_size(type_supports))
{}
{
}

rosidl_message_type_support_t type_supports_;
iox::popo::UntypedSubscriber * const iceoryx_receiver_;
bool is_fixed_size_;
size_t message_size_;
};
iox::popo::UntypedSubscriber * const iceoryx_receiver_{nullptr};
bool is_fixed_size_{false};
size_t message_size_{0};
std::mutex mutex_;
/// TODO Why not having one listener for all subscriptions?
iox::popo::Listener listener_;
rmw_event_callback_t user_callback_{nullptr};
const void * user_data_{nullptr};

static void onSampleReceivedCallback(iox::popo::UntypedSubscriber *, IceoryxSubscription * self)
{
/// TODO This lock isn't needed, the listener calls are sequential, right?
const std::lock_guard<std::mutex> lock(self->mutex_);
if (self == nullptr) {
RMW_SET_ERROR_MSG("onSampleReceivedCallback: Invalid arguments");
return;
}

self->user_callback_(self->user_data_, 1);
}
};
#endif // TYPES__ICEORYX_SUBSCRIPTION_HPP_

0 comments on commit 7f8c9d3

Please sign in to comment.