Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: adding limits to outbound buffered data for google-gRPC access logs #11072

Merged
merged 6 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion api/envoy/config/core/v3/grpc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/wrappers.proto";

import "udpa/annotations/sensitive.proto";
import "udpa/annotations/status.proto";
Expand Down Expand Up @@ -37,7 +38,7 @@ message GrpcService {
string cluster_name = 1 [(validate.rules).string = {min_bytes: 1}];
}

// [#next-free-field: 7]
// [#next-free-field: 8]
message GoogleGrpc {
option (udpa.annotations.versioning).previous_message_type =
"envoy.api.v2.core.GrpcService.GoogleGrpc";
Expand Down Expand Up @@ -232,6 +233,10 @@ message GrpcService {
// Additional configuration for site-specific customizations of the Google
// gRPC library.
google.protobuf.Struct config = 6;

// How many bytes each steam can buffer internally.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: s/steam/stream/g

// If not set an implementation defined default is applied (1MiB).
google.protobuf.UInt32Value per_stream_buffer_limit_bytes = 7;
}

reserved 4;
Expand Down
7 changes: 6 additions & 1 deletion api/envoy/config/core/v4alpha/grpc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/wrappers.proto";

import "udpa/annotations/sensitive.proto";
import "udpa/annotations/status.proto";
Expand Down Expand Up @@ -37,7 +38,7 @@ message GrpcService {
string cluster_name = 1 [(validate.rules).string = {min_bytes: 1}];
}

// [#next-free-field: 7]
// [#next-free-field: 8]
message GoogleGrpc {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.core.v3.GrpcService.GoogleGrpc";
Expand Down Expand Up @@ -232,6 +233,10 @@ message GrpcService {
// Additional configuration for site-specific customizations of the Google
// gRPC library.
google.protobuf.Struct config = 6;

// How many bytes each steam can buffer internally.
// If not set an implementation defined default is applied (1MiB).
google.protobuf.UInt32Value per_stream_buffer_limit_bytes = 7;
}

reserved 4;
Expand Down
2 changes: 1 addition & 1 deletion docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Changes
-------

* access loggers: added GRPC_STATUS operator on logging format.
* access loggers: applied existing buffer limits to the non-google gRPC access logs, as well as :ref:`stats <config_access_log_stats>` for logged / dropped logs.
* access loggers: applied existing buffer limits to access logs, as well as :ref:`stats <config_access_log_stats>` for logged / dropped logs. This can be reverted temporarily by setting runtime feature `envoy.reloadable_features.disallow_unbounded_access_logs` to false.
* access loggers: extened specifier for FilterStateFormatter to output :ref:`unstructured log string <config_access_log_format_filter_state>`.
* compressor: generic :ref:`compressor <config_http_filters_compressor>` filter exposed to users.
* config: added :ref:`version_text <config_cluster_manager_cds>` stat that reflects xDS version.
Expand Down
7 changes: 6 additions & 1 deletion generated_api_shadow/envoy/config/core/v3/grpc_service.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion source/common/grpc/google_async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

namespace Envoy {
namespace Grpc {
namespace {
static constexpr int DefaultBufferLimitBytes = 1024 * 1024;
}

GoogleAsyncClientThreadLocal::GoogleAsyncClientThreadLocal(Api::Api& api)
: completion_thread_(api.threadFactory().createThread([this] { completionThread(); })) {}
Expand Down Expand Up @@ -75,7 +78,9 @@ GoogleAsyncClientImpl::GoogleAsyncClientImpl(Event::Dispatcher& dispatcher,
const envoy::config::core::v3::GrpcService& config,
Api::Api& api, const StatNames& stat_names)
: dispatcher_(dispatcher), tls_(tls), stat_prefix_(config.google_grpc().stat_prefix()),
initial_metadata_(config.initial_metadata()), scope_(scope) {
initial_metadata_(config.initial_metadata()), scope_(scope),
per_stream_buffer_limit_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
config.google_grpc(), per_stream_buffer_limit_bytes, DefaultBufferLimitBytes)) {
// We rebuild the channel each time we construct the channel. It appears that the gRPC library is
// smart enough to do connection pooling and reuse with identical channel args, so this should
// have comparable overhead to what we are doing in Grpc::AsyncClientImpl, i.e. no expensive
Expand Down Expand Up @@ -211,6 +216,7 @@ void GoogleAsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& request, bool e
write_pending_queue_.emplace(std::move(request), end_stream);
ENVOY_LOG(trace, "Queued message to write ({} bytes)",
write_pending_queue_.back().buf_.value().Length());
bytes_in_write_pending_queue_ += write_pending_queue_.back().buf_.value().Length();
writeQueued();
}

Expand Down Expand Up @@ -313,6 +319,7 @@ void GoogleAsyncStreamImpl::handleOpCompletion(GoogleAsyncTag::Operation op, boo
case GoogleAsyncTag::Operation::Write: {
ASSERT(ok);
write_pending_ = false;
bytes_in_write_pending_queue_ -= write_pending_queue_.front().buf_.value().Length();
write_pending_queue_.pop();
writeQueued();
break;
Expand Down
11 changes: 9 additions & 2 deletions source/common/grpc/google_async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class GoogleAsyncClientImpl final : public RawAsyncClient, Logger::Loggable<Logg
const Http::AsyncClient::StreamOptions& options) override;

TimeSource& timeSource() { return dispatcher_.timeSource(); }
uint64_t perStreamBufferLimitBytes() const { return per_stream_buffer_limit_bytes_; }

private:
Event::Dispatcher& dispatcher_;
Expand All @@ -190,6 +191,7 @@ class GoogleAsyncClientImpl final : public RawAsyncClient, Logger::Loggable<Logg
const Protobuf::RepeatedPtrField<envoy::config::core::v3::HeaderValue> initial_metadata_;
Stats::ScopeSharedPtr scope_;
GoogleAsyncClientStats stats_;
uint64_t per_stream_buffer_limit_bytes_;

friend class GoogleAsyncClientThreadLocal;
friend class GoogleAsyncRequestImpl;
Expand All @@ -212,8 +214,12 @@ class GoogleAsyncStreamImpl : public RawAsyncStream,
void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) override;
void closeStream() override;
void resetStream() override;
// The GoogleAsyncClientImpl doesn't do Envoy watermark based flow control.
bool isAboveWriteBufferHighWatermark() const override { return false; }
// While the Google-gRPC code doesn't use Envoy watermark buffers, the logical
// analog is to make sure that the aren't too many bytes in the pending write
// queue.
bool isAboveWriteBufferHighWatermark() const override {
htuch marked this conversation as resolved.
Show resolved Hide resolved
return bytes_in_write_pending_queue_ > parent_.perStreamBufferLimitBytes();
}

protected:
bool callFailed() const { return call_failed_; }
Expand Down Expand Up @@ -274,6 +280,7 @@ class GoogleAsyncStreamImpl : public RawAsyncStream,
grpc::ClientContext ctxt_;
std::unique_ptr<grpc::GenericClientAsyncReaderWriter> rw_;
std::queue<PendingMessage> write_pending_queue_;
uint64_t bytes_in_write_pending_queue_{};
grpc::ByteBuffer read_buf_;
grpc::Status status_;
// Has Operation::Init completed?
Expand Down
2 changes: 2 additions & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ constexpr const char* runtime_features[] = {
"envoy.reloadable_features.connection_header_sanitization",
"envoy.reloadable_features.strict_authority_validation",
"envoy.reloadable_features.reject_unsupported_transfer_encodings",
// Begin alphabetically sorted section.
"envoy.deprecated_features.allow_deprecated_extension_names",
"envoy.reloadable_features.disallow_unbounded_access_logs",
"envoy.reloadable_features.ext_authz_http_service_enable_case_sensitive_string_matcher",
"envoy.reloadable_features.fix_upgrade_response",
"envoy.reloadable_features.listener_in_place_filterchain_update",
Expand Down
1 change: 1 addition & 0 deletions source/extensions/access_loggers/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ envoy_cc_library(
"//include/envoy/upstream:upstream_interface",
"//source/common/grpc:async_client_lib",
"//source/common/grpc:typed_async_client_lib",
"//source/common/runtime:runtime_features_lib",
"//source/extensions/access_loggers/common:access_log_base",
"@envoy_api//envoy/data/accesslog/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto",
Expand Down
9 changes: 7 additions & 2 deletions source/extensions/access_loggers/grpc/grpc_access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "common/common/assert.h"
#include "common/network/utility.h"
#include "common/runtime/runtime_features.h"
#include "common/stream_info/utility.h"

namespace Envoy {
Expand Down Expand Up @@ -51,8 +52,12 @@ bool GrpcAccessLoggerImpl::canLogMore() {
stats_.logs_written_.inc();
return true;
}
stats_.logs_dropped_.inc();
return false;
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.disallow_unbounded_access_logs")) {
stats_.logs_dropped_.inc();
htuch marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
stats_.logs_written_.inc();
return true;
}

void GrpcAccessLoggerImpl::log(envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry) {
Expand Down
47 changes: 44 additions & 3 deletions test/common/grpc/google_async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

using testing::_;
using testing::Eq;
using testing::NiceMock;
using testing::Return;

namespace Envoy {
Expand Down Expand Up @@ -54,15 +55,18 @@ class EnvoyGoogleAsyncClientImplTest : public testing::Test {
method_descriptor_(helloworld::Greeter::descriptor()->FindMethodByName("SayHello")),
stat_names_(scope_->symbolTable()) {

envoy::config::core::v3::GrpcService config;
auto* google_grpc = config.mutable_google_grpc();
auto* google_grpc = config_.mutable_google_grpc();
google_grpc->set_target_uri("fake_address");
google_grpc->set_stat_prefix("test_cluster");
tls_ = std::make_unique<GoogleAsyncClientThreadLocal>(*api_);
}

virtual void initialize() {
grpc_client_ = std::make_unique<GoogleAsyncClientImpl>(*dispatcher_, *tls_, stub_factory_,
scope_, config, *api_, stat_names_);
scope_, config_, *api_, stat_names_);
}

envoy::config::core::v3::GrpcService config_;
DangerousDeprecatedTestTime test_time_;
Stats::IsolatedStoreImpl* stats_store_; // Ownership transferred to scope_.
Api::ApiPtr api_;
Expand All @@ -78,6 +82,8 @@ class EnvoyGoogleAsyncClientImplTest : public testing::Test {
// Validate that a failure in gRPC stub call creation returns immediately with
// status UNAVAILABLE.
TEST_F(EnvoyGoogleAsyncClientImplTest, StreamHttpStartFail) {
initialize();

EXPECT_CALL(*stub_factory_.stub_, PrepareCall_(_, _, _)).WillOnce(Return(nullptr));
MockAsyncStreamCallbacks<helloworld::HelloReply> grpc_callbacks;
EXPECT_CALL(grpc_callbacks, onCreateInitialMetadata(_));
Expand All @@ -91,6 +97,8 @@ TEST_F(EnvoyGoogleAsyncClientImplTest, StreamHttpStartFail) {
// Validate that a failure in gRPC stub call creation returns immediately with
// status UNAVAILABLE.
TEST_F(EnvoyGoogleAsyncClientImplTest, RequestHttpStartFail) {
initialize();

EXPECT_CALL(*stub_factory_.stub_, PrepareCall_(_, _, _)).WillOnce(Return(nullptr));
MockAsyncRequestCallbacks<helloworld::HelloReply> grpc_callbacks;
EXPECT_CALL(grpc_callbacks, onCreateInitialMetadata(_));
Expand All @@ -114,6 +122,39 @@ TEST_F(EnvoyGoogleAsyncClientImplTest, RequestHttpStartFail) {
EXPECT_TRUE(grpc_request == nullptr);
}

class EnvoyGoogleLessMockedAsyncClientImplTest : public EnvoyGoogleAsyncClientImplTest {
public:
void initialize() override {
grpc_client_ = std::make_unique<GoogleAsyncClientImpl>(*dispatcher_, *tls_, real_stub_factory_,
scope_, config_, *api_, stat_names_);
}

GoogleGenericStubFactory real_stub_factory_;
};

TEST_F(EnvoyGoogleLessMockedAsyncClientImplTest, TestOverflow) {
// Set an (unreasonably) low byte limit.
auto* google_grpc = config_.mutable_google_grpc();
google_grpc->mutable_per_stream_buffer_limit_bytes()->set_value(1);
initialize();

NiceMock<MockAsyncStreamCallbacks<helloworld::HelloReply>> grpc_callbacks;
AsyncStream<helloworld::HelloRequest> grpc_stream =
grpc_client_->start(*method_descriptor_, grpc_callbacks, Http::AsyncClient::RequestOptions());
EXPECT_FALSE(grpc_stream == nullptr);
EXPECT_FALSE(grpc_stream->isAboveWriteBufferHighWatermark());

// With no data in the message, it won't back up.
helloworld::HelloRequest request_msg;
grpc_stream->sendMessage(request_msg, false);
EXPECT_FALSE(grpc_stream->isAboveWriteBufferHighWatermark());

// With actual data we pass the very small byte limit.
request_msg.set_name("bob");
grpc_stream->sendMessage(request_msg, false);
EXPECT_TRUE(grpc_stream->isAboveWriteBufferHighWatermark());
}

} // namespace
} // namespace Grpc
} // namespace Envoy
1 change: 1 addition & 0 deletions test/extensions/access_loggers/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ envoy_extension_cc_test(
"//test/mocks/ssl:ssl_mocks",
"//test/mocks/stream_info:stream_info_mocks",
"//test/mocks/thread_local:thread_local_mocks",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/data/accesslog/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto",
Expand Down
42 changes: 42 additions & 0 deletions test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
#include "test/mocks/ssl/mocks.h"
#include "test/mocks/stream_info/mocks.h"
#include "test/mocks/thread_local/mocks.h"
#include "test/test_common/test_runtime.h"

using testing::_;
using testing::AnyNumber;
using testing::InSequence;
using testing::Invoke;
using testing::NiceMock;
Expand Down Expand Up @@ -198,6 +200,46 @@ TEST_F(GrpcAccessLoggerImplTest, WatermarksOverrun) {
TestUtility::findCounter(stats_store_, "access_logs.grpc_access_log.logs_dropped")->value());
}

TEST_F(GrpcAccessLoggerImplTest, WatermarksLegacy) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called legacy? Can you add a one liner // above this test to explain what it does?

TestScopedRuntime scoped_runtime;
Runtime::LoaderSingleton::getExisting()->mergeValues(
{{"envoy.reloadable_features.disallow_unbounded_access_logs", "false"}});

InSequence s;
initLogger(FlushInterval, 1);

// Start a stream for the first log.
MockAccessLogStream stream;
AccessLogCallbacks* callbacks;
expectStreamStart(stream, &callbacks);
EXPECT_CALL(local_info_, node());

EXPECT_CALL(stream, isAboveWriteBufferHighWatermark())
.Times(AnyNumber())
.WillRepeatedly(Return(true));

// Fail to flush, so the log stays buffered up.
envoy::data::accesslog::v3::HTTPAccessLogEntry entry;
entry.mutable_request()->set_path("/test/path1");
EXPECT_CALL(stream, sendMessageRaw_(_, false)).Times(0);
logger_->log(envoy::data::accesslog::v3::HTTPAccessLogEntry(entry));
EXPECT_EQ(
1,
TestUtility::findCounter(stats_store_, "access_logs.grpc_access_log.logs_written")->value());
EXPECT_EQ(
0,
TestUtility::findCounter(stats_store_, "access_logs.grpc_access_log.logs_dropped")->value());

// As with the above test, try to log more. The log will not be dropped.
EXPECT_CALL(stream, sendMessageRaw_(_, _)).Times(0);
logger_->log(envoy::data::accesslog::v3::HTTPAccessLogEntry(entry));
EXPECT_EQ(
2,
TestUtility::findCounter(stats_store_, "access_logs.grpc_access_log.logs_written")->value());
EXPECT_EQ(
0,
TestUtility::findCounter(stats_store_, "access_logs.grpc_access_log.logs_dropped")->value());
}
// Test that stream failure is handled correctly.
TEST_F(GrpcAccessLoggerImplTest, StreamFailure) {
InSequence s;
Expand Down