Skip to content

Commit

Permalink
basic ipc implementation from alsora/new_ipc_proposal
Browse files Browse the repository at this point in the history
Signed-off-by: alberto <alberto.soragna@gmail.com>

better use of node_topic create subscription

Signed-off-by: alberto <alberto.soragna@gmail.com>

added intra process manager test

Signed-off-by: alberto <alberto.soragna@gmail.com>

fixed ring buffer and added test

Signed-off-by: alberto <alberto.soragna@gmail.com>

added intra process buffer test

Signed-off-by: alberto <alberto.soragna@gmail.com>

added intra process buffer test

Signed-off-by: alberto <alberto.soragna@gmail.com>

Signed-off-by: alberto <alberto.soragna@gmail.com>

removed intra-process methods from subscription base

Signed-off-by: alberto <alberto.soragna@gmail.com>

using lock_guard instead of unique_lock, renamed var without camel case

Signed-off-by: alberto <alberto.soragna@gmail.com>

using unordered set and references in intra process manager

Signed-off-by: alberto <alberto.soragna@gmail.com>

subscription intra-process does not depend anymore on subscription, but has a copy of the callback

Signed-off-by: alberto <alberto.soragna@gmail.com>

changed buffer API to use rvo

Signed-off-by: Alberto <alberto.soragna@gmail.com>

avoid copying shared_ptr

Signed-off-by: alberto <alberto.soragna@gmail.com>

revert not needed changes to create_subscription

Signed-off-by: alberto <alberto.soragna@gmail.com>

updated tests according to new buffer APIs

Signed-off-by: alberto <alberto.soragna@gmail.com>

updated types in ring buffer implementation avoid using uint32_t

Signed-off-by: alberto <alberto.soragna@gmail.com>

using unique ptr for buffers in subscription_intra_process

Signed-off-by: alberto <alberto.soragna@gmail.com>

added missing std::move in subscription_intra_process constructor

Signed-off-by: alberto <alberto.soragna@gmail.com>

use consisting names for ring_buffer_implementation members

Signed-off-by: alberto <alberto.soragna@gmail.com>

addressing typos, one-liners and similar from ivanpauno review

Signed-off-by: alberto <alberto.soragna@gmail.com>

moved subscription_intra_process_base to its own files and moved non templated method from derived class

Signed-off-by: alberto <alberto.soragna@gmail.com>

removed forward declarations, fixed include subscription_intra_process_base

Signed-off-by: alberto <alberto.soragna@gmail.com>

removed member variable from do_intra_process_publish signature

Signed-off-by: alberto <alberto.soragna@gmail.com>

declare public before private in intra_process_manager_impl

Signed-off-by: alberto <alberto.soragna@gmail.com>

made matches_any_intra_process_publishers const

Signed-off-by: alberto <alberto.soragna@gmail.com>

using const reference in get_all_matching_publishers

Signed-off-by: alberto <alberto.soragna@gmail.com>

added deleter and alloc templates in intra_process_buffer

Signed-off-by: alberto <alberto.soragna@gmail.com>

added RCLCPP_WARN to intra_process_manager_impl

Signed-off-by: alberto <alberto.soragna@gmail.com>

passing context from node to subscription_intra_process

Signed-off-by: alberto <alberto.soragna@gmail.com>

using allocators in intra_process_manager

Signed-off-by: alberto <alberto.soragna@gmail.com>

use size_t instead of int in ring buffer indices

Signed-off-by: alberto <alberto.soragna@gmail.com>

creating buffer inside subscription_intra_process constructor

Signed-off-by: alberto <alberto.soragna@gmail.com>

fix lint errors

Signed-off-by: alberto <alberto.soragna@gmail.com>

throw error if trying to dequeue when buffer empty; remove duplicated methods in intra_process_buffer

Signed-off-by: alberto <alberto.soragna@gmail.com>

added todo for creating an rmw function for checking qos compatibility

Signed-off-by: alberto <alberto.soragna@gmail.com>

test fixes

Signed-off-by: alberto <alberto.soragna@gmail.com>

refactored intra_process_manager, removed ipm impl

Signed-off-by: alberto <alberto.soragna@gmail.com>

added mutex in intra_process_manager add_* methods

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

added allocator to intra_process_buffer

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

added invalid intra_process qos test for subscription

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

throw error if history size is 0 with keep last and ipc

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

using allocator when creating unique_ptr from shared_ptr

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

adding deleter template argument to intra_process buffer

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

fix linter

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

throw error with callbackT different from messageT

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

updated deleter template argument in subscription factory

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

Fix typo in test fixture tear down method name (ros2#787)

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

Add free function for creating service clients (ros2#788)

Equivalent to the free function for creating a service.
Resolves ros2#768

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

Cmake infrastructure for creating components (ros2#784)

*cmake macro to create components for libraries with multiple nodes

Signed-off-by: Siddharth Kucheria <kucheria@usc.edu>

Allow registering multiple on_parameters_set_callback (ros2#772)

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

fix for multiple nodes not being recognized (ros2#790)

Signed-off-by: Siddharth Kucheria <kucheria@usc.edu>

Remove non-package from ament_target_dependencies() (ros2#793)

Signed-off-by: Shane Loretz <sloretz@osrfoundation.org>

fix linter issue (ros2#795)

Signed-off-by: Siddharth Kucheria <kucheria@usc.edu>

Make TimeSource ignore use_sim_time events coming from other nodes. (ros2#799)

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

passing deleter template parameter

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

small fixes for failing tests

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

fixed imports in test_intra_process_manager

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

using RCLCPP_SMART_PTR_ALIASES_ONLY and RCLCPP_PUBLIC macros

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

added RCLCPP_PUBLIC macros and virtual destructor to sub intra_process base

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

added unique_ptr alias to macros

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

updated test_intra_process_manager.cpp

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>

remove mock msgs from rclcpp (ros2#800)

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

Add line break after first open paren in multiline function call (ros2#785)

* Add line break after first open paren in multiline function call

as per developer guide:
https://index.ros.org/doc/ros2/Contributing/Developer-Guide/#open-versus-cuddled-braces
see ament/ament_lint#148

Signed-off-by: Dan Rose <dan@digilabs.io>

Fix dedent when first function argument starts with a brace

Signed-off-by: Dan Rose <dan@digilabs.io>

Line break with multiline if condition
Remove line breaks where allowed.

Signed-off-by: Dan Rose <dan@digilabs.io>

Fixup after rebase

Signed-off-by: Dan Rose <dan@digilabs.io>

Fixup again after reverting indent_paren_open_brace

Signed-off-by: Dan Rose <dan@digilabs.io>

* Revert comment spacing change, condense some lines

Signed-off-by: Dan Rose <dan@digilabs.io>

Adapt to '--ros-args ... [--]'-based ROS args extraction (ros2#816)

* Use --ros-args to deal with node arguments in rclcpp.

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

* Document implicit --ros-args flag in NodeOptions::arguments().

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

* Add missing size_t to int cast.

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

* Only add implicit --ros-args flag if not present already.

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

* Add some rclcpp::NodeOptions test coverage.

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

* Address peer review comments.

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

* Please cpplint and uncrustify.

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

Guard against making multiple result requests for a goal handle (ros2#808)

This fixes a runtime error caused by a race condition when making consecutive requests for the
result.
Specifically, this happens if the user provides a result callback when sending a goal and then
calls async_get_result shortly after.

Resolves ros2#783

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

Explain return value of spin_until_future_complete (ros2#792)

Signed-off-by: Dan Rose <dan@digilabs.io>

Allow passing logger by const ref (ros2#820)

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

Delete unnecessary call for get_node_by_group (ros2#823)

Signed-off-by: Tomoya.Fujita <Tomoya.Fujita@sony.com>

Fix get_node_interfaces functions taking a pointer (ros2#821)

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

add callback group as member variable and constructor arg (ros2#811)

Signed-off-by: bpwilcox <bpwilcox@eng.ucsd.edu>

remove callback group as member variable

Wrap documentation examples in code blocks (ros2#830)

This makes the code examples easier to read in the generated documentation.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

Crash in callback group pointer vector iterator (ros2#814)

Signed-off-by: Guillaume Autran <gautran@clearpath.ai>

add mutex in add/remove_node and wait_for_work to protect concurrent use/change of memory_strategy_ (ros2#837)

Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>

Fix hang with timers in MultiThreadedExecutor (ros2#835) (ros2#836)

Signed-off-by: Todd Malsbary <todd.malsbary@intel.com>

Use of -r/--remap flags where appropriate. (ros2#834)

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

Force explicit --ros-args in NodeOptions::arguments(). (ros2#845)

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

Fail on invalid and unknown ROS specific arguments (ros2#842)

* Fail on invalid and unknown ROS specific arguments.

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

* Revert changes to utilities.hpp in rclcpp

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

* Fully revert change to utilities.hpp

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>

Fix typo in deprecated warning. (ros2#848)

"it's" instead of its

Signed-off-by: Luca Della Vedova <luca@openrobotics.org>

Add throwing parameter name if parameter is not set (ros2#833)

* added throwing parameter name if parameter is not set

Signed-off-by: Alex <cvbn127@gmail.com>
Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

check valid timer handler 1st to reduce the time window for scan. (ros2#841)

Signed-off-by: Tomoya.Fujita <Tomoya.Fujita@sony.com>

remove features and related code which were deprecated in dashing (ros2#852)

Signed-off-by: William Woodall <william@osrfoundation.org>

reset error message before setting a new one, embed the original one (ros2#854)

Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>

restored virtual destructor in publisher_base

Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>
  • Loading branch information
alsora authored and wjwwood committed Oct 19, 2019
1 parent 8fd9a0a commit 60deefe
Show file tree
Hide file tree
Showing 38 changed files with 2,256 additions and 2,401 deletions.
27 changes: 19 additions & 8 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/graph_listener.cpp
src/rclcpp/init_options.cpp
src/rclcpp/intra_process_manager.cpp
src/rclcpp/intra_process_manager_impl.cpp
src/rclcpp/logger.cpp
src/rclcpp/memory_strategies.cpp
src/rclcpp/memory_strategy.cpp
Expand Down Expand Up @@ -75,6 +74,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription_base.cpp
src/rclcpp/subscription_intra_process_base.cpp
src/rclcpp/time.cpp
src/rclcpp/time_source.cpp
src/rclcpp/timer.cpp
Expand Down Expand Up @@ -200,25 +200,36 @@ if(BUILD_TESTING)
"rosidl_typesupport_cpp"
)
endif()
ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp)
if(TARGET test_mapped_ring_buffer)
ament_target_dependencies(test_mapped_ring_buffer
ament_add_gmock(test_intra_process_manager test/test_intra_process_manager.cpp)
if(TARGET test_intra_process_manager)
ament_target_dependencies(test_intra_process_manager
"rcl"
"rcl_interfaces"
"rmw"
"rosidl_generator_cpp"
"rosidl_typesupport_cpp"
)
target_link_libraries(test_intra_process_manager ${PROJECT_NAME})
endif()
ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp)
if(TARGET test_intra_process_manager)
ament_target_dependencies(test_intra_process_manager
"rcl"
ament_add_gtest(test_ring_buffer_implementation test/test_ring_buffer_implementation.cpp)
if(TARGET test_ring_buffer_implementation)
ament_target_dependencies(test_ring_buffer_implementation
"rcl_interfaces"
"rmw"
"rosidl_generator_cpp"
"rosidl_typesupport_cpp"
)
target_link_libraries(test_ring_buffer_implementation ${PROJECT_NAME})
endif()
ament_add_gtest(test_intra_process_buffer test/test_intra_process_buffer.cpp)
if(TARGET test_intra_process_buffer)
ament_target_dependencies(test_intra_process_buffer
"rcl_interfaces"
"rmw"
"rosidl_generator_cpp"
"rosidl_typesupport_cpp"
)
target_link_libraries(test_intra_process_buffer ${PROJECT_NAME})
endif()

ament_add_gtest(test_loaned_message test/test_loaned_message.cpp)
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/any_subscription_callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class AnySubscriptionCallback
TRACEPOINT(callback_end, (const void *)this);
}

bool use_take_shared_method()
bool use_take_shared_method() const
{
return const_shared_ptr_callback_ || const_shared_ptr_with_info_callback_;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,12 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rclcpp/intra_process_manager_impl.hpp"
#ifndef RCLCPP__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
#define RCLCPP__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_

#include <memory>
namespace rclcpp
{
namespace intra_process_buffer
{

rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr
rclcpp::intra_process_manager::create_default_impl()
template<typename BufferT>
class BufferImplementationBase
{
return std::make_shared<IntraProcessManagerImpl<>>();
}
public:
virtual BufferT dequeue() = 0;
virtual void enqueue(BufferT request) = 0;

virtual void clear() = 0;
virtual bool has_data() const = 0;
};

} // namespace intra_process_buffer
} // namespace rclcpp

#endif // RCLCPP__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
230 changes: 230 additions & 0 deletions rclcpp/include/rclcpp/buffers/intra_process_buffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__BUFFERS__INTRA_PROCESS_BUFFER_HPP_
#define RCLCPP__BUFFERS__INTRA_PROCESS_BUFFER_HPP_

#include <memory>
#include <type_traits>
#include <utility>

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/buffers/buffer_implementation_base.hpp"

namespace rclcpp
{
namespace intra_process_buffer
{

class IntraProcessBufferBase
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(IntraProcessBufferBase)

virtual void clear() = 0;

virtual bool has_data() const = 0;
virtual bool use_take_shared_method() const = 0;
};

template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename MessageDeleter = std::default_delete<MessageT>>
class IntraProcessBuffer : public IntraProcessBufferBase
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(IntraProcessBuffer)

using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
using MessageSharedPtr = std::shared_ptr<const MessageT>;

virtual void add_shared(MessageSharedPtr msg) = 0;
virtual void add_unique(MessageUniquePtr msg) = 0;

virtual MessageSharedPtr consume_shared() = 0;
virtual MessageUniquePtr consume_unique() = 0;
};

template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename MessageDeleter = std::default_delete<MessageT>,
typename BufferT = std::unique_ptr<MessageT>>
class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, MessageDeleter>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(TypedIntraProcessBuffer)

using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
using MessageSharedPtr = std::shared_ptr<const MessageT>;

TypedIntraProcessBuffer(
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_impl,
std::shared_ptr<Alloc> allocator = nullptr)
{
bool valid_type = (std::is_same<BufferT, MessageSharedPtr>::value ||
std::is_same<BufferT, MessageUniquePtr>::value);
if (!valid_type) {
throw std::runtime_error("Creating TypedIntraProcessBuffer with not valid BufferT");
}

buffer_ = std::move(buffer_impl);

if (!allocator) {
message_allocator_ = std::make_shared<MessageAlloc>();
} else {
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
}
}

void add_shared(MessageSharedPtr msg)
{
add_shared_impl<BufferT>(std::move(msg));
}

void add_unique(MessageUniquePtr msg)
{
buffer_->enqueue(std::move(msg));
}

MessageSharedPtr consume_shared()
{
return consume_shared_impl<BufferT>();
}

MessageUniquePtr consume_unique()
{
return consume_unique_impl<BufferT>();
}

bool has_data() const
{
return buffer_->has_data();
}

void clear()
{
buffer_->clear();
}

bool use_take_shared_method() const
{
return std::is_same<BufferT, MessageSharedPtr>::value;
}

private:
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;

std::shared_ptr<MessageAlloc> message_allocator_;

// MessageSharedPtr to MessageSharedPtr
template<typename DestinationT>
typename std::enable_if<
std::is_same<DestinationT, MessageSharedPtr>::value
>::type
add_shared_impl(MessageSharedPtr shared_msg)
{
buffer_->enqueue(std::move(shared_msg));
}

// MessageSharedPtr to MessageUniquePtr
template<typename DestinationT>
typename std::enable_if<
std::is_same<DestinationT, MessageUniquePtr>::value
>::type
add_shared_impl(MessageSharedPtr shared_msg)
{
// This should not happen: here a copy is unconditionally made, while the intra-process manager
// can decide whether a copy is needed depending on the number and the type of buffers

MessageUniquePtr unique_msg;
MessageDeleter * deleter = std::get_deleter<MessageDeleter, const MessageT>(shared_msg);
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg);
if (deleter) {
unique_msg = MessageUniquePtr(ptr, *deleter);
} else {
unique_msg = MessageUniquePtr(ptr);
}

buffer_->enqueue(std::move(unique_msg));
}

// MessageSharedPtr to MessageSharedPtr
template<typename OriginT>
typename std::enable_if<
(std::is_same<OriginT, MessageSharedPtr>::value),
MessageSharedPtr
>::type
consume_shared_impl()
{
return buffer_->dequeue();
}

// MessageUniquePtr to MessageSharedPtr
template<typename OriginT>
typename std::enable_if<
(std::is_same<OriginT, MessageUniquePtr>::value),
MessageSharedPtr
>::type
consume_shared_impl()
{
// automatic cast from unique ptr to shared ptr
return buffer_->dequeue();
}

// MessageSharedPtr to MessageUniquePtr
template<typename OriginT>
typename std::enable_if<
(std::is_same<OriginT, MessageSharedPtr>::value),
MessageUniquePtr
>::type
consume_unique_impl()
{
MessageSharedPtr buffer_msg = buffer_->dequeue();

MessageUniquePtr unique_msg;
MessageDeleter * deleter = std::get_deleter<MessageDeleter, const MessageT>(buffer_msg);
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *buffer_msg);
if (deleter) {
unique_msg = MessageUniquePtr(ptr, *deleter);
} else {
unique_msg = MessageUniquePtr(ptr);
}

return unique_msg;
}

// MessageUniquePtr to MessageUniquePtr
template<typename OriginT>
typename std::enable_if<
(std::is_same<OriginT, MessageUniquePtr>::value),
MessageUniquePtr
>::type
consume_unique_impl()
{
return buffer_->dequeue();
}
};

} // namespace intra_process_buffer
} // namespace rclcpp


#endif // RCLCPP__BUFFERS__INTRA_PROCESS_BUFFER_HPP_
Loading

0 comments on commit 60deefe

Please sign in to comment.