Skip to content

Commit

Permalink
fix(rest): support rewinds in libcurl (#11703)
Browse files Browse the repository at this point in the history
We configure libcurl to get the data for PUT and POST requests using
callbacks. Sometimes libcurl may have sent part of the data and needs to
resend it. The documentation mentions "multi-pass authentication
methods", as well as reusing connections where libcurl detects they were
closed after some data is sent. There are cases on the Internet about
chasing HTTP redirects (3xx responses) too. In any case, we have seen
these as flakes in the tests.

I took the belt and suspenders approach. Try to handle the rewind
requests, but if unhandled treat the error as retryable, because they
are.
  • Loading branch information
coryan authored May 23, 2023
1 parent 6fb98ea commit 508e398
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 35 deletions.
17 changes: 17 additions & 0 deletions google/cloud/internal/curl_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,24 @@ Status CurlHandle::AsStatus(CURLcode e, char const* where) {
case CURLE_LDAP_INVALID_URL:
case CURLE_FILESIZE_EXCEEDED:
case CURLE_USE_SSL_FAILED:
code = StatusCode::kUnknown;
break;

case CURLE_SEND_FAIL_REWIND:
// We use curl callbacks to send data in PUT and POST requests. libcurl
// may need to "rewind" the data. The documentation for the error is
// sparse, but the documentation for the "rewind" callbacks goes into
// more detail:
// https://curl.se/libcurl/c/CURLOPT_SEEKFUNCTION.html
// This may happen when doing an HTTP PUT or POST with a multi-pass
// authentication method, or when an existing HTTP connection is
// reused too late and the server closes the connection.
//
// All these cases seem retryable, though it seems more efficient to
// implement the rewind callback.
code = StatusCode::kUnavailable;
break;

case CURLE_SSL_ENGINE_SETFAILED:
case CURLE_LOGIN_DENIED:
case CURLE_TFTP_NOTFOUND:
Expand Down
108 changes: 73 additions & 35 deletions google/cloud/internal/curl_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "absl/strings/match.h"
#include "absl/strings/strip.h"
#include <algorithm>
#include <cstdio>
#include <deque>
#include <sstream>
#include <thread>

Expand Down Expand Up @@ -89,10 +91,7 @@ Status AsStatus(CURLMcode result, char const* where) {
class WriteVector {
public:
explicit WriteVector(std::vector<absl::Span<char const>> v)
: writev_(std::move(v)) {
// Reverse the vector so the first chunk is at the end.
std::reverse(writev_.begin(), writev_.end());
}
: original_(std::move(v)), writev_(original_.begin(), original_.end()) {}

std::size_t size() const {
std::size_t size = 0;
Expand All @@ -105,7 +104,7 @@ class WriteVector {
std::size_t MoveTo(absl::Span<char> dst) {
auto const avail = dst.size();
while (!writev_.empty()) {
auto& src = writev_.back();
auto& src = writev_.front();
if (src.size() > dst.size()) {
std::copy(src.begin(), src.begin() + dst.size(), dst.begin());
src.remove_prefix(dst.size());
Expand All @@ -114,15 +113,74 @@ class WriteVector {
}
std::copy(src.begin(), src.end(), dst.begin());
dst.remove_prefix(src.size());
writev_.pop_back();
writev_.pop_front();
}
return avail - dst.size();
}

/// Implements a CURLOPT_SEEKFUNCTION callback.
///
/// @see https://curl.se/libcurl/c/CURLOPT_SEEKFUNCTION.html
/// @returns true if the seek operation was successful.
bool Seek(std::size_t offset, int origin) {
// libcurl claims to only req
if (origin != SEEK_SET) return false;
writev_.assign(original_.begin(), original_.end());
// Reverse the vector so the first chunk is at the end.
std::reverse(writev_.begin(), writev_.end());
while (!writev_.empty()) {
auto& src = writev_.front();
if (src.size() >= offset) {
src.remove_prefix(offset);
offset = 0;
break;
}
offset -= src.size();
writev_.pop_front();
}
return offset == 0;
}

private:
std::vector<absl::Span<char const>> writev_;
std::vector<absl::Span<char const>> original_;
std::deque<absl::Span<char const>> writev_;
};

extern "C" { // libcurl callbacks

// It would be nice to be able to send data from, and receive data into,
// our own buffers (i.e., without an extra copy). But, there is no such API.

// Receive response data from peer.
std::size_t WriteFunction(char* ptr, size_t size, size_t nmemb,
void* userdata) {
auto* const request = reinterpret_cast<CurlImpl*>(userdata);
return request->WriteCallback(absl::MakeSpan(ptr, size * nmemb));
}

// Receive a response header from peer.
std::size_t HeaderFunction(char* buffer, std::size_t size, std::size_t nitems,
void* userdata) {
auto* const request = reinterpret_cast<CurlImpl*>(userdata);
return request->HeaderCallback(absl::MakeSpan(buffer, size * nitems));
}

// Fill buffer to send data to peer (POST/PUT).
std::size_t ReadFunction(char* buffer, std::size_t size, std::size_t nitems,
void* userdata) {
auto* const writev = reinterpret_cast<WriteVector*>(userdata);
return writev->MoveTo(absl::MakeSpan(buffer, size * nitems));
}

int SeekFunction(void* userdata, curl_off_t offset, int origin) {
auto* const writev = reinterpret_cast<WriteVector*>(userdata);
return writev->Seek(static_cast<std::size_t>(offset), origin)
? CURL_SEEKFUNC_OK
: CURL_SEEKFUNC_FAIL;
}

} // extern "C"

} // namespace

std::size_t SpillBuffer::CopyFrom(absl::Span<char const> src) {
Expand Down Expand Up @@ -166,34 +224,6 @@ std::size_t SpillBuffer::MoveTo(absl::Span<char> dst) {
return len;
}

extern "C" { // libcurl callbacks

// It would be nice to be able to send data from, and receive data into,
// our own buffers (i.e., without an extra copy). But, there is no such API.

// Receive response data from peer.
static std::size_t WriteFunction(char* ptr, size_t size, size_t nmemb,
void* userdata) {
auto* const request = reinterpret_cast<CurlImpl*>(userdata);
return request->WriteCallback(absl::MakeSpan(ptr, size * nmemb));
}

// Receive a response header from peer.
static std::size_t HeaderFunction(char* buffer, std::size_t size,
std::size_t nitems, void* userdata) {
auto* const request = reinterpret_cast<CurlImpl*>(userdata);
return request->HeaderCallback(absl::MakeSpan(buffer, size * nitems));
}

// Fill buffer to send data to peer (POST/PUT).
static std::size_t ReadFunction(char* buffer, std::size_t size,
std::size_t nitems, void* userdata) {
auto* const writev = reinterpret_cast<WriteVector*>(userdata);
return writev->MoveTo(absl::MakeSpan(buffer, size * nitems));
}

} // extern "C"

CurlImpl::CurlImpl(CurlHandle handle,
std::shared_ptr<CurlHandleFactory> factory,
Options const& options)
Expand Down Expand Up @@ -386,6 +416,10 @@ Status CurlImpl::MakeRequest(HttpMethod method, RestContext& context,
if (!status.ok()) return OnTransferError(context, std::move(status));
status = handle_.SetOption(CURLOPT_READDATA, &writev);
if (!status.ok()) return OnTransferError(context, std::move(status));
status = handle_.SetOption(CURLOPT_SEEKFUNCTION, &SeekFunction);
if (!status.ok()) return OnTransferError(context, std::move(status));
status = handle_.SetOption(CURLOPT_SEEKDATA, &writev);
if (!status.ok()) return OnTransferError(context, std::move(status));
SetHeader("Expect:");
return MakeRequestImpl(context);
}
Expand All @@ -397,6 +431,10 @@ Status CurlImpl::MakeRequest(HttpMethod method, RestContext& context,
if (!status.ok()) return OnTransferError(context, std::move(status));
status = handle_.SetOption(CURLOPT_READDATA, &writev);
if (!status.ok()) return OnTransferError(context, std::move(status));
status = handle_.SetOption(CURLOPT_SEEKFUNCTION, &SeekFunction);
if (!status.ok()) return OnTransferError(context, std::move(status));
status = handle_.SetOption(CURLOPT_SEEKDATA, &writev);
if (!status.ok()) return OnTransferError(context, std::move(status));
status = handle_.SetOption(CURLOPT_UPLOAD, 1L);
if (!status.ok()) return OnTransferError(context, std::move(status));
status = handle_.SetOption(CURLOPT_INFILESIZE_LARGE, size);
Expand Down

0 comments on commit 508e398

Please sign in to comment.