From d5f3d35fbe3127e30d4ee9d40f57c42584eeb830 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 29 Jul 2021 11:37:57 -0300 Subject: [PATCH] Revert "Updating client API to be able to remove pending requests (#1728)" (#1733) This reverts commit bf752c75f57f95cff31bbd0a345751b5850782cc. Signed-off-by: Ivan Santiago Paunovic --- rclcpp/include/rclcpp/client.hpp | 304 +++++------------------------ rclcpp/test/rclcpp/test_client.cpp | 16 +- 2 files changed, 49 insertions(+), 271 deletions(-) diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index ccbed49493..bd2d326012 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -17,14 +17,12 @@ #include #include -#include +#include #include -#include // NOLINT, cpplint doesn't think this is a cpp std header #include #include #include #include -#include // NOLINT #include "rcl/client.h" #include "rcl/error_handling.h" @@ -180,9 +178,6 @@ template class Client : public ClientBase { public: - using Request = typename ServiceT::Request; - using Response = typename ServiceT::Response; - using SharedRequest = typename ServiceT::Request::SharedPtr; using SharedResponse = typename ServiceT::Response::SharedPtr; @@ -192,7 +187,6 @@ class Client : public ClientBase using SharedPromise = std::shared_ptr; using SharedPromiseWithRequest = std::shared_ptr; - using Future = std::future; using SharedFuture = std::shared_future; using SharedFutureWithRequest = std::shared_future>; @@ -201,88 +195,6 @@ class Client : public ClientBase RCLCPP_SMART_PTR_DEFINITIONS(Client) - /// A convenient Client::Future and request id pair. - class FutureAndRequestId - { -public: - FutureAndRequestId(Future impl, int64_t req_id) - : impl_(std::move(impl)), req_id_(req_id) - {} - - /// Allow implicit conversions to `std::future` by reference. - // TODO(ivanpauno): Maybe, deprecate this in favor of get_future() (?) - operator Future &() {return impl_;} - - /// Deprecated, use take_future() instead. - /** - * Allow implicit conversions to `std::future` by value. - * \deprecated - */ - [[deprecated("FutureAndRequestId: use take_future() instead of an implicit conversion")]] - operator Future() {return impl_;} - - /// Returns the internal `std::future`, moving it out. - /** - * After calling this method, the internal future gets invalidated. - */ - Future - take_future() {return impl_;} - - /// Getter for the internal future. - Future & - get_future() {return impl_;} - - /// Getter for the internal future. - const Future & - get_future() const {return impl_;} - - /// Returns the request id associated with this future. - int64_t get_request_id() const {return req_id_;} - - // delegate future like methods in the std::future impl_ - - /// See std::future::share(). - SharedFuture share() noexcept {return impl_.share();} - /// See std::future::get(). - SharedResponse get() {return impl_.get();} - /// See std::future::valid(). - bool valid() const noexcept {return impl_.valid();} - /// See std::future::wait(). - void wait() const {return impl_.wait();} - /// See std::future::wait_for(). - template - std::future_status wait_for( - const std::chrono::duration & timeout_duration) const - { - return impl_.wait_for(timeout_duration); - } - /// See std::future::wait_until(). - template - std::future_status wait_until( - const std::chrono::time_point & timeout_time) const - { - return impl_.wait_until(timeout_time); - } - - // Rule of five, we could use the rule of zero here, but better be explicit as some of the - // methods are deleted. - - /// Move constructor. - FutureAndRequestId(FutureAndRequestId && other) noexcept = default; - /// Deleted copy constructor, each instance is a unique owner of the future. - FutureAndRequestId(const FutureAndRequestId & other) = delete; - /// Move assignment. - FutureAndRequestId & operator=(FutureAndRequestId && other) noexcept = default; - /// Deleted copy assignment, each instance is a unique owner of the future. - FutureAndRequestId & operator=(const FutureAndRequestId & other) = delete; - /// Destructor. - ~FutureAndRequestId() = default; - -private: - Future impl_; - int64_t req_id_; - }; - /// Default constructor. /** * The constructor for a Client is almost never called directly. @@ -380,83 +292,34 @@ class Client : public ClientBase std::shared_ptr request_header, std::shared_ptr response) override { - auto opt = this->get_and_erase_pending_request(request_header->sequence_number); - if (!opt) { + std::unique_lock lock(pending_requests_mutex_); + auto typed_response = std::static_pointer_cast(response); + int64_t sequence_number = request_header->sequence_number; + // TODO(esteve) this should throw instead since it is not expected to happen in the first place + if (this->pending_requests_.count(sequence_number) == 0) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Received invalid sequence number. Ignoring..."); return; } - auto & value = *opt; - auto typed_response = std::static_pointer_cast( - std::move(response)); - if (std::holds_alternative(value)) { - auto & promise = std::get(value); - promise.set_value(std::move(typed_response)); - } else if (std::holds_alternative(value)) { - Promise promise; - promise.set_value(std::move(typed_response)); - const auto & callback = std::get(value); - callback(promise.get_future().share()); - } else if (std::holds_alternative>(value)) { - PromiseWithRequest promise; - const auto & pair = std::get>(value); - promise.set_value(std::make_pair(std::move(pair.second), std::move(typed_response))); - pair.first(promise.get_future().share()); - } + auto tuple = this->pending_requests_[sequence_number]; + auto call_promise = std::get<0>(tuple); + auto callback = std::get<1>(tuple); + auto future = std::get<2>(tuple); + this->pending_requests_.erase(sequence_number); + // Unlock here to allow the service to be called recursively from one of its callbacks. + lock.unlock(); + + call_promise->set_value(typed_response); + callback(future); } - /// Send a request to the service server. - /** - * This method returns a `FutureAndRequestId` instance - * that can be passed to Executor::spin_until_future_complete() to - * wait until it has been completed. - * - * If the future never completes, - * e.g. the call to Executor::spin_until_future_complete() times out, - * Client::remove_pending_request() must be called to clean the client internal state. - * Not doing so will make the `Client` instance to use more memory each time a response is not - * received from the service server. - * - * ```cpp - * auto future = client->async_send_request(my_request); - * if ( - * rclcpp::FutureReturnCode::TIMEOUT == - * executor->spin_until_future_complete(future, timeout)) - * { - * client->remove_pending_request(future); - * // handle timeout - * } else { - * handle_response(future.get()); - * } - * ``` - * - * \param[in] request request to be send. - * \return a FutureAndRequestId instance. - */ - FutureAndRequestId + SharedFuture async_send_request(SharedRequest request) { - Promise promise; - auto future = promise.get_future(); - auto req_id = async_send_request_impl( - *request, - std::move(promise)); - return FutureAndRequestId(std::move(future), req_id); + return async_send_request(request, [](SharedFuture) {}); } - /// Send a request to the service server and schedule a callback in the executor. - /** - * Similar to the previous overload, but a callback will automatically be called when a response is received. - * - * If the callback is never called, because we never got a reply for the service server, remove_pending_request() - * has to be called with the returned request id or prune_pending_requests(). - * Not doing so will make the `Client` instance use more memory each time a response is not - * received from the service server. - * In this case, it's convenient to setup a timer to cleanup the pending requests. - * - * \param[in] request request to be send. - * \param[in] cb callback that will be called when we get a response for this request. - * \return the request id representing the request just sent. - */ - // TODO(ivanpauno): Link to example that shows how to cleanup requests. template< typename CallbackT, typename std::enable_if< @@ -466,24 +329,23 @@ class Client : public ClientBase >::value >::type * = nullptr > - int64_t + SharedFuture async_send_request(SharedRequest request, CallbackT && cb) { - return async_send_request_impl( - *request, - CallbackType{std::forward(cb)}); + std::lock_guard lock(pending_requests_mutex_); + int64_t sequence_number; + rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request"); + } + + SharedPromise call_promise = std::make_shared(); + SharedFuture f(call_promise->get_future()); + pending_requests_[sequence_number] = + std::make_tuple(call_promise, std::forward(cb), f); + return f; } - /// Send a request to the service server and schedule a callback in the executor. - /** - * Similar to the previous method, but you can get both the request and response in the callback. - * - * \param[in] request request to be send. - * \param[in] cb callback that will be called when we get a response for this request. - * \return the request id representing the request just sent. - */ - // TODO(ivanpauno): Deprecate this. - // If someone wants the request they can capture it in the lambda. template< typename CallbackT, typename std::enable_if< @@ -493,100 +355,28 @@ class Client : public ClientBase >::value >::type * = nullptr > - int64_t + SharedFutureWithRequest async_send_request(SharedRequest request, CallbackT && cb) { - auto & req = *request; - return async_send_request_impl( - req, - std::make_pair(CallbackWithRequestType{std::forward(cb)}, std::move(request))); - } - - /// Cleanup a pending request. - /** - * This notifies the client that we have waited long enough for a response from the server - * to come, we have given up and we are not waiting for a response anymore. - * - * Not calling this will make the client start using more memory for each request - * that never got a reply from the server. - * - * \param[in] request_id request id returned by async_send_request(). - * \return true when a pending request was removed, false if not (e.g. a response was received). - */ - bool - remove_pending_request(int64_t request_id) - { - std::lock_guard guard(pending_requests_mutex_); - return pending_requests_.erase(request_id) != 0u; - } - - /// Cleanup a pending request. - /** - * Convenient overload, same as: - * - * `Client::remove_pending_request(this, future.get_request_id())`. - */ - bool - remove_pending_request(const FutureAndRequestId & future) - { - return this->remove_pending_request(future.get_request_id()); - } - - /// Clean all pending requests. - /** - * \return number of pending requests that were removed. - */ - size_t - prune_requests() - { - std::lock_guard guard(pending_requests_mutex_); - auto ret = pending_requests_.size(); - pending_requests_.clear(); - return ret; - } + SharedPromiseWithRequest promise = std::make_shared(); + SharedFutureWithRequest future_with_request(promise->get_future()); -protected: - using PendingRequestsMapValue = std::variant< - std::promise, - CallbackType, - std::pair>; + auto wrapping_cb = [future_with_request, promise, request, + cb = std::forward(cb)](SharedFuture future) { + auto response = future.get(); + promise->set_value(std::make_pair(request, response)); + cb(future_with_request); + }; - RCLCPP_PUBLIC - int64_t - async_send_request_impl(const Request & request, PendingRequestsMapValue value) - { - int64_t sequence_number; - rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number); - if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request"); - } - { - std::lock_guard lock(pending_requests_mutex_); - pending_requests_.try_emplace(sequence_number, std::move(value)); - } - return sequence_number; - } + async_send_request(request, wrapping_cb); - RCLCPP_PUBLIC - std::optional - get_and_erase_pending_request(int64_t request_number) - { - std::unique_lock lock(pending_requests_mutex_); - auto it = this->pending_requests_.find(request_number); - if (it == this->pending_requests_.end()) { - RCUTILS_LOG_DEBUG_NAMED( - "rclcpp", - "Received invalid sequence number. Ignoring..."); - return std::nullopt; - } - auto value = std::move(it->second); - this->pending_requests_.erase(request_number); - return value; + return future_with_request; } +private: RCLCPP_DISABLE_COPY(Client) - std::unordered_map pending_requests_; + std::map> pending_requests_; std::mutex pending_requests_mutex_; }; diff --git a/rclcpp/test/rclcpp/test_client.cpp b/rclcpp/test/rclcpp/test_client.cpp index 5e2adb5e7d..ea64340f03 100644 --- a/rclcpp/test/rclcpp/test_client.cpp +++ b/rclcpp/test/rclcpp/test_client.cpp @@ -216,7 +216,7 @@ class TestClientWithServer : public ::testing::Test received_response = true; }; - auto req_id = client->async_send_request(request, std::move(callback)); + auto future = client->async_send_request(request, std::move(callback)); auto start = std::chrono::steady_clock::now(); while (!received_response && @@ -228,9 +228,6 @@ class TestClientWithServer : public ::testing::Test if (!received_response) { return ::testing::AssertionFailure() << "Waiting for response timed out"; } - if (client->remove_pending_request(req_id)) { - return ::testing::AssertionFailure() << "Should not be able to remove a finished request"; - } return request_result; } @@ -259,7 +256,7 @@ TEST_F(TestClientWithServer, async_send_request_callback_with_request) { EXPECT_NE(nullptr, request_response_pair.second); received_response = true; }; - auto req_id = client->async_send_request(request, std::move(callback)); + auto future = client->async_send_request(request, std::move(callback)); auto start = std::chrono::steady_clock::now(); while (!received_response && @@ -268,15 +265,6 @@ TEST_F(TestClientWithServer, async_send_request_callback_with_request) { rclcpp::spin_some(node); } EXPECT_TRUE(received_response); - EXPECT_FALSE(client->remove_pending_request(req_id)); -} - -TEST_F(TestClientWithServer, test_client_remove_pending_request) { - auto client = node->create_client("no_service_server_available_here"); - auto request = std::make_shared(); - auto future = client->async_send_request(request); - - EXPECT_TRUE(client->remove_pending_request(future)); } TEST_F(TestClientWithServer, async_send_request_rcl_send_request_error) {