Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rcl_publisher_wait_for_all_acked support #913

Merged
merged 7 commits into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions rcl/include/rcl/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C"
#include "rcl/macros.h"
#include "rcl/node.h"
#include "rcl/visibility_control.h"
#include "rcl/time.h"

/// Internal rcl publisher implementation struct.
struct rcl_publisher_impl_t;
Expand Down Expand Up @@ -434,6 +435,47 @@ RCL_WARN_UNUSED
rcl_ret_t
rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher);

/// Wait until all published message data is acknowledged or until the specified timeout elapses.
/**
* This function waits until all published message data were acknowledged by peer node or timeout.
*
* The timeout unit is nanoseconds.
* If the timeout is negative then this function will block indefinitely until all published message
* data were acknowledged.
* If the timeout is 0 then this function will be non-blocking; checking all published message data
* were acknowledged (If acknowledged, return RCL_RET_OK. Otherwise, return RCL_RET_TIMEOUT), but
* not waiting.
* If the timeout is greater than 0 then this function will return after that period of time has
* elapsed (return RCL_RET_TIMEOUT) or all published message data were acknowledged (return
* RCL_RET_OK).
*
* This function only waits for acknowledgments if the publisher's QOS profile is RELIABLE.
* Otherwise this function will immediately return RCL_RET_OK.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | No
* Thread-Safe | Yes
* Uses Atomics | No
* Lock-Free | No
*
* \param[in] publisher handle to the publisher that needs to wait for all acked.
* \param[in] timeout the duration to wait for all published message data were acknowledged, in
* nanoseconds.
* \return #RCL_RET_OK if successful, or
* \return #RCL_RET_TIMEOUT if timed out, or
* \return #RCL_RET_PUBLISHER_INVALID if publisher is invalid, or
* \return #RCL_RET_ERROR if an unspecified error occurs, or
* \return #RCL_RET_UNSUPPORTED if the middleware does not support that feature.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_publisher_wait_for_all_acked(
const rcl_publisher_t * publisher,
rcl_duration_value_t timeout);

/// Get the topic name for the publisher.
/**
* This function returns the publisher's internal topic name string.
Expand Down
37 changes: 37 additions & 0 deletions rcl/src/rcl/publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ extern "C"
#include "rcl/node.h"
#include "rcutils/logging_macros.h"
#include "rcutils/macros.h"
#include "rcl/time.h"
#include "rmw/time.h"
#include "rmw/error_handling.h"
#include "tracetools/tracetools.h"

Expand Down Expand Up @@ -304,6 +306,41 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher)
return RCL_RET_OK;
}

rcl_ret_t
rcl_publisher_wait_for_all_acked(const rcl_publisher_t * publisher, rcl_duration_value_t timeout)
{
if (!rcl_publisher_is_valid(publisher)) {
return RCL_RET_PUBLISHER_INVALID; // error already set
}

rmw_time_t rmw_timeout;
if (timeout > 0) {
rmw_timeout.sec = RCL_NS_TO_S(timeout);
rmw_timeout.nsec = timeout % 1000000000;
} else if (timeout < 0) {
rmw_time_t infinite = RMW_DURATION_INFINITE;
rmw_timeout = infinite;
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
} else {
rmw_time_t zero = RMW_DURATION_UNSPECIFIED;
rmw_timeout = zero;
}

rmw_ret_t ret = rmw_publisher_wait_for_all_acked(publisher->impl->rmw_handle, rmw_timeout);
if (ret != RMW_RET_OK) {
if (ret == RMW_RET_TIMEOUT) {
return RCL_RET_TIMEOUT;
}
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
if (ret == RMW_RET_UNSUPPORTED) {
return RCL_RET_UNSUPPORTED;
} else {
return RCL_RET_ERROR;
}
}

return RCL_RET_OK;
}

const char *
rcl_publisher_get_topic_name(const rcl_publisher_t * publisher)
{
Expand Down
9 changes: 9 additions & 0 deletions rcl/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,15 @@ function(test_target_function)
AMENT_DEPENDENCIES ${rmw_implementation} "osrf_testing_tools_cpp" "test_msgs"
)

rcl_add_custom_gtest(test_publisher_wait_all_ack${target_suffix}
SRCS rcl/test_publisher_wait_all_ack.cpp rcl/wait_for_entity_helpers.cpp
ENV ${rmw_implementation_env_var}
APPEND_LIBRARY_DIRS ${extra_lib_dirs}
INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/../src/rcl/
LIBRARIES ${PROJECT_NAME} mimick
AMENT_DEPENDENCIES ${rmw_implementation} "osrf_testing_tools_cpp" "test_msgs"
)

rcl_add_custom_gtest(test_service${target_suffix}
SRCS rcl/test_service.cpp rcl/wait_for_entity_helpers.cpp
ENV ${rmw_implementation_env_var}
Expand Down
69 changes: 69 additions & 0 deletions rcl/test/rcl/test_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish

EXPECT_EQ(RCL_RET_OK, rcl_publisher_assert_liveliness(&publisher));

EXPECT_EQ(RCL_RET_OK, rcl_publisher_wait_for_all_acked(&publisher, 0));

size_t count_size;
test_msgs__msg__BasicTypes msg;
rcl_serialized_message_t serialized_msg = rmw_get_zero_initialized_serialized_message();
Expand Down Expand Up @@ -429,6 +431,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg));
rcl_reset_error();
EXPECT_EQ(
Expand Down Expand Up @@ -471,6 +475,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg));
rcl_reset_error();
EXPECT_EQ(
Expand Down Expand Up @@ -502,6 +508,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg));
rcl_reset_error();
EXPECT_EQ(
Expand Down Expand Up @@ -532,6 +540,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(nullptr));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(nullptr, 10000000));
rcl_reset_error();
EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(nullptr, &msg, null_allocation_is_valid_arg));
rcl_reset_error();
EXPECT_EQ(
Expand Down Expand Up @@ -572,6 +582,65 @@ TEST_F(CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION), test_mock_assert
rcl_reset_error();
}

// Mocking rmw_publisher_wait_for_all_acked to make
// rcl_publisher_wait_for_all_acked fail
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, ==)
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, !=)
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, <)
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, >)

TEST_F(
CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION),
test_mock_assert_wait_for_all_acked)
{
#define CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_RESULT, EXPECT_RET) do { \
rmw_publisher_wait_for_all_acked_return = RMW_RET_RESULT; \
ret = rcl_publisher_wait_for_all_acked(&publisher, 1000000); \
EXPECT_EQ(EXPECT_RET, ret); \
rcl_reset_error(); \
} while (0)

rcl_ret_t ret;
rmw_ret_t rmw_publisher_wait_for_all_acked_return;
auto mock = mocking_utils::patch_and_return(
"lib:rcl", rmw_publisher_wait_for_all_acked, rmw_publisher_wait_for_all_acked_return);

{
// Now normal usage of the function rcl_publisher_wait_for_all_acked returning
// unexpected RMW_RET_TIMEOUT
SCOPED_TRACE("Check RCL return failed !");
CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_TIMEOUT, RCL_RET_TIMEOUT);
}

{
// Now normal usage of the function rcl_publisher_wait_for_all_acked returning
// unexpected RMW_RET_UNSUPPORTED
SCOPED_TRACE("Check RCL return failed !");
CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_UNSUPPORTED, RCL_RET_UNSUPPORTED);
}

{
// Now normal usage of the function rcl_publisher_wait_for_all_acked returning
// unexpected RMW_RET_INVALID_ARGUMENT
SCOPED_TRACE("Check RCL return failed !");
CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_INVALID_ARGUMENT, RCL_RET_ERROR);
}

{
// Now normal usage of the function rcl_publisher_wait_for_all_acked returning
// unexpected RMW_RET_INCORRECT_RMW_IMPLEMENTATION
SCOPED_TRACE("Check RCL return failed !");
CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_INCORRECT_RMW_IMPLEMENTATION, RCL_RET_ERROR);
}

{
// Now normal usage of the function rcl_publisher_wait_for_all_acked returning
// unexpected RMW_RET_ERROR
SCOPED_TRACE("Check RCL return failed !");
CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_ERROR, RCL_RET_ERROR);
}
}

// Mocking rmw_publish to make rcl_publish fail
TEST_F(CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION), test_mock_publish) {
auto mock = mocking_utils::patch_and_return("lib:rcl", rmw_publish, RMW_RET_ERROR);
Expand Down
Loading