Skip to content

Commit

Permalink
Revert "Refactoring: Collapse Http1Upstream and Http2Upstream into si…
Browse files Browse the repository at this point in the history
…ngle HttpUpstream (envoyproxy#30725)"

This reverts commit b5d2626.

Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa committed Nov 17, 2023
1 parent 2a13227 commit 92f737b
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 106 deletions.
4 changes: 2 additions & 2 deletions envoy/tcp/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ class GenericConnectionPoolCallbacks {

// Interface for a generic Upstream, which can communicate with a TCP or HTTP
// upstream.
class GenericUpstream : public Event::DeferredDeletable {
class GenericUpstream {
public:
~GenericUpstream() override = default;
virtual ~GenericUpstream() = default;

/**
* Enable/disable further data from this stream.
Expand Down
2 changes: 1 addition & 1 deletion source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ void Filter::onUpstreamEvent(Network::ConnectionEvent event) {

if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
read_callbacks_->connection().dispatcher().deferredDelete(std::move(upstream_));
upstream_.reset();
disableIdleTimer();

if (connecting) {
Expand Down
138 changes: 86 additions & 52 deletions source/common/tcp_proxy/upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,53 +77,12 @@ TcpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {

HttpUpstream::HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config,
StreamInfo::StreamInfo& downstream_info, Http::CodecType type)
StreamInfo::StreamInfo& downstream_info)
: config_(config), downstream_info_(downstream_info), response_decoder_(*this),
upstream_callbacks_(callbacks), type_(type) {}
upstream_callbacks_(callbacks) {}

HttpUpstream::~HttpUpstream() { resetEncoder(Network::ConnectionEvent::LocalClose); }

bool HttpUpstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
if (type_ == Http::CodecType::HTTP1) {
// According to RFC7231 any 2xx response indicates that the connection is
// established.
// Any 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored.
// https://tools.ietf.org/html/rfc7231#section-4.3.6
return Http::CodeUtility::is2xx(Http::Utility::getResponseStatus(headers));
}
return Http::Utility::getResponseStatus(headers) == 200;
}

void HttpUpstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) {
request_encoder_ = &request_encoder;
request_encoder_->getStream().addCallbacks(*this);
auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
{Http::Headers::get().Method, config_.usePost() ? "POST" : "CONNECT"},
{Http::Headers::get().Host, config_.host(downstream_info_)},
});
if (config_.usePost()) {
headers->addReference(Http::Headers::get().Path, config_.postPath());
}

if (type_ == Http::CodecType::HTTP1) {
request_encoder_->enableTcpTunneling();
ASSERT(request_encoder_->http1StreamEncoderOptions() != absl::nullopt);
} else {
const std::string& scheme =
is_ssl ? Http::Headers::get().SchemeValues.Https : Http::Headers::get().SchemeValues.Http;

if (config_.usePost()) {
headers->addReference(Http::Headers::get().Scheme, scheme);
}
}

config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
downstream_info_);
const auto status = request_encoder_->encodeHeaders(*headers, false);
// Encoding can only fail on missing required request headers.
ASSERT(status.ok());
}

bool HttpUpstream::readDisable(bool disable) {
if (!request_encoder_) {
return false;
Expand All @@ -136,15 +95,8 @@ void HttpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
if (!request_encoder_) {
return;
}
// auto codec = type_;
request_encoder_->encodeData(data, end_stream);

// doneWriting() is being skipped for H1 codec to avoid resetEncoder() call.
// This is because H1 codec does not support half-closed stream. Calling resetEncoder()
// will fully close the upstream connection without flushing any pending data, rather than a http
// stream reset.
// More details can be found on https://github.com/envoyproxy/envoy/pull/13293
if ((type_ != Http::CodecType::HTTP1) && (end_stream)) {
if (end_stream) {
doneWriting();
}
}
Expand Down Expand Up @@ -293,7 +245,11 @@ HttpConnPool::~HttpConnPool() {

void HttpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
callbacks_ = &callbacks;
upstream_ = std::make_unique<HttpUpstream>(upstream_callbacks_, config_, downstream_info_, type_);
if (type_ == Http::CodecType::HTTP1) {
upstream_ = std::make_unique<Http1Upstream>(upstream_callbacks_, config_, downstream_info_);
} else {
upstream_ = std::make_unique<Http2Upstream>(upstream_callbacks_, config_, downstream_info_);
}
Tcp::ConnectionPool::Cancellable* handle =
conn_pool_data_.value().newStream(upstream_->responseDecoder(), *this,
{/*can_send_early_data_=*/false,
Expand Down Expand Up @@ -335,5 +291,83 @@ void HttpConnPool::onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& h
callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, address_provider, ssl_info);
}

Http2Upstream::Http2Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config,
StreamInfo::StreamInfo& downstream_info)
: HttpUpstream(callbacks, config, downstream_info) {}

bool Http2Upstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
if (Http::Utility::getResponseStatus(headers) != 200) {
return false;
}
return true;
}

void Http2Upstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) {
request_encoder_ = &request_encoder;
request_encoder_->getStream().addCallbacks(*this);

const std::string& scheme =
is_ssl ? Http::Headers::get().SchemeValues.Https : Http::Headers::get().SchemeValues.Http;
auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
{Http::Headers::get().Method, config_.usePost() ? "POST" : "CONNECT"},
{Http::Headers::get().Host, config_.host(downstream_info_)},
});

if (config_.usePost()) {
headers->addReference(Http::Headers::get().Path, config_.postPath());
headers->addReference(Http::Headers::get().Scheme, scheme);
}

config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
downstream_info_);
const auto status = request_encoder_->encodeHeaders(*headers, false);
// Encoding can only fail on missing required request headers.
ASSERT(status.ok());
}

Http1Upstream::Http1Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config,
StreamInfo::StreamInfo& downstream_info)
: HttpUpstream(callbacks, config, downstream_info) {}

void Http1Upstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool) {
request_encoder_ = &request_encoder;
request_encoder_->getStream().addCallbacks(*this);
request_encoder_->enableTcpTunneling();
ASSERT(request_encoder_->http1StreamEncoderOptions() != absl::nullopt);

auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
{Http::Headers::get().Method, config_.usePost() ? "POST" : "CONNECT"},
{Http::Headers::get().Host, config_.host(downstream_info_)},
});

if (config_.usePost()) {
// Path is required for POST requests.
headers->addReference(Http::Headers::get().Path, config_.postPath());
}

config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
downstream_info_);
const auto status = request_encoder_->encodeHeaders(*headers, false);
// Encoding can only fail on missing required request headers.
ASSERT(status.ok());
}

bool Http1Upstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
// According to RFC7231 any 2xx response indicates that the connection is
// established.
// Any 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored.
// https://tools.ietf.org/html/rfc7231#section-4.3.6
return Http::CodeUtility::is2xx(Http::Utility::getResponseStatus(headers));
}

void Http1Upstream::encodeData(Buffer::Instance& data, bool end_stream) {
if (!request_encoder_) {
return;
}
request_encoder_->encodeData(data, end_stream);
}

} // namespace TcpProxy
} // namespace Envoy
30 changes: 24 additions & 6 deletions source/common/tcp_proxy/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,9 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks {
public:
using TunnelingConfig =
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;
HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info,
Http::CodecType type);

~HttpUpstream() override;
bool isValidResponse(const Http::ResponseHeaderMap&);
virtual bool isValidResponse(const Http::ResponseHeaderMap&) PURE;

void doneReading();
void doneWriting();
Expand All @@ -154,13 +152,15 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks {
void onAboveWriteBufferHighWatermark() override;
void onBelowWriteBufferLowWatermark() override;

void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl);
virtual void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) PURE;
void setConnPoolCallbacks(std::unique_ptr<HttpConnPool::Callbacks>&& callbacks) {
conn_pool_callbacks_ = std::move(callbacks);
}
Ssl::ConnectionInfoConstSharedPtr getUpstreamConnectionSslInfo() override { return nullptr; }

protected:
HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info);
void resetEncoder(Network::ConnectionEvent event, bool inform_downstream = true);

// The encoder offered by the upstream http client.
Expand Down Expand Up @@ -208,7 +208,6 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks {
};
DecoderShim response_decoder_;
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
const Http::CodecType type_;
bool read_half_closed_{};
bool write_half_closed_{};

Expand All @@ -217,5 +216,24 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks {
std::unique_ptr<HttpConnPool::Callbacks> conn_pool_callbacks_;
};

class Http1Upstream : public HttpUpstream {
public:
Http1Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info);

void encodeData(Buffer::Instance& data, bool end_stream) override;
void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) override;
bool isValidResponse(const Http::ResponseHeaderMap& headers) override;
};

class Http2Upstream : public HttpUpstream {
public:
Http2Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info);

void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) override;
bool isValidResponse(const Http::ResponseHeaderMap& headers) override;
};

} // namespace TcpProxy
} // namespace Envoy
Loading

0 comments on commit 92f737b

Please sign in to comment.