Skip to content

Commit

Permalink
fixes to address comments and CI failures
Browse files Browse the repository at this point in the history
  • Loading branch information
wjwwood committed Aug 21, 2015
1 parent aedc494 commit 12b939c
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 181 deletions.
4 changes: 3 additions & 1 deletion rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ if(AMENT_ENABLE_TESTING)

ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp)
ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp)
target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}")
if(TARGET test_intra_process_manager)
target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}")
endif()
endif()

ament_package(
Expand Down
2 changes: 2 additions & 0 deletions rclcpp/include/rclcpp/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ class Context
auto it = sub_contexts_.find(type_i);
if (it == sub_contexts_.end()) {
// It doesn't exist yet, make it
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
sub_context = std::shared_ptr<SubContext>(
new SubContext(std::forward<Args>(args) ...),
[] (SubContext * sub_context_ptr) {
delete sub_context_ptr;
});
// *INDENT-ON*
sub_contexts_[type_i] = sub_context;
} else {
// It exists, get it out and cast it.
Expand Down
54 changes: 30 additions & 24 deletions rclcpp/include/rclcpp/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
#include <rclcpp/subscription.hpp>

#include <algorithm>
#include <cassert>
#include <atomic>
#include <cstdint>
#include <exception>
#include <limits>
#include <map>
#include <unordered_map>
Expand Down Expand Up @@ -115,6 +116,7 @@ class IntraProcessManager
{
private:
RCLCPP_DISABLE_COPY(IntraProcessManager);

public:
RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager);

Expand All @@ -129,8 +131,8 @@ class IntraProcessManager
*
* This method will allocate memory.
*
* /param subscription the Subscription to register.
* /return an unsigned 64-bit integer which is the subscription's unique id.
* \param subscription the Subscription to register.
* \return an unsigned 64-bit integer which is the subscription's unique id.
*/
uint64_t
add_subscription(subscription::SubscriptionBase::SharedPtr subscription)
Expand All @@ -144,7 +146,7 @@ class IntraProcessManager
/// Unregister a subscription using the subscription's unique id.
/* This method does not allocate memory.
*
* /param intra_process_subscription_id id of the subscription to remove.
* \param intra_process_subscription_id id of the subscription to remove.
*/
void
remove_subscription(uint64_t intra_process_subscription_id)
Expand Down Expand Up @@ -180,19 +182,21 @@ class IntraProcessManager
*
* This method will allocate memory.
*
* /param publisher publisher to be registered with the manager.
* /param buffer_size if 0 (default) a size is calculated based on the QoS.
* /return an unsigned 64-bit integer which is the publisher's unique id.
* \param publisher publisher to be registered with the manager.
* \param buffer_size if 0 (default) a size is calculated based on the QoS.
* \return an unsigned 64-bit integer which is the publisher's unique id.
*/
template<typename MessageT>
uint64_t
add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size=0)
add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size = 0)
{
auto id = IntraProcessManager::get_next_unique_id();
publishers_[id].publisher = publisher;
size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
// As long as the size of the ring buffer is less than the max sequence number, we're safe.
assert(size <= std::numeric_limits<uint64_t>::max());
if (size > std::numeric_limits<uint64_t>::max()) {
throw std::invalid_argument("the calculated buffer size is too large");
}
publishers_[id].sequence_number.store(0);
publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer<MessageT>::make_shared(size);
publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
Expand All @@ -202,7 +206,7 @@ class IntraProcessManager
/// Unregister a publisher using the publisher's unique id.
/* This method does not allocate memory.
*
* /param intra_process_publisher_id id of the publisher to remove.
* \param intra_process_publisher_id id of the publisher to remove.
*/
void
remove_publisher(uint64_t intra_process_publisher_id)
Expand Down Expand Up @@ -236,9 +240,9 @@ class IntraProcessManager
*
* This method does allocate memory.
*
* /param intra_process_publisher_id the id of the publisher of this message.
* /param message the message that is being stored.
* /return the message sequence number.
* \param intra_process_publisher_id the id of the publisher of this message.
* \param message the message that is being stored.
* \return the message sequence number.
*/
template<typename MessageT>
uint64_t
Expand All @@ -250,7 +254,7 @@ class IntraProcessManager
if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
}
publisher_info & info = it->second;
PublisherInfo & info = it->second;
// Calculate the next message sequence number.
uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
// Insert the message into the ring buffer using the message_seq to identify it.
Expand Down Expand Up @@ -309,10 +313,10 @@ class IntraProcessManager
*
* This method may allocate memory to copy the stored message.
*
* /param intra_process_publisher_id the id of the message's publisher.
* /param message_sequence_number the sequence number of the message.
* /param requesting_subscriptions_intra_process_id the subscription's id.
* /param message the message typed unique_ptr used to return the message.
* \param intra_process_publisher_id the id of the message's publisher.
* \param message_sequence_number the sequence number of the message.
* \param requesting_subscriptions_intra_process_id the subscription's id.
* \param message the message typed unique_ptr used to return the message.
*/
template<typename MessageT>
void
Expand All @@ -323,7 +327,7 @@ class IntraProcessManager
std::unique_ptr<MessageT> & message)
{
message = nullptr;
publisher_info * info;
PublisherInfo * info;
{
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
Expand Down Expand Up @@ -377,9 +381,11 @@ class IntraProcessManager
// So around 585 million years. Even at 1 GHz, it would take 585 years.
// I think it's safe to avoid trying to handle overflow.
// If we roll over then it's most likely a bug.
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::overflow_error(
"exhausted the unique id's for publishers and subscribers in this process "
"(congratulations your computer is either extremely fast or extremely old)");
// *INDENT-ON*
}
return next_id;
}
Expand All @@ -389,23 +395,23 @@ class IntraProcessManager
std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr> subscriptions_;
std::map<std::string, std::set<uint64_t>> subscription_ids_by_topic_;

struct publisher_info
struct PublisherInfo
{
RCLCPP_DISABLE_COPY(publisher_info);
RCLCPP_DISABLE_COPY(PublisherInfo);

publisher_info() = default;
PublisherInfo() = default;

publisher::Publisher::WeakPtr publisher;
std::atomic<uint64_t> sequence_number;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
std::unordered_map<uint64_t, std::set<uint64_t>> target_subscriptions_by_message_sequence;
};

std::unordered_map<uint64_t, publisher_info> publishers_;
std::unordered_map<uint64_t, PublisherInfo> publishers_;

};

std::atomic<uint64_t> IntraProcessManager::next_unique_id_{1};
std::atomic<uint64_t> IntraProcessManager::next_unique_id_ {1};

} /* namespace intra_process_manager */
} /* namespace rclcpp */
Expand Down
31 changes: 20 additions & 11 deletions rclcpp/include/rclcpp/mapped_ring_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ class MappedRingBuffer : public MappedRingBufferBase
/// Constructor.
/* The constructor will allocate memory while reserving space.
*
* /param size size of the ring buffer; must be positive and non-zero.
* \param size size of the ring buffer; must be positive and non-zero.
*/
MappedRingBuffer(size_t size) : elements_(size), head_(0)
MappedRingBuffer(size_t size)
: elements_(size), head_(0)
{
if (size == 0) {
throw std::invalid_argument("size must be a positive, non-zero value");
Expand All @@ -75,8 +76,10 @@ class MappedRingBuffer : public MappedRingBufferBase
*
* The key is not guaranteed to be unique, see the class docs for more.
*
* /param key the key associated with the stored value
* /param value if the key is found, the value is stored in this parameter
* The contents of value before the method is called are discarded.
*
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/
void
get_copy_at_key(uint64_t key, std::unique_ptr<T> & value)
Expand All @@ -102,8 +105,10 @@ class MappedRingBuffer : public MappedRingBufferBase
* originally stored object, since it was returned by the first call to this
* method.
*
* /param key the key associated with the stored value
* /param value if the key is found, the value is stored in this parameter
* The contents of value before the method is called are discarded.
*
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/
void
get_ownership_at_key(uint64_t key, std::unique_ptr<T> & value)
Expand All @@ -125,8 +130,10 @@ class MappedRingBuffer : public MappedRingBufferBase
*
* The key is not guaranteed to be unique, see the class docs for more.
*
* /param key the key associated with the stored value
* /param value if the key is found, the value is stored in this parameter
* The contents of value before the method is called are discarded.
*
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/
void
pop_at_key(uint64_t key, std::unique_ptr<T> & value)
Expand All @@ -147,8 +154,8 @@ class MappedRingBuffer : public MappedRingBufferBase
* After insertion, if a pair was replaced, then value will contain ownership
* of that displaced value. Otherwise it will be a nullptr.
*
* /param key the key associated with the value to be stored
* /param value the value to store, and optionally the value displaced
* \param key the key associated with the value to be stored
* \param value the value to store, and optionally the value displaced
*/
bool
push_and_replace(uint64_t key, std::unique_ptr<T> & value)
Expand Down Expand Up @@ -188,9 +195,11 @@ class MappedRingBuffer : public MappedRingBufferBase
typename std::vector<element>::iterator
get_iterator_of_key(uint64_t key)
{
auto it = std::find_if(elements_.begin(), elements_.end(), [key] (element & e) -> bool {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
auto it = std::find_if(elements_.begin(), elements_.end(), [key](element & e) -> bool {
return e.key == key && e.in_use;
});
// *INDENT-ON*
return it;
}

Expand Down
8 changes: 3 additions & 5 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class Node
private:
RCLCPP_DISABLE_COPY(Node);

static const rosidl_message_type_support_t * ipm_ts;
static const rosidl_message_type_support_t * ipm_ts_;

bool
group_in_node(callback_group::CallbackGroup::SharedPtr & group);
Expand Down Expand Up @@ -312,10 +312,8 @@ class Node
}
};

const rosidl_message_type_support_t * Node::ipm_ts =
rosidl_generator_cpp::get_message_type_support_handle<
rcl_interfaces::msg::IntraProcessMessage
>();
const rosidl_message_type_support_t * Node::ipm_ts_ =
rosidl_generator_cpp::get_message_type_support_handle<rcl_interfaces::msg::IntraProcessMessage>();

} /* namespace node */
} /* namespace rclcpp */
Expand Down
20 changes: 13 additions & 7 deletions rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ using namespace rclcpp::node;

Node::Node(const std::string & node_name, bool use_intra_process_comms)
: Node(
node_name,
rclcpp::contexts::default_context::get_global_default_context(),
use_intra_process_comms)
node_name,
rclcpp::contexts::default_context::get_global_default_context(),
use_intra_process_comms)
{}

Node::Node(
Expand Down Expand Up @@ -87,13 +87,15 @@ Node::Node(
// *INDENT-ON*
}
// Initialize node handle shared_ptr with custom deleter.
// *INDENT-OFF*
node_handle_.reset(node, [](rmw_node_t * node) {
auto ret = rmw_destroy_node(node);
if (ret != RMW_RET_OK) {
fprintf(
stderr, "Error in destruction of rmw node handle: %s\n", rmw_get_error_string_safe());
}
});
// *INDENT-ON*

using rclcpp::callback_group::CallbackGroupType;
default_callback_group_ = create_callback_group(
Expand Down Expand Up @@ -135,7 +137,7 @@ Node::create_publisher(

if (use_intra_process_comms_) {
rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher(
node_handle_.get(), ipm_ts, (topic_name + "__intra").c_str(), qos_profile);
node_handle_.get(), ipm_ts_, (topic_name + "__intra").c_str(), qos_profile);
if (!intra_process_publisher_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
Expand All @@ -149,8 +151,9 @@ Node::create_publisher(
uint64_t intra_process_publisher_id =
intra_process_manager->add_publisher<MessageT>(publisher);
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
// *INDENT-OFF*
auto shared_publish_callback =
[weak_ipm] (uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t
[weak_ipm](uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t
{
auto ipm = weak_ipm.lock();
if (!ipm) {
Expand All @@ -163,6 +166,7 @@ Node::create_publisher(
uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg);
return message_seq;
};
// *INDENT-ON*
publisher->setup_intra_process(
intra_process_publisher_id,
shared_publish_callback,
Expand Down Expand Up @@ -225,7 +229,7 @@ Node::create_subscription(
// Setup intra process.
if (use_intra_process_comms_) {
rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription(
node_handle_.get(), ipm_ts,
node_handle_.get(), ipm_ts_,
(topic_name + "__intra").c_str(), qos_profile, false);
if (!subscriber_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
Expand All @@ -238,10 +242,11 @@ Node::create_subscription(
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
uint64_t intra_process_subscription_id =
intra_process_manager->add_subscription(sub_base_ptr);
// *INDENT-OFF*
sub->setup_intra_process(
intra_process_subscription_id,
intra_process_subscriber_handle,
[weak_ipm] (
[weak_ipm](
uint64_t publisher_id,
uint64_t message_sequence,
uint64_t subscription_id,
Expand All @@ -255,6 +260,7 @@ Node::create_subscription(
}
ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message);
});
// *INDENT-ON*
}
// Assign to a group.
if (group) {
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace publisher
class Publisher
{
friend rclcpp::node::Node;

public:
RCLCPP_SMART_PTR_DEFINITIONS(Publisher);

Expand Down Expand Up @@ -107,7 +108,7 @@ class Publisher
}
}

std::string
const std::string &
get_topic_name() const
{
return topic_;
Expand All @@ -119,7 +120,8 @@ class Publisher
return queue_size_;
}

typedef std::function<uint64_t (uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT;
typedef std::function<uint64_t(uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT;

protected:
void
setup_intra_process(
Expand Down
Loading

0 comments on commit 12b939c

Please sign in to comment.