Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext_proc: Pass stream_info to gRPC streams #18190

Merged
merged 3 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ envoy_cc_library(
hdrs = ["client.h"],
deps = [
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
)
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "envoy/common/pure.h"
#include "envoy/grpc/status.h"
#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h"
#include "envoy/stream_info/stream_info.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -34,7 +35,8 @@ class ExternalProcessorCallbacks {
class ExternalProcessorClient {
public:
virtual ~ExternalProcessorClient() = default;
virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks) PURE;
virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo& stream_info) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
10 changes: 7 additions & 3 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,23 @@ ExternalProcessorClientImpl::ExternalProcessorClientImpl(
}

ExternalProcessorStreamPtr
ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks) {
ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo& stream_info) {
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> grpcClient(
factory_->createUncachedRawAsyncClient());
return std::make_unique<ExternalProcessorStreamImpl>(std::move(grpcClient), callbacks);
return std::make_unique<ExternalProcessorStreamImpl>(std::move(grpcClient), callbacks,
stream_info);
}

ExternalProcessorStreamImpl::ExternalProcessorStreamImpl(
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks)
ExternalProcessorCallbacks& callbacks, const StreamInfo::StreamInfo& stream_info)
: callbacks_(callbacks) {
client_ = std::move(client);
auto descriptor = Protobuf::DescriptorPool::generated_pool()->FindMethodByName(kExternalMethod);
grpc_context_.stream_info = &stream_info;
Http::AsyncClient::StreamOptions options;
options.setParentContext(grpc_context_);
stream_ = client_.start(*descriptor, *this, options);
}

Expand Down
7 changes: 5 additions & 2 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
const envoy::config::core::v3::GrpcService& grpc_service,
Stats::Scope& scope);

ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks) override;
ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo& stream_info) override;

private:
Grpc::AsyncClientFactoryPtr factory_;
Expand All @@ -38,7 +39,8 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
public Logger::Loggable<Logger::Id::filter> {
public:
ExternalProcessorStreamImpl(Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks);
ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo& stream_info);
void send(ProcessingRequest&& request, bool end_stream) override;
// Close the stream. This is idempotent and will return true if we
// actually closed it.
Expand All @@ -57,6 +59,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
ExternalProcessorCallbacks& callbacks_;
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> client_;
Grpc::AsyncStream<ProcessingRequest> stream_;
Http::AsyncClient::ParentContext grpc_context_;
bool stream_closed_ = false;
};

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Filter::StreamOpenState Filter::openStream() {
ENVOY_BUG(!processing_complete_, "openStream should not have been called");
if (!stream_) {
ENVOY_LOG(debug, "Opening gRPC stream to external processor");
stream_ = client_->start(*this);
stream_ = client_->start(*this, decoder_callbacks_->streamInfo());
stats_.streams_started_.inc();
if (processing_complete_) {
// Stream failed while starting and either onGrpcError or onGrpcClose was already called
Expand Down
14 changes: 8 additions & 6 deletions test/extensions/filters/http/ext_proc/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "test/mocks/grpc/mocks.h"
#include "test/mocks/stats/mocks.h"
#include "test/mocks/stream_info/mocks.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"
Expand Down Expand Up @@ -76,18 +77,19 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
Grpc::MockAsyncClientManager client_manager_;
Grpc::MockAsyncStream stream_;
Grpc::RawAsyncStreamCallbacks* stream_callbacks_;
testing::NiceMock<StreamInfo::MockStreamInfo> stream_info_;

testing::NiceMock<Stats::MockStore> stats_store_;
};

TEST_F(ExtProcStreamTest, OpenCloseStream) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
EXPECT_CALL(stream_, closeStream());
stream->close();
}

TEST_F(ExtProcStreamTest, SendToStream) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
// Send something and ensure that we get it. Doesn't really matter what.
EXPECT_CALL(stream_, sendMessageRaw_(_, false));
ProcessingRequest req;
Expand All @@ -97,14 +99,14 @@ TEST_F(ExtProcStreamTest, SendToStream) {
}

TEST_F(ExtProcStreamTest, SendAndClose) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
EXPECT_CALL(stream_, sendMessageRaw_(_, true));
ProcessingRequest req;
stream->send(std::move(req), true);
}

TEST_F(ExtProcStreamTest, ReceiveFromStream) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
// Send something and ensure that we get it. Doesn't really matter what.
ProcessingResponse resp;
Expand Down Expand Up @@ -133,7 +135,7 @@ TEST_F(ExtProcStreamTest, ReceiveFromStream) {
}

TEST_F(ExtProcStreamTest, StreamClosed) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
Expand All @@ -146,7 +148,7 @@ TEST_F(ExtProcStreamTest, StreamClosed) {
}

TEST_F(ExtProcStreamTest, StreamError) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
Expand Down
7 changes: 5 additions & 2 deletions test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "test/mocks/network/mocks.h"
#include "test/mocks/router/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/mocks/stream_info/mocks.h"
#include "test/mocks/tracing/mocks.h"
#include "test/mocks/upstream/cluster_manager.h"
#include "test/test_common/printers.h"
Expand Down Expand Up @@ -57,10 +58,11 @@ class HttpFilterTest : public testing::Test {
void initialize(std::string&& yaml) {
client_ = std::make_unique<MockClient>();
route_ = std::make_shared<NiceMock<Router::MockRoute>>();
EXPECT_CALL(*client_, start(_)).WillOnce(Invoke(this, &HttpFilterTest::doStart));
EXPECT_CALL(*client_, start(_, _)).WillOnce(Invoke(this, &HttpFilterTest::doStart));
EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_));
EXPECT_CALL(decoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_));
EXPECT_CALL(dispatcher_, createTimer_(_))
.Times(AnyNumber())
.WillRepeatedly(Invoke([this](Unused) {
Expand Down Expand Up @@ -98,7 +100,7 @@ class HttpFilterTest : public testing::Test {
}
}

ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks) {
ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, testing::Unused) {
stream_callbacks_ = &callbacks;

auto stream = std::make_unique<MockStream>();
Expand Down Expand Up @@ -244,6 +246,7 @@ class HttpFilterTest : public testing::Test {
Http::MockStreamDecoderFilterCallbacks decoder_callbacks_;
Http::MockStreamEncoderFilterCallbacks encoder_callbacks_;
Router::RouteConstSharedPtr route_;
testing::NiceMock<StreamInfo::MockStreamInfo> stream_info_;
Http::TestRequestHeaderMapImpl request_headers_;
Http::TestResponseHeaderMapImpl response_headers_;
Http::TestRequestTrailerMapImpl request_trailers_;
Expand Down
3 changes: 2 additions & 1 deletion test/extensions/filters/http/ext_proc/mock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class MockClient : public ExternalProcessorClient {
public:
MockClient();
~MockClient() override;
MOCK_METHOD(ExternalProcessorStreamPtr, start, (ExternalProcessorCallbacks&));
MOCK_METHOD(ExternalProcessorStreamPtr, start,
(ExternalProcessorCallbacks&, const StreamInfo::StreamInfo& stream_info));
};

class MockStream : public ExternalProcessorStream {
Expand Down
11 changes: 8 additions & 3 deletions test/extensions/filters/http/ext_proc/ordering_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "test/mocks/network/mocks.h"
#include "test/mocks/router/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/mocks/stream_info/mocks.h"
#include "test/mocks/tracing/mocks.h"
#include "test/mocks/upstream/cluster_manager.h"

Expand Down Expand Up @@ -56,10 +57,11 @@ class OrderingTest : public testing::Test {
void initialize(absl::optional<std::function<void(ExternalProcessor&)>> cb) {
client_ = std::make_unique<MockClient>();
route_ = std::make_shared<NiceMock<Router::MockRoute>>();
EXPECT_CALL(*client_, start(_)).WillOnce(Invoke(this, &OrderingTest::doStart));
EXPECT_CALL(*client_, start(_, _)).WillOnce(Invoke(this, &OrderingTest::doStart));
EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_));
EXPECT_CALL(decoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_));

ExternalProcessor proto_config;
proto_config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("ext_proc_server");
Expand All @@ -75,7 +77,8 @@ class OrderingTest : public testing::Test {
void TearDown() override { filter_->onDestroy(); }

// Called by the "start" method on the stream by the filter
virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks) {
virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo&) {
stream_callbacks_ = &callbacks;
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, send(_, _)).WillRepeatedly(Invoke(this, &OrderingTest::doSend));
Expand Down Expand Up @@ -205,6 +208,7 @@ class OrderingTest : public testing::Test {
Router::RouteConstSharedPtr route_;
Http::MockStreamDecoderFilterCallbacks decoder_callbacks_;
Http::MockStreamEncoderFilterCallbacks encoder_callbacks_;
testing::NiceMock<StreamInfo::MockStreamInfo> stream_info_;
Http::TestRequestHeaderMapImpl request_headers_;
Http::TestResponseHeaderMapImpl response_headers_;
Http::TestRequestTrailerMapImpl request_trailers_;
Expand All @@ -215,7 +219,8 @@ class OrderingTest : public testing::Test {

class FastFailOrderingTest : public OrderingTest {
// All tests using this class have gRPC streams that will fail while being opened.
ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks) override {
ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo&) override {
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, close());
callbacks.onGrpcError(Grpc::Status::Internal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ class StreamingIntegrationTest : public HttpIntegrationTest,
const auto addr = Network::Test::getCanonicalLoopbackAddress(ipVersion());
const auto addr_port = Network::Utility::getAddressWithPort(*addr, test_processor_.port());
setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_server", addr_port);
// Insert some extra metadata. This ensures that we are actually passing the
// "stream info" from the original HTTP request all the way down to the
// ext_proc stream.
auto* metadata = proto_config_.mutable_grpc_service()->mutable_initial_metadata()->Add();
gbrail marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +72 to +75
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so I understand, how is this related to stream info? I would have expected this to be used when the grpc stream is created by reading the config, not sure how this would be plumbed though via StreamInfo?

metadata->set_key("x-request_id");
metadata->set_value("%REQ(x-request-id)%");

// Merge the filter.
envoy::config::listener::v3::Filter ext_proc_filter;
Expand Down