Skip to content

Commit

Permalink
Clean up StaticMemoryStrategy, implement method chaining for ObjectPo…
Browse files Browse the repository at this point in the history
…olBounds struct
  • Loading branch information
Jackie Kay committed Aug 13, 2015
1 parent e57b049 commit 872edfc
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 55 deletions.
15 changes: 5 additions & 10 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class Executor
"[rclcpp::error] take failed for subscription on topic '%s': %s\n",
subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
}
subscription->return_message(message);
}

static void
Expand Down Expand Up @@ -791,12 +792,7 @@ class Executor
AnyExecutable::SharedPtr
get_next_ready_executable()
{
return get_next_ready_executable(this->memory_strategy_->instantiate_next_executable());
}

AnyExecutable::SharedPtr
get_next_ready_executable(AnyExecutable::SharedPtr any_exec)
{
auto any_exec = this->memory_strategy_->instantiate_next_executable();
// Check the timers to see if there are any that are ready, if so return
get_next_timer(any_exec);
if (any_exec->timer) {
Expand All @@ -817,9 +813,8 @@ class Executor
if (any_exec->client) {
return any_exec;
}
// If there is neither a ready timer nor subscription, return a null ptr
any_exec.reset();
return any_exec;
// If there is no ready executable, return a null ptr
return std::shared_ptr<AnyExecutable>();
}

template<typename T = std::milli>
Expand All @@ -842,7 +837,7 @@ class Executor
if (any_exec) {
// If it is valid, check to see if the group is mutually exclusive or
// not, then mark it accordingly
if (any_exec->callback_group->type_ == \
if (any_exec->callback_group && any_exec->callback_group->type_ == \
callback_group::CallbackGroupType::MutuallyExclusive)
{
// It should not have been taken otherwise
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class MemoryStrategy

virtual executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
return executor::AnyExecutable::SharedPtr(new executor::AnyExecutable);
return std::make_shared<executor::AnyExecutable>();
}

virtual void * alloc(size_t size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace strategies
namespace message_pool_memory_strategy
{

template<typename MessageT, size_t size,
template<typename MessageT, size_t Size,
typename std::enable_if<rosidl_generator_traits::has_fixed_size<MessageT>::value>::type * =
nullptr>
class MessagePoolMemoryStrategy
Expand All @@ -36,7 +36,7 @@ class MessagePoolMemoryStrategy
MessagePoolMemoryStrategy()
: next_array_index_(0)
{
for (size_t i = 0; i < size; ++i) {
for (size_t i = 0; i < Size; ++i) {
pool_[i].msg_ptr_ = std::make_shared<MessageT>();
pool_[i].used = false;
}
Expand All @@ -45,7 +45,7 @@ class MessagePoolMemoryStrategy
std::shared_ptr<MessageT> borrow_message()
{
size_t current_index = next_array_index_;
next_array_index_ = (next_array_index_ + 1) % size;
next_array_index_ = (next_array_index_ + 1) % Size;
if (pool_[current_index].used) {
throw std::runtime_error("Tried to access message that was still in use! Abort.");
}
Expand All @@ -58,7 +58,7 @@ class MessagePoolMemoryStrategy

void return_message(std::shared_ptr<MessageT> & msg)
{
for (size_t i = 0; i < size; ++i) {
for (size_t i = 0; i < Size; ++i) {
if (pool_[i].msg_ptr_ == msg) {
pool_[i].used = false;
return;
Expand All @@ -74,7 +74,7 @@ class MessagePoolMemoryStrategy
bool used;
};

std::array<PoolMember, size> pool_;
std::array<PoolMember, Size> pool_;
size_t next_array_index_;

};
Expand Down
185 changes: 146 additions & 39 deletions rclcpp/include/rclcpp/strategies/static_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,71 +27,155 @@ namespace memory_strategies

namespace static_memory_strategy
{
struct ObjectPoolBounds
struct ObjectPoolBounds
{
public:
size_t max_subscriptions;
size_t max_services;
size_t max_clients;
size_t max_executables;
size_t max_guard_conditions;
size_t pool_size;

ObjectPoolBounds()
: max_subscriptions(10), max_services(10), max_clients(10),
max_executables(1), max_guard_conditions(2), pool_size(1024)
{}

ObjectPoolBounds & set_max_subscriptions(size_t subscriptions)
{
max_subscriptions = subscriptions;
return *this;
}

ObjectPoolBounds & set_max_services(size_t services)
{
max_services = services;
return *this;
}

ObjectPoolBounds & set_max_clients(size_t clients)
{
max_clients = clients;
return *this;
}

ObjectPoolBounds & set_max_guard_conditions(size_t guard_conditions)
{
public:
size_t max_subscriptions_;
size_t max_services_;
size_t max_clients_;
size_t max_executables_;
size_t pool_size_;
max_guard_conditions = guard_conditions;
return *this;
}

ObjectPoolBounds & set_max_executables(size_t executables)
{
max_executables = executables;
return *this;
}

ObjectPoolBounds(size_t subs = 10, size_t services = 10, size_t clients = 10,
size_t executables = 1, size_t pool = 1024)
: max_subscriptions_(subs), max_services_(services), max_clients_(clients), max_executables_(
executables), pool_size_(pool)
{}
};
ObjectPoolBounds & set_memory_pool_size(size_t pool)
{
pool_size = pool;
return *this;
}
};


class StaticMemoryStrategy : public memory_strategy::MemoryStrategy
{
public:
StaticMemoryStrategy(ObjectPoolBounds bounds = ObjectPoolBounds())
: bounds_(bounds)
: bounds_(bounds), memory_pool_(nullptr), subscription_pool_(nullptr),
service_pool_(nullptr), guard_condition_pool_(nullptr), executable_pool_(nullptr)
{
memory_pool_ = static_cast<void **>(malloc(bounds_.pool_size_));
subscription_pool_ = static_cast<void **>(malloc(bounds_.max_subscriptions_));
service_pool_ = static_cast<void **>(malloc(bounds_.max_services_));
client_pool_ = static_cast<void **>(malloc(bounds_.max_clients_));
executable_pool_ = static_cast<executor::AnyExecutable *>(malloc(bounds_.max_executables_));

memset(memory_pool_, 0, bounds_.pool_size_);
memset(subscription_pool_, 0, bounds_.max_subscriptions_);
memset(service_pool_, 0, bounds_.max_services_);
memset(client_pool_, 0, bounds_.max_clients_);
memset(executable_pool_, 0, bounds_.max_executables_);
if (bounds_.pool_size) {
memory_pool_ = new void *[bounds_.pool_size];
memset(memory_pool_, 0, bounds_.pool_size * sizeof(void *));
}

if (bounds_.max_subscriptions) {
subscription_pool_ = new void *[bounds_.max_subscriptions];
memset(subscription_pool_, 0, bounds_.max_subscriptions * sizeof(void *));
}

if (bounds_.max_services) {
service_pool_ = new void *[bounds_.max_services];
memset(service_pool_, 0, bounds_.max_services * sizeof(void *));
}

if (bounds_.max_clients) {
client_pool_ = new void *[bounds_.max_clients];
memset(client_pool_, 0, bounds_.max_clients * sizeof(void *));
}

if (bounds_.max_guard_conditions) {
guard_condition_pool_ = new void *[bounds_.max_guard_conditions];
memset(guard_condition_pool_, 0, bounds_.max_guard_conditions * sizeof(void *));
}

if (bounds_.max_executables) {
executable_pool_ = new executor::AnyExecutable::SharedPtr[bounds_.max_executables];
}

for (size_t i = 0; i < bounds_.max_executables; ++i) {
executable_pool_[i] = std::make_shared<executor::AnyExecutable>();
}

pool_seq_ = 0;
exec_seq_ = 0;

// Reserve pool_size_ buckets in the memory map.
memory_map_.reserve(bounds_.pool_size_);
for (size_t i = 0; i < bounds_.pool_size_; ++i) {
memory_map_.reserve(bounds_.pool_size);
for (size_t i = 0; i < bounds_.pool_size; ++i) {
memory_map_[memory_pool_[i]] = 0;
}
}

~StaticMemoryStrategy()
{
if (bounds_.pool_size) {
delete[] memory_pool_;
}
if (bounds_.max_subscriptions) {
delete[] subscription_pool_;
}
if (bounds_.max_services) {
delete[] service_pool_;
}
if (bounds_.max_clients) {
delete[] client_pool_;
}
if (bounds_.max_guard_conditions) {
delete[] guard_condition_pool_;
}
}

void ** borrow_handles(HandleType type, size_t number_of_handles)
{
switch (type) {
case HandleType::subscription_handle:
if (number_of_handles > bounds_.max_subscriptions_) {
if (number_of_handles > bounds_.max_subscriptions) {
throw std::runtime_error("Requested size exceeded maximum subscriptions.");
}

return subscription_pool_;
case HandleType::service_handle:
if (number_of_handles > bounds_.max_services_) {
if (number_of_handles > bounds_.max_services) {
throw std::runtime_error("Requested size exceeded maximum services.");
}

return service_pool_;
case HandleType::client_handle:
if (number_of_handles > bounds_.max_clients_) {
if (number_of_handles > bounds_.max_clients) {
throw std::runtime_error("Requested size exceeded maximum clients.");
}

return client_pool_;
case HandleType::guard_condition_handle:
if (number_of_handles > bounds_.max_guard_conditions) {
throw std::runtime_error("Requested size exceeded maximum guard_conditions.");
}

return guard_condition_pool_;
default:
break;
}
Expand All @@ -103,13 +187,24 @@ class StaticMemoryStrategy : public memory_strategy::MemoryStrategy
(void)handles;
switch (type) {
case HandleType::subscription_handle:
memset(subscription_pool_, 0, bounds_.max_subscriptions_);
if (bounds_.max_subscriptions) {
memset(subscription_pool_, 0, bounds_.max_subscriptions * sizeof(void *));
}
break;
case HandleType::service_handle:
memset(service_pool_, 0, bounds_.max_services_);
if (bounds_.max_services) {
memset(service_pool_, 0, bounds_.max_services * sizeof(void *));
}
break;
case HandleType::client_handle:
memset(client_pool_, 0, bounds_.max_clients_);
if (bounds_.max_clients) {
memset(client_pool_, 0, bounds_.max_clients * sizeof(void *));
}
break;
case HandleType::guard_condition_handle:
if (bounds_.max_guard_conditions) {
memset(guard_condition_pool_, 0, bounds_.max_guard_conditions * sizeof(void *));
}
break;
default:
throw std::runtime_error("Unrecognized enum, could not return handle memory.");
Expand All @@ -118,28 +213,39 @@ class StaticMemoryStrategy : public memory_strategy::MemoryStrategy

executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
if (exec_seq_ >= bounds_.max_executables_) {
if (exec_seq_ >= bounds_.max_executables) {
// wrap around
exec_seq_ = 0;
}
size_t prev_exec_seq_ = exec_seq_;
++exec_seq_;

return std::make_shared<executor::AnyExecutable>(executable_pool_[prev_exec_seq_]);
if (!executable_pool_[prev_exec_seq_]) {
throw std::runtime_error("Executable pool member was NULL");
}

executable_pool_[prev_exec_seq_]->subscription.reset();
executable_pool_[prev_exec_seq_]->timer.reset();
executable_pool_[prev_exec_seq_]->service.reset();
executable_pool_[prev_exec_seq_]->client.reset();
executable_pool_[prev_exec_seq_]->callback_group.reset();
executable_pool_[prev_exec_seq_]->node.reset();

return executable_pool_[prev_exec_seq_];
}

void * alloc(size_t size)
{
// Extremely naive static allocation strategy
// Keep track of block size at a given pointer
if (pool_seq_ + size > bounds_.pool_size_) {
if (pool_seq_ + size > bounds_.pool_size) {
// Start at 0
pool_seq_ = 0;
}
void * ptr = memory_pool_[pool_seq_];
if (memory_map_.count(ptr) == 0) {
// We expect to have the state for all blocks pre-mapped into memory_map_
throw std::runtime_error("Unexpected pointer in rcl_malloc.");
throw std::runtime_error("Unexpected pointer in StaticMemoryStrategy::alloc.");
}
memory_map_[ptr] = size;
size_t prev_pool_seq = pool_seq_;
Expand All @@ -151,7 +257,7 @@ class StaticMemoryStrategy : public memory_strategy::MemoryStrategy
{
if (memory_map_.count(ptr) == 0) {
// We expect to have the state for all blocks pre-mapped into memory_map_
throw std::runtime_error("Unexpected pointer in rcl_free.");
throw std::runtime_error("Unexpected pointer in StaticMemoryStrategy::free.");
}

memset(ptr, 0, memory_map_[ptr]);
Expand All @@ -164,7 +270,8 @@ class StaticMemoryStrategy : public memory_strategy::MemoryStrategy
void ** subscription_pool_;
void ** service_pool_;
void ** client_pool_;
executor::AnyExecutable * executable_pool_;
void ** guard_condition_pool_;
executor::AnyExecutable::SharedPtr * executable_pool_;

size_t pool_seq_;
size_t exec_seq_;
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class SubscriptionBase

virtual std::shared_ptr<void> create_message() = 0;
virtual void handle_message(std::shared_ptr<void> & message) = 0;
virtual void return_message(std::shared_ptr<void> & message) = 0;

private:
RCLCPP_DISABLE_COPY(SubscriptionBase);
Expand Down Expand Up @@ -128,6 +129,11 @@ class Subscription : public SubscriptionBase
{
auto typed_message = std::static_pointer_cast<MessageT>(message);
callback_(typed_message);
}

void return_message(std::shared_ptr<void> & message)
{
auto typed_message = std::static_pointer_cast<MessageT>(message);
message_memory_strategy_->return_message(typed_message);
}

Expand Down

0 comments on commit 872edfc

Please sign in to comment.