Skip to content

Commit

Permalink
* updated interface to SubscriptionIntraProcessBase
Browse files Browse the repository at this point in the history
* implemented interface to deserialize a given serialized message to ros message

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
  • Loading branch information
Joshua Hampp committed Apr 22, 2020
1 parent 306d58c commit fcaea76
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
29 changes: 29 additions & 0 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
Expand Down Expand Up @@ -72,6 +74,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
}

if (!allocator) {
message_allocator_ = std::make_shared<MessageAlloc>();
} else {
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
}

// Create the intra-process buffer.
buffer_ = rclcpp::experimental::create_intra_process_buffer<MessageT, Alloc, Deleter>(
buffer_type,
Expand Down Expand Up @@ -102,6 +110,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
#endif
}

bool
is_serialized() const
{
return rclcpp::is_serialized_message_class<MessageT>::value;
}

bool
is_ready(rcl_wait_set_t * wait_set)
{
Expand Down Expand Up @@ -134,6 +148,20 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
return buffer_->use_take_shared_method();
}

void
provide_serialized_intra_process_message(const SerializedMessage & serialized_message)
{
rclcpp::Serialization<MessageT> serialization;

auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr);
auto message = MessageUniquePtr(ptr);

serialization.deserialize_message(serialized_message, *message);

provide_intra_process_message(std::move(message));
}

private:
void
trigger_guard_condition()
Expand Down Expand Up @@ -168,6 +196,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase

AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
BufferUniquePtr buffer_;
std::shared_ptr<MessageAlloc> message_allocator_;
};

} // namespace experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

#include "rcl/error_handling.h"

#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"

Expand Down Expand Up @@ -70,6 +72,13 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
rmw_qos_profile_t
get_actual_qos() const;

virtual bool
is_serialized() const = 0;

virtual void
provide_serialized_intra_process_message(
const SerializedMessage & serialized_message) = 0;

protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;
Expand Down

0 comments on commit fcaea76

Please sign in to comment.