Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for taking a sequence of messages #614

Merged
merged 7 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions rcl/include/rcl/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ extern "C"
#include "rcl/node.h"
#include "rcl/visibility_control.h"

#include "rmw/message_sequence.h"

/// Internal rcl implementation struct.
struct rcl_subscription_impl_t;

Expand Down Expand Up @@ -203,7 +205,7 @@ rcl_subscription_get_default_options(void);
/// Take a ROS message from a topic using a rcl subscription.
/**
* It is the job of the caller to ensure that the type of the ros_message
* argument and the type associate with the subscription, via the type
* argument and the type associated with the subscription, via the type
* support, match.
* Passing a different type to rcl_take produces undefined behavior and cannot
* be checked by this function and therefore no deliberate error will occur.
Expand All @@ -227,7 +229,7 @@ rcl_subscription_get_default_options(void);
* be allocated for a dynamically sized array in the target message, then the
* allocator given in the subscription options is used.
*
* The rmw message_info struct contains meta information about this particular
* The rmw_message_info struct contains meta information about this particular
* message instance, like what the GUID of the publisher which published it
* originally or whether or not the message received from within the same
* process.
Expand All @@ -248,7 +250,7 @@ rcl_subscription_get_default_options(void);
* \param[inout] ros_message type-erased ptr to a allocated ROS message
* \param[out] message_info rmw struct which contains meta-data for the message
* \param[in] allocation structure pointer used for memory preallocation (may be NULL)
* \return `RCL_RET_OK` if the message was published, or
* \return `RCL_RET_OK` if the message was taken, or
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
* \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or
* \return `RCL_RET_BAD_ALLOC` if allocating memory failed, or
Expand All @@ -266,6 +268,58 @@ rcl_take(
rmw_subscription_allocation_t * allocation
);

/// Take a sequence of messages from a topic using a rcl subscription.
/**
* In contrast to `rcl_take`, this function can take multiple messages at
* the same time.
* It is the job of the caller to ensure that the type of the message_sequence
* argument and the type associated with the subscription, via the type
* support, match.
*
* The message_sequence pointer should point to an already allocated sequence
* of ROS messages of the correct type, into which the taken ROS messages will
* be copied if messages are available.
* The message_sequence `size` member will be set to the number of messages
* correctly taken.
*
* The rmw_message_info_sequence struct contains meta information about the
* corresponding message instance index.
* The message_info_sequence argument should be an already allocated
* rmw_message_info_sequence_t structure.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Maybe [1]
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Yes
* <i>[1] only if storage in the serialized_message is insufficient</i>
*
* \param[in] subscription the handle to the subscription from which to take.
* \param[in] count number of messages to attempt to take.
* \param[inout] message_sequence pointer to a (pre-allocated) message sequence.
* \param[inout] message_info_sequence pointer to a (pre-allocated) message info sequence.
* \param[in] allocation structure pointer used for memory preallocation (may be NULL)
* \return `RCL_RET_OK` if one or more messages was taken, or
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
* \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or
* \return `RCL_RET_BAD_ALLOC` if allocating memory failed, or
* \return `RCL_RET_SUBSCRIPTION_TAKE_FAILED` if take failed but no error
* occurred in the middleware, or
* \return `RCL_RET_ERROR` if an unspecified error occurs.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_take_sequence(
jacobperron marked this conversation as resolved.
Show resolved Hide resolved
const rcl_subscription_t * subscription,
size_t count,
rmw_message_sequence_t * message_sequence,
rmw_message_info_sequence_t * message_info_sequence,
rmw_subscription_allocation_t * allocation
);

/// Take a serialized raw message from a topic using a rcl subscription.
/**
* In contrast to `rcl_take`, this function stores the taken message in
Expand Down
46 changes: 46 additions & 0 deletions rcl/src/rcl/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,52 @@ rcl_take(
return RCL_RET_OK;
}

rcl_ret_t
rcl_take_sequence(
const rcl_subscription_t * subscription,
size_t count,
rmw_message_sequence_t * message_sequence,
rmw_message_info_sequence_t * message_info_sequence,
rmw_subscription_allocation_t * allocation
)
{
// Set the sizes to zero to indicate that there are no valid messages
message_sequence->size = 0u;
message_info_sequence->size = 0u;

RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking %zu messages", count);
if (!rcl_subscription_is_valid(subscription)) {
return RCL_RET_SUBSCRIPTION_INVALID; // error message already set
}
RCL_CHECK_ARGUMENT_FOR_NULL(message_sequence, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(message_info_sequence, RCL_RET_INVALID_ARGUMENT);

if (message_sequence->capacity < count) {
RCL_SET_ERROR_MSG("Insufficient message sequence capacity for requested count");
return RCL_RET_INVALID_ARGUMENT;
}

if (message_info_sequence->capacity < count) {
RCL_SET_ERROR_MSG("Insufficient message info sequence capacity for requested count");
return RCL_RET_INVALID_ARGUMENT;
}

size_t taken = 0u;
rmw_ret_t ret = rmw_take_sequence(
subscription->impl->rmw_handle, count, message_sequence, message_info_sequence, &taken,
allocation);
if (ret != RMW_RET_OK) {
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
return rcl_convert_rmw_ret_to_rcl_ret(ret);
}
RCUTILS_LOG_DEBUG_NAMED(
ROS_PACKAGE_NAME, "Subscription took %zu messages", taken);
if (0u == taken) {
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}
return RCL_RET_OK;
}

rcl_ret_t
rcl_take_serialized_message(
const rcl_subscription_t * subscription,
Expand Down
152 changes: 152 additions & 0 deletions rcl/test/rcl/test_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,155 @@ TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_subscription
ASSERT_EQ(std::string(test_string), std::string(msg.string_value.data, msg.string_value.size));
}
}

/* Basic nominal test of a subscription taking a sequence.
*/
TEST_F(
CLASSNAME(
TestSubscriptionFixture,
RMW_IMPLEMENTATION), test_subscription_nominal_string_sequence) {
rcl_ret_t ret;
rcl_publisher_t publisher = rcl_get_zero_initialized_publisher();
const rosidl_message_type_support_t * ts =
ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings);
const char * topic = "rcl_test_subscription_nominal_string_sequence_chatter";
rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options();
ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
});
rcl_subscription_t subscription = rcl_get_zero_initialized_subscription();
rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options();
ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
});
// TODO(wjwwood): add logic to wait for the connection to be established
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
// probably using the count_subscriptions busy wait mechanism
// until then we will sleep for a short period of time
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
const char * test_string = "testing";
{
test_msgs__msg__Strings msg;
test_msgs__msg__Strings__init(&msg);
ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
test_msgs__msg__Strings__fini(&msg);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
}
bool success;
wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100, success);
ASSERT_TRUE(success);
auto allocator = rcutils_get_default_allocator();
{
size_t size = 1;
rmw_message_info_sequence_t message_infos;
rmw_message_info_sequence_init(&message_infos, size, &allocator);

rmw_message_sequence_t messages;
rmw_message_sequence_init(&messages, size, &allocator);

auto seq = test_msgs__msg__Strings__Sequence__create(size);

for (size_t ii = 0; ii < size; ++ii) {
messages.data[ii] = &seq->data[ii];
}

OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
rmw_message_info_sequence_fini(&message_infos);
rmw_message_sequence_fini(&messages);
test_msgs__msg__Strings__Sequence__destroy(seq);
});

// Attempt to take more than capacity allows.
ret = rcl_take_sequence(&subscription, 5, &messages, &message_infos, nullptr);
ASSERT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;

ASSERT_EQ(0u, messages.size);
ASSERT_EQ(0u, message_infos.size);
}

{
size_t size = 5;
rmw_message_info_sequence_t message_infos;
rmw_message_info_sequence_init(&message_infos, size, &allocator);

rmw_message_sequence_t messages;
rmw_message_sequence_init(&messages, size, &allocator);

auto seq = test_msgs__msg__Strings__Sequence__create(size);

for (size_t ii = 0; ii < size; ++ii) {
messages.data[ii] = &seq->data[ii];
}

OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
rmw_message_info_sequence_fini(&message_infos);
rmw_message_sequence_fini(&messages);
test_msgs__msg__Strings__Sequence__destroy(seq);
});

ret = rcl_take_sequence(&subscription, 5, &messages, &message_infos, nullptr);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
ASSERT_EQ(3u, messages.size);
ASSERT_EQ(3u, message_infos.size);
}

{
test_msgs__msg__Strings msg;
test_msgs__msg__Strings__init(&msg);
ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
test_msgs__msg__Strings__fini(&msg);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
}

// Give a brief moment for publications to go through.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
// Take fewer messages than are available in the subscription
{
size_t size = 3;
rmw_message_info_sequence_t message_infos;
rmw_message_info_sequence_init(&message_infos, size, &allocator);

rmw_message_sequence_t messages;
rmw_message_sequence_init(&messages, size, &allocator);

auto seq = test_msgs__msg__Strings__Sequence__create(size);

for (size_t ii = 0; ii < size; ++ii) {
messages.data[ii] = &seq->data[ii];
}

OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
rmw_message_info_sequence_fini(&message_infos);
rmw_message_sequence_fini(&messages);
test_msgs__msg__Strings__Sequence__destroy(seq);
});

ret = rcl_take_sequence(&subscription, 3, &messages, &message_infos, nullptr);

ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
ASSERT_EQ(3u, messages.size);
ASSERT_EQ(3u, message_infos.size);

ASSERT_EQ(
std::string(test_string),
std::string(seq->data[0].string_value.data, seq->data[0].string_value.size));
}
}