Skip to content

Commit

Permalink
access logging: gRPC logger retry to establish underlying stream conn…
Browse files Browse the repository at this point in the history
…ection (#17469)

Add a retry mechanism to the grpc access logger. This retry mechanism currently only supports a simple retry count. 

Also, at the moment, retries are only fired when the gRPC stream fails to be established, and nothing happens if the stream is successfully established once and a reset is issued.

Risk Level: Low
Testing: Unit

Signed-off-by: Shikugawa <rei@tetrate.io>
  • Loading branch information
Shikugawa authored Nov 5, 2021
1 parent c81d445 commit 3a5f795
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 45 deletions.
12 changes: 11 additions & 1 deletion api/envoy/extensions/access_loggers/grpc/v3/als.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package envoy.extensions.access_loggers.grpc.v3;

import "envoy/config/core/v3/base.proto";
import "envoy/config/core/v3/config_source.proto";
import "envoy/config/core/v3/grpc_service.proto";

Expand Down Expand Up @@ -54,7 +55,7 @@ message TcpGrpcAccessLogConfig {
}

// Common configuration for gRPC access logs.
// [#next-free-field: 7]
// [#next-free-field: 8]
message CommonGrpcAccessLogConfig {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.accesslog.v2.CommonGrpcAccessLogConfig";
Expand Down Expand Up @@ -86,4 +87,13 @@ message CommonGrpcAccessLogConfig {
// <envoy_v3_api_field_data.accesslog.v3.AccessLogCommon.filter_state_objects>`.
// Logger will call `FilterState::Object::serializeAsProto` to serialize the filter state object.
repeated string filter_state_objects_to_log = 5;

// Sets the retry policy when the establishment of a gRPC stream fails.
// If the stream succeeds once in establishing If the stream succeeds
// at least once in establishing itself, no retry will be performed
// no matter what gRPC status is received. Note that only
// :ref:`num_retries <envoy_v3_api_field_config.core.v3.RetryPolicy.num_retries>`
// will be used in this configuration. This feature is used only when you are using
// :ref:`Envoy gRPC client <envoy_v3_api_field_config.core.v3.GrpcService.envoy_grpc>`.
config.core.v3.RetryPolicy grpc_stream_retry_policy = 7;
}
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Removed Config or Runtime

New Features
------------
* access log: added :ref:`grpc_stream_retry_policy <envoy_v3_api_field_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.grpc_stream_retry_policy>` to the gRPC logger to reconnect when a connection fails to be established.
* api: added support for *xds.type.v3.TypedStruct* in addition to the now-deprecated *udpa.type.v1.TypedStruct* proto message, which is a wrapper proto used to encode typed JSON data in a *google.protobuf.Any* field.
* bootstrap: added :ref:`typed_dns_resolver_config <envoy_v3_api_field_config.bootstrap.v3.Bootstrap.typed_dns_resolver_config>` in the bootstrap to support DNS resolver as an extension.
* cluster: added :ref:`typed_dns_resolver_config <envoy_v3_api_field_config.cluster.v3.Cluster.typed_dns_resolver_config>` in the cluster to support DNS resolver as an extension.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ RawAsyncStream* AsyncClientImpl::startRaw(absl::string_view service_full_name,
auto grpc_stream =
std::make_unique<AsyncStreamImpl>(*this, service_full_name, method_name, callbacks, options);

grpc_stream->initialize(false);
grpc_stream->initialize(options.buffer_body_for_retry);
if (grpc_stream->hasResetStream()) {
return nullptr;
}
Expand Down
30 changes: 24 additions & 6 deletions source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "source/common/common/assert.h"
#include "source/common/grpc/typed_async_client.h"
#include "source/common/http/utility.h"
#include "source/common/protobuf/utility.h"

#include "absl/container/flat_hash_map.h"
Expand Down Expand Up @@ -75,8 +76,9 @@ template <typename GrpcAccessLogger, typename ConfigProto> class GrpcAccessLogge
template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
public:
GrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
const Protobuf::MethodDescriptor& service_method)
: client_(client), service_method_(service_method) {}
const Protobuf::MethodDescriptor& service_method,
const envoy::config::core::v3::RetryPolicy& retry_policy)
: client_(client), service_method_(service_method), grpc_stream_retry_policy_(retry_policy) {}

public:
struct LocalStream : public Grpc::AsyncStreamCallbacks<LogResponse> {
Expand Down Expand Up @@ -108,8 +110,7 @@ template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
}

if (stream_->stream_ == nullptr) {
stream_->stream_ =
client_->start(service_method_, *stream_, Http::AsyncClient::StreamOptions());
stream_->stream_ = client_->start(service_method_, *stream_, createStreamOptionsForRetry());
}

if (stream_->stream_ != nullptr) {
Expand All @@ -124,9 +125,24 @@ template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
return true;
}

Http::AsyncClient::StreamOptions createStreamOptionsForRetry() {
auto opt = Http::AsyncClient::StreamOptions();

if (!grpc_stream_retry_policy_) {
return opt;
}

const auto retry_policy =
Http::Utility::convertCoreToRouteRetryPolicy(*grpc_stream_retry_policy_, "connect-failure");
opt.setBufferBodyForRetry(true);
opt.setRetryPolicy(retry_policy);
return opt;
}

Grpc::AsyncClient<LogRequest, LogResponse> client_;
std::unique_ptr<LocalStream> stream_;
const Protobuf::MethodDescriptor& service_method_;
const absl::optional<envoy::config::core::v3::RetryPolicy> grpc_stream_retry_policy_;
};

} // namespace Detail
Expand Down Expand Up @@ -160,8 +176,10 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher,
Stats::Scope& scope, std::string access_log_prefix,
const Protobuf::MethodDescriptor& service_method)
: client_(client, service_method), buffer_flush_interval_msec_(buffer_flush_interval_msec),
const Protobuf::MethodDescriptor& service_method,
const envoy::config::core::v3::RetryPolicy& retry_policy)
: client_(client, service_method, retry_policy),
buffer_flush_interval_msec_(buffer_flush_interval_msec),
flush_timer_(dispatcher.createTimer([this]() {
flush();
flush_timer_->enableTimer(buffer_flush_interval_msec_);
Expand Down
14 changes: 8 additions & 6 deletions source/extensions/access_loggers/grpc/grpc_access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ namespace AccessLoggers {
namespace GrpcCommon {

GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
const Grpc::RawAsyncClientSharedPtr& client, std::string log_name,
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope)
: GrpcAccessLogger(std::move(client), buffer_flush_interval_msec, max_buffer_size_bytes,
dispatcher, scope, GRPC_LOG_STATS_PREFIX,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.accesslog.v3.AccessLogService.StreamAccessLogs")),
log_name_(log_name), local_info_(local_info) {}
"envoy.service.accesslog.v3.AccessLogService.StreamAccessLogs"),
config.grpc_stream_retry_policy()),
log_name_(config.log_name()), local_info_(local_info) {}

void GrpcAccessLoggerImpl::addEntry(envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry) {
message_.mutable_http_logs()->mutable_log_entry()->Add(std::move(entry));
Expand Down Expand Up @@ -54,9 +56,9 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher) {
return std::make_shared<GrpcAccessLoggerImpl>(client, config.log_name(),
buffer_flush_interval_msec, max_buffer_size_bytes,
dispatcher, local_info_, scope_);
return std::make_shared<GrpcAccessLoggerImpl>(client, config, buffer_flush_interval_msec,
max_buffer_size_bytes, dispatcher, local_info_,
scope_);
}

} // namespace GrpcCommon
Expand Down
9 changes: 5 additions & 4 deletions source/extensions/access_loggers/grpc/grpc_access_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ class GrpcAccessLoggerImpl
envoy::service::accesslog::v3::StreamAccessLogsMessage,
envoy::service::accesslog::v3::StreamAccessLogsResponse> {
public:
GrpcAccessLoggerImpl(const Grpc::RawAsyncClientSharedPtr& client, std::string log_name,
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info, Stats::Scope& scope);
GrpcAccessLoggerImpl(
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope);

private:
// Extensions::AccessLoggers::GrpcCommon::GrpcAccessLogger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ namespace AccessLoggers {
namespace OpenTelemetry {

GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
const Grpc::RawAsyncClientSharedPtr& client, std::string log_name,
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope)
: GrpcAccessLogger(client, buffer_flush_interval_msec, max_buffer_size_bytes, dispatcher, scope,
GRPC_LOG_STATS_PREFIX,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"opentelemetry.proto.collector.logs.v1.LogsService.Export")) {
initMessageRoot(log_name, local_info);
"opentelemetry.proto.collector.logs.v1.LogsService.Export"),
config.grpc_stream_retry_policy()) {
initMessageRoot(config.log_name(), local_info);
}

namespace {
Expand Down Expand Up @@ -77,9 +79,9 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher) {
return std::make_shared<GrpcAccessLoggerImpl>(client, config.log_name(),
buffer_flush_interval_msec, max_buffer_size_bytes,
dispatcher, local_info_, scope_);
return std::make_shared<GrpcAccessLoggerImpl>(client, config, buffer_flush_interval_msec,
max_buffer_size_bytes, dispatcher, local_info_,
scope_);
}

} // namespace OpenTelemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ class GrpcAccessLoggerImpl
ProtobufWkt::Empty, opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest,
opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse> {
public:
GrpcAccessLoggerImpl(const Grpc::RawAsyncClientSharedPtr& client, std::string log_name,
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info, Stats::Scope& scope);
GrpcAccessLoggerImpl(
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope);

private:
void initMessageRoot(const std::string& log_name, const LocalInfo::LocalInfo& local_info);
Expand Down
Loading

0 comments on commit 3a5f795

Please sign in to comment.