Skip to content

Commit

Permalink
CXXCBC-445: return request_canceled on IO error in HTTP session (#568)
Browse files Browse the repository at this point in the history
  • Loading branch information
avsej authored May 21, 2024
1 parent 0e80cac commit 7960339
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 36 deletions.
20 changes: 13 additions & 7 deletions core/io/http_command.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,22 @@ struct http_command : public std::enable_shared_from_this<http_command<Request>>
if (ec == asio::error::operation_aborted) {
return;
}
self->cancel();
if constexpr (io::http_traits::supports_readonly_v<Request>) {
if (self->request.readonly) {
self->cancel(errc::common::unambiguous_timeout);
return;
}
}
self->cancel(errc::common::ambiguous_timeout);
});
}

void cancel()
void cancel(std::error_code ec)
{
invoke_handler(ec, {});
if (session_) {
session_->stop();
}
invoke_handler(errc::common::unambiguous_timeout, {});
}

void invoke_handler(std::error_code ec, io::http_response&& msg)
Expand All @@ -113,10 +119,9 @@ struct http_command : public std::enable_shared_from_this<http_command<Request>>
span_->end();
span_ = nullptr;
}
if (handler_) {
handler_(ec, std::move(msg));
if (auto handler = std::move(handler_); handler) {
handler(ec, std::move(msg));
}
handler_ = nullptr;
retry_backoff.cancel();
deadline.cancel();
}
Expand Down Expand Up @@ -166,10 +171,11 @@ struct http_command : public std::enable_shared_from_this<http_command<Request>>
}
self->deadline.cancel();
self->finish_dispatch(self->session_->remote_address(), self->session_->local_address());
CB_LOG_TRACE(R"({} HTTP response: {}, client_context_id="{}", status={}, body={})",
CB_LOG_TRACE(R"({} HTTP response: {}, client_context_id="{}", ec={}, status={}, body={})",
self->session_->log_prefix(),
self->request.type,
self->client_context_id_,
ec.message(),
msg.status_code,
msg.status_code == 200 ? "[hidden]" : msg.body.data());
if (auto parser_ec = msg.body.ec(); !ec && parser_ec) {
Expand Down
58 changes: 29 additions & 29 deletions core/io/http_session.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class http_session : public std::enable_shared_from_this<http_session>
, ctx_(ctx)
, resolver_(ctx_)
, stream_(std::make_unique<plain_stream_impl>(ctx_))
, deadline_timer_(stream_->get_executor())
, connect_deadline_timer_(stream_->get_executor())
, idle_timer_(stream_->get_executor())
, credentials_(credentials)
, hostname_(hostname)
Expand All @@ -148,7 +148,7 @@ class http_session : public std::enable_shared_from_this<http_session>
, ctx_(ctx)
, resolver_(ctx_)
, stream_(std::make_unique<tls_stream_impl>(ctx_, tls))
, deadline_timer_(ctx_)
, connect_deadline_timer_(ctx_)
, idle_timer_(ctx_)
, credentials_(credentials)
, hostname_(hostname)
Expand Down Expand Up @@ -245,24 +245,27 @@ class http_session : public std::enable_shared_from_this<http_session>
on_stop_handler_ = std::move(handler);
}

void cancel_current_response(std::error_code ec)
{
std::scoped_lock lock(current_response_mutex_);
if (auto ctx = std::move(current_response_); ctx.handler) {
ctx.handler(ec, std::move(ctx.parser.response));
}
}

void stop()
{
if (stopped_) {
return;
}
stopped_ = true;
state_ = diag::endpoint_state::disconnecting;
stream_->close([](std::error_code) {});
deadline_timer_.cancel();
stream_->close([](std::error_code) {
});
connect_deadline_timer_.cancel();
idle_timer_.cancel();

{
std::scoped_lock lock(current_response_mutex_);
auto ctx = std::move(current_response_);
if (ctx.handler) {
ctx.handler(errc::common::ambiguous_timeout, {});
}
}
cancel_current_response(errc::common::request_canceled);

if (auto handler = std::move(on_stop_handler_); handler) {
handler();
Expand Down Expand Up @@ -306,7 +309,9 @@ class http_session : public std::enable_shared_from_this<http_session>
if (stopped_) {
return;
}
asio::post(asio::bind_executor(ctx_, [self = shared_from_this()]() { self->do_write(); }));
asio::post(asio::bind_executor(ctx_, [self = shared_from_this()]() {
self->do_write();
}));
}

template<typename Handler>
Expand Down Expand Up @@ -380,7 +385,6 @@ class http_session : public std::enable_shared_from_this<http_session>
endpoints_ = endpoints;
CB_LOG_TRACE("{} resolved \"{}:{}\" to {} endpoint(s)", info_.log_prefix(), hostname_, service_, endpoints_.size());
do_connect(endpoints_.begin());
deadline_timer_.async_wait(std::bind(&http_session::check_deadline, shared_from_this(), std::placeholders::_1));
}

void do_connect(asio::ip::tcp::resolver::results_type::iterator it)
Expand All @@ -396,7 +400,15 @@ class http_session : public std::enable_shared_from_this<http_session>
hostname_,
service_,
http_ctx_.options.connect_timeout.count());
deadline_timer_.expires_after(http_ctx_.options.connect_timeout);
connect_deadline_timer_.async_wait([self = shared_from_this()](std::error_code ec) {
if (ec == asio::error::operation_aborted || self->stopped_) {
return;
}
self->cancel_current_response(couchbase::errc::common::unambiguous_timeout);
self->stream_->close([](std::error_code) {
});
});
connect_deadline_timer_.expires_after(http_ctx_.options.connect_timeout);
stream_->async_connect(it->endpoint(), std::bind(&http_session::on_connect, shared_from_this(), std::placeholders::_1, it));
} else {
CB_LOG_ERROR("{} no more endpoints left to connect, \"{}:{}\" is not reachable", info_.log_prefix(), hostname_, service_);
Expand Down Expand Up @@ -430,24 +442,11 @@ class http_session : public std::enable_shared_from_this<http_session>
std::scoped_lock lock(info_mutex_);
info_ = http_session_info(client_id_, id_, stream_->local_endpoint(), it->endpoint());
}
deadline_timer_.cancel();
connect_deadline_timer_.cancel();
flush();
}
}

void check_deadline(std::error_code ec)
{
if (ec == asio::error::operation_aborted || stopped_) {
return;
}
if (deadline_timer_.expiry() <= asio::steady_timer::clock_type::now()) {
stream_->close([](std::error_code) {});
deadline_timer_.cancel();
return;
}
deadline_timer_.async_wait(std::bind(&http_session::check_deadline, shared_from_this(), std::placeholders::_1));
}

void do_read()
{
if (stopped_ || reading_ || !stream_->is_open()) {
Expand Down Expand Up @@ -484,6 +483,7 @@ class http_session : public std::enable_shared_from_this<http_session>
res = self->current_response_.parser.feed(reinterpret_cast<const char*>(self->input_buffer_.data()), bytes_transferred);
}
if (res.failure) {
CB_LOG_ERROR("{} Parsing error while reading from the socket: {}", self->info_.log_prefix(), res.error);
return self->stop();
}
if (res.complete) {
Expand Down Expand Up @@ -557,7 +557,7 @@ class http_session : public std::enable_shared_from_this<http_session>
asio::io_context& ctx_;
asio::ip::tcp::resolver resolver_;
std::unique_ptr<stream_impl> stream_;
asio::steady_timer deadline_timer_;
asio::steady_timer connect_deadline_timer_;
asio::steady_timer idle_timer_;

cluster_credentials credentials_;
Expand Down
7 changes: 7 additions & 0 deletions core/io/http_traits.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,11 @@ struct supports_parent_span : public std::false_type {
template<typename T>
inline constexpr bool supports_parent_span_v = supports_parent_span<T>::value;

template<typename T>
struct supports_readonly : public std::false_type {
};

template<typename T>
inline constexpr bool supports_readonly_v = supports_readonly<T>::value;

} // namespace couchbase::core::io::http_traits
4 changes: 4 additions & 0 deletions core/operations/document_analytics.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,8 @@ namespace couchbase::core::io::http_traits
template<>
struct supports_parent_span<couchbase::core::operations::analytics_request> : public std::true_type {
};

template<>
struct supports_readonly<couchbase::core::operations::analytics_request> : public std::true_type {
};
} // namespace couchbase::core::io::http_traits
4 changes: 4 additions & 0 deletions core/operations/document_query.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,8 @@ struct supports_sticky_node<couchbase::core::operations::query_request> : public
template<>
struct supports_parent_span<couchbase::core::operations::query_request> : public std::true_type {
};

template<>
struct supports_readonly<couchbase::core::operations::query_request> : public std::true_type {
};
} // namespace couchbase::core::io::http_traits

0 comments on commit 7960339

Please sign in to comment.