From fffe63161f3a3e19675dbf1b91bc06a328ec4fd0 Mon Sep 17 00:00:00 2001 From: Gregory Brail Date: Mon, 20 Sep 2021 20:27:34 +0000 Subject: [PATCH 1/2] ext_proc: Pass stream_info to gRPC streams The ext_proc filter was not passing the stream_info structure down to the gRPC stream that it creates. As a result, certain metadata keys could cause the filter to crash when trying to open the stream. Signed-off-by: Gregory Brail --- source/extensions/filters/http/ext_proc/BUILD | 1 + source/extensions/filters/http/ext_proc/client.h | 4 +++- .../filters/http/ext_proc/client_impl.cc | 10 +++++++--- .../extensions/filters/http/ext_proc/client_impl.h | 7 +++++-- .../extensions/filters/http/ext_proc/ext_proc.cc | 2 +- .../filters/http/ext_proc/client_test.cc | 14 ++++++++------ .../filters/http/ext_proc/filter_test.cc | 7 +++++-- .../extensions/filters/http/ext_proc/mock_server.h | 3 ++- .../filters/http/ext_proc/ordering_test.cc | 11 ++++++++--- .../http/ext_proc/streaming_integration_test.cc | 6 ++++++ 10 files changed, 46 insertions(+), 19 deletions(-) diff --git a/source/extensions/filters/http/ext_proc/BUILD b/source/extensions/filters/http/ext_proc/BUILD index e49813400fb6..4fb765eb80fc 100644 --- a/source/extensions/filters/http/ext_proc/BUILD +++ b/source/extensions/filters/http/ext_proc/BUILD @@ -51,6 +51,7 @@ envoy_cc_library( hdrs = ["client.h"], deps = [ "//envoy/grpc:status", + "//envoy/stream_info:stream_info_interface", "@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto", ], ) diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 59df9985e14f..7dcc434f2a9a 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -5,6 +5,7 @@ #include "envoy/common/pure.h" #include "envoy/grpc/status.h" #include "envoy/service/ext_proc/v3alpha/external_processor.pb.h" +#include "envoy/stream_info/stream_info.h" namespace Envoy { namespace Extensions { @@ -34,7 +35,8 @@ class ExternalProcessorCallbacks { class ExternalProcessorClient { public: virtual ~ExternalProcessorClient() = default; - virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks) PURE; + virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks, + const StreamInfo::StreamInfo& stream_info) PURE; }; using ExternalProcessorClientPtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_impl.cc b/source/extensions/filters/http/ext_proc/client_impl.cc index d8834c8a827a..b777d5e5d761 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.cc +++ b/source/extensions/filters/http/ext_proc/client_impl.cc @@ -15,19 +15,23 @@ ExternalProcessorClientImpl::ExternalProcessorClientImpl( } ExternalProcessorStreamPtr -ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks) { +ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks, + const StreamInfo::StreamInfo& stream_info) { Grpc::AsyncClient grpcClient( factory_->createUncachedRawAsyncClient()); - return std::make_unique(std::move(grpcClient), callbacks); + return std::make_unique(std::move(grpcClient), callbacks, + stream_info); } ExternalProcessorStreamImpl::ExternalProcessorStreamImpl( Grpc::AsyncClient&& client, - ExternalProcessorCallbacks& callbacks) + ExternalProcessorCallbacks& callbacks, const StreamInfo::StreamInfo& stream_info) : callbacks_(callbacks) { client_ = std::move(client); auto descriptor = Protobuf::DescriptorPool::generated_pool()->FindMethodByName(kExternalMethod); + grpc_context_.stream_info = &stream_info; Http::AsyncClient::StreamOptions options; + options.setParentContext(grpc_context_); stream_ = client_.start(*descriptor, *this, options); } diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index fdce71ed1a17..7f9a8313c70d 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -27,7 +27,8 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { const envoy::config::core::v3::GrpcService& grpc_service, Stats::Scope& scope); - ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks) override; + ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks, + const StreamInfo::StreamInfo& stream_info) override; private: Grpc::AsyncClientFactoryPtr factory_; @@ -38,7 +39,8 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream, public Logger::Loggable { public: ExternalProcessorStreamImpl(Grpc::AsyncClient&& client, - ExternalProcessorCallbacks& callbacks); + ExternalProcessorCallbacks& callbacks, + const StreamInfo::StreamInfo& stream_info); void send(ProcessingRequest&& request, bool end_stream) override; // Close the stream. This is idempotent and will return true if we // actually closed it. @@ -57,6 +59,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream, ExternalProcessorCallbacks& callbacks_; Grpc::AsyncClient client_; Grpc::AsyncStream stream_; + Http::AsyncClient::ParentContext grpc_context_; bool stream_closed_ = false; }; diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index e04a8cf85701..91d00843f541 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -55,7 +55,7 @@ Filter::StreamOpenState Filter::openStream() { ENVOY_BUG(!processing_complete_, "openStream should not have been called"); if (!stream_) { ENVOY_LOG(debug, "Opening gRPC stream to external processor"); - stream_ = client_->start(*this); + stream_ = client_->start(*this, decoder_callbacks_->streamInfo()); stats_.streams_started_.inc(); if (processing_complete_) { // Stream failed while starting and either onGrpcError or onGrpcClose was already called diff --git a/test/extensions/filters/http/ext_proc/client_test.cc b/test/extensions/filters/http/ext_proc/client_test.cc index 07f0ecbb3bc2..c6e8164f08b7 100644 --- a/test/extensions/filters/http/ext_proc/client_test.cc +++ b/test/extensions/filters/http/ext_proc/client_test.cc @@ -6,6 +6,7 @@ #include "test/mocks/grpc/mocks.h" #include "test/mocks/stats/mocks.h" +#include "test/mocks/stream_info/mocks.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -76,18 +77,19 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback Grpc::MockAsyncClientManager client_manager_; Grpc::MockAsyncStream stream_; Grpc::RawAsyncStreamCallbacks* stream_callbacks_; + testing::NiceMock stream_info_; testing::NiceMock stats_store_; }; TEST_F(ExtProcStreamTest, OpenCloseStream) { - auto stream = client_->start(*this); + auto stream = client_->start(*this, stream_info_); EXPECT_CALL(stream_, closeStream()); stream->close(); } TEST_F(ExtProcStreamTest, SendToStream) { - auto stream = client_->start(*this); + auto stream = client_->start(*this, stream_info_); // Send something and ensure that we get it. Doesn't really matter what. EXPECT_CALL(stream_, sendMessageRaw_(_, false)); ProcessingRequest req; @@ -97,14 +99,14 @@ TEST_F(ExtProcStreamTest, SendToStream) { } TEST_F(ExtProcStreamTest, SendAndClose) { - auto stream = client_->start(*this); + auto stream = client_->start(*this, stream_info_); EXPECT_CALL(stream_, sendMessageRaw_(_, true)); ProcessingRequest req; stream->send(std::move(req), true); } TEST_F(ExtProcStreamTest, ReceiveFromStream) { - auto stream = client_->start(*this); + auto stream = client_->start(*this, stream_info_); ASSERT_NE(stream_callbacks_, nullptr); // Send something and ensure that we get it. Doesn't really matter what. ProcessingResponse resp; @@ -133,7 +135,7 @@ TEST_F(ExtProcStreamTest, ReceiveFromStream) { } TEST_F(ExtProcStreamTest, StreamClosed) { - auto stream = client_->start(*this); + auto stream = client_->start(*this, stream_info_); ASSERT_NE(stream_callbacks_, nullptr); EXPECT_FALSE(last_response_); EXPECT_FALSE(grpc_closed_); @@ -146,7 +148,7 @@ TEST_F(ExtProcStreamTest, StreamClosed) { } TEST_F(ExtProcStreamTest, StreamError) { - auto stream = client_->start(*this); + auto stream = client_->start(*this, stream_info_); ASSERT_NE(stream_callbacks_, nullptr); EXPECT_FALSE(last_response_); EXPECT_FALSE(grpc_closed_); diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 86f1803d088a..c4207736d6b7 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -10,6 +10,7 @@ #include "test/mocks/network/mocks.h" #include "test/mocks/router/mocks.h" #include "test/mocks/runtime/mocks.h" +#include "test/mocks/stream_info/mocks.h" #include "test/mocks/tracing/mocks.h" #include "test/mocks/upstream/cluster_manager.h" #include "test/test_common/printers.h" @@ -57,10 +58,11 @@ class HttpFilterTest : public testing::Test { void initialize(std::string&& yaml) { client_ = std::make_unique(); route_ = std::make_shared>(); - EXPECT_CALL(*client_, start(_)).WillOnce(Invoke(this, &HttpFilterTest::doStart)); + EXPECT_CALL(*client_, start(_, _)).WillOnce(Invoke(this, &HttpFilterTest::doStart)); EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); + EXPECT_CALL(decoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_)); EXPECT_CALL(dispatcher_, createTimer_(_)) .Times(AnyNumber()) .WillRepeatedly(Invoke([this](Unused) { @@ -98,7 +100,7 @@ class HttpFilterTest : public testing::Test { } } - ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks) { + ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, testing::Unused) { stream_callbacks_ = &callbacks; auto stream = std::make_unique(); @@ -244,6 +246,7 @@ class HttpFilterTest : public testing::Test { Http::MockStreamDecoderFilterCallbacks decoder_callbacks_; Http::MockStreamEncoderFilterCallbacks encoder_callbacks_; Router::RouteConstSharedPtr route_; + testing::NiceMock stream_info_; Http::TestRequestHeaderMapImpl request_headers_; Http::TestResponseHeaderMapImpl response_headers_; Http::TestRequestTrailerMapImpl request_trailers_; diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index bb479d175e29..232590fc5773 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -13,7 +13,8 @@ class MockClient : public ExternalProcessorClient { public: MockClient(); ~MockClient() override; - MOCK_METHOD(ExternalProcessorStreamPtr, start, (ExternalProcessorCallbacks&)); + MOCK_METHOD(ExternalProcessorStreamPtr, start, + (ExternalProcessorCallbacks&, const StreamInfo::StreamInfo& stream_info)); }; class MockStream : public ExternalProcessorStream { diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index edc477994513..f35bdda128c8 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -9,6 +9,7 @@ #include "test/mocks/network/mocks.h" #include "test/mocks/router/mocks.h" #include "test/mocks/runtime/mocks.h" +#include "test/mocks/stream_info/mocks.h" #include "test/mocks/tracing/mocks.h" #include "test/mocks/upstream/cluster_manager.h" @@ -56,10 +57,11 @@ class OrderingTest : public testing::Test { void initialize(absl::optional> cb) { client_ = std::make_unique(); route_ = std::make_shared>(); - EXPECT_CALL(*client_, start(_)).WillOnce(Invoke(this, &OrderingTest::doStart)); + EXPECT_CALL(*client_, start(_, _)).WillOnce(Invoke(this, &OrderingTest::doStart)); EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); + EXPECT_CALL(decoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_)); ExternalProcessor proto_config; proto_config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("ext_proc_server"); @@ -75,7 +77,8 @@ class OrderingTest : public testing::Test { void TearDown() override { filter_->onDestroy(); } // Called by the "start" method on the stream by the filter - virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks) { + virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, + const StreamInfo::StreamInfo&) { stream_callbacks_ = &callbacks; auto stream = std::make_unique(); EXPECT_CALL(*stream, send(_, _)).WillRepeatedly(Invoke(this, &OrderingTest::doSend)); @@ -205,6 +208,7 @@ class OrderingTest : public testing::Test { Router::RouteConstSharedPtr route_; Http::MockStreamDecoderFilterCallbacks decoder_callbacks_; Http::MockStreamEncoderFilterCallbacks encoder_callbacks_; + testing::NiceMock stream_info_; Http::TestRequestHeaderMapImpl request_headers_; Http::TestResponseHeaderMapImpl response_headers_; Http::TestRequestTrailerMapImpl request_trailers_; @@ -215,7 +219,8 @@ class OrderingTest : public testing::Test { class FastFailOrderingTest : public OrderingTest { // All tests using this class have gRPC streams that will fail while being opened. - ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks) override { + ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, + const StreamInfo::StreamInfo&) override { auto stream = std::make_unique(); EXPECT_CALL(*stream, close()); callbacks.onGrpcError(Grpc::Status::Internal); diff --git a/test/extensions/filters/http/ext_proc/streaming_integration_test.cc b/test/extensions/filters/http/ext_proc/streaming_integration_test.cc index 4e4755483833..09ef25689b00 100644 --- a/test/extensions/filters/http/ext_proc/streaming_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/streaming_integration_test.cc @@ -69,6 +69,12 @@ class StreamingIntegrationTest : public HttpIntegrationTest, const auto addr = Network::Test::getCanonicalLoopbackAddress(ipVersion()); const auto addr_port = Network::Utility::getAddressWithPort(*addr, test_processor_.port()); setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_server", addr_port); + // Insert some extra metadata. This ensures that we are actually passing the + // "stream info" from the original HTTP request all the way down to the + // ext_proc stream. + auto* metadata = proto_config_.mutable_grpc_service()->mutable_initial_metadata()->Add(); + metadata->set_key("x-request_id"); + metadata->set_value("%REQ(x-request-id)%"); // Merge the filter. envoy::config::listener::v3::Filter ext_proc_filter; From e8289cfbc6801135711726c8d51c91582eb09974 Mon Sep 17 00:00:00 2001 From: Gregory Brail Date: Thu, 23 Sep 2021 19:37:36 +0000 Subject: [PATCH 2/2] Add code to verify that metadata is actually sent Signed-off-by: Gregory Brail --- .../http/ext_proc/streaming_integration_test.cc | 15 ++++++++++++--- .../filters/http/ext_proc/test_processor.cc | 10 +++++++--- .../filters/http/ext_proc/test_processor.h | 11 +++++++++-- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/test/extensions/filters/http/ext_proc/streaming_integration_test.cc b/test/extensions/filters/http/ext_proc/streaming_integration_test.cc index 09ef25689b00..14025f3a3ccc 100644 --- a/test/extensions/filters/http/ext_proc/streaming_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/streaming_integration_test.cc @@ -73,7 +73,7 @@ class StreamingIntegrationTest : public HttpIntegrationTest, // "stream info" from the original HTTP request all the way down to the // ext_proc stream. auto* metadata = proto_config_.mutable_grpc_service()->mutable_initial_metadata()->Add(); - metadata->set_key("x-request_id"); + metadata->set_key("x-request-id"); metadata->set_value("%REQ(x-request-id)%"); // Merge the filter. @@ -140,14 +140,15 @@ INSTANTIATE_TEST_SUITE_P(StreamingProtocols, StreamingIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); // Send a body that's larger than the buffer limit, and have the processor return immediately -// after the headers come in. +// after the headers come in. Also check the metadata in this test. TEST_P(StreamingIntegrationTest, PostAndProcessHeadersOnly) { uint32_t num_chunks = 150; uint32_t chunk_size = 1000; // This starts the gRPC server in the background. It'll be shut down when we stop the tests. test_processor_.start( - ipVersion(), [](grpc::ServerReaderWriter* stream) { + ipVersion(), + [](grpc::ServerReaderWriter* stream) { // This is the same gRPC stream processing code that a "user" of ext_proc // would write. In this case, we expect to receive a request_headers // message, and then close the stream. @@ -160,12 +161,20 @@ TEST_P(StreamingIntegrationTest, PostAndProcessHeadersOnly) { stream->Write(header_resp); // Returning here closes the stream, unless we had an ASSERT failure // previously. + }, + [](grpc::ServerContext* ctx) { + // Verify that the metadata set in the grpc client configuration + // above is actually sent to our RPC. + auto request_id = ctx->client_metadata().find("x-request-id"); + ASSERT_NE(request_id, ctx->client_metadata().end()); + EXPECT_EQ(request_id->second, "sent some metadata"); }); initializeConfig(); HttpIntegrationTest::initialize(); auto& encoder = sendClientRequestHeaders([num_chunks, chunk_size](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("expect_request_size_bytes"), num_chunks * chunk_size); + headers.addCopy(LowerCaseString("x-request-id"), "sent some metadata"); }); for (uint32_t i = 0; i < num_chunks; i++) { diff --git a/test/extensions/filters/http/ext_proc/test_processor.cc b/test/extensions/filters/http/ext_proc/test_processor.cc index 47267ce8024f..b62807cab6bd 100644 --- a/test/extensions/filters/http/ext_proc/test_processor.cc +++ b/test/extensions/filters/http/ext_proc/test_processor.cc @@ -13,9 +13,12 @@ namespace HttpFilters { namespace ExternalProcessing { grpc::Status ProcessorWrapper::Process( - grpc::ServerContext*, + grpc::ServerContext* ctx, grpc::ServerReaderWriter* stream) { + if (context_callback_) { + (*context_callback_)(ctx); + } callback_(stream); if (testing::Test::HasFatalFailure()) { // This is not strictly necessary, but it may help in troubleshooting to @@ -26,8 +29,9 @@ grpc::Status ProcessorWrapper::Process( return grpc::Status::OK; } -void TestProcessor::start(const Network::Address::IpVersion ip_version, ProcessingFunc cb) { - wrapper_ = std::make_unique(cb); +void TestProcessor::start(const Network::Address::IpVersion ip_version, ProcessingFunc cb, + absl::optional context_cb) { + wrapper_ = std::make_unique(cb, context_cb); grpc::ServerBuilder builder; builder.RegisterService(wrapper_.get()); builder.AddListeningPort( diff --git a/test/extensions/filters/http/ext_proc/test_processor.h b/test/extensions/filters/http/ext_proc/test_processor.h index 17fae05c77fe..1b875669b74a 100644 --- a/test/extensions/filters/http/ext_proc/test_processor.h +++ b/test/extensions/filters/http/ext_proc/test_processor.h @@ -21,11 +21,16 @@ using ProcessingFunc = std::function*)>; +// An implementation of this function may be called so that a test may verify +// the gRPC context. +using ContextProcessingFunc = std::function; + // An implementation of the ExternalProcessor service that may be included // in integration tests. class ProcessorWrapper : public envoy::service::ext_proc::v3alpha::ExternalProcessor::Service { public: - ProcessorWrapper(ProcessingFunc& cb) : callback_(cb) {} + ProcessorWrapper(ProcessingFunc& cb, absl::optional context_cb) + : callback_(cb), context_callback_(context_cb) {} grpc::Status Process(grpc::ServerContext*, @@ -35,6 +40,7 @@ class ProcessorWrapper : public envoy::service::ext_proc::v3alpha::ExternalProce private: ProcessingFunc callback_; + absl::optional context_callback_; }; // This class starts a gRPC server supporting the ExternalProcessor service. @@ -45,7 +51,8 @@ class TestProcessor { // Start the processor listening on an ephemeral port (port 0) on the local host. // All new streams will be delegated to the specified function. The function // will be invoked in a background thread controlled by the gRPC server. - void start(const Network::Address::IpVersion ip_version, ProcessingFunc cb); + void start(const Network::Address::IpVersion ip_version, ProcessingFunc cb, + absl::optional context_cb = absl::nullopt); // Stop the processor from listening once all streams are closed, and exit // the listening threads.