Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

access logging: gRPC logger retry to establish underlying stream connection #17469

Merged
merged 18 commits into from
Nov 5, 2021
Merged
11 changes: 10 additions & 1 deletion api/envoy/extensions/access_loggers/grpc/v3/als.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package envoy.extensions.access_loggers.grpc.v3;

import "envoy/config/core/v3/base.proto";
import "envoy/config/core/v3/config_source.proto";
import "envoy/config/core/v3/grpc_service.proto";

Expand Down Expand Up @@ -54,7 +55,7 @@ message TcpGrpcAccessLogConfig {
}

// Common configuration for gRPC access logs.
// [#next-free-field: 7]
// [#next-free-field: 8]
message CommonGrpcAccessLogConfig {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.accesslog.v2.CommonGrpcAccessLogConfig";
Expand Down Expand Up @@ -86,4 +87,12 @@ message CommonGrpcAccessLogConfig {
// <envoy_v3_api_field_data.accesslog.v3.AccessLogCommon.filter_state_objects>`.
// Logger will call `FilterState::Object::serializeAsProto` to serialize the filter state object.
repeated string filter_state_objects_to_log = 5;

// Sets the retry policy when the establishment of a gRPC stream fails.
// If the stream succeeds once in establishing If the stream succeeds
// at least once in establishing itself, no retry will be performed
// no matter what gRPC status is received. Note that only
// :ref:`num_retries <envoy_v3_api_field_config.core.v3.RetryPolicy.num_retries>`
// will be used in this configuration.
config.core.v3.RetryPolicy grpc_stream_retry_policy = 7;
}
11 changes: 10 additions & 1 deletion api/envoy/extensions/access_loggers/grpc/v4alpha/als.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package envoy.extensions.access_loggers.grpc.v4alpha;

import "envoy/config/core/v4alpha/base.proto";
import "envoy/config/core/v4alpha/config_source.proto";
import "envoy/config/core/v4alpha/grpc_service.proto";

Expand Down Expand Up @@ -54,7 +55,7 @@ message TcpGrpcAccessLogConfig {
}

// Common configuration for gRPC access logs.
// [#next-free-field: 7]
// [#next-free-field: 8]
message CommonGrpcAccessLogConfig {
option (udpa.annotations.versioning).previous_message_type =
"envoy.extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig";
Expand Down Expand Up @@ -86,4 +87,12 @@ message CommonGrpcAccessLogConfig {
// <envoy_v3_api_field_data.accesslog.v3.AccessLogCommon.filter_state_objects>`.
// Logger will call `FilterState::Object::serializeAsProto` to serialize the filter state object.
repeated string filter_state_objects_to_log = 5;

// Sets the retry policy when the establishment of a gRPC stream fails.
// If the stream succeeds once in establishing If the stream succeeds
// at least once in establishing itself, no retry will be performed
// no matter what gRPC status is received. Note that only
// :ref:`num_retries <envoy_v3_api_field_config.core.v3.RetryPolicy.num_retries>`
// will be used in this configuration.
config.core.v4alpha.RetryPolicy grpc_stream_retry_policy = 7;
}
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Removed Config or Runtime

New Features
------------
* access log: added :ref:`grpc_stream_retry_policy <envoy_v3_api_field_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.grpc_stream_retry_policy>` to the gRPC logger to reconnect when a connection fails to be established.
* bootstrap: added :ref:`inline_headers <envoy_v3_api_field_config.bootstrap.v3.Bootstrap.inline_headers>` in the bootstrap to make custom inline headers bootstrap configurable.
* http: added :ref:`string_match <envoy_v3_api_field_config.route.v3.HeaderMatcher.string_match>` in the header matcher.
* http: added support for :ref:`max_requests_per_connection <envoy_v3_api_field_config.core.v3.HttpProtocolOptions.max_requests_per_connection>` for both upstream and downstream connections.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ RawAsyncStream* AsyncClientImpl::startRaw(absl::string_view service_full_name,
auto grpc_stream =
std::make_unique<AsyncStreamImpl>(*this, service_full_name, method_name, callbacks, options);

grpc_stream->initialize(false);
grpc_stream->initialize(options.buffer_body_for_retry);
if (grpc_stream->hasResetStream()) {
return nullptr;
}
Expand Down
49 changes: 41 additions & 8 deletions source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ template <typename GrpcAccessLogger, typename ConfigProto> class GrpcAccessLogge
template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
public:
GrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
const Protobuf::MethodDescriptor& service_method)
: GrpcAccessLogClient(client, service_method, absl::nullopt) {}
const Protobuf::MethodDescriptor& service_method,
const envoy::config::core::v3::RetryPolicy& retry_policy)
: GrpcAccessLogClient(client, service_method, retry_policy, absl::nullopt) {}
GrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
const Protobuf::MethodDescriptor& service_method,
const envoy::config::core::v3::RetryPolicy& retry_policy,
envoy::config::core::v3::ApiVersion transport_api_version)
: client_(client), service_method_(service_method),
transport_api_version_(transport_api_version) {}
transport_api_version_(transport_api_version), grpc_stream_retry_policy_(retry_policy) {}

public:
struct LocalStream : public Grpc::AsyncStreamCallbacks<LogResponse> {
Expand Down Expand Up @@ -115,8 +117,7 @@ template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
}

if (stream_->stream_ == nullptr) {
stream_->stream_ =
client_->start(service_method_, *stream_, Http::AsyncClient::StreamOptions());
stream_->stream_ = client_->start(service_method_, *stream_, createStreamOptionsForRetry());
}

if (stream_->stream_ != nullptr) {
Expand All @@ -135,10 +136,40 @@ template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
return true;
}

Http::AsyncClient::StreamOptions createStreamOptionsForRetry() {
auto opt = Http::AsyncClient::StreamOptions();

if (!grpc_stream_retry_policy_) {
return opt;
}

envoy::config::route::v3::RetryPolicy retry_policy;
retry_policy.mutable_num_retries()->set_value(
lizan marked this conversation as resolved.
Show resolved Hide resolved
PROTOBUF_GET_WRAPPED_OR_DEFAULT(*grpc_stream_retry_policy_, num_retries, 1));

if (grpc_stream_retry_policy_->has_retry_back_off()) {
const auto& base_retry_backoff = grpc_stream_retry_policy_->retry_back_off();
const auto base_interval_ms = PROTOBUF_GET_MS_REQUIRED(base_retry_backoff, base_interval);

auto* mutable_retry_back_off = retry_policy.mutable_retry_back_off();
mutable_retry_back_off->mutable_base_interval()->CopyFrom(
Protobuf::util::TimeUtil::MillisecondsToDuration(base_interval_ms));
mutable_retry_back_off->mutable_max_interval()->CopyFrom(
Protobuf::util::TimeUtil::MillisecondsToDuration(
PROTOBUF_GET_MS_OR_DEFAULT(base_retry_backoff, max_interval, 10 * base_interval_ms)));
}
retry_policy.set_retry_on("connect-failure");
Shikugawa marked this conversation as resolved.
Show resolved Hide resolved

opt.setBufferBodyForRetry(true);
opt.setRetryPolicy(retry_policy);
return opt;
}

Grpc::AsyncClient<LogRequest, LogResponse> client_;
std::unique_ptr<LocalStream> stream_;
const Protobuf::MethodDescriptor& service_method_;
const absl::optional<envoy::config::core::v3::ApiVersion> transport_api_version_;
const absl::optional<envoy::config::core::v3::RetryPolicy> grpc_stream_retry_policy_;
};

} // namespace Detail
Expand Down Expand Up @@ -172,16 +203,18 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher,
Stats::Scope& scope, std::string access_log_prefix,
const Protobuf::MethodDescriptor& service_method)
const Protobuf::MethodDescriptor& service_method,
const envoy::config::core::v3::RetryPolicy& retry_policy)
: GrpcAccessLogger(client, buffer_flush_interval_msec, max_buffer_size_bytes, dispatcher,
scope, access_log_prefix, service_method, absl::nullopt) {}
scope, access_log_prefix, service_method, retry_policy, absl::nullopt) {}
GrpcAccessLogger(const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher,
Stats::Scope& scope, std::string access_log_prefix,
const Protobuf::MethodDescriptor& service_method,
const envoy::config::core::v3::RetryPolicy& retry_policy,
envoy::config::core::v3::ApiVersion transport_api_version)
: client_(client, service_method, transport_api_version),
: client_(client, service_method, retry_policy, transport_api_version),
buffer_flush_interval_msec_(buffer_flush_interval_msec),
flush_timer_(dispatcher.createTimer([this]() {
flush();
Expand Down
13 changes: 7 additions & 6 deletions source/extensions/access_loggers/grpc/grpc_access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ namespace AccessLoggers {
namespace GrpcCommon {

GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
const Grpc::RawAsyncClientSharedPtr& client, std::string log_name,
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
envoy::config::core::v3::ApiVersion transport_api_version)
Expand All @@ -26,8 +27,8 @@ GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
Grpc::VersionedMethods("envoy.service.accesslog.v3.AccessLogService.StreamAccessLogs",
"envoy.service.accesslog.v2.AccessLogService.StreamAccessLogs")
.getMethodDescriptorForVersion(transport_api_version),
transport_api_version),
log_name_(log_name), local_info_(local_info) {}
config.grpc_stream_retry_policy(), transport_api_version),
log_name_(config.log_name()), local_info_(local_info) {}

void GrpcAccessLoggerImpl::addEntry(envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry) {
message_.mutable_http_logs()->mutable_log_entry()->Add(std::move(entry));
Expand Down Expand Up @@ -59,9 +60,9 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, Stats::Scope& scope) {
return std::make_shared<GrpcAccessLoggerImpl>(client, config.log_name(),
buffer_flush_interval_msec, max_buffer_size_bytes,
dispatcher, local_info_, scope, transport_version);
return std::make_shared<GrpcAccessLoggerImpl>(client, config, buffer_flush_interval_msec,
max_buffer_size_bytes, dispatcher, local_info_,
scope, transport_version);
}

} // namespace GrpcCommon
Expand Down
11 changes: 6 additions & 5 deletions source/extensions/access_loggers/grpc/grpc_access_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ class GrpcAccessLoggerImpl
envoy::service::accesslog::v3::StreamAccessLogsMessage,
envoy::service::accesslog::v3::StreamAccessLogsResponse> {
public:
GrpcAccessLoggerImpl(const Grpc::RawAsyncClientSharedPtr& client, std::string log_name,
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
envoy::config::core::v3::ApiVersion transport_api_version);
GrpcAccessLoggerImpl(
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
envoy::config::core::v3::ApiVersion transport_api_version);

private:
// Extensions::AccessLoggers::GrpcCommon::GrpcAccessLogger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace AccessLoggers {
namespace OpenTelemetry {

GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
const Grpc::RawAsyncClientSharedPtr& client, std::string log_name,
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
envoy::config::core::v3::ApiVersion transport_api_version)
Expand All @@ -31,8 +32,8 @@ GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
Grpc::VersionedMethods("opentelemetry.proto.collector.logs.v1.LogsService.Export",
"opentelemetry.proto.collector.logs.v1.LogsService.Export")
.getMethodDescriptorForVersion(transport_api_version),
transport_api_version) {
initMessageRoot(log_name, local_info);
config.grpc_stream_retry_policy(), transport_api_version) {
initMessageRoot(config.log_name(), local_info);
}

namespace {
Expand Down Expand Up @@ -82,9 +83,9 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, Stats::Scope& scope) {
return std::make_shared<GrpcAccessLoggerImpl>(client, config.log_name(),
buffer_flush_interval_msec, max_buffer_size_bytes,
dispatcher, local_info_, scope, transport_version);
return std::make_shared<GrpcAccessLoggerImpl>(client, config, buffer_flush_interval_msec,
max_buffer_size_bytes, dispatcher, local_info_,
scope, transport_version);
}

} // namespace OpenTelemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ class GrpcAccessLoggerImpl
ProtobufWkt::Empty, opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest,
opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse> {
public:
GrpcAccessLoggerImpl(const Grpc::RawAsyncClientSharedPtr& client, std::string log_name,
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
envoy::config::core::v3::ApiVersion transport_api_version);
GrpcAccessLoggerImpl(
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
envoy::config::core::v3::ApiVersion transport_api_version);

private:
void initMessageRoot(const std::string& log_name, const LocalInfo::LocalInfo& local_info);
Expand Down
Loading