Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext_proc: Implement response path for headers only #14713

Merged
merged 6 commits into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// The External Processing filter allows an external service to act on HTTP traffic in a flexible way.

// **Current Implementation Status:**
// At this time, the filter will send a "request_headers" message to the server when the
// filter is invoked from the downstream, and apply any header mutations returned by the
// server. No other part of the protocol is implemented yet.
// At this time, the filter will send the "request_headers" and "response_headers" messages
// to the server when the filter is invoked, and apply any header mutations returned by the
// server, and respond to "immediate_response" messages. No other parts of the protocol are implemented yet.

// As designed, the filter supports up to six different processing steps, which are in the
// process of being implemented:
// * Request headers: IMPLEMENTED
// * Request body: NOT IMPLEMENTED
// * Request trailers: NOT IMPLEMENTED
// * Response headers: NOT IMPLEMENTED
// * Response headers: IMPLEMENTED
// * Response body: NOT IMPLEMENTED
// * Response trailers: NOT IMPLEMENTED

Expand Down Expand Up @@ -78,7 +78,6 @@ message ExternalProcessor {
// The filter supports both the "Envoy" and "Google" gRPC clients.
config.core.v3.GrpcService grpc_service = 1;

// [#not-implemented-hide:]
// By default, if the gRPC stream cannot be established, or if it is closed
// prematurely with an error, the filter will fail. Specifically, if the
// response headers have not yet been delivered, then it will return a 500
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

134 changes: 83 additions & 51 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

using envoy::service::ext_proc::v3alpha::HeadersResponse;
using envoy::service::ext_proc::v3alpha::ImmediateResponse;
using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using Http::FilterHeadersStatus;
using Http::RequestHeaderMap;
using Http::ResponseHeaderMap;

static const std::string kErrorPrefix = "ext_proc error";

Expand Down Expand Up @@ -48,66 +50,96 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_of_stream) {
if (stream_closed_) {
return FilterHeadersStatus::Continue;
htuch marked this conversation as resolved.
Show resolved Hide resolved
}

response_headers_ = &headers;
ProcessingRequest req;
auto* headers_req = req.mutable_response_headers();
MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers());
headers_req->set_end_of_stream(end_of_stream);
response_state_ = FilterState::HEADERS;
stream_->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

void Filter::onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& r) {
auto response = std::move(r);
bool message_valid = false;
bool message_handled = false;
ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_);

// This next section will grow as we support the rest of the protocol
if (request_state_ == FilterState::HEADERS) {
if (response->has_request_headers()) {
ENVOY_LOG(debug, "applying request_headers response");
message_valid = true;
const auto& headers_response = response->request_headers();
if (headers_response.has_response()) {
const auto& common_response = headers_response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_);
}
}
} else if (response->has_immediate_response()) {
ENVOY_LOG(debug, "Returning immediate response from processor");
sendImmediateResponse(response->immediate_response());
message_valid = true;
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
if (response->has_request_headers()) {
message_handled = handleRequestHeadersResponse(response->request_headers());
} else if (response->has_response_headers()) {
message_handled = handleResponseHeadersResponse(response->response_headers());
} else if (response->has_immediate_response()) {
handleImmediateResponse(response->immediate_response());
message_handled = true;
}

if (message_valid) {
if (message_handled) {
stats_.stream_msgs_received_.inc();
} else {
stats_.spurious_msgs_received_.inc();
// Ignore messages received out of order. However, close the stream to
// protect ourselves since the server is not following the protocol.
ENVOY_LOG(warn, "Spurious response message received on gRPC stream");
cleanupState();
closeStream();
}
}

bool Filter::handleRequestHeadersResponse(const HeadersResponse& response) {
if (request_state_ == FilterState::HEADERS) {
ENVOY_LOG(debug, "applying request_headers response");
MutationUtils::applyCommonHeaderResponse(response, *request_headers_);
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
return true;
}
return false;
gbrail marked this conversation as resolved.
Show resolved Hide resolved
}

bool Filter::handleResponseHeadersResponse(const HeadersResponse& response) {
if (response_state_ == FilterState::HEADERS) {
ENVOY_LOG(debug, "applying response_headers response");
MutationUtils::applyCommonHeaderResponse(response, *response_headers_);
response_state_ = FilterState::IDLE;
encoder_callbacks_->continueEncoding();
return true;
}
return false;
gbrail marked this conversation as resolved.
Show resolved Hide resolved
}

void Filter::handleImmediateResponse(const ImmediateResponse& response) {
// We don't want to process any more stream messages after this.
// Close the stream before sending because "sendLocalResponse" triggers
// additional calls to this filter.
request_state_ = FilterState::IDLE;
response_state_ = FilterState::IDLE;
closeStream();
sendImmediateResponse(response);
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
ENVOY_LOG(debug, "Received gRPC error on stream: {}", status);
stream_closed_ = true;
stats_.streams_failed_.inc();

if (config_->failureModeAllow()) {
// Ignore this and treat as a successful close
onGrpcClose();
stats_.failure_mode_allowed_.inc();

} else {
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, "", nullptr, absl::nullopt,
absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
break;
default:
// Nothing else to do
break;
}
stream_closed_ = true;
ImmediateResponse errorResponse;
errorResponse.mutable_status()->set_code(envoy::type::v3::StatusCode::InternalServerError);
errorResponse.set_details(absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
handleImmediateResponse(errorResponse);
}
}

Expand All @@ -117,16 +149,17 @@ void Filter::onGrpcClose() {
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
cleanupState();
}

void Filter::cleanupState() {
if (request_state_ != FilterState::IDLE) {
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
break;
default:
// Nothing to do otherwise
break;
}
if (response_state_ != FilterState::IDLE) {
response_state_ = FilterState::IDLE;
encoder_callbacks_->continueEncoding();
}
}

Expand All @@ -136,15 +169,14 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) {
response.has_grpc_status()
? absl::optional<Grpc::Status::GrpcStatus>(response.grpc_status().status())
: absl::nullopt;
const auto mutate_headers = [&response](Http::ResponseHeaderMap& headers) {
if (response.has_headers()) {
MutationUtils::applyHeaderMutations(response.headers(), headers);
}
};

decoder_callbacks_->sendLocalReply(
static_cast<Http::Code>(status_code), response.body(),
[&response](Http::ResponseHeaderMap& headers) {
if (response.has_headers()) {
MutationUtils::applyHeaderMutations(response.headers(), headers);
}
},
grpc_status, response.details());
encoder_callbacks_->sendLocalReply(static_cast<Http::Code>(status_code), response.body(),
mutate_headers, grpc_status, response.details());
}

} // namespace ExternalProcessing
Expand Down
20 changes: 14 additions & 6 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,10 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,

void onDestroy() override;

void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override;
Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
bool end_stream) override;

// ExternalProcessorCallbacks

Expand All @@ -101,23 +99,33 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,

private:
void closeStream();
void cleanupState();
void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response);

bool
handleRequestHeadersResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response);
bool
handleResponseHeadersResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response);
void
handleImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response);

const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;
ExtProcFilterStats stats_;

Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr;

// The state of the request-processing, or "decoding" side of the filter.
// We maintain separate states for encoding and decoding since they may
// be interleaved.
FilterState request_state_ = FilterState::IDLE;

// The state of the response-processing side
FilterState response_state_ = FilterState::IDLE;

ExternalProcessorStreamPtr stream_;
bool stream_closed_ = false;

Http::HeaderMap* request_headers_ = nullptr;
Http::HeaderMap* response_headers_ = nullptr;
};

} // namespace ExternalProcessing
Expand Down
16 changes: 14 additions & 2 deletions source/extensions/filters/http/ext_proc/mutation_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ namespace ExternalProcessing {
using Http::Headers;
using Http::LowerCaseString;

using envoy::service::ext_proc::v3alpha::HeaderMutation;
using envoy::service::ext_proc::v3alpha::HeadersResponse;

void MutationUtils::buildHttpHeaders(const Http::HeaderMap& headers_in,
envoy::config::core::v3::HeaderMap& headers_out) {
headers_in.iterate([&headers_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate {
Expand All @@ -24,8 +27,17 @@ void MutationUtils::buildHttpHeaders(const Http::HeaderMap& headers_in,
});
}

void MutationUtils::applyHeaderMutations(
const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation, Http::HeaderMap& headers) {
void MutationUtils::applyCommonHeaderResponse(const HeadersResponse& response,
Http::HeaderMap& headers) {
if (response.has_response()) {
const auto& common_response = response.response();
if (common_response.has_header_mutation()) {
applyHeaderMutations(common_response.header_mutation(), headers);
}
}
}

void MutationUtils::applyHeaderMutations(const HeaderMutation& mutation, Http::HeaderMap& headers) {
for (const auto& remove_header : mutation.remove_headers()) {
if (Http::HeaderUtility::isRemovableHeader(remove_header)) {
headers.remove(LowerCaseString(remove_header));
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/http/ext_proc/mutation_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class MutationUtils {
static void buildHttpHeaders(const Http::HeaderMap& headers_in,
envoy::config::core::v3::HeaderMap& headers_out);

// Apply mutations that are common to header responses.
static void
applyCommonHeaderResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response,
Http::HeaderMap& headers);

// Modify header map based on a set of mutations from a protobuf
static void
applyHeaderMutations(const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation,
Expand Down
14 changes: 14 additions & 0 deletions test/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ envoy_extension_cc_test(
],
)

envoy_extension_cc_test(
name = "ordering_test",
srcs = ["ordering_test.cc"],
extension_name = "envoy.filters.http.ext_proc",
deps = [
":mock_server_lib",
"//source/extensions/filters/http/ext_proc",
"//test/common/http:common_lib",
"//test/mocks/server:factory_context_mocks",
"//test/test_common:test_runtime_lib",
],
)

envoy_extension_cc_test(
name = "client_test",
srcs = ["client_test.cc"],
Expand Down Expand Up @@ -95,6 +108,7 @@ envoy_extension_cc_test_library(
deps = [
"//include/envoy/http:header_map_interface",
"//test/test_common:utility_lib",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
Loading