Skip to content

Commit

Permalink
Reworked code to use way less shared ptr locks
Browse files Browse the repository at this point in the history
Signed-off-by: Janosch Machowinski <j.machowinski@nospam.org>
  • Loading branch information
Janosch Machowinski committed Jan 29, 2024
1 parent 4f13c4d commit ed6c50d
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 146 deletions.
26 changes: 14 additions & 12 deletions rclcpp/include/rclcpp/executors/cbg_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <deque>
#include <unordered_map>

#include "rclcpp/executors/detail/any_executable_weak_ref.hpp"
#include "rclcpp/executor.hpp"
#include "rclcpp/executors/callback_group_state.hpp"
#include "rclcpp/macros.hpp"
Expand Down Expand Up @@ -48,19 +49,20 @@ class ExecutionGroup
next_unprocessed_ready_executable = 0;
}

void add_ready_executable(const Executable & e)
void add_ready_executable(AnyExecutableWeakRef & e)
{
ready_executables.push_back(e);
ready_executables.push_back(&e);
// ready_executables.push_back(std::get<const Executable>(e.executable));
}

bool has_unprocessed_executables()
{
for (; next_unprocessed_ready_executable < ready_executables.size();
next_unprocessed_ready_executable++)
{
const auto & ready_executable = ready_executables[next_unprocessed_ready_executable];
auto & ready_executable = ready_executables[next_unprocessed_ready_executable];

if (ready_executable.lock()) {
if (ready_executable->executable_alive()) {
return true;
}
}
Expand All @@ -74,7 +76,7 @@ class ExecutionGroup
{
const auto & ready_executable = ready_executables[next_unprocessed_ready_executable];

if (fill_any_executable(any_executable, ready_executable)) {
if (fill_any_executable(any_executable, std::get<const Executable>(ready_executable->executable))) {
// mark the current element as processed
next_unprocessed_ready_executable++;

Expand Down Expand Up @@ -108,7 +110,7 @@ class ExecutionGroup
return false;
}

any_executable.data = *data;
// any_executable.data = *data;

return true;
//RCUTILS_LOG_INFO("Executing timer");
Expand Down Expand Up @@ -142,7 +144,7 @@ class ExecutionGroup
return false;
}

std::vector<Executable> ready_executables;
std::vector<AnyExecutableWeakRef *> ready_executables;
size_t next_unprocessed_ready_executable = 0;

};
Expand All @@ -166,11 +168,11 @@ class CallbackGroupScheduler

void clear_and_prepare(const CallbackGroupState & cb_elements);

void add_ready_executable(const rclcpp::SubscriptionBase::WeakPtr & executable);
void add_ready_executable(const rclcpp::ServiceBase::WeakPtr & executable);
void add_ready_executable(const rclcpp::TimerBase::WeakPtr & executable);
void add_ready_executable(const rclcpp::ClientBase::WeakPtr & executable);
void add_ready_executable(const rclcpp::Waitable::WeakPtr & executable);
void add_ready_timer(AnyExecutableWeakRef & executable);
void add_ready_subscription(AnyExecutableWeakRef & executable);
void add_ready_service(AnyExecutableWeakRef & executable);
void add_ready_client(AnyExecutableWeakRef & executable);
void add_ready_waitable(AnyExecutableWeakRef & executable);

enum Priorities
{
Expand Down
134 changes: 128 additions & 6 deletions rclcpp/include/rclcpp/executors/detail/any_executable_weak_ref.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "rclcpp/client.hpp"
#include "rclcpp/waitable.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/executors/callback_group_state.hpp"

namespace rclcpp::executors
{
Expand All @@ -17,27 +18,72 @@ struct AnyExecutableWeakRef
AnyExecutableWeakRef(const rclcpp::SubscriptionBase::WeakPtr & p, int16_t callback_group_index)
: executable(p),
callback_group_index(callback_group_index)
{}
{
if(auto shr = p.lock())
{
rcl_handle_shr_ptr = shr->get_subscription_handle();
}
else
{
rcl_handle_shr_ptr = std::monostate();
}
}

AnyExecutableWeakRef(const rclcpp::TimerBase::WeakPtr & p, int16_t callback_group_index)
: executable(p),
callback_group_index(callback_group_index)
{}
{
if(auto shr = p.lock())
{
rcl_handle_shr_ptr = shr->get_timer_handle();
}
else
{
rcl_handle_shr_ptr = std::monostate();
}
}

AnyExecutableWeakRef(const rclcpp::ServiceBase::WeakPtr & p, int16_t callback_group_index)
: executable(p),
callback_group_index(callback_group_index)
{}
{
if(auto shr = p.lock())
{
rcl_handle_shr_ptr = shr->get_service_handle();
}
else
{
rcl_handle_shr_ptr = std::monostate();
}
}

AnyExecutableWeakRef(const rclcpp::ClientBase::WeakPtr & p, int16_t callback_group_index)
: executable(p),
callback_group_index(callback_group_index)
{}
{
if(auto shr = p.lock())
{
rcl_handle_shr_ptr = shr->get_client_handle();
}
else
{
rcl_handle_shr_ptr = std::monostate();
}
}

AnyExecutableWeakRef(const rclcpp::Waitable::WeakPtr & p, int16_t callback_group_index)
: executable(p),
callback_group_index(callback_group_index)
{}
{
if(auto shr = p.lock())
{
rcl_handle_shr_ptr = shr;
}
else
{
rcl_handle_shr_ptr = std::monostate();
}
}

AnyExecutableWeakRef(
const rclcpp::GuardCondition::WeakPtr & p,
Expand All @@ -49,6 +95,76 @@ struct AnyExecutableWeakRef
{
//special case, guard conditions are auto processed by waking up the wait set
// therefore they shall never create a real executable

{
if(auto shr = p.lock())
{
rcl_handle_shr_ptr = shr;
}
else
{
rcl_handle_shr_ptr = std::monostate();
}
}


}

/**
* Checks, if the executable still exists, or if was deleted
*/
bool executable_alive()
{
auto check_valid = [this] (const auto &shr_ptr)
{
auto use_cnt = shr_ptr.use_count();
if(use_cnt <= 1)
{
rcl_handle_shr_ptr = std::monostate();
return false;
}

return true;
};

switch(rcl_handle_shr_ptr.index()) {
case AnyExecutableWeakRef::ExecutableIndex::Subscription:
{
return check_valid(std::get<std::shared_ptr<rcl_subscription_t>>(rcl_handle_shr_ptr));
}
break;
case AnyExecutableWeakRef::ExecutableIndex::Timer:
{
return check_valid(std::get<std::shared_ptr<const rcl_timer_t>>(rcl_handle_shr_ptr));
}
break;
case AnyExecutableWeakRef::ExecutableIndex::Service:
{
return check_valid(std::get<std::shared_ptr<rcl_service_t>>(rcl_handle_shr_ptr));
}
break;
case AnyExecutableWeakRef::ExecutableIndex::Client:
{
return check_valid(std::get<std::shared_ptr<rcl_client_t>>(rcl_handle_shr_ptr));
}
break;
case AnyExecutableWeakRef::ExecutableIndex::Waitable:
{
return check_valid(std::get<std::shared_ptr<rclcpp::Waitable>>(rcl_handle_shr_ptr));
}
break;
case AnyExecutableWeakRef::ExecutableIndex::GuardCondition:
{
return check_valid(std::get<std::shared_ptr<rclcpp::GuardCondition>>(rcl_handle_shr_ptr));
}
case AnyExecutableWeakRef::ExecutableIndex::Deleted:
{
return false;
}
break;

}
return false;
}

AnyExecutableWeakRef(const AnyExecutableWeakRef &) = delete;
Expand All @@ -69,10 +185,16 @@ struct AnyExecutableWeakRef
Client,
Waitable,
GuardCondition,
Deleted,
};

// shared_ptr holding the rcl handle during wait
std::shared_ptr<const void> rcl_handle_shr_ptr;

using RclHandleVariant = std::variant<std::shared_ptr<rcl_subscription_t>, std::shared_ptr<const rcl_timer_t>,
std::shared_ptr<rcl_service_t>, std::shared_ptr<rcl_client_t>,
rclcpp::Waitable::SharedPtr, rclcpp::GuardCondition::SharedPtr, std::monostate>;

RclHandleVariant rcl_handle_shr_ptr;

// A function that should be executed if the executable is a guard condition and ready
std::function<void(void)> handle_guard_condition_fun;
Expand Down
Loading

0 comments on commit ed6c50d

Please sign in to comment.