Skip to content

Commit

Permalink
filters: take optional status map input
Browse files Browse the repository at this point in the history
Signed-off-by: Venil Noronha <veniln@vmware.com>
  • Loading branch information
venilnoronha committed Oct 29, 2018
1 parent 44967f0 commit 161ba4d
Show file tree
Hide file tree
Showing 28 changed files with 153 additions and 98 deletions.
7 changes: 7 additions & 0 deletions include/envoy/grpc/status.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <map>

namespace Envoy {
namespace Grpc {

Expand Down Expand Up @@ -50,5 +52,10 @@ class Status {
};
};

/**
* A map of HTTP status codes to corresponding gRPC status codes.
*/
typedef std::map<uint64_t, Status::GrpcStatus> StatusMap;

} // namespace Grpc
} // namespace Envoy
2 changes: 2 additions & 0 deletions include/envoy/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ envoy_cc_library(
envoy_cc_library(
name = "filter_interface",
hdrs = ["filter.h"],
external_deps = ["abseil_optional"],
deps = [
":codec_interface",
":header_map_interface",
"//include/envoy/access_log:access_log_interface",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/grpc:status",
"//include/envoy/router:router_interface",
"//include/envoy/ssl:connection_interface",
"//include/envoy/tracing:http_tracer_interface",
Expand Down
15 changes: 6 additions & 9 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@

#include "envoy/access_log/access_log.h"
#include "envoy/event/dispatcher.h"
#include "envoy/grpc/status.h"
#include "envoy/http/codec.h"
#include "envoy/http/header_map.h"
#include "envoy/router/router.h"
#include "envoy/ssl/connection.h"
#include "envoy/tracing/http_tracer.h"
#include "envoy/upstream/upstream.h"

#include "absl/types/optional.h"

namespace Envoy {
namespace Http {

Expand Down Expand Up @@ -221,18 +224,12 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
* type, or encoded in the grpc-message header.
* @param modify_headers supplies an optional callback function that can modify the
* response headers.
* @param rate_limited_as_resource_exhausted specifies whether a RESOURCE_EXHAUSTED code
* should be returned instead of the default
* UNAVAILABLE code for rate limited gRPC calls.
* @param status_map a map of HTTP status codes to corresponding gRPC status
* codes to override the default code mapping.
*/
virtual void sendLocalReply(Code response_code, const std::string& body_text,
std::function<void(HeaderMap& headers)> modify_headers,
bool rate_limited_as_resource_exhausted) PURE;

void sendLocalReply(Code response_code, const std::string& body_text,
std::function<void(HeaderMap& headers)> modify_headers) {
sendLocalReply(response_code, body_text, modify_headers, false);
}
const absl::optional<Grpc::StatusMap>& status_map) PURE;

/**
* Called with 100-Continue headers to be encoded.
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ envoy_cc_library(
name = "status_lib",
srcs = ["status.cc"],
hdrs = ["status.h"],
external_deps = ["abseil_optional"],
deps = [
"//include/envoy/grpc:status",
],
Expand Down
15 changes: 10 additions & 5 deletions source/common/grpc/status.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@ namespace Envoy {
namespace Grpc {

Status::GrpcStatus Utility::httpToGrpcStatus(uint64_t http_response_status,
bool rate_limited_as_resource_exhausted) {
const absl::optional<StatusMap>& status_map) {
// See:
// * https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
// * https://cloud.google.com/apis/design/errors#generating_errors

// override default code mapping if provided
if (status_map) {
StatusMap::const_iterator iter = status_map.value().find(http_response_status);
if (iter != status_map.value().end()) {
return iter->second;
}
}

switch (http_response_status) {
case 400:
return Status::GrpcStatus::Internal;
Expand All @@ -18,10 +27,6 @@ Status::GrpcStatus Utility::httpToGrpcStatus(uint64_t http_response_status,
case 404:
return Status::GrpcStatus::Unimplemented;
case 429:
if (rate_limited_as_resource_exhausted) {
return Status::GrpcStatus::ResourceExhausted;
}
return Status::GrpcStatus::Unavailable;
case 502:
case 503:
case 504:
Expand Down
8 changes: 5 additions & 3 deletions source/common/grpc/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "envoy/grpc/status.h"

#include "absl/types/optional.h"

namespace Envoy {
namespace Grpc {

Expand All @@ -24,12 +26,12 @@ class Utility {
* See https://cloud.google.com/apis/design/errors#generating_errors.
*
* @param http_response_status HTTP status code.
* @param rate_limited_as_resource_exhausted whether a 429 response code
* should be mapped to RESOURCE_EXHAUSTED instead of UNAVAILABLE.
* @param status_map a map of HTTP status codes to corresponding gRPC status
* codes to override the default code mapping.
* @return Status::GrpcStatus corresponding gRPC status code.
*/
static Status::GrpcStatus httpToGrpcStatus(uint64_t http_response_status,
bool rate_limited_as_resource_exhausted);
const absl::optional<StatusMap>& status_map);

/**
* @param grpc_status gRPC status from grpc-status header.
Expand Down
1 change: 1 addition & 0 deletions source/common/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ envoy_cc_library(
name = "utility_lib",
srcs = ["utility.cc"],
hdrs = ["utility.h"],
external_deps = ["abseil_optional"],
deps = [
":exception_lib",
":header_map_lib",
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
void sendLocalReply(Code code, const std::string& body,
std::function<void(HeaderMap& headers)> modify_headers,
bool rate_limited_as_resource_exhausted = false) override {
const absl::optional<Grpc::StatusMap>& status_map) override {
Utility::sendLocalReply(
is_grpc_request_,
[this, modify_headers](HeaderMapPtr&& headers, bool end_stream) -> void {
Expand All @@ -297,7 +297,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
encodeHeaders(std::move(headers), end_stream);
},
[this](Buffer::Instance& data, bool end_stream) -> void { encodeData(data, end_stream); },
remote_closed_, code, body, is_head_request_, rate_limited_as_resource_exhausted);
remote_closed_, code, body, status_map, is_head_request_);
}
// The async client won't pause if sending an Expect: 100-Continue so simply
// swallows any incoming encode100Continue.
Expand Down
30 changes: 16 additions & 14 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -430,9 +430,9 @@ void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
// or gRPC status code, and/or set H2 RST_STREAM error.
connection_manager_.doEndStream(*this);
} else {
sendLocalReply(request_headers_ != nullptr &&
Grpc::Common::hasGrpcContentType(*request_headers_),
Http::Code::RequestTimeout, "stream timeout", nullptr, is_head_request_);
sendLocalReply(
request_headers_ != nullptr && Grpc::Common::hasGrpcContentType(*request_headers_),
Http::Code::RequestTimeout, "stream timeout", nullptr, is_head_request_, absl::nullopt);
}
}

Expand Down Expand Up @@ -504,7 +504,8 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers,
Server::OverloadActionState::Active) {
connection_manager_.stats_.named_.downstream_rq_overload_close_.inc();
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_),
Http::Code::ServiceUnavailable, "envoy overloaded", nullptr, is_head_request_);
Http::Code::ServiceUnavailable, "envoy overloaded", nullptr, is_head_request_,
absl::nullopt);
return;
}

Expand Down Expand Up @@ -537,7 +538,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers,
stream_info_.protocol(protocol);
if (!connection_manager_.config_.http1Settings().accept_http_10_) {
// Send "Upgrade Required" if HTTP/1.0 support is not explicitly configured on.
sendLocalReply(false, Code::UpgradeRequired, "", nullptr, is_head_request_);
sendLocalReply(false, Code::UpgradeRequired, "", nullptr, is_head_request_, absl::nullopt);
return;
} else {
// HTTP/1.0 defaults to single-use connections. Make sure the connection
Expand All @@ -561,7 +562,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers,
} else {
// Require host header. For HTTP/1.1 Host has already been translated to :authority.
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::BadRequest, "",
nullptr, is_head_request_);
nullptr, is_head_request_, absl::nullopt);
return;
}
}
Expand All @@ -575,7 +576,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers,
// envoy users who do not wish to proxy large headers.
if (request_headers_->byteSize() > (60 * 1024)) {
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_),
Code::RequestHeaderFieldsTooLarge, "", nullptr, is_head_request_);
Code::RequestHeaderFieldsTooLarge, "", nullptr, is_head_request_, absl::nullopt);
return;
}

Expand All @@ -587,7 +588,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers,
if (!request_headers_->Path() || request_headers_->Path()->value().c_str()[0] != '/') {
connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc();
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::NotFound, "", nullptr,
is_head_request_);
is_head_request_, absl::nullopt);
return;
}

Expand Down Expand Up @@ -615,7 +616,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers,
// Do not allow upgrades if the route does not support it.
connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::Forbidden, "",
nullptr, is_head_request_);
nullptr, is_head_request_, absl::nullopt);
return;
}
// Allow non websocket requests to go through websocket enabled routes.
Expand Down Expand Up @@ -930,7 +931,7 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() {
void ConnectionManagerImpl::ActiveStream::sendLocalReply(
bool is_grpc_request, Code code, const std::string& body,
std::function<void(HeaderMap& headers)> modify_headers, bool is_head_request,
bool rate_limited_as_resource_exhausted) {
const absl::optional<Grpc::StatusMap>& status_map) {
Utility::sendLocalReply(is_grpc_request,
[this, modify_headers](HeaderMapPtr&& headers, bool end_stream) -> void {
if (modify_headers != nullptr) {
Expand All @@ -946,8 +947,7 @@ void ConnectionManagerImpl::ActiveStream::sendLocalReply(
// request instead.
encodeData(nullptr, data, end_stream);
},
state_.destroyed_, code, body, is_head_request,
rate_limited_as_resource_exhausted);
state_.destroyed_, code, body, status_map, is_head_request);
}

void ConnectionManagerImpl::ActiveStream::encode100ContinueHeaders(
Expand Down Expand Up @@ -1537,7 +1537,8 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::requestDataTooLarge() {
onDecoderFilterAboveWriteBufferHighWatermark();
} else {
parent_.connection_manager_.stats_.named_.downstream_rq_too_large_.inc();
sendLocalReply(Code::PayloadTooLarge, CodeUtility::toString(Code::PayloadTooLarge), nullptr);
sendLocalReply(Code::PayloadTooLarge, CodeUtility::toString(Code::PayloadTooLarge), nullptr,
absl::nullopt);
}
}

Expand Down Expand Up @@ -1625,7 +1626,8 @@ void ConnectionManagerImpl::ActiveStreamEncoderFilter::responseDataTooLarge() {
parent_.state_.local_complete_ = end_stream;
},
parent_.state_.destroyed_, Http::Code::InternalServerError,
CodeUtility::toString(Http::Code::InternalServerError), parent_.is_head_request_, false);
CodeUtility::toString(Http::Code::InternalServerError), absl::nullopt,
parent_.is_head_request_);
parent_.maybeEndEncode(parent_.state_.local_complete_);
} else {
resetStream();
Expand Down
6 changes: 3 additions & 3 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
}
void sendLocalReply(Code code, const std::string& body,
std::function<void(HeaderMap& headers)> modify_headers,
bool rate_limited_as_resource_exhausted = false) override {
const absl::optional<Grpc::StatusMap>& status_map) override {
parent_.sendLocalReply(is_grpc_request_, code, body, modify_headers, parent_.is_head_request_,
rate_limited_as_resource_exhausted);
status_map);
}
void encode100ContinueHeaders(HeaderMapPtr&& headers) override;
void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override;
Expand Down Expand Up @@ -284,7 +284,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
HeaderMap& addEncodedTrailers();
void sendLocalReply(bool is_grpc_request, Code code, const std::string& body,
std::function<void(HeaderMap& headers)> modify_headers,
bool is_head_request, bool rate_limited_as_resource_exhausted = false);
bool is_head_request, const absl::optional<Grpc::StatusMap>& status_map);
void encode100ContinueHeaders(ActiveStreamEncoderFilter* filter, HeaderMap& headers);
void encodeHeaders(ActiveStreamEncoderFilter* filter, HeaderMap& headers, bool end_stream);
void encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instance& data, bool end_stream);
Expand Down
21 changes: 10 additions & 11 deletions source/common/http/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,33 +243,32 @@ Utility::parseHttp1Settings(const envoy::api::v2::core::Http1ProtocolOptions& co

void Utility::sendLocalReply(bool is_grpc, StreamDecoderFilterCallbacks& callbacks,
const bool& is_reset, Code response_code, const std::string& body_text,
bool is_head_request, bool rate_limited_as_resource_exhausted) {
const absl::optional<Grpc::StatusMap>& status_map,
bool is_head_request) {
sendLocalReply(is_grpc,
[&](HeaderMapPtr&& headers, bool end_stream) -> void {
callbacks.encodeHeaders(std::move(headers), end_stream);
},
[&](Buffer::Instance& data, bool end_stream) -> void {
callbacks.encodeData(data, end_stream);
},
is_reset, response_code, body_text, is_head_request,
rate_limited_as_resource_exhausted);
is_reset, response_code, body_text, status_map, is_head_request);
}

void Utility::sendLocalReply(
bool is_grpc, std::function<void(HeaderMapPtr&& headers, bool end_stream)> encode_headers,
std::function<void(Buffer::Instance& data, bool end_stream)> encode_data, const bool& is_reset,
Code response_code, const std::string& body_text, bool is_head_request,
bool rate_limited_as_resource_exhausted) {
Code response_code, const std::string& body_text,
const absl::optional<Grpc::StatusMap>& status_map, bool is_head_request) {
// encode_headers() may reset the stream, so the stream must not be reset before calling it.
ASSERT(!is_reset);
// Respond with a gRPC trailers-only response if the request is gRPC
if (is_grpc) {
HeaderMapPtr response_headers{
new HeaderMapImpl{{Headers::get().Status, std::to_string(enumToInt(Code::OK))},
{Headers::get().ContentType, Headers::get().ContentTypeValues.Grpc},
{Headers::get().GrpcStatus,
std::to_string(enumToInt(Grpc::Utility::httpToGrpcStatus(
enumToInt(response_code), rate_limited_as_resource_exhausted)))}}};
HeaderMapPtr response_headers{new HeaderMapImpl{
{Headers::get().Status, std::to_string(enumToInt(Code::OK))},
{Headers::get().ContentType, Headers::get().ContentTypeValues.Grpc},
{Headers::get().GrpcStatus, std::to_string(enumToInt(Grpc::Utility::httpToGrpcStatus(
enumToInt(response_code), status_map)))}}};
if (!body_text.empty() && !is_head_request) {
// TODO: GrpcMessage should be percent-encoded
response_headers->insertGrpcMessage().value(body_text);
Expand Down
12 changes: 9 additions & 3 deletions source/common/http/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/json/json_loader.h"

#include "absl/strings/string_view.h"
#include "absl/types/optional.h"

namespace Envoy {
namespace Http {
Expand Down Expand Up @@ -136,11 +137,13 @@ Http1Settings parseHttp1Settings(const envoy::api::v2::core::Http1ProtocolOption
* @param response_code supplies the HTTP response code.
* @param body_text supplies the optional body text which is sent using the text/plain content
* type.
* @param status_map a map of HTTP status codes to corresponding gRPC status
* codes to override the default code mapping.
* @param is_head_request tells if this is a response to a HEAD request
*/
void sendLocalReply(bool is_grpc, StreamDecoderFilterCallbacks& callbacks, const bool& is_reset,
Code response_code, const std::string& body_text, bool is_head_request,
bool rate_limited_as_resource_exhausted);
Code response_code, const std::string& body_text,
const absl::optional<Grpc::StatusMap>& status_map, bool is_head_request);

/**
* Create a locally generated response using the provided lambdas.
Expand All @@ -153,12 +156,15 @@ void sendLocalReply(bool is_grpc, StreamDecoderFilterCallbacks& callbacks, const
* @param response_code supplies the HTTP response code.
* @param body_text supplies the optional body text which is sent using the text/plain content
* type.
* @param status_map a map of HTTP status codes to corresponding gRPC status
* codes to override the default code mapping.
*/
void sendLocalReply(bool is_grpc,
std::function<void(HeaderMapPtr&& headers, bool end_stream)> encode_headers,
std::function<void(Buffer::Instance& data, bool end_stream)> encode_data,
const bool& is_reset, Code response_code, const std::string& body_text,
bool is_head_request = false, bool rate_limited_as_resource_exhausted = false);
const absl::optional<Grpc::StatusMap>& status_map,
bool is_head_request = false);

struct GetLastAddressFromXffInfo {
// Last valid address pulled from the XFF header.
Expand Down
Loading

0 comments on commit 161ba4d

Please sign in to comment.