diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 3967c29833..954d73350a 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -187,6 +187,7 @@ class Executor "[rclcpp::error] take failed for subscription on topic '%s': %s\n", subscription->get_topic_name().c_str(), rmw_get_error_string_safe()); } + subscription->return_message(message); } static void @@ -791,12 +792,7 @@ class Executor AnyExecutable::SharedPtr get_next_ready_executable() { - return get_next_ready_executable(this->memory_strategy_->instantiate_next_executable()); - } - - AnyExecutable::SharedPtr - get_next_ready_executable(AnyExecutable::SharedPtr any_exec) - { + auto any_exec = this->memory_strategy_->instantiate_next_executable(); // Check the timers to see if there are any that are ready, if so return get_next_timer(any_exec); if (any_exec->timer) { @@ -817,9 +813,8 @@ class Executor if (any_exec->client) { return any_exec; } - // If there is neither a ready timer nor subscription, return a null ptr - any_exec.reset(); - return any_exec; + // If there is no ready executable, return a null ptr + return std::shared_ptr(); } template @@ -842,7 +837,7 @@ class Executor if (any_exec) { // If it is valid, check to see if the group is mutually exclusive or // not, then mark it accordingly - if (any_exec->callback_group->type_ == \ + if (any_exec->callback_group && any_exec->callback_group->type_ == \ callback_group::CallbackGroupType::MutuallyExclusive) { // It should not have been taken otherwise diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index a63fd4ee96..8dc4944f11 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -51,7 +51,7 @@ class MemoryStrategy virtual executor::AnyExecutable::SharedPtr instantiate_next_executable() { - return executor::AnyExecutable::SharedPtr(new executor::AnyExecutable); + return std::make_shared(); } virtual void * alloc(size_t size) diff --git a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp index 17db254548..0e437eb902 100644 --- a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp @@ -25,7 +25,7 @@ namespace strategies namespace message_pool_memory_strategy { -template::value>::type * = nullptr> class MessagePoolMemoryStrategy @@ -36,7 +36,7 @@ class MessagePoolMemoryStrategy MessagePoolMemoryStrategy() : next_array_index_(0) { - for (size_t i = 0; i < size; ++i) { + for (size_t i = 0; i < Size; ++i) { pool_[i].msg_ptr_ = std::make_shared(); pool_[i].used = false; } @@ -45,7 +45,7 @@ class MessagePoolMemoryStrategy std::shared_ptr borrow_message() { size_t current_index = next_array_index_; - next_array_index_ = (next_array_index_ + 1) % size; + next_array_index_ = (next_array_index_ + 1) % Size; if (pool_[current_index].used) { throw std::runtime_error("Tried to access message that was still in use! Abort."); } @@ -58,7 +58,7 @@ class MessagePoolMemoryStrategy void return_message(std::shared_ptr & msg) { - for (size_t i = 0; i < size; ++i) { + for (size_t i = 0; i < Size; ++i) { if (pool_[i].msg_ptr_ == msg) { pool_[i].used = false; return; @@ -74,7 +74,7 @@ class MessagePoolMemoryStrategy bool used; }; - std::array pool_; + std::array pool_; size_t next_array_index_; }; diff --git a/rclcpp/include/rclcpp/strategies/static_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/static_memory_strategy.hpp index db93f260d7..b92c72de85 100644 --- a/rclcpp/include/rclcpp/strategies/static_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/static_memory_strategy.hpp @@ -27,71 +27,155 @@ namespace memory_strategies namespace static_memory_strategy { - struct ObjectPoolBounds +struct ObjectPoolBounds +{ +public: + size_t max_subscriptions; + size_t max_services; + size_t max_clients; + size_t max_executables; + size_t max_guard_conditions; + size_t pool_size; + + ObjectPoolBounds() + : max_subscriptions(10), max_services(10), max_clients(10), + max_executables(1), max_guard_conditions(2), pool_size(1024) + {} + + ObjectPoolBounds & set_max_subscriptions(size_t subscriptions) + { + max_subscriptions = subscriptions; + return *this; + } + + ObjectPoolBounds & set_max_services(size_t services) + { + max_services = services; + return *this; + } + + ObjectPoolBounds & set_max_clients(size_t clients) + { + max_clients = clients; + return *this; + } + + ObjectPoolBounds & set_max_guard_conditions(size_t guard_conditions) { - public: - size_t max_subscriptions_; - size_t max_services_; - size_t max_clients_; - size_t max_executables_; - size_t pool_size_; + max_guard_conditions = guard_conditions; + return *this; + } + + ObjectPoolBounds & set_max_executables(size_t executables) + { + max_executables = executables; + return *this; + } - ObjectPoolBounds(size_t subs = 10, size_t services = 10, size_t clients = 10, - size_t executables = 1, size_t pool = 1024) - : max_subscriptions_(subs), max_services_(services), max_clients_(clients), max_executables_( - executables), pool_size_(pool) - {} - }; + ObjectPoolBounds & set_memory_pool_size(size_t pool) + { + pool_size = pool; + return *this; + } +}; class StaticMemoryStrategy : public memory_strategy::MemoryStrategy { public: StaticMemoryStrategy(ObjectPoolBounds bounds = ObjectPoolBounds()) - : bounds_(bounds) + : bounds_(bounds), memory_pool_(nullptr), subscription_pool_(nullptr), + service_pool_(nullptr), guard_condition_pool_(nullptr), executable_pool_(nullptr) { - memory_pool_ = static_cast(malloc(bounds_.pool_size_)); - subscription_pool_ = static_cast(malloc(bounds_.max_subscriptions_)); - service_pool_ = static_cast(malloc(bounds_.max_services_)); - client_pool_ = static_cast(malloc(bounds_.max_clients_)); - executable_pool_ = static_cast(malloc(bounds_.max_executables_)); - - memset(memory_pool_, 0, bounds_.pool_size_); - memset(subscription_pool_, 0, bounds_.max_subscriptions_); - memset(service_pool_, 0, bounds_.max_services_); - memset(client_pool_, 0, bounds_.max_clients_); - memset(executable_pool_, 0, bounds_.max_executables_); + if (bounds_.pool_size) { + memory_pool_ = new void *[bounds_.pool_size]; + memset(memory_pool_, 0, bounds_.pool_size * sizeof(void *)); + } + + if (bounds_.max_subscriptions) { + subscription_pool_ = new void *[bounds_.max_subscriptions]; + memset(subscription_pool_, 0, bounds_.max_subscriptions * sizeof(void *)); + } + + if (bounds_.max_services) { + service_pool_ = new void *[bounds_.max_services]; + memset(service_pool_, 0, bounds_.max_services * sizeof(void *)); + } + + if (bounds_.max_clients) { + client_pool_ = new void *[bounds_.max_clients]; + memset(client_pool_, 0, bounds_.max_clients * sizeof(void *)); + } + + if (bounds_.max_guard_conditions) { + guard_condition_pool_ = new void *[bounds_.max_guard_conditions]; + memset(guard_condition_pool_, 0, bounds_.max_guard_conditions * sizeof(void *)); + } + + if (bounds_.max_executables) { + executable_pool_ = new executor::AnyExecutable::SharedPtr[bounds_.max_executables]; + } + + for (size_t i = 0; i < bounds_.max_executables; ++i) { + executable_pool_[i] = std::make_shared(); + } + pool_seq_ = 0; exec_seq_ = 0; // Reserve pool_size_ buckets in the memory map. - memory_map_.reserve(bounds_.pool_size_); - for (size_t i = 0; i < bounds_.pool_size_; ++i) { + memory_map_.reserve(bounds_.pool_size); + for (size_t i = 0; i < bounds_.pool_size; ++i) { memory_map_[memory_pool_[i]] = 0; } } + ~StaticMemoryStrategy() + { + if (bounds_.pool_size) { + delete[] memory_pool_; + } + if (bounds_.max_subscriptions) { + delete[] subscription_pool_; + } + if (bounds_.max_services) { + delete[] service_pool_; + } + if (bounds_.max_clients) { + delete[] client_pool_; + } + if (bounds_.max_guard_conditions) { + delete[] guard_condition_pool_; + } + } + void ** borrow_handles(HandleType type, size_t number_of_handles) { switch (type) { case HandleType::subscription_handle: - if (number_of_handles > bounds_.max_subscriptions_) { + if (number_of_handles > bounds_.max_subscriptions) { throw std::runtime_error("Requested size exceeded maximum subscriptions."); } return subscription_pool_; case HandleType::service_handle: - if (number_of_handles > bounds_.max_services_) { + if (number_of_handles > bounds_.max_services) { throw std::runtime_error("Requested size exceeded maximum services."); } return service_pool_; case HandleType::client_handle: - if (number_of_handles > bounds_.max_clients_) { + if (number_of_handles > bounds_.max_clients) { throw std::runtime_error("Requested size exceeded maximum clients."); } return client_pool_; + case HandleType::guard_condition_handle: + if (number_of_handles > bounds_.max_guard_conditions) { + throw std::runtime_error("Requested size exceeded maximum guard_conditions."); + } + + return guard_condition_pool_; default: break; } @@ -103,13 +187,24 @@ class StaticMemoryStrategy : public memory_strategy::MemoryStrategy (void)handles; switch (type) { case HandleType::subscription_handle: - memset(subscription_pool_, 0, bounds_.max_subscriptions_); + if (bounds_.max_subscriptions) { + memset(subscription_pool_, 0, bounds_.max_subscriptions * sizeof(void *)); + } break; case HandleType::service_handle: - memset(service_pool_, 0, bounds_.max_services_); + if (bounds_.max_services) { + memset(service_pool_, 0, bounds_.max_services * sizeof(void *)); + } break; case HandleType::client_handle: - memset(client_pool_, 0, bounds_.max_clients_); + if (bounds_.max_clients) { + memset(client_pool_, 0, bounds_.max_clients * sizeof(void *)); + } + break; + case HandleType::guard_condition_handle: + if (bounds_.max_guard_conditions) { + memset(guard_condition_pool_, 0, bounds_.max_guard_conditions * sizeof(void *)); + } break; default: throw std::runtime_error("Unrecognized enum, could not return handle memory."); @@ -118,28 +213,39 @@ class StaticMemoryStrategy : public memory_strategy::MemoryStrategy executor::AnyExecutable::SharedPtr instantiate_next_executable() { - if (exec_seq_ >= bounds_.max_executables_) { + if (exec_seq_ >= bounds_.max_executables) { // wrap around exec_seq_ = 0; } size_t prev_exec_seq_ = exec_seq_; ++exec_seq_; - return std::make_shared(executable_pool_[prev_exec_seq_]); + if (!executable_pool_[prev_exec_seq_]) { + throw std::runtime_error("Executable pool member was NULL"); + } + + executable_pool_[prev_exec_seq_]->subscription.reset(); + executable_pool_[prev_exec_seq_]->timer.reset(); + executable_pool_[prev_exec_seq_]->service.reset(); + executable_pool_[prev_exec_seq_]->client.reset(); + executable_pool_[prev_exec_seq_]->callback_group.reset(); + executable_pool_[prev_exec_seq_]->node.reset(); + + return executable_pool_[prev_exec_seq_]; } void * alloc(size_t size) { // Extremely naive static allocation strategy // Keep track of block size at a given pointer - if (pool_seq_ + size > bounds_.pool_size_) { + if (pool_seq_ + size > bounds_.pool_size) { // Start at 0 pool_seq_ = 0; } void * ptr = memory_pool_[pool_seq_]; if (memory_map_.count(ptr) == 0) { // We expect to have the state for all blocks pre-mapped into memory_map_ - throw std::runtime_error("Unexpected pointer in rcl_malloc."); + throw std::runtime_error("Unexpected pointer in StaticMemoryStrategy::alloc."); } memory_map_[ptr] = size; size_t prev_pool_seq = pool_seq_; @@ -151,7 +257,7 @@ class StaticMemoryStrategy : public memory_strategy::MemoryStrategy { if (memory_map_.count(ptr) == 0) { // We expect to have the state for all blocks pre-mapped into memory_map_ - throw std::runtime_error("Unexpected pointer in rcl_free."); + throw std::runtime_error("Unexpected pointer in StaticMemoryStrategy::free."); } memset(ptr, 0, memory_map_[ptr]); @@ -164,7 +270,8 @@ class StaticMemoryStrategy : public memory_strategy::MemoryStrategy void ** subscription_pool_; void ** service_pool_; void ** client_pool_; - executor::AnyExecutable * executable_pool_; + void ** guard_condition_pool_; + executor::AnyExecutable::SharedPtr * executable_pool_; size_t pool_seq_; size_t exec_seq_; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 68fe6fffb1..b2c659ccc9 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -80,6 +80,7 @@ class SubscriptionBase virtual std::shared_ptr create_message() = 0; virtual void handle_message(std::shared_ptr & message) = 0; + virtual void return_message(std::shared_ptr & message) = 0; private: RCLCPP_DISABLE_COPY(SubscriptionBase); @@ -128,6 +129,11 @@ class Subscription : public SubscriptionBase { auto typed_message = std::static_pointer_cast(message); callback_(typed_message); + } + + void return_message(std::shared_ptr & message) + { + auto typed_message = std::static_pointer_cast(message); message_memory_strategy_->return_message(typed_message); }