Skip to content

Commit

Permalink
Updating client API to be able to remove pending requests (#1728)
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
  • Loading branch information
ivanpauno authored Jul 29, 2021
1 parent 01f6ebd commit bf752c7
Show file tree
Hide file tree
Showing 2 changed files with 271 additions and 49 deletions.
304 changes: 257 additions & 47 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

#include <atomic>
#include <future>
#include <map>
#include <unordered_map>
#include <memory>
#include <optional> // NOLINT, cpplint doesn't think this is a cpp std header
#include <sstream>
#include <string>
#include <tuple>
#include <utility>
#include <variant> // NOLINT

#include "rcl/client.h"
#include "rcl/error_handling.h"
Expand Down Expand Up @@ -178,6 +180,9 @@ template<typename ServiceT>
class Client : public ClientBase
{
public:
using Request = typename ServiceT::Request;
using Response = typename ServiceT::Response;

using SharedRequest = typename ServiceT::Request::SharedPtr;
using SharedResponse = typename ServiceT::Response::SharedPtr;

Expand All @@ -187,6 +192,7 @@ class Client : public ClientBase
using SharedPromise = std::shared_ptr<Promise>;
using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;

using Future = std::future<SharedResponse>;
using SharedFuture = std::shared_future<SharedResponse>;
using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;

Expand All @@ -195,6 +201,88 @@ class Client : public ClientBase

RCLCPP_SMART_PTR_DEFINITIONS(Client)

/// A convenient Client::Future and request id pair.
class FutureAndRequestId
{
public:
FutureAndRequestId(Future impl, int64_t req_id)
: impl_(std::move(impl)), req_id_(req_id)
{}

/// Allow implicit conversions to `std::future` by reference.
// TODO(ivanpauno): Maybe, deprecate this in favor of get_future() (?)
operator Future &() {return impl_;}

/// Deprecated, use take_future() instead.
/**
* Allow implicit conversions to `std::future` by value.
* \deprecated
*/
[[deprecated("FutureAndRequestId: use take_future() instead of an implicit conversion")]]
operator Future() {return impl_;}

/// Returns the internal `std::future`, moving it out.
/**
* After calling this method, the internal future gets invalidated.
*/
Future
take_future() {return impl_;}

/// Getter for the internal future.
Future &
get_future() {return impl_;}

/// Getter for the internal future.
const Future &
get_future() const {return impl_;}

/// Returns the request id associated with this future.
int64_t get_request_id() const {return req_id_;}

// delegate future like methods in the std::future impl_

/// See std::future::share().
SharedFuture share() noexcept {return impl_.share();}
/// See std::future::get().
SharedResponse get() {return impl_.get();}
/// See std::future::valid().
bool valid() const noexcept {return impl_.valid();}
/// See std::future::wait().
void wait() const {return impl_.wait();}
/// See std::future::wait_for().
template<class Rep, class Period>
std::future_status wait_for(
const std::chrono::duration<Rep, Period> & timeout_duration) const
{
return impl_.wait_for(timeout_duration);
}
/// See std::future::wait_until().
template<class Clock, class Duration>
std::future_status wait_until(
const std::chrono::time_point<Clock, Duration> & timeout_time) const
{
return impl_.wait_until(timeout_time);
}

// Rule of five, we could use the rule of zero here, but better be explicit as some of the
// methods are deleted.

/// Move constructor.
FutureAndRequestId(FutureAndRequestId && other) noexcept = default;
/// Deleted copy constructor, each instance is a unique owner of the future.
FutureAndRequestId(const FutureAndRequestId & other) = delete;
/// Move assignment.
FutureAndRequestId & operator=(FutureAndRequestId && other) noexcept = default;
/// Deleted copy assignment, each instance is a unique owner of the future.
FutureAndRequestId & operator=(const FutureAndRequestId & other) = delete;
/// Destructor.
~FutureAndRequestId() = default;

private:
Future impl_;
int64_t req_id_;
};

/// Default constructor.
/**
* The constructor for a Client is almost never called directly.
Expand Down Expand Up @@ -292,34 +380,83 @@ class Client : public ClientBase
std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> response) override
{
std::unique_lock<std::mutex> lock(pending_requests_mutex_);
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
int64_t sequence_number = request_header->sequence_number;
// TODO(esteve) this should throw instead since it is not expected to happen in the first place
if (this->pending_requests_.count(sequence_number) == 0) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Received invalid sequence number. Ignoring...");
auto opt = this->get_and_erase_pending_request(request_header->sequence_number);
if (!opt) {
return;
}
auto tuple = this->pending_requests_[sequence_number];
auto call_promise = std::get<0>(tuple);
auto callback = std::get<1>(tuple);
auto future = std::get<2>(tuple);
this->pending_requests_.erase(sequence_number);
// Unlock here to allow the service to be called recursively from one of its callbacks.
lock.unlock();

call_promise->set_value(typed_response);
callback(future);
auto & value = *opt;
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
std::move(response));
if (std::holds_alternative<Promise>(value)) {
auto & promise = std::get<Promise>(value);
promise.set_value(std::move(typed_response));
} else if (std::holds_alternative<CallbackType>(value)) {
Promise promise;
promise.set_value(std::move(typed_response));
const auto & callback = std::get<CallbackType>(value);
callback(promise.get_future().share());
} else if (std::holds_alternative<std::pair<CallbackWithRequestType, SharedRequest>>(value)) {
PromiseWithRequest promise;
const auto & pair = std::get<std::pair<CallbackWithRequestType, SharedRequest>>(value);
promise.set_value(std::make_pair(std::move(pair.second), std::move(typed_response)));
pair.first(promise.get_future().share());
}
}

SharedFuture
/// Send a request to the service server.
/**
* This method returns a `FutureAndRequestId` instance
* that can be passed to Executor::spin_until_future_complete() to
* wait until it has been completed.
*
* If the future never completes,
* e.g. the call to Executor::spin_until_future_complete() times out,
* Client::remove_pending_request() must be called to clean the client internal state.
* Not doing so will make the `Client` instance to use more memory each time a response is not
* received from the service server.
*
* ```cpp
* auto future = client->async_send_request(my_request);
* if (
* rclcpp::FutureReturnCode::TIMEOUT ==
* executor->spin_until_future_complete(future, timeout))
* {
* client->remove_pending_request(future);
* // handle timeout
* } else {
* handle_response(future.get());
* }
* ```
*
* \param[in] request request to be send.
* \return a FutureAndRequestId instance.
*/
FutureAndRequestId
async_send_request(SharedRequest request)
{
return async_send_request(request, [](SharedFuture) {});
Promise promise;
auto future = promise.get_future();
auto req_id = async_send_request_impl(
*request,
std::move(promise));
return FutureAndRequestId(std::move(future), req_id);
}

/// Send a request to the service server and schedule a callback in the executor.
/**
* Similar to the previous overload, but a callback will automatically be called when a response is received.
*
* If the callback is never called, because we never got a reply for the service server, remove_pending_request()
* has to be called with the returned request id or prune_pending_requests().
* Not doing so will make the `Client` instance use more memory each time a response is not
* received from the service server.
* In this case, it's convenient to setup a timer to cleanup the pending requests.
*
* \param[in] request request to be send.
* \param[in] cb callback that will be called when we get a response for this request.
* \return the request id representing the request just sent.
*/
// TODO(ivanpauno): Link to example that shows how to cleanup requests.
template<
typename CallbackT,
typename std::enable_if<
Expand All @@ -329,23 +466,24 @@ class Client : public ClientBase
>::value
>::type * = nullptr
>
SharedFuture
int64_t
async_send_request(SharedRequest request, CallbackT && cb)
{
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
int64_t sequence_number;
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
}

SharedPromise call_promise = std::make_shared<Promise>();
SharedFuture f(call_promise->get_future());
pending_requests_[sequence_number] =
std::make_tuple(call_promise, std::forward<CallbackType>(cb), f);
return f;
return async_send_request_impl(
*request,
CallbackType{std::forward<CallbackT>(cb)});
}

/// Send a request to the service server and schedule a callback in the executor.
/**
* Similar to the previous method, but you can get both the request and response in the callback.
*
* \param[in] request request to be send.
* \param[in] cb callback that will be called when we get a response for this request.
* \return the request id representing the request just sent.
*/
// TODO(ivanpauno): Deprecate this.
// If someone wants the request they can capture it in the lambda.
template<
typename CallbackT,
typename std::enable_if<
Expand All @@ -355,28 +493,100 @@ class Client : public ClientBase
>::value
>::type * = nullptr
>
SharedFutureWithRequest
int64_t
async_send_request(SharedRequest request, CallbackT && cb)
{
SharedPromiseWithRequest promise = std::make_shared<PromiseWithRequest>();
SharedFutureWithRequest future_with_request(promise->get_future());
auto & req = *request;
return async_send_request_impl(
req,
std::make_pair(CallbackWithRequestType{std::forward<CallbackT>(cb)}, std::move(request)));
}

/// Cleanup a pending request.
/**
* This notifies the client that we have waited long enough for a response from the server
* to come, we have given up and we are not waiting for a response anymore.
*
* Not calling this will make the client start using more memory for each request
* that never got a reply from the server.
*
* \param[in] request_id request id returned by async_send_request().
* \return true when a pending request was removed, false if not (e.g. a response was received).
*/
bool
remove_pending_request(int64_t request_id)
{
std::lock_guard guard(pending_requests_mutex_);
return pending_requests_.erase(request_id) != 0u;
}

/// Cleanup a pending request.
/**
* Convenient overload, same as:
*
* `Client::remove_pending_request(this, future.get_request_id())`.
*/
bool
remove_pending_request(const FutureAndRequestId & future)
{
return this->remove_pending_request(future.get_request_id());
}

/// Clean all pending requests.
/**
* \return number of pending requests that were removed.
*/
size_t
prune_requests()
{
std::lock_guard guard(pending_requests_mutex_);
auto ret = pending_requests_.size();
pending_requests_.clear();
return ret;
}

auto wrapping_cb = [future_with_request, promise, request,
cb = std::forward<CallbackWithRequestType>(cb)](SharedFuture future) {
auto response = future.get();
promise->set_value(std::make_pair(request, response));
cb(future_with_request);
};
protected:
using PendingRequestsMapValue = std::variant<
std::promise<SharedResponse>,
CallbackType,
std::pair<CallbackWithRequestType, SharedRequest>>;

async_send_request(request, wrapping_cb);
RCLCPP_PUBLIC
int64_t
async_send_request_impl(const Request & request, PendingRequestsMapValue value)
{
int64_t sequence_number;
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
}
{
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
pending_requests_.try_emplace(sequence_number, std::move(value));
}
return sequence_number;
}

return future_with_request;
RCLCPP_PUBLIC
std::optional<PendingRequestsMapValue>
get_and_erase_pending_request(int64_t request_number)
{
std::unique_lock<std::mutex> lock(pending_requests_mutex_);
auto it = this->pending_requests_.find(request_number);
if (it == this->pending_requests_.end()) {
RCUTILS_LOG_DEBUG_NAMED(
"rclcpp",
"Received invalid sequence number. Ignoring...");
return std::nullopt;
}
auto value = std::move(it->second);
this->pending_requests_.erase(request_number);
return value;
}

private:
RCLCPP_DISABLE_COPY(Client)

std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
std::unordered_map<int64_t, PendingRequestsMapValue> pending_requests_;
std::mutex pending_requests_mutex_;
};

Expand Down
Loading

0 comments on commit bf752c7

Please sign in to comment.