Skip to content

Commit

Permalink
Revert "Revert "update rclcpp::Waitable API to use references and con…
Browse files Browse the repository at this point in the history
…st (ros2#2467)""

This reverts commit 05cb182.
  • Loading branch information
CursedRock17 committed Apr 2, 2024
1 parent 05cb182 commit 5242d5b
Show file tree
Hide file tree
Showing 37 changed files with 246 additions and 140 deletions.
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/event_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ class EventHandlerBase : public Waitable
/// Add the Waitable to a wait set.
RCLCPP_PUBLIC
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;
add_to_wait_set(rcl_wait_set_t & wait_set) override;

/// Check if the Waitable is ready.
RCLCPP_PUBLIC
bool
is_ready(rcl_wait_set_t * wait_set) override;
is_ready(const rcl_wait_set_t & wait_set) override;

/// Set a callback to be called when each new event instance occurs.
/**
Expand Down Expand Up @@ -294,7 +294,7 @@ class EventHandler : public EventHandlerBase

/// Execute any entities of the Waitable that are ready.
void
execute(std::shared_ptr<void> & data) override
execute(const std::shared_ptr<void> & data) override
{
if (!data) {
throw std::runtime_error("'data' is empty");
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
*/
RCLCPP_PUBLIC
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;
add_to_wait_set(rcl_wait_set_t & wait_set) override;

/// Check conditions against the wait set
/**
Expand All @@ -69,7 +69,7 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
*/
RCLCPP_PUBLIC
bool
is_ready(rcl_wait_set_t * wait_set) override;
is_ready(const rcl_wait_set_t & wait_set) override;

/// Perform work associated with the waitable.
/**
Expand All @@ -78,7 +78,7 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
*/
RCLCPP_PUBLIC
void
execute(std::shared_ptr<void> & data) override;
execute(const std::shared_ptr<void> & data) override;

/// Retrieve data to be used in the next execute call.
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,23 +132,22 @@ class SubscriptionIntraProcess
);
}

void execute(std::shared_ptr<void> & data) override
void execute(const std::shared_ptr<void> & data) override
{
execute_impl<SubscribedType>(data);
}

protected:
template<typename T>
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
execute_impl(std::shared_ptr<void> & data)
execute_impl(const std::shared_ptr<void> &)
{
(void)data;
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
}

template<class T>
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
execute_impl(std::shared_ptr<void> & data)
execute_impl(const std::shared_ptr<void> & data)
{
if (!data) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

RCLCPP_PUBLIC
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;
add_to_wait_set(rcl_wait_set_t & wait_set) override;

RCLCPP_PUBLIC
virtual
Expand All @@ -72,7 +72,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
is_durability_transient_local() const;

bool
is_ready(rcl_wait_set_t * wait_set) override = 0;
is_ready(const rcl_wait_set_t & wait_set) override = 0;

std::shared_ptr<void>
take_data() override = 0;
Expand All @@ -85,7 +85,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
}

void
execute(std::shared_ptr<void> & data) override = 0;
execute(const std::shared_ptr<void> & data) override = 0;

virtual
bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff
}

void
add_to_wait_set(rcl_wait_set_t * wait_set) override
add_to_wait_set(rcl_wait_set_t & wait_set) override
{
if (this->buffer_->has_data()) {
this->trigger_guard_condition();
}
detail::add_guard_condition_to_rcl_wait_set(*wait_set, this->gc_);
detail::add_guard_condition_to_rcl_wait_set(wait_set, this->gc_);
}

bool
is_ready(rcl_wait_set_t * wait_set) override
is_ready(const rcl_wait_set_t & wait_set) override
{
(void) wait_set;
return buffer_->has_data();
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/include/rclcpp/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class GuardCondition
*/
RCLCPP_PUBLIC
void
add_to_wait_set(rcl_wait_set_t * wait_set);
add_to_wait_set(rcl_wait_set_t & wait_set);

/// Set a callback to be called whenever the guard condition is triggered.
/**
Expand Down Expand Up @@ -128,7 +128,9 @@ class GuardCondition
std::recursive_mutex reentrant_mutex_;
std::function<void(size_t)> on_trigger_callback_{nullptr};
size_t unread_count_{0};
rcl_wait_set_t * wait_set_{nullptr};
// the type of wait_set_ is actually rcl_wait_set_t *, but it's never
// dereferenced, only compared to, so make it void * to avoid accidental use
void * wait_set_{nullptr};
};

} // namespace rclcpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}
}
for (size_t i = 0; i < waitable_handles_.size(); ++i) {
if (waitable_handles_[i]->is_ready(wait_set)) {
if (waitable_handles_[i]->is_ready(*wait_set)) {
waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i]));
}
}
Expand Down Expand Up @@ -235,7 +235,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}

for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
waitable->add_to_wait_set(wait_set);
waitable->add_to_wait_set(*wait_set);
}
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/wait_result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class WaitResult final
auto rcl_wait_set = wait_set.get_rcl_wait_set();
while (next_waitable_index_ < wait_set.size_of_waitables()) {
auto cur_waitable = wait_set.waitables(next_waitable_index_++);
if (cur_waitable != nullptr && cur_waitable->is_ready(&rcl_wait_set)) {
if (cur_waitable != nullptr && cur_waitable->is_ready(rcl_wait_set)) {
waitable = cur_waitable;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ class StoragePolicyCommon
needs_pruning_ = true;
continue;
}
waitable_entry.waitable->add_to_wait_set(&rcl_wait_set_);
waitable_entry.waitable->add_to_wait_set(rcl_wait_set_);
}
}

Expand Down
57 changes: 54 additions & 3 deletions rclcpp/include/rclcpp/waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,23 @@ class Waitable
size_t
get_number_of_ready_guard_conditions();

#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Woverloaded-virtual"
#endif
/// Deprecated.
/**
* \sa add_to_wait_set(rcl_wait_set_t &)
*/
[[deprecated("Use add_to_wait_set(rcl_wait_set_t & wait_set) signature")]]
RCLCPP_PUBLIC
virtual
void
add_to_wait_set(rcl_wait_set_t * wait_set);
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif

/// Add the Waitable to a wait set.
/**
* \param[in] wait_set A handle to the wait set to add the Waitable to.
Expand All @@ -109,7 +126,24 @@ class Waitable
RCLCPP_PUBLIC
virtual
void
add_to_wait_set(rcl_wait_set_t * wait_set) = 0;
add_to_wait_set(rcl_wait_set_t & wait_set);

#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Woverloaded-virtual"
#endif
/// Deprecated.
/**
* \sa is_ready(const rcl_wait_set_t &)
*/
[[deprecated("Use is_ready(const rcl_wait_set_t & wait_set) signature")]]
RCLCPP_PUBLIC
virtual
bool
is_ready(rcl_wait_set_t * wait_set);
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif

/// Check if the Waitable is ready.
/**
Expand All @@ -124,7 +158,7 @@ class Waitable
RCLCPP_PUBLIC
virtual
bool
is_ready(rcl_wait_set_t * wait_set) = 0;
is_ready(const rcl_wait_set_t & wait_set);

/// Take the data so that it can be consumed with `execute`.
/**
Expand Down Expand Up @@ -178,6 +212,23 @@ class Waitable
std::shared_ptr<void>
take_data_by_entity_id(size_t id);

#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Woverloaded-virtual"
#endif
/// Deprecated.
/**
* \sa execute(const std::shared_ptr<void> &)
*/
[[deprecated("Use execute(const std::shared_ptr<void> & data) signature")]]
RCLCPP_PUBLIC
virtual
void
execute(std::shared_ptr<void> & data);
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif

/// Execute data that is passed in.
/**
* Before calling this method, the Waitable should be added to a wait set,
Expand All @@ -203,7 +254,7 @@ class Waitable
RCLCPP_PUBLIC
virtual
void
execute(std::shared_ptr<void> & data) = 0;
execute(const std::shared_ptr<void> & data);

/// Exchange the "in use by wait set" state for this timer.
/**
Expand Down
8 changes: 4 additions & 4 deletions rclcpp/src/rclcpp/event_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ EventHandlerBase::get_number_of_ready_events()

/// Add the Waitable to a wait set.
void
EventHandlerBase::add_to_wait_set(rcl_wait_set_t * wait_set)
EventHandlerBase::add_to_wait_set(rcl_wait_set_t & wait_set)
{
rcl_ret_t ret = rcl_wait_set_add_event(wait_set, &event_handle_, &wait_set_event_index_);
rcl_ret_t ret = rcl_wait_set_add_event(&wait_set, &event_handle_, &wait_set_event_index_);
if (RCL_RET_OK != ret) {
exceptions::throw_from_rcl_error(ret, "Couldn't add event to wait set");
}
}

/// Check if the Waitable is ready.
bool
EventHandlerBase::is_ready(rcl_wait_set_t * wait_set)
EventHandlerBase::is_ready(const rcl_wait_set_t & wait_set)
{
return wait_set->events[wait_set_event_index_] == &event_handle_;
return wait_set.events[wait_set_event_index_] == &event_handle_;
}

void
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
execute_client(any_exec.client);
}
if (any_exec.waitable) {
any_exec.waitable->execute(any_exec.data);
const std::shared_ptr<void> & const_data = any_exec.data;
any_exec.waitable->execute(const_data);
}

// Reset the callback_group, regardless of type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ ready_executables(
if (!waitable) {
continue;
}
if (!waitable->is_ready(&rcl_wait_set)) {
if (!waitable->is_ready(rcl_wait_set)) {
continue;
}
auto group_info = group_cache(entry.callback_group);
Expand Down
13 changes: 6 additions & 7 deletions rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,15 @@ ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitabl
}

void
ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set)
ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t & wait_set)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
for (auto weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
if (!guard_condition) {continue;}

rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition();

rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, cond, NULL);
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, NULL);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
Expand All @@ -64,13 +63,13 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set)
}

bool
ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set)
ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

bool any_ready = false;
for (size_t ii = 0; ii < wait_set->size_of_guard_conditions; ++ii) {
const auto * rcl_guard_condition = wait_set->guard_conditions[ii];
for (size_t ii = 0; ii < wait_set.size_of_guard_conditions; ++ii) {
const auto * rcl_guard_condition = wait_set.guard_conditions[ii];

if (nullptr == rcl_guard_condition) {
continue;
Expand All @@ -87,7 +86,7 @@ ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set)
}

void
ExecutorNotifyWaitable::execute(std::shared_ptr<void> & data)
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & data)
{
(void) data;
this->execute_callback_();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ bool StaticSingleThreadedExecutor::execute_ready_executables(
while (auto waitable = wait_result.next_ready_waitable()) {
auto entity_iter = collection.waitables.find(waitable.get());
if (entity_iter != collection.waitables.end()) {
auto data = waitable->take_data();
const auto data = waitable->take_data();
waitable->execute(data);
any_ready_executable = true;
if (spin_once) {return any_ready_executable;}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
}
if (waitable) {
for (size_t i = 0; i < event.num_events; i++) {
auto data = waitable->take_data_by_entity_id(event.waitable_data);
const auto data = waitable->take_data_by_entity_id(event.waitable_data);
waitable->execute(data);
}
}
Expand Down
Loading

0 comments on commit 5242d5b

Please sign in to comment.