Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transient local durability support to publisher and subscriptions when using intra-process communication #2303

1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp
src/rclcpp/detail/resolve_intra_process_buffer_type.cpp
src/rclcpp/detail/resolve_parameter_overrides.cpp
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ resolve_intra_process_buffer_type(
return resolved_buffer_type;
}

RCLCPP_PUBLIC
rclcpp::IntraProcessBufferType
resolve_intra_process_buffer_type(
const rclcpp::IntraProcessBufferType buffer_type);

} // namespace detail

} // namespace rclcpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
#define RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_

#include <vector>

namespace rclcpp
{
namespace experimental
Expand All @@ -31,6 +33,8 @@ class BufferImplementationBase
virtual BufferT dequeue() = 0;
virtual void enqueue(BufferT request) = 0;

virtual std::vector<BufferT> get_all_data() = 0;

virtual void clear() = 0;
virtual bool has_data() const = 0;
virtual size_t available_capacity() const = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <vector>

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
Expand Down Expand Up @@ -66,6 +67,9 @@ class IntraProcessBuffer : public IntraProcessBufferBase

virtual MessageSharedPtr consume_shared() = 0;
virtual MessageUniquePtr consume_unique() = 0;

virtual std::vector<MessageSharedPtr> get_all_data_shared() = 0;
virtual std::vector<MessageUniquePtr> get_all_data_unique() = 0;
};

template<
Expand Down Expand Up @@ -129,6 +133,16 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
return consume_unique_impl<BufferT>();
}

std::vector<MessageSharedPtr> get_all_data_shared() override
{
return get_all_data_shared_impl();
}

std::vector<MessageUniquePtr> get_all_data_unique() override
{
return get_all_data_unique_impl();
}

bool has_data() const override
{
return buffer_->has_data();
Expand Down Expand Up @@ -243,6 +257,71 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
{
return buffer_->dequeue();
}

// MessageSharedPtr to MessageSharedPtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageSharedPtr>::value,
std::vector<MessageSharedPtr>
>::type
get_all_data_shared_impl()
{
return buffer_->get_all_data();
}

// MessageUniquePtr to MessageSharedPtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageUniquePtr>::value,
std::vector<MessageSharedPtr>
>::type
get_all_data_shared_impl()
{
std::vector<MessageSharedPtr> result;
auto uni_ptr_vec = buffer_->get_all_data();
result.reserve(uni_ptr_vec.size());
for (MessageUniquePtr & uni_ptr : uni_ptr_vec) {
result.emplace_back(std::move(uni_ptr));
}
return result;
}

// MessageSharedPtr to MessageUniquePtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageSharedPtr>::value,
std::vector<MessageUniquePtr>
>::type
get_all_data_unique_impl()
{
std::vector<MessageUniquePtr> result;
auto shared_ptr_vec = buffer_->get_all_data();
result.reserve(shared_ptr_vec.size());
for (MessageSharedPtr shared_msg : shared_ptr_vec) {
MessageUniquePtr unique_msg;
MessageDeleter * deleter = std::get_deleter<MessageDeleter, const MessageT>(shared_msg);
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg);
if (deleter) {
unique_msg = MessageUniquePtr(ptr, *deleter);
} else {
unique_msg = MessageUniquePtr(ptr);
}
result.push_back(std::move(unique_msg));
}
return result;
}

// MessageUniquePtr to MessageUniquePtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageUniquePtr>::value,
std::vector<MessageUniquePtr>
>::type
get_all_data_unique_impl()
{
return buffer_->get_all_data();
}
};

} // namespace buffers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_
#define RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_

#include <memory>
#include <mutex>
#include <stdexcept>
#include <utility>
Expand Down Expand Up @@ -113,6 +114,17 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return request;
}

/// Get all the elements from the ring buffer
/**
* This member function is thread-safe.
*
* \return a vector containing all the elements from the ring buffer
*/
std::vector<BufferT> get_all_data() override
{
return get_all_data_impl();
}

/// Get the next index value for the ring buffer
/**
* This member function is thread-safe.
Expand Down Expand Up @@ -215,6 +227,71 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return capacity_ - size_;
}

/// Traits for checking if a type is std::unique_ptr
template<typename ...>
struct is_std_unique_ptr final : std::false_type {};
template<class T, typename ... Args>
struct is_std_unique_ptr<std::unique_ptr<T, Args...>> final : std::true_type
{
typedef T Ptr_type;
};

/// Get all the elements from the ring buffer
/**
* This member function is thread-safe.
* Two versions for the implementation of the function.
* One for buffer containing unique_ptr and the other for other types
*
* \return a vector containing all the elements from the ring buffer
*/
template<typename T = BufferT, std::enable_if_t<is_std_unique_ptr<T>::value &&
std::is_copy_constructible<
typename is_std_unique_ptr<T>::Ptr_type
>::value,
void> * = nullptr>
std::vector<BufferT> get_all_data_impl()
{
std::lock_guard<std::mutex> lock(mutex_);
std::vector<BufferT> result_vtr;
result_vtr.reserve(size_);
for (size_t id = 0; id < size_; ++id) {
result_vtr.emplace_back(
new typename is_std_unique_ptr<T>::Ptr_type(
*(ring_buffer_[(read_index_ + id) % capacity_])));
}
return result_vtr;
}

template<typename T = BufferT, std::enable_if_t<
std::is_copy_constructible<T>::value, void> * = nullptr>
std::vector<BufferT> get_all_data_impl()
{
std::lock_guard<std::mutex> lock(mutex_);
std::vector<BufferT> result_vtr;
result_vtr.reserve(size_);
for (size_t id = 0; id < size_; ++id) {
result_vtr.emplace_back(ring_buffer_[(read_index_ + id) % capacity_]);
}
return result_vtr;
}

template<typename T = BufferT, std::enable_if_t<!is_std_unique_ptr<T>::value &&
!std::is_copy_constructible<T>::value, void> * = nullptr>
std::vector<BufferT> get_all_data_impl()
{
throw std::logic_error("Underlined type results in invalid get_all_data_impl()");
return {};
}

template<typename T = BufferT, std::enable_if_t<is_std_unique_ptr<T>::value &&
!std::is_copy_constructible<typename is_std_unique_ptr<T>::Ptr_type>::value,
void> * = nullptr>
std::vector<BufferT> get_all_data_impl()
{
throw std::logic_error("Underlined type in unique_ptr results in invalid get_all_data_impl()");
return {};
}

size_t capacity_;

std::vector<BufferT> ring_buffer_;
Expand Down
Loading