Skip to content

Commit

Permalink
Add support for taking a sequence of messages (#366)
Browse files Browse the repository at this point in the history
* Add support for taking a sequence of messages

FastRTPS doesn't have built-in support for taking multiple samples, so
this implementation utilizes a loop over the `take` function multiple
times.

Signed-off-by: Michael Carroll <michael@openrobotics.org>

* Add upper bound based on current unread messages

Signed-off-by: Michael Carroll <michael@openrobotics.org>

* Change return types to invalid argument

Signed-off-by: Michael Carroll <michael@openrobotics.org>

* Line length

Signed-off-by: Michael Carroll <michael@openrobotics.org>

* std::min on macos

Signed-off-by: Michael Carroll <michael@openrobotics.org>

* Uncrustify

Signed-off-by: Michael Carroll <michael@openrobotics.org>
  • Loading branch information
mjcarroll authored Apr 24, 2020
1 parent 4d704e8 commit d1702b1
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 0 deletions.
14 changes: 14 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ rmw_take_with_info(
eprosima_fastrtps_identifier, subscription, ros_message, taken, message_info, allocation);
}

rmw_ret_t
rmw_take_sequence(
const rmw_subscription_t * subscription,
size_t count,
rmw_message_sequence_t * message_sequence,
rmw_message_info_sequence_t * message_info_sequence,
size_t * taken,
rmw_subscription_allocation_t * allocation)
{
return rmw_fastrtps_shared_cpp::__rmw_take_sequence(
eprosima_fastrtps_identifier, subscription, count, message_sequence, message_info_sequence,
taken, allocation);
}

rmw_ret_t
rmw_take_serialized_message(
const rmw_subscription_t * subscription,
Expand Down
14 changes: 14 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ rmw_take_with_info(
eprosima_fastrtps_identifier, subscription, ros_message, taken, message_info, allocation);
}

rmw_ret_t
rmw_take_sequence(
const rmw_subscription_t * subscription,
size_t count,
rmw_message_sequence_t * message_sequence,
rmw_message_info_sequence_t * message_info_sequence,
size_t * taken,
rmw_subscription_allocation_t * allocation)
{
return rmw_fastrtps_shared_cpp::__rmw_take_sequence(
eprosima_fastrtps_identifier, subscription, count, message_sequence, message_info_sequence,
taken, allocation);
}

rmw_ret_t
rmw_take_serialized_message(
const rmw_subscription_t * subscription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,17 @@ __rmw_take(
bool * taken,
rmw_subscription_allocation_t * allocation);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_take_sequence(
const char * identifier,
const rmw_subscription_t * subscription,
size_t count,
rmw_message_sequence_t * message_sequencxe,
rmw_message_info_sequence_t * message_info_sequence,
size_t * taken,
rmw_subscription_allocation_t * allocation);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_take_event(
Expand Down
86 changes: 86 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,57 @@ _take(
return RMW_RET_OK;
}

rmw_ret_t
_take_sequence(
const char * identifier,
const rmw_subscription_t * subscription,
size_t count,
rmw_message_sequence_t * message_sequence,
rmw_message_info_sequence_t * message_info_sequence,
size_t * taken,
rmw_subscription_allocation_t * allocation)
{
*taken = 0;
bool taken_flag = false;
rmw_ret_t ret = RMW_RET_OK;

if (subscription->implementation_identifier != identifier) {
RMW_SET_ERROR_MSG("publisher handle not from this implementation");
return RMW_RET_ERROR;
}

CustomSubscriberInfo * info = static_cast<CustomSubscriberInfo *>(subscription->data);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "custom subscriber info is null", return RMW_RET_ERROR);

// Limit the upper bound of reads to the number unread at the beginning.
// This prevents any samples that are added after the beginning of the
// _take_sequence call from being read.
auto unread_count = info->subscriber_->get_unread_count();
if (unread_count < count) {
count = unread_count;
}

for (size_t ii = 0; ii < count; ++ii) {
taken_flag = false;
ret = _take(
identifier, subscription, message_sequence->data[*taken],
&taken_flag, &message_info_sequence->data[*taken], allocation);

if (ret != RMW_RET_OK) {
break;
}

if (taken_flag) {
(*taken)++;
}
}

message_sequence->size = *taken;
message_info_sequence->size = *taken;

return ret;
}

rmw_ret_t
__rmw_take_event(
const char * identifier,
Expand Down Expand Up @@ -133,6 +184,41 @@ __rmw_take(
return _take(identifier, subscription, ros_message, taken, nullptr, allocation);
}

rmw_ret_t
__rmw_take_sequence(
const char * identifier,
const rmw_subscription_t * subscription,
size_t count,
rmw_message_sequence_t * message_sequence,
rmw_message_info_sequence_t * message_info_sequence,
size_t * taken,
rmw_subscription_allocation_t * allocation)
{
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
subscription, "subscription pointer is null", return RMW_RET_INVALID_ARGUMENT);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
message_sequence, "message_sequence pointer is null", return RMW_RET_INVALID_ARGUMENT);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
message_info_sequence, "message_info_sequence pointer is null",
return RMW_RET_INVALID_ARGUMENT);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
taken, "size_t flag for count is null", return RMW_RET_INVALID_ARGUMENT);

if (count > message_sequence->capacity) {
RMW_SET_ERROR_MSG("Insufficient capacity in message_sequence");
return RMW_RET_ERROR;
}

if (count > message_info_sequence->capacity) {
RMW_SET_ERROR_MSG("Insufficient capacity in message_info_sequence");
return RMW_RET_ERROR;
}

return _take_sequence(
identifier, subscription, count, message_sequence, message_info_sequence,
taken, allocation);
}

rmw_ret_t
__rmw_take_with_info(
const char * identifier,
Expand Down

0 comments on commit d1702b1

Please sign in to comment.